From a25b2ee39d3fcf959ba3f5180c944198071d6e82 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 14 Nov 2023 13:56:06 +0000 Subject: [PATCH 1/5] fix avatar updates; add e2e test for rejoining --- state/storage.go | 8 +- tests-e2e/client_test.go | 6 +- tests-e2e/state_updates_test.go | 238 ++++++++++++++++++++++++++++++++ 3 files changed, 248 insertions(+), 4 deletions(-) create mode 100644 tests-e2e/state_updates_test.go diff --git a/state/storage.go b/state/storage.go index 0f24379..b2cbcc8 100644 --- a/state/storage.go +++ b/state/storage.go @@ -316,7 +316,7 @@ func (s *Storage) ResetMetadataState(metadata *internal.RoomMetadata) error { FROM syncv3_events JOIN snapshot ON ( event_nid = ANY (ARRAY_CAT(events, membership_events)) ) - WHERE (event_type IN ('m.room.name', 'm.room.avatar', 'm.room.canonical_alias') AND state_key = '') + WHERE (event_type IN ('m.room.name', 'm.room.avatar', 'm.room.canonical_alias', 'm.room.encryption') AND state_key = '') OR (event_type = 'm.room.member' AND membership IN ('join', '_join', 'invite', '_invite')) ORDER BY event_nid ASC ;`, metadata.RoomID) @@ -334,9 +334,11 @@ func (s *Storage) ResetMetadataState(metadata *internal.RoomMetadata) error { case "m.room.name": metadata.NameEvent = gjson.GetBytes(ev.JSON, "content.name").Str case "m.room.avatar": - metadata.AvatarEvent = gjson.GetBytes(ev.JSON, "content.avatar_url").Str + metadata.AvatarEvent = gjson.GetBytes(ev.JSON, "content.url").Str case "m.room.canonical_alias": metadata.CanonicalAlias = gjson.GetBytes(ev.JSON, "content.alias").Str + case "m.room.encryption": + metadata.Encrypted = true case "m.room.member": heroMemberships.append(&events[i]) switch ev.Membership { @@ -365,7 +367,7 @@ func (s *Storage) ResetMetadataState(metadata *internal.RoomMetadata) error { metadata.Heroes = append(metadata.Heroes, hero) } - // For now, don't bother reloading Encrypted, PredecessorID and UpgradedRoomID. + // For now, don't bother reloading PredecessorID and UpgradedRoomID. // These shouldn't be changing during a room's lifetime in normal operation. // We haven't updated LatestEventsByType because that's not part of the timeline. diff --git a/tests-e2e/client_test.go b/tests-e2e/client_test.go index c8724a0..9ca4839 100644 --- a/tests-e2e/client_test.go +++ b/tests-e2e/client_test.go @@ -62,6 +62,9 @@ func (c *CSAPI) Scrollback(t *testing.T, roomID, prevBatch string, limit int) gj func (c *CSAPI) SlidingSync(t *testing.T, data sync3.Request, opts ...client.RequestOpt) (resBody *sync3.Response) { t.Helper() res := c.DoSlidingSync(t, data, opts...) + if res.StatusCode != 200 { + t.Fatalf("SlidingSync returned %v", res.Status) + } body := client.ParseJSON(t, res) if err := json.Unmarshal(body, &resBody); err != nil { t.Fatalf("failed to unmarshal response: %v", err) @@ -194,7 +197,8 @@ func (c *CSAPI) SlidingSyncUntilEvent(t *testing.T, pos string, data sync3.Reque return nil } } - return fmt.Errorf("found room %s but missing event", roomID) + b, _ := json.Marshal(room.Timeline) + return fmt.Errorf("found room %s but missing event, timeline=%v", roomID, string(b)) }) } diff --git a/tests-e2e/state_updates_test.go b/tests-e2e/state_updates_test.go new file mode 100644 index 0000000..d06ca41 --- /dev/null +++ b/tests-e2e/state_updates_test.go @@ -0,0 +1,238 @@ +package syncv3_test + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/matrix-org/complement/b" + "github.com/matrix-org/complement/client" + "github.com/matrix-org/sliding-sync/sync3" + "github.com/matrix-org/sliding-sync/testutils/m" +) + +// The purpose of this test is to check that if Alice is in the room and is using the proxy, and then +// she leaves the room so no _proxy user_ is in the room, and events are sent whilst she is gone, that +// upon rejoining the proxy has A) the correct latest state and B) the correct latest timeline, such that +// the prev_batch token is correct to get earlier messages. +func TestRejoining(t *testing.T) { + alice := registerNamedUser(t, "alice") + bob := registerNamedUser(t, "bob") + charlie := registerNamedUser(t, "charlie") + doris := registerNamedUser(t, "doris") + + // public chat => shared history visibility + roomID := bob.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"}) + alice.JoinRoom(t, roomID, nil) + sendMessage(t, alice, roomID, "first message") + firstTopicEventID := sendTopic(t, bob, roomID, "First Topic") + + // alice is the proxy user + res := alice.SlidingSync(t, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 5, + RequiredState: [][2]string{{"*", "*"}}, // all state + }, + Ranges: sync3.SliceRanges{{0, 20}}, + BumpEventTypes: []string{"m.room.message"}, + }, + }, + }) + m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict( + map[string][]m.RoomMatcher{ + roomID: { + m.MatchJoinCount(2), + m.MatchRoomName(bob.Localpart), + // despite asking for 5 timeline events, we only get 1 on the initial sync so prev_batch is correct + MatchRoomTimeline([]Event{ + { + ID: firstTopicEventID, + }, + }), + }, + }, + )) + + // alice leaves the room => no proxy user in the room anymore! + alice.MustLeaveRoom(t, roomID) + res = alice.SlidingSyncUntilEvent(t, res.Pos, sync3.Request{}, roomID, Event{ + Content: map[string]interface{}{ + "membership": "leave", + }, + Type: "m.room.member", + StateKey: &alice.UserID, + Sender: alice.UserID, + }) + + // now a few things happen: + // - charlie joins the room (ensures joins are reflected) + // - bob leaves the room (ensures leaves are reflected) + // - doris is invited to the room (ensures invites are reflected) + // - the topic is changed (ensures state updates) + // - the room avatar is set (ensure avatar field updates and new state is added) + // - 50 messages are sent (forcing a gappy sync) + charlie.MustJoinRoom(t, roomID, []string{"hs1"}) + bob.MustInviteRoom(t, roomID, doris.UserID) + newTopic := "New Topic Name" + newAvatar := "mxc://example.org/JWEIFJgwEIhweiWJE" + sendTopic(t, bob, roomID, newTopic) + sendAvatar(t, bob, roomID, newAvatar) + bob.MustLeaveRoom(t, roomID) + messageEventIDs := make([]string, 0, 50) + for i := 0; i < 50; i++ { + messageEventIDs = append(messageEventIDs, sendMessage(t, charlie, roomID, fmt.Sprintf("message %d", i))) + } + + // alice rejoins the room. + alice.MustJoinRoom(t, roomID, []string{"hs1"}) + + // we should get a 400 expired connection + start := time.Now() + for { + httpRes := alice.DoSlidingSync(t, sync3.Request{}, WithPos(res.Pos)) + if httpRes.StatusCode == 400 { + t.Logf("connection expired") + break + } + if time.Since(start) > 5*time.Second { + t.Fatalf("did not get expired connection on rejoin") + } + body := client.ParseJSON(t, httpRes) + if err := json.Unmarshal(body, &res); err != nil { + t.Fatalf("failed to unmarshal response: %v", err) + } + time.Sleep(100 * time.Millisecond) + } + + aliceJoin := Event{ + Content: map[string]interface{}{ + "displayname": alice.Localpart, + "membership": "join", + }, + Type: "m.room.member", + StateKey: &alice.UserID, + Sender: alice.UserID, + } + // ensure state is correct: new connection + res = alice.SlidingSyncUntilEvent(t, "", sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 5, + RequiredState: [][2]string{{"*", "*"}}, // all state + }, + Ranges: sync3.SliceRanges{{0, 20}}, + BumpEventTypes: []string{"m.room.message"}, + }, + }, + }, roomID, aliceJoin) + m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{ + roomID: { + m.MatchInviteCount(1), // doris + m.MatchJoinCount(2), // alice and charlie + MatchRoomTimeline([]Event{aliceJoin}), + m.MatchRoomAvatar(newAvatar), + MatchRoomRequiredState([]Event{ + { + Type: "m.room.topic", + StateKey: b.Ptr(""), + Content: map[string]interface{}{ + "topic": newTopic, + }, + }, + { + Type: "m.room.avatar", + StateKey: b.Ptr(""), + Content: map[string]interface{}{ + "url": newAvatar, + }, + }, + aliceJoin, + { + Content: map[string]interface{}{ + "displayname": charlie.Localpart, + "membership": "join", + }, + Type: "m.room.member", + StateKey: &charlie.UserID, + Sender: charlie.UserID, + }, + { + Content: map[string]interface{}{ + "membership": "invite", + "displayname": doris.Localpart, + }, + Type: "m.room.member", + StateKey: &doris.UserID, + Sender: bob.UserID, + }, + { + Content: map[string]interface{}{ + "membership": "leave", + }, + Type: "m.room.member", + StateKey: &bob.UserID, + Sender: bob.UserID, + }, + }), + }, + })) + + /* + // pull out the prev_batch and check we can backpaginate correctly + prevBatch := res.Rooms[roomID].PrevBatch + must.NotEqual(t, prevBatch, "", "missing prev_batch") + numScrollbackItems := 10 + scrollback := alice.Scrollback(t, roomID, prevBatch, numScrollbackItems) + chunk := scrollback.Get("chunk").Array() + var sbEvents []json.RawMessage + for _, e := range chunk { + sbEvents = append(sbEvents, json.RawMessage(e.Raw)) + } + must.Equal(t, len(chunk), 10, "chunk length mismatch") + var wantTimeline []Event + for i := 0; i < numScrollbackItems; i++ { + wantTimeline = append(wantTimeline, Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": fmt.Sprintf("message %d", 40+i), // 40-49 + }, + }) + } + must.NotError(t, "chunk mismatch", eventsEqual(wantTimeline, sbEvents)) */ + +} + +func sendMessage(t *testing.T, client *CSAPI, roomID, text string) (eventID string) { + return client.Unsafe_SendEventUnsynced(t, roomID, b.Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": text, + }, + }) +} + +func sendTopic(t *testing.T, client *CSAPI, roomID, text string) (eventID string) { + return client.Unsafe_SendEventUnsynced(t, roomID, b.Event{ + Type: "m.room.topic", + StateKey: b.Ptr(""), + Content: map[string]interface{}{ + "topic": text, + }, + }) +} + +func sendAvatar(t *testing.T, client *CSAPI, roomID, mxcURI string) (eventID string) { + return client.Unsafe_SendEventUnsynced(t, roomID, b.Event{ + Type: "m.room.avatar", + StateKey: b.Ptr(""), + Content: map[string]interface{}{ + "url": mxcURI, + }, + }) +} From c0b8fa05b85f46c4efece1c581e9c8de67453c99 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 14 Nov 2023 14:48:39 +0000 Subject: [PATCH 2/5] Test prev batches and set prev_batch on live timeline events --- sync3/caches/user.go | 12 +++++++ sync3/handler/connstate_live.go | 8 ++++- tests-e2e/state_updates_test.go | 59 ++++++++++++++++++++------------- 3 files changed, 55 insertions(+), 24 deletions(-) diff --git a/sync3/caches/user.go b/sync3/caches/user.go index 0db8657..8dbdbc3 100644 --- a/sync3/caches/user.go +++ b/sync3/caches/user.go @@ -7,8 +7,10 @@ import ( "sync" "github.com/getsentry/sentry-go" + "github.com/jmoiron/sqlx" "github.com/matrix-org/sliding-sync/internal" + "github.com/matrix-org/sliding-sync/sqlutil" "github.com/matrix-org/sliding-sync/state" "github.com/tidwall/gjson" "github.com/tidwall/sjson" @@ -415,6 +417,16 @@ func (c *UserCache) Invites() map[string]UserRoomData { return invites } +// AttemptToFetchPrevBatch tries to find a prev_batch value for the given event. This may not always succeed. +func (c *UserCache) AttemptToFetchPrevBatch(roomID string, firstTimelineEvent *EventData) (prevBatch string) { + var err error + sqlutil.WithTransaction(c.store.DB, func(txn *sqlx.Tx) error { + prevBatch, err = c.store.EventsTable.SelectClosestPrevBatch(txn, roomID, firstTimelineEvent.NID) + return err + }) + return +} + // AnnotateWithTransactionIDs should be called just prior to returning events to the client. This // will modify the events to insert the correct transaction IDs if needed. This is required because // events are globally scoped, so if Alice sends a message, Bob might receive it first on his v2 loop diff --git a/sync3/handler/connstate_live.go b/sync3/handler/connstate_live.go index b5e0832..262c10c 100644 --- a/sync3/handler/connstate_live.go +++ b/sync3/handler/connstate_live.go @@ -234,6 +234,13 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, roomIDtoTimeline := s.userCache.AnnotateWithTransactionIDs(ctx, s.userID, s.deviceID, map[string][]json.RawMessage{ roomEventUpdate.RoomID(): {roomEventUpdate.EventData.Event}, }) + if len(r.Timeline) == 0 && r.PrevBatch == "" { + // attempt to fill in the prev_batch value for this room + prevBatch := s.userCache.AttemptToFetchPrevBatch(roomEventUpdate.RoomID(), roomEventUpdate.EventData) + if prevBatch != "" { + r.PrevBatch = prevBatch + } + } r.Timeline = append(r.Timeline, roomIDtoTimeline[roomEventUpdate.RoomID()]...) roomID := roomEventUpdate.RoomID() sender := roomEventUpdate.EventData.Sender @@ -278,7 +285,6 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, if delta.JoinCountChanged { thisRoom.JoinedCount = roomUpdate.GlobalRoomMetadata().JoinCount } - response.Rooms[roomUpdate.RoomID()] = thisRoom } if delta.HighlightCountChanged || delta.NotificationCountChanged { diff --git a/tests-e2e/state_updates_test.go b/tests-e2e/state_updates_test.go index d06ca41..7a0c7b0 100644 --- a/tests-e2e/state_updates_test.go +++ b/tests-e2e/state_updates_test.go @@ -6,8 +6,11 @@ import ( "testing" "time" + "golang.org/x/exp/slices" + "github.com/matrix-org/complement/b" "github.com/matrix-org/complement/client" + "github.com/matrix-org/complement/must" "github.com/matrix-org/sliding-sync/sync3" "github.com/matrix-org/sliding-sync/testutils/m" ) @@ -181,29 +184,39 @@ func TestRejoining(t *testing.T) { }, })) - /* - // pull out the prev_batch and check we can backpaginate correctly - prevBatch := res.Rooms[roomID].PrevBatch - must.NotEqual(t, prevBatch, "", "missing prev_batch") - numScrollbackItems := 10 - scrollback := alice.Scrollback(t, roomID, prevBatch, numScrollbackItems) - chunk := scrollback.Get("chunk").Array() - var sbEvents []json.RawMessage - for _, e := range chunk { - sbEvents = append(sbEvents, json.RawMessage(e.Raw)) - } - must.Equal(t, len(chunk), 10, "chunk length mismatch") - var wantTimeline []Event - for i := 0; i < numScrollbackItems; i++ { - wantTimeline = append(wantTimeline, Event{ - Type: "m.room.message", - Content: map[string]interface{}{ - "msgtype": "m.text", - "body": fmt.Sprintf("message %d", 40+i), // 40-49 - }, - }) - } - must.NotError(t, "chunk mismatch", eventsEqual(wantTimeline, sbEvents)) */ + // This response will not return a prev_batch token for the rejoin because the poller has already started, + // so will be using a timeline limit of 50. This means the prev_batch from sync v2 will be for the N-50th + // event, and SS wants to return a prev_batch for N-1th event (the join), but cannot. Subsequent events + // will have a prev_batch though, so trigger one now. This also conveniently makes sure that alice is joined + // to the room still. + eventID := sendMessage(t, alice, roomID, "give me a prev batch please") + res = alice.SlidingSyncUntilEvent(t, res.Pos, sync3.Request{}, roomID, Event{ID: eventID}) + m.MatchResponse(t, res, m.LogResponse(t)) + + // pull out the prev_batch and check we can backpaginate correctly + prevBatch := res.Rooms[roomID].PrevBatch + must.NotEqual(t, prevBatch, "", "missing prev_batch") + numScrollbackItems := 10 + scrollback := alice.Scrollback(t, roomID, prevBatch, numScrollbackItems) + chunk := scrollback.Get("chunk").Array() + var sbEvents []json.RawMessage + for _, e := range chunk { + sbEvents = append(sbEvents, json.RawMessage(e.Raw)) + } + must.Equal(t, len(chunk), 10, "chunk length mismatch") + var wantTimeline []Event + for i := 0; i < (numScrollbackItems - 1); i++ { + wantTimeline = append(wantTimeline, Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": fmt.Sprintf("message %d", 41+i), // 40-49 + }, + }) + } + wantTimeline = append(wantTimeline, aliceJoin) + slices.Reverse(wantTimeline) // /messages returns in reverse chronological order + must.NotError(t, "chunk mismatch", eventsEqual(wantTimeline, sbEvents)) } From 62b3662e2959c9e5207138e462235ce916a8c125 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 14 Nov 2023 16:12:04 +0000 Subject: [PATCH 3/5] Interface out store functions from UserCache to make tests happier --- state/storage.go | 10 ++++++++++ sync3/caches/user.go | 19 +++++++++---------- sync3/handler/connstate_test.go | 20 +++++++++++++++----- 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/state/storage.go b/state/storage.go index b2cbcc8..eefbeac 100644 --- a/state/storage.go +++ b/state/storage.go @@ -753,6 +753,16 @@ func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, return result, err } +func (s *Storage) GetClosestPrevBatch(roomID string, eventNID int64) (prevBatch string) { + var err error + sqlutil.WithTransaction(s.DB, func(txn *sqlx.Tx) error { + // discard the error, we don't care if we fail as it's best effort + prevBatch, err = s.EventsTable.SelectClosestPrevBatch(txn, roomID, eventNID) + return err + }) + return +} + // visibleEventNIDsBetweenForRooms determines which events a given user has permission to see. // It accepts a nid range [from, to]. For each given room, it calculates the NID range // [A1, B1] within [from, to] in which the user has permission to see events. diff --git a/sync3/caches/user.go b/sync3/caches/user.go index 8dbdbc3..5cdd0f1 100644 --- a/sync3/caches/user.go +++ b/sync3/caches/user.go @@ -7,10 +7,8 @@ import ( "sync" "github.com/getsentry/sentry-go" - "github.com/jmoiron/sqlx" "github.com/matrix-org/sliding-sync/internal" - "github.com/matrix-org/sliding-sync/sqlutil" "github.com/matrix-org/sliding-sync/state" "github.com/tidwall/gjson" "github.com/tidwall/sjson" @@ -177,6 +175,12 @@ type UserCacheListener interface { OnUpdate(ctx context.Context, up Update) } +// Subset of store functions used by the user cache +type UserCacheStore interface { + LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string]*state.LatestEvents, error) + GetClosestPrevBatch(roomID string, eventNID int64) (prevBatch string) +} + // Tracks data specific to a given user. Specifically, this is the map of room ID to UserRoomData. // This data is user-scoped, not global or connection scoped. type UserCache struct { @@ -187,14 +191,14 @@ type UserCache struct { listeners map[int]UserCacheListener listenersMu *sync.RWMutex id int - store *state.Storage + store UserCacheStore globalCache *GlobalCache txnIDs TransactionIDFetcher ignoredUsers map[string]struct{} ignoredUsersMu *sync.RWMutex } -func NewUserCache(userID string, globalCache *GlobalCache, store *state.Storage, txnIDs TransactionIDFetcher) *UserCache { +func NewUserCache(userID string, globalCache *GlobalCache, store UserCacheStore, txnIDs TransactionIDFetcher) *UserCache { // see SyncLiveHandler.userCache for the initialisation proper, which works by // firing off a bunch of OnBlahBlah callbacks. uc := &UserCache{ @@ -419,12 +423,7 @@ func (c *UserCache) Invites() map[string]UserRoomData { // AttemptToFetchPrevBatch tries to find a prev_batch value for the given event. This may not always succeed. func (c *UserCache) AttemptToFetchPrevBatch(roomID string, firstTimelineEvent *EventData) (prevBatch string) { - var err error - sqlutil.WithTransaction(c.store.DB, func(txn *sqlx.Tx) error { - prevBatch, err = c.store.EventsTable.SelectClosestPrevBatch(txn, roomID, firstTimelineEvent.NID) - return err - }) - return + return c.store.GetClosestPrevBatch(roomID, firstTimelineEvent.NID) } // AnnotateWithTransactionIDs should be called just prior to returning events to the client. This diff --git a/sync3/handler/connstate_test.go b/sync3/handler/connstate_test.go index d1c8ffd..c5b4f11 100644 --- a/sync3/handler/connstate_test.go +++ b/sync3/handler/connstate_test.go @@ -4,11 +4,12 @@ import ( "context" "encoding/json" "fmt" - "github.com/matrix-org/sliding-sync/state" "reflect" "testing" "time" + "github.com/matrix-org/sliding-sync/state" + "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/sync3" @@ -26,6 +27,15 @@ func (h *NopExtensionHandler) Handle(ctx context.Context, req extensions.Request func (h *NopExtensionHandler) HandleLiveUpdate(ctx context.Context, update caches.Update, req extensions.Request, res *extensions.Response, extCtx extensions.Context) { } +type NopUserCacheStore struct{} + +func (s *NopUserCacheStore) GetClosestPrevBatch(roomID string, eventNID int64) (prevBatch string) { + return +} +func (s *NopUserCacheStore) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string]*state.LatestEvents, error) { + return nil, nil +} + type NopJoinTracker struct{} func (t *NopJoinTracker) IsUserJoined(userID, roomID string) bool { @@ -96,7 +106,7 @@ func TestConnStateInitial(t *testing.T) { roomC.RoomID: {NID: 780, Timestamp: 789}, }, nil, nil } - userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{}) + userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{}) dispatcher.Register(context.Background(), userCache.UserID, userCache) dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache) userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents { @@ -269,7 +279,7 @@ func TestConnStateMultipleRanges(t *testing.T) { } return 1, roomMetadata, joinTimings, nil, nil } - userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{}) + userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{}) userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride dispatcher.Register(context.Background(), userCache.UserID, userCache) dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache) @@ -448,7 +458,7 @@ func TestBumpToOutsideRange(t *testing.T) { }, nil, nil } - userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{}) + userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{}) userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride dispatcher.Register(context.Background(), userCache.UserID, userCache) dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache) @@ -551,7 +561,7 @@ func TestConnStateRoomSubscriptions(t *testing.T) { roomD.RoomID: {NID: 4, Timestamp: 4}, }, nil, nil } - userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{}) + userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{}) userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents { result := make(map[string]state.LatestEvents) for _, roomID := range roomIDs { From 079fc9cb75cda17828cbe545330a24182b4b7bb1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 16 Nov 2023 10:08:35 +0000 Subject: [PATCH 4/5] Review comments --- tests-e2e/state_updates_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests-e2e/state_updates_test.go b/tests-e2e/state_updates_test.go index 7a0c7b0..909bab7 100644 --- a/tests-e2e/state_updates_test.go +++ b/tests-e2e/state_updates_test.go @@ -49,7 +49,10 @@ func TestRejoining(t *testing.T) { roomID: { m.MatchJoinCount(2), m.MatchRoomName(bob.Localpart), - // despite asking for 5 timeline events, we only get 1 on the initial sync so prev_batch is correct + // The sliding sync request we just did was the first ever SS request for Alice. + // This means the proxy will do an initial v2 request. + // Despite asking for 5 timeline events, we only get 1 on the initial sync due to how the sync v2 filters are setup. + // They are set up that way to ensure that prev_batch is always valid, when it is returned. MatchRoomTimeline([]Event{ { ID: firstTopicEventID, @@ -136,6 +139,9 @@ func TestRejoining(t *testing.T) { roomID: { m.MatchInviteCount(1), // doris m.MatchJoinCount(2), // alice and charlie + // Note: we only get 1 timeline event here because the proxy treats all rooms + // as having history visibility = joined, so the join event is the earliest + // thing she can see. MatchRoomTimeline([]Event{aliceJoin}), m.MatchRoomAvatar(newAvatar), MatchRoomRequiredState([]Event{ From b58b9ef48e508864807791d9528ea11462c694bf Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 16 Nov 2023 10:34:00 +0000 Subject: [PATCH 5/5] Also check state is correct when other users use the proxy --- tests-e2e/state_updates_test.go | 90 ++++++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 2 deletions(-) diff --git a/tests-e2e/state_updates_test.go b/tests-e2e/state_updates_test.go index 909bab7..f65c2c9 100644 --- a/tests-e2e/state_updates_test.go +++ b/tests-e2e/state_updates_test.go @@ -87,9 +87,8 @@ func TestRejoining(t *testing.T) { sendTopic(t, bob, roomID, newTopic) sendAvatar(t, bob, roomID, newAvatar) bob.MustLeaveRoom(t, roomID) - messageEventIDs := make([]string, 0, 50) for i := 0; i < 50; i++ { - messageEventIDs = append(messageEventIDs, sendMessage(t, charlie, roomID, fmt.Sprintf("message %d", i))) + sendMessage(t, charlie, roomID, fmt.Sprintf("message %d", i)) } // alice rejoins the room. @@ -224,6 +223,93 @@ func TestRejoining(t *testing.T) { slices.Reverse(wantTimeline) // /messages returns in reverse chronological order must.NotError(t, "chunk mismatch", eventsEqual(wantTimeline, sbEvents)) + // charlie starts using the proxy, and he never had before. We should see him as joined + // even though that happened in a "gappy sync". This is super important because charlie's + // initial sync will include his join BUT we have already processed it so it'll be ignored, + // meaning we rely entirely on alice's gappy sync to know charlie is joined. + res = charlie.SlidingSync(t, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 5, + RequiredState: [][2]string{{"*", "*"}}, // all state + }, + Ranges: sync3.SliceRanges{{0, 20}}, + BumpEventTypes: []string{"m.room.message"}, + }, + }, + }) + m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{ + roomID: { + // charlie is allowed to see everything up to his join, so he sees alice's join + // AND some earlier events + MatchRoomTimeline([]Event{ + { + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": "message 47", + }, + }, + { + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": "message 48", + }, + }, + { + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": "message 49", + }, + }, + aliceJoin, + {ID: eventID}, // give me a prev batch message + }), + m.MatchInviteCount(1), // doris + m.MatchJoinCount(2), // alice and charlie + m.MatchRoomAvatar(newAvatar), + }, + })) + + // doris starts using the proxy, and she never had before. We should see her as invited + // even though that happened in a "gappy sync". This is less interesting as the initial + // sync will include the invite_state, but still worth checking. + res = doris.SlidingSync(t, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 5, + RequiredState: [][2]string{{"*", "*"}}, // all state + }, + Ranges: sync3.SliceRanges{{0, 20}}, + BumpEventTypes: []string{"m.room.message"}, + }, + }, + }) + m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{ + roomID: { + m.MatchRoomHasInviteState(), + }, + })) + + // bob starts using the proxy, and he never had before. We should NOT see this room + // as he left. + res = bob.SlidingSync(t, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 5, + RequiredState: [][2]string{{"*", "*"}}, // all state + }, + Ranges: sync3.SliceRanges{{0, 20}}, + BumpEventTypes: []string{"m.room.message"}, + }, + }, + }) + m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{})) } func sendMessage(t *testing.T, client *CSAPI, roomID, text string) (eventID string) {