diff --git a/sync3/handler/connstate_live.go b/sync3/handler/connstate_live.go index 648f650..777289c 100644 --- a/sync3/handler/connstate_live.go +++ b/sync3/handler/connstate_live.go @@ -81,35 +81,45 @@ func (s *connStateLive) liveUpdate( return case update := <-s.updates: internal.Logf(ctx, "liveUpdate", "process live update") - - s.processLiveUpdate(ctx, update, response) - // pass event to extensions AFTER processing - roomIDsToLists := s.lists.ListsByVisibleRoomIDs(s.muxedReq.Lists) - s.extensionsHandler.HandleLiveUpdate(ctx, update, ex, &response.Extensions, extensions.Context{ - IsInitial: false, - RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(), - UserID: s.userID, - DeviceID: s.deviceID, - RoomIDsToLists: roomIDsToLists, - }) + s.processUpdate(ctx, update, response, ex) // if there's more updates and we don't have lots stacked up already, go ahead and process another for len(s.updates) > 0 && response.ListOps() < 50 { update = <-s.updates - s.processLiveUpdate(ctx, update, response) - s.extensionsHandler.HandleLiveUpdate(ctx, update, ex, &response.Extensions, extensions.Context{ - IsInitial: false, - RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(), - UserID: s.userID, - DeviceID: s.deviceID, - RoomIDsToLists: roomIDsToLists, - }) + s.processUpdate(ctx, update, response, ex) } } } + + // If a client constantly changes their request params in every request they make, we will never consume from + // the update channel as the response will always have data already. In an effort to prevent starvation of new + // data, we will process some updates even though we have data already, but only if A) we didn't live stream + // due to natural circumstances, B) it isn't an initial request and C) there is in fact some data there. 
+ numQueuedUpdates := len(s.updates) + if !hasLiveStreamed && !isInitial && numQueuedUpdates > 0 { + for i := 0; i < numQueuedUpdates; i++ { + update := <-s.updates + s.processUpdate(ctx, update, response, ex) + } + log.Debug().Int("num_queued", numQueuedUpdates).Msg("liveUpdate: caught up") + } + log.Trace().Bool("live_streamed", hasLiveStreamed).Msg("liveUpdate: returning") // TODO: op consolidation } +func (s *connStateLive) processUpdate(ctx context.Context, update caches.Update, response *sync3.Response, ex extensions.Request) { + s.processLiveUpdate(ctx, update, response) + // pass event to extensions AFTER processing + roomIDsToLists := s.lists.ListsByVisibleRoomIDs(s.muxedReq.Lists) + s.extensionsHandler.HandleLiveUpdate(ctx, update, ex, &response.Extensions, extensions.Context{ + IsInitial: false, + RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(), + UserID: s.userID, + DeviceID: s.deviceID, + RoomIDsToLists: roomIDsToLists, + }) +} + func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, response *sync3.Response) bool { internal.AssertWithContext(ctx, "processLiveUpdate: response list length != internal list length", s.lists.Len() == len(response.Lists)) internal.AssertWithContext(ctx, "processLiveUpdate: request list length != internal list length", s.lists.Len() == len(s.muxedReq.Lists)) diff --git a/tests-e2e/num_live_test.go b/tests-e2e/num_live_test.go index 5a47a02..acc9726 100644 --- a/tests-e2e/num_live_test.go +++ b/tests-e2e/num_live_test.go @@ -1,10 +1,13 @@ package syncv3_test import ( + "fmt" "testing" + "time" "github.com/matrix-org/sliding-sync/sync3" "github.com/matrix-org/sliding-sync/testutils/m" + "github.com/tidwall/gjson" ) func TestNumLive(t *testing.T) { @@ -126,3 +129,70 @@ func TestNumLive(t *testing.T) { }, })) } + +// Test that if you constantly change req params, we still see live traffic. It does this by: +// - Creating 11 rooms. +// - Hitting /sync with a range [0,0] then [0,1] then [0,2], and so on up to [0,9]. 
Each time this causes a new room to be returned. +// - Interleaving each /sync request with genuine events sent into a room. +// - ensuring we see the genuine events by the time we finish. +func TestReqParamStarvation(t *testing.T) { + alice := registerNewUser(t) + bob := registerNewUser(t) + roomID := alice.CreateRoom(t, map[string]interface{}{ + "preset": "public_chat", + }) + numOtherRooms := 10 + for i := 0; i < numOtherRooms; i++ { + bob.CreateRoom(t, map[string]interface{}{ + "preset": "public_chat", + }) + } + bob.JoinRoom(t, roomID, nil) + res := bob.SlidingSyncUntilMembership(t, "", roomID, bob, "join") + + wantEventIDs := make(map[string]bool) + for i := 0; i < numOtherRooms; i++ { + res = bob.SlidingSync(t, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: sync3.SliceRanges{{0, int64(i)}}, // [0,0], [0,1], ... [0,9] + }, + }, + }, WithPos(res.Pos)) + + // mark off any event we see in wantEventIDs + for _, r := range res.Rooms { + for _, ev := range r.Timeline { + gotEventID := gjson.GetBytes(ev, "event_id").Str + wantEventIDs[gotEventID] = false + } + } + + // send an event in the first few syncs to add to wantEventIDs + // We do this for the first few /syncs and don't dictate which response they should arrive + // in, as we do not know and cannot force the proxy to deliver the event in a particular response. + if i < 3 { + eventID := alice.SendEventSynced(t, roomID, Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": fmt.Sprintf("msg %d", i), + }, + }) + wantEventIDs[eventID] = true + } + + // it's possible the proxy won't see this event before the next /sync + // and that is the reason why we don't send it, as opposed to starvation. + // To try to counter this, sleep a bit. This is why we sleep on every cycle and + // why we send the events early on. 
+ time.Sleep(50 * time.Millisecond) + } + + // at this point wantEventIDs should all have false values if we got the events + for evID, unseen := range wantEventIDs { + if unseen { + t.Errorf("failed to see event %v", evID) + } + } +} diff --git a/tests-integration/room_subscriptions_test.go b/tests-integration/room_subscriptions_test.go index ba89ad9..3b4bbd5 100644 --- a/tests-integration/room_subscriptions_test.go +++ b/tests-integration/room_subscriptions_test.go @@ -137,12 +137,9 @@ func TestRoomSubscriptionMisorderedTimeline(t *testing.T) { }) m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{ room.roomID: { - // TODO: this is the correct result, but due to how timeline loading works currently - // it will be returning the last 5 events BEFORE D,E, which isn't ideal but also isn't - // incorrect per se due to the fact that clients don't know when D,E have been processed - // on the server. - // m.MatchRoomTimeline(append(abcInitialEvents, deLiveEvents...)), - m.MatchRoomTimeline(append(roomState[len(roomState)-2:], abcInitialEvents...)), + // we append live events AFTER processing the new timeline limit, so 7 events not 5. + // TODO: ideally we'd just return abcde here. + m.MatchRoomTimeline(append(roomState[len(roomState)-2:], append(abcInitialEvents, deLiveEvents...)...)), }, }), m.LogResponse(t))