2/? : Refactor API shape to be closer to the current MSC

Specifically:
 - Remove top-level `ops`, and replace with `lists`.
 - Remove list indexes from `ops`, and rely on contextual location information.
 - Remove top-level `counts` and instead embed them into each list contextually.
 - Refactor connstate to reflect new API shape.

Still to do:
 - Remove `rooms` / `room` from the op response, and bundle it into the
   top-level `rooms`.
 - Remove `UPDATE` op.
 - Add `room_id` / `room_ids` field to ops to let clients know which rooms each op relates to.
This commit is contained in:
Kegan Dougal 2022-05-25 11:36:30 +01:00
parent 735f638a11
commit a82615978b
17 changed files with 628 additions and 540 deletions

View File

@ -74,7 +74,7 @@ func TestMultipleConnsAtStartup(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Ops(
MatchResponse(t, res, MatchV3Ops(0,
MatchV3SyncOpWithMatchers(MatchRoomRange(
[]roomMatcher{
MatchRoomID(roomID),
@ -134,7 +134,7 @@ func TestOutstandingRequestsGetCancelled(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops(0,
MatchV3SyncOpWithMatchers(
MatchRoomRange(
[]roomMatcher{MatchRoomID(roomA)},
@ -160,8 +160,8 @@ func TestOutstandingRequestsGetCancelled(t *testing.T) {
},
}},
})
MatchResponse(t, res2, MatchV3Count(2), MatchV3Ops(
MatchV3InvalidateOp(0, 0, 1),
MatchResponse(t, res2, MatchV3Count(2), MatchV3Ops(0,
MatchV3InvalidateOp(0, 1),
MatchV3SyncOpWithMatchers(
MatchRoomRange(
[]roomMatcher{MatchRoomID(roomB)},
@ -185,7 +185,7 @@ func TestOutstandingRequestsGetCancelled(t *testing.T) {
if time.Since(startTime) > time.Second {
t.Errorf("took >1s to process request which should have been interrupted before timing out, took %v", time.Since(startTime))
}
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops())
MatchResponse(t, res, MatchV3Count(2), MatchNoV3Ops())
}
// Regression test to ensure that ?timeout= isn't reset when live events come in.
@ -232,7 +232,7 @@ func TestConnectionTimeoutNotReset(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops(0,
MatchV3SyncOpWithMatchers(
MatchRoomRange(
[]roomMatcher{
@ -282,6 +282,6 @@ func TestConnectionTimeoutNotReset(t *testing.T) {
if dur > (1500 * time.Millisecond) { // 0.5s leeway
t.Fatalf("request took %v to complete, expected ~1s", dur)
}
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops())
MatchResponse(t, res, MatchV3Count(2), MatchNoV3Ops())
}

View File

@ -71,26 +71,22 @@ func TestFiltersEncryption(t *testing.T) {
},
},
})
MatchResponse(t, res, MatchV3Counts([]int{1, 1}), MatchV3Ops(
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != 1 {
return fmt.Errorf("want %d rooms, got %d", 1, len(op.Rooms))
}
if op.List != 0 {
return fmt.Errorf("unknown list: %d", op.List)
}
return allRooms[0].MatchRoom(op.Rooms[0]) // encrypted room
}),
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != 1 {
return fmt.Errorf("want %d rooms, got %d", 1, len(op.Rooms))
}
if op.List != 1 {
return fmt.Errorf("unknown list: %d", op.List)
}
return allRooms[1].MatchRoom(op.Rooms[0]) // encrypted room
}),
))
MatchResponse(t, res, MatchV3Counts([]int{1, 1}),
MatchV3Ops(0,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != 1 {
return fmt.Errorf("want %d rooms, got %d", 1, len(op.Rooms))
}
return allRooms[0].MatchRoom(op.Rooms[0]) // encrypted room
})),
MatchV3Ops(1,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != 1 {
return fmt.Errorf("want %d rooms, got %d", 1, len(op.Rooms))
}
return allRooms[1].MatchRoom(op.Rooms[0]) // encrypted room
}),
))
// change the unencrypted room into an encrypted room
v2.queueResponse(alice, sync2.SyncResponse{
@ -131,11 +127,10 @@ func TestFiltersEncryption(t *testing.T) {
},
},
})
MatchResponse(t, res, MatchV3Counts([]int{len(allRooms), 0}), MatchV3Ops(
MatchV3DeleteOp(1, 0),
MatchV3DeleteOp(0, 1),
MatchV3InsertOp(0, 0, unencryptedRoomID),
))
MatchResponse(t, res, MatchV3Counts([]int{len(allRooms), 0}),
MatchV3Ops(1, MatchV3DeleteOp(0)),
MatchV3Ops(0, MatchV3DeleteOp(1), MatchV3InsertOp(0, unencryptedRoomID)),
)
// requesting the encrypted list from scratch returns 2 rooms now
res = v3.mustDoV3Request(t, aliceToken, sync3.Request{
@ -148,7 +143,7 @@ func TestFiltersEncryption(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops(0,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != len(allRooms) {
return fmt.Errorf("want %d rooms, got %d", len(allRooms), len(op.Rooms))
@ -221,14 +216,11 @@ func TestFiltersInvite(t *testing.T) {
},
},
})
MatchResponse(t, res, MatchV3Counts([]int{1, 0}), MatchV3Ops(
MatchResponse(t, res, MatchV3Counts([]int{1, 0}), MatchV3Ops(0,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != 1 {
return fmt.Errorf("want %d rooms, got %d", 1, len(op.Rooms))
}
if op.List != 0 {
return fmt.Errorf("unknown list: %d", op.List)
}
if op.Rooms[0].RoomID != roomID {
return fmt.Errorf("unknown invite room: %s", op.Rooms[0].RoomID)
}

4
go.mod
View File

@ -5,7 +5,7 @@ go 1.14
require (
github.com/ReneKroon/ttlcache/v2 v2.8.1
github.com/gorilla/mux v1.8.0
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/jmoiron/sqlx v1.3.3
github.com/lib/pq v1.10.1
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 // indirect
@ -14,7 +14,7 @@ require (
github.com/rs/zerolog v1.21.0
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/tidwall/gjson v1.10.2
github.com/tidwall/sjson v1.2.3 // indirect
github.com/tidwall/sjson v1.2.3
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 // indirect
)

11
go.sum
View File

@ -20,7 +20,6 @@ github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uG
github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk=
github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
@ -31,13 +30,11 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.1 h1:6VXZrLU0jHBYyAqrSPa+MgPfnSvTPuMgK+k0o5kVFWo=
github.com/lib/pq v1.10.1/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bhrnp3Ky1qgx/fzCtCALOoGYylh2tpS9K4=
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211026114500-ddecab880266 h1:hco4OPKD9L+jShgld3e1vA1ekoAemdM1ur215f9PddQ=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211026114500-ddecab880266/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
@ -54,18 +51,15 @@ github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.21.0 h1:Q3vdXlfLNT+OftyBHsU0Y445MD+8m8axjKgf2si0QcM=
github.com/rs/zerolog v1.21.0/go.mod h1:ZPhntP/xmq1nnND05hhpAh2QMhSsA4UN3MGZ6O2J3hM=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tidwall/gjson v1.9.3 h1:hqzS9wAHMO+KVBBkLxYdkEeeFHuqr95GfClRLKlgK0E=
github.com/tidwall/gjson v1.9.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.10.2 h1:APbLGOM0rrEkd8WBw9C24nllro4ajFuJu0Sc9hRz8Bo=
github.com/tidwall/gjson v1.10.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
@ -73,7 +67,6 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.0.3 h1:DeF+0LZqvIt4fKYw41aPB29ZGlvwVkHKktoXJ1YW9Y8=
github.com/tidwall/sjson v1.0.3/go.mod h1:bURseu1nuBkFpIES5cz6zBtjmYeOQmEESshn7VpF15Y=
github.com/tidwall/sjson v1.2.3 h1:5+deguEhHSEjmuICXZ21uSSsXotWMA0orU783+Z7Cp8=
github.com/tidwall/sjson v1.2.3/go.mod h1:5WdjKx3AQMvCJ4RG6/2UYT7dLrGvJUV1x4jdTAyGvZs=
@ -85,7 +78,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
@ -100,13 +92,11 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -119,7 +109,6 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik=

View File

@ -85,27 +85,19 @@ func TestMultipleLists(t *testing.T) {
},
})
seen := map[int]bool{}
opMatch := func(op *sync3.ResponseOpRange) error {
seen[op.List] = true
if op.List == 0 { // first 3 encrypted rooms
MatchResponse(t, res,
MatchV3Counts([]int{len(encryptedRooms), len(unencryptedRooms)}),
MatchV3Ops(0, MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
// first 3 encrypted rooms
return checkRoomList(op, encryptedRooms[:3])
} else if op.List == 1 { // first 3 unencrypted rooms
})),
MatchV3Ops(1, MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
// first 3 unencrypted rooms
return checkRoomList(op, unencryptedRooms[:3])
}
return fmt.Errorf("unknown List: %d", op.List)
}
MatchResponse(t, res, MatchV3Counts([]int{len(encryptedRooms), len(unencryptedRooms)}), MatchV3Ops(
MatchV3SyncOp(opMatch), MatchV3SyncOp(opMatch),
))
if !seen[0] || !seen[1] {
t.Fatalf("didn't see both list 0 and 1: %+v", res)
}
})),
)
// now scroll one of the lists
res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{
Lists: []sync3.RequestList{
{
@ -121,11 +113,8 @@ func TestMultipleLists(t *testing.T) {
},
},
})
MatchResponse(t, res, MatchV3Counts([]int{len(encryptedRooms), len(unencryptedRooms)}), MatchV3Ops(
MatchResponse(t, res, MatchV3Counts([]int{len(encryptedRooms), len(unencryptedRooms)}), MatchV3Ops(1,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if op.List != 1 {
return fmt.Errorf("expected unencrypted list to be SYNCed but wasn't, got list %d want %d", op.List, 1)
}
return checkRoomList(op, unencryptedRooms[3:6])
}),
))
@ -173,9 +162,9 @@ func TestMultipleLists(t *testing.T) {
// We are tracking the first few encrypted rooms so we expect list 0 to update
// However we do not track old unencrypted rooms so we expect no change in list 1
// TODO: We always assume operations are done sequentially starting at list 0, is this safe?
MatchResponse(t, res, MatchV3Counts([]int{len(encryptedRooms), len(unencryptedRooms)}), MatchV3Ops(
MatchV3DeleteOp(0, 2),
MatchV3InsertOp(0, 0, encryptedRooms[0].roomID),
MatchResponse(t, res, MatchV3Counts([]int{len(encryptedRooms), len(unencryptedRooms)}), MatchV3Ops(0,
MatchV3DeleteOp(2),
MatchV3InsertOp(0, encryptedRooms[0].roomID),
))
}
@ -262,20 +251,17 @@ func TestMultipleListsDMUpdate(t *testing.T) {
},
})
seen := map[int]bool{}
opMatch := func(op *sync3.ResponseOpRange) error {
seen[op.List] = true
if op.List == 0 { // first 3 DM rooms
MatchResponse(t, res,
MatchV3Counts([]int{len(dmRooms), len(groupRooms)}),
MatchV3Ops(0, MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
// first 3 DM rooms
return checkRoomList(op, dmRooms[:3])
} else if op.List == 1 { // first 3 group rooms
})),
MatchV3Ops(1, MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
// first 3 group rooms
return checkRoomList(op, groupRooms[:3])
}
return fmt.Errorf("unknown List: %d", op.List)
}
MatchResponse(t, res, MatchV3Counts([]int{len(dmRooms), len(groupRooms)}), MatchV3Ops(
MatchV3SyncOp(opMatch), MatchV3SyncOp(opMatch),
))
})),
)
// now bring the last DM room to the top with a notif
pingMessage := testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "ping"})
@ -315,10 +301,12 @@ func TestMultipleListsDMUpdate(t *testing.T) {
},
},
})
MatchResponse(t, res, MatchV3Counts([]int{len(dmRooms), len(groupRooms)}), MatchV3Ops(
MatchV3DeleteOp(0, 2),
MatchV3InsertOp(0, 0, dmRooms[0].roomID, MatchRoomHighlightCount(1), MatchRoomTimelineMostRecent(1, dmRooms[0].events)),
))
MatchResponse(t, res, MatchV3Counts([]int{len(dmRooms), len(groupRooms)}),
MatchV3Ops(
0, MatchV3DeleteOp(2),
MatchV3InsertOp(0, dmRooms[0].roomID, MatchRoomHighlightCount(1), MatchRoomTimelineMostRecent(1, dmRooms[0].events)),
),
)
}
// Test that a new list can be added mid-connection
@ -367,11 +355,8 @@ func TestNewListMidConnection(t *testing.T) {
},
},
})
MatchResponse(t, res, MatchV3Counts([]int{len(allRooms)}), MatchV3Ops(
MatchResponse(t, res, MatchV3Counts([]int{len(allRooms)}), MatchV3Ops(0,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if op.List != 0 {
return fmt.Errorf("got list %d want %d", op.List, 0)
}
return checkRoomList(op, allRooms[0:3])
}),
))

View File

@ -103,7 +103,7 @@ func TestRoomStateTransitions(t *testing.T) {
},
},
})
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops(0,
MatchV3SyncOpWithMatchers(
MatchRoomRange([]roomMatcher{
MatchRoomID(allRoomsAlicePerspective[indexBobInvited].roomID),
@ -137,9 +137,9 @@ func TestRoomStateTransitions(t *testing.T) {
},
},
})
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(2), MatchV3Ops(0,
MatchV3UpdateOp(
0, 0, allRoomsAlicePerspective[indexBobInvited].roomID, MatchRoomRequiredState([]json.RawMessage{
0, allRoomsAlicePerspective[indexBobInvited].roomID, MatchRoomRequiredState([]json.RawMessage{
allRoomsAlicePerspective[indexBobInvited].events[0], // create event
}),
MatchRoomTimelineMostRecent(1, []json.RawMessage{

View File

@ -65,7 +65,7 @@ func TestNotificationsOnTop(t *testing.T) {
}},
}
res := v3.mustDoV3Request(t, aliceToken, syncRequestBody)
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(0,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != len(allRooms) {
return fmt.Errorf("want %d rooms, got %d", len(allRooms), len(op.Rooms))
@ -102,10 +102,9 @@ func TestNotificationsOnTop(t *testing.T) {
})
v2.waitUntilEmpty(t, alice)
res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, syncRequestBody)
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(
MatchV3DeleteOp(0, 1),
MatchV3InsertOp(0, 0, bingRoomID),
))
MatchResponse(t, res, MatchV3Count(len(allRooms)),
MatchV3Ops(0, MatchV3DeleteOp(1), MatchV3InsertOp(0, bingRoomID)),
)
// send a message into the nobing room, it's position must not change due to our sort order
noBingEvent := testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{"body": "no bing"}, testutils.WithTimestamp(latestTimestamp.Add(2*time.Minute)))
@ -124,9 +123,9 @@ func TestNotificationsOnTop(t *testing.T) {
})
v2.waitUntilEmpty(t, alice)
res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, syncRequestBody)
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(
MatchV3UpdateOp(0, 1, noBingRoomID),
))
MatchResponse(t, res, MatchV3Count(len(allRooms)),
MatchV3Ops(0, MatchV3UpdateOp(1, noBingRoomID)),
)
// restart the server and sync from fresh again, it should still have the bing room on top
v3.restart(t, v2, pqString)
@ -142,7 +141,7 @@ func TestNotificationsOnTop(t *testing.T) {
Sort: []string{sync3.SortByHighlightCount, sync3.SortByNotificationCount, sync3.SortByRecency},
}},
})
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(0,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != len(allRooms) {
return fmt.Errorf("want %d rooms, got %d", len(allRooms), len(op.Rooms))

View File

@ -79,7 +79,7 @@ func TestRoomNames(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(0,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != len(allRooms) {
return fmt.Errorf("want %d rooms, got %d", len(allRooms), len(op.Rooms))
@ -124,7 +124,7 @@ func TestRoomNames(t *testing.T) {
MatchRoomID(wantRooms[i].roomID),
}
}
MatchResponse(t, res, MatchV3Count(len(wantRooms)), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(len(wantRooms)), MatchV3Ops(0,
MatchV3SyncOpWithMatchers(MatchRoomRange(matchers...)),
))
}

View File

@ -71,7 +71,7 @@ func TestSecurityLiveStreamEventLeftLeak(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Count(1), MatchV3Ops(MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
MatchResponse(t, res, MatchV3Count(1), MatchV3Ops(0, MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != 1 {
return fmt.Errorf("range missing room: got %d want 1", len(op.Rooms))
}
@ -134,8 +134,8 @@ func TestSecurityLiveStreamEventLeftLeak(t *testing.T) {
}},
})
// TODO: We include left counts mid-sync so clients can see the user has left/been kicked. Should be configurable.
MatchResponse(t, res, MatchV3Count(1), MatchV3Ops(
MatchV3UpdateOp(0, 0, roomID, MatchRoomName(""), MatchRoomRequiredState(nil), MatchRoomTimelineMostRecent(1, []json.RawMessage{kickEvent})),
MatchResponse(t, res, MatchV3Count(1), MatchV3Ops(0,
MatchV3UpdateOp(0, roomID, MatchRoomName(""), MatchRoomRequiredState(nil), MatchRoomTimelineMostRecent(1, []json.RawMessage{kickEvent})),
))
// Ensure Alice does see both events
@ -153,12 +153,12 @@ func TestSecurityLiveStreamEventLeftLeak(t *testing.T) {
})
// TODO: We should consolidate 2x UPDATEs into 1x if we get scenarios like this
// TODO: WE should be returning updated values for name and required_state
MatchResponse(t, res, MatchV3Count(1), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(1), MatchV3Ops(0,
MatchV3UpdateOp(
0, 0, roomID, MatchRoomTimelineMostRecent(1, []json.RawMessage{kickEvent}),
0, roomID, MatchRoomTimelineMostRecent(1, []json.RawMessage{kickEvent}),
),
MatchV3UpdateOp(
0, 0, roomID, MatchRoomTimelineMostRecent(1, []json.RawMessage{sensitiveEvent}),
0, roomID, MatchRoomTimelineMostRecent(1, []json.RawMessage{sensitiveEvent}),
),
))
}
@ -225,7 +225,7 @@ func TestSecurityRoomSubscriptionLeak(t *testing.T) {
},
})
// Assert that Eve doesn't see anything
MatchResponse(t, res, MatchV3Count(1), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(1), MatchV3Ops(0,
MatchV3SyncOpWithMatchers(
MatchRoomRange([]roomMatcher{
MatchRoomID(unrelatedRoomID),
@ -264,5 +264,5 @@ func TestSecurityRoomSubscriptionLeak(t *testing.T) {
},
})
// Assert that Eve doesn't see anything
MatchResponse(t, res, MatchV3Count(1), MatchV3Ops(), MatchRoomSubscriptions(true, map[string][]roomMatcher{}))
MatchResponse(t, res, MatchV3Count(1), MatchNoV3Ops(), MatchRoomSubscriptions(true, map[string][]roomMatcher{}))
}

View File

@ -3,7 +3,6 @@ package sync3
import (
"context"
"errors"
"reflect"
"strconv"
"sync"
"testing"
@ -35,7 +34,11 @@ func TestConn(t *testing.T) {
c := NewConn(connID, &connHandlerMock{func(ctx context.Context, cid ConnID, req *Request, isInitial bool) (*Response, error) {
count += 1
return &Response{
Counts: []int{count},
Lists: []ResponseList{
{
Count: count,
},
},
}, nil
}})
@ -45,14 +48,14 @@ func TestConn(t *testing.T) {
})
assertNoError(t, err)
assertPos(t, resp.Pos, 1)
assertInts(t, resp.Counts, []int{101})
assertInt(t, resp.Lists[0].Count, 101)
// happy case, pos=1
resp, err = c.OnIncomingRequest(ctx, &Request{
pos: 1,
})
assertPos(t, resp.Pos, 2)
assertInts(t, resp.Counts, []int{102})
assertInt(t, resp.Lists[0].Count, 102)
assertNoError(t, err)
// bogus position returns a 400
_, err = c.OnIncomingRequest(ctx, &Request{
@ -129,25 +132,29 @@ func TestConnRetries(t *testing.T) {
connID := ConnID{
DeviceID: "d",
}
callCount := int64(0)
callCount := 0
c := NewConn(connID, &connHandlerMock{func(ctx context.Context, cid ConnID, req *Request, init bool) (*Response, error) {
callCount += 1
return &Response{Counts: []int{20}}, nil
return &Response{Lists: []ResponseList{
{
Count: 20,
},
}}, nil
}})
resp, err := c.OnIncomingRequest(ctx, &Request{})
assertPos(t, resp.Pos, 1)
assertInts(t, resp.Counts, []int{20})
assertInt(t, resp.Lists[0].Count, 20)
assertInt(t, callCount, 1)
assertNoError(t, err)
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 1})
assertPos(t, resp.Pos, 2)
assertInts(t, resp.Counts, []int{20})
assertInt(t, resp.Lists[0].Count, 20)
assertInt(t, callCount, 2)
assertNoError(t, err)
// retry! Shouldn't invoke handler again
resp, err = c.OnIncomingRequest(ctx, &Request{pos: 1})
assertPos(t, resp.Pos, 2)
assertInts(t, resp.Counts, []int{20})
assertInt(t, resp.Lists[0].Count, 20)
assertInt(t, callCount, 2) // this doesn't increment
assertNoError(t, err)
// retry! but with modified request body, so should invoke handler again
@ -158,7 +165,7 @@ func TestConnRetries(t *testing.T) {
},
}})
assertPos(t, resp.Pos, 3)
assertInts(t, resp.Counts, []int{20})
assertInt(t, resp.Lists[0].Count, 20)
assertInt(t, callCount, 3)
assertNoError(t, err)
}
@ -225,27 +232,20 @@ func TestConnErrorsNoCache(t *testing.T) {
}
}
func assertPos(t *testing.T, pos string, wantPos int64) {
func assertPos(t *testing.T, pos string, wantPos int) {
t.Helper()
gotPos, err := strconv.Atoi(pos)
if err != nil {
t.Errorf("pos isn't an int: %s", err)
return
}
assertInt(t, int64(gotPos), wantPos)
assertInt(t, int(gotPos), wantPos)
}
func assertInt(t *testing.T, nextPos, wantPos int64) {
func assertInt(t *testing.T, got, want int) {
t.Helper()
if nextPos != wantPos {
t.Errorf("got %d pos %d", nextPos, wantPos)
}
}
func assertInts(t *testing.T, gots, wants []int) {
t.Helper()
if !reflect.DeepEqual(gots, wants) {
t.Errorf("got %v want %v", gots, wants)
if got != want {
t.Errorf("got %d want %d", got, want)
}
}

View File

@ -167,8 +167,8 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *sync3.Request, i
// start forming the response, handle subscriptions
response := &sync3.Response{
Rooms: s.updateRoomSubscriptions(ctx, int(sync3.DefaultTimelineLimit), newSubs, newUnsubs),
Lists: make([]sync3.ResponseList, len(s.muxedReq.Lists)),
}
responseOperations := []sync3.ResponseOp{} // empty not nil slice
// loop each list and handle each independently
for i := range s.muxedReq.Lists {
@ -176,11 +176,10 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *sync3.Request, i
if prevReq != nil && i < len(prevReq.Lists) {
prevList = &prevReq.Lists[i]
}
ops := s.onIncomingListRequest(ctx, i, prevList, &s.muxedReq.Lists[i])
responseOperations = append(responseOperations, ops...)
response.Lists[i] = s.onIncomingListRequest(ctx, i, prevList, &s.muxedReq.Lists[i])
}
includedRoomIDs := sync3.IncludedRoomIDsInOps(responseOperations)
includedRoomIDs := sync3.IncludedRoomIDsInOps(response.Lists)
for _, roomID := range newSubs { // include room subs in addition to lists
includedRoomIDs[roomID] = struct{}{}
}
@ -192,32 +191,19 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *sync3.Request, i
// do live tracking if we have nothing to tell the client yet
region = trace.StartRegion(ctx, "liveUpdate")
responseOperations = s.live.liveUpdate(ctx, req, ex, isInitial, response, responseOperations)
s.live.liveUpdate(ctx, req, ex, isInitial, response)
region.End()
response.Ops = responseOperations
response.Counts = s.lists.Counts() // counts are AFTER events are applied
// counts are AFTER events are applied
for i := range response.Lists {
response.Lists[i].Count = s.lists.Count(i)
}
return response, nil
}
func (s *ConnState) writeDeleteOp(listIndex, deletedIndex int) sync3.ResponseOp {
// update operations return -1 if nothing gets deleted
if deletedIndex < 0 {
return nil
}
// only notify if we are tracking this index
if !s.muxedReq.Lists[listIndex].Ranges.Inside(int64(deletedIndex)) {
return nil
}
return &sync3.ResponseOpSingle{
List: listIndex,
Operation: sync3.OpDelete,
Index: &deletedIndex,
}
}
func (s *ConnState) onIncomingListRequest(ctx context.Context, listIndex int, prevReqList, nextReqList *sync3.RequestList) []sync3.ResponseOp {
// TODO, response list + room data
func (s *ConnState) onIncomingListRequest(ctx context.Context, listIndex int, prevReqList, nextReqList *sync3.RequestList) sync3.ResponseList {
defer trace.StartRegion(ctx, "onIncomingListRequest").End()
if !s.lists.ListExists(listIndex) {
s.setInitialList(listIndex, *nextReqList)
@ -254,7 +240,6 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, listIndex int, pr
logger.Trace().Interface("range", prevRange).Msg("INVALIDATEing because sort/filter ops have changed")
for _, r := range prevRange {
responseOperations = append(responseOperations, &sync3.ResponseOpRange{
List: listIndex,
Operation: sync3.OpInvalidate,
Range: r[:],
})
@ -278,7 +263,6 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, listIndex int, pr
}
for _, r := range removedRanges {
responseOperations = append(responseOperations, &sync3.ResponseOpRange{
List: listIndex,
Operation: sync3.OpInvalidate,
Range: r[:],
})
@ -294,14 +278,16 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, listIndex int, pr
roomIDs := sortableRooms.RoomIDs()
responseOperations = append(responseOperations, &sync3.ResponseOpRange{
List: listIndex,
Operation: sync3.OpSync,
Range: r[:],
Rooms: s.getInitialRoomData(ctx, listIndex, int(nextReqList.TimelineLimit), roomIDs...),
})
}
return responseOperations
return sync3.ResponseList{
Ops: responseOperations,
// count will be filled in later
}
}
func (s *ConnState) updateRoomSubscriptions(ctx context.Context, timelineLimit int, subs, unsubs []string) map[string]sync3.Room {

View File

@ -50,10 +50,11 @@ func (s *connStateLive) onUpdate(up caches.Update) {
}
}
// live update waits for new data and populates the response given when new data arrives.
func (s *connStateLive) liveUpdate(
ctx context.Context, req *sync3.Request, ex extensions.Request, isInitial bool,
response *sync3.Response, responseOperations []sync3.ResponseOp,
) []sync3.ResponseOp {
response *sync3.Response,
) {
// we need to ensure that we keep consuming from the updates channel, even if they want a response
// immediately. If we have new list data we won't wait, but if we don't then we need to be able to
// catch-up to the current head position, hence giving 100ms grace period for processing.
@ -62,81 +63,89 @@ func (s *connStateLive) liveUpdate(
}
// block until we get a new event, with appropriate timeout
startTime := time.Now()
for len(responseOperations) == 0 && len(response.Rooms) == 0 && !response.Extensions.HasData(isInitial) {
for response.ListOps() == 0 && len(response.Rooms) == 0 && !response.Extensions.HasData(isInitial) {
timeToWait := time.Duration(req.TimeoutMSecs()) * time.Millisecond
timeWaited := time.Since(startTime)
timeLeftToWait := timeToWait - timeWaited
if timeLeftToWait < 0 {
logger.Trace().Str("user", s.userID).Str("time_waited", timeWaited.String()).Msg("liveUpdate: timed out")
return responseOperations
return
}
logger.Trace().Str("user", s.userID).Str("dur", timeLeftToWait.String()).Msg("liveUpdate: no response data yet; blocking")
select {
case <-ctx.Done(): // client has given up
logger.Trace().Str("user", s.userID).Msg("liveUpdate: client gave up")
trace.Logf(ctx, "liveUpdate", "context cancelled")
return responseOperations
return
case <-time.After(timeLeftToWait): // we've timed out
logger.Trace().Str("user", s.userID).Msg("liveUpdate: timed out")
trace.Logf(ctx, "liveUpdate", "timed out after %v", timeLeftToWait)
return responseOperations
return
case update := <-s.updates:
trace.Logf(ctx, "liveUpdate", "process live update")
responseOperations = s.processLiveUpdate(ctx, update, responseOperations, response)
updateWillReturnResponse := len(responseOperations) > 0 || len(response.Rooms) > 0
updateWillReturnResponse := s.processLiveUpdate(ctx, update, response)
// pass event to extensions AFTER processing
s.extensionsHandler.HandleLiveUpdate(update, ex, &response.Extensions, updateWillReturnResponse, isInitial)
// if there's more updates and we don't have lots stacked up already, go ahead and process another
for len(s.updates) > 0 && len(responseOperations) < 50 {
for len(s.updates) > 0 && response.ListOps() < 50 {
update = <-s.updates
responseOperations = s.processLiveUpdate(ctx, update, responseOperations, response)
willReturn := s.processLiveUpdate(ctx, update, response)
if willReturn {
updateWillReturnResponse = true
}
s.extensionsHandler.HandleLiveUpdate(update, ex, &response.Extensions, updateWillReturnResponse, isInitial)
}
}
}
logger.Trace().Str("user", s.userID).Int("ops", len(responseOperations)).Int("subs", len(response.Rooms)).Msg("liveUpdate: returning")
logger.Trace().Str("user", s.userID).Int("subs", len(response.Rooms)).Msg("liveUpdate: returning")
// TODO: op consolidation
return responseOperations
}
func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, responseOperations []sync3.ResponseOp, response *sync3.Response) []sync3.ResponseOp {
roomUpdate, ok := up.(caches.RoomUpdate)
func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, response *sync3.Response) bool {
hasUpdates := false
internal.Assert("processLiveUpdate: response list length != internal list length", s.lists.Len() == len(response.Lists))
internal.Assert("processLiveUpdate: request list length != internal list length", s.lists.Len() == len(s.muxedReq.Lists))
// do global connection updates (e.g adding/removing rooms from allRooms)
s.processGlobalUpdates(ctx, up)
// process room subscriptions
isSubscribedToRoom := false
roomUpdate, ok := up.(*caches.RoomEventUpdate)
if ok {
// always update our view of the world
s.lists.ForEach(func(index int, list *sync3.FilteredSortableRooms) {
// see if the latest room metadata means we delete a room, else update our state
deletedIndex := list.UpdateGlobalRoomMetadata(roomUpdate.GlobalRoomMetadata())
if op := s.writeDeleteOp(index, deletedIndex); op != nil {
responseOperations = append(responseOperations, op)
}
// see if the latest user room metadata means we delete a room (e.g it transition from dm to non-dm)
// modify notification counts, DM-ness, etc
deletedIndex = list.UpdateUserRoomMetadata(roomUpdate.RoomID(), roomUpdate.UserRoomMetadata())
if op := s.writeDeleteOp(index, deletedIndex); op != nil {
responseOperations = append(responseOperations, op)
}
})
if _, ok = s.roomSubscriptions[roomUpdate.RoomID()]; ok {
// there is a subscription for this room, so update the room subscription response
roomSub := *s.getDeltaRoomData(roomUpdate.RoomID(), roomUpdate.EventData.Event)
response.Rooms[roomUpdate.RoomID()] = roomSub // TODO: consolidate updates
isSubscribedToRoom = true
hasUpdates = true
}
}
// do per-list updates (e.g resorting, adding/removing rooms which no longer match filter)
s.lists.ForEach(func(index int, list *sync3.FilteredSortableRooms) {
reqList := s.muxedReq.Lists[index]
// TODO: remove need for index - required for required_state calcs.
if s.processLiveUpdateForList(ctx, up, index, &reqList, s.lists.List(index), &response.Lists[index], isSubscribedToRoom) {
hasUpdates = true
}
})
return hasUpdates
}
// this function does any updates which apply to the connection, regardless of which lists/subs exist.
func (s *connStateLive) processGlobalUpdates(ctx context.Context, up caches.Update) {
// TODO: joins and leave?
switch update := up.(type) {
case *caches.RoomEventUpdate:
logger.Trace().Str("user", s.userID).Str("type", update.EventData.EventType).Msg("received event update")
subs, ops := s.processIncomingEvent(ctx, update)
responseOperations = append(responseOperations, ops...)
for _, sub := range subs {
response.Rooms[sub.RoomID] = sub
}
case *caches.UnreadCountUpdate:
logger.Trace().Str("user", s.userID).Str("room", update.RoomID()).Msg("received unread count update")
subs, ops := s.processUnreadCountUpdate(ctx, update)
responseOperations = append(responseOperations, ops...)
for _, sub := range subs {
response.Rooms[sub.RoomID] = sub
// keep track of the latest stream position
if update.EventData.LatestPos > s.loadPosition {
s.loadPosition = update.EventData.LatestPos
}
case *caches.InviteUpdate:
logger.Trace().Str("user", s.userID).Str("room", update.RoomID()).Msg("received invite update")
if update.Retired {
// remove the room from all rooms
logger.Trace().Str("user", s.userID).Str("room", update.RoomID()).Msg("processGlobalUpdates: room was retired")
for i, r := range s.allRooms {
if r.RoomID == update.RoomID() {
// delete the room
@ -144,109 +153,147 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
s.allRooms = s.allRooms[:len(s.allRooms)-1]
}
}
// remove the room from any lists
s.lists.ForEach(func(index int, list *sync3.FilteredSortableRooms) {
deletedIndex := list.Remove(update.RoomID())
if op := s.writeDeleteOp(index, deletedIndex); op != nil {
responseOperations = append(responseOperations, op)
} else {
// add the room to allRooms if it doesn't exist
exists := false
for _, r := range s.allRooms {
if r.RoomID == update.RoomID() {
exists = true
break
}
})
}
if !exists {
logger.Trace().Str("user", s.userID).Str("room", update.RoomID()).Msg("processGlobalUpdates: newly invited room")
// TODO: factor out
metadata := update.InviteData.RoomMetadata()
s.allRooms = append(s.allRooms, sync3.RoomConnMetadata{
RoomMetadata: *metadata,
UserRoomData: *update.UserRoomMetadata(),
CanonicalisedName: strings.ToLower(
strings.Trim(internal.CalculateRoomName(metadata, 5), "#!():_@"),
),
})
}
}
}
}
func (s *connStateLive) processLiveUpdateForList(
ctx context.Context, up caches.Update, listIndex int, reqList *sync3.RequestList, intList *sync3.FilteredSortableRooms, resList *sync3.ResponseList,
isSubscribedToRoom bool,
) bool {
hasUpdates := false
roomUpdate, ok := up.(caches.RoomUpdate)
if ok { // update the internal lists - this may remove rooms if the room no longer matches a filter
// see if the latest room metadata means we delete a room, else update our state
deletedIndex := intList.UpdateGlobalRoomMetadata(roomUpdate.GlobalRoomMetadata())
if op := s.writeDeleteOp(reqList, deletedIndex); op != nil {
resList.Ops = append(resList.Ops, op)
hasUpdates = true
}
// see if the latest user room metadata means we delete a room (e.g it transition from dm to non-dm)
// modify notification counts, DM-ness, etc
deletedIndex = intList.UpdateUserRoomMetadata(roomUpdate.RoomID(), roomUpdate.UserRoomMetadata())
if op := s.writeDeleteOp(reqList, deletedIndex); op != nil {
resList.Ops = append(resList.Ops, op)
hasUpdates = true
}
}
switch update := up.(type) {
case *caches.RoomEventUpdate:
logger.Trace().Str("user", s.userID).Str("type", update.EventData.EventType).Msg("received event update")
ops := s.processIncomingEventForList(ctx, update, listIndex, reqList, intList, isSubscribedToRoom)
resList.Ops = append(resList.Ops, ops...)
case *caches.UnreadCountUpdate:
logger.Trace().Str("user", s.userID).Str("room", update.RoomID()).Msg("received unread count update")
ops := s.processUnreadCountUpdateForList(ctx, update, listIndex, reqList, intList, isSubscribedToRoom)
resList.Ops = append(resList.Ops, ops...)
case *caches.InviteUpdate:
logger.Trace().Str("user", s.userID).Str("room", update.RoomID()).Msg("received invite update")
if update.Retired {
// remove the room from this list if need be
deletedIndex := intList.Remove(update.RoomID())
if op := s.writeDeleteOp(reqList, deletedIndex); op != nil {
resList.Ops = append(resList.Ops, op)
hasUpdates = true
}
} else {
roomUpdate := &caches.RoomEventUpdate{
RoomUpdate: update.RoomUpdate,
EventData: update.InviteData.InviteEvent,
}
subs, ops := s.processIncomingEvent(ctx, roomUpdate)
responseOperations = append(responseOperations, ops...)
for _, sub := range subs {
response.Rooms[sub.RoomID] = sub
}
ops := s.processIncomingEventForList(ctx, roomUpdate, listIndex, reqList, intList, isSubscribedToRoom)
resList.Ops = append(resList.Ops, ops...)
}
}
return responseOperations
if !hasUpdates {
hasUpdates = len(resList.Ops) > 0
}
return hasUpdates
}
func (s *connStateLive) processUnreadCountUpdate(ctx context.Context, up *caches.UnreadCountUpdate) ([]sync3.Room, []sync3.ResponseOp) {
func (s *connStateLive) processUnreadCountUpdateForList(
ctx context.Context, up *caches.UnreadCountUpdate, listIndex int, reqList *sync3.RequestList, intList *sync3.FilteredSortableRooms,
isSubscribedToRoom bool,
) []sync3.ResponseOp {
if !up.HasCountDecreased {
// if the count increases then we'll notify the user for the event which increases the count, hence
// do nothing. We only care to notify the user when the counts decrease.
return nil, nil
return nil
}
var responseOperations []sync3.ResponseOp
var rooms []sync3.Room
s.lists.ForEach(func(index int, list *sync3.FilteredSortableRooms) {
fromIndex, ok := list.IndexOf(up.RoomID())
if !ok {
return
}
roomSubs, ops := s.resort(ctx, index, &s.muxedReq.Lists[index], list, up.RoomID(), fromIndex, nil, false, false)
rooms = append(rooms, roomSubs...)
responseOperations = append(responseOperations, ops...)
})
return rooms, responseOperations
fromIndex, ok := intList.IndexOf(up.RoomID())
if !ok {
return nil
}
return s.resort(ctx, listIndex, reqList, intList, up.RoomID(), fromIndex, nil, false, false, isSubscribedToRoom)
}
func (s *connStateLive) processIncomingEvent(ctx context.Context, update *caches.RoomEventUpdate) ([]sync3.Room, []sync3.ResponseOp) {
var responseOperations []sync3.ResponseOp
var rooms []sync3.Room
// keep track of the latest stream position
if update.EventData.LatestPos > s.loadPosition {
s.loadPosition = update.EventData.LatestPos
}
s.lists.ForEach(func(index int, list *sync3.FilteredSortableRooms) {
fromIndex, ok := list.IndexOf(update.RoomID())
newlyAdded := false
if !ok {
// the user may have just joined the room hence not have an entry in this list yet.
fromIndex = int(list.Len())
roomMetadata := update.GlobalRoomMetadata()
roomMetadata.RemoveHero(s.userID)
newRoomConn := sync3.RoomConnMetadata{
RoomMetadata: *roomMetadata,
UserRoomData: *update.UserRoomMetadata(),
CanonicalisedName: strings.ToLower(
strings.Trim(internal.CalculateRoomName(roomMetadata, 5), "#!():_@"),
),
}
if !list.Add(newRoomConn) {
// we didn't add this room to the list so we don't need to resort
return
}
logger.Info().Str("room", update.RoomID()).Msg("room added")
newlyAdded = true
func (s *connStateLive) processIncomingEventForList(
ctx context.Context, update *caches.RoomEventUpdate, listIndex int, reqList *sync3.RequestList, intList *sync3.FilteredSortableRooms,
isSubscribedToRoom bool,
) []sync3.ResponseOp {
fromIndex, ok := intList.IndexOf(update.RoomID())
newlyAdded := false
if !ok {
// the user may have just joined the room hence not have an entry in this list yet.
fromIndex = int(intList.Len())
roomMetadata := update.GlobalRoomMetadata()
roomMetadata.RemoveHero(s.userID)
newRoomConn := sync3.RoomConnMetadata{
RoomMetadata: *roomMetadata,
UserRoomData: *update.UserRoomMetadata(),
CanonicalisedName: strings.ToLower(
strings.Trim(internal.CalculateRoomName(roomMetadata, 5), "#!():_@"),
),
}
roomSubs, ops := s.resort(ctx, index, &s.muxedReq.Lists[index], list, update.RoomID(), fromIndex, update.EventData.Event, newlyAdded, update.EventData.ForceInitial)
rooms = append(rooms, roomSubs...)
responseOperations = append(responseOperations, ops...)
})
return rooms, responseOperations
if !intList.Add(newRoomConn) {
// we didn't add this room to the list so we don't need to resort
return nil
}
logger.Info().Str("room", update.RoomID()).Msg("room added")
newlyAdded = true
}
return s.resort(ctx, listIndex, reqList, intList, update.RoomID(), fromIndex, update.EventData.Event, newlyAdded, update.EventData.ForceInitial, isSubscribedToRoom)
}
// Resort should be called after a specific room has been modified in `sortedJoinedRooms`.
func (s *connStateLive) resort(
ctx context.Context,
listIndex int, reqList *sync3.RequestList, roomList *sync3.FilteredSortableRooms, roomID string,
fromIndex int, newEvent json.RawMessage, newlyAdded, forceInitial bool,
) ([]sync3.Room, []sync3.ResponseOp) {
listIndex int, reqList *sync3.RequestList, intList *sync3.FilteredSortableRooms, roomID string,
fromIndex int, newEvent json.RawMessage, newlyAdded, forceInitial, isSubscribedToRoom bool,
) []sync3.ResponseOp {
if reqList.Sort == nil {
reqList.Sort = []string{sync3.SortByRecency}
}
if err := roomList.Sort(reqList.Sort); err != nil {
if err := intList.Sort(reqList.Sort); err != nil {
logger.Err(err).Msg("cannot sort list")
}
var subs []sync3.Room
isSubscribedToRoom := false
if _, ok := s.roomSubscriptions[roomID]; ok {
// there is a subscription for this room, so update the room subscription field
subs = append(subs, *s.getDeltaRoomData(roomID, newEvent))
isSubscribedToRoom = true
}
toIndex, _ := roomList.IndexOf(roomID)
toIndex, _ := intList.IndexOf(roomID)
isInsideRange := reqList.Ranges.Inside(int64(toIndex))
logger = logger.With().Str("room", roomID).Int("from", fromIndex).Int("to", toIndex).Bool("inside_range", isInsideRange).Logger()
logger.Info().Bool("newEvent", newEvent != nil).Msg("moved!")
@ -254,21 +301,21 @@ func (s *connStateLive) resort(
// different room
if !isInsideRange {
toIndex = int(reqList.Ranges.UpperClamp(int64(toIndex)))
count := int(roomList.Len())
count := int(intList.Len())
if toIndex >= count {
// no room exists
logger.Warn().Int("to", toIndex).Int("size", count).Msg(
"cannot move to index, it's greater than the list of sorted rooms",
)
return subs, nil
return nil
}
if toIndex == -1 {
logger.Warn().Int("from", fromIndex).Int("to", toIndex).Interface("ranges", reqList.Ranges).Msg(
"room moved but not in tracked ranges, ignoring",
)
return subs, nil
return nil
}
toRoom := roomList.Get(toIndex)
toRoom := intList.Get(toIndex)
// fake an update event for this room.
// We do this because we are introducing a new room in the list because of this situation:
@ -294,11 +341,11 @@ func (s *connStateLive) resort(
"Rooms may be duplicated in the list.",
)
// do nothing and pretend the new event didn't exist...
return subs, nil
return nil
}
}
return subs, s.moveRoom(ctx, reqList, listIndex, roomID, newEvent, fromIndex, toIndex, reqList.Ranges, isSubscribedToRoom, newlyAdded, forceInitial)
return s.moveRoom(ctx, reqList, listIndex, roomID, newEvent, fromIndex, toIndex, reqList.Ranges, isSubscribedToRoom, newlyAdded, forceInitial)
}
// Move a room from an absolute index position to another absolute position.
@ -327,7 +374,6 @@ func (s *connStateLive) moveRoom(
}
return []sync3.ResponseOp{
&sync3.ResponseOpSingle{
List: listIndex,
Operation: op,
Index: &fromIndex,
Room: room,
@ -354,15 +400,28 @@ func (s *connStateLive) moveRoom(
return []sync3.ResponseOp{
&sync3.ResponseOpSingle{
List: listIndex,
Operation: sync3.OpDelete,
Index: &deleteIndex,
},
&sync3.ResponseOpSingle{
List: listIndex,
Operation: sync3.OpInsert,
Index: &toIndex,
Room: room,
},
}
}
func (s *ConnState) writeDeleteOp(reqList *sync3.RequestList, deletedIndex int) sync3.ResponseOp {
// update operations return -1 if nothing gets deleted
if deletedIndex < 0 {
return nil
}
// only notify if we are tracking this index
if !reqList.Ranges.Inside(int64(deletedIndex)) {
return nil
}
return &sync3.ResponseOpSingle{
Operation: sync3.OpDelete,
Index: &deletedIndex,
}
}

View File

@ -121,29 +121,33 @@ func TestConnStateInitial(t *testing.T) {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &sync3.Response{
Counts: []int{3},
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 9},
Rooms: []sync3.Room{
{
RoomID: roomB.RoomID,
Name: roomB.NameEvent,
Initial: true,
Timeline: []json.RawMessage{timeline[roomB.RoomID]},
},
{
RoomID: roomC.RoomID,
Name: roomC.NameEvent,
Initial: true,
Timeline: []json.RawMessage{timeline[roomC.RoomID]},
},
{
RoomID: roomA.RoomID,
Name: roomA.NameEvent,
Initial: true,
Timeline: []json.RawMessage{timeline[roomA.RoomID]},
Lists: []sync3.ResponseList{
{
Count: 3,
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 9},
Rooms: []sync3.Room{
{
RoomID: roomB.RoomID,
Name: roomB.NameEvent,
Initial: true,
Timeline: []json.RawMessage{timeline[roomB.RoomID]},
},
{
RoomID: roomC.RoomID,
Name: roomC.NameEvent,
Initial: true,
Timeline: []json.RawMessage{timeline[roomC.RoomID]},
},
{
RoomID: roomA.RoomID,
Name: roomA.NameEvent,
Initial: true,
Timeline: []json.RawMessage{timeline[roomA.RoomID]},
},
},
},
},
},
@ -169,17 +173,21 @@ func TestConnStateInitial(t *testing.T) {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Counts: []int{3},
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(2),
},
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(0),
Room: &sync3.Room{
RoomID: roomA.RoomID,
Lists: []sync3.ResponseList{
{
Count: 3,
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(2),
},
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(0),
Room: &sync3.Room{
RoomID: roomA.RoomID,
},
},
},
},
},
@ -202,13 +210,17 @@ func TestConnStateInitial(t *testing.T) {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Counts: []int{3},
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "UPDATE",
Index: intPtr(0),
Room: &sync3.Room{
RoomID: roomA.RoomID,
Lists: []sync3.ResponseList{
{
Count: 3,
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "UPDATE",
Index: intPtr(0),
Room: &sync3.Room{
RoomID: roomA.RoomID,
},
},
},
},
},
@ -273,20 +285,24 @@ func TestConnStateMultipleRanges(t *testing.T) {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Counts: []int{(len(rooms))},
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 2},
Rooms: []sync3.Room{
{
RoomID: roomIDs[0],
},
{
RoomID: roomIDs[1],
},
{
RoomID: roomIDs[2],
Lists: []sync3.ResponseList{
{
Count: len(rooms),
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 2},
Rooms: []sync3.Room{
{
RoomID: roomIDs[0],
},
{
RoomID: roomIDs[1],
},
{
RoomID: roomIDs[2],
},
},
},
},
},
@ -305,20 +321,24 @@ func TestConnStateMultipleRanges(t *testing.T) {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Counts: []int{len(rooms)},
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{4, 6},
Rooms: []sync3.Room{
{
RoomID: roomIDs[4],
},
{
RoomID: roomIDs[5],
},
{
RoomID: roomIDs[6],
Lists: []sync3.ResponseList{
{
Count: len(rooms),
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{4, 6},
Rooms: []sync3.Room{
{
RoomID: roomIDs[4],
},
{
RoomID: roomIDs[5],
},
{
RoomID: roomIDs[6],
},
},
},
},
},
@ -348,17 +368,21 @@ func TestConnStateMultipleRanges(t *testing.T) {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Counts: []int{len(rooms)},
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(6),
},
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(0),
Room: &sync3.Room{
RoomID: roomIDs[8],
Lists: []sync3.ResponseList{
{
Count: len(rooms),
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(6),
},
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(0),
Room: &sync3.Room{
RoomID: roomIDs[8],
},
},
},
},
},
@ -388,17 +412,21 @@ func TestConnStateMultipleRanges(t *testing.T) {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Counts: []int{len(rooms)},
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(6),
},
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(4),
Room: &sync3.Room{
RoomID: roomIDs[2],
Lists: []sync3.ResponseList{
{
Count: len(rooms),
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(6),
},
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(4),
Room: &sync3.Room{
RoomID: roomIDs[2],
},
},
},
},
},
@ -457,17 +485,21 @@ func TestBumpToOutsideRange(t *testing.T) {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &sync3.Response{
Counts: []int{4},
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 1},
Rooms: []sync3.Room{
{
RoomID: roomA.RoomID,
},
{
RoomID: roomB.RoomID,
Lists: []sync3.ResponseList{
{
Count: 4,
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 1},
Rooms: []sync3.Room{
{
RoomID: roomA.RoomID,
},
{
RoomID: roomB.RoomID,
},
},
},
},
},
@ -494,7 +526,7 @@ func TestBumpToOutsideRange(t *testing.T) {
if err != nil {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
if len(res.Ops) > 0 {
if len(res.Lists[0].Ops) > 0 {
t.Errorf("response returned ops, expected none")
}
}
@ -571,7 +603,35 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &sync3.Response{
Counts: []int{len(roomIDs)},
Lists: []sync3.ResponseList{
{
Count: len(roomIDs),
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 1},
Rooms: []sync3.Room{
{
RoomID: roomA.RoomID,
Name: roomA.NameEvent,
Initial: true,
Timeline: []json.RawMessage{
timeline[roomA.RoomID],
},
},
{
RoomID: roomB.RoomID,
Name: roomB.NameEvent,
Initial: true,
Timeline: []json.RawMessage{
timeline[roomB.RoomID],
},
},
},
},
},
},
},
Rooms: map[string]sync3.Room{
roomD.RoomID: {
RoomID: roomD.RoomID,
@ -582,30 +642,6 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
},
},
},
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 1},
Rooms: []sync3.Room{
{
RoomID: roomA.RoomID,
Name: roomA.NameEvent,
Initial: true,
Timeline: []json.RawMessage{
timeline[roomA.RoomID],
},
},
{
RoomID: roomB.RoomID,
Name: roomB.NameEvent,
Initial: true,
Timeline: []json.RawMessage{
timeline[roomB.RoomID],
},
},
},
},
},
})
// room D gets a new event
newEvent := testutils.NewEvent(t, "unimportant", "me", struct{}{}, testutils.WithTimestamp(gomatrixserverlib.Timestamp(timestampNow+2000).Time()))
@ -625,7 +661,11 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &sync3.Response{
Counts: []int{len(roomIDs)},
Lists: []sync3.ResponseList{
{
Count: len(roomIDs),
},
},
Rooms: map[string]sync3.Room{
roomD.RoomID: {
RoomID: roomD.RoomID,
@ -656,7 +696,11 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &sync3.Response{
Counts: []int{len(roomIDs)},
Lists: []sync3.ResponseList{
{
Count: len(roomIDs),
},
},
Rooms: map[string]sync3.Room{
roomC.RoomID: {
RoomID: roomC.RoomID,
@ -672,52 +716,59 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
func checkResponse(t *testing.T, checkRoomIDsOnly bool, got, want *sync3.Response) {
t.Helper()
if len(want.Counts) > 0 {
if !reflect.DeepEqual(got.Counts, want.Counts) {
t.Errorf("response Counts: got %v want %v", got.Counts, want.Counts)
}
if len(got.Lists) != len(want.Lists) {
t.Errorf("got %v lists, want %v", len(got.Lists), len(want.Lists))
}
if len(want.Ops) > 0 {
t.Logf("got %v", serialise(t, got))
t.Logf("want %v", serialise(t, want))
defer func() {
t.Helper()
if !t.Failed() {
t.Logf("OK!")
}
}()
if len(got.Ops) != len(want.Ops) {
t.Fatalf("got %d ops, want %d", len(got.Ops), len(want.Ops))
for i := 0; i < len(want.Lists); i++ {
wl := want.Lists[i]
gl := got.Lists[i]
if wl.Count > 0 && gl.Count != wl.Count {
t.Errorf("response list %d got count %d want %d", i, gl.Count, wl.Count)
}
for i, wantOpVal := range want.Ops {
gotOp := got.Ops[i]
if gotOp.Op() != wantOpVal.Op() {
t.Errorf("operation i=%d got '%s' want '%s'", i, gotOp.Op(), wantOpVal.Op())
if len(wl.Ops) > 0 {
t.Logf("got %v", serialise(t, gl))
t.Logf("want %v", serialise(t, wl))
t.Logf("DEBUG %v", serialise(t, got))
defer func() {
t.Helper()
if !t.Failed() {
t.Logf("OK!")
}
}()
if len(gl.Ops) != len(wl.Ops) {
t.Fatalf("got %d ops, want %d", len(gl.Ops), len(wl.Ops))
}
switch wantOp := wantOpVal.(type) {
case *sync3.ResponseOpRange:
gotOpRange, ok := gotOp.(*sync3.ResponseOpRange)
if !ok {
t.Fatalf("operation i=%d (%s) want type ResponseOpRange but it isn't", i, gotOp.Op())
for i, wantOpVal := range wl.Ops {
gotOp := gl.Ops[i]
if gotOp.Op() != wantOpVal.Op() {
t.Errorf("operation i=%d got '%s' want '%s'", i, gotOp.Op(), wantOpVal.Op())
}
if !reflect.DeepEqual(gotOpRange.Range, wantOp.Range) {
t.Errorf("operation i=%d (%s) got range %v want range %v", i, gotOp.Op(), gotOpRange.Range, wantOp.Range)
switch wantOp := wantOpVal.(type) {
case *sync3.ResponseOpRange:
gotOpRange, ok := gotOp.(*sync3.ResponseOpRange)
if !ok {
t.Fatalf("operation i=%d (%s) want type ResponseOpRange but it isn't", i, gotOp.Op())
}
if !reflect.DeepEqual(gotOpRange.Range, wantOp.Range) {
t.Errorf("operation i=%d (%s) got range %v want range %v", i, gotOp.Op(), gotOpRange.Range, wantOp.Range)
}
if len(gotOpRange.Rooms) != len(wantOp.Rooms) {
t.Fatalf("operation i=%d (%s) got %d rooms in array, want %d", i, gotOp.Op(), len(gotOpRange.Rooms), len(wantOp.Rooms))
}
for j := range wantOp.Rooms {
checkRoomsEqual(t, checkRoomIDsOnly, &gotOpRange.Rooms[j], &wantOp.Rooms[j])
}
case *sync3.ResponseOpSingle:
gotOpSingle, ok := gotOp.(*sync3.ResponseOpSingle)
if !ok {
t.Fatalf("operation i=%d (%s) want type ResponseOpSingle but it isn't", i, gotOp.Op())
}
if *gotOpSingle.Index != *wantOp.Index {
t.Errorf("operation i=%d (%s) single op on index %d want index %d", i, gotOp.Op(), *gotOpSingle.Index, *wantOp.Index)
}
checkRoomsEqual(t, checkRoomIDsOnly, gotOpSingle.Room, wantOp.Room)
}
if len(gotOpRange.Rooms) != len(wantOp.Rooms) {
t.Fatalf("operation i=%d (%s) got %d rooms in array, want %d", i, gotOp.Op(), len(gotOpRange.Rooms), len(wantOp.Rooms))
}
for j := range wantOp.Rooms {
checkRoomsEqual(t, checkRoomIDsOnly, &gotOpRange.Rooms[j], &wantOp.Rooms[j])
}
case *sync3.ResponseOpSingle:
gotOpSingle, ok := gotOp.(*sync3.ResponseOpSingle)
if !ok {
t.Fatalf("operation i=%d (%s) want type ResponseOpSingle but it isn't", i, gotOp.Op())
}
if *gotOpSingle.Index != *wantOp.Index {
t.Errorf("operation i=%d (%s) single op on index %d want index %d", i, gotOp.Op(), *gotOpSingle.Index, *wantOp.Index)
}
checkRoomsEqual(t, checkRoomIDsOnly, gotOpSingle.Room, wantOp.Room)
}
}
}

View File

@ -31,11 +31,11 @@ func (s *SortableRoomLists) Set(index int, val *FilteredSortableRooms) {
s.lists[index] = val
}
// Counts returns the counts of all lists
func (s *SortableRoomLists) Counts() []int {
counts := make([]int, len(s.lists))
for i := range s.lists {
counts[i] = int(s.lists[i].Len())
}
return counts
// Count returns the count of total rooms in this list
func (s *SortableRoomLists) Count(index int) int {
return int(s.lists[index].Len())
}
func (s *SortableRoomLists) Len() int {
return len(s.lists)
}

View File

@ -17,29 +17,43 @@ const (
)
type Response struct {
Ops []ResponseOp `json:"ops"`
Rooms map[string]Room `json:"rooms"`
Counts []int `json:"counts"`
Lists []ResponseList `json:"lists"`
Rooms map[string]Room `json:"rooms"`
Extensions extensions.Response `json:"extensions"`
Pos string `json:"pos"`
Session string `json:"session_id,omitempty"`
}
type ResponseList struct {
Ops []ResponseOp `json:"ops"`
Count int `json:"count"`
}
func (r *Response) PosInt() int64 {
p, _ := strconv.ParseInt(r.Pos, 10, 64)
return p
}
func (r *Response) ListOps() int {
num := 0
for _, l := range r.Lists {
if len(l.Ops) > 0 {
num += len(l.Ops)
}
}
return num
}
// Custom unmarshal so we can dynamically create the right ResponseOp for Ops
func (r *Response) UnmarshalJSON(b []byte) error {
temporary := struct {
Ops []json.RawMessage `json:"ops"`
Rooms map[string]Room `json:"rooms"`
Counts []int `json:"counts"`
Rooms map[string]Room `json:"rooms"`
Lists []struct {
Ops []json.RawMessage `json:"ops"`
Count int `json:"count"`
} `json:"lists"`
Extensions extensions.Response `json:"extensions"`
Pos string `json:"pos"`
@ -49,25 +63,31 @@ func (r *Response) UnmarshalJSON(b []byte) error {
return err
}
r.Rooms = temporary.Rooms
r.Counts = temporary.Counts
r.Pos = temporary.Pos
r.Session = temporary.Session
r.Extensions = temporary.Extensions
r.Lists = make([]ResponseList, len(temporary.Lists))
for _, op := range temporary.Ops {
if gjson.GetBytes(op, "range").Exists() {
var oper ResponseOpRange
if err := json.Unmarshal(op, &oper); err != nil {
return err
for i := range temporary.Lists {
l := temporary.Lists[i]
var list ResponseList
list.Count = l.Count
for _, op := range l.Ops {
if gjson.GetBytes(op, "range").Exists() {
var oper ResponseOpRange
if err := json.Unmarshal(op, &oper); err != nil {
return err
}
list.Ops = append(list.Ops, &oper)
} else {
var oper ResponseOpSingle
if err := json.Unmarshal(op, &oper); err != nil {
return err
}
list.Ops = append(list.Ops, &oper)
}
r.Ops = append(r.Ops, &oper)
} else {
var oper ResponseOpSingle
if err := json.Unmarshal(op, &oper); err != nil {
return err
}
r.Ops = append(r.Ops, &oper)
}
r.Lists[i] = list
}
return nil
@ -81,11 +101,13 @@ type ResponseOp interface {
// Return which room IDs these set of operations are returning information on. Information means
// things like SYNC/INSERT/UPDATE, and not DELETE/INVALIDATE.
func IncludedRoomIDsInOps(ops []ResponseOp) map[string]struct{} {
func IncludedRoomIDsInOps(lists []ResponseList) map[string]struct{} {
set := make(map[string]struct{})
for _, o := range ops {
for _, roomID := range o.IncludedRoomIDs() {
set[roomID] = struct{}{}
for _, list := range lists {
for _, op := range list.Ops {
for _, roomID := range op.IncludedRoomIDs() {
set[roomID] = struct{}{}
}
}
}
return set
@ -93,7 +115,6 @@ func IncludedRoomIDsInOps(ops []ResponseOp) map[string]struct{} {
type ResponseOpRange struct {
Operation string `json:"op"`
List int `json:"list"`
Range []int64 `json:"range,omitempty"`
Rooms []Room `json:"rooms,omitempty"`
}
@ -114,7 +135,6 @@ func (r *ResponseOpRange) IncludedRoomIDs() []string {
type ResponseOpSingle struct {
Operation string `json:"op"`
List int `json:"list"`
Index *int `json:"index,omitempty"` // 0 is a valid value, hence *int
Room *Room `json:"room,omitempty"`
}

View File

@ -166,7 +166,7 @@ func TestTimelinesLiveStream(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(0,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != len(wantRooms) {
return fmt.Errorf("want %d rooms, got %d", len(wantRooms), len(op.Rooms))
@ -196,10 +196,10 @@ func TestTimelinesLiveStream(t *testing.T) {
// sticky remember the timeline_limit
}},
})
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(
MatchV3DeleteOp(0, 3),
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(0,
MatchV3DeleteOp(3),
MatchV3InsertOp(
0, 0, allRooms[7].roomID,
0, allRooms[7].roomID,
MatchRoomName(allRooms[7].name),
MatchRoomTimelineMostRecent(numTimelineEventsPerRoom, allRooms[7].events),
),
@ -215,8 +215,8 @@ func TestTimelinesLiveStream(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(
MatchV3UpdateOp(0, 0, allRooms[7].roomID),
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(0,
MatchV3UpdateOp(0, allRooms[7].roomID),
))
bumpRoom(18)
@ -229,10 +229,10 @@ func TestTimelinesLiveStream(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(
MatchV3DeleteOp(0, 2),
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(0,
MatchV3DeleteOp(2),
MatchV3InsertOp(
0, 0, allRooms[18].roomID,
0, allRooms[18].roomID,
MatchRoomName(allRooms[18].name),
MatchRoomTimelineMostRecent(numTimelineEventsPerRoom, allRooms[18].events),
),
@ -267,7 +267,7 @@ func TestInitialFlag(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Ops(
MatchResponse(t, res, MatchV3Ops(0,
MatchV3SyncOpWithMatchers(MatchRoomRange(
[]roomMatcher{MatchRoomInitial(true)},
)),
@ -294,8 +294,8 @@ func TestInitialFlag(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Ops(
MatchV3UpdateOp(0, 0, roomID, MatchRoomInitial(false)),
MatchResponse(t, res, MatchV3Ops(0,
MatchV3UpdateOp(0, roomID, MatchRoomInitial(false)),
))
}
@ -336,7 +336,7 @@ func TestDuplicateEventsInTimeline(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Ops(
MatchResponse(t, res, MatchV3Ops(0,
MatchV3SyncOpWithMatchers(MatchRoomRange(
[]roomMatcher{
MatchRoomTimelineMostRecent(1, []json.RawMessage{dupeEvent}),
@ -394,7 +394,7 @@ func TestTimelineMiddleWindowZeroTimelineLimit(t *testing.T) {
}},
})
wantRooms := allRooms[5:11]
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(0,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != len(wantRooms) {
return fmt.Errorf("want %d rooms, got %d", len(wantRooms), len(op.Rooms))
@ -434,9 +434,9 @@ func TestTimelineMiddleWindowZeroTimelineLimit(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(
MatchV3DeleteOp(0, 10),
MatchV3InsertOp(0, 5, allRooms[4].roomID),
MatchResponse(t, res, MatchV3Count(len(allRooms)), MatchV3Ops(0,
MatchV3DeleteOp(10),
MatchV3InsertOp(5, allRooms[4].roomID),
))
}
@ -538,8 +538,8 @@ func TestTimelineTxnID(t *testing.T) {
},
},
})
MatchResponse(t, aliceRes, MatchV3Counts([]int{1}), MatchV3Ops(
MatchV3UpdateOp(0, 0, roomID, MatchRoomID(roomID), MatchRoomTimelineMostRecent(1, []json.RawMessage{newEvent})),
MatchResponse(t, aliceRes, MatchV3Counts([]int{1}), MatchV3Ops(0,
MatchV3UpdateOp(0, roomID, MatchRoomID(roomID), MatchRoomTimelineMostRecent(1, []json.RawMessage{newEvent})),
))
// now Bob syncs, he should see the event without the txn ID
@ -552,8 +552,8 @@ func TestTimelineTxnID(t *testing.T) {
},
},
})
MatchResponse(t, bobRes, MatchV3Counts([]int{1}), MatchV3Ops(
MatchV3UpdateOp(0, 0, roomID, MatchRoomID(roomID), MatchRoomTimelineMostRecent(1, []json.RawMessage{newEventNoUnsigned})),
MatchResponse(t, bobRes, MatchV3Counts([]int{1}), MatchV3Ops(0,
MatchV3UpdateOp(0, roomID, MatchRoomID(roomID), MatchRoomTimelineMostRecent(1, []json.RawMessage{newEventNoUnsigned})),
))
}
@ -573,7 +573,7 @@ func testTimelineLoadInitialEvents(v3 *testV3Server, token string, count int, wa
}},
})
MatchResponse(t, res, MatchV3Count(count), MatchV3Ops(
MatchResponse(t, res, MatchV3Count(count), MatchV3Ops(0,
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != len(wantRooms) {
return fmt.Errorf("want %d rooms, got %d", len(wantRooms), len(op.Rooms))
@ -629,7 +629,7 @@ func TestPrevBatchInTimeline(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Ops(
MatchResponse(t, res, MatchV3Ops(0,
MatchV3SyncOpWithMatchers(MatchRoomRange(
[]roomMatcher{
MatchRoomID(roomID),
@ -684,7 +684,7 @@ func TestPrevBatchInTimeline(t *testing.T) {
},
}},
})
MatchResponse(t, res, MatchV3Ops(
MatchResponse(t, res, MatchV3Ops(0,
MatchV3SyncOpWithMatchers(MatchRoomRange(
[]roomMatcher{
MatchRoomID(roomID),

View File

@ -455,8 +455,13 @@ func MatchV3Count(wantCount int) respMatcher {
}
func MatchV3Counts(wantCounts []int) respMatcher {
return func(res *sync3.Response) error {
if !reflect.DeepEqual(res.Counts, wantCounts) {
return fmt.Errorf("counts: got %v want %v", res.Counts, wantCounts)
if len(res.Lists) != len(wantCounts) {
return fmt.Errorf("MatchV3Counts: got %v lists, want %v", len(res.Lists), len(wantCounts))
}
for i := range wantCounts {
if res.Lists[i].Count != wantCounts[i] {
return fmt.Errorf("MatchV3Counts: list %d got %v want %v", i, res.Lists[i].Count, wantCounts[i])
}
}
return nil
}
@ -571,15 +576,12 @@ func MatchV3SyncOp(fn func(op *sync3.ResponseOpRange) error) opMatcher {
}
}
func MatchV3InsertOp(listIndex, roomIndex int, roomID string, matchers ...roomMatcher) opMatcher {
func MatchV3InsertOp(roomIndex int, roomID string, matchers ...roomMatcher) opMatcher {
return func(op sync3.ResponseOp) error {
if op.Op() != sync3.OpInsert {
return fmt.Errorf("op: %s != %s", op.Op(), sync3.OpInsert)
}
oper := op.(*sync3.ResponseOpSingle)
if oper.List != listIndex {
return fmt.Errorf("%s: got list index %d want %d", sync3.OpInsert, oper.List, listIndex)
}
if *oper.Index != roomIndex {
return fmt.Errorf("%s: got index %d want %d", sync3.OpInsert, *oper.Index, roomIndex)
}
@ -595,15 +597,12 @@ func MatchV3InsertOp(listIndex, roomIndex int, roomID string, matchers ...roomMa
}
}
func MatchV3UpdateOp(listIndex, roomIndex int, roomID string, matchers ...roomMatcher) opMatcher {
func MatchV3UpdateOp(roomIndex int, roomID string, matchers ...roomMatcher) opMatcher {
return func(op sync3.ResponseOp) error {
if op.Op() != sync3.OpUpdate {
return fmt.Errorf("op: %s != %s", op.Op(), sync3.OpUpdate)
}
oper := op.(*sync3.ResponseOpSingle)
if oper.List != listIndex {
return fmt.Errorf("%s: got list index %d want %d", sync3.OpUpdate, oper.List, listIndex)
}
if *oper.Index != roomIndex {
return fmt.Errorf("%s: got room index %d want %d", sync3.OpUpdate, *oper.Index, roomIndex)
}
@ -619,7 +618,7 @@ func MatchV3UpdateOp(listIndex, roomIndex int, roomID string, matchers ...roomMa
}
}
func MatchV3DeleteOp(listIndex, roomIndex int) opMatcher {
func MatchV3DeleteOp(roomIndex int) opMatcher {
return func(op sync3.ResponseOp) error {
if op.Op() != sync3.OpDelete {
return fmt.Errorf("op: %s != %s", op.Op(), sync3.OpDelete)
@ -628,14 +627,11 @@ func MatchV3DeleteOp(listIndex, roomIndex int) opMatcher {
if *oper.Index != roomIndex {
return fmt.Errorf("%s: got room index %d want %d", sync3.OpDelete, *oper.Index, roomIndex)
}
if oper.List != listIndex {
return fmt.Errorf("%s: got list index %d want %d", sync3.OpDelete, oper.List, listIndex)
}
return nil
}
}
func MatchV3InvalidateOp(listIndex int, start, end int64) opMatcher {
func MatchV3InvalidateOp(start, end int64) opMatcher {
return func(op sync3.ResponseOp) error {
if op.Op() != sync3.OpInvalidate {
return fmt.Errorf("op: %s != %s", op.Op(), sync3.OpInvalidate)
@ -647,20 +643,31 @@ func MatchV3InvalidateOp(listIndex int, start, end int64) opMatcher {
if oper.Range[1] != end {
return fmt.Errorf("%s: got end %d want %d", sync3.OpInvalidate, oper.Range[1], end)
}
if oper.List != listIndex {
return fmt.Errorf("%s: got list index %d want %d", sync3.OpInvalidate, oper.List, listIndex)
return nil
}
}
func MatchNoV3Ops() respMatcher {
return func(res *sync3.Response) error {
for i, l := range res.Lists {
if len(l.Ops) > 0 {
return fmt.Errorf("MatchNoV3Ops: list %d got %d ops", i, len(l.Ops))
}
}
return nil
}
}
func MatchV3Ops(matchOps ...opMatcher) respMatcher {
func MatchV3Ops(listIndex int, matchOps ...opMatcher) respMatcher {
return func(res *sync3.Response) error {
if len(matchOps) != len(res.Ops) {
return fmt.Errorf("ops: got %d ops want %d", len(res.Ops), len(matchOps))
if listIndex >= len(res.Lists) {
return fmt.Errorf("MatchV3Ops: list index %d doesn't exist", listIndex)
}
for i := range res.Ops {
op := res.Ops[i]
if len(matchOps) != len(res.Lists[listIndex].Ops) {
return fmt.Errorf("ops: got %d ops want %d", len(res.Lists[listIndex].Ops), len(matchOps))
}
for i := range res.Lists[listIndex].Ops {
op := res.Lists[listIndex].Ops[i]
if err := matchOps[i](op); err != nil {
return fmt.Errorf("op[%d](%s) - %s", i, op.Op(), err)
}