diff --git a/state/accumulator.go b/state/accumulator.go index 2f07a81..a0553b2 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -238,7 +238,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia Str("room_id", roomID). Int("len_state", len(events)). Msg(errMsg) - return fmt.Errorf(errMsg) + return internal.NewDataError(errMsg) } // Insert the events. @@ -389,6 +389,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch }) sentry.CaptureMessage(msg) }) + // by not returning an error, we are telling the poller it is fine to not retry this request. return 0, nil, nil } } diff --git a/sync2/poller.go b/sync2/poller.go index 5ff27ea..837fa9c 100644 --- a/sync2/poller.go +++ b/sync2/poller.go @@ -768,12 +768,20 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro timelineCalls := 0 typingCalls := 0 receiptCalls := 0 + // try to process all rooms, rather than bailing out at the first room which returns an error. + // This is CRITICAL if the error returned is an `internal.DataError` as in that case we will + // NOT RETRY THE SYNC REQUEST, meaning if we didn't process all the rooms we would lose data. + // Currently, Accumulate/Initialise can return DataErrors when a new room is seen without a + // create event. + // NOTE: we process rooms non-deterministically (ranging over keys in a map). + var lastErr error for roomID, roomData := range res.Rooms.Join { if len(roomData.State.Events) > 0 { stateCalls++ prependStateEvents, err := p.receiver.Initialise(ctx, roomID, roomData.State.Events) if err != nil { - return fmt.Errorf("Initialise[%s]: %w", roomID, err) + lastErr = fmt.Errorf("Initialise[%s]: %w", roomID, err) + continue } if len(prependStateEvents) > 0 { // The poller has just learned of these state events due to an @@ -812,7 +820,8 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro if len(roomData.AccountData.Events) > 0 { err := p.receiver.OnAccountData(ctx, p.userID, roomID, roomData.AccountData.Events) if err != nil { - return fmt.Errorf("OnAccountData[%s]: %w", roomID, err) + lastErr = fmt.Errorf("OnAccountData[%s]: %w", roomID, err) + continue } } if len(roomData.Timeline.Events) > 0 { @@ -820,7 +829,8 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited) err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events) if err != nil { - return fmt.Errorf("Accumulate[%s]: %w", roomID, err) + lastErr = fmt.Errorf("Accumulate[%s]: %w", roomID, err) + continue } } @@ -831,6 +841,9 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro p.receiver.UpdateUnreadCounts(ctx, roomID, p.userID, roomData.UnreadNotifications.HighlightCount, roomData.UnreadNotifications.NotificationCount) } } + if lastErr != nil { + return lastErr + } for roomID, roomData := range res.Rooms.Leave { if len(roomData.Timeline.Events) > 0 { p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited) diff --git a/tests-integration/regressions_test.go b/tests-integration/regressions_test.go index fc6e198..308e6fc 100644 --- a/tests-integration/regressions_test.go +++ b/tests-integration/regressions_test.go @@ -2,6 +2,7 @@ package syncv3 import ( "encoding/json" + "net/http" "testing" "time" @@ -228,3 +229,119 @@ func TestMalformedEventsState(t *testing.T) { }, })) } + +// Regression test for https://github.com/matrix-org/sliding-sync/issues/295 +// This test: +// - injects a good room and a bad room in the v2 response +// - then injects a good room update if the since token advanced +// - checks we see all updates +// In the past, the bad room in the first v2 response caused the proxy to retry and never advance, +// wedging the poller. +func TestBadCreateInitialiseDoesntWedgePolling(t *testing.T) { + pqString := testutils.PrepareDBConnectionString() + // setup code + v2 := runTestV2Server(t) + v3 := runTestServer(t, v2, pqString) + defer v2.close() + defer v3.close() + + goodRoom := "!good:localhost" + badRoom := "!bad:localhost" + + v2.addAccount(t, alice, aliceToken) + // we should see the since token increment, if we see repeats it means + // we aren't returning DataErrors when we should be. + wantSinces := []string{"", "1", "2"} + ch := make(chan bool) + v2.checkRequest = func(token string, req *http.Request) { + if len(wantSinces) == 0 { + return + } + gotSince := req.URL.Query().Get("since") + t.Logf("checkRequest got since=%v", gotSince) + want := wantSinces[0] + wantSinces = wantSinces[1:] + if gotSince != want { + t.Errorf("v2.checkRequest since got '%v' want '%v'", gotSince, want) + } + if len(wantSinces) == 0 { + close(ch) + } + } + + // initial sync, everything fine + v2.queueResponse(alice, sync2.SyncResponse{ + NextBatch: "1", + Rooms: sync2.SyncRoomsResponse{ + Join: map[string]sync2.SyncV2JoinResponse{ + goodRoom: { + Timeline: sync2.TimelineResponse{ + Events: createRoomState(t, alice, time.Now()), + }, + }, + }, + }, + }) + aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{ + Lists: map[string]sync3.RequestList{ + "a": { + Ranges: sync3.SliceRanges{{0, 20}}, + RoomSubscription: sync3.RoomSubscription{ + TimelineLimit: 1, + }, + }, + }, + }) + // we should only see 1 room + m.MatchResponse(t, aliceRes, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{ + goodRoom: { + m.MatchJoinCount(1), + }, + }, + )) + + // now inject a bad room and some extra good event + extraGoodEvent := testutils.NewMessageEvent(t, alice, "Extra!", testutils.WithTimestamp(time.Now().Add(time.Second))) + v2.queueResponse(alice, sync2.SyncResponse{ + NextBatch: "2", + Rooms: sync2.SyncRoomsResponse{ + Join: map[string]sync2.SyncV2JoinResponse{ + goodRoom: { + Timeline: sync2.TimelineResponse{ + Events: []json.RawMessage{extraGoodEvent}, + }, + }, + badRoom: { + State: sync2.EventsResponse{ + // BAD: missing create event + Events: createRoomState(t, alice, time.Now())[1:], + }, + Timeline: sync2.TimelineResponse{ + Events: []json.RawMessage{ + testutils.NewMessageEvent(t, alice, "Hello World"), + }, + }, + }, + }, + }, + }) + v2.waitUntilEmpty(t, alice) + + // we should see the extra good event and not the bad room + aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{}) + // we should only see 1 room + m.MatchResponse(t, aliceRes, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{ + goodRoom: { + m.MatchRoomTimelineMostRecent(1, []json.RawMessage{extraGoodEvent}), + }, + }, + )) + + // make sure we've seen all the v2 requests + select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("timed out waiting for all v2 requests") + } + +}