package syncv3 import ( "encoding/json" "fmt" "testing" "time" "github.com/jmoiron/sqlx" slidingsync "github.com/matrix-org/sliding-sync" "github.com/matrix-org/sliding-sync/sqlutil" "github.com/matrix-org/sliding-sync/state" "github.com/matrix-org/sliding-sync/sync2" "github.com/matrix-org/sliding-sync/sync3" "github.com/matrix-org/sliding-sync/testutils" "github.com/matrix-org/sliding-sync/testutils/m" "github.com/tidwall/sjson" ) // Inject 20 rooms with A,B,C as the most recent events. Then do a v3 request [0,3] with a timeline limit of 3 // and make sure we get scrolback for the 4 rooms we care about. Then, restart the server (so it repopulates caches) // and attempt the same request again, making sure we get the same results. Then add in some "live" v2 events // and make sure the initial scrollback includes these new live events. func TestTimelines(t *testing.T) { // setup code pqString := testutils.PrepareDBConnectionString() v2 := runTestV2Server(t) v3 := runTestServer(t, v2, pqString) defer v2.close() defer v3.close() // make 20 rooms, last room is most recent, and send A,B,C into each room allRooms := make([]roomEvents, 20) for i := 0; i < len(allRooms); i++ { ts := time.Now().Add(time.Duration(i) * time.Minute) roomName := fmt.Sprintf("My Room %d", i) allRooms[i] = roomEvents{ roomID: fmt.Sprintf("!TestTimelines_%d:localhost", i), name: roomName, events: append(createRoomState(t, alice, ts), []json.RawMessage{ testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": roomName}, testutils.WithTimestamp(ts.Add(3*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "A"}, testutils.WithTimestamp(ts.Add(4*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "B"}, testutils.WithTimestamp(ts.Add(5*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "C"}, testutils.WithTimestamp(ts.Add(6*time.Second))), }...), } } latestTimestamp := time.Now().Add(10 * time.Hour) v2.addAccount(t, alice, aliceToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(allRooms...), }, }) // most recent 4 rooms var wantRooms []roomEvents i := 0 for len(wantRooms) < 4 { wantRooms = append(wantRooms, allRooms[len(allRooms)-i-1]) i++ } numTimelineEventsPerRoom := 3 t.Run("timelines load initially", testTimelineLoadInitialEvents(v3, aliceToken, len(allRooms), wantRooms, numTimelineEventsPerRoom)) // restart the server v3.restart(t, v2, pqString) t.Run("timelines load initially after restarts", testTimelineLoadInitialEvents(v3, aliceToken, len(allRooms), wantRooms, numTimelineEventsPerRoom)) // inject some live events liveEvents := []roomEvents{ { roomID: allRooms[0].roomID, events: []json.RawMessage{ testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "ping"}, testutils.WithTimestamp(latestTimestamp.Add(1*time.Minute))), }, }, { roomID: allRooms[1].roomID, events: []json.RawMessage{ testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "ping2"}, testutils.WithTimestamp(latestTimestamp.Add(2*time.Minute))), }, }, } v2.waitUntilEmpty(t, alice) // add these live events to the global view of the timeline allRooms[0].events = append(allRooms[0].events, liveEvents[0].events...) allRooms[1].events = append(allRooms[1].events, liveEvents[1].events...) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(liveEvents...), }, }) v2.waitUntilEmpty(t, alice) // now we want the new live rooms and then the most recent 2 rooms from before wantRooms = append([]roomEvents{ allRooms[1], allRooms[0], }, wantRooms[0:2]...) t.Run("live events are added to the timeline initially", testTimelineLoadInitialEvents(v3, aliceToken, len(allRooms), wantRooms, numTimelineEventsPerRoom)) } // Create 20 rooms and send A,B,C into each. Then bump various rooms "live streamed" from v2 and ensure // the correct delta operations are sent e.g DELETE/INSERT/UPDATE. func TestTimelinesLiveStream(t *testing.T) { // setup code v2 := runTestV2Server(t) v3 := runTestServer(t, v2, "") defer v2.close() defer v3.close() // make 20 rooms, last room is most recent, and send A,B,C into each room allRooms := make([]roomEvents, 20) latestTimestamp := time.Now() for i := 0; i < len(allRooms); i++ { ts := time.Now().Add(time.Duration(i) * time.Minute) roomName := fmt.Sprintf("My Room %d", i) allRooms[i] = roomEvents{ roomID: fmt.Sprintf("!TestTimelinesLiveStream_%d:localhost", i), name: roomName, events: append(createRoomState(t, alice, ts), []json.RawMessage{ testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": roomName}, testutils.WithTimestamp(ts.Add(3*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "A"}, testutils.WithTimestamp(ts.Add(4*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "B"}, testutils.WithTimestamp(ts.Add(5*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "C"}, testutils.WithTimestamp(ts.Add(6*time.Second))), }...), } if ts.After(latestTimestamp) { latestTimestamp = ts.Add(10 * time.Second) } } v2.addAccount(t, alice, aliceToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(allRooms...), }, }) numTimelineEventsPerRoom := 3 // send a live event in allRooms[i] (always 1s newer) bumpRoom := func(i int) { latestTimestamp = latestTimestamp.Add(1 * time.Second) ev := testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": fmt.Sprintf("bump %d", i)}, testutils.WithTimestamp(latestTimestamp)) allRooms[i].events = append(allRooms[i].events, ev) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: allRooms[i].roomID, events: []json.RawMessage{ev}, }), }, }) v2.waitUntilEmpty(t, alice) } // most recent 4 rooms var wantRooms []roomEvents i := 0 for len(wantRooms) < 4 { wantRooms = append(wantRooms, allRooms[len(allRooms)-i-1]) i++ } // first request => rooms 19,18,17,16 res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, int64(len(wantRooms) - 1)}, // first N rooms }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: int64(numTimelineEventsPerRoom), }, }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(len(allRooms)), m.MatchV3Ops( m.MatchV3SyncOpFn(func(op *sync3.ResponseOpRange) error { if len(op.RoomIDs) != len(wantRooms) { return fmt.Errorf("want %d rooms, got %d", len(wantRooms), len(op.RoomIDs)) } for i := range wantRooms { err := wantRooms[i].MatchRoom(op.RoomIDs[i], res.Rooms[op.RoomIDs[i]], m.MatchRoomName(wantRooms[i].name), m.MatchRoomTimelineMostRecent(numTimelineEventsPerRoom, wantRooms[i].events), ) if err != nil { return err } } return nil }), ))) bumpRoom(7) // next request, DELETE 3; INSERT 0 7; res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, int64(len(wantRooms) - 1)}, // first N rooms }, // sticky remember the timeline_limit }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(len(allRooms)), m.MatchV3Ops( m.MatchV3DeleteOp(3), m.MatchV3InsertOp(0, allRooms[7].roomID), )), m.MatchRoomSubscription( allRooms[7].roomID, m.MatchRoomName(allRooms[7].name), m.MatchRoomTimelineMostRecent(numTimelineEventsPerRoom, allRooms[7].events), )) bumpRoom(7) // next request, UPDATE 0 7; res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, int64(len(wantRooms) - 1)}, // first N rooms }, }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(len(allRooms))), m.MatchNoV3Ops()) bumpRoom(18) // next request, DELETE 2; INSERT 0 18; res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, int64(len(wantRooms) - 1)}, // first N rooms }, }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(len(allRooms)), m.MatchV3Ops( m.MatchV3DeleteOp(2), m.MatchV3InsertOp(0, allRooms[18].roomID), )), m.MatchRoomSubscription(allRooms[18].roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{allRooms[18].events[len(allRooms[18].events)-1]}))) } func TestMultipleWindows(t *testing.T) { // setup code pqString := testutils.PrepareDBConnectionString() v2 := runTestV2Server(t) v3 := runTestServer(t, v2, pqString) defer v2.close() defer v3.close() // make 20 rooms, first room is most recent, and send A,B,C into each room allRooms := make([]roomEvents, 20) for i := 0; i < len(allRooms); i++ { ts := time.Now().Add(time.Duration(i) * -1 * time.Minute) roomName := fmt.Sprintf("My Room %d", i) allRooms[i] = roomEvents{ roomID: fmt.Sprintf("!TestMultipleWindows_%d:localhost", i), name: roomName, events: append(createRoomState(t, alice, ts), []json.RawMessage{ testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": roomName}, testutils.WithTimestamp(ts.Add(3*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "A"}, testutils.WithTimestamp(ts.Add(4*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "B"}, testutils.WithTimestamp(ts.Add(5*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "C"}, testutils.WithTimestamp(ts.Add(6*time.Second))), }...), } } v2.addAccount(t, alice, aliceToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(allRooms...), }, }) numTimelineEventsPerRoom := 2 // request 3 windows res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 2}, // first 3 rooms [2]int64{10, 12}, // 3 rooms in the middle [2]int64{17, 19}, // last 3 rooms }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: int64(numTimelineEventsPerRoom), }, }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(len(allRooms)), m.MatchV3Ops( m.MatchV3SyncOp(0, 2, []string{allRooms[0].roomID, allRooms[1].roomID, allRooms[2].roomID}), m.MatchV3SyncOp(10, 12, []string{allRooms[10].roomID, allRooms[11].roomID, allRooms[12].roomID}), m.MatchV3SyncOp(17, 19, []string{allRooms[17].roomID, allRooms[18].roomID, allRooms[19].roomID}), ))) // bump room 18 to position 0 latestTimestamp := time.Now().Add(time.Hour) bumpRoom := func(i int) { latestTimestamp = latestTimestamp.Add(1 * time.Second) ev := testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": fmt.Sprintf("bump %d", i)}, testutils.WithTimestamp(latestTimestamp)) allRooms[i].events = append(allRooms[i].events, ev) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: allRooms[i].roomID, events: []json.RawMessage{ev}, }), }, }) v2.waitUntilEmpty(t, alice) } bumpRoom(18) res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 2}, // first 3 rooms [2]int64{10, 12}, // 3 rooms in the middle [2]int64{17, 19}, // last 3 rooms }, }}, }) // Range A Range B Range C // _____ ________ ________ // 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 //18 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 // DELETE 2 DELETE 12 DELETE 18 // INSERT 0,18 INSERT 10,9 INSERT 17,16 m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(len(allRooms)), m.MatchV3Ops( m.MatchV3DeleteOp(18), m.MatchV3InsertOp(17, allRooms[16].roomID), m.MatchV3DeleteOp(2), m.MatchV3InsertOp(0, allRooms[18].roomID), m.MatchV3DeleteOp(12), m.MatchV3InsertOp(10, allRooms[9].roomID), ), )) } func TestInitialFlag(t *testing.T) { pqString := testutils.PrepareDBConnectionString() // setup code v2 := runTestV2Server(t) v3 := runTestServer(t, v2, pqString) defer v2.close() defer v3.close() roomID := "!a:localhost" v2.addAccount(t, alice, aliceToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, events: createRoomState(t, alice, time.Now()), }), }, }) res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 10, }, }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Ops( m.MatchV3SyncOp(0, 0, []string{roomID}), )), m.MatchRoomSubscription(roomID, m.MatchRoomInitial(true))) // send an update v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline( roomEvents{ roomID: roomID, events: []json.RawMessage{ testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{}), }, }, ), }, }) v2.waitUntilEmpty(t, alice) res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, }}, }) m.MatchResponse(t, res, m.MatchNoV3Ops(), m.MatchRoomSubscription(roomID, m.MatchRoomInitial(false))) } // Regression test for in-the-wild bug: // // ERR missing events in database! // ERR V2: failed to accumulate room error="failed to extract nids from inserted events, asked for 9 got 8" // // We should be able to gracefully handle duplicate events in the timeline. func TestDuplicateEventsInTimeline(t *testing.T) { pqString := testutils.PrepareDBConnectionString() // setup code v2 := runTestV2Server(t) v3 := runTestServer(t, v2, pqString) defer v2.close() defer v3.close() roomID := "!a:localhost" dupeEvent := testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{}) v2.addAccount(t, alice, aliceToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, state: createRoomState(t, alice, time.Now()), events: []json.RawMessage{ testutils.NewStateEvent(t, "m.room.topic", "", alice, map[string]interface{}{}), dupeEvent, dupeEvent, }, }), }, }) res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 10, }, }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{roomID})), ), m.MatchRoomSubscription(roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{dupeEvent}))) } // Regression test for https://github.com/matrix-org/sliding-sync/commit/39d6e99f967e55b609f8ef8b4271c04ebb053d37 // Request a timeline_limit of 0 for the room list. Sometimes when a new event arrives it causes an // unrelated room to be sent to the client (e.g tracking rooms [5,10] and room 15 bumps to room 2, // causing all the rooms to shift so you're now actually tracking [4,9] - the client knows 5-9 but // room 4 is new, so you notify about that room and not the one which had a new event (room 15). // Ensure that room 4 is given to the client. In the past, this would panic when timeline limit = 0 // as the timeline was loaded using the timeline limit of the client, and an unchecked array access // into the timeline func TestTimelineMiddleWindowZeroTimelineLimit(t *testing.T) { // setup code v2 := runTestV2Server(t) v3 := runTestServer(t, v2, "") defer v2.close() defer v3.close() // make 20 rooms, first room is most recent, and send A,B,C into each room allRooms := make([]roomEvents, 20) for i := 0; i < len(allRooms); i++ { ts := time.Now().Add(-1 * time.Duration(i) * time.Minute) roomName := fmt.Sprintf("My Room %d", i) allRooms[i] = roomEvents{ roomID: fmt.Sprintf("!TestTimelineMiddleWindowZeroTimelineLimit_%d:localhost", i), name: roomName, events: append(createRoomState(t, alice, ts), []json.RawMessage{ testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": roomName}, testutils.WithTimestamp(ts.Add(3*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "A"}, testutils.WithTimestamp(ts.Add(4*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "B"}, testutils.WithTimestamp(ts.Add(5*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "C"}, testutils.WithTimestamp(ts.Add(6*time.Second))), }...), } } v2.addAccount(t, alice, aliceToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(allRooms...), }, }) // Request rooms 5-10 with a 0 timeline limit res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{5, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 0, }, }}, }) wantRooms := allRooms[5:11] m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(len(allRooms)), m.MatchV3Ops( m.MatchV3SyncOpFn(func(op *sync3.ResponseOpRange) error { if len(op.RoomIDs) != len(wantRooms) { return fmt.Errorf("want %d rooms, got %d", len(wantRooms), len(op.RoomIDs)) } for i := range wantRooms { err := wantRooms[i].MatchRoom(op.RoomIDs[i], res.Rooms[op.RoomIDs[i]], m.MatchRoomName(wantRooms[i].name), m.MatchRoomTimeline(nil), ) if err != nil { return err } } return nil }), ))) // bump room 15 to 2 v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: allRooms[15].roomID, events: []json.RawMessage{ testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "bump"}), }, }), }, }) v2.waitUntilEmpty(t, alice) // should see room 4, the server should not panic res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{5, 10}, }, }}, }) m.MatchResponse(t, res, m.MatchLists(map[string][]m.ListMatcher{ "a": { m.MatchV3Count(len(allRooms)), m.MatchV3Ops( m.MatchV3DeleteOp(10), m.MatchV3InsertOp(5, allRooms[4].roomID), ), }, })) } // Regression test to ensure that the 'state' block NEVER appears when requesting a high timeline_limit. // In the past, the proxy treated state/timeline sections as the 'same' in that they were inserted into the // events table and had stream positions associated with them. This could cause ancient state events to appear // in the timeline if the timeline_limit was greatert than the number of genuine timeline events received via // v2 sync. func TestHistoryDoesntIncludeState(t *testing.T) { pqString := testutils.PrepareDBConnectionString() // setup code v2 := runTestV2Server(t) v3 := runTestServer(t, v2, pqString) defer v2.close() defer v3.close() roomID := "!a:localhost" prevBatch := "P_B" room := roomEvents{ roomID: roomID, // these events should NEVER appear in the timeline state: createRoomState(t, alice, time.Now()), // these events are the timeline and should appear events: []json.RawMessage{ testutils.NewStateEvent(t, "m.room.topic", "", alice, map[string]interface{}{"topic": "boo"}), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "hello"}), }, prevBatch: prevBatch, } v2.addAccount(t, alice, aliceToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(room), }, }) res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 10, }, }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{roomID})), ), m.MatchRoomSubscription(roomID, m.MatchRoomTimeline(room.events), m.MatchRoomPrevBatch(prevBatch))) } // Test that transaction IDs come down the user's stream correctly in the case where 2 clients are // in the same room. func TestTimelineTxnID(t *testing.T) { pqString := testutils.PrepareDBConnectionString() // setup code v2 := runTestV2Server(t) v3 := runTestServer(t, v2, pqString) defer v2.close() defer v3.close() roomID := "!a:localhost" latestTimestamp := time.Now() // Alice and Bob are in the same room room := roomEvents{ roomID: roomID, events: append( createRoomState(t, alice, latestTimestamp), testutils.NewJoinEvent(t, bob), ), } v2.addAccount(t, alice, aliceToken) v2.addAccount(t, bob, bobToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(room), }, }) v2.queueResponse(bob, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(room), }, }) aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 2, }, }, }, }) bobRes := v3.mustDoV3Request(t, bobToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 2, }, }, }, }) // Alice has sent a message but it arrives down Bob's poller first, so it has no txn_id txnID := "m1234567890" newEvent := testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "hi"}, testutils.WithUnsigned(map[string]interface{}{ "transaction_id": txnID, })) newEventNoUnsigned, err := sjson.DeleteBytes(newEvent, "unsigned") if err != nil { t.Fatalf("failed to delete bytes: %s", err) } v2.queueResponse(bob, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, events: []json.RawMessage{newEventNoUnsigned}, }), }, }) v2.waitUntilEmpty(t, bob) // now it arrives down Alice's poller, but the event has already been persisted at this point! // We need a txn ID cache to remember it. v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, events: []json.RawMessage{newEvent}, }), }, }) v2.waitUntilEmpty(t, alice) // now Alice syncs, she should see the event with the txn ID aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, }, }, }) m.MatchResponse(t, aliceRes, m.MatchList("a", m.MatchV3Count(1)), m.MatchNoV3Ops(), m.MatchRoomSubscription( roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{newEvent}), )) // now Bob syncs, he should see the event without the txn ID bobRes = v3.mustDoV3RequestWithPos(t, bobToken, bobRes.Pos, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, }, }, }) m.MatchResponse(t, bobRes, m.MatchList("a", m.MatchV3Count(1)), m.MatchNoV3Ops(), m.MatchRoomSubscription( roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{newEventNoUnsigned}), )) } // TestTimelineTxnID checks that Alice sees her transaction_id if // - Bob's poller sees Alice's event, // - Alice's poller sees Alice's event with txn_id, and // - Alice syncs. // // This test is similar but not identical. It checks that Alice sees her transaction_id if // - Bob's poller sees Alice's event, // - Alice does an incremental sync, which should omit her event, // - Alice's poller sees Alice's event with txn_id, and // - Alice syncs, seeing her event with txn_id. func TestTimelineTxnIDBuffersForTxnID(t *testing.T) { pqString := testutils.PrepareDBConnectionString() // setup code v2 := runTestV2Server(t) v3 := runTestServer(t, v2, pqString, slidingsync.Opts{ // This needs to be greater than the request timeout, which is hardcoded to a // minimum of 100ms in connStateLive.liveUpdate. This ensures that the // liveUpdate call finishes before the TxnIDWaiter publishes the update, // meaning that Alice doesn't see her event before the txn ID is known. MaxTransactionIDDelay: 200 * time.Millisecond, }) defer v2.close() defer v3.close() roomID := "!a:localhost" latestTimestamp := time.Now() t.Log("Alice and Bob are in the same room") room := roomEvents{ roomID: roomID, events: append( createRoomState(t, alice, latestTimestamp), testutils.NewJoinEvent(t, bob), ), } v2.addAccount(t, alice, aliceToken) v2.addAccount(t, bob, bobToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(room), }, NextBatch: "alice_after_initial_poll", }) v2.queueResponse(bob, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(room), }, NextBatch: "bob_after_initial_poll", }) t.Log("Alice and Bob make initial sliding syncs.") aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 2, }, }, }, }) bobRes := v3.mustDoV3Request(t, bobToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 2, }, }, }, }) t.Log("Alice has sent a message... but it arrives down Bob's poller first, without a transaction_id") txnID := "m1234567890" newEvent := testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "hi"}, testutils.WithUnsigned(map[string]interface{}{ "transaction_id": txnID, })) newEventNoUnsigned, err := sjson.DeleteBytes(newEvent, "unsigned") if err != nil { t.Fatalf("failed to delete bytes: %s", err) } v2.queueResponse(bob, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, events: []json.RawMessage{newEventNoUnsigned}, }), }, }) t.Log("Bob's poller sees the message.") v2.waitUntilEmpty(t, bob) t.Log("Bob makes an incremental sliding sync") bobRes = v3.mustDoV3RequestWithPos(t, bobToken, bobRes.Pos, sync3.Request{}) t.Log("Bob should see the message without a transaction_id") m.MatchResponse(t, bobRes, m.MatchList("a", m.MatchV3Count(1)), m.MatchNoV3Ops(), m.MatchRoomSubscription( roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{newEventNoUnsigned}), )) t.Log("Alice requests an incremental sliding sync with no request changes.") aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{}) t.Log("Alice should see no messages.") m.MatchResponse(t, aliceRes, m.MatchRoomSubscriptionsStrict(nil)) // Now the message arrives down Alice's poller. v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, events: []json.RawMessage{newEvent}, }), }, }) t.Log("Alice's poller sees the message with transaction_id.") v2.waitUntilEmpty(t, alice) t.Log("Alice makes another incremental sync request.") aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{}) t.Log("Alice's sync response includes the message with the txn ID.") m.MatchResponse(t, aliceRes, m.MatchList("a", m.MatchV3Count(1)), m.MatchNoV3Ops(), m.MatchRoomSubscription( roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{newEvent}), )) } // Similar to TestTimelineTxnIDBuffersForTxnID, this test checks: // - Bob's poller sees Alice's event, // - Alice does an incremental sync, which should omit her event, // - Alice's poller sees Alice's event without a txn_id, and // - Alice syncs, seeing her event without txn_id. // I.e. we're checking that the "all clear" empties out the buffer of events. func TestTimelineTxnIDRespectsAllClear(t *testing.T) { pqString := testutils.PrepareDBConnectionString() // setup code v2 := runTestV2Server(t) v3 := runTestServer(t, v2, pqString, slidingsync.Opts{ // This needs to be greater than the request timeout, which is hardcoded to a // minimum of 100ms in connStateLive.liveUpdate. This ensures that the // liveUpdate call finishes before the TxnIDWaiter publishes the update, // meaning that Alice doesn't see her event before the txn ID is known. MaxTransactionIDDelay: 200 * time.Millisecond, }) defer v2.close() defer v3.close() roomID := "!a:localhost" latestTimestamp := time.Now() t.Log("Alice and Bob are in the same room") room := roomEvents{ roomID: roomID, events: append( createRoomState(t, alice, latestTimestamp), testutils.NewJoinEvent(t, bob), ), } v2.addAccount(t, alice, aliceToken) v2.addAccount(t, bob, bobToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(room), }, NextBatch: "alice_after_initial_poll", }) v2.queueResponse(bob, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(room), }, NextBatch: "bob_after_initial_poll", }) t.Log("Alice and Bob make initial sliding syncs.") aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 2, }, }, }, }) bobRes := v3.mustDoV3Request(t, bobToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 2, }, }, }, }) t.Log("Alice has sent a message... but it arrives down Bob's poller first, without a transaction_id") newEventNoTxn := testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "hi"}) v2.queueResponse(bob, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, events: []json.RawMessage{newEventNoTxn}, }), }, }) t.Log("Bob's poller sees the message.") v2.waitUntilEmpty(t, bob) t.Log("Bob makes an incremental sliding sync") bobRes = v3.mustDoV3RequestWithPos(t, bobToken, bobRes.Pos, sync3.Request{}) t.Log("Bob should see the message without a transaction_id") m.MatchResponse(t, bobRes, m.MatchList("a", m.MatchV3Count(1)), m.MatchNoV3Ops(), m.MatchRoomSubscription( roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{newEventNoTxn}), )) t.Log("Alice requests an incremental sliding sync with no request changes.") aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{}) t.Log("Alice should see no messages.") m.MatchResponse(t, aliceRes, m.MatchRoomSubscriptionsStrict(nil)) // Now the message arrives down Alice's poller. v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, events: []json.RawMessage{newEventNoTxn}, }), }, }) t.Log("Alice's poller sees the message without transaction_id.") v2.waitUntilEmpty(t, alice) t.Log("Alice makes another incremental sync request.") aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{}) t.Log("Alice's sync response includes the event without a txn ID.") m.MatchResponse(t, aliceRes, m.MatchList("a", m.MatchV3Count(1)), m.MatchNoV3Ops(), m.MatchRoomSubscription( roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{newEventNoTxn}), )) } // Executes a sync v3 request without a ?pos and asserts that the count, rooms and timeline events m.Match the inputs given. func testTimelineLoadInitialEvents(v3 *testV3Server, token string, count int, wantRooms []roomEvents, numTimelineEventsPerRoom int) func(t *testing.T) { return func(t *testing.T) { t.Helper() res := v3.mustDoV3Request(t, token, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, int64(len(wantRooms) - 1)}, // first N rooms }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: int64(numTimelineEventsPerRoom), }, }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(count), m.MatchV3Ops( m.MatchV3SyncOpFn(func(op *sync3.ResponseOpRange) error { if len(op.RoomIDs) != len(wantRooms) { return fmt.Errorf("want %d rooms, got %d", len(wantRooms), len(op.RoomIDs)) } for i := range wantRooms { err := wantRooms[i].MatchRoom(op.RoomIDs[i], res.Rooms[op.RoomIDs[i]], m.MatchRoomName(wantRooms[i].name), m.MatchRoomTimelineMostRecent(numTimelineEventsPerRoom, wantRooms[i].events), ) if err != nil { return err } } return nil }), ))) } } // Test that prev batch tokens appear correctly. // 1: When there is no newer prev_batch, none is present. // 2: When there is a newer prev_batch, it is present. func TestPrevBatchInTimeline(t *testing.T) { pqString := testutils.PrepareDBConnectionString() // setup code v2 := runTestV2Server(t) v3 := runTestServer(t, v2, pqString) defer v2.close() defer v3.close() roomID := "!a:localhost" v2.addAccount(t, alice, aliceToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ prevBatch: "create", roomID: roomID, state: createRoomState(t, alice, time.Now()), events: []json.RawMessage{ testutils.NewStateEvent(t, "m.room.topic", "", alice, map[string]interface{}{}), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "hello"}), }, }), }, }) res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 1, }, }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Ops( m.MatchV3SyncOp(0, 0, []string{roomID}), ), ), m.MatchRoomSubscription(roomID, m.MatchRoomPrevBatch(""))) // now make a newer prev_batch and try again v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ prevBatch: "newer", roomID: roomID, events: []json.RawMessage{ testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "hello 2"}), }, }), }, }) v2.waitUntilEmpty(t, alice) testCases := []struct { timelineLimit int64 wantPrevBatch string }{ { timelineLimit: 1, wantPrevBatch: "newer", // the latest event m.Matches the start of the timeline for the new sync, so prev batches align }, { timelineLimit: 2, // the end of the timeline for the initial sync, we do not have a prev batch for this event. // we cannot return 'create' here else we will miss the topic event before this event // hence we return the cloest prev batch which is later than this event and hope clients can // deal with dupes. wantPrevBatch: "newer", }, { timelineLimit: 3, wantPrevBatch: "create", // the topic event, the start of the timeline for the initial sync, so prev batches align }, } for _, tc := range testCases { res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: tc.timelineLimit, }, }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Ops( m.MatchV3SyncOp(0, 0, []string{roomID}), ), ), m.MatchRoomSubscription(roomID, m.MatchRoomPrevBatch(tc.wantPrevBatch))) } } // Test that you can get a window with timeline_limit: 1, then increase the limit to 3 and get the // room timeline changes only (without any req_state or list ops sent). Likewise, do the same // but for required_state (initially empty, then set stuff and only get that) func TestTrickling(t *testing.T) { // setup code v2 := runTestV2Server(t) v3 := runTestServer(t, v2, "") defer v2.close() defer v3.close() // make 10 rooms, first room is most recent, and send A,B,C into each room allRooms := make([]roomEvents, 10) for i := 0; i < len(allRooms); i++ { ts := time.Now().Add(-1 * time.Duration(i) * time.Minute) roomName := fmt.Sprintf("My Room %d", i) allRooms[i] = roomEvents{ roomID: fmt.Sprintf("!TestTimelineTrickle_%d:localhost", i), name: roomName, events: append(createRoomState(t, alice, ts), []json.RawMessage{ testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": roomName}, testutils.WithTimestamp(ts.Add(3*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "A"}, testutils.WithTimestamp(ts.Add(4*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "B"}, testutils.WithTimestamp(ts.Add(5*time.Second))), testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "C"}, testutils.WithTimestamp(ts.Add(6*time.Second))), }...), } } v2.addAccount(t, alice, aliceToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(allRooms...), }, }) // always request the top 3 rooms testCases := []struct { name string initialSub sync3.RoomSubscription nextSub sync3.RoomSubscription wantInitialSubs map[string][]m.RoomMatcher wantNextSubs map[string][]m.RoomMatcher }{ { name: "Timeline trickling", initialSub: sync3.RoomSubscription{ TimelineLimit: 1, RequiredState: [][2]string{{"m.room.create", ""}}, }, wantInitialSubs: map[string][]m.RoomMatcher{ allRooms[0].roomID: { m.MatchRoomTimeline([]json.RawMessage{allRooms[0].events[len(allRooms[0].events)-1]}), m.MatchRoomRequiredState([]json.RawMessage{allRooms[0].events[0]}), }, allRooms[1].roomID: { m.MatchRoomTimeline([]json.RawMessage{allRooms[1].events[len(allRooms[1].events)-1]}), m.MatchRoomRequiredState([]json.RawMessage{allRooms[1].events[0]}), }, allRooms[2].roomID: { m.MatchRoomTimeline([]json.RawMessage{allRooms[2].events[len(allRooms[2].events)-1]}), m.MatchRoomRequiredState([]json.RawMessage{allRooms[2].events[0]}), }, }, nextSub: sync3.RoomSubscription{ TimelineLimit: 3, RequiredState: [][2]string{{"m.room.create", ""}}, }, wantNextSubs: map[string][]m.RoomMatcher{ allRooms[0].roomID: { m.MatchRoomTimeline(allRooms[0].events[len(allRooms[0].events)-3:]), m.MatchRoomRequiredState(nil), }, allRooms[1].roomID: { m.MatchRoomTimeline(allRooms[1].events[len(allRooms[1].events)-3:]), m.MatchRoomRequiredState(nil), }, allRooms[2].roomID: { m.MatchRoomTimeline(allRooms[2].events[len(allRooms[2].events)-3:]), m.MatchRoomRequiredState(nil), }, }, }, { name: "Required State trickling", initialSub: sync3.RoomSubscription{ TimelineLimit: 1, RequiredState: [][2]string{{"m.room.create", ""}}, }, wantInitialSubs: map[string][]m.RoomMatcher{ allRooms[0].roomID: { m.MatchRoomTimeline([]json.RawMessage{allRooms[0].events[len(allRooms[0].events)-1]}), m.MatchRoomRequiredState([]json.RawMessage{allRooms[0].events[0]}), }, allRooms[1].roomID: { m.MatchRoomTimeline([]json.RawMessage{allRooms[1].events[len(allRooms[1].events)-1]}), m.MatchRoomRequiredState([]json.RawMessage{allRooms[1].events[0]}), }, allRooms[2].roomID: { m.MatchRoomTimeline([]json.RawMessage{allRooms[2].events[len(allRooms[2].events)-1]}), m.MatchRoomRequiredState([]json.RawMessage{allRooms[2].events[0]}), }, }, // now add in the room member event nextSub: sync3.RoomSubscription{ TimelineLimit: 1, RequiredState: [][2]string{{"m.room.create", ""}, {"m.room.member", alice}}, }, wantNextSubs: map[string][]m.RoomMatcher{ allRooms[0].roomID: { m.MatchRoomTimeline(nil), m.MatchRoomRequiredState([]json.RawMessage{allRooms[0].events[0], allRooms[0].events[1]}), }, allRooms[1].roomID: { m.MatchRoomTimeline(nil), m.MatchRoomRequiredState([]json.RawMessage{allRooms[1].events[0], allRooms[1].events[1]}), }, allRooms[2].roomID: { m.MatchRoomTimeline(nil), m.MatchRoomRequiredState([]json.RawMessage{allRooms[2].events[0], allRooms[2].events[1]}), }, }, }, { name: "Timeline trickling from 0", initialSub: sync3.RoomSubscription{ TimelineLimit: 0, }, wantInitialSubs: map[string][]m.RoomMatcher{ allRooms[0].roomID: { m.MatchRoomTimeline(nil), m.MatchRoomRequiredState(nil), }, allRooms[1].roomID: { m.MatchRoomTimeline(nil), m.MatchRoomRequiredState(nil), }, allRooms[2].roomID: { m.MatchRoomTimeline(nil), m.MatchRoomRequiredState(nil), }, }, nextSub: sync3.RoomSubscription{ TimelineLimit: 1, }, wantNextSubs: map[string][]m.RoomMatcher{ allRooms[0].roomID: { m.MatchRoomTimeline([]json.RawMessage{allRooms[0].events[len(allRooms[0].events)-1]}), m.MatchRoomRequiredState(nil), }, allRooms[1].roomID: { m.MatchRoomTimeline([]json.RawMessage{allRooms[1].events[len(allRooms[1].events)-1]}), m.MatchRoomRequiredState(nil), }, allRooms[2].roomID: { m.MatchRoomTimeline([]json.RawMessage{allRooms[2].events[len(allRooms[2].events)-1]}), m.MatchRoomRequiredState(nil), }, }, }, } for _, tc := range testCases { t.Logf(tc.name) // request top 3 rooms with initial subscription res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{ "a": { Ranges: [][2]int64{{0, 2}}, Sort: []string{sync3.SortByRecency}, RoomSubscription: tc.initialSub, }, }, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Ops(m.MatchV3SyncOp(0, 2, []string{allRooms[0].roomID, allRooms[1].roomID, allRooms[2].roomID}))), m.MatchRoomSubscriptionsStrict(tc.wantInitialSubs), ) // next request changes the subscription res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{ Lists: map[string]sync3.RequestList{ "a": { Ranges: [][2]int64{{0, 2}}, Sort: []string{sync3.SortByRecency}, RoomSubscription: tc.nextSub, }, }, }) // assert we got what we were expecting m.MatchResponse(t, res, m.MatchNoV3Ops(), m.MatchRoomSubscriptionsStrict(tc.wantNextSubs), ) } } func TestNumLiveBulk(t *testing.T) { v2 := runTestV2Server(t) v3 := runTestServer(t, v2, "") defer v2.close() defer v3.close() roomID := "!bulk:test" v2.addAccount(t, alice, aliceToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, state: createRoomState(t, alice, time.Now()), events: []json.RawMessage{ testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "A"}, testutils.WithTimestamp(time.Now().Add(time.Second))), }, }), }, }) // initial syncs -> no live events res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{ "the_list": { RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 3, RequiredState: [][2]string{ {"m.room.encryption", ""}, {"m.room.tombstone", ""}, }, }, Sort: []string{sync3.SortByRecency, sync3.SortByName}, Ranges: sync3.SliceRanges{{0, 1}}, }, }, }) m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict( map[string][]m.RoomMatcher{ roomID: { m.MatchNumLive(0), }, }, ), m.MatchList("the_list", m.MatchV3Count(1), m.MatchV3Ops( m.MatchV3SyncOp(0, 0, []string{roomID}), ))) // inject 10 events in batches of 2, 1, 3, 4 batchSizes := []int{2, 1, 3, 4} count := 0 var completeTimeline []json.RawMessage for _, sz := range batchSizes { var timeline []json.RawMessage for i := 0; i < sz; i++ { timeline = append(timeline, testutils.NewEvent( t, "m.room.message", alice, map[string]interface{}{"body": fmt.Sprintf("Msg %d", count)}, testutils.WithTimestamp(time.Now().Add(time.Minute*time.Duration(1+count))), )) count++ } v2.queueResponse(aliceToken, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, events: timeline, }), }, }) v2.waitUntilEmpty(t, aliceToken) completeTimeline = append(completeTimeline, timeline...) } res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{}) m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict( map[string][]m.RoomMatcher{ roomID: { m.MatchRoomTimeline(completeTimeline), m.MatchNumLive(10), }, }, )) } // Ensure that clients cannot just set timeline_limit: 99999 and DoS the server func TestSensibleLimitToTimelineLimit(t *testing.T) { pqString := testutils.PrepareDBConnectionString() // setup code v2 := runTestV2Server(t) v3 := runTestServer(t, v2, pqString) defer v2.close() defer v3.close() roomID := "!a:localhost" var hundredEvents = make([]json.RawMessage, 100) for i := 0; i < 100; i++ { hundredEvents[i] = testutils.NewEvent(t, "m.room.message", alice, map[string]any{ "msgtype": "m.text", "body": fmt.Sprintf("msg %d", i), }, testutils.WithTimestamp(time.Now().Add(time.Second))) } v2.addAccount(t, alice, aliceToken) v2.queueResponse(alice, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: v2JoinTimeline(roomEvents{ roomID: roomID, state: createRoomState(t, alice, time.Now()), events: hundredEvents, }), }, }) res := v3.mustDoV3Request(t, aliceToken, sync3.Request{ Lists: map[string]sync3.RequestList{"a": { Ranges: sync3.SliceRanges{ [2]int64{0, 10}, }, RoomSubscription: sync3.RoomSubscription{ TimelineLimit: 99999, }, }}, }) m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{roomID})), ), m.MatchRoomSubscription(roomID, m.MatchRoomTimeline(hundredEvents[50:]))) // caps at 50 } // Regression test for a thing which Synapse can sometimes send down sync v2. // See https://github.com/matrix-org/sliding-sync/issues/367 // This would cause this room to not be processed at all, which is bad. func TestSeeCreateEvent(t *testing.T) { // setup code pqString := testutils.PrepareDBConnectionString() db, err := sqlx.Open("postgres", pqString) if err != nil { t.Fatalf("failed to open postgres: %s", err) } v2 := runTestV2Server(t) v3 := runTestServer(t, v2, pqString) defer v2.close() defer v3.close() roomID := "!TestSeeCreateEvent:localhost" userID := "@TestSeeCreateEvent:localhost" token := "TestSeeCreateEvent_TOKEN" v2.addAccount(t, userID, token) v2.queueResponse(userID, sync2.SyncResponse{ Rooms: sync2.SyncRoomsResponse{ Join: map[string]sync2.SyncV2JoinResponse{ roomID: { State: sync2.EventsResponse{ Events: []json.RawMessage{ testutils.NewStateEvent(t, "m.room.join_rules", "", "@someone:somewhere", map[string]interface{}{"join_rule": "invite"}), testutils.NewStateEvent(t, "m.room.power_levels", "", "@someone:somewhere", map[string]interface{}{"users_default": 0}), testutils.NewStateEvent(t, "m.room.history_visibility", "", "@someone:somewhere", map[string]interface{}{"history_visibility": "shared"}), }, }, Timeline: sync2.TimelineResponse{ Events: []json.RawMessage{ testutils.NewStateEvent(t, "m.room.create", "", "@someone:somewhere", map[string]interface{}{"room_version": "10"}), testutils.NewJoinEvent(t, "@someone:somewhere"), testutils.NewJoinEvent(t, "@someone2:somewhere"), }, }, }, }, }, }) v3.mustDoV3Request(t, token, sync3.Request{}) // ensure the room exists roomsTable := state.NewRoomsTable(db) err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error { nids, err := roomsTable.LatestNIDs(txn, []string{roomID}) if err != nil { t.Fatalf("LatestNIDs: %s", err) } if len(nids) != 1 { t.Fatalf("LatestNIDs missing: %+v", nids) } nid, ok := nids[roomID] if !ok { t.Fatalf("LatestNIDs missing room %s : %+v", roomID, nids) } if nid == 0 { t.Fatalf("LatestNIDs 0 nid for room %s", roomID) } return nil }) if err != nil { t.Fatalf("WithTransaction: %s", err) } }