sliding-sync/tests-integration/room_subscriptions_test.go
Kegan Dougal 0342a99524 bugfix: prevent clients starving themselves by constantly changing req params
Because the proxy services changes to req params preferentially to live
data, if the client constantly changes the window (e.g. due to spidering)
then it can accidentally stop the delivery of live events to the client
until the spidering process is complete. To help address this, we now
process live updates _even if_ we have some data to send to the client.
This is bounded in size to prevent the inverse happening: constantly
seeing new live events which starves changes to req params. This should
hopefully strike the right balance.

With regression test.
2023-07-04 10:52:31 +01:00

176 lines
5.7 KiB
Go

package syncv3
import (
"encoding/json"
"testing"
"time"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/matrix-org/sliding-sync/testutils/m"
)
// TestRoomSubscriptionJoinRoomRace covers the race where a client /joins a room and immediately
// subscribes to it before the proxy has learned about the room. The subscription must still
// deliver the room's data once the proxy does receive it from upstream.
func TestRoomSubscriptionJoinRoomRace(t *testing.T) {
	connStr := testutils.PrepareDBConnectionString()
	// spin up the fake upstream and the proxy under test
	v2 := runTestV2Server(t)
	v3 := runTestServer(t, v2, connStr)
	defer v2.close()
	defer v3.close()

	// the room the client subscribes to before the proxy has seen it
	lateRoom := roomEvents{
		roomID: "!race:localhost",
		events: createRoomState(t, alice, time.Now()),
	}

	// Register the account and queue a throwaway response so that a poll loop exists
	// and requests can be serviced.
	v2.addAccount(t, alice, aliceToken)
	fillerRoom := roomEvents{
		roomID: "!unimportant",
		events: createRoomState(t, alice, time.Now()),
	}
	v2.queueResponse(alice, sync2.SyncResponse{
		Rooms: sync2.SyncRoomsResponse{Join: v2JoinTimeline(fillerRoom)},
	})

	// An empty request kicks off the poll loop; no room subscriptions are expected back.
	resp := v3.mustDoV3Request(t, aliceToken, sync3.Request{})
	m.MatchResponse(t, resp, m.MatchRoomSubscriptionsStrict(nil))

	// Subscribe to a room which, from the proxy's point of view, does not exist yet.
	earlySubs := map[string]sync3.RoomSubscription{
		lateRoom.roomID: {
			RequiredState: [][2]string{{"m.room.create", ""}},
		},
	}
	resp = v3.mustDoV3RequestWithPos(t, aliceToken, resp.Pos, sync3.Request{
		RoomSubscriptions: earlySubs,
	})
	m.MatchResponse(t, resp, m.MatchRoomSubscriptionsStrict(nil))

	// Upstream now tells the proxy about the room.
	v2.queueResponse(alice, sync2.SyncResponse{
		Rooms: sync2.SyncRoomsResponse{Join: v2JoinTimeline(lateRoom)},
	})
	// Wait for the queued response to be fully consumed so the room is known to exist.
	v2.waitUntilEmpty(t, alice)

	// Polling again on the same connection should now return the subscribed room.
	resp = v3.mustDoV3RequestWithPos(t, aliceToken, resp.Pos, sync3.Request{})
	wantRooms := map[string][]m.RoomMatcher{
		lateRoom.roomID: {
			m.MatchRoomInitial(true),
			// events[0] is the create event
			m.MatchRoomRequiredState([]json.RawMessage{lateRoom.events[0]}),
		},
	}
	m.MatchResponse(t, resp, m.MatchRoomSubscriptionsStrict(wantRooms))
}
// TestRoomSubscriptionMisorderedTimeline is a regression test for
// https://github.com/vector-im/element-x-ios-rageshakes/issues/314
//
// Caused by: the user cache getting corrupted and missing events, caused by it incorrectly
// replacing its timeline with an older one.
// To the end user, it manifests as missing messages in the timeline, because the proxy
// incorrectly said the events are C,F,G and not E,F,G.
//
// The test proceeds in four steps:
//  1. Sync a room containing state + A,B,C through a list with timeline_limit=1.
//  2. Live stream D,E, then add a room subscription with timeline_limit=5 on the same connection.
//  3. Live stream F,G.
//  4. On a fresh connection, subscribe with timeline_limit=3 and expect exactly E,F,G.
func TestRoomSubscriptionMisorderedTimeline(t *testing.T) {
	pqString := testutils.PrepareDBConnectionString()
	// setup code
	v2 := runTestV2Server(t)
	v3 := runTestServer(t, v2, pqString)
	defer v2.close()
	defer v3.close()

	roomState := createRoomState(t, alice, time.Now())
	stateLen := len(roomState)
	abcInitialEvents := []json.RawMessage{
		testutils.NewMessageEvent(t, alice, "A"),
		testutils.NewMessageEvent(t, alice, "B"),
		testutils.NewMessageEvent(t, alice, "C"),
	}
	room := roomEvents{
		roomID: "!room:localhost",
		// Cap roomState's capacity with a full slice expression so append is forced to
		// allocate: room.events must not share a backing array with roomState, which is
		// sliced and appended to again further down.
		events: append(roomState[:stateLen:stateLen], abcInitialEvents...),
	}
	v2.addAccount(t, alice, aliceToken)
	v2.queueResponse(alice, sync2.SyncResponse{
		Rooms: sync2.SyncRoomsResponse{
			Join: v2JoinTimeline(room),
		},
	})
	res := v3.mustDoV3Request(t, aliceToken, sync3.Request{
		Lists: map[string]sync3.RequestList{
			"list": {
				Ranges:           sync3.SliceRanges{{0, 10}},
				RoomSubscription: sync3.RoomSubscription{TimelineLimit: 1},
			},
		},
	})
	// test that we get 1 event initially due to the timeline limit
	m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
		room.roomID: {
			m.MatchRoomTimeline(abcInitialEvents[len(abcInitialEvents)-1:]),
		},
	}))

	// now live stream 2 events
	deLiveEvents := []json.RawMessage{
		testutils.NewMessageEvent(t, alice, "D"),
		testutils.NewMessageEvent(t, alice, "E"),
	}
	v2.queueResponse(alice, sync2.SyncResponse{
		Rooms: sync2.SyncRoomsResponse{
			Join: v2JoinTimeline(roomEvents{
				roomID: room.roomID,
				events: deLiveEvents,
			}),
		},
	})
	v2.waitUntilEmpty(t, alice)

	// now add a room sub with timeline limit = 5, we will need to hit the DB to satisfy this.
	// We might destroy caches in a bad way. We might not return the most recent 5 events.
	res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{
		RoomSubscriptions: map[string]sync3.RoomSubscription{
			room.roomID: {
				TimelineLimit: 5,
			},
		},
	})
	// we append live events AFTER processing the new timeline limit, so 7 events not 5:
	// the last 2 state events, then A,B,C, then D,E.
	// TODO: ideally we'd just return abcde here.
	// The base slice is capacity-capped so that building the expectation never writes into
	// roomState's backing array.
	wantSevenEvents := append(roomState[stateLen-2:stateLen:stateLen], abcInitialEvents...)
	wantSevenEvents = append(wantSevenEvents, deLiveEvents...)
	m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
		room.roomID: {
			m.MatchRoomTimeline(wantSevenEvents),
		},
	}), m.LogResponse(t))

	// live stream the final 2 events
	fgLiveEvents := []json.RawMessage{
		testutils.NewMessageEvent(t, alice, "F"),
		testutils.NewMessageEvent(t, alice, "G"),
	}
	v2.queueResponse(alice, sync2.SyncResponse{
		Rooms: sync2.SyncRoomsResponse{
			Join: v2JoinTimeline(roomEvents{
				roomID: room.roomID,
				events: fgLiveEvents,
			}),
		},
	})
	v2.waitUntilEmpty(t, alice)

	// now ask for timeline limit = 3, which may miss events if the caches got corrupted.
	// Do this on a fresh connection to force loadPos to update.
	res = v3.mustDoV3Request(t, aliceToken, sync3.Request{
		RoomSubscriptions: map[string]sync3.RoomSubscription{
			room.roomID: {
				TimelineLimit: 3,
			},
		},
	})
	// Expect E,F,G. The base slice is capacity-capped so deLiveEvents is left untouched.
	wantEFG := append(deLiveEvents[1:len(deLiveEvents):len(deLiveEvents)], fgLiveEvents...)
	m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
		room.roomID: {
			m.MatchRoomTimeline(wantEFG),
		},
	}), m.LogResponse(t))
}