sliding-sync/sync3/handler/connstate_test.go
Kegan Dougal 4c6d504022 bugfix: ensure metadata about space children doesn't leak to active connections
If Alice and Bob are in the same space, and Bob creates a child in that space,
Alice would incorrectly receive global metadata about that child room if Alice
was live syncing at that time. This leak did not expose confidential information
as Alice could receive all that metadata via the /rooms/{roomId}/hierarchy endpoint
already. However, it would cause clients to put the space child room into the room
list which would be very confusing, as it would have no timeline and no other data.
2024-01-12 12:15:38 +00:00

809 lines
26 KiB
Go

package handler
import (
"context"
"encoding/json"
"fmt"
"reflect"
"testing"
"time"
"github.com/matrix-org/sliding-sync/state"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/sync3/caches"
"github.com/matrix-org/sliding-sync/sync3/extensions"
"github.com/matrix-org/sliding-sync/testutils"
)
type joinChecker struct{}
func (c *joinChecker) IsUserJoined(userID, roomID string) bool {
return true
}
type NopExtensionHandler struct{}
func (h *NopExtensionHandler) Handle(ctx context.Context, req extensions.Request, extCtx extensions.Context) (res extensions.Response) {
return
}
func (h *NopExtensionHandler) HandleLiveUpdate(ctx context.Context, update caches.Update, req extensions.Request, res *extensions.Response, extCtx extensions.Context) {
}
type NopUserCacheStore struct{}
func (s *NopUserCacheStore) GetClosestPrevBatch(roomID string, eventNID int64) (prevBatch string) {
return
}
func (s *NopUserCacheStore) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string]*state.LatestEvents, error) {
return nil, nil
}
type NopJoinTracker struct{}
func (t *NopJoinTracker) IsUserJoined(userID, roomID string) bool {
return true
}
type NopTransactionFetcher struct{}
func (t *NopTransactionFetcher) TransactionIDForEvents(userID, deviceID string, eventIDs []string) (eventIDToTxnID map[string]string) {
return
}
func newRoomMetadata(roomID string, lastMsgTimestamp spec.Timestamp) internal.RoomMetadata {
m := internal.NewRoomMetadata(roomID)
m.NameEvent = "Room " + roomID
m.LastMessageTimestamp = uint64(lastMsgTimestamp)
return *m
}
func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
result := make(map[string]state.LatestEvents)
for _, roomID := range roomIDs {
result[roomID] = state.LatestEvents{
Timeline: []json.RawMessage{[]byte(`{}`)},
}
}
return result
}
// Sync an account with 3 rooms and check that we can grab all rooms and they are sorted correctly initially. Checks
// that basic UPDATE and DELETE/INSERT works when tracking all rooms.
func TestConnStateInitial(t *testing.T) {
ConnID := sync3.ConnID{
DeviceID: "d",
}
userID := "@TestConnStateInitial_alice:localhost"
deviceID := "yep"
timestampNow := spec.Timestamp(1632131678061).Time()
// initial sort order B, C, A
roomA := newRoomMetadata("!a:localhost", spec.AsTimestamp(timestampNow.Add(-8*time.Second)))
roomB := newRoomMetadata("!b:localhost", spec.AsTimestamp(timestampNow))
roomC := newRoomMetadata("!c:localhost", spec.AsTimestamp(timestampNow.Add(-4*time.Second)))
timeline := map[string]json.RawMessage{
roomA.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "a"}),
roomB.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "b"}),
roomC.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "c"}),
}
globalCache := caches.NewGlobalCache(nil)
globalCache.Startup(map[string]internal.RoomMetadata{
roomA.RoomID: roomA,
roomB.RoomID: roomB,
roomC.RoomID: roomC,
})
dispatcher := sync3.NewDispatcher()
dispatcher.Startup(map[string][]string{
roomA.RoomID: {userID},
roomB.RoomID: {userID},
roomC.RoomID: {userID},
})
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, loadPositions map[string]int64, err error) {
return 1, map[string]*internal.RoomMetadata{
roomA.RoomID: &roomA,
roomB.RoomID: &roomB,
roomC.RoomID: &roomC,
}, map[string]internal.EventMetadata{
roomA.RoomID: {NID: 123, Timestamp: 123},
roomB.RoomID: {NID: 456, Timestamp: 456},
roomC.RoomID: {NID: 780, Timestamp: 789},
}, nil, nil
}
userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{}, &joinChecker{})
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
result := make(map[string]state.LatestEvents)
for _, roomID := range roomIDs {
result[roomID] = state.LatestEvents{
Timeline: []json.RawMessage{timeline[roomID]},
}
}
return result
}
cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
if userID != cs.UserID() {
t.Fatalf("UserID returned wrong value, got %v want %v", cs.UserID(), userID)
}
res, err := cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 9},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &sync3.Response{
Rooms: map[string]sync3.Room{
roomB.RoomID: {
Name: roomB.NameEvent,
Initial: true,
Timeline: []json.RawMessage{timeline[roomB.RoomID]},
},
roomC.RoomID: {
Name: roomC.NameEvent,
Initial: true,
Timeline: []json.RawMessage{timeline[roomC.RoomID]},
},
roomA.RoomID: {
Name: roomA.NameEvent,
Initial: true,
Timeline: []json.RawMessage{timeline[roomA.RoomID]},
},
},
Lists: map[string]sync3.ResponseList{
"a": {
Count: 3,
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: [2]int64{0, 2},
RoomIDs: []string{
roomB.RoomID, roomC.RoomID, roomA.RoomID,
},
},
},
},
},
})
// bump A to the top
newEvent := testutils.NewEvent(t, "unimportant", "me", struct{}{}, testutils.WithTimestamp(timestampNow.Add(1*time.Second)))
dispatcher.OnNewEvent(context.Background(), roomA.RoomID, newEvent, 1)
// request again for the diff
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 9},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Rooms: map[string]sync3.Room{
roomA.RoomID: {
Timeline: []json.RawMessage{newEvent},
},
},
Lists: map[string]sync3.ResponseList{
"a": {
Count: 3,
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(2),
},
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(0),
RoomID: roomA.RoomID,
},
},
},
},
})
// another message should just update
newEvent = testutils.NewEvent(t, "unimportant", "me", struct{}{}, testutils.WithTimestamp(timestampNow.Add(2*time.Second)))
dispatcher.OnNewEvent(context.Background(), roomA.RoomID, newEvent, 2)
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 9},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Rooms: map[string]sync3.Room{
roomA.RoomID: {
Timeline: []json.RawMessage{newEvent},
},
},
Lists: map[string]sync3.ResponseList{
"a": {
Count: 3,
},
},
})
}
// Test that multiple ranges can be tracked in a single request
func TestConnStateMultipleRanges(t *testing.T) {
t.Skip("flakey")
ConnID := sync3.ConnID{
DeviceID: "d",
}
userID := "@TestConnStateMultipleRanges_alice:localhost"
deviceID := "yep"
timestampNow := spec.Timestamp(1632131678061)
var rooms []*internal.RoomMetadata
var roomIDs []string
globalCache := caches.NewGlobalCache(nil)
dispatcher := sync3.NewDispatcher()
roomIDToRoom := make(map[string]internal.RoomMetadata)
for i := int64(0); i < 10; i++ {
roomID := fmt.Sprintf("!%d:localhost", i)
room := internal.RoomMetadata{
RoomID: roomID,
NameEvent: fmt.Sprintf("Room %d", i),
// room 1 is most recent, 10 is least recent
LastMessageTimestamp: uint64(uint64(timestampNow) - uint64(i*1000)),
}
rooms = append(rooms, &room)
roomIDs = append(roomIDs, roomID)
roomIDToRoom[roomID] = room
globalCache.Startup(map[string]internal.RoomMetadata{
room.RoomID: room,
})
dispatcher.Startup(map[string][]string{
roomID: {userID},
})
}
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, loadPositions map[string]int64, err error) {
roomMetadata := make(map[string]*internal.RoomMetadata)
joinTimings = make(map[string]internal.EventMetadata)
for i, r := range rooms {
roomMetadata[r.RoomID] = rooms[i]
joinTimings[r.RoomID] = internal.EventMetadata{
NID: 123456, // Dummy values
Timestamp: 123456,
}
}
return 1, roomMetadata, joinTimings, nil, nil
}
userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{}, &joinChecker{})
userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
// request first page
res, err := cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 2},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Lists: map[string]sync3.ResponseList{
"a": {
Count: len(rooms),
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: [2]int64{0, 2},
RoomIDs: []string{roomIDs[0], roomIDs[1], roomIDs[2]},
},
},
},
},
})
// add on a different non-overlapping range
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 2}, {4, 6},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Lists: map[string]sync3.ResponseList{
"a": {
Count: len(rooms),
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: [2]int64{4, 6},
RoomIDs: []string{roomIDs[4], roomIDs[5], roomIDs[6]},
},
},
},
},
})
// pull room 8 to position 0 should result in DELETE[6] and INSERT[0]
// 0,1,2,3,4,5,6,7,8,9
// `----` `----`
// ` ` ` `
// 8,0,1,2,3,4,5,6,7,9
//
newEvent := testutils.NewEvent(t, "unimportant", "me", struct{}{}, testutils.WithTimestamp(timestampNow.Time().Add(2*time.Second)))
dispatcher.OnNewEvent(context.Background(), roomIDs[8], newEvent, 1)
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 2}, {4, 6},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Lists: map[string]sync3.ResponseList{
"a": {
Count: len(rooms),
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(6),
},
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(0),
RoomID: roomIDs[8],
},
},
},
},
})
// pull room 9 to position 3 should result in DELETE[6] and INSERT[4] with room 2
// 0,1,2,3,4,5,6,7,8,9 index
// 8,0,1,2,3,4,5,6,7,9 room
// `----` `----`
// ` ` ` `
// 8,0,1,9,2,3,4,5,6,7 room
middleTimestamp := int64((roomIDToRoom[roomIDs[1]].LastMessageTimestamp + roomIDToRoom[roomIDs[2]].LastMessageTimestamp) / 2)
newEvent = testutils.NewEvent(t, "unimportant", "me", struct{}{}, testutils.WithTimestamp(spec.Timestamp(middleTimestamp).Time()))
dispatcher.OnNewEvent(context.Background(), roomIDs[9], newEvent, 1)
t.Logf("new event %s : %s", roomIDs[9], string(newEvent))
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 2}, {4, 6},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Lists: map[string]sync3.ResponseList{
"a": {
Count: len(rooms),
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(6),
},
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(4),
RoomID: roomIDs[2],
},
},
},
},
})
}
// Regression test
func TestBumpToOutsideRange(t *testing.T) {
ConnID := sync3.ConnID{
DeviceID: "d",
}
userID := "@TestBumpToOutsideRange_alice:localhost"
deviceID := "yep"
timestampNow := spec.Timestamp(1632131678061)
roomA := newRoomMetadata("!a:localhost", timestampNow)
roomB := newRoomMetadata("!b:localhost", timestampNow-1000)
roomC := newRoomMetadata("!c:localhost", timestampNow-2000)
roomD := newRoomMetadata("!d:localhost", timestampNow-3000)
globalCache := caches.NewGlobalCache(nil)
globalCache.Startup(map[string]internal.RoomMetadata{
roomA.RoomID: roomA,
roomB.RoomID: roomB,
roomC.RoomID: roomC,
roomD.RoomID: roomD,
})
dispatcher := sync3.NewDispatcher()
dispatcher.Startup(map[string][]string{
roomA.RoomID: {userID},
roomB.RoomID: {userID},
roomC.RoomID: {userID},
roomD.RoomID: {userID},
})
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, loadPositions map[string]int64, err error) {
return 1, map[string]*internal.RoomMetadata{
roomA.RoomID: &roomA,
roomB.RoomID: &roomB,
roomC.RoomID: &roomC,
roomD.RoomID: &roomD,
}, map[string]internal.EventMetadata{
roomA.RoomID: {NID: 1, Timestamp: 1},
roomB.RoomID: {NID: 2, Timestamp: 2},
roomC.RoomID: {NID: 3, Timestamp: 3},
roomD.RoomID: {NID: 4, Timestamp: 4},
}, nil, nil
}
userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{}, &joinChecker{})
userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
// Ask for A,B
res, err := cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 1},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Lists: map[string]sync3.ResponseList{
"a": {
Count: 4,
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: [2]int64{0, 1},
RoomIDs: []string{
roomA.RoomID, roomB.RoomID,
},
},
},
},
},
})
// D gets bumped to C's position but it's still outside the range so nothing should happen
newEvent := testutils.NewEvent(t, "unimportant", "me", struct{}{}, testutils.WithTimestamp(spec.Timestamp(roomC.LastMessageTimestamp+2).Time()))
dispatcher.OnNewEvent(context.Background(), roomD.RoomID, newEvent, 1)
// expire the context after 10ms so we don't wait forevar
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
res, err = cs.OnIncomingRequest(ctx, ConnID, &sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 1},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
if len(res.Lists["a"].Ops) > 0 {
t.Errorf("response returned ops, expected none")
}
}
// Test that room subscriptions can be made and that events are pushed for them.
func TestConnStateRoomSubscriptions(t *testing.T) {
ConnID := sync3.ConnID{
DeviceID: "d",
}
userID := "@TestConnStateRoomSubscriptions_alice:localhost"
deviceID := "yep"
timestampNow := spec.Timestamp(1632131678061)
roomA := newRoomMetadata("!a:localhost", timestampNow)
roomB := newRoomMetadata("!b:localhost", spec.Timestamp(timestampNow-1000))
roomC := newRoomMetadata("!c:localhost", spec.Timestamp(timestampNow-2000))
roomD := newRoomMetadata("!d:localhost", spec.Timestamp(timestampNow-3000))
roomIDs := []string{roomA.RoomID, roomB.RoomID, roomC.RoomID, roomD.RoomID}
globalCache := caches.NewGlobalCache(nil)
globalCache.Startup(map[string]internal.RoomMetadata{
roomA.RoomID: roomA,
roomB.RoomID: roomB,
roomC.RoomID: roomC,
roomD.RoomID: roomD,
})
dispatcher := sync3.NewDispatcher()
dispatcher.Startup(map[string][]string{
roomA.RoomID: {userID},
roomB.RoomID: {userID},
roomC.RoomID: {userID},
roomD.RoomID: {userID},
})
timeline := map[string]json.RawMessage{
roomA.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "a"}),
roomB.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "b"}),
roomC.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "c"}),
roomD.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "d"}),
}
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinTimings map[string]internal.EventMetadata, loadPositions map[string]int64, err error) {
return 1, map[string]*internal.RoomMetadata{
roomA.RoomID: &roomA,
roomB.RoomID: &roomB,
roomC.RoomID: &roomC,
roomD.RoomID: &roomD,
}, map[string]internal.EventMetadata{
roomA.RoomID: {NID: 1, Timestamp: 1},
roomB.RoomID: {NID: 2, Timestamp: 2},
roomC.RoomID: {NID: 3, Timestamp: 3},
roomD.RoomID: {NID: 4, Timestamp: 4},
}, nil, nil
}
userCache := caches.NewUserCache(userID, globalCache, &NopUserCacheStore{}, &NopTransactionFetcher{}, &joinChecker{})
userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
result := make(map[string]state.LatestEvents)
for _, roomID := range roomIDs {
result[roomID] = state.LatestEvents{
Timeline: []json.RawMessage{timeline[roomID]},
}
}
return result
}
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
// subscribe to room D
res, err := cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
RoomSubscriptions: map[string]sync3.RoomSubscription{
roomD.RoomID: {
TimelineLimit: 20,
},
},
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 1},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &sync3.Response{
Lists: map[string]sync3.ResponseList{
"a": {
Count: len(roomIDs),
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: [2]int64{0, 1},
RoomIDs: []string{
roomA.RoomID, roomB.RoomID,
},
},
},
},
},
Rooms: map[string]sync3.Room{
roomA.RoomID: {
Name: roomA.NameEvent,
Initial: true,
Timeline: []json.RawMessage{
timeline[roomA.RoomID],
},
},
roomB.RoomID: {
Name: roomB.NameEvent,
Initial: true,
Timeline: []json.RawMessage{
timeline[roomB.RoomID],
},
},
roomD.RoomID: {
Name: roomD.NameEvent,
Initial: true,
Timeline: []json.RawMessage{
timeline[roomD.RoomID],
},
},
},
})
// room D gets a new event but it's so old it doesn't bump to the top of the list
newEvent := testutils.NewEvent(t, "unimportant", "me", struct{}{}, testutils.WithTimestamp(spec.Timestamp(timestampNow-20000).Time()))
dispatcher.OnNewEvent(context.Background(), roomD.RoomID, newEvent, 1)
// we should get this message even though it's not in the range because we are subscribed to this room.
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 1},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &sync3.Response{
Lists: map[string]sync3.ResponseList{
"a": {
Count: len(roomIDs),
},
},
Rooms: map[string]sync3.Room{
roomD.RoomID: {
Timeline: []json.RawMessage{
newEvent,
},
},
},
// TODO: index markers as this new event should bump D into the tracked range
})
// now swap to room C
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
RoomSubscriptions: map[string]sync3.RoomSubscription{
roomC.RoomID: {
TimelineLimit: 20,
},
},
UnsubscribeRooms: []string{roomD.RoomID},
Lists: map[string]sync3.RequestList{"a": {
Sort: []string{sync3.SortByRecency},
Ranges: sync3.SliceRanges([][2]int64{
{0, 1},
}),
}},
}, false, time.Now())
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &sync3.Response{
Lists: map[string]sync3.ResponseList{
"a": {
Count: len(roomIDs),
},
},
Rooms: map[string]sync3.Room{
roomC.RoomID: {
Name: roomC.NameEvent,
Initial: true,
Timeline: []json.RawMessage{
timeline[roomC.RoomID],
},
},
},
})
}
func checkResponse(t *testing.T, checkRoomIDsOnly bool, got, want *sync3.Response) {
t.Helper()
if len(got.Lists) != len(want.Lists) {
t.Errorf("got %v lists, want %v", len(got.Lists), len(want.Lists))
}
for listKey, wl := range want.Lists {
gl, exists := got.Lists[listKey]
if !exists {
t.Errorf("no response key for '%s'", listKey)
continue
}
if wl.Count > 0 && gl.Count != wl.Count {
t.Errorf("response list %v got count %d want %d", listKey, gl.Count, wl.Count)
}
if len(wl.Ops) > 0 {
t.Logf("got %v", serialise(t, gl))
t.Logf("want %v", serialise(t, wl))
t.Logf("DEBUG %v", serialise(t, got))
defer func() {
t.Helper()
if !t.Failed() {
t.Logf("OK!")
}
}()
if len(gl.Ops) != len(wl.Ops) {
t.Fatalf("got %d ops, want %d", len(gl.Ops), len(wl.Ops))
}
for i, wantOpVal := range wl.Ops {
gotOp := gl.Ops[i]
if gotOp.Op() != wantOpVal.Op() {
t.Errorf("operation i=%d got '%s' want '%s'", i, gotOp.Op(), wantOpVal.Op())
}
switch wantOp := wantOpVal.(type) {
case *sync3.ResponseOpRange:
gotOpRange, ok := gotOp.(*sync3.ResponseOpRange)
if !ok {
t.Fatalf("operation i=%d (%s) want type ResponseOpRange but it isn't", i, gotOp.Op())
}
if !reflect.DeepEqual(gotOpRange.Range, wantOp.Range) {
t.Errorf("operation i=%d (%s) got range %v want range %v", i, gotOp.Op(), gotOpRange.Range, wantOp.Range)
}
if len(gotOpRange.RoomIDs) != len(wantOp.RoomIDs) {
t.Fatalf("operation i=%d (%s) got %d rooms in array, want %d", i, gotOp.Op(), len(gotOpRange.RoomIDs), len(wantOp.RoomIDs))
}
for j := range wantOp.RoomIDs {
checkRoomIDsEqual(t, gotOpRange.RoomIDs[j], wantOp.RoomIDs[j])
}
case *sync3.ResponseOpSingle:
gotOpSingle, ok := gotOp.(*sync3.ResponseOpSingle)
if !ok {
t.Fatalf("operation i=%d (%s) want type ResponseOpSingle but it isn't", i, gotOp.Op())
}
if *gotOpSingle.Index != *wantOp.Index {
t.Errorf("operation i=%d (%s) single op on index %d want index %d", i, gotOp.Op(), *gotOpSingle.Index, *wantOp.Index)
}
checkRoomIDsEqual(t, gotOpSingle.RoomID, wantOp.RoomID)
}
}
}
}
if len(want.Rooms) > 0 {
if len(want.Rooms) != len(got.Rooms) {
t.Errorf("wrong number of room subs returned, got %d want %d", len(got.Rooms), len(want.Rooms))
}
for wantRoomID := range want.Rooms {
if _, ok := got.Rooms[wantRoomID]; !ok {
t.Errorf("wanted room %s in 'rooms' but it wasn't there", wantRoomID)
continue
}
wantTimeline := want.Rooms[wantRoomID].Timeline
gotTimeline := got.Rooms[wantRoomID].Timeline
if len(wantTimeline) > 0 {
if !reflect.DeepEqual(gotTimeline, wantTimeline) {
t.Errorf("timeline mismatch for room %s:\ngot %v\nwant %v", wantRoomID, serialise(t, gotTimeline), serialise(t, wantTimeline))
}
}
if want.Rooms[wantRoomID].Initial != got.Rooms[wantRoomID].Initial {
t.Errorf("'initial' flag mismatch on room %s: got %v want %v", wantRoomID, got.Rooms[wantRoomID].Initial, want.Rooms[wantRoomID].Initial)
}
}
// TODO: check inside room objects
}
}
func checkRoomIDsEqual(t *testing.T, got, want string) {
t.Helper()
if got != want {
t.Fatalf("got room '%s' want room '%s'", got, want)
}
}
func serialise(t *testing.T, thing interface{}) string {
b, err := json.Marshal(thing)
if err != nil {
t.Fatalf("cannot serialise: %s", err)
}
return string(b)
}
func intPtr(val int) *int {
return &val
}