mirror of
synced 2025-03-10 13:37:11 +00:00
1263 lines
43 KiB
1263 lines
43 KiB
package syncv3
import (
// Tests that if Alice is syncing with Device A, then begins syncing on a new Device B, we use
// a custom filter on the first sync to just pull out to-device events (which is faster)
//
// NOTE(review): this copy of the function has lost its indentation and several
// lines (closing braces, early returns, some call arguments). Restore them from
// the upstream source before compiling.
func TestSecondPollerFiltersToDevice(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// setup code
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
deviceAToken := "DEVICE_A_TOKEN"
v2.addAccountWithDeviceID(alice, "A", deviceAToken)
v2.queueResponse(deviceAToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: "!unimportant",
events: createRoomState(t, alice, time.Now()),
// seed the proxy with data and get the first poller running
v3.mustDoV3Request(t, deviceAToken, sync3.Request{})
// now sync with device B, and check we send the filter up
deviceBToken := "DEVICE_B_TOKEN"
v2.addAccountWithDeviceID(alice, "B", deviceBToken)
// NOTE(review): seenInitialRequest is only ever loaded in the visible code; the
// Store call is presumably inside the callback below on a missing line.
var seenInitialRequest atomic.Bool
// The callback inspects the "since" and "filter" query parameters of v2 requests.
v2.SetCheckRequest(func(token string, req *http.Request) {
// Presumably requests from other tokens are ignored; the branch body is not visible.
if token != deviceBToken {
qps := req.URL.Query()
since := qps.Get("since")
filter := qps.Get("filter")
t.Logf("CheckRequest: %v since=%v filter=%v", token, since, filter)
if filter == "" {
t.Errorf("expected a filter on all v2 syncs from poller, but got none")
// The filter is JSON; extract the timeline limit and the list of rooms it names.
filterJSON := gjson.Parse(filter)
timelineLimit := filterJSON.Get("room.timeline.limit").Int()
roomsFilter := filterJSON.Get("room.rooms")
if !seenInitialRequest.Load() {
// First poll: should be an initial sync, limit 1, excluding all room timelines.
if since != "" {
t.Errorf("Expected no since token on first poll, but got %v", since)
if timelineLimit != 1 {
t.Errorf("Expected timeline limit of 1 on first poll, but got %d", timelineLimit)
// An empty-but-present room list is required: missing and non-empty are both errors.
if !roomsFilter.Exists() {
t.Errorf("Expected roomsFilter set to empty list on first poll, but got no roomFilter")
if len(roomsFilter.Array()) != 0 {
t.Errorf("Expected roomsFilter set to empty list on first poll, but got %v", roomsFilter.Raw)
} else {
// Second poll: should be an incremental sync, limit 50, including all room timelines.
if since == "" {
t.Errorf("Expected nonempty since token on second poll, but got empty")
if timelineLimit != 50 {
t.Errorf("Expected timeline limit of 50 on second poll, but got %d", timelineLimit)
if roomsFilter.Exists() {
t.Errorf("Expected missing roomsFilter on second poll, but got %v", roomsFilter.Raw)
// The to-device message that device B is expected to receive.
wantMsg := json.RawMessage(`{"type":"f","content":{"f":"b"}}`)
v2.queueResponse(deviceBToken, sync2.SyncResponse{
NextBatch: "a",
ToDevice: sync2.EventsResponse{
Events: []json.RawMessage{
// Enabled is set from an address, so an addressable true value is needed.
boolTrue := true
res := v3.mustDoV3Request(t, deviceBToken, sync3.Request{
Extensions: extensions.Request{
ToDevice: &extensions.ToDeviceRequest{
Core: extensions.Core{Enabled: &boolTrue},
if !seenInitialRequest.Load() {
t.Fatalf("did not see initial request for 2nd device")
// the first request will not wait for the response before returning due to device A. Poll again
// and now we should see the to-device msg.
res = v3.mustDoV3RequestWithPos(t, deviceBToken, res.Pos, sync3.Request{})
m.MatchResponse(t, res, m.MatchToDeviceMessages([]json.RawMessage{wantMsg}))
// TestPollerHandlesUnknownStateEventsOnIncrementalSync feeds Alice's poller a
// limited ("gappy") incremental v2 response whose state block carries a room
// name event and a power levels event. It then checks that a request at
// Alice's old sliding sync position is rejected via assertUnknownPos, and
// makes a fresh request that (per the log messages) should show the new state.
//
// NOTE(review): this copy of the function has lost its indentation and several
// lines (closing braces, event constructor arguments, the final matchers).
// Restore them from the upstream source before compiling.
func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
t.Log("Alice creates a room.")
v2.addAccount(t, alice, aliceToken)
const roomID = "!unimportant"
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
events: createRoomState(t, alice, time.Now()),
t.Log("Alice sliding syncs, explicitly requesting power levels.")
// The same request is reused below when Alice syncs again from scratch.
aliceReq := sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: [][2]int64{{0, 20}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: 10,
RequiredState: [][2]string{{"m.room.power_levels", ""}},
res := v3.mustDoV3Request(t, aliceToken, aliceReq)
t.Log("Alice's poller receives a gappy poll with a state block. The power levels and room name have changed.")
// NOTE(review): the leading arguments of both NewStateEvent calls are missing in this copy.
nameEvent := testutils.NewStateEvent(
map[string]interface{}{"name": "banana"},
powerLevelsEvent := testutils.NewStateEvent(
"users": map[string]int{alice: 100},
"events_default": 10,
messageEvent := testutils.NewMessageEvent(t, alice, "hello")
// State block holds the two changed state events; the timeline is marked limited.
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomID: {
State: sync2.EventsResponse{
Events: []json.RawMessage{nameEvent, powerLevelsEvent},
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{messageEvent},
Limited: true,
PrevBatch: "batchymcbatchface",
v2.waitUntilEmpty(t, aliceToken)
t.Log("Alice incremental sliding syncs.")
// doV3Request is used here (rather than mustDoV3Request) so the status code can be inspected.
_, respBytes, statusCode := v3.doV3Request(t, context.Background(), aliceToken, res.Pos, sync3.Request{})
t.Log("The server should have closed the long-polling session.")
assertUnknownPos(t, respBytes, statusCode)
t.Log("Alice sliding syncs from scratch.")
res = v3.mustDoV3Request(t, aliceToken, aliceReq)
t.Log("Alice sees the new room name and power levels.")
// NOTE(review): the room matchers passed to MatchRoomSubscription are not visible in this copy.
m.MatchResponse(t, res, m.MatchRoomSubscription(roomID,
// Similar to TestPollerHandlesUnknownStateEventsOnIncrementalSync. Here we are testing
// that if Alice's poller sees Bob leave in a state block, the events seen in that
// timeline are not visible to Bob.
//
// NOTE(review): this copy of the function has lost its indentation and several
// lines (closing braces, event constructor arguments, the m.MatchResponse
// calls that wrap the matchers). Restore them from the upstream source before
// compiling.
func TestPollerUpdatesRoomMemberTrackerOnGappySyncStateBlock(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
v2.addAccount(t, alice, aliceToken)
v2.addAccount(t, bob, bobToken)
const roomID = "!unimportant"
t.Log("Alice and Bob's pollers initial sync. Both see the same state: that Alice and Bob share a room.")
initialTimeline := createRoomState(t, alice, time.Now())
bobJoin := testutils.NewStateEvent(
map[string]interface{}{"membership": "join"},
// The room state plus Bob's join, queued identically for both pollers.
initialJoinBlock := v2JoinTimeline(roomEvents{
roomID: roomID,
events: append(initialTimeline, bobJoin),
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{Join: initialJoinBlock},
v2.queueResponse(bobToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{Join: initialJoinBlock},
t.Log("Alice makes an initial sliding sync request.")
// Shared by Alice's and Bob's initial requests and by Bob's fresh session at the end.
syncRequest := sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: [][2]int64{{0, 20}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: 10,
aliceRes := v3.mustDoV3Request(t, aliceToken, syncRequest)
t.Log("Alice sees herself and Bob joined to the room.")
m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{roomID})),
m.MatchRoomSubscription(roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{bobJoin})),
t.Log("Bob makes an initial sliding sync request.")
bobRes := v3.mustDoV3Request(t, bobToken, syncRequest)
t.Log("Bob sees himself and Alice joined to the room.")
m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{roomID})),
m.MatchRoomSubscription(roomID, m.MatchJoinCount(2)),
t.Log("Alice's poller receives a gappy incremental sync response. Bob has left in the gap. The timeline includes a message from Alice.")
bobLeave := testutils.NewStateEvent(
map[string]interface{}{"membership": "leave"},
aliceMessage := testutils.NewMessageEvent(t, alice, "hello")
// Bob's leave arrives in the state block; Alice's message arrives in the limited timeline.
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomID: {
State: sync2.EventsResponse{
Events: []json.RawMessage{bobLeave},
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{aliceMessage},
Limited: true,
PrevBatch: "batchymcbatchface",
v2.waitUntilEmpty(t, aliceToken)
t.Log("Bob makes an incremental sliding sync request.")
// Bob's request at his old position is checked with assertUnknownPos.
_, respBytes, statusCode := v3.doV3Request(t, context.Background(), bobToken, bobRes.Pos, sync3.Request{})
assertUnknownPos(t, respBytes, statusCode)
t.Log("Bob makes a new sliding sync session.")
bobRes = v3.mustDoV3Request(t, bobToken, syncRequest)
t.Log("He shouldn't see any evidence of the room.")
m.MatchList("a", m.MatchV3Count(0)),
// TestPollersCanBeResumedAfterExpiry pre-populates the v2 store with a device
// and token for Alice (timestamped at the Unix epoch) and for Bob (timestamped
// now), starts the proxy and its pollers, and then checks that Alice's stored
// since token is still "alice_response_1" after further responses are queued.
// Finally Alice makes a new sliding sync request and is expected to receive
// the queued account data.
//
// NOTE(review): this copy of the function has lost its indentation and several
// lines (closing braces, the error returns inside the transaction, the call
// that triggers the poller cleanup). Restore them from the upstream source
// before compiling.
func TestPollersCanBeResumedAfterExpiry(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// Start the mock sync v2 server and add a device for alice and for bob.
v2 := runTestV2Server(t)
defer v2.close()
const aliceDevice = "alice_phone"
const bobDevice = "bob_desktop"
v2.addAccountWithDeviceID(alice, aliceDevice, aliceToken)
v2.addAccountWithDeviceID(bob, bobDevice, bobToken)
// Queue up a sync v2 response for both Alice and Bob.
v2.queueResponse(aliceToken, sync2.SyncResponse{NextBatch: "alice_response_1"})
v2.queueResponse(bobToken, sync2.SyncResponse{NextBatch: "bob_response_1"})
// Inject an old token from Alice and a new token from Bob into the DB.
// The store is opened directly on the same database, before the v3 server is started.
v2Store := sync2.NewStore(pqString, os.Getenv("SYNCV3_SECRET"))
// Both devices and both tokens are inserted inside one transaction.
err := sqlutil.WithTransaction(v2Store.DB, func(txn *sqlx.Tx) (err error) {
err = v2Store.DevicesTable.InsertDevice(txn, alice, aliceDevice)
if err != nil {
err = v2Store.DevicesTable.InsertDevice(txn, bob, bobDevice)
if err != nil {
// Alice's token gets the Unix epoch as its timestamp (presumably its last-seen time — TODO confirm).
_, err = v2Store.TokensTable.Insert(txn, aliceToken, alice, aliceDevice, time.UnixMicro(0))
if err != nil {
_, err = v2Store.TokensTable.Insert(txn, bobToken, bob, bobDevice, time.Now())
if err != nil {
t.Log("Start the v3 server and its pollers.")
v3 := runTestServer(t, v2, pqString)
// StartV2Pollers is launched in its own goroutine.
go v3.h2.StartV2Pollers()
defer v3.close()
t.Log("Alice's poller should be active.")
v2.waitUntilEmpty(t, aliceToken)
t.Log("Bob's poller should be active.")
v2.waitUntilEmpty(t, bobToken)
// NOTE(review): the call that actually triggers the cleanup is not visible in this copy.
t.Log("Manually trigger a poller cleanup.")
t.Log("Queue up a sync v2 response for both Alice and Bob. Alice's response includes account data.")
accdata := testutils.NewAccountData(t, "dummytype", map[string]any{})
v2.queueResponse(aliceToken, sync2.SyncResponse{
NextBatch: "alice_response_2",
AccountData: sync2.EventsResponse{
Events: []json.RawMessage{
v2.queueResponse(bobToken, sync2.SyncResponse{NextBatch: "bob_response_2"})
t.Log("Wait for Bob's poller to poll")
v2.waitUntilEmpty(t, bobToken)
// Alice's poller has likely already made an HTTP response. But her poller should
// have been terminated before the request was received, so its since token
// should not have been persisted to the DB.
t.Log("Alice's since token in the DB should not have advanced.")
// TODO: surprising that there isn't a function to get the since token for a device!
var since string
// Read the since column straight from the devices table for Alice's device.
err = v2Store.DB.Get(&since, `SELECT since FROM syncv3_sync2_devices WHERE user_id = $1 AND device_id = $2`, alice, aliceDevice)
if err != nil {
if since != "alice_response_1" {
t.Errorf("Alice's sync token in DB was %s, expected alice_response_1", since)
t.Log("Requeue the same response for Alice's restarted poller to consume.")
v2.queueResponse(aliceToken, sync2.SyncResponse{
NextBatch: "alice_response_2",
AccountData: sync2.EventsResponse{
Events: []json.RawMessage{
t.Log("Alice makes a new sliding sync request")
// The request enables the account data extension.
res := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Extensions: extensions.Request{
AccountData: &extensions.AccountDataRequest{
Core: extensions.Core{
Enabled: &boolTrue,
t.Log("Alice's poller should have been polled.")
v2.waitUntilEmpty(t, aliceToken)
t.Log("Alice should see her account data")
m.MatchResponse(t, res, m.MatchAccountData([]json.RawMessage{accdata}, nil))
// Regression test for https://github.com/matrix-org/sliding-sync/issues/287#issuecomment-1706522718
//
// Alice makes a sliding sync request with no position while a v2 request
// check callback is installed; the test requires an HTTP 401 response.
//
// NOTE(review): this copy of the function has lost its indentation and several
// lines (closing braces, the call that expires Alice's token inside the
// callback). Restore them from the upstream source before compiling.
func TestPollerExpiryEnsurePollingRace(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
defer v2.close()
v3 := runTestServer(t, v2, pqString)
defer v3.close()
v2.addAccount(t, alice, aliceToken)
// Arrange the following:
// 1. A request arrives from an unknown token.
// 2. The API makes a /whoami lookup for the new token. That returns without error.
// 3. The old token expires.
// 4. The poller tries to call /sync but finds that the token has expired.
v2.SetCheckRequest(func(token string, req *http.Request) {
// Only polls using Alice's token are expected in this test.
if token != aliceToken {
t.Fatalf("unexpected poll from %s", token)
// Expire the token before we process the request.
t.Log("Alice's token expires.")
t.Log("Alice makes a sliding sync request with a token that's about to expire.")
// doV3Request returns the raw body and status code so the 401 can be asserted.
_, resBytes, status := v3.doV3Request(t, context.Background(), aliceToken, "", sync3.Request{})
if status != http.StatusUnauthorized {
t.Fatalf("Should have got 401 http response; got %d\n%s", status, resBytes)
// Regression test for the bugfix for https://github.com/matrix-org/sliding-sync/issues/287#issuecomment-1706522718
// Specifically, we could cache the failure and never tell the poller about new tokens, wedging the client(!). This
// seems to have been due to the following:
// - client hits sync for the first time. We /whoami and remember the token->user mapping in TokensTable.
// - client syncing + poller syncing, everything happy.
// - token expires. OnExpiredToken is sent to EnsurePoller which removes the entry from EnsurePoller and nukes the conns.
// - client hits sync, gets 400 M_UNKNOWN_POS due to nuked conns.
// - client hits a fresh /sync: for whatever reason, the token is NOT 401d there and then by the /whoami lookup failing.
// Maybe failed to remove the token, but don't see any logs to suggest this. Seems to be an OIDC thing.
// - EnsurePoller starts a poller, which immediately 401s as the token is expired.
// - OnExpiredToken is sent first, which removes the entry in EnsurePoller.
// - OnInitialSyncComplete[success=false] is sent after, which MAKES A NEW ENTRY with success=false.
// - proxy sends back 401 M_UNKNOWN_TOKEN.
// - At this point the proxy is wedged. Any token, no matter how valid they are, will not hit EnsurePoller because
// we cached success=false for that (user,device).
// Traceable in the logs which show spam of this log line without "Poller: v2 poll loop started" interleaved.
// 12:45:33 ERR EnsurePolling failed, returning 401 conn=encryption device=xx user=@xx:xx.xx
// To test this failure mode we:
// - Create Alice and sync her poller.
// - Expire her token immediately, just like the test TestPollerExpiryEnsurePollingRace
// - Do another request with a valid new token, this should succeed.
//
// NOTE(review): this copy of the function has lost its indentation and several
// lines (closing braces, the token-expiry call, the store to gotNewToken).
// Restore them from the upstream source before compiling.
func TestPollerExpiryEnsurePollingRaceDoesntWedge(t *testing.T) {
newToken := "NEW_ALICE_TOKEN"
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
defer v2.close()
v3 := runTestServer(t, v2, pqString)
defer v3.close()
v2.addAccount(t, alice, aliceToken)
// Arrange the following:
// 1. A request arrives from an unknown token.
// 2. The API makes a /whoami lookup for the new token. That returns without error.
// 3. The old token expires.
// 4. The poller tries to call /sync but finds that the token has expired.
// NEW 5. Using a "new token" works.
// NOTE(review): gotNewToken is only loaded in the visible code; it is presumably
// set in the newToken branch of the callback on a missing line.
var gotNewToken atomic.Bool
v2.SetCheckRequest(func(token string, req *http.Request) {
if token == newToken {
t.Log("recv new token")
// Any poll that uses neither the new token nor Alice's original token fails the test.
if token != aliceToken {
t.Fatalf("unexpected poll from %s", token)
// Expire the token before we process the request.
t.Log("Alice's token expires.")
t.Log("Alice makes a sliding sync request with a token that's about to expire.")
_, resBytes, status := v3.doV3Request(t, context.Background(), aliceToken, "", sync3.Request{})
if status != http.StatusUnauthorized {
t.Fatalf("Should have got 401 http response; got %d\n%s", status, resBytes)
// make a new token and use it
v2.addAccount(t, alice, newToken)
_, resBytes, status = v3.doV3Request(t, context.Background(), newToken, "", sync3.Request{})
if status != http.StatusOK {
t.Fatalf("Should have got 200 http response; got %d\n%s", status, resBytes)
// A 200 alone is not enough: the v2 server must also have seen a poll with the new token.
if !gotNewToken.Load() {
t.Fatalf("never saw a v2 poll with the new token")
// TestTimelineStopsLoadingWhenMissingPrevious has Alice subscribe to a room,
// then feeds her poller a limited ("gappy") v2 response containing a single
// message. Alice then syncs on a second connection ("conn2") asking for up to
// 10 timeline events; per the log message, only the post-gap event should be
// returned.
//
// NOTE(review): this copy of the function has lost its indentation and several
// lines (closing braces, the final room matchers). Restore them from the
// upstream source before compiling.
func TestTimelineStopsLoadingWhenMissingPrevious(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
const roomID = "!unimportant"
t.Log("Alice creates a room.")
v2.addAccount(t, alice, aliceToken)
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
// Same value as the roomID constant above, written out as a literal.
roomID: "!unimportant",
events: createRoomState(t, alice, time.Now()),
t.Log("Alice syncs, starting a poller.")
res := v3.mustDoV3Request(t, aliceToken, sync3.Request{
RoomSubscriptions: map[string]sync3.RoomSubscription{
roomID: {
TimelineLimit: 10,
t.Log("Her response includes the room she created..")
m.MatchResponse(t, res, m.MatchRoomSubscription(roomID))
t.Log("Alice's poller receives a gappy sync with a timeline event.")
msgAfterGap := testutils.NewMessageEvent(t, alice, "school's out for summer")
// Limited timeline with a prev_batch token and no state block.
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomID: {
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{msgAfterGap},
Limited: true,
PrevBatch: "dummyPrevBatch",
v2.waitUntilEmpty(t, aliceToken)
t.Log("Alice makes a new connection and syncs, requesting the last 10 timeline events.")
// A distinct ConnID is supplied for this second request.
res = v3.mustDoV3Request(t, aliceToken, sync3.Request{
ConnID: "conn2",
RoomSubscriptions: map[string]sync3.RoomSubscription{
roomID: {
TimelineLimit: 10,
t.Log("The response's timeline should only include the event after the gap.")
// NOTE(review): the room matchers passed to MatchRoomSubscription are not visible in this copy.
m.MatchResponse(t, res, m.MatchRoomSubscription(roomID,
// The "prepend state events" mechanism added in
// https://github.com/matrix-org/sliding-sync/pull/71 ensured that the proxy
// communicated state events in "gappy syncs" to users. But it did so via Accumulate,
// which made one snapshot for each state event. This was not an accurate model of the
// room's history (the state block comes in no particular order) and had awful
// performance for large gappy states.
// We now want to handle these in Initialise, making a single snapshot for the state
// block. This test ensures that is the case. The logic is very similar to the e2e test
// TestGappyState.
//
// NOTE(review): this copy of the function has lost its indentation and several
// lines (closing braces, the remaining arguments to append, some matchers).
// Restore them from the upstream source before compiling.
func TestGappyStateDoesNotAccumulateTheStateBlock(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
defer v2.close()
v3 := runTestServer(t, v2, pqString)
defer v3.close()
v2.addAccount(t, alice, aliceToken)
v2.addAccount(t, bob, bobToken)
t.Log("Alice creates a room, sets its name and sends a message.")
const roomID = "!unimportant"
name1 := testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]any{
"name": "wonderland",
msg1 := testutils.NewMessageEvent(t, alice, "0118 999 881 999 119 7253")
joinTimeline := v2JoinTimeline(roomEvents{
roomID: roomID,
events: append(
createRoomState(t, alice, time.Now()),
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: joinTimeline,
t.Log("Alice sliding syncs with a huge timeline limit, subscribing to the room she just created.")
// Reused below when Alice syncs again from scratch.
aliceReq := sync3.Request{
RoomSubscriptions: map[string]sync3.RoomSubscription{
roomID: {TimelineLimit: 100},
res := v3.mustDoV3Request(t, aliceToken, aliceReq)
t.Log("Alice sees the room with the expected name, with the name event and message at the end of the timeline.")
m.MatchResponse(t, res, m.MatchRoomSubscription(roomID,
m.MatchRoomTimelineMostRecent(2, []json.RawMessage{name1, msg1}),
t.Log("Alice's poller receives a gappy sync, including a room name change, bob joining, and two messages.")
// Build the state block: ten custom state events with state keys "0".."9",
// followed by the new room name and Bob's join.
stateBlock := make([]json.RawMessage, 0)
for i := 0; i < 10; i++ {
statePiece := testutils.NewStateEvent(t, "com.example.custom", fmt.Sprintf("%d", i), alice, map[string]any{})
stateBlock = append(stateBlock, statePiece)
name2 := testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]any{
"name": "not wonderland",
bobJoin := testutils.NewJoinEvent(t, bob)
stateBlock = append(stateBlock, name2, bobJoin)
msg2 := testutils.NewMessageEvent(t, alice, "Good morning!")
msg3 := testutils.NewMessageEvent(t, alice, "That's a nice tnetennba.")
// The state block and a limited two-message timeline are delivered in one v2 response.
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomID: {
State: sync2.EventsResponse{
Events: stateBlock,
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{msg2, msg3},
Limited: true,
PrevBatch: "dummyPrevBatch",
v2.waitUntilEmpty(t, aliceToken)
t.Log("Alice syncs. The server should close her long-polling session.")
_, respBytes, statusCode := v3.doV3Request(t, context.Background(), aliceToken, res.Pos, sync3.Request{})
assertUnknownPos(t, respBytes, statusCode)
t.Log("Alice sliding syncs from scratch. She should see the two most recent message in the timeline only. The room name should have changed too.")
res = v3.mustDoV3Request(t, aliceToken, aliceReq)
m.MatchResponse(t, res, m.MatchRoomSubscription(roomID,
m.MatchRoomName("not wonderland"),
// In particular, we shouldn't see state here because it's not part of the timeline.
// Nor should we see msg1, as that comes before a gap.
m.MatchRoomTimeline([]json.RawMessage{msg2, msg3}),
// Right, this has turned out to be very involved. This test has three varying
// parameters:
// - Bert's initial membership (in 3 below),
// - his final membership (in 5 below), and
// - whether his sync in (6) is initial or long-polling ("live").
// The test:
// 1. Registers two users Ana and Bert.
// 2. Has Ana create a public room.
// 3. Sets an initial membership for Bert in that room.
// 4. Sliding syncs for Bert, if he will live-sync in (6) below.
// 5. Gives Ana's poller a gappy poll in which Bert's membership changes.
// 6. Has Bert do a sliding sync.
// 7. Ana invites Bert to a DM.
// We perform the following assertions:
// - After (3), Ana sees her membership, Bert's initial membership, appropriate
// join and invite counts, and an appropriate timeline.
// - If applicable: after (4), Bert sees his initial membership.
// - After (5), Ana's connection is closed. When opening a new one, she sees her
// membership, Bert's new membership, and the post-gap timeline.
// - After (6), Bert's connection is closed if he was expecting a live update.
// - After (6), Bert sees his new membership (if there is anything to see).
// - After (7), Bert sees the DM invite.
// Remarks:
// - Use a per-test Ana and Bert here so we don't clash with the global constants
// alice and bob.
// - We're feeding all this information in via Ana's poller to check that stuff
// propagates from her poller to Bert's client. However, when Bert's membership is
// "invite" we need to directly send the invite to his poller.
// - Step (7) serves as a sentinel to prove that the proxy has processed (5) in the
// case where there is nothing for Bert to see in (6), e.g. a preemptive ban or
// an unban during the gap.
// - Testing all the membership transitions is likely overkill. But it was useful
// for finding edge cases in the proxy's assumptions at first, before we decided to
// nuke conns and userCaches and start from scratch.
func TestClientsSeeMembershipTransitionsInGappyPolls(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
// TODO remove this? Otherwise running tests is sloooooow
v2.timeToWaitForV2Response /= 20
defer v2.close()
v3 := runTestServer(t, v2, pqString)
defer v3.close()
type testcase struct {
// Inputs
beforeMembership string
afterMembership string
viaLiveUpdate bool
// Scratch space
id string
ana string
anaToken string
bert string
bertToken string
publicRoomID string // room that will receive gappy state
dmRoomID string // DM between ana and bert, used to send a sentinel message
var tcs []testcase
transitions := map[string][]string{
// before: {possible after}
// https://spec.matrix.org/v1.8/client-server-api/#room-membership for the list of allowed transitions
"none": {"ban", "invite", "join", "leave"},
"invite": {"ban", "join", "leave"},
// Note: can also join->join here e.g. for displayname change, but will do that in a separate test
"join": {"ban", "leave"},
"leave": {"ban", "invite", "join"},
"ban": {"leave"},
for before, afterOptions := range transitions {
for _, after := range afterOptions {
for _, live := range []bool{true, false} {
idStr := fmt.Sprintf("%s-%s", before, after)
if live {
idStr += "-live"
tc := testcase{
beforeMembership: before,
afterMembership: after,
viaLiveUpdate: live,
id: idStr,
publicRoomID: fmt.Sprintf("!%s-public", idStr),
dmRoomID: fmt.Sprintf("!%s-dm", idStr),
// Using ana and bert to stop myself from pulling in package-level constants alice and bob
ana: fmt.Sprintf("@ana-%s:localhost", idStr),
bert: fmt.Sprintf("@bert-%s:localhost", idStr),
tc.anaToken = tc.ana + "_token"
tc.bertToken = tc.bert + "_token"
tcs = append(tcs, tc)
ssRequest := sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: sync3.SliceRanges{{0, 10}},
RoomSubscription: sync3.RoomSubscription{
RequiredState: [][2]string{{"m.room.member", "*"}},
TimelineLimit: 20,
setup := func(t *testing.T, tc testcase) (publicEvents []json.RawMessage, anaMembership json.RawMessage, anaRes *sync3.Response) {
// 1. Register two users Ana and Bert.
v2.addAccount(t, tc.ana, tc.anaToken)
v2.addAccount(t, tc.bert, tc.bertToken)
// 2. Have Ana create a public room.
t.Log("Ana creates a public room.")
publicEvents = createRoomState(t, tc.ana, time.Now())
for _, ev := range publicEvents {
parsed := gjson.ParseBytes(ev)
if parsed.Get("type").Str == "m.room.member" && parsed.Get("state_key").Str == tc.ana {
anaMembership = ev
// 3. Set an initial membership for Bert.
var wantJoinCount int
var wantInviteCount int
var bertMembership json.RawMessage
switch tc.beforeMembership {
case "none":
t.Log("Bert has no membership in the room.")
wantJoinCount = 1
wantInviteCount = 0
case "invite":
t.Log("Bert is invited.")
bertMembership = testutils.NewStateEvent(t, "m.room.member", tc.bert, tc.ana, map[string]any{"membership": "invite"})
wantJoinCount = 1
wantInviteCount = 1
case "join":
t.Log("Bert joins the room.")
bertMembership = testutils.NewJoinEvent(t, tc.bert)
wantJoinCount = 2
wantInviteCount = 0
case "leave":
t.Log("Bert is pre-emptively kicked.")
bertMembership = testutils.NewStateEvent(t, "m.room.member", tc.bert, tc.ana, map[string]any{"membership": "leave"})
wantJoinCount = 1
wantInviteCount = 0
case "ban":
t.Log("Bert is banned.")
bertMembership = testutils.NewStateEvent(t, "m.room.member", tc.bert, tc.ana, map[string]any{"membership": "ban"})
wantJoinCount = 1
wantInviteCount = 0
panic(fmt.Errorf("unknown beforeMembership %s", tc.beforeMembership))
if len(bertMembership) > 0 {
publicEvents = append(publicEvents, bertMembership)
t.Log("Ana's poller sees the public room for the first time.")
v2.queueResponse(tc.anaToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
tc.publicRoomID: {
Timeline: sync2.TimelineResponse{
Events: publicEvents,
PrevBatch: "anaPublicPrevBatch1",
NextBatch: "anaSync1",
t.Log("Ana sliding syncs, requesting all room members.")
anaRes = v3.mustDoV3Request(t, tc.anaToken, ssRequest)
t.Log("She sees herself joined to both rooms, with appropriate timelines and counts.")
// Note: we only expect timeline[1:] here, not the create event. See
// https://github.com/matrix-org/sliding-sync/issues/343
expectedMembers := []json.RawMessage{anaMembership}
if len(bertMembership) > 0 {
expectedMembers = append(expectedMembers, bertMembership)
m.MatchResponse(t, anaRes,
gappyPoll := func(t *testing.T, tc testcase, anaMembership json.RawMessage, anaRes *sync3.Response) (newMembership json.RawMessage, publicTimeline []json.RawMessage) {
t.Logf("Ana's poller gets a gappy sync response for the public room. Bert's membership is now %s, and Ana has sent 10 messages.", tc.afterMembership)
publicTimeline = make([]json.RawMessage, 10)
for i := range publicTimeline {
publicTimeline[i] = testutils.NewMessageEvent(t, tc.ana, fmt.Sprintf("hello %d", i))
switch tc.afterMembership {
case "invite":
newMembership = testutils.NewStateEvent(t, "m.room.member", tc.bert, tc.ana, map[string]any{"membership": "invite"})
case "join":
newMembership = testutils.NewJoinEvent(t, tc.bert)
case "leave":
newMembership = testutils.NewStateEvent(t, "m.room.member", tc.bert, tc.ana, map[string]any{"membership": "leave"})
case "ban":
newMembership = testutils.NewStateEvent(t, "m.room.member", tc.bert, tc.ana, map[string]any{"membership": "ban"})
panic(fmt.Errorf("unknown afterMembership %s", tc.afterMembership))
v2.queueResponse(tc.anaToken, sync2.SyncResponse{
NextBatch: "ana2",
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
tc.publicRoomID: {
State: sync2.EventsResponse{
Events: []json.RawMessage{newMembership},
Timeline: sync2.TimelineResponse{
Events: publicTimeline,
Limited: true,
PrevBatch: "anaPublicPrevBatch2",
v2.waitUntilEmpty(t, tc.anaToken)
if tc.afterMembership == "invite" {
t.Log("Bert's poller sees his invite.")
v2.queueResponse(tc.bertToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Invite: map[string]sync2.SyncV2InviteResponse{
tc.publicRoomID: {
InviteState: sync2.EventsResponse{
// TODO: this really ought to be stripped state events
Events: []json.RawMessage{anaMembership, newMembership},
NextBatch: tc.bert + "_invite",
t.Log("Ana syncs.")
_, respBytes, statusCode := v3.doV3Request(t, context.Background(), tc.anaToken, anaRes.Pos, sync3.Request{})
t.Log("Her long-polling session has been closed by the server.")
assertUnknownPos(t, respBytes, statusCode)
t.Log("Ana syncs again from scratch.")
anaRes = v3.mustDoV3Request(t, tc.anaToken, ssRequest)
t.Log("She sees both her and Bob's membership, and the timeline from the gappy poll.")
// Note: we don't expect to see the pre-gap timeline, here because we stop at
// the first gap we see in the timeline.
m.MatchResponse(t, anaRes, m.MatchRoomSubscription(tc.publicRoomID,
m.MatchRoomRequiredState([]json.RawMessage{anaMembership, newMembership}),
for _, tc := range tcs {
t.Run(tc.id, func(t *testing.T) {
// 1--3: Register users, create public room, set Bert's membership.
publicEvents, anaMembership, anaRes := setup(t, tc)
defer func() {
// Cleanup these users once we're done with them. This helps stop log spam when debugging.
// Ensure the proxy considers Bert to already be polling. In particular, if
// Bert is initially invited, make sure his poller sees the invite.
if tc.beforeMembership == "invite" {
t.Log("Bert's poller sees his invite.")
v2.queueResponse(tc.bertToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Invite: map[string]sync2.SyncV2InviteResponse{
tc.publicRoomID: {
InviteState: sync2.EventsResponse{
// TODO: this really ought to be stripped state events
Events: publicEvents,
NextBatch: tc.bert + "_invite",
} else {
t.Log("Queue up an empty poller response for Bert.")
v2.queueResponse(tc.bertToken, sync2.SyncResponse{
NextBatch: tc.bert + "_empty_sync",
t.Log("Bert makes a dummy request with a different connection ID, to ensure his poller has started.")
v3.mustDoV3Request(t, tc.bertToken, sync3.Request{
ConnID: "bert-dummy-conn",
var bertRes *sync3.Response
// 4: sliding sync for Bert, if he will live-sync in (6) below.
if tc.viaLiveUpdate {
t.Log("Bert sliding syncs.")
bertRes = v3.mustDoV3Request(t, tc.bertToken, ssRequest)
// Bert will see the entire history of these rooms, so there shouldn't be any prev batch tokens.
expectedSubscriptions := map[string][]m.RoomMatcher{}
switch tc.beforeMembership {
case "invite":
t.Log("Bert sees his invite.")
expectedSubscriptions[tc.publicRoomID] = []m.RoomMatcher{
case "join":
t.Log("Bert sees his join.")
expectedSubscriptions[tc.publicRoomID] = []m.RoomMatcher{
case "none":
case "leave":
case "ban":
t.Log("Bert does not see the room.")
panic(fmt.Errorf("unknown beforeMembership %s", tc.beforeMembership))
m.MatchResponse(t, bertRes, m.MatchRoomSubscriptionsStrict(expectedSubscriptions))
// 5: Ana receives a gappy poll, plus a sentinel in her DM with Bert.
newMembership, publicTimeline := gappyPoll(t, tc, anaMembership, anaRes)
// 6: Bert sliding syncs.
if tc.viaLiveUpdate {
wasInvolvedInRoom := tc.beforeMembership == "join" || tc.beforeMembership == "invite"
if wasInvolvedInRoom {
t.Log("Bert makes an incremental sliding sync.")
_, respBytes, statusCode := v3.doV3Request(t, context.Background(), tc.bertToken, bertRes.Pos, ssRequest)
assertUnknownPos(t, respBytes, statusCode)
} else {
t.Log("Queue up an empty poller response for Bert. so the proxy will consider him to be polling.")
v2.queueResponse(tc.bertToken, sync2.SyncResponse{
NextBatch: tc.bert + "_empty_sync",
t.Log("Bert makes new sliding sync connection.")
bertRes = v3.mustDoV3Request(t, tc.bertToken, ssRequest)
// Work out what Bert should see.
respMatchers := []m.RespMatcher{}
switch tc.afterMembership {
case "invite":
t.Log("Bert should see his invite.")
respMatchers = append(respMatchers,
m.MatchList("a", m.MatchV3Count(1)),
case "join":
t.Log("Bert should see himself joined to the room, and Alice's messages.")
respMatchers = append(respMatchers,
m.MatchList("a", m.MatchV3Count(1)),
m.MatchRoomRequiredState([]json.RawMessage{anaMembership, newMembership}),
m.MatchRoomTimelineMostRecent(len(publicTimeline), publicTimeline),
case "leave":
case "ban":
respMatchers = append(respMatchers, m.MatchList("a", m.MatchV3Count(0)))
// Any prior connection has been closed by the server, so Bert won't see
// a transition here.
t.Logf("Bob shouldn't see his %s (membership was: %s)", tc.afterMembership, tc.beforeMembership)
respMatchers = append(respMatchers, m.MatchRoomSubscriptionsStrict(nil))
panic(fmt.Errorf("unknown afterMembership %s", tc.afterMembership))
m.MatchResponse(t, bertRes, respMatchers...)
// 7: Ana invites Bert to a DM. He accepts.
// This is a sentinel which proves the proxy has processed the gappy poll
// properly in the situations where there's nothing for Bert to see in his
// second sync, e.g. ban -> leave (an unban).
t.Log("Ana invites Bert to a DM. He accepts.")
bertDMJoin := testutils.NewJoinEvent(t, tc.bert)
dmTimeline := append(
createRoomState(t, tc.ana, time.Now()),
testutils.NewStateEvent(t, "m.room.member", tc.bert, tc.ana, map[string]any{"membership": "invite"}),
v2.queueResponse(tc.anaToken, sync2.SyncResponse{
NextBatch: "ana3",
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
tc.dmRoomID: {
Timeline: sync2.TimelineResponse{
Events: dmTimeline,
PrevBatch: "anaDM",
v2.waitUntilEmpty(t, tc.anaToken)
t.Log("Bert sliding syncs")
bertRes = v3.mustDoV3RequestWithPos(t, tc.bertToken, bertRes.Pos, ssRequest)
t.Log("Bert sees his join to the DM.")
m.MatchResponse(t, bertRes, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
tc.dmRoomID: {m.MatchRoomLacksInviteState(), m.MatchRoomTimelineMostRecent(1, []json.RawMessage{bertDMJoin})},
// TestTimelineAfterRequestingStateAfterGappyPoll is a minimal version of the test above,
// which is helpful for debugging (because the above test is a monstrosity---apologies
// to the reader.)
//
// Scenario: Alice's poller seeds a room, and she sliding syncs asking for every
// m.room.member event in required_state. Her poller then receives a limited ("gappy")
// v2 response carrying Bob's join in the state block plus ten new messages. Her
// incremental sliding sync is expected to be rejected with an unknown-pos error, and a
// fresh sliding sync is checked for both Alice's and Bob's memberships in required_state.
func TestTimelineAfterRequestingStateAfterGappyPoll(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
defer v2.close()
v3 := runTestServer(t, v2, pqString)
defer v3.close()
alice := "alice"
aliceToken := "alicetoken"
bob := "bob"
roomID := "!unimportant"
v2.addAccount(t, alice, aliceToken)
t.Log("alice creates a public room.")
timeline1 := createRoomState(t, alice, time.Now())
// Pick Alice's own membership event out of the initial room state; it is needed
// further down to build the expected required_state.
var aliceMembership json.RawMessage
for _, ev := range timeline1 {
parsed := gjson.ParseBytes(ev)
if parsed.Get("type").Str == "m.room.member" && parsed.Get("state_key").Str == alice {
aliceMembership = ev
// Abort early: the required_state expectation below is meaningless without this event.
if len(aliceMembership) == 0 {
t.Fatal("Initial timeline did not have a membership for Alice")
// First v2 response for Alice's poller: the whole room state arrives as timeline events.
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomID: {
Timeline: sync2.TimelineResponse{
Events: timeline1,
PrevBatch: "alicePublicPrevBatch1",
NextBatch: "aliceSync1",
t.Log("alice sliding syncs, requesting all memberships in state.")
// The same request is reused for the from-scratch sync after the gappy poll.
aliceReq := sync3.Request{
RoomSubscriptions: map[string]sync3.RoomSubscription{
roomID: {
TimelineLimit: 20,
RequiredState: [][2]string{{"m.room.member", "*"}},
aliceRes := v3.mustDoV3Request(t, aliceToken, aliceReq)
t.Log("She sees herself joined to her room, with an appropriate timeline.")
// Note: we only expect timeline1[1:] here, excluding the create event. See
// https://github.com/matrix-org/sliding-sync/issues/343
m.MatchResponse(t, aliceRes,
t.Logf("Alice's poller gets a gappy sync response for the public room. bob's membership is now join, and alice has sent 10 messages.")
// Build the timeline half of the gappy poll: ten message events sent by Alice.
timeline2 := make([]json.RawMessage, 10)
for i := range timeline2 {
timeline2[i] = testutils.NewMessageEvent(t, alice, fmt.Sprintf("hello %d", i))
// Bob's join is delivered in the state block of the response, not in the timeline.
bobMembership := testutils.NewJoinEvent(t, bob)
v2.queueResponse(aliceToken, sync2.SyncResponse{
NextBatch: "alice2",
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomID: {
State: sync2.EventsResponse{
Events: []json.RawMessage{bobMembership},
Timeline: sync2.TimelineResponse{
Events: timeline2,
// The timeline is flagged as limited, i.e. this is the gappy poll the test is named for.
Limited: true,
PrevBatch: "alicePublicPrevBatch2",
// NOTE(review): presumably blocks until the poller has consumed the queued response — confirm against the helper.
v2.waitUntilEmpty(t, aliceToken)
t.Log("Alice does an incremental sliding sync.")
// Reuses the pos from her first response with an empty request body; the raw
// body and status code are inspected rather than requiring success.
_, respBytes, statusCode := v3.doV3Request(t, context.Background(), aliceToken, aliceRes.Pos, sync3.Request{})
t.Log("Her long-polling session has been closed by the server.")
assertUnknownPos(t, respBytes, statusCode)
t.Log("Alice syncs again from scratch.")
aliceRes = v3.mustDoV3Request(t, aliceToken, aliceReq)
t.Log("She sees both her and Bob's membership, and the timeline from the gappy poll.")
// Note: we don't expect to see timeline1 here because we stop at the first gap we
// see in the timeline.
m.MatchResponse(t, aliceRes, m.MatchRoomSubscription(roomID,
m.MatchRoomRequiredState([]json.RawMessage{aliceMembership, bobMembership}),
func assertUnknownPos(t *testing.T, respBytes []byte, statusCode int) {
if statusCode != http.StatusBadRequest {
t.Errorf("Got status %d, expected %d", statusCode, http.StatusBadRequest)
if errcode := gjson.GetBytes(respBytes, "errcode").Str; errcode != "M_UNKNOWN_POS" {
t.Errorf("Got errcode %s, expected %s", errcode, "M_UNKNOWN_POS")