bugfix: don't wedge pollers when they get bad state blocks

We returned an error from Initialise when there was no create event
in the state block. When this happens, the poller automatically retries
the same request, tight-looping because the data will always produce the
same error. Instead, return an `internal.DataError`, which expressly
tells the pollers to continue and advance the since token.
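
The definition of `internal.DataError` is not part of this diff, so the snippet
below is only a minimal sketch of the idea: a dedicated error type the poller can
detect (for example with `errors.As`) and treat as "skip and advance" rather than
"retry". All names and signatures here are assumptions, not the real sliding-sync API.

```go
package main

import (
	"errors"
	"fmt"
)

// DataError marks a response as permanently unprocessable: retrying the same
// request can never succeed, so the poller should log it, skip it, and move on.
type DataError struct {
	msg string
}

func (e *DataError) Error() string { return e.msg }

func NewDataError(msg string) error { return &DataError{msg: msg} }

// shouldRetry reports whether the poller should retry the request that
// produced err. Data errors are not retried; anything else (e.g. a transient
// database failure) is retried with the same since token.
func shouldRetry(err error) bool {
	var de *DataError
	if errors.As(err, &de) {
		return false // bad data: advance the since token instead of tight-looping
	}
	return err != nil
}

func main() {
	wrapped := fmt.Errorf("Initialise[!bad:localhost]: %w", NewDataError("no create event"))
	fmt.Println(shouldRetry(wrapped))                  // false: skip and advance
	fmt.Println(shouldRetry(errors.New("db is down"))) // true: retry
}
```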

With regression test.

NB: This regression test also caught a potential bug which this PR could
have introduced. This PR lets the since token skip over bad responses, but
we want to make damn sure we process EVERYTHING ELSE in that response. In
particular, we could have skipped over large sections of `parseRoomsResponse`
as we would bail early on errors. This is now fixed to bail only after processing
the entire joined rooms map.
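
The poller.go hunks below implement this by collecting the last error and
continuing. As a reading aid, here is a condensed, hypothetical sketch of that
collect-and-continue shape; the function and callback names are illustrative
and are not the real poller code.

```go
package main

import (
	"errors"
	"fmt"
)

// processAllRooms walks every room before reporting a failure: it remembers
// the last error and keeps going, so one bad room no longer causes the rest
// of the response to be dropped.
func processAllRooms(rooms map[string]string, process func(roomID, data string) error) error {
	var lastErr error
	for roomID, data := range rooms {
		if err := process(roomID, data); err != nil {
			lastErr = fmt.Errorf("process[%s]: %w", roomID, err)
			continue // keep processing the other rooms
		}
	}
	return lastErr
}

func main() {
	rooms := map[string]string{
		"!good:localhost": "ok",
		"!bad:localhost":  "missing create event",
	}
	err := processAllRooms(rooms, func(roomID, data string) error {
		if data != "ok" {
			return errors.New(data)
		}
		fmt.Println("processed", roomID)
		return nil
	})
	fmt.Println("final error:", err)
}
```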
Kegan Dougal, 2023-09-12 14:16:21 +01:00
commit 471bb8c898 (parent 6ad321d86c)
3 changed files with 135 additions and 4 deletions


@@ -238,7 +238,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
Str("room_id", roomID).
Int("len_state", len(events)).
Msg(errMsg)
- return fmt.Errorf(errMsg)
+ return internal.NewDataError(errMsg)
}
// Insert the events.
@@ -389,6 +389,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch
})
sentry.CaptureMessage(msg)
})
// by not returning an error, we are telling the poller it is fine to not retry this request.
return 0, nil, nil
}
}


@@ -768,12 +768,20 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
timelineCalls := 0
typingCalls := 0
receiptCalls := 0
// try to process all rooms, rather than bailing out at the first room which returns an error.
// This is CRITICAL if the error returned is an `internal.DataError` as in that case we will
// NOT RETRY THE SYNC REQUEST, meaning if we didn't process all the rooms we would lose data.
// Currently, Accumulate/Initialise can return DataErrors when a new room is seen without a
// create event.
// NOTE: we process rooms non-deterministically (ranging over keys in a map).
var lastErr error
for roomID, roomData := range res.Rooms.Join {
if len(roomData.State.Events) > 0 {
stateCalls++
prependStateEvents, err := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
if err != nil {
return fmt.Errorf("Initialise[%s]: %w", roomID, err)
lastErr = fmt.Errorf("Initialise[%s]: %w", roomID, err)
continue
}
if len(prependStateEvents) > 0 {
// The poller has just learned of these state events due to an
@@ -812,7 +820,8 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
if len(roomData.AccountData.Events) > 0 {
err := p.receiver.OnAccountData(ctx, p.userID, roomID, roomData.AccountData.Events)
if err != nil {
return fmt.Errorf("OnAccountData[%s]: %w", roomID, err)
lastErr = fmt.Errorf("OnAccountData[%s]: %w", roomID, err)
continue
}
}
if len(roomData.Timeline.Events) > 0 {
@@ -820,7 +829,8 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
if err != nil {
return fmt.Errorf("Accumulate[%s]: %w", roomID, err)
lastErr = fmt.Errorf("Accumulate[%s]: %w", roomID, err)
continue
}
}
@@ -831,6 +841,9 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
p.receiver.UpdateUnreadCounts(ctx, roomID, p.userID, roomData.UnreadNotifications.HighlightCount, roomData.UnreadNotifications.NotificationCount)
}
}
if lastErr != nil {
return lastErr
}
for roomID, roomData := range res.Rooms.Leave {
if len(roomData.Timeline.Events) > 0 {
p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)


@@ -2,6 +2,7 @@ package syncv3
import (
"encoding/json"
"net/http"
"testing"
"time"
@@ -228,3 +229,119 @@ func TestMalformedEventsState(t *testing.T) {
},
}))
}
// Regression test for https://github.com/matrix-org/sliding-sync/issues/295
// This test:
// - injects a good room and a bad room in the v2 response
// - then injects a good room update if the since token advanced
// - checks we see all updates
// In the past, the bad room in the first v2 response caused the proxy to retry and never advance,
// wedging the poller.
func TestBadCreateInitialiseDoesntWedgePolling(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// setup code
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
goodRoom := "!good:localhost"
badRoom := "!bad:localhost"
v2.addAccount(t, alice, aliceToken)
// we should see the since token increment, if we see repeats it means
// we aren't returning DataErrors when we should be.
wantSinces := []string{"", "1", "2"}
ch := make(chan bool)
v2.checkRequest = func(token string, req *http.Request) {
if len(wantSinces) == 0 {
return
}
gotSince := req.URL.Query().Get("since")
t.Logf("checkRequest got since=%v", gotSince)
want := wantSinces[0]
wantSinces = wantSinces[1:]
if gotSince != want {
t.Errorf("v2.checkRequest since got '%v' want '%v'", gotSince, want)
}
if len(wantSinces) == 0 {
close(ch)
}
}
// initial sync, everything fine
v2.queueResponse(alice, sync2.SyncResponse{
NextBatch: "1",
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
goodRoom: {
Timeline: sync2.TimelineResponse{
Events: createRoomState(t, alice, time.Now()),
},
},
},
},
})
aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: sync3.SliceRanges{{0, 20}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: 1,
},
},
},
})
// we should only see 1 room
m.MatchResponse(t, aliceRes, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
goodRoom: {
m.MatchJoinCount(1),
},
},
))
// now inject a bad room and some extra good event
extraGoodEvent := testutils.NewMessageEvent(t, alice, "Extra!", testutils.WithTimestamp(time.Now().Add(time.Second)))
v2.queueResponse(alice, sync2.SyncResponse{
NextBatch: "2",
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
goodRoom: {
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{extraGoodEvent},
},
},
badRoom: {
State: sync2.EventsResponse{
// BAD: missing create event
Events: createRoomState(t, alice, time.Now())[1:],
},
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{
testutils.NewMessageEvent(t, alice, "Hello World"),
},
},
},
},
},
})
v2.waitUntilEmpty(t, alice)
// we should see the extra good event and not the bad room
aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{})
// we should only see 1 room
m.MatchResponse(t, aliceRes, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
goodRoom: {
m.MatchRoomTimelineMostRecent(1, []json.RawMessage{extraGoodEvent}),
},
},
))
// make sure we've seen all the v2 requests
select {
case <-ch:
case <-time.After(time.Second):
t.Fatalf("timed out waiting for all v2 requests")
}
}