bugfix: pass events through usercache so it can return full timelines on INSERT

We lazily load timelines for rooms as the client fetches them. If a previously
lazily loaded timeine goes out of the window then back in it results in a
DELETE/INSERT. We would detect that we already have a timeline for this room
and just return that, however that timeline was stale. We now keep this timeline
in sync with live events.

With regression test.
This commit is contained in:
Kegan Dougal 2021-10-26 18:01:57 +01:00
parent 7cc6cce668
commit 2285a7bef9
6 changed files with 62 additions and 94 deletions

View File

@ -1,57 +0,0 @@
package syncv3
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/matrix-org/sync-v3/sync2"
"github.com/matrix-org/sync-v3/sync3"
"github.com/matrix-org/sync-v3/testutils"
)
func TestInteg(t *testing.T) {
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, "")
defer v2.close()
defer v3.close()
alice := "@TestInteg_alice:localhost"
aliceToken := "ALICE_BEARER_TOKEN_TestInteg"
roomA := "!a_TestInteg:localhost"
v2.addAccount(alice, aliceToken)
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomA: {
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{
testutils.NewStateEvent(t, "m.room.create", "", alice, map[string]interface{}{"creator": alice}),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}),
testutils.NewStateEvent(t, "m.room.join_rules", "", alice, map[string]interface{}{"join_rule": "public"}),
testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "hello world"}, time.Now()),
},
},
},
},
},
})
res := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Rooms: sync3.SliceRanges{
[2]int64{0, 9}, // first 10 rooms
},
})
MatchResponse(t, res, MatchV3Count(1), MatchV3Ops(
MatchV3SyncOp(func(op *sync3.ResponseOpRange) error {
if len(op.Rooms) != 1 {
return fmt.Errorf("want 1 room, got %d", len(op.Rooms))
}
room := op.Rooms[0]
if room.RoomID != roomA {
return fmt.Errorf("want room id %s got %s", roomA, room.RoomID)
}
return nil
}),
))
}

View File

@ -29,10 +29,9 @@ type ConnState struct {
// saying the client is ded and cleaning up the conn.
updateEvents chan *EventData
globalCache *GlobalCache
globalCacheID int
userCache *UserCache
userCacheID int
globalCache *GlobalCache
userCache *UserCache
userCacheID int
}
func NewConnState(userID string, userCache *UserCache, globalCache *GlobalCache) *ConnState {
@ -44,7 +43,6 @@ func NewConnState(userID string, userCache *UserCache, globalCache *GlobalCache)
sortedJoinedRoomsPositions: make(map[string]int),
updateEvents: make(chan *EventData, MaxPendingEventUpdates), // TODO: customisable
}
cs.globalCacheID = globalCache.Subsribe(cs)
cs.userCacheID = cs.userCache.Subsribe(cs)
return cs
}
@ -323,30 +321,11 @@ func (s *ConnState) getInitialRoomData(roomIDs ...string) []Room {
return rooms
}
// Called when the global cache has a new event. This callback fires when the server gets a new event and determines this connection MAY be
// Called when the user cache has a new event for us. This callback fires when the server gets a new event and determines this connection MAY be
// interested in it (e.g the client is joined to the room or it's an invite, etc). Each callback can fire
// from different v2 poll loops, and there is no locking in order to prevent a slow ConnState from wedging the poll loop.
// We need to move this data onto a channel for onIncomingRequest to consume later.
func (s *ConnState) OnNewEvent(joinedUserIDs []string, eventData *EventData) {
targetUser := ""
if eventData.eventType == "m.room.member" && eventData.stateKey != nil {
targetUser = *eventData.stateKey
}
isInterested := targetUser == s.userID // e.g invites
for _, userID := range joinedUserIDs {
if s.userID == userID {
isInterested = true
break
}
}
if !isInterested {
return
}
s.pushData(eventData)
}
func (s *ConnState) pushData(eventData *EventData) {
func (s *ConnState) OnNewEvent(eventData *EventData) {
// TODO: remove 0 check when Initialise state returns sensible positions
if eventData.latestPos != 0 && eventData.latestPos < s.loadPosition {
// do not push this event down the stream as we have already processed it when we loaded
@ -365,7 +344,6 @@ func (s *ConnState) pushData(eventData *EventData) {
// Called when the connection is torn down
func (s *ConnState) Destroy() {
s.globalCache.Unsubscribe(s.globalCacheID)
s.userCache.Unsubscribe(s.userCacheID)
}
@ -431,7 +409,7 @@ func (s *ConnState) OnUnreadCountsChanged(userID, roomID string, urd UserRoomDat
return
}
room := s.globalCache.LoadRoom(roomID)
s.pushData(&EventData{
s.OnNewEvent(&EventData{
roomID: roomID,
userRoomData: &urd,
timestamp: room.LastMessageTimestamp,

View File

@ -61,7 +61,7 @@ func TestConnStateInitial(t *testing.T) {
roomA, roomB, roomC,
}, nil
}
userCache := NewUserCache(userID, nil)
userCache := NewUserCache(userID, globalCache, nil)
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
result := make(map[string]UserRoomData)
for _, roomID := range roomIDs {
@ -202,7 +202,7 @@ func TestConnStateMultipleRanges(t *testing.T) {
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms []SortableRoom, err error) {
return 1, rooms, nil
}
userCache := NewUserCache(userID, nil)
userCache := NewUserCache(userID, globalCache, nil)
userCache.LazyRoomDataOverride = mockLazyRoomOverride
cs := NewConnState(userID, userCache, globalCache)
@ -366,7 +366,7 @@ func TestBumpToOutsideRange(t *testing.T) {
roomA, roomB, roomC, roomD,
}, nil
}
userCache := NewUserCache(userID, nil)
userCache := NewUserCache(userID, globalCache, nil)
userCache.LazyRoomDataOverride = mockLazyRoomOverride
cs := NewConnState(userID, userCache, globalCache)
// Ask for A,B
@ -446,7 +446,7 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
roomA, roomB, roomC, roomD,
}, nil
}
userCache := NewUserCache(userID, nil)
userCache := NewUserCache(userID, globalCache, nil)
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
result := make(map[string]UserRoomData)
for _, roomID := range roomIDs {

View File

@ -251,7 +251,7 @@ func (h *SyncLiveHandler) userCache(userID string) (*UserCache, error) {
if ok {
return c.(*UserCache), nil
}
uc := NewUserCache(userID, h.Storage)
uc := NewUserCache(userID, h.globalCache, h.Storage)
// select all non-zero highlight or notif counts and set them, as this is less costly than looping every room/user pair
err := h.Storage.UnreadTable.SelectAllNonZeroCountsForUser(userID, func(roomID string, highlightCount, notificationCount int) {
uc.OnUnreadCounts(roomID, &highlightCount, &notificationCount)

View File

@ -14,6 +14,7 @@ type UserRoomData struct {
}
type UserCacheListener interface {
OnNewEvent(event *EventData)
OnUnreadCountsChanged(userID, roomID string, urd UserRoomData, hasCountDecreased bool)
}
@ -26,17 +27,22 @@ type UserCache struct {
listenersMu *sync.Mutex
id int
store *state.Storage
globalCache *GlobalCache
globalCacheID int
}
func NewUserCache(userID string, store *state.Storage) *UserCache {
return &UserCache{
func NewUserCache(userID string, globalCache *GlobalCache, store *state.Storage) *UserCache {
uc := &UserCache{
userID: userID,
roomToDataMu: &sync.RWMutex{},
roomToData: make(map[string]UserRoomData),
listeners: make(map[int]UserCacheListener),
listenersMu: &sync.Mutex{},
store: store,
globalCache: globalCache,
}
uc.globalCacheID = globalCache.Subsribe(uc)
return uc
}
func (c *UserCache) Subsribe(ucl UserCacheListener) (id int) {
@ -63,8 +69,16 @@ func (c *UserCache) lazilyLoadRoomDatas(loadPos int64, roomIDs []string, maxTime
for _, roomID := range roomIDs {
urd := c.loadRoomData(roomID)
if len(urd.Timeline) > 0 {
timeline := urd.Timeline
if len(timeline) > maxTimelineEvents {
timeline = timeline[len(timeline)-maxTimelineEvents:]
}
// we already have data, use it
result[roomID] = urd
result[roomID] = UserRoomData{
NotificationCount: urd.NotificationCount,
HighlightCount: urd.HighlightCount,
Timeline: timeline,
}
} else {
lazyRoomIDs = append(lazyRoomIDs, roomID)
}
@ -123,3 +137,33 @@ func (c *UserCache) OnUnreadCounts(roomID string, highlightCount, notifCount *in
l.OnUnreadCountsChanged(c.userID, roomID, data, hasCountDecreased)
}
}
func (c *UserCache) OnNewEvent(joinedUsers []string, eventData *EventData) {
targetUser := ""
if eventData.eventType == "m.room.member" && eventData.stateKey != nil {
targetUser = *eventData.stateKey
}
isInterested := targetUser == c.userID // e.g invites
for _, userID := range joinedUsers {
if c.userID == userID {
isInterested = true
break
}
}
if !isInterested {
return
}
// add this to our tracked timelines if we have one
urd := c.loadRoomData(eventData.roomID)
if len(urd.Timeline) > 0 {
// we're tracking timelines, add this message too
urd.Timeline = append(urd.Timeline, eventData.event)
}
c.roomToDataMu.Lock()
c.roomToData[eventData.roomID] = urd
c.roomToDataMu.Unlock()
for _, l := range c.listeners {
l.OnNewEvent(eventData)
}
}

View File

@ -10,7 +10,10 @@ import (
func createLocalDB(dbName string) string {
fmt.Println("Note: tests require a postgres install accessible to the current user")
exec.Command("dropdb", dbName).Run()
if err := exec.Command("createdb", dbName).Run(); err != nil {
createDB := exec.Command("createdb", dbName)
createDB.Stdout = os.Stdout
createDB.Stderr = os.Stderr
if err := createDB.Run(); err != nil {
fmt.Println("createdb failed: ", err)
os.Exit(2)
}