mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
perf: cache the prev batch tokens for each room with an LRU cache
- Replace `PrevBatch string` in user room data with `PrevBatches lru.Cache`. This allows us to persist prev batch tokens in-memory rather than doing N sequential DB lookups which would take ~4s for ~150 rooms on the postgres instance running the database. The tokens are keyed off a tuple of the event ID being searched and the latest event in the room, to allow prev batches to be assigned when new sync v2 responses arrive. - Thread through context to complex storage functions for profiling
This commit is contained in:
parent
3a5323d90f
commit
5339dc8ce3
1
go.mod
1
go.mod
@ -5,6 +5,7 @@ go 1.14
|
||||
require (
|
||||
github.com/ReneKroon/ttlcache/v2 v2.8.1
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||
github.com/jmoiron/sqlx v1.3.3
|
||||
github.com/lib/pq v1.10.1
|
||||
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 // indirect
|
||||
|
2
go.sum
2
go.sum
@ -15,6 +15,8 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
|
||||
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
||||
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
|
||||
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
|
||||
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
|
||||
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
|
||||
github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk=
|
||||
github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
|
@ -1,9 +1,11 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime/trace"
|
||||
"strings"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
@ -288,7 +290,8 @@ func (s *Storage) Initialise(roomID string, state []json.RawMessage) (bool, erro
|
||||
// Look up room state after the given event position and no further. eventTypesToStateKeys is a map of event type to a list of state keys for that event type.
|
||||
// If the list of state keys is empty then all events matching that event type will be returned. If the map is empty entirely, then all room state
|
||||
// will be returned.
|
||||
func (s *Storage) RoomStateAfterEventPosition(roomIDs []string, pos int64, eventTypesToStateKeys map[string][]string) (roomToEvents map[string][]Event, err error) {
|
||||
func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []string, pos int64, eventTypesToStateKeys map[string][]string) (roomToEvents map[string][]Event, err error) {
|
||||
defer trace.StartRegion(ctx, "RoomStateAfterEventPosition").End()
|
||||
roomToEvents = make(map[string][]Event, len(roomIDs))
|
||||
roomIndex := make(map[string]int, len(roomIDs))
|
||||
err = sqlutil.WithTransaction(s.accumulator.db, func(txn *sqlx.Tx) error {
|
||||
|
@ -2,6 +2,7 @@ package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
@ -16,6 +17,7 @@ import (
|
||||
)
|
||||
|
||||
func TestStorageRoomStateBeforeAndAfterEventPosition(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := NewStorage(postgresConnectionString)
|
||||
roomID := "!TestStorageRoomStateAfterEventPosition:localhost"
|
||||
alice := "@alice:localhost"
|
||||
@ -39,7 +41,7 @@ func TestStorageRoomStateBeforeAndAfterEventPosition(t *testing.T) {
|
||||
{
|
||||
name: "room state after the latest position includes the invite event",
|
||||
getEvents: func() []Event {
|
||||
events, err := store.RoomStateAfterEventPosition([]string{roomID}, latest, nil)
|
||||
events, err := store.RoomStateAfterEventPosition(ctx, []string{roomID}, latest, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("RoomStateAfterEventPosition: %s", err)
|
||||
}
|
||||
@ -50,7 +52,7 @@ func TestStorageRoomStateBeforeAndAfterEventPosition(t *testing.T) {
|
||||
{
|
||||
name: "room state after the latest position filtered for join_rule returns a single event",
|
||||
getEvents: func() []Event {
|
||||
events, err := store.RoomStateAfterEventPosition([]string{roomID}, latest, map[string][]string{"m.room.join_rules": nil})
|
||||
events, err := store.RoomStateAfterEventPosition(ctx, []string{roomID}, latest, map[string][]string{"m.room.join_rules": nil})
|
||||
if err != nil {
|
||||
t.Fatalf("RoomStateAfterEventPosition: %s", err)
|
||||
}
|
||||
@ -63,7 +65,7 @@ func TestStorageRoomStateBeforeAndAfterEventPosition(t *testing.T) {
|
||||
{
|
||||
name: "room state after the latest position filtered for join_rule and create event excludes member events",
|
||||
getEvents: func() []Event {
|
||||
events, err := store.RoomStateAfterEventPosition([]string{roomID}, latest, map[string][]string{
|
||||
events, err := store.RoomStateAfterEventPosition(ctx, []string{roomID}, latest, map[string][]string{
|
||||
"m.room.join_rules": []string{""},
|
||||
"m.room.create": nil, // all matching state events with this event type
|
||||
})
|
||||
@ -79,7 +81,7 @@ func TestStorageRoomStateBeforeAndAfterEventPosition(t *testing.T) {
|
||||
{
|
||||
name: "room state after the latest position filtered for all members returns all member events",
|
||||
getEvents: func() []Event {
|
||||
events, err := store.RoomStateAfterEventPosition([]string{roomID}, latest, map[string][]string{
|
||||
events, err := store.RoomStateAfterEventPosition(ctx, []string{roomID}, latest, map[string][]string{
|
||||
"m.room.member": nil, // all matching state events with this event type
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -1,6 +1,7 @@
|
||||
package caches
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"sort"
|
||||
@ -111,7 +112,7 @@ func (c *GlobalCache) LoadJoinedRooms(userID string) (pos int64, joinedRooms map
|
||||
}
|
||||
|
||||
// TODO: remove? Doesn't touch global cache fields
|
||||
func (c *GlobalCache) LoadRoomState(roomIDs []string, loadPosition int64, requiredState [][2]string) map[string][]json.RawMessage {
|
||||
func (c *GlobalCache) LoadRoomState(ctx context.Context, roomIDs []string, loadPosition int64, requiredState [][2]string) map[string][]json.RawMessage {
|
||||
if len(requiredState) == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -146,7 +147,7 @@ func (c *GlobalCache) LoadRoomState(roomIDs []string, loadPosition int64, requir
|
||||
}
|
||||
}
|
||||
resultMap := make(map[string][]json.RawMessage, len(roomIDs))
|
||||
roomIDToStateEvents, err := c.store.RoomStateAfterEventPosition(roomIDs, loadPosition, queryStateMap)
|
||||
roomIDToStateEvents, err := c.store.RoomStateAfterEventPosition(ctx, roomIDs, loadPosition, queryStateMap)
|
||||
if err != nil {
|
||||
logger.Err(err).Strs("rooms", roomIDs).Int64("pos", loadPosition).Msg("failed to load room state")
|
||||
return nil
|
||||
|
@ -2,6 +2,7 @@ package caches
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
@ -10,6 +11,7 @@ import (
|
||||
)
|
||||
|
||||
func TestGlobalCacheLoadState(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := state.NewStorage(postgresConnectionString)
|
||||
roomID := "!TestConnMapLoadState:localhost"
|
||||
roomID2 := "!another:localhost"
|
||||
@ -156,7 +158,7 @@ func TestGlobalCacheLoadState(t *testing.T) {
|
||||
for r := range tc.wantEvents {
|
||||
roomIDs = append(roomIDs, r)
|
||||
}
|
||||
gotMap := globalCache.LoadRoomState(roomIDs, latest, tc.requiredState)
|
||||
gotMap := globalCache.LoadRoomState(ctx, roomIDs, latest, tc.requiredState)
|
||||
for _, roomID := range roomIDs {
|
||||
got := gotMap[roomID]
|
||||
wantEvents := tc.wantEvents[roomID]
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/matrix-org/sync-v3/internal"
|
||||
"github.com/matrix-org/sync-v3/state"
|
||||
"github.com/matrix-org/sync-v3/sync2"
|
||||
@ -20,9 +21,45 @@ type UserRoomData struct {
|
||||
IsInvite bool
|
||||
NotificationCount int
|
||||
HighlightCount int
|
||||
PrevBatch string
|
||||
Timeline []json.RawMessage
|
||||
Invite *InviteData
|
||||
// (event_id, last_event_id) -> closest prev_batch
|
||||
// We mux in last_event_id so we can invalidate prev batch tokens for the same event ID when a new timeline event
|
||||
// comes in, without having to do a SQL query.
|
||||
PrevBatches *lru.Cache
|
||||
Timeline []json.RawMessage
|
||||
Invite *InviteData
|
||||
}
|
||||
|
||||
func NewUserRoomData() UserRoomData {
|
||||
l, _ := lru.New(64) // 64 tokens least recently used evicted
|
||||
return UserRoomData{
|
||||
PrevBatches: l,
|
||||
}
|
||||
}
|
||||
|
||||
// fetch the prev batch for this timeline
|
||||
func (u UserRoomData) PrevBatch() (string, bool) {
|
||||
if len(u.Timeline) == 0 {
|
||||
return "", false
|
||||
}
|
||||
eventID := gjson.GetBytes(u.Timeline[0], "event_id").Str
|
||||
lastEventID := gjson.GetBytes(u.Timeline[len(u.Timeline)-1], "event_id").Str
|
||||
val, ok := u.PrevBatches.Get(eventID + lastEventID)
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
return val.(string), true
|
||||
}
|
||||
|
||||
// set the prev batch token for the given event ID. This should come from the database. The prev batch
|
||||
// cache will be updated to return this prev batch token for this event ID as well as the latest event
|
||||
// ID in this timeline.
|
||||
func (u UserRoomData) SetPrevBatch(eventID string, pb string) {
|
||||
if len(u.Timeline) == 0 {
|
||||
return
|
||||
}
|
||||
lastEventID := gjson.GetBytes(u.Timeline[len(u.Timeline)-1], "event_id").Str
|
||||
u.PrevBatches.Add(eventID+lastEventID, pb)
|
||||
u.PrevBatches.Add(eventID+eventID, pb)
|
||||
}
|
||||
|
||||
// Subset of data from internal.RoomMetadata which we can glean from invite_state.
|
||||
@ -181,27 +218,28 @@ func (c *UserCache) LazyLoadTimelines(loadPos int64, roomIDs []string, maxTimeli
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
var prevBatch string
|
||||
|
||||
// either we satisfied their request or we can't get any more events, either way that's good enough
|
||||
if len(timeline) == maxTimelineEvents || createEventExists {
|
||||
if !createEventExists && len(timeline) > 0 {
|
||||
// fetch a prev batch token for the earliest event
|
||||
eventID := gjson.ParseBytes(timeline[0]).Get("event_id").Str
|
||||
prevBatch, err = c.store.EventsTable.SelectClosestPrevBatchByID(roomID, eventID)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("room", roomID).Str("event_id", eventID).Msg("failed to get prev batch token for room")
|
||||
_, ok := urd.PrevBatch()
|
||||
if !ok {
|
||||
eventID := gjson.ParseBytes(timeline[0]).Get("event_id").Str
|
||||
prevBatch, err := c.store.EventsTable.SelectClosestPrevBatchByID(roomID, eventID)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("room", roomID).Str("event_id", eventID).Msg("failed to get prev batch token for room")
|
||||
}
|
||||
urd.SetPrevBatch(eventID, prevBatch)
|
||||
}
|
||||
}
|
||||
|
||||
// we already have data, use it
|
||||
result[roomID] = UserRoomData{
|
||||
NotificationCount: urd.NotificationCount,
|
||||
HighlightCount: urd.HighlightCount,
|
||||
Timeline: timeline,
|
||||
PrevBatch: prevBatch,
|
||||
}
|
||||
u := NewUserRoomData()
|
||||
u.NotificationCount = urd.NotificationCount
|
||||
u.HighlightCount = urd.HighlightCount
|
||||
u.Timeline = timeline
|
||||
u.PrevBatches = urd.PrevBatches
|
||||
result[roomID] = u
|
||||
} else {
|
||||
// refetch from the db
|
||||
lazyRoomIDs = append(lazyRoomIDs, roomID)
|
||||
@ -225,10 +263,13 @@ func (c *UserCache) LazyLoadTimelines(loadPos int64, roomIDs []string, maxTimeli
|
||||
for roomID, events := range roomIDToEvents {
|
||||
urd, ok := c.roomToData[roomID]
|
||||
if !ok {
|
||||
urd = UserRoomData{}
|
||||
urd = NewUserRoomData()
|
||||
}
|
||||
urd.Timeline = events
|
||||
urd.PrevBatch = roomIDToPrevBatch[roomID]
|
||||
if len(events) > 0 {
|
||||
eventID := gjson.ParseBytes(events[0]).Get("event_id").Str
|
||||
urd.SetPrevBatch(eventID, roomIDToPrevBatch[roomID])
|
||||
}
|
||||
|
||||
result[roomID] = urd
|
||||
c.roomToData[roomID] = urd
|
||||
@ -242,7 +283,7 @@ func (c *UserCache) LoadRoomData(roomID string) UserRoomData {
|
||||
defer c.roomToDataMu.RUnlock()
|
||||
data, ok := c.roomToData[roomID]
|
||||
if !ok {
|
||||
return UserRoomData{}
|
||||
return NewUserRoomData()
|
||||
}
|
||||
return data
|
||||
}
|
||||
@ -460,9 +501,9 @@ func (c *UserCache) OnAccountData(datas []state.AccountData) {
|
||||
}
|
||||
// remaining stuff in dmRoomSet are new rooms the cache is unaware of
|
||||
for dmRoomID := range dmRoomSet {
|
||||
c.roomToData[dmRoomID] = UserRoomData{
|
||||
IsDM: true,
|
||||
}
|
||||
u := NewUserRoomData()
|
||||
u.IsDM = true
|
||||
c.roomToData[dmRoomID] = u
|
||||
}
|
||||
c.roomToDataMu.Unlock()
|
||||
}
|
||||
|
@ -297,7 +297,7 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, listIndex int, pr
|
||||
List: listIndex,
|
||||
Operation: sync3.OpSync,
|
||||
Range: r[:],
|
||||
Rooms: s.getInitialRoomData(listIndex, int(nextReqList.TimelineLimit), roomIDs...),
|
||||
Rooms: s.getInitialRoomData(ctx, listIndex, int(nextReqList.TimelineLimit), roomIDs...),
|
||||
})
|
||||
}
|
||||
|
||||
@ -325,7 +325,7 @@ func (s *ConnState) updateRoomSubscriptions(ctx context.Context, timelineLimit i
|
||||
if sub.TimelineLimit > 0 {
|
||||
timelineLimit = int(sub.TimelineLimit)
|
||||
}
|
||||
rooms := s.getInitialRoomData(-1, timelineLimit, roomID)
|
||||
rooms := s.getInitialRoomData(ctx, -1, timelineLimit, roomID)
|
||||
result[roomID] = rooms[0]
|
||||
}
|
||||
for _, roomID := range unsubs {
|
||||
@ -349,16 +349,19 @@ func (s *ConnState) getDeltaRoomData(roomID string, event json.RawMessage) *sync
|
||||
return room
|
||||
}
|
||||
|
||||
func (s *ConnState) getInitialRoomData(listIndex int, timelineLimit int, roomIDs ...string) []sync3.Room {
|
||||
func (s *ConnState) getInitialRoomData(ctx context.Context, listIndex int, timelineLimit int, roomIDs ...string) []sync3.Room {
|
||||
rooms := make([]sync3.Room, len(roomIDs))
|
||||
// We want to grab the user room data and the room metadata for each room ID.
|
||||
roomIDToUserRoomData := s.userCache.LazyLoadTimelines(s.loadPosition, roomIDs, timelineLimit)
|
||||
roomMetadatas := s.globalCache.LoadRooms(roomIDs...)
|
||||
// FIXME: required_state needs to be honoured!
|
||||
roomIDToState := s.globalCache.LoadRoomState(roomIDs, s.loadPosition, s.muxedReq.GetRequiredState(listIndex, roomIDs[0]))
|
||||
roomIDToState := s.globalCache.LoadRoomState(ctx, roomIDs, s.loadPosition, s.muxedReq.GetRequiredState(listIndex, roomIDs[0]))
|
||||
|
||||
for i, roomID := range roomIDs {
|
||||
userRoomData := roomIDToUserRoomData[roomID]
|
||||
userRoomData, ok := roomIDToUserRoomData[roomID]
|
||||
if !ok {
|
||||
userRoomData = caches.NewUserRoomData()
|
||||
}
|
||||
metadata := roomMetadatas[roomID]
|
||||
var inviteState []json.RawMessage
|
||||
// handle invites specially as we do not want to leak additional data beyond the invite_state and if
|
||||
@ -380,6 +383,7 @@ func (s *ConnState) getInitialRoomData(listIndex int, timelineLimit int, roomIDs
|
||||
if !userRoomData.IsInvite {
|
||||
requiredState = roomIDToState[roomID]
|
||||
}
|
||||
prevBatch, _ := userRoomData.PrevBatch()
|
||||
rooms[i] = sync3.Room{
|
||||
RoomID: roomID,
|
||||
Name: internal.CalculateRoomName(metadata, 5), // TODO: customisable?
|
||||
@ -390,7 +394,7 @@ func (s *ConnState) getInitialRoomData(listIndex int, timelineLimit int, roomIDs
|
||||
InviteState: inviteState,
|
||||
Initial: true,
|
||||
IsDM: userRoomData.IsDM,
|
||||
PrevBatch: userRoomData.PrevBatch,
|
||||
PrevBatch: prevBatch,
|
||||
}
|
||||
}
|
||||
return rooms
|
||||
|
@ -82,14 +82,14 @@ func (s *connStateLive) liveUpdate(
|
||||
return responseOperations
|
||||
case update := <-s.updates:
|
||||
trace.Logf(ctx, "liveUpdate", "process live update")
|
||||
responseOperations = s.processLiveUpdate(update, responseOperations, response)
|
||||
responseOperations = s.processLiveUpdate(ctx, update, responseOperations, response)
|
||||
updateWillReturnResponse := len(responseOperations) > 0 || len(response.RoomSubscriptions) > 0
|
||||
// pass event to extensions AFTER processing
|
||||
s.extensionsHandler.HandleLiveUpdate(update, ex, &response.Extensions, updateWillReturnResponse, isInitial)
|
||||
// if there's more updates and we don't have lots stacked up already, go ahead and process another
|
||||
for len(s.updates) > 0 && len(responseOperations) < 50 {
|
||||
update = <-s.updates
|
||||
responseOperations = s.processLiveUpdate(update, responseOperations, response)
|
||||
responseOperations = s.processLiveUpdate(ctx, update, responseOperations, response)
|
||||
s.extensionsHandler.HandleLiveUpdate(update, ex, &response.Extensions, updateWillReturnResponse, isInitial)
|
||||
}
|
||||
}
|
||||
@ -99,7 +99,7 @@ func (s *connStateLive) liveUpdate(
|
||||
return responseOperations
|
||||
}
|
||||
|
||||
func (s *connStateLive) processLiveUpdate(up caches.Update, responseOperations []sync3.ResponseOp, response *sync3.Response) []sync3.ResponseOp {
|
||||
func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, responseOperations []sync3.ResponseOp, response *sync3.Response) []sync3.ResponseOp {
|
||||
roomUpdate, ok := up.(caches.RoomUpdate)
|
||||
if ok {
|
||||
// always update our view of the world
|
||||
@ -121,14 +121,14 @@ func (s *connStateLive) processLiveUpdate(up caches.Update, responseOperations [
|
||||
switch update := up.(type) {
|
||||
case *caches.RoomEventUpdate:
|
||||
logger.Trace().Str("user", s.userID).Str("type", update.EventData.EventType).Msg("received event update")
|
||||
subs, ops := s.processIncomingEvent(update)
|
||||
subs, ops := s.processIncomingEvent(ctx, update)
|
||||
responseOperations = append(responseOperations, ops...)
|
||||
for _, sub := range subs {
|
||||
response.RoomSubscriptions[sub.RoomID] = sub
|
||||
}
|
||||
case *caches.UnreadCountUpdate:
|
||||
logger.Trace().Str("user", s.userID).Str("room", update.RoomID()).Msg("received unread count update")
|
||||
subs, ops := s.processUnreadCountUpdate(update)
|
||||
subs, ops := s.processUnreadCountUpdate(ctx, update)
|
||||
responseOperations = append(responseOperations, ops...)
|
||||
for _, sub := range subs {
|
||||
response.RoomSubscriptions[sub.RoomID] = sub
|
||||
@ -156,7 +156,7 @@ func (s *connStateLive) processLiveUpdate(up caches.Update, responseOperations [
|
||||
RoomUpdate: update.RoomUpdate,
|
||||
EventData: update.InviteData.InviteEvent,
|
||||
}
|
||||
subs, ops := s.processIncomingEvent(roomUpdate)
|
||||
subs, ops := s.processIncomingEvent(ctx, roomUpdate)
|
||||
responseOperations = append(responseOperations, ops...)
|
||||
for _, sub := range subs {
|
||||
response.RoomSubscriptions[sub.RoomID] = sub
|
||||
@ -167,7 +167,7 @@ func (s *connStateLive) processLiveUpdate(up caches.Update, responseOperations [
|
||||
return responseOperations
|
||||
}
|
||||
|
||||
func (s *connStateLive) processUnreadCountUpdate(up *caches.UnreadCountUpdate) ([]sync3.Room, []sync3.ResponseOp) {
|
||||
func (s *connStateLive) processUnreadCountUpdate(ctx context.Context, up *caches.UnreadCountUpdate) ([]sync3.Room, []sync3.ResponseOp) {
|
||||
if !up.HasCountDecreased {
|
||||
// if the count increases then we'll notify the user for the event which increases the count, hence
|
||||
// do nothing. We only care to notify the user when the counts decrease.
|
||||
@ -181,14 +181,14 @@ func (s *connStateLive) processUnreadCountUpdate(up *caches.UnreadCountUpdate) (
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
roomSubs, ops := s.resort(index, &s.muxedReq.Lists[index], list, up.RoomID(), fromIndex, nil, false, false)
|
||||
roomSubs, ops := s.resort(ctx, index, &s.muxedReq.Lists[index], list, up.RoomID(), fromIndex, nil, false, false)
|
||||
rooms = append(rooms, roomSubs...)
|
||||
responseOperations = append(responseOperations, ops...)
|
||||
})
|
||||
return rooms, responseOperations
|
||||
}
|
||||
|
||||
func (s *connStateLive) processIncomingEvent(update *caches.RoomEventUpdate) ([]sync3.Room, []sync3.ResponseOp) {
|
||||
func (s *connStateLive) processIncomingEvent(ctx context.Context, update *caches.RoomEventUpdate) ([]sync3.Room, []sync3.ResponseOp) {
|
||||
var responseOperations []sync3.ResponseOp
|
||||
var rooms []sync3.Room
|
||||
|
||||
@ -219,7 +219,7 @@ func (s *connStateLive) processIncomingEvent(update *caches.RoomEventUpdate) ([]
|
||||
logger.Info().Str("room", update.RoomID()).Msg("room added")
|
||||
newlyAdded = true
|
||||
}
|
||||
roomSubs, ops := s.resort(index, &s.muxedReq.Lists[index], list, update.RoomID(), fromIndex, update.EventData.Event, newlyAdded, update.EventData.ForceInitial)
|
||||
roomSubs, ops := s.resort(ctx, index, &s.muxedReq.Lists[index], list, update.RoomID(), fromIndex, update.EventData.Event, newlyAdded, update.EventData.ForceInitial)
|
||||
rooms = append(rooms, roomSubs...)
|
||||
responseOperations = append(responseOperations, ops...)
|
||||
})
|
||||
@ -228,6 +228,7 @@ func (s *connStateLive) processIncomingEvent(update *caches.RoomEventUpdate) ([]
|
||||
|
||||
// Resort should be called after a specific room has been modified in `sortedJoinedRooms`.
|
||||
func (s *connStateLive) resort(
|
||||
ctx context.Context,
|
||||
listIndex int, reqList *sync3.RequestList, roomList *sync3.FilteredSortableRooms, roomID string,
|
||||
fromIndex int, newEvent json.RawMessage, newlyAdded, forceInitial bool,
|
||||
) ([]sync3.Room, []sync3.ResponseOp) {
|
||||
@ -297,7 +298,7 @@ func (s *connStateLive) resort(
|
||||
}
|
||||
}
|
||||
|
||||
return subs, s.moveRoom(reqList, listIndex, roomID, newEvent, fromIndex, toIndex, reqList.Ranges, isSubscribedToRoom, newlyAdded, forceInitial)
|
||||
return subs, s.moveRoom(ctx, reqList, listIndex, roomID, newEvent, fromIndex, toIndex, reqList.Ranges, isSubscribedToRoom, newlyAdded, forceInitial)
|
||||
}
|
||||
|
||||
// Move a room from an absolute index position to another absolute position.
|
||||
@ -305,6 +306,7 @@ func (s *connStateLive) resort(
|
||||
// 3 bumps to top -> 3,1,2,4,5 -> DELETE index=2, INSERT val=3 index=0
|
||||
// 7 bumps to top -> 7,1,2,3,4 -> DELETE index=4, INSERT val=7 index=0
|
||||
func (s *connStateLive) moveRoom(
|
||||
ctx context.Context,
|
||||
reqList *sync3.RequestList, listIndex int, roomID string, event json.RawMessage, fromIndex, toIndex int,
|
||||
ranges sync3.SliceRanges, onlySendRoomID, newlyAdded, forceInitial bool,
|
||||
) []sync3.ResponseOp {
|
||||
@ -314,7 +316,7 @@ func (s *connStateLive) moveRoom(
|
||||
RoomID: roomID,
|
||||
}
|
||||
if newlyAdded || forceInitial {
|
||||
rooms := s.getInitialRoomData(listIndex, int(reqList.TimelineLimit), roomID)
|
||||
rooms := s.getInitialRoomData(ctx, listIndex, int(reqList.TimelineLimit), roomID)
|
||||
room = &rooms[0]
|
||||
} else if !onlySendRoomID {
|
||||
room = s.getDeltaRoomData(roomID, event)
|
||||
@ -346,7 +348,7 @@ func (s *connStateLive) moveRoom(
|
||||
RoomID: roomID,
|
||||
}
|
||||
if !onlySendRoomID {
|
||||
rooms := s.getInitialRoomData(listIndex, int(reqList.TimelineLimit), roomID)
|
||||
rooms := s.getInitialRoomData(ctx, listIndex, int(reqList.TimelineLimit), roomID)
|
||||
room = &rooms[0]
|
||||
}
|
||||
|
||||
|
@ -49,11 +49,9 @@ func newRoomMetadata(roomID string, lastMsgTimestamp gomatrixserverlib.Timestamp
|
||||
func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
|
||||
result := make(map[string]caches.UserRoomData)
|
||||
for _, roomID := range roomIDs {
|
||||
result[roomID] = caches.UserRoomData{
|
||||
Timeline: []json.RawMessage{
|
||||
[]byte(`{}`),
|
||||
},
|
||||
}
|
||||
u := caches.NewUserRoomData()
|
||||
u.Timeline = []json.RawMessage{[]byte(`{}`)}
|
||||
result[roomID] = u
|
||||
}
|
||||
return result
|
||||
}
|
||||
@ -101,9 +99,9 @@ func TestConnStateInitial(t *testing.T) {
|
||||
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
|
||||
result := make(map[string]caches.UserRoomData)
|
||||
for _, roomID := range roomIDs {
|
||||
result[roomID] = caches.UserRoomData{
|
||||
Timeline: []json.RawMessage{timeline[roomID]},
|
||||
}
|
||||
u := caches.NewUserRoomData()
|
||||
u.Timeline = []json.RawMessage{timeline[roomID]}
|
||||
result[roomID] = u
|
||||
}
|
||||
return result
|
||||
}
|
||||
@ -546,11 +544,9 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
|
||||
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
|
||||
result := make(map[string]caches.UserRoomData)
|
||||
for _, roomID := range roomIDs {
|
||||
result[roomID] = caches.UserRoomData{
|
||||
Timeline: []json.RawMessage{
|
||||
timeline[roomID],
|
||||
},
|
||||
}
|
||||
u := caches.NewUserRoomData()
|
||||
u.Timeline = []json.RawMessage{timeline[roomID]}
|
||||
result[roomID] = u
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user