From 471bb8c89836079f96a89cddaa603e39958c0d64 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 12 Sep 2023 14:16:21 +0100 Subject: [PATCH] bugfix: don't wedge pollers when they get bad state blocks We returned an error in Initialise when there is no create event in the state block. If this happens, the poller automatically retries the same request, tightlooping as the data is always going to cause the same error. Instead, return an `internal.DataError` which is a way of expressly telling the pollers to continue and advance the since token. With regression test. NB: This regression test also caught a potential bug which this PR could have introduced. This PR lets the since token skip over bad responses, but we want to make damn sure we process EVERYTHING ELSE in that response. In particular, we could have skipped over large sections of `parseRoomsResponse` as we would bail early on errors. This is now fixed to bail only after processing the entire joined rooms map. --- state/accumulator.go | 3 +- sync2/poller.go | 19 ++++- tests-integration/regressions_test.go | 117 ++++++++++++++++++++++++++ 3 files changed, 135 insertions(+), 4 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index 2f07a81..a0553b2 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -238,7 +238,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia Str("room_id", roomID). Int("len_state", len(events)). Msg(errMsg) - return fmt.Errorf(errMsg) + return internal.NewDataError(errMsg) } // Insert the events. @@ -389,6 +389,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch }) sentry.CaptureMessage(msg) }) + // by not returning an error, we are telling the poller it is fine to not retry this request. return 0, nil, nil } } diff --git a/sync2/poller.go b/sync2/poller.go index 5ff27ea..837fa9c 100644 --- a/sync2/poller.go +++ b/sync2/poller.go @@ -768,12 +768,20 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro timelineCalls := 0 typingCalls := 0 receiptCalls := 0 + // try to process all rooms, rather than bailing out at the first room which returns an error. + // This is CRITICAL if the error returned is an `internal.DataError` as in that case we will + // NOT RETRY THE SYNC REQUEST, meaning if we didn't process all the rooms we would lose data. + // Currently, Accumulate/Initialise can return DataErrors when a new room is seen without a + // create event. + // NOTE: we process rooms non-deterministically (ranging over keys in a map). + var lastErr error for roomID, roomData := range res.Rooms.Join { if len(roomData.State.Events) > 0 { stateCalls++ prependStateEvents, err := p.receiver.Initialise(ctx, roomID, roomData.State.Events) if err != nil { - return fmt.Errorf("Initialise[%s]: %w", roomID, err) + lastErr = fmt.Errorf("Initialise[%s]: %w", roomID, err) + continue } if len(prependStateEvents) > 0 { // The poller has just learned of these state events due to an @@ -812,7 +820,8 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro if len(roomData.AccountData.Events) > 0 { err := p.receiver.OnAccountData(ctx, p.userID, roomID, roomData.AccountData.Events) if err != nil { - return fmt.Errorf("OnAccountData[%s]: %w", roomID, err) + lastErr = fmt.Errorf("OnAccountData[%s]: %w", roomID, err) + continue } } if len(roomData.Timeline.Events) > 0 { @@ -820,7 +829,8 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited) err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events) if err != nil { - return fmt.Errorf("Accumulate[%s]: %w", roomID, err) + lastErr = fmt.Errorf("Accumulate[%s]: %w", roomID, err) + continue } } @@ -831,6 +841,9 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro p.receiver.UpdateUnreadCounts(ctx, roomID, p.userID, roomData.UnreadNotifications.HighlightCount, roomData.UnreadNotifications.NotificationCount) } } + if lastErr != nil { + return lastErr + } for roomID, roomData := range res.Rooms.Leave { if len(roomData.Timeline.Events) > 0 { p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited) diff --git a/tests-integration/regressions_test.go b/tests-integration/regressions_test.go index fc6e198..308e6fc 100644 --- a/tests-integration/regressions_test.go +++ b/tests-integration/regressions_test.go @@ -2,6 +2,7 @@ package syncv3 import ( "encoding/json" + "net/http" "testing" "time" @@ -228,3 +229,119 @@ func TestMalformedEventsState(t *testing.T) { }, })) } + +// Regression test for https://github.com/matrix-org/sliding-sync/issues/295 +// This test: +// - injects a good room and a bad room in the v2 response +// - then injects a good room update if the since token advanced +// - checks we see all updates +// In the past, the bad room in the first v2 response caused the proxy to retry and never advance, +// wedging the poller. +func TestBadCreateInitialiseDoesntWedgePolling(t *testing.T) { + pqString := testutils.PrepareDBConnectionString() + // setup code + v2 := runTestV2Server(t) + v3 := runTestServer(t, v2, pqString) + defer v2.close() + defer v3.close() + + goodRoom := "!good:localhost" + badRoom := "!bad:localhost" + + v2.addAccount(t, alice, aliceToken) + // we should see the since token increment, if we see repeats it means + // we aren't returning DataErrors when we should be. + wantSinces := []string{"", "1", "2"} + ch := make(chan bool) + v2.checkRequest = func(token string, req *http.Request) { + if len(wantSinces) == 0 { + return + } + gotSince := req.URL.Query().Get("since") + t.Logf("checkRequest got since=%v", gotSince) + want := wantSinces[0] + wantSinces = wantSinces[1:] + if gotSince != want { + t.Errorf("v2.checkRequest since got '%v' want '%v'", gotSince, want) + } + if len(wantSinces) == 0 { + close(ch) + } + } + + // initial sync, everything fine + v2.queueResponse(alice, sync2.SyncResponse{ + NextBatch: "1", + Rooms: sync2.SyncRoomsResponse{ + Join: map[string]sync2.SyncV2JoinResponse{ + goodRoom: { + Timeline: sync2.TimelineResponse{ + Events: createRoomState(t, alice, time.Now()), + }, + }, + }, + }, + }) + aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: sync3.SliceRanges{{0, 20}}, + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 1, + }, + }, + }, + }) + // we should only see 1 room + m.MatchResponse(t, aliceRes, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{ + goodRoom: { + m.MatchJoinCount(1), + }, + }, + )) + + // now inject a bad room and some extra good event + extraGoodEvent := testutils.NewMessageEvent(t, alice, "Extra!", testutils.WithTimestamp(time.Now().Add(time.Second))) + v2.queueResponse(alice, sync2.SyncResponse{ + NextBatch: "2", + Rooms: sync2.SyncRoomsResponse{ + Join: map[string]sync2.SyncV2JoinResponse{ + goodRoom: { + Timeline: sync2.TimelineResponse{ + Events: []json.RawMessage{extraGoodEvent}, + }, + }, + badRoom: { + State: sync2.EventsResponse{ + // BAD: missing create event + Events: createRoomState(t, alice, time.Now())[1:], + }, + Timeline: sync2.TimelineResponse{ + Events: []json.RawMessage{ + testutils.NewMessageEvent(t, alice, "Hello World"), + }, + }, + }, + }, + }, + }) + v2.waitUntilEmpty(t, alice) + + // we should see the extra good event and not the bad room + aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{}) + // we should only see 1 room + m.MatchResponse(t, aliceRes, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{ + goodRoom: { + m.MatchRoomTimelineMostRecent(1, []json.RawMessage{extraGoodEvent}), + }, + }, + )) + + // make sure we've seen all the v2 requests + select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for all v2 requests") + } + +}