Additional context plumbing

This commit is contained in:
David Robertson 2023-04-05 15:36:05 +01:00
parent c0557f2011
commit d27dc37641
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
9 changed files with 27 additions and 28 deletions

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/getsentry/sentry-go"
"os"
"sort"
"sync"
@ -74,14 +73,14 @@ func NewGlobalCache(store *state.Storage) *GlobalCache {
}
}
func (c *GlobalCache) OnRegistered(_ int64) error {
func (c *GlobalCache) OnRegistered(_ context.Context, _ int64) error {
return nil
}
// Load the current room metadata for the given room IDs. Races unless you call this in a dispatcher loop.
// Always returns copies of the room metadata so ownership can be passed to other threads.
// Keeps the ordering of the room IDs given.
func (c *GlobalCache) LoadRooms(roomIDs ...string) map[string]*internal.RoomMetadata {
func (c *GlobalCache) LoadRooms(ctx context.Context, roomIDs ...string) map[string]*internal.RoomMetadata {
c.roomIDToMetadataMu.RLock()
defer c.roomIDToMetadataMu.RUnlock()
result := make(map[string]*internal.RoomMetadata, len(roomIDs))
@ -91,7 +90,7 @@ func (c *GlobalCache) LoadRooms(roomIDs ...string) map[string]*internal.RoomMeta
if sr == nil {
const errMsg = "GlobalCache.LoadRoom: no metadata for this room"
logger.Error().Str("room", roomID).Msg(errMsg)
sentry.CaptureException(fmt.Errorf(errMsg))
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(fmt.Errorf(errMsg))
continue
}
srCopy := *sr
@ -107,7 +106,7 @@ func (c *GlobalCache) LoadRooms(roomIDs ...string) map[string]*internal.RoomMeta
// Load all current joined room metadata for the user given. Returns the absolute database position along
// with the results. TODO: remove with LoadRoomState?
func (c *GlobalCache) LoadJoinedRooms(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, err error) {
func (c *GlobalCache) LoadJoinedRooms(ctx context.Context, userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, err error) {
if c.LoadJoinedRoomsOverride != nil {
return c.LoadJoinedRoomsOverride(userID)
}
@ -120,7 +119,7 @@ func (c *GlobalCache) LoadJoinedRooms(userID string) (pos int64, joinedRooms map
return 0, nil, err
}
// TODO: no guarantee that this state is the same as latest unless called in a dispatcher loop
rooms := c.LoadRooms(joinedRoomIDs...)
rooms := c.LoadRooms(ctx, joinedRoomIDs...)
return initialLoadPosition, rooms, nil
}

View File

@ -213,10 +213,10 @@ func (c *UserCache) Unsubscribe(id int) {
delete(c.listeners, id)
}
func (c *UserCache) OnRegistered(_ int64) error {
func (c *UserCache) OnRegistered(ctx context.Context, _ int64) error {
// select all spaces the user is a part of to seed the cache correctly. This has to be done in
// the OnRegistered callback which has locking guarantees. This is why...
latestPos, joinedRooms, err := c.globalCache.LoadJoinedRooms(c.UserID)
latestPos, joinedRooms, err := c.globalCache.LoadJoinedRooms(ctx, c.UserID)
if err != nil {
return fmt.Errorf("failed to load joined rooms: %s", err)
}
@ -393,7 +393,7 @@ func (c *roomUpdateCache) UserRoomMetadata() *UserRoomData {
func (c *UserCache) newRoomUpdate(ctx context.Context, roomID string) RoomUpdate {
u := c.LoadRoomData(roomID)
var r *internal.RoomMetadata
globalRooms := c.globalCache.LoadRooms(roomID)
globalRooms := c.globalCache.LoadRooms(ctx, roomID)
if globalRooms == nil || globalRooms[roomID] == nil {
// this can happen when we join a room we didn't know about because we process unread counts
// before the timeline events. Warn and send a stub

View File

@ -23,7 +23,7 @@ type Receiver interface {
OnNewEvent(ctx context.Context, event *caches.EventData)
OnReceipt(ctx context.Context, receipt internal.Receipt)
OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage)
OnRegistered(latestPos int64) error
OnRegistered(ctx context.Context, latestPos int64) error
}
// Dispatches live events to caches
@ -61,14 +61,14 @@ func (d *Dispatcher) Unregister(userID string) {
delete(d.userToReceiver, userID)
}
func (d *Dispatcher) Register(userID string, r Receiver) error {
func (d *Dispatcher) Register(ctx context.Context, userID string, r Receiver) error {
d.userToReceiverMu.Lock()
defer d.userToReceiverMu.Unlock()
if _, ok := d.userToReceiver[userID]; ok {
logger.Warn().Str("user", userID).Msg("Dispatcher.Register: receiver already registered")
}
d.userToReceiver[userID] = r
return r.OnRegistered(d.latestPos)
return r.OnRegistered(ctx, d.latestPos)
}
func (d *Dispatcher) newEventData(event json.RawMessage, roomID string, latestPos int64) *caches.EventData {

View File

@ -72,7 +72,7 @@ func (r *TypingRequest) ProcessInitial(ctx context.Context, res *Response, extCt
for roomID := range extCtx.RoomIDToTimeline {
roomIDs = append(roomIDs, roomID)
}
roomToGlobalMetadata := extCtx.GlobalCache.LoadRooms(roomIDs...)
roomToGlobalMetadata := extCtx.GlobalCache.LoadRooms(ctx, roomIDs...)
for roomID := range extCtx.RoomIDToTimeline {
meta := roomToGlobalMetadata[roomID]
if meta == nil || meta.TypingEvent == nil {

View File

@ -83,8 +83,8 @@ func NewConnState(
// N events arrive and get buffered.
// - load() bases its current state based on the latest position, which includes processing of these N events.
// - post load() we read N events, processing them a 2nd time.
func (s *ConnState) load() error {
initialLoadPosition, joinedRooms, err := s.globalCache.LoadJoinedRooms(s.userID)
func (s *ConnState) load(ctx context.Context) error {
initialLoadPosition, joinedRooms, err := s.globalCache.LoadJoinedRooms(ctx, s.userID)
if err != nil {
return err
}
@ -120,7 +120,7 @@ func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req
if s.loadPosition == -1 {
// load() needs no ctx so drop it
_, region := internal.StartSpan(ctx, "load")
s.load()
s.load(ctx)
region.End()
}
return s.onIncomingRequest(ctx, req, isInitial)
@ -252,7 +252,7 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBui
roomList, _ = s.lists.AssignList(ctx, listKey, nextReqList.Filters, nextReqList.Sort, sync3.Overwrite)
}
// resort as either we changed the sort order or we added/removed a bunch of rooms
if err := roomList.Sort(ctx, nextReqList.Sort); err != nil {
if err := roomList.Sort(nextReqList.Sort); err != nil {
logger.Err(err).Str("key", listKey).Msg("cannot sort list")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
@ -429,7 +429,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
rooms := make(map[string]sync3.Room, len(roomIDs))
// We want to grab the user room data and the room metadata for each room ID.
roomIDToUserRoomData := s.userCache.LazyLoadTimelines(ctx, s.loadPosition, roomIDs, int(roomSub.TimelineLimit))
roomMetadatas := s.globalCache.LoadRooms(roomIDs...)
roomMetadatas := s.globalCache.LoadRooms(ctx, roomIDs...)
// prepare lazy loading data structures, txn IDs
roomToUsersInTimeline := make(map[string][]string, len(roomIDToUserRoomData))
roomToTimeline := make(map[string][]json.RawMessage)

View File

@ -100,7 +100,7 @@ func (h *SyncLiveHandler) Startup(storeSnapshot *state.StartupSnapshot) error {
if err := h.Dispatcher.Startup(storeSnapshot.AllJoinedMembers); err != nil {
return fmt.Errorf("failed to load sync3.Dispatcher: %s", err)
}
h.Dispatcher.Register(sync3.DispatcherAllUsers, h.GlobalCache)
h.Dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, h.GlobalCache)
if err := h.GlobalCache.Startup(storeSnapshot.GlobalMetadata); err != nil {
return fmt.Errorf("failed to populate global cache: %s", err)
}
@ -433,7 +433,7 @@ func (h *SyncLiveHandler) userCache(userID string) (*caches.UserCache, error) {
actualUC, loaded := h.userCaches.LoadOrStore(userID, uc)
uc = actualUC.(*caches.UserCache)
if !loaded { // we actually inserted the cache, so register with the dispatcher.
if err = h.Dispatcher.Register(userID, uc); err != nil {
if err = h.Dispatcher.Register(context.Background(), userID, uc); err != nil {
h.Dispatcher.Unregister(userID)
h.userCaches.Delete(userID)
return nil, fmt.Errorf("failed to register user cache with dispatcher: %s", err)
@ -630,7 +630,7 @@ func (h *SyncLiveHandler) OnReceipt(p *pubsub.V2Receipt) {
func (h *SyncLiveHandler) OnTyping(p *pubsub.V2Typing) {
ctx, task := internal.StartTask(context.Background(), "OnTyping")
defer task.End()
rooms := h.GlobalCache.LoadRooms(p.RoomID)
rooms := h.GlobalCache.LoadRooms(ctx, p.RoomID)
if rooms[p.RoomID] != nil {
if reflect.DeepEqual(p.EphemeralEvent, rooms[p.RoomID].TypingEvent) {
return // it's a duplicate, which happens when 2+ users are in the same room

View File

@ -192,7 +192,7 @@ func (s *InternalRequestLists) AssignList(ctx context.Context, listKey string, f
roomList := NewFilteredSortableRooms(s, roomIDs, filters)
if sort != nil {
err := roomList.Sort(ctx, sort)
err := roomList.Sort(sort)
if err != nil {
logger.Err(err).Strs("sort_by", sort).Msg("failed to sort")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)

View File

@ -83,7 +83,7 @@ func CalculateListOps(ctx context.Context, reqList *RequestList, list List, room
listFromIndex := listFromTo[0]
listToIndex := listFromTo[1]
wasUpdatedRoomInserted := listToIndex == toIndex
toRoomID := list.Get(ctx, listToIndex)
toRoomID := list.Get(listToIndex)
if toRoomID == roomID && listFromIndex == listToIndex && listOp == ListOpChange && wasInsideRange && len(listFromTos) == 1 {
// DELETE/INSERT have the same index, we're INSERTing the room that was updated, it was a Change not Add/Delete, it
// was previously inside the window AND there's just 1 move operation = it's moving to and from the same index so

View File

@ -1,7 +1,6 @@
package sync3
import (
"context"
"fmt"
"sort"
@ -52,8 +51,9 @@ func (s *SortableRooms) Add(roomID string) bool {
return true
}
func (s *SortableRooms) Get(ctx context.Context, index int) string {
internal.AssertWithContext(ctx, fmt.Sprintf("index is within len(rooms) %v < %v", index, len(s.roomIDs)), index < len(s.roomIDs))
func (s *SortableRooms) Get(index int) string {
// TODO: find a way to plumb a context through
internal.Assert(fmt.Sprintf("index is within len(rooms) %v < %v", index, len(s.roomIDs)), index < len(s.roomIDs))
return s.roomIDs[index]
}
@ -85,8 +85,8 @@ func (s *SortableRooms) Subslice(i, j int64) Subslicer {
}
}
func (s *SortableRooms) Sort(ctx context.Context, sortBy []string) error {
internal.AssertWithContext(ctx, "sortBy is not empty", len(sortBy) != 0)
func (s *SortableRooms) Sort(sortBy []string) error {
internal.Assert("sortBy is not empty", len(sortBy) != 0)
comparators := []func(i, j int) int{}
for _, sort := range sortBy {
switch sort {