Return the most recent timeline events for each room

TODO: the global cache isn't being kept updated so live
streamed events don't load (though they sort correctly)
This commit is contained in:
Kegan Dougal 2021-10-22 18:18:02 +01:00
parent 488c638e7b
commit d7913c8e26
8 changed files with 167 additions and 36 deletions

View File

@ -336,6 +336,7 @@ const doSyncLoop = async(accessToken, sessionId) => {
// if this is the first request on this session, send sticky request data which never changes
if (!currentPos) {
reqBody.required_state = requiredStateEventsInList;
reqBody.timeline_limit = 20;
}
// check if we are (un)subscribing to a room and modify request this one time for it
let subscribingToRoom;

View File

@ -240,6 +240,14 @@ func (t *EventTable) SelectEventsBetween(txn *sqlx.Tx, roomID string, lowerExclu
return events, err
}
func (t *EventTable) SelectLatestEventsBetween(txn *sqlx.Tx, roomID string, lowerExclusive, upperInclusive int64, limit int) ([]Event, error) {
var events []Event
err := txn.Select(&events, `SELECT event_nid, event FROM syncv3_events WHERE event_nid > $1 AND event_nid <= $2 AND room_id = $3 ORDER BY event_nid DESC LIMIT $4`,
lowerExclusive, upperInclusive, roomID, limit,
)
return events, err
}
func (t *EventTable) SelectLatestEventInRoom(txn *sqlx.Tx, roomID string, upperInclusive int64) (*Event, error) {
var event Event
err := txn.Get(&event, `SELECT event_nid, event, event_type, state_key, event_id, room_id FROM syncv3_events

View File

@ -203,7 +203,42 @@ func (s *Storage) RoomStateBeforeEventPosition(roomID string, pos int64) (events
return
}
func (s *Storage) VisibleEventNIDsBetweenForRooms(userID string, roomIDs []string, from, to int64) (map[string][][2]int64, error) {
func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string][]json.RawMessage, error) {
roomIDToRanges, err := s.visibleEventNIDsBetweenForRooms(userID, roomIDs, 0, to)
if err != nil {
return nil, err
}
result := make(map[string][]json.RawMessage)
err = sqlutil.WithTransaction(s.accumulator.db, func(txn *sqlx.Tx) error {
for roomID, ranges := range roomIDToRanges {
var roomEvents []json.RawMessage
// start at the most recent range as we want to return the most recent `limit` events
for i := len(ranges) - 1; i >= 0; i-- {
if len(roomEvents) >= limit {
break
}
r := ranges[i]
// the most recent event will be first
events, err := s.EventsTable.SelectLatestEventsBetween(txn, roomID, r[0]-1, r[1], limit)
if err != nil {
return fmt.Errorf("room %s failed to SelectEventsBetween: %s", roomID, err)
}
// keep pushing to the front so we end up with A,B,C
for _, ev := range events {
roomEvents = append([]json.RawMessage{ev.JSON}, roomEvents...)
if len(roomEvents) >= limit {
break
}
}
}
result[roomID] = roomEvents
}
return nil
})
return result, err
}
func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []string, from, to int64) (map[string][][2]int64, error) {
// load *THESE* joined rooms for this user at from (inclusive)
var membershipEvents []Event
var err error

View File

@ -289,7 +289,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
}
// check that we can query subsets too
roomIDToVisibleRangesSubset, err := store.VisibleEventNIDsBetweenForRooms(alice, []string{roomA, roomB}, startPos, latestPos)
roomIDToVisibleRangesSubset, err := store.visibleEventNIDsBetweenForRooms(alice, []string{roomA, roomB}, startPos, latestPos)
if err != nil {
t.Fatalf("VisibleEventNIDsBetweenForRooms to %d: %s", latestPos, err)
}

View File

@ -3,7 +3,6 @@ package sync3
import (
"context"
"encoding/json"
"fmt"
"reflect"
"sort"
"time"
@ -162,15 +161,15 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *Request) (*Respo
sr := SliceRanges([][2]int64{r})
subslice := sr.SliceInto(s.sortedJoinedRooms)
rooms := subslice[0].(SortableRooms)
roomsResponse := make([]Room, len(rooms))
roomIDs := make([]string, len(rooms))
for i := range rooms {
roomData := s.getInitialRoomData(rooms[i].RoomID)
roomsResponse[i] = *roomData
roomIDs[i] = rooms[i].RoomID
}
responseOperations = append(responseOperations, &ResponseOpRange{
Operation: "SYNC",
Range: r[:],
Rooms: roomsResponse,
Rooms: s.getInitialRoomData(roomIDs...),
})
}
// do live tracking if we haven't changed the range and we have nothing to tell the client yet
@ -278,8 +277,8 @@ func (s *ConnState) updateRoomSubscriptions(subs, unsubs []string) map[string]Ro
}
s.roomSubscriptions[roomID] = sub
// send initial room information
room := s.getInitialRoomData(roomID)
result[roomID] = *room
rooms := s.getInitialRoomData(roomID)
result[roomID] = rooms[0]
}
for _, roomID := range unsubs {
delete(s.roomSubscriptions, roomID)
@ -302,20 +301,26 @@ func (s *ConnState) getDeltaRoomData(updateEvent *EventData) *Room {
return room
}
func (s *ConnState) getInitialRoomData(roomID string) *Room {
r := s.globalCache.LoadRoom(roomID) // TODO: does this race?
userRoomData := s.userCache.loadRoomData(roomID)
return &Room{
RoomID: roomID,
Name: r.Name,
NotificationCount: int64(userRoomData.NotificationCount),
HighlightCount: int64(userRoomData.HighlightCount),
// TODO: timeline limits
Timeline: []json.RawMessage{
r.LastEventJSON,
},
RequiredState: s.globalCache.LoadRoomState(roomID, s.loadPosition, s.muxedReq.GetRequiredState(roomID)),
func (s *ConnState) getInitialRoomData(roomIDs ...string) []Room {
sortableRooms := make([]SortableRoom, len(roomIDs))
for i := range roomIDs {
// TODO: the timestamp here can race with loading the timeline events (timeline may be newer), is this a problem?
sortableRooms[i] = *s.globalCache.LoadRoom(roomIDs[i])
}
roomIDToUserRoomData := s.userCache.lazilyLoadRoomDatas(s.loadPosition, roomIDs, int(s.muxedReq.TimelineLimit)) // TODO: per-room timeline limit
rooms := make([]Room, len(roomIDs))
for i, sr := range sortableRooms {
userRoomData := roomIDToUserRoomData[sr.RoomID]
rooms[i] = Room{
RoomID: sr.RoomID,
Name: sr.Name,
NotificationCount: int64(userRoomData.NotificationCount),
HighlightCount: int64(userRoomData.HighlightCount),
Timeline: userRoomData.Timeline,
RequiredState: s.globalCache.LoadRoomState(sr.RoomID, s.loadPosition, s.muxedReq.GetRequiredState(sr.RoomID)),
}
}
return rooms
}
// Called when the global cache has a new event. This callback fires when the server gets a new event and determines this connection MAY be
@ -323,7 +328,6 @@ func (s *ConnState) getInitialRoomData(roomID string) *Room {
// from different v2 poll loops, and there is no locking in order to prevent a slow ConnState from wedging the poll loop.
// We need to move this data onto a channel for onIncomingRequest to consume later.
func (s *ConnState) OnNewEvent(joinedUserIDs []string, eventData *EventData) {
fmt.Println("procccc")
targetUser := ""
if eventData.eventType == "m.room.member" && eventData.stateKey != nil {
targetUser = *eventData.stateKey
@ -404,7 +408,8 @@ func (s *ConnState) moveRoom(updateEvent *EventData, fromIndex, toIndex int, ran
RoomID: updateEvent.roomID,
}
if !onlySendRoomID {
room = s.getInitialRoomData(updateEvent.roomID)
rooms := s.getInitialRoomData(updateEvent.roomID)
room = &rooms[0]
}
return []ResponseOp{
&ResponseOpSingle{

View File

@ -23,6 +23,18 @@ func newSortableRoom(roomID string, lastMsgTimestamp int64) SortableRoom {
}
}
func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
result := make(map[string]UserRoomData)
for _, roomID := range roomIDs {
result[roomID] = UserRoomData{
Timeline: []json.RawMessage{
[]byte(`{}`),
},
}
}
return result
}
// Sync an account with 3 rooms and check that we can grab all rooms and they are sorted correctly initially. Checks
// that basic UPDATE and DELETE/INSERT works when tracking all rooms.
func TestConnStateInitial(t *testing.T) {
@ -48,7 +60,19 @@ func TestConnStateInitial(t *testing.T) {
roomA, roomB, roomC,
}, nil
}
cs := NewConnState(userID, NewUserCache(userID), globalCache)
userCache := NewUserCache(userID, nil)
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
result := make(map[string]UserRoomData)
for _, roomID := range roomIDs {
result[roomID] = UserRoomData{
Timeline: []json.RawMessage{
globalCache.LoadRoom(roomID).LastEventJSON,
},
}
}
return result
}
cs := NewConnState(userID, userCache, globalCache)
if userID != cs.UserID() {
t.Fatalf("UserID returned wrong value, got %v want %v", cs.UserID(), userID)
}
@ -177,7 +201,9 @@ func TestConnStateMultipleRanges(t *testing.T) {
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms []SortableRoom, err error) {
return 1, rooms, nil
}
cs := NewConnState(userID, NewUserCache(userID), globalCache)
userCache := NewUserCache(userID, nil)
userCache.LazyRoomDataOverride = mockLazyRoomOverride
cs := NewConnState(userID, userCache, globalCache)
// request first page
res, err := cs.HandleIncomingRequest(context.Background(), connID, &Request{
@ -339,7 +365,9 @@ func TestBumpToOutsideRange(t *testing.T) {
roomA, roomB, roomC, roomD,
}, nil
}
cs := NewConnState(userID, NewUserCache(userID), globalCache)
userCache := NewUserCache(userID, nil)
userCache.LazyRoomDataOverride = mockLazyRoomOverride
cs := NewConnState(userID, userCache, globalCache)
// Ask for A,B
res, err := cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
@ -417,7 +445,19 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
roomA, roomB, roomC, roomD,
}, nil
}
cs := NewConnState(userID, NewUserCache(userID), globalCache)
userCache := NewUserCache(userID, nil)
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
result := make(map[string]UserRoomData)
for _, roomID := range roomIDs {
result[roomID] = UserRoomData{
Timeline: []json.RawMessage{
globalCache.LoadRoom(roomID).LastEventJSON,
},
}
}
return result
}
cs := NewConnState(userID, userCache, globalCache)
// subscribe to room D
res, err := cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},

View File

@ -251,7 +251,7 @@ func (h *SyncLiveHandler) userCache(userID string) (*UserCache, error) {
if ok {
return c.(*UserCache), nil
}
uc := NewUserCache(userID)
uc := NewUserCache(userID, h.Storage)
// select all non-zero highlight or notif counts and set them, as this is less costly than looping every room/user pair
err := h.Storage.UnreadTable.SelectAllNonZeroCountsForUser(userID, func(roomID string, highlightCount, notificationCount int) {
uc.OnUnreadCounts(roomID, &highlightCount, &notificationCount)

View File

@ -1,12 +1,16 @@
package sync3
import (
"encoding/json"
"sync"
"github.com/matrix-org/sync-v3/state"
)
type UserRoomData struct {
NotificationCount int
HighlightCount int
Timeline []json.RawMessage
}
type UserCacheListener interface {
@ -14,21 +18,24 @@ type UserCacheListener interface {
}
type UserCache struct {
userID string
roomToData map[string]UserRoomData
roomToDataMu *sync.RWMutex
listeners map[int]UserCacheListener
listenersMu *sync.Mutex
id int
LazyRoomDataOverride func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData
userID string
roomToData map[string]UserRoomData
roomToDataMu *sync.RWMutex
listeners map[int]UserCacheListener
listenersMu *sync.Mutex
id int
store *state.Storage
}
func NewUserCache(userID string) *UserCache {
func NewUserCache(userID string, store *state.Storage) *UserCache {
return &UserCache{
userID: userID,
roomToDataMu: &sync.RWMutex{},
roomToData: make(map[string]UserRoomData),
listeners: make(map[int]UserCacheListener),
listenersMu: &sync.Mutex{},
store: store,
}
}
@ -47,6 +54,41 @@ func (c *UserCache) Unsubscribe(id int) {
delete(c.listeners, id)
}
func (c *UserCache) lazilyLoadRoomDatas(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
if c.LazyRoomDataOverride != nil {
return c.LazyRoomDataOverride(loadPos, roomIDs, maxTimelineEvents)
}
result := make(map[string]UserRoomData)
var lazyRoomIDs []string
for _, roomID := range roomIDs {
urd := c.loadRoomData(roomID)
if len(urd.Timeline) > 0 {
// we already have data, use it
result[roomID] = urd
} else {
lazyRoomIDs = append(lazyRoomIDs, roomID)
}
}
if len(lazyRoomIDs) == 0 {
return result
}
roomIDToEvents, err := c.store.LatestEventsInRooms(c.userID, lazyRoomIDs, loadPos, maxTimelineEvents)
if err != nil {
logger.Err(err).Strs("rooms", lazyRoomIDs).Msg("failed to get LatestEventsInRooms")
return nil
}
c.roomToDataMu.Lock()
for roomID, events := range roomIDToEvents {
urd := UserRoomData{
Timeline: events,
}
result[roomID] = urd
c.roomToData[roomID] = urd
}
c.roomToDataMu.Unlock()
return result
}
func (c *UserCache) loadRoomData(roomID string) UserRoomData {
c.roomToDataMu.RLock()
defer c.roomToDataMu.RUnlock()