Prioritise retriable errors over unretriable errors

Bump to Go 1.20 for errors.Join and for errors.As/Is gaining the
ability to inspect wrapped []error values.
This commit is contained in:
Kegan Dougal 2023-09-12 14:57:40 +01:00
parent 471bb8c898
commit 7c80b5424a
4 changed files with 188 additions and 13 deletions

View File

@ -1,4 +1,4 @@
FROM docker.io/golang:1.19-alpine AS base
FROM docker.io/golang:1.20-alpine AS base
WORKDIR /build

2
go.mod
View File

@ -1,6 +1,6 @@
module github.com/matrix-org/sliding-sync
go 1.18
go 1.20
require (
github.com/ReneKroon/ttlcache/v2 v2.8.1

View File

@ -774,13 +774,13 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
// Currently, Accumulate/Initialise can return DataErrors when a new room is seen without a
// create event.
// NOTE: we process rooms non-deterministically (ranging over keys in a map).
var lastErr error
var lastErrs []error
for roomID, roomData := range res.Rooms.Join {
if len(roomData.State.Events) > 0 {
stateCalls++
prependStateEvents, err := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
if err != nil {
lastErr = fmt.Errorf("Initialise[%s]: %w", roomID, err)
lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err))
continue
}
if len(prependStateEvents) > 0 {
@ -820,7 +820,7 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
if len(roomData.AccountData.Events) > 0 {
err := p.receiver.OnAccountData(ctx, p.userID, roomID, roomData.AccountData.Events)
if err != nil {
lastErr = fmt.Errorf("OnAccountData[%s]: %w", roomID, err)
lastErrs = append(lastErrs, fmt.Errorf("OnAccountData[%s]: %w", roomID, err))
continue
}
}
@ -829,7 +829,7 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
if err != nil {
lastErr = fmt.Errorf("Accumulate[%s]: %w", roomID, err)
lastErrs = append(lastErrs, fmt.Errorf("Accumulate[%s]: %w", roomID, err))
continue
}
}
@ -841,15 +841,13 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
p.receiver.UpdateUnreadCounts(ctx, roomID, p.userID, roomData.UnreadNotifications.HighlightCount, roomData.UnreadNotifications.NotificationCount)
}
}
if lastErr != nil {
return lastErr
}
for roomID, roomData := range res.Rooms.Leave {
if len(roomData.Timeline.Events) > 0 {
p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
if err != nil {
return fmt.Errorf("Accumulate_Leave[%s]: %w", roomID, err)
lastErrs = append(lastErrs, fmt.Errorf("Accumulate_Leave[%s]: %w", roomID, err))
continue
}
}
// Pass the leave event directly to OnLeftRoom. We need to do this _in addition_ to calling Accumulate to handle
@ -865,14 +863,15 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
if leaveEvent != nil {
err := p.receiver.OnLeftRoom(ctx, p.userID, roomID, leaveEvent)
if err != nil {
return fmt.Errorf("OnLeftRoom[%s]: %w", roomID, err)
lastErrs = append(lastErrs, fmt.Errorf("OnLeftRoom[%s]: %w", roomID, err))
continue
}
}
}
for roomID, roomData := range res.Rooms.Invite {
err := p.receiver.OnInvite(ctx, p.userID, roomID, roomData.InviteState.Events)
if err != nil {
return fmt.Errorf("OnInvite[%s]: %w", roomID, err)
lastErrs = append(lastErrs, fmt.Errorf("OnInvite[%s]: %w", roomID, err))
}
}
@ -881,8 +880,34 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
p.totalTimelineCalls += timelineCalls
p.totalTyping += typingCalls
p.totalInvites += len(res.Rooms.Invite)
if len(lastErrs) == 0 {
return nil
}
if len(lastErrs) == 1 {
return lastErrs[0]
}
// there are 2 classes of error:
// - non-retriable errors (aka internal.DataError)
// - retriable errors (transient DB connection failures, etc)
// If we have ANY retriable errors, they need to take priority over the non-retriable errors, else we will lose data.
// E.g in the case where a single sync response has room A with bad data (so wants to skip over it) and room B
// with good data but the DB got pulled momentarily, we want to retry and NOT advance the since token else we will
// lose the events in room B.
// To implement this, we _strip out_ any data errors and return that. If they are all data errors then we just return them.
var retriableOnlyErrs []error
for _, e := range lastErrs {
e := e
if shouldRetry(e) {
retriableOnlyErrs = append(retriableOnlyErrs, e)
}
}
// return retriable errors as a priority over unretriable
if len(retriableOnlyErrs) > 0 {
return errors.Join(retriableOnlyErrs...)
}
// we only have unretriable errors, so return them
return errors.Join(lastErrs...)
}
func (p *poller) maybeLogStats(force bool) {
if !force && timeSince(p.lastLogged) < logInterval {

View File

@ -12,6 +12,7 @@ import (
"time"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/rs/zerolog"
)
@ -1014,6 +1015,155 @@ func TestPollerDoesNotResendOnDataError(t *testing.T) {
poller.Terminate()
}
// The purpose of this test is to make sure we don't incorrectly skip retrying when a v2 response has many errors,
// some of which are retriable and some of which are not. Any retriable error in the batch must win, so the
// since token is not advanced and no data is lost; only when every error is a DataError do we skip retrying.
func TestPollerResendsOnDataErrorWithOtherErrors(t *testing.T) {
	pid := PollerID{UserID: "@TestPollerResendsOnDataErrorWithOtherErrors:localhost", DeviceID: "FOOBAR"}
	dontRetryRoomID := "!dont-retry:localhost"
	// make a receiver which returns an unretriable DataError for dontRetryRoomID and for OnLeftRoom,
	// and retriable (plain) errors everywhere else, so test cases can mix and match failure classes.
	receiver := &overrideDataReceiver{
		accumulate: func(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) error {
			if roomID == dontRetryRoomID {
				return internal.NewDataError("accumulate this is a test: %v", 42)
			}
			return fmt.Errorf("accumulate retriable error")
		},
		onAccountData: func(ctx context.Context, userID, roomID string, events []json.RawMessage) error {
			return fmt.Errorf("onAccountData retriable error")
		},
		onLeftRoom: func(ctx context.Context, userID, roomID string, leaveEvent json.RawMessage) error {
			return internal.NewDataError("onLeftRoom this is a test: %v", 42)
		},
	}
	poller := newPoller(pid, "Authorization: hello world", nil, receiver, zerolog.New(os.Stderr), false)
	testCases := []struct {
		name      string
		res       SyncResponse
		wantRetry bool
	}{
		{
			name: "single unretriable error",
			res: SyncResponse{
				NextBatch: "2",
				Rooms: SyncRoomsResponse{
					Join: map[string]SyncV2JoinResponse{
						dontRetryRoomID: {
							Timeline: TimelineResponse{
								Events: []json.RawMessage{
									testutils.NewMessageEvent(t, pid.UserID, "Don't Retry Me!"),
								},
							},
						},
					},
				},
			},
			wantRetry: false,
		},
		{
			name: "single retriable error",
			res: SyncResponse{
				NextBatch: "2",
				Rooms: SyncRoomsResponse{
					Join: map[string]SyncV2JoinResponse{
						"!retry:localhost": {
							AccountData: EventsResponse{
								Events: []json.RawMessage{
									testutils.NewAccountData(t, "m.retry", map[string]interface{}{}),
								},
							},
						},
					},
				},
			},
			wantRetry: true,
		},
		{
			// the retriable error must take priority over the unretriable one
			name: "1 retriable error, 1 unretriable error",
			res: SyncResponse{
				NextBatch: "2",
				Rooms: SyncRoomsResponse{
					Join: map[string]SyncV2JoinResponse{
						"!retry:localhost": {
							AccountData: EventsResponse{
								Events: []json.RawMessage{
									testutils.NewAccountData(t, "m.retry", map[string]interface{}{}),
								},
							},
						},
						dontRetryRoomID: {
							Timeline: TimelineResponse{
								Events: []json.RawMessage{
									testutils.NewMessageEvent(t, pid.UserID, "Don't Retry Me!"),
								},
							},
						},
					},
				},
			},
			wantRetry: true,
		},
		{
			name: "2 unretriable errors",
			res: SyncResponse{
				NextBatch: "2",
				Rooms: SyncRoomsResponse{
					Join: map[string]SyncV2JoinResponse{
						dontRetryRoomID: {
							Timeline: TimelineResponse{
								Events: []json.RawMessage{
									testutils.NewMessageEvent(t, pid.UserID, "Don't Retry Me!"),
								},
							},
						},
					},
					Leave: map[string]SyncV2LeaveResponse{
						dontRetryRoomID: {
							Timeline: TimelineResponse{
								Events: []json.RawMessage{
									testutils.NewMessageEvent(t, pid.UserID, "Don't Retry Me!"),
								},
							},
						},
					},
				},
			},
			wantRetry: false,
		},
		{
			// sanity to make sure the 2x unretriable are both independently unretriable
			name: "another 1 unretriable error",
			res: SyncResponse{
				NextBatch: "2",
				Rooms: SyncRoomsResponse{
					Leave: map[string]SyncV2LeaveResponse{
						dontRetryRoomID: {
							Timeline: TimelineResponse{
								Events: []json.RawMessage{
									testutils.NewMessageEvent(t, pid.UserID, "Don't Retry Me!"),
								},
							},
						},
					},
				},
			},
			wantRetry: false,
		},
	}
	// rather than set up the entire loop and machinery, just directly call parseRoomsResponse with various failure modes
	for _, tc := range testCases {
		err := poller.parseRoomsResponse(context.Background(), &tc.res)
		if err == nil {
			t.Errorf("%s: got no error", tc.name)
			continue
		}
		// NOTE: t.Logf's first argument is a format string; passing tc.name directly
		// would trip go vet's printf check and silently drop err from the output.
		t.Logf("%s: %v", tc.name, err)
		got := shouldRetry(err)
		if got != tc.wantRetry {
			t.Errorf("%s: got retry %v want %v", tc.name, got, tc.wantRetry)
		}
	}
}
func waitForInitialSync(t *testing.T, poller *poller) {
go func() {
poller.Poll(initialSinceToken)