sliding-sync/tests-integration/room_subscriptions_test.go

176 lines
5.7 KiB
Go
Raw Permalink Normal View History

package syncv3
import (
"encoding/json"
"testing"
"time"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/matrix-org/sliding-sync/testutils/m"
)
// Test that if you /join a room and then immediately add a room subscription for said room before the
// proxy is aware of it, that you still get all the data for that room when it does arrive.
func TestRoomSubscriptionJoinRoomRace(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// setup code
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
raceRoom := roomEvents{
roomID: "!race:localhost",
events: createRoomState(t, alice, time.Now()),
}
// add the account and queue a dummy response so there is a poll loop and we can get requests serviced
v2.addAccount(t, alice, aliceToken)
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: "!unimportant",
events: createRoomState(t, alice, time.Now()),
}),
},
})
// dummy request to start the poll loop
res := v3.mustDoV3Request(t, aliceToken, sync3.Request{})
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(nil))
// now make a room subscription for a room which does not yet exist from the proxy's pov
res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{
RoomSubscriptions: map[string]sync3.RoomSubscription{
raceRoom.roomID: {
RequiredState: [][2]string{
{"m.room.create", ""},
},
},
},
})
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(nil))
// now the proxy becomes aware of it
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(raceRoom),
},
})
v2.waitUntilEmpty(t, alice) // ensure we have processed it fully so we know it should exist
// hit the proxy again with this connection, we should get the data
res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{})
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
raceRoom.roomID: {
m.MatchRoomInitial(true),
m.MatchRoomRequiredState([]json.RawMessage{raceRoom.events[0]}), // the create event
},
}))
}
2023-05-11 16:36:02 +01:00
// Regression test for https://github.com/vector-im/element-x-ios-rageshakes/issues/314
// Caused by: the user cache getting corrupted and missing events, caused by it incorrectly replacing
// its timeline with an older one.
// To the end user, it manifests as missing messages in the timeline, because the proxy incorrectly
2023-05-11 16:45:59 +01:00
// said the events are C,F,G and not E,F,G.
func TestRoomSubscriptionMisorderedTimeline(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// setup code
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
roomState := createRoomState(t, alice, time.Now())
abcInitialEvents := []json.RawMessage{
testutils.NewMessageEvent(t, alice, "A"),
testutils.NewMessageEvent(t, alice, "B"),
testutils.NewMessageEvent(t, alice, "C"),
}
room := roomEvents{
roomID: "!room:localhost",
events: append(roomState, abcInitialEvents...),
}
2023-05-16 12:35:35 +01:00
v2.addAccount(t, alice, aliceToken)
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(room),
},
})
res := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"list": {
Ranges: sync3.SliceRanges{{0, 10}},
RoomSubscription: sync3.RoomSubscription{TimelineLimit: 1},
},
},
})
// test that we get 1 event initally due to the timeline limit
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
room.roomID: {
m.MatchRoomTimeline(abcInitialEvents[len(abcInitialEvents)-1:]),
},
}))
// now live stream 2 events
deLiveEvents := []json.RawMessage{
testutils.NewMessageEvent(t, alice, "D"),
testutils.NewMessageEvent(t, alice, "E"),
}
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: room.roomID,
events: deLiveEvents,
}),
},
})
v2.waitUntilEmpty(t, alice)
// now add a room sub with timeline limit = 5, we will need to hit the DB to satisfy this.
// We might destroy caches in a bad way. We might not return the most recent 5 events.
res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{
RoomSubscriptions: map[string]sync3.RoomSubscription{
room.roomID: {
TimelineLimit: 5,
},
},
})
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
room.roomID: {
// we append live events AFTER processing the new timeline limit, so 7 events not 5.
// TODO: ideally we'd just return abcde here.
m.MatchRoomTimeline(append(roomState[len(roomState)-2:], append(abcInitialEvents, deLiveEvents...)...)),
},
}), m.LogResponse(t))
// live stream the final 2 events
fgLiveEvents := []json.RawMessage{
testutils.NewMessageEvent(t, alice, "F"),
testutils.NewMessageEvent(t, alice, "G"),
}
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: room.roomID,
events: fgLiveEvents,
}),
},
})
v2.waitUntilEmpty(t, alice)
// now ask for timeline limit = 3, which may miss events if the caches got corrupted.
// Do this on a fresh connection to force loadPos to update.
res = v3.mustDoV3Request(t, aliceToken, sync3.Request{
RoomSubscriptions: map[string]sync3.RoomSubscription{
room.roomID: {
TimelineLimit: 3,
},
},
})
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
room.roomID: {
m.MatchRoomTimeline(append(deLiveEvents[1:], fgLiveEvents...)),
},
}), m.LogResponse(t))
}