diff --git a/sync3/caches/global.go b/sync3/caches/global.go index 75fdf72..7393c52 100644 --- a/sync3/caches/global.go +++ b/sync3/caches/global.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/getsentry/sentry-go" "os" "sort" "sync" @@ -74,14 +73,14 @@ func NewGlobalCache(store *state.Storage) *GlobalCache { } } -func (c *GlobalCache) OnRegistered(_ int64) error { +func (c *GlobalCache) OnRegistered(_ context.Context, _ int64) error { return nil } // Load the current room metadata for the given room IDs. Races unless you call this in a dispatcher loop. // Always returns copies of the room metadata so ownership can be passed to other threads. // Keeps the ordering of the room IDs given. -func (c *GlobalCache) LoadRooms(roomIDs ...string) map[string]*internal.RoomMetadata { +func (c *GlobalCache) LoadRooms(ctx context.Context, roomIDs ...string) map[string]*internal.RoomMetadata { c.roomIDToMetadataMu.RLock() defer c.roomIDToMetadataMu.RUnlock() result := make(map[string]*internal.RoomMetadata, len(roomIDs)) @@ -91,7 +90,7 @@ func (c *GlobalCache) LoadRooms(roomIDs ...string) map[string]*internal.RoomMeta if sr == nil { const errMsg = "GlobalCache.LoadRoom: no metadata for this room" logger.Error().Str("room", roomID).Msg(errMsg) - sentry.CaptureException(fmt.Errorf(errMsg)) + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(fmt.Errorf(errMsg)) continue } srCopy := *sr @@ -107,7 +106,7 @@ func (c *GlobalCache) LoadRooms(roomIDs ...string) map[string]*internal.RoomMeta // Load all current joined room metadata for the user given. Returns the absolute database position along // with the results. TODO: remove with LoadRoomState? -func (c *GlobalCache) LoadJoinedRooms(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, err error) { +func (c *GlobalCache) LoadJoinedRooms(ctx context.Context, userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, err error) { if c.LoadJoinedRoomsOverride != nil { return c.LoadJoinedRoomsOverride(userID) } @@ -120,7 +119,7 @@ func (c *GlobalCache) LoadJoinedRooms(userID string) (pos int64, joinedRooms map return 0, nil, err } // TODO: no guarantee that this state is the same as latest unless called in a dispatcher loop - rooms := c.LoadRooms(joinedRoomIDs...) + rooms := c.LoadRooms(ctx, joinedRoomIDs...) return initialLoadPosition, rooms, nil } diff --git a/sync3/caches/user.go b/sync3/caches/user.go index e408820..24a8915 100644 --- a/sync3/caches/user.go +++ b/sync3/caches/user.go @@ -213,10 +213,10 @@ func (c *UserCache) Unsubscribe(id int) { delete(c.listeners, id) } -func (c *UserCache) OnRegistered(_ int64) error { +func (c *UserCache) OnRegistered(ctx context.Context, _ int64) error { // select all spaces the user is a part of to seed the cache correctly. This has to be done in // the OnRegistered callback which has locking guarantees. This is why... - latestPos, joinedRooms, err := c.globalCache.LoadJoinedRooms(c.UserID) + latestPos, joinedRooms, err := c.globalCache.LoadJoinedRooms(ctx, c.UserID) if err != nil { return fmt.Errorf("failed to load joined rooms: %s", err) } @@ -393,7 +393,7 @@ func (c *roomUpdateCache) UserRoomMetadata() *UserRoomData { func (c *UserCache) newRoomUpdate(ctx context.Context, roomID string) RoomUpdate { u := c.LoadRoomData(roomID) var r *internal.RoomMetadata - globalRooms := c.globalCache.LoadRooms(roomID) + globalRooms := c.globalCache.LoadRooms(ctx, roomID) if globalRooms == nil || globalRooms[roomID] == nil { // this can happen when we join a room we didn't know about because we process unread counts // before the timeline events. Warn and send a stub diff --git a/sync3/dispatcher.go b/sync3/dispatcher.go index a1e25f6..b06af11 100644 --- a/sync3/dispatcher.go +++ b/sync3/dispatcher.go @@ -23,7 +23,7 @@ type Receiver interface { OnNewEvent(ctx context.Context, event *caches.EventData) OnReceipt(ctx context.Context, receipt internal.Receipt) OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage) - OnRegistered(latestPos int64) error + OnRegistered(ctx context.Context, latestPos int64) error } // Dispatches live events to caches @@ -61,14 +61,14 @@ func (d *Dispatcher) Unregister(userID string) { delete(d.userToReceiver, userID) } -func (d *Dispatcher) Register(userID string, r Receiver) error { +func (d *Dispatcher) Register(ctx context.Context, userID string, r Receiver) error { d.userToReceiverMu.Lock() defer d.userToReceiverMu.Unlock() if _, ok := d.userToReceiver[userID]; ok { logger.Warn().Str("user", userID).Msg("Dispatcher.Register: receiver already registered") } d.userToReceiver[userID] = r - return r.OnRegistered(d.latestPos) + return r.OnRegistered(ctx, d.latestPos) } func (d *Dispatcher) newEventData(event json.RawMessage, roomID string, latestPos int64) *caches.EventData { diff --git a/sync3/extensions/typing.go b/sync3/extensions/typing.go index d735264..286f277 100644 --- a/sync3/extensions/typing.go +++ b/sync3/extensions/typing.go @@ -72,7 +72,7 @@ func (r *TypingRequest) ProcessInitial(ctx context.Context, res *Response, extCt for roomID := range extCtx.RoomIDToTimeline { roomIDs = append(roomIDs, roomID) } - roomToGlobalMetadata := extCtx.GlobalCache.LoadRooms(roomIDs...) + roomToGlobalMetadata := extCtx.GlobalCache.LoadRooms(ctx, roomIDs...) for roomID := range extCtx.RoomIDToTimeline { meta := roomToGlobalMetadata[roomID] if meta == nil || meta.TypingEvent == nil { diff --git a/sync3/handler/connstate.go b/sync3/handler/connstate.go index 608299e..8fc9052 100644 --- a/sync3/handler/connstate.go +++ b/sync3/handler/connstate.go @@ -83,8 +83,8 @@ func NewConnState( // N events arrive and get buffered. // - load() bases its current state based on the latest position, which includes processing of these N events. // - post load() we read N events, processing them a 2nd time. -func (s *ConnState) load() error { - initialLoadPosition, joinedRooms, err := s.globalCache.LoadJoinedRooms(s.userID) +func (s *ConnState) load(ctx context.Context) error { + initialLoadPosition, joinedRooms, err := s.globalCache.LoadJoinedRooms(ctx, s.userID) if err != nil { return err } @@ -120,7 +120,7 @@ func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req if s.loadPosition == -1 { // load() needs no ctx so drop it _, region := internal.StartSpan(ctx, "load") - s.load() + s.load(ctx) region.End() } return s.onIncomingRequest(ctx, req, isInitial) @@ -252,7 +252,7 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBui roomList, _ = s.lists.AssignList(ctx, listKey, nextReqList.Filters, nextReqList.Sort, sync3.Overwrite) } // resort as either we changed the sort order or we added/removed a bunch of rooms - if err := roomList.Sort(ctx, nextReqList.Sort); err != nil { + if err := roomList.Sort(nextReqList.Sort); err != nil { logger.Err(err).Str("key", listKey).Msg("cannot sort list") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } @@ -429,7 +429,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu rooms := make(map[string]sync3.Room, len(roomIDs)) // We want to grab the user room data and the room metadata for each room ID. roomIDToUserRoomData := s.userCache.LazyLoadTimelines(ctx, s.loadPosition, roomIDs, int(roomSub.TimelineLimit)) - roomMetadatas := s.globalCache.LoadRooms(roomIDs...) + roomMetadatas := s.globalCache.LoadRooms(ctx, roomIDs...) // prepare lazy loading data structures, txn IDs roomToUsersInTimeline := make(map[string][]string, len(roomIDToUserRoomData)) roomToTimeline := make(map[string][]json.RawMessage) diff --git a/sync3/handler/handler.go b/sync3/handler/handler.go index 9fccad6..98c7ede 100644 --- a/sync3/handler/handler.go +++ b/sync3/handler/handler.go @@ -100,7 +100,7 @@ func (h *SyncLiveHandler) Startup(storeSnapshot *state.StartupSnapshot) error { if err := h.Dispatcher.Startup(storeSnapshot.AllJoinedMembers); err != nil { return fmt.Errorf("failed to load sync3.Dispatcher: %s", err) } - h.Dispatcher.Register(sync3.DispatcherAllUsers, h.GlobalCache) + h.Dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, h.GlobalCache) if err := h.GlobalCache.Startup(storeSnapshot.GlobalMetadata); err != nil { return fmt.Errorf("failed to populate global cache: %s", err) } @@ -433,7 +433,7 @@ func (h *SyncLiveHandler) userCache(userID string) (*caches.UserCache, error) { actualUC, loaded := h.userCaches.LoadOrStore(userID, uc) uc = actualUC.(*caches.UserCache) if !loaded { // we actually inserted the cache, so register with the dispatcher. - if err = h.Dispatcher.Register(userID, uc); err != nil { + if err = h.Dispatcher.Register(context.Background(), userID, uc); err != nil { h.Dispatcher.Unregister(userID) h.userCaches.Delete(userID) return nil, fmt.Errorf("failed to register user cache with dispatcher: %s", err) @@ -630,7 +630,7 @@ func (h *SyncLiveHandler) OnReceipt(p *pubsub.V2Receipt) { func (h *SyncLiveHandler) OnTyping(p *pubsub.V2Typing) { ctx, task := internal.StartTask(context.Background(), "OnTyping") defer task.End() - rooms := h.GlobalCache.LoadRooms(p.RoomID) + rooms := h.GlobalCache.LoadRooms(ctx, p.RoomID) if rooms[p.RoomID] != nil { if reflect.DeepEqual(p.EphemeralEvent, rooms[p.RoomID].TypingEvent) { return // it's a duplicate, which happens when 2+ users are in the same room diff --git a/sync3/lists.go b/sync3/lists.go index 84969ca..83928f0 100644 --- a/sync3/lists.go +++ b/sync3/lists.go @@ -192,7 +192,7 @@ func (s *InternalRequestLists) AssignList(ctx context.Context, listKey string, f roomList := NewFilteredSortableRooms(s, roomIDs, filters) if sort != nil { - err := roomList.Sort(ctx, sort) + err := roomList.Sort(sort) if err != nil { logger.Err(err).Strs("sort_by", sort).Msg("failed to sort") internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) diff --git a/sync3/ops.go b/sync3/ops.go index 3847b9f..9f05119 100644 --- a/sync3/ops.go +++ b/sync3/ops.go @@ -83,7 +83,7 @@ func CalculateListOps(ctx context.Context, reqList *RequestList, list List, room listFromIndex := listFromTo[0] listToIndex := listFromTo[1] wasUpdatedRoomInserted := listToIndex == toIndex - toRoomID := list.Get(ctx, listToIndex) + toRoomID := list.Get(listToIndex) if toRoomID == roomID && listFromIndex == listToIndex && listOp == ListOpChange && wasInsideRange && len(listFromTos) == 1 { // DELETE/INSERT have the same index, we're INSERTing the room that was updated, it was a Change not Add/Delete, it // was previously inside the window AND there's just 1 move operation = it's moving to and from the same index so diff --git a/sync3/sort.go b/sync3/sort.go index db14af1..4402b84 100644 --- a/sync3/sort.go +++ b/sync3/sort.go @@ -1,7 +1,6 @@ package sync3 import ( - "context" "fmt" "sort" @@ -52,8 +51,9 @@ func (s *SortableRooms) Add(roomID string) bool { return true } -func (s *SortableRooms) Get(ctx context.Context, index int) string { - internal.AssertWithContext(ctx, fmt.Sprintf("index is within len(rooms) %v < %v", index, len(s.roomIDs)), index < len(s.roomIDs)) +func (s *SortableRooms) Get(index int) string { + // TODO: find a way to plumb a context through + internal.Assert(fmt.Sprintf("index is within len(rooms) %v < %v", index, len(s.roomIDs)), index < len(s.roomIDs)) return s.roomIDs[index] } @@ -85,8 +85,8 @@ func (s *SortableRooms) Subslice(i, j int64) Subslicer { } } -func (s *SortableRooms) Sort(ctx context.Context, sortBy []string) error { - internal.AssertWithContext(ctx, "sortBy is not empty", len(sortBy) != 0) +func (s *SortableRooms) Sort(sortBy []string) error { + internal.Assert("sortBy is not empty", len(sortBy) != 0) comparators := []func(i, j int) int{} for _, sort := range sortBy { switch sort {