mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
Merge pull request #324 from matrix-org/dmr/userroomdata
This commit is contained in:
commit
79ee561c89
@ -42,10 +42,6 @@ type UserRoomData struct {
|
||||
HighlightCount int
|
||||
Invite *InviteData
|
||||
|
||||
// this field is set by LazyLoadTimelines and is per-function call, and is not persisted in-memory.
|
||||
// The zero value of this safe to use (0 latest nid, no prev batch, no timeline).
|
||||
RequestedLatestEvents state.LatestEvents
|
||||
|
||||
// TODO: should CanonicalisedName really be in RoomConMetadata? It's only set in SetRoom AFAICS
|
||||
CanonicalisedName string // stripped leading symbols like #, all in lower case
|
||||
// Set of spaces this room is a part of, from the perspective of this user. This is NOT global room data
|
||||
@ -181,18 +177,18 @@ type UserCacheListener interface {
|
||||
// Tracks data specific to a given user. Specifically, this is the map of room ID to UserRoomData.
|
||||
// This data is user-scoped, not global or connection scoped.
|
||||
type UserCache struct {
|
||||
LazyRoomDataOverride func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData
|
||||
UserID string
|
||||
roomToData map[string]UserRoomData
|
||||
roomToDataMu *sync.RWMutex
|
||||
listeners map[int]UserCacheListener
|
||||
listenersMu *sync.RWMutex
|
||||
id int
|
||||
store *state.Storage
|
||||
globalCache *GlobalCache
|
||||
txnIDs TransactionIDFetcher
|
||||
ignoredUsers map[string]struct{}
|
||||
ignoredUsersMu *sync.RWMutex
|
||||
LazyLoadTimelinesOverride func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents
|
||||
UserID string
|
||||
roomToData map[string]UserRoomData
|
||||
roomToDataMu *sync.RWMutex
|
||||
listeners map[int]UserCacheListener
|
||||
listenersMu *sync.RWMutex
|
||||
id int
|
||||
store *state.Storage
|
||||
globalCache *GlobalCache
|
||||
txnIDs TransactionIDFetcher
|
||||
ignoredUsers map[string]struct{}
|
||||
ignoredUsersMu *sync.RWMutex
|
||||
}
|
||||
|
||||
func NewUserCache(userID string, globalCache *GlobalCache, store *state.Storage, txnIDs TransactionIDFetcher) *UserCache {
|
||||
@ -306,34 +302,29 @@ func (c *UserCache) OnRegistered(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load timelines from the database. Uses cached UserRoomData for metadata purposes only.
|
||||
func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
|
||||
// LazyLoadTimelines loads up to `maxTimelineEvents` from the database, plus other
|
||||
// timeline-related data. Events from senders ignored by this user are dropped.
|
||||
// Returns nil on error.
|
||||
func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
|
||||
_, span := internal.StartSpan(ctx, "LazyLoadTimelines")
|
||||
defer span.End()
|
||||
if c.LazyRoomDataOverride != nil {
|
||||
return c.LazyRoomDataOverride(loadPos, roomIDs, maxTimelineEvents)
|
||||
if c.LazyLoadTimelinesOverride != nil {
|
||||
return c.LazyLoadTimelinesOverride(loadPos, roomIDs, maxTimelineEvents)
|
||||
}
|
||||
result := make(map[string]UserRoomData)
|
||||
result := make(map[string]state.LatestEvents)
|
||||
roomIDToLatestEvents, err := c.store.LatestEventsInRooms(c.UserID, roomIDs, loadPos, maxTimelineEvents)
|
||||
if err != nil {
|
||||
logger.Err(err).Strs("rooms", roomIDs).Msg("failed to get LatestEventsInRooms")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return nil
|
||||
}
|
||||
c.roomToDataMu.Lock()
|
||||
for _, requestedRoomID := range roomIDs {
|
||||
latestEvents := roomIDToLatestEvents[requestedRoomID]
|
||||
urd, ok := c.roomToData[requestedRoomID]
|
||||
if !ok {
|
||||
urd = NewUserRoomData()
|
||||
}
|
||||
if latestEvents != nil {
|
||||
latestEvents.DiscardIgnoredMessages(c.ShouldIgnore)
|
||||
urd.RequestedLatestEvents = *latestEvents
|
||||
result[requestedRoomID] = *latestEvents
|
||||
}
|
||||
result[requestedRoomID] = urd
|
||||
}
|
||||
c.roomToDataMu.Unlock()
|
||||
return result
|
||||
}
|
||||
|
||||
@ -347,6 +338,21 @@ func (c *UserCache) LoadRoomData(roomID string) UserRoomData {
|
||||
return data
|
||||
}
|
||||
|
||||
// LoadRooms is a batch version of LoadRoomData. Returns a map keyed by roomID.
|
||||
func (c *UserCache) LoadRooms(roomIDs ...string) map[string]UserRoomData {
|
||||
result := make(map[string]UserRoomData, len(roomIDs))
|
||||
c.roomToDataMu.RLock()
|
||||
defer c.roomToDataMu.RUnlock()
|
||||
for _, roomID := range roomIDs {
|
||||
data, ok := c.roomToData[roomID]
|
||||
if !ok {
|
||||
data = NewUserRoomData()
|
||||
}
|
||||
result[roomID] = data
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
type roomUpdateCache struct {
|
||||
roomID string
|
||||
// globalRoomData is a snapshot of the global metadata for this room immediately
|
||||
|
@ -554,31 +554,28 @@ func (s *ConnState) lazyLoadTypingMembers(ctx context.Context, response *sync3.R
|
||||
func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSubscription, bumpEventTypes []string, roomIDs ...string) map[string]sync3.Room {
|
||||
ctx, span := internal.StartSpan(ctx, "getInitialRoomData")
|
||||
defer span.End()
|
||||
rooms := make(map[string]sync3.Room, len(roomIDs))
|
||||
|
||||
// 0. Load room metadata and timelines.
|
||||
// We want to grab the user room data and the room metadata for each room ID. We use the globally
|
||||
// highest NID we've seen to act as an anchor for the request. This anchor does not guarantee that
|
||||
// events returned here have already been seen - the position is not globally ordered - so because
|
||||
// room A has a position of 6 and B has 7 (so the highest is 7) does not mean that this connection
|
||||
// has seen 6, as concurrent room updates cause A and B to race. This is why we then go through the
|
||||
// response to this call to assign new load positions for each room.
|
||||
roomIDToUserRoomData := s.userCache.LazyLoadTimelines(ctx, s.anchorLoadPosition, roomIDs, int(roomSub.TimelineLimit))
|
||||
roomMetadatas := s.globalCache.LoadRooms(ctx, roomIDs...)
|
||||
// prepare lazy loading data structures, txn IDs
|
||||
roomToUsersInTimeline := make(map[string][]string, len(roomIDToUserRoomData))
|
||||
userRoomDatas := s.userCache.LoadRooms(roomIDs...)
|
||||
timelines := s.userCache.LazyLoadTimelines(ctx, s.anchorLoadPosition, roomIDs, int(roomSub.TimelineLimit))
|
||||
|
||||
// 1. Prepare lazy loading data structures, txn IDs.
|
||||
roomToUsersInTimeline := make(map[string][]string, len(timelines))
|
||||
roomToTimeline := make(map[string][]json.RawMessage)
|
||||
for roomID, urd := range roomIDToUserRoomData {
|
||||
set := make(map[string]struct{})
|
||||
for _, ev := range urd.RequestedLatestEvents.Timeline {
|
||||
set[gjson.GetBytes(ev, "sender").Str] = struct{}{}
|
||||
for roomID, latestEvents := range timelines {
|
||||
senders := make(map[string]struct{})
|
||||
for _, ev := range latestEvents.Timeline {
|
||||
senders[gjson.GetBytes(ev, "sender").Str] = struct{}{}
|
||||
}
|
||||
userIDs := make([]string, len(set))
|
||||
i := 0
|
||||
for userID := range set {
|
||||
userIDs[i] = userID
|
||||
i++
|
||||
}
|
||||
roomToUsersInTimeline[roomID] = userIDs
|
||||
roomToTimeline[roomID] = urd.RequestedLatestEvents.Timeline
|
||||
roomToUsersInTimeline[roomID] = keys(senders)
|
||||
roomToTimeline[roomID] = latestEvents.Timeline
|
||||
// remember what we just loaded so if we see these events down the live stream we know to ignore them.
|
||||
// This means that requesting a direct room subscription causes the connection to jump ahead to whatever
|
||||
// is in the database at the time of the call, rather than gradually converging by consuming live data.
|
||||
@ -586,10 +583,17 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
|
||||
// room state is also pinned to the load position here, else you could see weird things in individual
|
||||
// responses such as an updated room.name without the associated m.room.name event (though this will
|
||||
// come through on the next request -> it converges to the right state so it isn't critical).
|
||||
s.loadPositions[roomID] = urd.RequestedLatestEvents.LatestNID
|
||||
s.loadPositions[roomID] = latestEvents.LatestNID
|
||||
}
|
||||
roomToTimeline = s.userCache.AnnotateWithTransactionIDs(ctx, s.userID, s.deviceID, roomToTimeline)
|
||||
|
||||
// 2. Load required state events.
|
||||
rsm := roomSub.RequiredStateMap(s.userID)
|
||||
if rsm.IsLazyLoading() {
|
||||
for roomID, userIDs := range roomToUsersInTimeline {
|
||||
s.lazyCache.Add(roomID, userIDs...)
|
||||
}
|
||||
}
|
||||
|
||||
internal.Logf(ctx, "connstate", "getInitialRoomData for %d rooms, RequiredStateMap: %#v", len(roomIDs), rsm)
|
||||
|
||||
@ -597,7 +601,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
|
||||
// since we'll be using the invite_state only.
|
||||
loadRoomIDs := make([]string, 0, len(roomIDs))
|
||||
for _, roomID := range roomIDs {
|
||||
userRoomData, ok := roomIDToUserRoomData[roomID]
|
||||
userRoomData, ok := userRoomDatas[roomID]
|
||||
if !ok || !userRoomData.IsInvite {
|
||||
loadRoomIDs = append(loadRoomIDs, roomID)
|
||||
}
|
||||
@ -610,8 +614,11 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
|
||||
if roomIDToState == nil { // e.g no required_state
|
||||
roomIDToState = make(map[string][]json.RawMessage)
|
||||
}
|
||||
|
||||
// 3. Build sync3.Room structs to return to clients.
|
||||
rooms := make(map[string]sync3.Room, len(roomIDs))
|
||||
for _, roomID := range roomIDs {
|
||||
userRoomData, ok := roomIDToUserRoomData[roomID]
|
||||
userRoomData, ok := userRoomDatas[roomID]
|
||||
if !ok {
|
||||
userRoomData = caches.NewUserRoomData()
|
||||
}
|
||||
@ -677,7 +684,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
|
||||
IsDM: userRoomData.IsDM,
|
||||
JoinedCount: metadata.JoinCount,
|
||||
InvitedCount: &metadata.InviteCount,
|
||||
PrevBatch: userRoomData.RequestedLatestEvents.PrevBatch,
|
||||
PrevBatch: timelines[roomID].PrevBatch,
|
||||
Timestamp: maxTs,
|
||||
}
|
||||
if roomSub.IncludeHeroes() && calculated {
|
||||
@ -686,11 +693,6 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
|
||||
rooms[roomID] = room
|
||||
}
|
||||
|
||||
if rsm.IsLazyLoading() {
|
||||
for roomID, userIDs := range roomToUsersInTimeline {
|
||||
s.lazyCache.Add(roomID, userIDs...)
|
||||
}
|
||||
}
|
||||
return rooms
|
||||
}
|
||||
|
||||
@ -782,7 +784,7 @@ func clampSliceRangeToListSize(ctx context.Context, r [2]int64, totalRooms int64
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a slice containing copies of the keys of the given map, in no particular
|
||||
// keys returns a slice containing copies of the keys of the given map, in no particular
|
||||
// order.
|
||||
func keys[K comparable, V any](m map[K]V) []K {
|
||||
if m == nil {
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/matrix-org/sliding-sync/state"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
@ -44,12 +45,12 @@ func newRoomMetadata(roomID string, lastMsgTimestamp spec.Timestamp) internal.Ro
|
||||
return *m
|
||||
}
|
||||
|
||||
func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
|
||||
result := make(map[string]caches.UserRoomData)
|
||||
func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
|
||||
result := make(map[string]state.LatestEvents)
|
||||
for _, roomID := range roomIDs {
|
||||
u := caches.NewUserRoomData()
|
||||
u.RequestedLatestEvents.Timeline = []json.RawMessage{[]byte(`{}`)}
|
||||
result[roomID] = u
|
||||
result[roomID] = state.LatestEvents{
|
||||
Timeline: []json.RawMessage{[]byte(`{}`)},
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
@ -98,12 +99,12 @@ func TestConnStateInitial(t *testing.T) {
|
||||
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
|
||||
dispatcher.Register(context.Background(), userCache.UserID, userCache)
|
||||
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
|
||||
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
|
||||
result := make(map[string]caches.UserRoomData)
|
||||
userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
|
||||
result := make(map[string]state.LatestEvents)
|
||||
for _, roomID := range roomIDs {
|
||||
u := caches.NewUserRoomData()
|
||||
u.RequestedLatestEvents.Timeline = []json.RawMessage{timeline[roomID]}
|
||||
result[roomID] = u
|
||||
result[roomID] = state.LatestEvents{
|
||||
Timeline: []json.RawMessage{timeline[roomID]},
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
@ -269,7 +270,7 @@ func TestConnStateMultipleRanges(t *testing.T) {
|
||||
return 1, roomMetadata, joinTimings, nil, nil
|
||||
}
|
||||
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
|
||||
userCache.LazyRoomDataOverride = mockLazyRoomOverride
|
||||
userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
|
||||
dispatcher.Register(context.Background(), userCache.UserID, userCache)
|
||||
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
|
||||
cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
|
||||
@ -448,7 +449,7 @@ func TestBumpToOutsideRange(t *testing.T) {
|
||||
|
||||
}
|
||||
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
|
||||
userCache.LazyRoomDataOverride = mockLazyRoomOverride
|
||||
userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
|
||||
dispatcher.Register(context.Background(), userCache.UserID, userCache)
|
||||
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
|
||||
cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
|
||||
@ -551,12 +552,12 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
|
||||
}, nil, nil
|
||||
}
|
||||
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
|
||||
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
|
||||
result := make(map[string]caches.UserRoomData)
|
||||
userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
|
||||
result := make(map[string]state.LatestEvents)
|
||||
for _, roomID := range roomIDs {
|
||||
u := caches.NewUserRoomData()
|
||||
u.RequestedLatestEvents.Timeline = []json.RawMessage{timeline[roomID]}
|
||||
result[roomID] = u
|
||||
result[roomID] = state.LatestEvents{
|
||||
Timeline: []json.RawMessage{timeline[roomID]},
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user