From 7c80b5424a43c762105f13333088fa1a898abd77 Mon Sep 17 00:00:00 2001
From: Kegan Dougal
Date: Tue, 12 Sep 2023 14:57:40 +0100
Subject: [PATCH] Prioritise retriable errors over unretriable errors

Bump to Go 1.20 for errors.Join and for errors.As's new ability to inspect
wrapped []error.
---
 Dockerfile           |   2 +-
 go.mod               |   2 +-
 sync2/poller.go      |  47 ++++++++++----
 sync2/poller_test.go | 150 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 188 insertions(+), 13 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 4a43ef8..fe1a255 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM docker.io/golang:1.19-alpine AS base
+FROM docker.io/golang:1.20-alpine AS base
 
 WORKDIR /build
 
diff --git a/go.mod b/go.mod
index 48709fe..07fa66e 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/matrix-org/sliding-sync
 
-go 1.18
+go 1.20
 
 require (
 	github.com/ReneKroon/ttlcache/v2 v2.8.1
diff --git a/sync2/poller.go b/sync2/poller.go
index 837fa9c..1e6efc5 100644
--- a/sync2/poller.go
+++ b/sync2/poller.go
@@ -774,13 +774,13 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
 	// Currently, Accumulate/Initialise can return DataErrors when a new room is seen without a
 	// create event.
 	// NOTE: we process rooms non-deterministically (ranging over keys in a map).
-	var lastErr error
+	var lastErrs []error
 	for roomID, roomData := range res.Rooms.Join {
 		if len(roomData.State.Events) > 0 {
 			stateCalls++
 			prependStateEvents, err := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
 			if err != nil {
-				lastErr = fmt.Errorf("Initialise[%s]: %w", roomID, err)
+				lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err))
 				continue
 			}
 			if len(prependStateEvents) > 0 {
@@ -820,7 +820,7 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
 		if len(roomData.AccountData.Events) > 0 {
 			err := p.receiver.OnAccountData(ctx, p.userID, roomID, roomData.AccountData.Events)
 			if err != nil {
-				lastErr = fmt.Errorf("OnAccountData[%s]: %w", roomID, err)
+				lastErrs = append(lastErrs, fmt.Errorf("OnAccountData[%s]: %w", roomID, err))
 				continue
 			}
 		}
@@ -829,7 +829,7 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
 			p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
 			err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
 			if err != nil {
-				lastErr = fmt.Errorf("Accumulate[%s]: %w", roomID, err)
+				lastErrs = append(lastErrs, fmt.Errorf("Accumulate[%s]: %w", roomID, err))
 				continue
 			}
 		}
@@ -841,15 +841,13 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
 			p.receiver.UpdateUnreadCounts(ctx, roomID, p.userID, roomData.UnreadNotifications.HighlightCount, roomData.UnreadNotifications.NotificationCount)
 		}
 	}
-	if lastErr != nil {
-		return lastErr
-	}
 	for roomID, roomData := range res.Rooms.Leave {
 		if len(roomData.Timeline.Events) > 0 {
 			p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
 			err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
 			if err != nil {
-				return fmt.Errorf("Accumulate_Leave[%s]: %w", roomID, err)
+				lastErrs = append(lastErrs, fmt.Errorf("Accumulate_Leave[%s]: %w", roomID, err))
+				continue
 			}
 		}
 		// Pass the leave event directly to OnLeftRoom. We need to do this _in addition_ to calling Accumulate to handle
@@ -865,14 +863,15 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
 		if leaveEvent != nil {
 			err := p.receiver.OnLeftRoom(ctx, p.userID, roomID, leaveEvent)
 			if err != nil {
-				return fmt.Errorf("OnLeftRoom[%s]: %w", roomID, err)
+				lastErrs = append(lastErrs, fmt.Errorf("OnLeftRoom[%s]: %w", roomID, err))
+				continue
 			}
 		}
 	}
 	for roomID, roomData := range res.Rooms.Invite {
 		err := p.receiver.OnInvite(ctx, p.userID, roomID, roomData.InviteState.Events)
 		if err != nil {
-			return fmt.Errorf("OnInvite[%s]: %w", roomID, err)
+			lastErrs = append(lastErrs, fmt.Errorf("OnInvite[%s]: %w", roomID, err))
 		}
 	}
 
@@ -881,7 +880,33 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
 	p.totalTimelineCalls += timelineCalls
 	p.totalTyping += typingCalls
 	p.totalInvites += len(res.Rooms.Invite)
-	return nil
+	if len(lastErrs) == 0 {
+		return nil
+	}
+	if len(lastErrs) == 1 {
+		return lastErrs[0]
+	}
+	// there are 2 classes of error:
+	// - non-retriable errors (aka internal.DataError)
+	// - retriable errors (transient DB connection failures, etc)
+	// If we have ANY retriable error, it needs to take priority over the non-retriable errors, else we will lose data.
+	// E.g. in the case where a single sync response has room A with bad data (so we want to skip over it) and room B
+	// with good data but the DB got pulled momentarily, we want to retry and NOT advance the since token, else we will
+	// lose the events in room B.
+	// To implement this, we _strip out_ the data errors and return only the retriable ones. If they are all data errors then we just return them all.
+	var retriableOnlyErrs []error
+	for _, e := range lastErrs {
+		e := e
+		if shouldRetry(e) {
+			retriableOnlyErrs = append(retriableOnlyErrs, e)
+		}
+	}
+	// return retriable errors as a priority over unretriable ones
+	if len(retriableOnlyErrs) > 0 {
+		return errors.Join(retriableOnlyErrs...)
+	}
+	// we only have unretriable errors, so return them
+	return errors.Join(lastErrs...)
 }
 
 func (p *poller) maybeLogStats(force bool) {
diff --git a/sync2/poller_test.go b/sync2/poller_test.go
index 1a92324..6eb1efd 100644
--- a/sync2/poller_test.go
+++ b/sync2/poller_test.go
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	"github.com/matrix-org/sliding-sync/internal"
+	"github.com/matrix-org/sliding-sync/testutils"
 	"github.com/rs/zerolog"
 )
 
@@ -1014,6 +1015,155 @@ func TestPollerDoesNotResendOnDataError(t *testing.T) {
 	poller.Terminate()
 }
 
+// The purpose of this test is to make sure we don't incorrectly skip retrying when a v2 response has many errors,
+// some of which are retriable and some of which are not.
+func TestPollerResendsOnDataErrorWithOtherErrors(t *testing.T) {
+	pid := PollerID{UserID: "@TestPollerResendsOnDataErrorWithOtherErrors:localhost", DeviceID: "FOOBAR"}
+	dontRetryRoomID := "!dont-retry:localhost"
+	// make a receiver which returns an unretriable DataError for the don't-retry room and retriable errors otherwise
+	receiver := &overrideDataReceiver{
+		accumulate: func(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) error {
+			if roomID == dontRetryRoomID {
+				return internal.NewDataError("accumulate this is a test: %v", 42)
+			}
+			return fmt.Errorf("accumulate retriable error")
+		},
+		onAccountData: func(ctx context.Context, userID, roomID string, events []json.RawMessage) error {
+			return fmt.Errorf("onAccountData retriable error")
+		},
+		onLeftRoom: func(ctx context.Context, userID, roomID string, leaveEvent json.RawMessage) error {
+			return internal.NewDataError("onLeftRoom this is a test: %v", 42)
+		},
+	}
+	poller := newPoller(pid, "Authorization: hello world", nil, receiver, zerolog.New(os.Stderr), false)
+	testCases := []struct {
+		name      string
+		res       SyncResponse
+		wantRetry bool
+	}{
+		{
+			name: "single unretriable error",
+			res: SyncResponse{
+				NextBatch: "2",
+				Rooms: SyncRoomsResponse{
+					Join: map[string]SyncV2JoinResponse{
+						dontRetryRoomID: {
+							Timeline: TimelineResponse{
+								Events: []json.RawMessage{
+									testutils.NewMessageEvent(t, pid.UserID, "Don't Retry Me!"),
+								},
+							},
+						},
+					},
+				},
+			},
+			wantRetry: false,
+		},
+		{
+			name: "single retriable error",
+			res: SyncResponse{
+				NextBatch: "2",
+				Rooms: SyncRoomsResponse{
+					Join: map[string]SyncV2JoinResponse{
+						"!retry:localhost": {
+							AccountData: EventsResponse{
+								Events: []json.RawMessage{
+									testutils.NewAccountData(t, "m.retry", map[string]interface{}{}),
+								},
+							},
+						},
+					},
+				},
+			},
+			wantRetry: true,
+		},
+		{
+			name: "1 retriable error, 1 unretriable error",
+			res: SyncResponse{
+				NextBatch: "2",
+				Rooms: SyncRoomsResponse{
+					Join: map[string]SyncV2JoinResponse{
+						"!retry:localhost": {
+							AccountData: EventsResponse{
+								Events: []json.RawMessage{
+									testutils.NewAccountData(t, "m.retry", map[string]interface{}{}),
+								},
+							},
+						},
+						dontRetryRoomID: {
+							Timeline: TimelineResponse{
+								Events: []json.RawMessage{
+									testutils.NewMessageEvent(t, pid.UserID, "Don't Retry Me!"),
+								},
+							},
+						},
+					},
+				},
+			},
+			wantRetry: true,
+		},
+		{
+			name: "2 unretriable errors",
+			res: SyncResponse{
+				NextBatch: "2",
+				Rooms: SyncRoomsResponse{
+					Join: map[string]SyncV2JoinResponse{
+						dontRetryRoomID: {
+							Timeline: TimelineResponse{
+								Events: []json.RawMessage{
+									testutils.NewMessageEvent(t, pid.UserID, "Don't Retry Me!"),
+								},
+							},
+						},
+					},
+					Leave: map[string]SyncV2LeaveResponse{
+						dontRetryRoomID: {
+							Timeline: TimelineResponse{
+								Events: []json.RawMessage{
+									testutils.NewMessageEvent(t, pid.UserID, "Don't Retry Me!"),
+								},
+							},
+						},
+					},
+				},
+			},
+			wantRetry: false,
+		},
+		{
+			// sanity to make sure the 2x unretriable are both independently unretriable
+			name: "another 1 unretriable error",
+			res: SyncResponse{
+				NextBatch: "2",
+				Rooms: SyncRoomsResponse{
+					Leave: map[string]SyncV2LeaveResponse{
+						dontRetryRoomID: {
+							Timeline: TimelineResponse{
+								Events: []json.RawMessage{
+									testutils.NewMessageEvent(t, pid.UserID, "Don't Retry Me!"),
+								},
+							},
+						},
+					},
+				},
+			},
+			wantRetry: false,
+		},
+	}
+	// rather than set up the entire loop and machinery, just directly call parseRoomsResponse with various failure modes
+	for _, tc := range testCases {
+		err := poller.parseRoomsResponse(context.Background(), &tc.res)
+		if err == nil {
+			t.Errorf("%s: got no error", tc.name)
+			continue
+		}
+		t.Logf("%s: %v", tc.name, err)
+		got := shouldRetry(err)
+		if got != tc.wantRetry {
+			t.Errorf("%s: got retry %v want %v", tc.name, got, tc.wantRetry)
+		}
+	}
+}
+
 func waitForInitialSync(t *testing.T, poller *poller) {
 	go func() {
 		poller.Poll(initialSinceToken)
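
Reviewer note (appended for context, not part of the patch): the prioritisation above matters because Go 1.20's errors.Is/errors.As walk every branch of an errors.Join tree. A joined error containing even one internal.DataError would therefore be classed as unretriable, the poller would advance the since token, and the events from rooms that only hit transient errors would be lost. The sketch below is a minimal, self-contained illustration of that behaviour; DataError here is a local stand-in for internal.DataError, and shouldRetry is written the way the poller code above appears to use it, so the real definitions in sync2/poller.go may differ.

package main

import (
	"errors"
	"fmt"
)

// DataError is a stand-in for internal.DataError: bad data that retrying cannot fix.
type DataError struct{ msg string }

func (e *DataError) Error() string { return e.msg }

// shouldRetry reports whether an error is worth retrying. In Go 1.20+,
// errors.As inspects every error joined via errors.Join (anything that
// implements Unwrap() []error), so one DataError anywhere in the tree
// makes the whole batch look unretriable.
func shouldRetry(err error) bool {
	var de *DataError
	return !errors.As(err, &de)
}

func main() {
	retriable := fmt.Errorf("transient DB connection failure")
	unretriable := &DataError{msg: "room seen without a create event"}

	// Joining both classes naively reports "don't retry", which would lose
	// the data behind the transient failure:
	fmt.Println(shouldRetry(errors.Join(retriable, unretriable))) // false

	// Stripping out the DataErrors first, as parseRoomsResponse now does,
	// keeps the batch retriable:
	fmt.Println(shouldRetry(errors.Join(retriable))) // true
}

In other words, returning errors.Join over only the retriable errors is what keeps shouldRetry reporting true for mixed batches, which is what the new TestPollerResendsOnDataErrorWithOtherErrors cases assert.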