From 1f3f14f30c98cae687145a6cd0fabc097d571c9c Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 5 Apr 2023 14:11:05 +0100 Subject: [PATCH] Report errors to Sentry, plumbing ctxs if needed --- state/accumulator.go | 7 ++++--- sync2/handler2/handler.go | 15 +++++++++++++++ sync3/caches/global.go | 9 ++++++++- sync3/caches/user.go | 17 ++++++++++------- sync3/caches/user_test.go | 3 ++- sync3/extensions/account_data.go | 5 ++++- sync3/extensions/e2ee.go | 4 ++-- sync3/extensions/receipts.go | 7 ++++++- sync3/handler/connstate.go | 9 +++++---- sync3/handler/connstate_live.go | 4 ++-- sync3/handler/handler.go | 10 +++++++++- sync3/lists.go | 4 +++- sync3/lists_test.go | 3 ++- sync3/ops.go | 9 ++++++++- sync3/ops_test.go | 13 +++++++------ 15 files changed, 87 insertions(+), 32 deletions(-) diff --git a/state/accumulator.go b/state/accumulator.go index c2bf49a..db77577 100644 --- a/state/accumulator.go +++ b/state/accumulator.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/json" "fmt" + "github.com/getsentry/sentry-go" "github.com/jmoiron/sqlx" "github.com/lib/pq" @@ -176,9 +177,9 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, if len(eventIDToNID) == 0 { // we don't have a current snapshot for this room but yet no events are new, // no idea how this should be handled. - logger.Error().Str("room_id", roomID).Msg( - "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug.", - ) + const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug." + logger.Error().Str("room_id", roomID).Msg(errMsg) + sentry.CaptureException(fmt.Errorf(errMsg)) return nil } diff --git a/sync2/handler2/handler.go b/sync2/handler2/handler.go index 5f1cc5c..32010a8 100644 --- a/sync2/handler2/handler.go +++ b/sync2/handler2/handler.go @@ -2,6 +2,7 @@ package handler2 import ( "encoding/json" + "github.com/getsentry/sentry-go" "hash/fnv" "os" "sync" @@ -76,6 +77,7 @@ func (h *Handler) Listen() { err := h.v3Sub.Listen() if err != nil { logger.Err(err).Msg("Failed to listen for v3 messages") + sentry.CaptureException(err) } }() } @@ -96,6 +98,7 @@ func (h *Handler) StartV2Pollers() { devices, err := h.v2Store.AllDevices() if err != nil { logger.Err(err).Msg("StartV2Pollers: failed to query devices") + sentry.CaptureException(err) return } // how many concurrent pollers to make at startup. @@ -172,6 +175,7 @@ func (h *Handler) UpdateDeviceSince(deviceID, since string) { err := h.v2Store.UpdateDeviceSince(deviceID, since) if err != nil { logger.Err(err).Str("device", deviceID).Str("since", since).Msg("V2: failed to persist since token") + sentry.CaptureException(err) } } @@ -189,6 +193,7 @@ func (h *Handler) OnE2EEData(userID, deviceID string, otkCounts map[string]int, nextPos, err := h.Store.DeviceDataTable.Upsert(&partialDD) if err != nil { logger.Err(err).Str("user", userID).Msg("failed to upsert device data") + sentry.CaptureException(err) return } h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2DeviceData{ @@ -213,6 +218,7 @@ func (h *Handler) Accumulate(deviceID, roomID, prevBatch string, timeline []json err := h.Store.TransactionsTable.Insert(deviceID, eventIDToTxnID) if err != nil { logger.Err(err).Str("device", deviceID).Int("num_txns", len(eventIDToTxnID)).Msg("failed to persist txn IDs for user") + sentry.CaptureException(err) } } @@ -220,6 +226,7 @@ func (h *Handler) Accumulate(deviceID, roomID, prevBatch string, timeline []json numNew, latestNIDs, err := h.Store.Accumulate(roomID, prevBatch, timeline) if err != nil { logger.Err(err).Int("timeline", len(timeline)).Str("room", roomID).Msg("V2: failed to accumulate room") + sentry.CaptureException(err) return } if numNew == 0 { @@ -237,6 +244,7 @@ func (h *Handler) Initialise(roomID string, state []json.RawMessage) { added, snapID, err := h.Store.Initialise(roomID, state) if err != nil { logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room") + sentry.CaptureException(err) return } if !added { @@ -270,6 +278,7 @@ func (h *Handler) OnReceipt(userID, roomID, ephEventType string, ephEvent json.R newReceipts, err := h.Store.ReceiptTable.Insert(roomID, ephEvent) if err != nil { logger.Err(err).Str("room", roomID).Msg("failed to store receipts") + sentry.CaptureException(err) return } if len(newReceipts) == 0 { @@ -285,6 +294,7 @@ func (h *Handler) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMe _, err := h.Store.ToDeviceTable.InsertMessages(deviceID, msgs) if err != nil { logger.Err(err).Str("user", userID).Str("device", deviceID).Int("msgs", len(msgs)).Msg("V2: failed to store to-device messages") + sentry.CaptureException(err) } h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2DeviceMessages{ UserID: userID, @@ -319,6 +329,7 @@ func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, noti err := h.Store.UnreadTable.UpdateUnreadCounters(userID, roomID, highlightCount, notifCount) if err != nil { logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update unread counters") + sentry.CaptureException(err) } h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2UnreadCounts{ RoomID: roomID, @@ -332,6 +343,7 @@ func (h *Handler) OnAccountData(userID, roomID string, events []json.RawMessage) data, err := h.Store.InsertAccountData(userID, roomID, events) if err != nil { logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update account data") + sentry.CaptureException(err) return } var types []string @@ -349,6 +361,7 @@ func (h *Handler) OnInvite(userID, roomID string, inviteState []json.RawMessage) err := h.Store.InvitesTable.InsertInvite(userID, roomID, inviteState) if err != nil { logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to insert invite") + sentry.CaptureException(err) return } h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2InviteRoom{ @@ -362,6 +375,7 @@ func (h *Handler) OnLeftRoom(userID, roomID string) { err := h.Store.InvitesTable.RemoveInvite(userID, roomID) if err != nil { logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to retire invite") + sentry.CaptureException(err) } h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2LeaveRoom{ UserID: userID, @@ -377,6 +391,7 @@ func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) { dev, err := h.v2Store.Device(p.DeviceID) if err != nil { logger.Err(err).Str("user", p.UserID).Str("device", p.DeviceID).Msg("V3Sub: EnsurePolling unknown device") + sentry.CaptureException(err) return } // don't block us from consuming more pubsub messages just because someone wants to sync diff --git a/sync3/caches/global.go b/sync3/caches/global.go index 3f55fd7..75fdf72 100644 --- a/sync3/caches/global.go +++ b/sync3/caches/global.go @@ -3,6 +3,8 @@ package caches import ( "context" "encoding/json" + "fmt" + "github.com/getsentry/sentry-go" "os" "sort" "sync" @@ -87,7 +89,9 @@ func (c *GlobalCache) LoadRooms(roomIDs ...string) map[string]*internal.RoomMeta roomID := roomIDs[i] sr := c.roomIDToMetadata[roomID] if sr == nil { - logger.Error().Str("room", roomID).Msg("GlobalCache.LoadRoom: no metadata for this room") + const errMsg = "GlobalCache.LoadRoom: no metadata for this room" + logger.Error().Str("room", roomID).Msg(errMsg) + sentry.CaptureException(fmt.Errorf(errMsg)) continue } srCopy := *sr @@ -126,6 +130,7 @@ func (c *GlobalCache) LoadStateEvent(ctx context.Context, roomID string, loadPos }) if err != nil { logger.Err(err).Str("room", roomID).Int64("pos", loadPosition).Msg("failed to load room state") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return nil } events := roomIDToStateEvents[roomID] @@ -147,6 +152,7 @@ func (c *GlobalCache) LoadRoomState(ctx context.Context, roomIDs []string, loadP roomIDToStateEvents, err := c.store.RoomStateAfterEventPosition(ctx, roomIDs, loadPosition, requiredStateMap.QueryStateMap()) if err != nil { logger.Err(err).Strs("rooms", roomIDs).Int64("pos", loadPosition).Msg("failed to load room state") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return nil } for roomID, stateEvents := range roomIDToStateEvents { @@ -295,6 +301,7 @@ func (c *GlobalCache) OnNewEvent( err := c.store.InvitesTable.RemoveInvite(*ed.StateKey, ed.RoomID) if err != nil { logger.Err(err).Str("user", *ed.StateKey).Str("room", ed.RoomID).Msg("failed to remove accepted invite") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } } } diff --git a/sync3/caches/user.go b/sync3/caches/user.go index 3db6be0..c364638 100644 --- a/sync3/caches/user.go +++ b/sync3/caches/user.go @@ -97,7 +97,7 @@ type InviteData struct { IsDM bool } -func NewInviteData(userID, roomID string, inviteState []json.RawMessage) *InviteData { +func NewInviteData(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) *InviteData { // work out metadata for this invite. There's an origin_server_ts on the invite m.room.member event id := InviteData{ roomID: roomID, @@ -138,9 +138,9 @@ func NewInviteData(userID, roomID string, inviteState []json.RawMessage) *Invite } } if id.InviteEvent == nil { - logger.Error().Str("invitee", userID).Str("room", roomID).Int("num_invite_state", len(inviteState)).Msg( - "cannot make invite, missing invite event for user", - ) + const errMsg = "cannot make invite, missing invite event for user" + logger.Error().Str("invitee", userID).Str("room", roomID).Int("num_invite_state", len(inviteState)).Msg(errMsg) + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(fmt.Errorf(errMsg)) return nil } return &id @@ -270,7 +270,7 @@ func (c *UserCache) OnRegistered(_ int64) error { return nil } -func (c *UserCache) LazyLoadTimelines(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData { +func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData { if c.LazyRoomDataOverride != nil { return c.LazyRoomDataOverride(loadPos, roomIDs, maxTimelineEvents) } @@ -306,6 +306,7 @@ func (c *UserCache) LazyLoadTimelines(loadPos int64, roomIDs []string, maxTimeli prevBatch, err := c.store.EventsTable.SelectClosestPrevBatchByID(roomID, eventID) if err != nil { logger.Err(err).Str("room", roomID).Str("event_id", eventID).Msg("failed to get prev batch token for room") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } urd.SetPrevBatch(eventID, prevBatch) } @@ -335,6 +336,7 @@ func (c *UserCache) LazyLoadTimelines(loadPos int64, roomIDs []string, maxTimeli roomIDToEvents, roomIDToPrevBatch, err := c.store.LatestEventsInRooms(c.UserID, lazyRoomIDs, loadPos, maxTimelineEvents) if err != nil { logger.Err(err).Strs("rooms", lazyRoomIDs).Msg("failed to get LatestEventsInRooms") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return nil } c.roomToDataMu.Lock() @@ -428,7 +430,7 @@ func (c *UserCache) Invites() map[string]UserRoomData { // events are globally scoped, so if Alice sends a message, Bob might receive it first on his v2 loop // which would cause the transaction ID to be missing from the event. Instead, we always look for txn // IDs in the v2 poller, and then set them appropriately at request time. -func (c *UserCache) AnnotateWithTransactionIDs(deviceID string, roomIDToEvents map[string][]json.RawMessage) map[string][]json.RawMessage { +func (c *UserCache) AnnotateWithTransactionIDs(ctx context.Context, deviceID string, roomIDToEvents map[string][]json.RawMessage) map[string][]json.RawMessage { var eventIDs []string eventIDToEvent := make(map[string]struct { roomID string @@ -458,6 +460,7 @@ func (c *UserCache) AnnotateWithTransactionIDs(deviceID string, roomIDToEvents m newJSON, err := sjson.SetBytes(event, "unsigned.transaction_id", txnID) if err != nil { logger.Err(err).Str("user", c.UserID).Msg("AnnotateWithTransactionIDs: sjson failed") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } else { events[data.i] = newJSON roomIDToEvents[data.roomID] = events @@ -600,7 +603,7 @@ func (c *UserCache) OnNewEvent(ctx context.Context, eventData *EventData) { } func (c *UserCache) OnInvite(ctx context.Context, roomID string, inviteStateEvents []json.RawMessage) { - inviteData := NewInviteData(c.UserID, roomID, inviteStateEvents) + inviteData := NewInviteData(ctx, c.UserID, roomID, inviteStateEvents) if inviteData == nil { return // malformed invite } diff --git a/sync3/caches/user_test.go b/sync3/caches/user_test.go index ab8676b..7329709 100644 --- a/sync3/caches/user_test.go +++ b/sync3/caches/user_test.go @@ -3,6 +3,7 @@ package caches_test import ( "encoding/json" "fmt" + "golang.org/x/net/context" "reflect" "testing" @@ -82,7 +83,7 @@ func TestAnnotateWithTransactionIDs(t *testing.T) { data: tc.eventIDToTxnIDs, } uc := caches.NewUserCache(userID, nil, nil, fetcher) - got := uc.AnnotateWithTransactionIDs("DEVICE", convertIDToEventStub(tc.roomIDToEvents)) + got := uc.AnnotateWithTransactionIDs(context.Background(), "DEVICE", convertIDToEventStub(tc.roomIDToEvents)) want := convertIDTxnToEventStub(tc.wantRoomIDToEvents) if !reflect.DeepEqual(got, want) { t.Errorf("%s : got %v want %v", tc.name, js(got), js(want)) diff --git a/sync3/extensions/account_data.go b/sync3/extensions/account_data.go index ec04dd0..db63a4a 100644 --- a/sync3/extensions/account_data.go +++ b/sync3/extensions/account_data.go @@ -3,7 +3,7 @@ package extensions import ( "context" "encoding/json" - + "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/state" "github.com/matrix-org/sliding-sync/sync3/caches" ) @@ -57,6 +57,7 @@ func (r *AccountDataRequest) AppendLive(ctx context.Context, res *Response, extC roomAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID, update.RoomID()) if err != nil { logger.Err(err).Str("user", extCtx.UserID).Str("room", update.RoomID()).Msg("failed to fetch room account data") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } else { if len(roomAccountData) > 0 { // else we can end up with `null` not `[]` roomToMsgs[update.RoomID()] = accountEventsAsJSON(roomAccountData) @@ -96,6 +97,7 @@ func (r *AccountDataRequest) ProcessInitial(ctx context.Context, res *Response, roomsAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID, roomIDs...) if err != nil { logger.Err(err).Str("user", extCtx.UserID).Strs("rooms", roomIDs).Msg("failed to fetch room account data") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } else { extRes.Rooms = make(map[string][]json.RawMessage) for _, ad := range roomsAccountData { @@ -108,6 +110,7 @@ func (r *AccountDataRequest) ProcessInitial(ctx context.Context, res *Response, globalAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID) if err != nil { logger.Err(err).Str("user", extCtx.UserID).Msg("failed to fetch global account data") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } else { extRes.Global = accountEventsAsJSON(globalAccountData) } diff --git a/sync3/extensions/e2ee.go b/sync3/extensions/e2ee.go index e6b3205..700349a 100644 --- a/sync3/extensions/e2ee.go +++ b/sync3/extensions/e2ee.go @@ -9,7 +9,7 @@ import ( // Fetcher used by the E2EE extension type E2EEFetcher interface { - DeviceData(userID, deviceID string, isInitial bool) *internal.DeviceData + DeviceData(context context.Context, userID, deviceID string, isInitial bool) *internal.DeviceData } // Client created request params @@ -56,7 +56,7 @@ func (r *E2EERequest) AppendLive(ctx context.Context, res *Response, extCtx Cont func (r *E2EERequest) ProcessInitial(ctx context.Context, res *Response, extCtx Context) { // pull OTK counts and changed/left from device data - dd := extCtx.E2EEFetcher.DeviceData(extCtx.UserID, extCtx.DeviceID, extCtx.IsInitial) + dd := extCtx.E2EEFetcher.DeviceData(ctx, extCtx.UserID, extCtx.DeviceID, extCtx.IsInitial) if dd == nil { return // unknown device? } diff --git a/sync3/extensions/receipts.go b/sync3/extensions/receipts.go index dd0d80d..2727168 100644 --- a/sync3/extensions/receipts.go +++ b/sync3/extensions/receipts.go @@ -3,7 +3,6 @@ package extensions import ( "context" "encoding/json" - "github.com/matrix-org/sliding-sync/internal" "github.com/matrix-org/sliding-sync/state" "github.com/matrix-org/sliding-sync/sync3/caches" @@ -43,6 +42,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx edu, err := state.PackReceiptsIntoEDU([]internal.Receipt{update.Receipt}) if err != nil { logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into new edu") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } res.Receipts = &ReceiptsResponse{ @@ -55,6 +55,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx edu, err := state.PackReceiptsIntoEDU([]internal.Receipt{update.Receipt}) if err != nil { logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } res.Receipts.Rooms[update.RoomID()] = edu @@ -64,6 +65,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx pub, priv, err := state.UnpackReceiptsFromEDU(update.RoomID(), res.Receipts.Rooms[update.RoomID()]) if err != nil { logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } receipts := append(pub, priv...) @@ -72,6 +74,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx edu, err := state.PackReceiptsIntoEDU(receipts) if err != nil { logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } res.Receipts.Rooms[update.RoomID()] = edu @@ -89,12 +92,14 @@ func (r *ReceiptsRequest) ProcessInitial(ctx context.Context, res *Response, ext receipts, err := extCtx.Store.ReceiptTable.SelectReceiptsForEvents(roomID, timeline) if err != nil { logger.Err(err).Str("user", extCtx.UserID).Str("room", roomID).Msg("failed to SelectReceiptsForEvents") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) continue } // always include your own receipts ownReceipts, err := extCtx.Store.ReceiptTable.SelectReceiptsForUser(roomID, extCtx.UserID) if err != nil { logger.Err(err).Str("user", extCtx.UserID).Str("room", roomID).Msg("failed to SelectReceiptsForUser") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) continue } if len(receipts) == 0 && len(ownReceipts) == 0 { diff --git a/sync3/handler/connstate.go b/sync3/handler/connstate.go index a0e787e..81bd0fa 100644 --- a/sync3/handler/connstate.go +++ b/sync3/handler/connstate.go @@ -195,7 +195,7 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *sync3.Request, i func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBuilder, listKey string, prevReqList, nextReqList *sync3.RequestList) sync3.ResponseList { ctx, span := internal.StartSpan(ctx, "onIncomingListRequest") defer span.End() - roomList, overwritten := s.lists.AssignList(listKey, nextReqList.Filters, nextReqList.Sort, sync3.DoNotOverwrite) + roomList, overwritten := s.lists.AssignList(ctx, listKey, nextReqList.Filters, nextReqList.Sort, sync3.DoNotOverwrite) if nextReqList.ShouldGetAllRooms() { if overwritten || prevReqList.FiltersChanged(nextReqList) { @@ -249,11 +249,12 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBui } if filtersChanged { // we need to re-create the list as the rooms may have completely changed - roomList, _ = s.lists.AssignList(listKey, nextReqList.Filters, nextReqList.Sort, sync3.Overwrite) + roomList, _ = s.lists.AssignList(ctx, listKey, nextReqList.Filters, nextReqList.Sort, sync3.Overwrite) } // resort as either we changed the sort order or we added/removed a bunch of rooms if err := roomList.Sort(nextReqList.Sort); err != nil { logger.Err(err).Str("key", listKey).Msg("cannot sort list") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } addedRanges = nextReqList.Ranges removedRanges = nil @@ -427,7 +428,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu defer span.End() rooms := make(map[string]sync3.Room, len(roomIDs)) // We want to grab the user room data and the room metadata for each room ID. - roomIDToUserRoomData := s.userCache.LazyLoadTimelines(s.loadPosition, roomIDs, int(roomSub.TimelineLimit)) + roomIDToUserRoomData := s.userCache.LazyLoadTimelines(ctx, s.loadPosition, roomIDs, int(roomSub.TimelineLimit)) roomMetadatas := s.globalCache.LoadRooms(roomIDs...) // prepare lazy loading data structures, txn IDs roomToUsersInTimeline := make(map[string][]string, len(roomIDToUserRoomData)) @@ -446,7 +447,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu roomToUsersInTimeline[roomID] = userIDs roomToTimeline[roomID] = urd.Timeline } - roomToTimeline = s.userCache.AnnotateWithTransactionIDs(s.deviceID, roomToTimeline) + roomToTimeline = s.userCache.AnnotateWithTransactionIDs(ctx, s.deviceID, roomToTimeline) rsm := roomSub.RequiredStateMap(s.userID) roomIDToState := s.globalCache.LoadRoomState(ctx, roomIDs, s.loadPosition, rsm, roomToUsersInTimeline) if roomIDToState == nil { // e.g no required_state diff --git a/sync3/handler/connstate_live.go b/sync3/handler/connstate_live.go index 8b2e5fa..9a2e616 100644 --- a/sync3/handler/connstate_live.go +++ b/sync3/handler/connstate_live.go @@ -209,7 +209,7 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, // - the initial:true room from BuildSubscriptions contains the latest live events in the timeline as it's pulled from the DB // - we then process the live events in turn which adds them again. if !advancedPastEvent { - roomIDtoTimeline := s.userCache.AnnotateWithTransactionIDs(s.deviceID, map[string][]json.RawMessage{ + roomIDtoTimeline := s.userCache.AnnotateWithTransactionIDs(ctx, s.deviceID, map[string][]json.RawMessage{ roomEventUpdate.RoomID(): {roomEventUpdate.EventData.Event}, }) r.Timeline = append(r.Timeline, roomIDtoTimeline[roomEventUpdate.RoomID()]...) @@ -383,7 +383,7 @@ func (s *connStateLive) resort( return nil, true } - ops, subs := sync3.CalculateListOps(reqList, intList, roomID, listOp) + ops, subs := sync3.CalculateListOps(ctx, reqList, intList, roomID, listOp) if len(subs) > 0 { // handle rooms which have just come into the window subID := builder.AddSubscription(reqList.RoomSubscription) builder.AddRoomsToSubscription(subID, subs) diff --git a/sync3/handler/handler.go b/sync3/handler/handler.go index 75e46ef..9fccad6 100644 --- a/sync3/handler/handler.go +++ b/sync3/handler/handler.go @@ -1,9 +1,11 @@ package handler +import "C" import ( "context" "encoding/json" "fmt" + "github.com/getsentry/sentry-go" "net/http" "net/url" "os" @@ -111,6 +113,7 @@ func (h *SyncLiveHandler) Listen() { err := h.V2Sub.Listen() if err != nil { logger.Err(err).Msg("Failed to listen for v2 messages") + sentry.CaptureException(err) } }() } @@ -443,7 +446,7 @@ func (h *SyncLiveHandler) userCache(userID string) (*caches.UserCache, error) { // Implements E2EEFetcher // DeviceData returns the latest device data for this user. isInitial should be set if this is for // an initial /sync request. -func (h *SyncLiveHandler) DeviceData(userID, deviceID string, isInitial bool) *internal.DeviceData { +func (h *SyncLiveHandler) DeviceData(ctx context.Context, userID, deviceID string, isInitial bool) *internal.DeviceData { // We have 2 sources of DeviceData: // - pubsub updates stored in deviceDataMap // - the database itself @@ -479,6 +482,7 @@ func (h *SyncLiveHandler) DeviceData(userID, deviceID string, isInitial bool) *i dd, err := h.Storage.DeviceDataTable.Select(userID, deviceID, shouldSwap) if err != nil { logger.Err(err).Str("user", userID).Msg("failed to SelectAndSwap device data") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return nil } @@ -505,6 +509,7 @@ func (h *SyncLiveHandler) Accumulate(p *pubsub.V2Accumulate) { events, err := h.Storage.EventNIDs(p.EventNIDs) if err != nil { logger.Err(err).Str("room", p.RoomID).Msg("Accumulate: failed to EventNIDs") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } if len(events) == 0 { @@ -522,6 +527,7 @@ func (h *SyncLiveHandler) Initialise(p *pubsub.V2Initialise) { state, err := h.Storage.StateSnapshot(p.SnapshotNID) if err != nil { logger.Err(err).Int64("snap", p.SnapshotNID).Str("room", p.RoomID).Msg("Initialise: failed to get StateSnapshot") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } // we have new state, notify caches @@ -573,6 +579,7 @@ func (h *SyncLiveHandler) OnInvite(p *pubsub.V2InviteRoom) { inviteState, err := h.Storage.InvitesTable.SelectInviteState(p.UserID, p.RoomID) if err != nil { logger.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("failed to get invite state") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } userCache.(*caches.UserCache).OnInvite(ctx, p.RoomID, inviteState) @@ -642,6 +649,7 @@ func (h *SyncLiveHandler) OnAccountData(p *pubsub.V2AccountData) { data, err := h.Storage.AccountData(p.UserID, p.RoomID, p.Types) if err != nil { logger.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("OnAccountData: failed to lookup") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) return } userCache.(*caches.UserCache).OnAccountData(ctx, data) diff --git a/sync3/lists.go b/sync3/lists.go index df702bd..83928f0 100644 --- a/sync3/lists.go +++ b/sync3/lists.go @@ -1,6 +1,7 @@ package sync3 import ( + "context" "strings" "github.com/matrix-org/sliding-sync/internal" @@ -175,7 +176,7 @@ func (s *InternalRequestLists) ListsByVisibleRoomIDs(muxedReqLists map[string]Re // Assign a new list at the given key. If Overwrite, any existing list is replaced. If DoNotOverwrite, the existing // list is returned if one exists, else a new list is created. Returns the list and true if the list was overwritten. -func (s *InternalRequestLists) AssignList(listKey string, filters *RequestFilters, sort []string, shouldOverwrite OverwriteVal) (*FilteredSortableRooms, bool) { +func (s *InternalRequestLists) AssignList(ctx context.Context, listKey string, filters *RequestFilters, sort []string, shouldOverwrite OverwriteVal) (*FilteredSortableRooms, bool) { if shouldOverwrite == DoNotOverwrite { _, exists := s.lists[listKey] if exists { @@ -194,6 +195,7 @@ func (s *InternalRequestLists) AssignList(listKey string, filters *RequestFilter err := roomList.Sort(sort) if err != nil { logger.Err(err).Strs("sort_by", sort).Msg("failed to sort") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } } s.lists[listKey] = roomList diff --git a/sync3/lists_test.go b/sync3/lists_test.go index 94170c6..9c507f7 100644 --- a/sync3/lists_test.go +++ b/sync3/lists_test.go @@ -1,6 +1,7 @@ package sync3_test import ( + "context" "fmt" "sync/atomic" "testing" @@ -23,7 +24,7 @@ func BenchmarkSortRooms(b *testing.B) { func sortRooms(n int) { list := sync3.NewInternalRequestLists() addRooms(list, n) - list.AssignList("benchmark", &sync3.RequestFilters{}, []string{sync3.SortByRecency}, sync3.Overwrite) + list.AssignList(context.Background(), "benchmark", &sync3.RequestFilters{}, []string{sync3.SortByRecency}, sync3.Overwrite) } func addRooms(list *sync3.InternalRequestLists, n int) { diff --git a/sync3/ops.go b/sync3/ops.go index b12b25b..9f05119 100644 --- a/sync3/ops.go +++ b/sync3/ops.go @@ -1,5 +1,10 @@ package sync3 +import ( + "context" + "github.com/matrix-org/sliding-sync/internal" +) + type List interface { IndexOf(roomID string) (int, bool) Len() int64 @@ -24,7 +29,7 @@ type List interface { // [ "A" ] <--- []string, new room subscriptions, if it wasn't in the window before // // This function will modify List to Add/Delete/Sort appropriately. -func CalculateListOps(reqList *RequestList, list List, roomID string, listOp ListOp) (ops []ResponseOp, subs []string) { +func CalculateListOps(ctx context.Context, reqList *RequestList, list List, roomID string, listOp ListOp) (ops []ResponseOp, subs []string) { fromIndex, ok := list.IndexOf(roomID) if !ok { if listOp == ListOpAdd { @@ -45,6 +50,7 @@ func CalculateListOps(reqList *RequestList, list List, roomID string, listOp Lis // this should only move exactly 1 room at most as this is called for every single update if err := list.Sort(reqList.Sort); err != nil { logger.Err(err).Msg("cannot sort list") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } // find the new position of this room toIndex, _ = list.IndexOf(roomID) @@ -62,6 +68,7 @@ func CalculateListOps(reqList *RequestList, list List, roomID string, listOp Lis // this should only move exactly 1 room at most as this is called for every single update if err := list.Sort(reqList.Sort); err != nil { logger.Err(err).Msg("cannot sort list") + internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err) } // find the new position of this room toIndex, _ = list.IndexOf(roomID) diff --git a/sync3/ops_test.go b/sync3/ops_test.go index 49de8de..0529d8a 100644 --- a/sync3/ops_test.go +++ b/sync3/ops_test.go @@ -2,6 +2,7 @@ package sync3 import ( "bytes" + "context" "encoding/json" "math/rand" "testing" @@ -141,7 +142,7 @@ func TestCalculateListOps_BasicOperations(t *testing.T) { for _, tc := range testCases { sl := newStringList(tc.before) sl.sortedRoomIDs = tc.after - gotOps, gotSubs := CalculateListOps(&RequestList{ + gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{ Ranges: tc.ranges, }, sl, tc.roomID, tc.listOp) assertEqualOps(t, tc.name, gotOps, tc.wantOps) @@ -301,7 +302,7 @@ func TestCalculateListOps_SingleWindowOperations(t *testing.T) { for _, tc := range testCases { sl := newStringList(tc.before) sl.sortedRoomIDs = tc.after - gotOps, gotSubs := CalculateListOps(&RequestList{ + gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{ Ranges: tc.ranges, }, sl, tc.roomID, tc.listOp) assertEqualOps(t, tc.name, gotOps, tc.wantOps) @@ -501,7 +502,7 @@ func TestCalculateListOps_MultipleWindowOperations(t *testing.T) { for _, tc := range testCases { sl := newStringList(tc.before) sl.sortedRoomIDs = tc.after - gotOps, gotSubs := CalculateListOps(&RequestList{ + gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{ Ranges: tc.ranges, }, sl, tc.roomID, tc.listOp) assertEqualOps(t, tc.name, gotOps, tc.wantOps) @@ -522,7 +523,7 @@ func TestCalculateListOps_TortureSingleWindow_Move(t *testing.T) { sl := newStringList(before) sl.sortedRoomIDs = after - gotOps, gotSubs := CalculateListOps(&RequestList{ + gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{ Ranges: ranges, }, sl, roomID, ListOpChange) @@ -589,7 +590,7 @@ func TestCalculateListOps_TortureSingleWindowMiddle_Move(t *testing.T) { sl := newStringList(before) sl.sortedRoomIDs = after - gotOps, gotSubs := CalculateListOps(&RequestList{ + gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{ Ranges: ranges, }, sl, roomID, ListOpChange) @@ -664,7 +665,7 @@ func TestCalculateListOpsTortureMultipleWindowsMove(t *testing.T) { sl := newStringList(before) sl.sortedRoomIDs = after - gotOps, gotSubs := CalculateListOps(&RequestList{ + gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{ Ranges: ranges, }, sl, roomID, ListOpChange) for _, sub := range gotSubs {