mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
Fix #365: only return the last joined range
If we returned multiple distinct ranges, we always assumed that the history visibility was "joined", so we would never return events in the invite/shared state. This would be fine if the client had a way to fetch those events sent before they were joined, but they did not have a way as the prev_batch token would not be set correctly. We now only return a single range of events and the prev batch for /that/ range only, and defer to the upstream HS for history visibility calculations. Add end-to-end test to assert this new behaviour works.
This commit is contained in:
parent
89c4198765
commit
7e06813fe2
@ -701,38 +701,31 @@ func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []str
|
||||
// - with NIDs <= `to`.
|
||||
// Up to `limit` events are chosen per room.
|
||||
func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string]*LatestEvents, error) {
|
||||
roomIDToRanges, err := s.visibleEventNIDsBetweenForRooms(userID, roomIDs, 0, to)
|
||||
roomIDToRange, err := s.visibleEventNIDsBetweenForRooms(userID, roomIDs, 0, to)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := make(map[string]*LatestEvents, len(roomIDs))
|
||||
err = sqlutil.WithTransaction(s.Accumulator.db, func(txn *sqlx.Tx) error {
|
||||
for roomID, ranges := range roomIDToRanges {
|
||||
for roomID, r := range roomIDToRange {
|
||||
var earliestEventNID int64
|
||||
var latestEventNID int64
|
||||
var roomEvents []json.RawMessage
|
||||
// start at the most recent range as we want to return the most recent `limit` events
|
||||
for i := len(ranges) - 1; i >= 0; i-- {
|
||||
// the most recent event will be first
|
||||
events, err := s.EventsTable.SelectLatestEventsBetween(txn, roomID, r[0]-1, r[1], limit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("room %s failed to SelectEventsBetween: %s", roomID, err)
|
||||
}
|
||||
// keep pushing to the front so we end up with A,B,C
|
||||
for _, ev := range events {
|
||||
if latestEventNID == 0 { // set first time and never again
|
||||
latestEventNID = ev.NID
|
||||
}
|
||||
roomEvents = append([]json.RawMessage{ev.JSON}, roomEvents...)
|
||||
earliestEventNID = ev.NID
|
||||
if len(roomEvents) >= limit {
|
||||
break
|
||||
}
|
||||
r := ranges[i]
|
||||
// the most recent event will be first
|
||||
events, err := s.EventsTable.SelectLatestEventsBetween(txn, roomID, r[0]-1, r[1], limit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("room %s failed to SelectEventsBetween: %s", roomID, err)
|
||||
}
|
||||
// keep pushing to the front so we end up with A,B,C
|
||||
for _, ev := range events {
|
||||
if latestEventNID == 0 { // set first time and never again
|
||||
latestEventNID = ev.NID
|
||||
}
|
||||
roomEvents = append([]json.RawMessage{ev.JSON}, roomEvents...)
|
||||
earliestEventNID = ev.NID
|
||||
if len(roomEvents) >= limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
latestEvents := LatestEvents{
|
||||
LatestNID: latestEventNID,
|
||||
@ -756,7 +749,7 @@ func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64,
|
||||
// visibleEventNIDsBetweenForRooms determines which events a given user has permission to see.
|
||||
// It accepts a nid range [from, to]. For each given room, it calculates the NID ranges
|
||||
// [A1, B1], [A2, B2], ... within [from, to] in which the user has permission to see events.
|
||||
func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []string, from, to int64) (map[string][][2]int64, error) {
|
||||
func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []string, from, to int64) (map[string][2]int64, error) {
|
||||
// load *THESE* joined rooms for this user at from (inclusive)
|
||||
var membershipEvents []Event
|
||||
var err error
|
||||
@ -782,7 +775,7 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin
|
||||
}
|
||||
|
||||
// Work out the NID ranges to pull events from for this user. Given a from and to event nid stream position,
|
||||
// this function returns a map of room ID to a slice of 2-element from|to positions. These positions are
|
||||
// this function returns a map of room ID to a 2-element from|to positions. These positions are
|
||||
// all INCLUSIVE, and the client should be informed of these events at some point. For example:
|
||||
//
|
||||
// Stream Positions
|
||||
@ -793,20 +786,23 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin
|
||||
//
|
||||
// E=message event, M=membership event, followed by user letter, followed by 'i' or 'j' or 'l' for invite|join|leave
|
||||
//
|
||||
// - For Room A: from=1, to=10, returns { RoomA: [ [1,10] ]} (tests events in joined room)
|
||||
// - For Room B: from=1, to=10, returns { RoomB: [ [5,10] ]} (tests joining a room starts events)
|
||||
// - For Room C: from=1, to=10, returns { RoomC: [ [0,9] ]} (tests leaving a room stops events)
|
||||
// - For Room A: from=1, to=10, returns { RoomA: [ 1,10 ]} (tests events in joined room)
|
||||
// - For Room B: from=1, to=10, returns { RoomB: [ 5,10 ]} (tests joining a room starts events)
|
||||
// - For Room C: from=1, to=10, returns { RoomC: [ 0,9 ]} (tests leaving a room stops events)
|
||||
//
|
||||
// Multiple slices can occur when a user leaves and re-joins the same room, and invites are same-element positions:
|
||||
// In cases where a user joins/leaves a room multiple times in the nid range, only the last range is returned.
|
||||
// This is critical to ensure we don't skip out timeline events due to history visibility (which the proxy defers
|
||||
// to the upstream HS for). See https://github.com/matrix-org/sliding-sync/issues/365 for what happens if we returned
|
||||
// all ranges.
|
||||
//
|
||||
// Stream Positions
|
||||
// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
||||
// Room D Maj E Mal E Maj E Mal E
|
||||
// Room E E Mai E E Maj E E
|
||||
//
|
||||
// - For Room D: from=1, to=15 returns { RoomD: [ [1,6], [8,10] ] } (tests multi-join/leave)
|
||||
// - For Room E: from=1, to=15 returns { RoomE: [ [3,3], [13,15] ] } (tests invites)
|
||||
func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[string][][2]int64, error) {
|
||||
// - For Room D: from=1, to=15 returns { RoomD: [ 8,10 ] } (tests multi-join/leave)
|
||||
// - For Room E: from=1, to=15 returns { RoomE: [ 13,15 ] } (tests invites)
|
||||
func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[string][2]int64, error) {
|
||||
// load *ALL* joined rooms for this user at from (inclusive)
|
||||
joinTimingsAtFromByRoomID, err := s.JoinedRoomsAfterPosition(userID, from)
|
||||
if err != nil {
|
||||
@ -822,7 +818,7 @@ func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[st
|
||||
return s.visibleEventNIDsWithData(joinTimingsAtFromByRoomID, membershipEvents, userID, from, to)
|
||||
}
|
||||
|
||||
func (s *Storage) visibleEventNIDsWithData(joinTimingsAtFromByRoomID map[string]internal.EventMetadata, membershipEvents []Event, userID string, from, to int64) (map[string][][2]int64, error) {
|
||||
func (s *Storage) visibleEventNIDsWithData(joinTimingsAtFromByRoomID map[string]internal.EventMetadata, membershipEvents []Event, userID string, from, to int64) (map[string][2]int64, error) {
|
||||
// load membership events in order and bucket based on room ID
|
||||
roomIDToLogs := make(map[string][]membershipEvent)
|
||||
for _, ev := range membershipEvents {
|
||||
@ -835,13 +831,11 @@ func (s *Storage) visibleEventNIDsWithData(joinTimingsAtFromByRoomID map[string]
|
||||
}
|
||||
|
||||
// Performs the algorithm
|
||||
calculateVisibleEventNIDs := func(isJoined bool, fromIncl, toIncl int64, logs []membershipEvent) [][2]int64 {
|
||||
calculateVisibleEventNIDs := func(isJoined bool, fromIncl, toIncl int64, logs []membershipEvent) [2]int64 {
|
||||
// short circuit when there are no membership deltas
|
||||
if len(logs) == 0 {
|
||||
return [][2]int64{
|
||||
{
|
||||
fromIncl, toIncl,
|
||||
},
|
||||
return [2]int64{
|
||||
fromIncl, toIncl, // TODO: is this actually valid? Surely omitting it is the right answer?
|
||||
}
|
||||
}
|
||||
var result [][2]int64
|
||||
@ -879,11 +873,16 @@ func (s *Storage) visibleEventNIDsWithData(joinTimingsAtFromByRoomID map[string]
|
||||
if isJoined {
|
||||
result = append(result, [2]int64{startIndex, toIncl})
|
||||
}
|
||||
return result
|
||||
if len(result) == 0 {
|
||||
return [2]int64{}
|
||||
}
|
||||
// we only care about the LAST nid range, otherwise we can end up with gaps being returned in the
|
||||
// timeline. See https://github.com/matrix-org/sliding-sync/issues/365
|
||||
return result[len(result)-1]
|
||||
}
|
||||
|
||||
// For each joined room, perform the algorithm and delete the logs afterwards
|
||||
result := make(map[string][][2]int64)
|
||||
result := make(map[string][2]int64)
|
||||
for joinedRoomID, _ := range joinTimingsAtFromByRoomID {
|
||||
roomResult := calculateVisibleEventNIDs(true, from, to, roomIDToLogs[joinedRoomID])
|
||||
result[joinedRoomID] = roomResult
|
||||
|
@ -5,12 +5,13 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/matrix-org/sliding-sync/sync2"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/sliding-sync/sync2"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/matrix-org/gomatrixserverlib/spec"
|
||||
"github.com/matrix-org/sliding-sync/internal"
|
||||
@ -362,17 +363,15 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
|
||||
t.Fatalf("LatestEventNID: %s", err)
|
||||
}
|
||||
t.Logf("ABC Start=%d Latest=%d", startPos, latestPos)
|
||||
roomIDToVisibleRanges, err := store.VisibleEventNIDsBetween(alice, startPos, latestPos)
|
||||
roomIDToVisibleRange, err := store.VisibleEventNIDsBetween(alice, startPos, latestPos)
|
||||
if err != nil {
|
||||
t.Fatalf("VisibleEventNIDsBetween to %d: %s", latestPos, err)
|
||||
}
|
||||
for roomID, ranges := range roomIDToVisibleRanges {
|
||||
for _, r := range ranges {
|
||||
t.Logf("%v => [%d,%d]", roomID, r[0]-startPos, r[1]-startPos)
|
||||
}
|
||||
for roomID, r := range roomIDToVisibleRange {
|
||||
t.Logf("%v => [%d,%d]", roomID, r[0]-startPos, r[1]-startPos)
|
||||
}
|
||||
if len(roomIDToVisibleRanges) != 3 {
|
||||
t.Errorf("VisibleEventNIDsBetween: wrong number of rooms, want 3 got %+v", roomIDToVisibleRanges)
|
||||
if len(roomIDToVisibleRange) != 3 {
|
||||
t.Errorf("VisibleEventNIDsBetween: wrong number of rooms, want 3 got %+v", roomIDToVisibleRange)
|
||||
}
|
||||
|
||||
// check that we can query subsets too
|
||||
@ -381,23 +380,23 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
|
||||
t.Fatalf("VisibleEventNIDsBetweenForRooms to %d: %s", latestPos, err)
|
||||
}
|
||||
if len(roomIDToVisibleRangesSubset) != 2 {
|
||||
t.Errorf("VisibleEventNIDsBetweenForRooms: wrong number of rooms, want 2 got %+v", roomIDToVisibleRanges)
|
||||
t.Errorf("VisibleEventNIDsBetweenForRooms: wrong number of rooms, want 2 got %+v", roomIDToVisibleRange)
|
||||
}
|
||||
|
||||
// For Room A: from=1, to=10, returns { RoomA: [ [1,10] ]} (tests events in joined room)
|
||||
verifyRange(t, roomIDToVisibleRanges, roomA, [][2]int64{
|
||||
{1 + startPos, 10 + startPos},
|
||||
verifyRange(t, roomIDToVisibleRange, roomA, [2]int64{
|
||||
1 + startPos, 10 + startPos,
|
||||
})
|
||||
|
||||
// For Room B: from=1, to=10, returns { RoomB: [ [5,10] ]} (tests joining a room starts events)
|
||||
verifyRange(t, roomIDToVisibleRanges, roomB, [][2]int64{
|
||||
{5 + startPos, 10 + startPos},
|
||||
verifyRange(t, roomIDToVisibleRange, roomB, [2]int64{
|
||||
5 + startPos, 10 + startPos,
|
||||
})
|
||||
|
||||
// For Room C: from=1, to=10, returns { RoomC: [ [0,9] ]} (tests leaving a room stops events)
|
||||
// We start at 0 because it's the earliest event (we were joined since the beginning of the room state)
|
||||
verifyRange(t, roomIDToVisibleRanges, roomC, [][2]int64{
|
||||
{0 + startPos, 9 + startPos},
|
||||
verifyRange(t, roomIDToVisibleRange, roomC, [2]int64{
|
||||
0 + startPos, 9 + startPos,
|
||||
})
|
||||
|
||||
// change the users else we will still have some rooms from A,B,C present if the user is still joined
|
||||
@ -465,29 +464,25 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
|
||||
t.Fatalf("LatestEventNID: %s", err)
|
||||
}
|
||||
t.Logf("DE Start=%d Latest=%d", startPos, latestPos)
|
||||
roomIDToVisibleRanges, err = store.VisibleEventNIDsBetween(alice, startPos, latestPos)
|
||||
roomIDToVisibleRange, err = store.VisibleEventNIDsBetween(alice, startPos, latestPos)
|
||||
if err != nil {
|
||||
t.Fatalf("VisibleEventNIDsBetween to %d: %s", latestPos, err)
|
||||
}
|
||||
for roomID, ranges := range roomIDToVisibleRanges {
|
||||
for _, r := range ranges {
|
||||
t.Logf("%v => [%d,%d]", roomID, r[0]-startPos, r[1]-startPos)
|
||||
}
|
||||
for roomID, r := range roomIDToVisibleRange {
|
||||
t.Logf("%v => [%d,%d]", roomID, r[0]-startPos, r[1]-startPos)
|
||||
}
|
||||
if len(roomIDToVisibleRanges) != 2 {
|
||||
t.Errorf("VisibleEventNIDsBetween: wrong number of rooms, want 2 got %+v", roomIDToVisibleRanges)
|
||||
if len(roomIDToVisibleRange) != 2 {
|
||||
t.Errorf("VisibleEventNIDsBetween: wrong number of rooms, want 2 got %+v", roomIDToVisibleRange)
|
||||
}
|
||||
|
||||
// For Room D: from=1, to=15 returns { RoomD: [ [1,6], [8,10] ] } (tests multi-join/leave)
|
||||
verifyRange(t, roomIDToVisibleRanges, roomD, [][2]int64{
|
||||
{1 + startPos, 6 + startPos},
|
||||
{8 + startPos, 10 + startPos},
|
||||
// For Room D: from=1, to=15 returns { RoomD: [ 8,10 ] } (tests multi-join/leave)
|
||||
verifyRange(t, roomIDToVisibleRange, roomD, [2]int64{
|
||||
8 + startPos, 10 + startPos,
|
||||
})
|
||||
|
||||
// For Room E: from=1, to=15 returns { RoomE: [ [3,3], [13,15] ] } (tests invites)
|
||||
verifyRange(t, roomIDToVisibleRanges, roomE, [][2]int64{
|
||||
{3 + startPos, 3 + startPos},
|
||||
{13 + startPos, 15 + startPos},
|
||||
// For Room E: from=1, to=15 returns { RoomE: [ 13,15 ] } (tests invites)
|
||||
verifyRange(t, roomIDToVisibleRange, roomE, [2]int64{
|
||||
13 + startPos, 15 + startPos,
|
||||
})
|
||||
|
||||
}
|
||||
@ -961,18 +956,13 @@ func sortHeroes(heroes []internal.Hero) []internal.Hero {
|
||||
return heroes
|
||||
}
|
||||
|
||||
func verifyRange(t *testing.T, result map[string][][2]int64, roomID string, wantRanges [][2]int64) {
|
||||
func verifyRange(t *testing.T, result map[string][2]int64, roomID string, wantRange [2]int64) {
|
||||
t.Helper()
|
||||
gotRanges := result[roomID]
|
||||
if gotRanges == nil {
|
||||
gotRange := result[roomID]
|
||||
if gotRange == [2]int64{} {
|
||||
t.Fatalf("no range was returned for room %s", roomID)
|
||||
}
|
||||
if len(gotRanges) != len(wantRanges) {
|
||||
t.Fatalf("%s range count mismatch, got %d ranges, want %d :: GOT=%+v WANT=%+v", roomID, len(gotRanges), len(wantRanges), gotRanges, wantRanges)
|
||||
}
|
||||
for i := range gotRanges {
|
||||
if !reflect.DeepEqual(gotRanges[i], wantRanges[i]) {
|
||||
t.Errorf("%s range at index %d got %v want %v", roomID, i, gotRanges[i], wantRanges[i])
|
||||
}
|
||||
if !reflect.DeepEqual(gotRange, wantRange) {
|
||||
t.Errorf("%s range got %v want %v", roomID, gotRange, wantRange)
|
||||
}
|
||||
}
|
||||
|
@ -6,11 +6,13 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/complement/b"
|
||||
"github.com/matrix-org/complement/client"
|
||||
"github.com/matrix-org/complement/must"
|
||||
"github.com/matrix-org/sliding-sync/sync3"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
@ -43,6 +45,19 @@ type CSAPI struct {
|
||||
AvatarURL string
|
||||
}
|
||||
|
||||
// TODO: put this in Complement at some point? Check usage.
|
||||
func (c *CSAPI) Scrollback(t *testing.T, roomID, prevBatch string, limit int) gjson.Result {
|
||||
t.Helper()
|
||||
res := c.MustDo(t, "GET", []string{
|
||||
"_matrix", "client", "v3", "rooms", roomID, "messages",
|
||||
}, client.WithQueries(url.Values{
|
||||
"dir": []string{"b"},
|
||||
"from": []string{prevBatch},
|
||||
"limit": []string{strconv.Itoa(limit)},
|
||||
}))
|
||||
return must.ParseJSON(t, res.Body)
|
||||
}
|
||||
|
||||
// SlidingSync performs a single sliding sync request. Fails on non 2xx
|
||||
func (c *CSAPI) SlidingSync(t *testing.T, data sync3.Request, opts ...client.RequestOpt) (resBody *sync3.Response) {
|
||||
t.Helper()
|
||||
|
@ -1,9 +1,11 @@
|
||||
package syncv3_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/complement/b"
|
||||
"github.com/matrix-org/complement/must"
|
||||
"github.com/matrix-org/sliding-sync/sync3"
|
||||
"github.com/matrix-org/sliding-sync/testutils/m"
|
||||
)
|
||||
@ -12,8 +14,7 @@ import (
|
||||
// - Alice invite Bob
|
||||
// - Alice send message
|
||||
// - Bob join room
|
||||
// The proxy returns either:
|
||||
// - all 3 events (if allowed by history visibility) OR
|
||||
// The proxy returns:
|
||||
// - Bob's join only, with a suitable prev_batch token
|
||||
// and never:
|
||||
// - invite then join, omitting the msg.
|
||||
@ -61,8 +62,53 @@ func TestTimelineIsCorrectWhenTransitioningFromInviteToJoin(t *testing.T) {
|
||||
aliceRes = alice.SlidingSyncUntilEventID(t, aliceRes.Pos, roomID, eventID)
|
||||
|
||||
bob.MustJoinRoom(t, roomID, []string{"hs1"})
|
||||
aliceRes = alice.SlidingSyncUntilMembership(t, aliceRes.Pos, roomID, bob, "join")
|
||||
alice.SlidingSyncUntilMembership(t, aliceRes.Pos, roomID, bob, "join")
|
||||
|
||||
bobRes = bob.SlidingSync(t, sync3.Request{}, WithPos(bobRes.Pos))
|
||||
m.MatchResponse(t, bobRes, m.LogResponse(t))
|
||||
m.MatchResponse(t, bobRes, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
|
||||
roomID: {
|
||||
// only the join event, this specifically does a length check
|
||||
MatchRoomTimeline([]Event{
|
||||
{
|
||||
Type: "m.room.member",
|
||||
StateKey: &bob.UserID,
|
||||
Content: map[string]interface{}{
|
||||
"membership": "join",
|
||||
"displayname": bob.Localpart,
|
||||
},
|
||||
},
|
||||
}),
|
||||
},
|
||||
}))
|
||||
// pull out the prev batch token and use it to make sure we see the correct timeline
|
||||
prevBatch := bobRes.Rooms[roomID].PrevBatch
|
||||
must.NotEqual(t, prevBatch, "", "missing prev_batch")
|
||||
|
||||
scrollback := bob.Scrollback(t, roomID, prevBatch, 2)
|
||||
// we should only see the message and our invite, in that order
|
||||
chunk := scrollback.Get("chunk").Array()
|
||||
var sbEvents []json.RawMessage
|
||||
for _, e := range chunk {
|
||||
sbEvents = append(sbEvents, json.RawMessage(e.Raw))
|
||||
}
|
||||
must.Equal(t, len(chunk), 2, "chunk length mismatch")
|
||||
must.NotError(t, "chunk mismatch", eventsEqual([]Event{
|
||||
{
|
||||
Type: "m.room.message",
|
||||
Sender: alice.UserID,
|
||||
Content: map[string]interface{}{
|
||||
"msgtype": "m.text",
|
||||
"body": "After invite, before join",
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "m.room.member",
|
||||
Sender: alice.UserID,
|
||||
StateKey: &bob.UserID,
|
||||
Content: map[string]interface{}{
|
||||
"membership": "invite",
|
||||
"displayname": bob.Localpart,
|
||||
},
|
||||
},
|
||||
}, sbEvents))
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user