mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
bugfix: handle malformed state/timeline responses
Specifically, look for the create event in the timeline as this has been seen in the wild on Synapse. Fixes #367.
This commit is contained in:
parent
421a572edd
commit
c868e540db
@ -10,6 +10,8 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
|
||||
"github.com/matrix-org/sliding-sync/internal"
|
||||
@ -791,8 +793,47 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
|
||||
stateCalls++
|
||||
prependStateEvents, err := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
|
||||
if err != nil {
|
||||
lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err))
|
||||
continue
|
||||
_, ok := err.(*internal.DataError)
|
||||
if ok {
|
||||
// This typically happens when we are missing an m.room.create event.
|
||||
// Synapse may sometimes send the m.room.create event erroneously in the timeline,
|
||||
// so check if that is the case here. See https://github.com/matrix-org/complement/pull/690
|
||||
var createEvent json.RawMessage
|
||||
for i, ev := range roomData.Timeline.Events {
|
||||
if gjson.ParseBytes(ev).Get("type").Str == "m.room.create" {
|
||||
createEvent = roomData.Timeline.Events[i]
|
||||
// remove the create event from the timeline so we don't double process it
|
||||
roomData.Timeline.Events = append(roomData.Timeline.Events[:i], roomData.Timeline.Events[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if createEvent != nil {
|
||||
roomData.State.Events = slices.Insert(roomData.State.Events, 0, createEvent)
|
||||
// retry the processing of the room state
|
||||
prependStateEvents, err = p.receiver.Initialise(ctx, roomID, roomData.State.Events)
|
||||
if err == nil {
|
||||
const warnMsg = "parseRoomsResponse: m.room.create event was found in the timeline not state"
|
||||
logger.Warn().Str("user_id", p.userID).Str("room_id", roomID).Int(
|
||||
"timeline", len(roomData.Timeline.Events),
|
||||
).Int("state", len(roomData.State.Events)).Msg(warnMsg)
|
||||
hub := internal.GetSentryHubFromContextOrDefault(ctx)
|
||||
hub.WithScope(func(scope *sentry.Scope) {
|
||||
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
|
||||
"room_id": roomID,
|
||||
"timeline": len(roomData.Timeline.Events),
|
||||
"state": len(roomData.State.Events),
|
||||
})
|
||||
hub.CaptureMessage(warnMsg)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
// either err isn't a data error OR we retried Initialise and it still returned an error
|
||||
// either way, give up.
|
||||
if err != nil {
|
||||
lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
if len(prependStateEvents) > 0 {
|
||||
// The poller has just learned of these state events due to an
|
||||
|
@ -6,8 +6,11 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
slidingsync "github.com/matrix-org/sliding-sync"
|
||||
|
||||
"github.com/matrix-org/sliding-sync/sqlutil"
|
||||
"github.com/matrix-org/sliding-sync/state"
|
||||
"github.com/matrix-org/sliding-sync/sync2"
|
||||
"github.com/matrix-org/sliding-sync/sync3"
|
||||
"github.com/matrix-org/sliding-sync/testutils"
|
||||
@ -1335,3 +1338,69 @@ func TestNumLiveBulk(t *testing.T) {
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
// Regression test for a thing which Synapse can sometimes send down sync v2.
|
||||
// See https://github.com/matrix-org/sliding-sync/issues/367
|
||||
// This would cause this room to not be processed at all, which is bad.
|
||||
func TestSeeCreateEvent(t *testing.T) {
|
||||
// setup code
|
||||
pqString := testutils.PrepareDBConnectionString()
|
||||
db, err := sqlx.Open("postgres", pqString)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open postgres: %s", err)
|
||||
}
|
||||
v2 := runTestV2Server(t)
|
||||
v3 := runTestServer(t, v2, pqString)
|
||||
defer v2.close()
|
||||
defer v3.close()
|
||||
|
||||
roomID := "!TestSeeCreateEvent:localhost"
|
||||
userID := "@TestSeeCreateEvent:localhost"
|
||||
token := "TestSeeCreateEvent_TOKEN"
|
||||
v2.addAccount(t, userID, token)
|
||||
v2.queueResponse(userID, sync2.SyncResponse{
|
||||
Rooms: sync2.SyncRoomsResponse{
|
||||
Join: map[string]sync2.SyncV2JoinResponse{
|
||||
roomID: {
|
||||
State: sync2.EventsResponse{
|
||||
Events: []json.RawMessage{
|
||||
testutils.NewStateEvent(t, "m.room.join_rules", "", "@someone:somewhere", map[string]interface{}{"join_rule": "invite"}),
|
||||
testutils.NewStateEvent(t, "m.room.power_levels", "", "@someone:somewhere", map[string]interface{}{"users_default": 0}),
|
||||
testutils.NewStateEvent(t, "m.room.history_visibility", "", "@someone:somewhere", map[string]interface{}{"history_visibility": "shared"}),
|
||||
},
|
||||
},
|
||||
Timeline: sync2.TimelineResponse{
|
||||
Events: []json.RawMessage{
|
||||
testutils.NewStateEvent(t, "m.room.create", "", "@someone:somewhere", map[string]interface{}{"room_version": "10"}),
|
||||
testutils.NewJoinEvent(t, "@someone:somewhere"),
|
||||
testutils.NewJoinEvent(t, "@someone2:somewhere"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
v3.mustDoV3Request(t, token, sync3.Request{})
|
||||
// ensure the room exists
|
||||
roomsTable := state.NewRoomsTable(db)
|
||||
err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
|
||||
nids, err := roomsTable.LatestNIDs(txn, []string{roomID})
|
||||
if err != nil {
|
||||
t.Fatalf("LatestNIDs: %s", err)
|
||||
}
|
||||
if len(nids) != 1 {
|
||||
t.Fatalf("LatestNIDs missing: %+v", nids)
|
||||
}
|
||||
nid, ok := nids[roomID]
|
||||
if !ok {
|
||||
t.Fatalf("LatestNIDs missing room %s : %+v", roomID, nids)
|
||||
}
|
||||
if nid == 0 {
|
||||
t.Fatalf("LatestNIDs 0 nid for room %s", roomID)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("WithTransaction: %s", err)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user