mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
Report errors to Sentry, plumbing ctxs if needed
This commit is contained in:
parent
690790e46d
commit
1f3f14f30c
@ -4,6 +4,7 @@ import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/getsentry/sentry-go"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/lib/pq"
|
||||
@ -176,9 +177,9 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool,
|
||||
if len(eventIDToNID) == 0 {
|
||||
// we don't have a current snapshot for this room but yet no events are new,
|
||||
// no idea how this should be handled.
|
||||
logger.Error().Str("room_id", roomID).Msg(
|
||||
"Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug.",
|
||||
)
|
||||
const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug."
|
||||
logger.Error().Str("room_id", roomID).Msg(errMsg)
|
||||
sentry.CaptureException(fmt.Errorf(errMsg))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package handler2
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"hash/fnv"
|
||||
"os"
|
||||
"sync"
|
||||
@ -76,6 +77,7 @@ func (h *Handler) Listen() {
|
||||
err := h.v3Sub.Listen()
|
||||
if err != nil {
|
||||
logger.Err(err).Msg("Failed to listen for v3 messages")
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@ -96,6 +98,7 @@ func (h *Handler) StartV2Pollers() {
|
||||
devices, err := h.v2Store.AllDevices()
|
||||
if err != nil {
|
||||
logger.Err(err).Msg("StartV2Pollers: failed to query devices")
|
||||
sentry.CaptureException(err)
|
||||
return
|
||||
}
|
||||
// how many concurrent pollers to make at startup.
|
||||
@ -172,6 +175,7 @@ func (h *Handler) UpdateDeviceSince(deviceID, since string) {
|
||||
err := h.v2Store.UpdateDeviceSince(deviceID, since)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("device", deviceID).Str("since", since).Msg("V2: failed to persist since token")
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -189,6 +193,7 @@ func (h *Handler) OnE2EEData(userID, deviceID string, otkCounts map[string]int,
|
||||
nextPos, err := h.Store.DeviceDataTable.Upsert(&partialDD)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", userID).Msg("failed to upsert device data")
|
||||
sentry.CaptureException(err)
|
||||
return
|
||||
}
|
||||
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2DeviceData{
|
||||
@ -213,6 +218,7 @@ func (h *Handler) Accumulate(deviceID, roomID, prevBatch string, timeline []json
|
||||
err := h.Store.TransactionsTable.Insert(deviceID, eventIDToTxnID)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("device", deviceID).Int("num_txns", len(eventIDToTxnID)).Msg("failed to persist txn IDs for user")
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -220,6 +226,7 @@ func (h *Handler) Accumulate(deviceID, roomID, prevBatch string, timeline []json
|
||||
numNew, latestNIDs, err := h.Store.Accumulate(roomID, prevBatch, timeline)
|
||||
if err != nil {
|
||||
logger.Err(err).Int("timeline", len(timeline)).Str("room", roomID).Msg("V2: failed to accumulate room")
|
||||
sentry.CaptureException(err)
|
||||
return
|
||||
}
|
||||
if numNew == 0 {
|
||||
@ -237,6 +244,7 @@ func (h *Handler) Initialise(roomID string, state []json.RawMessage) {
|
||||
added, snapID, err := h.Store.Initialise(roomID, state)
|
||||
if err != nil {
|
||||
logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room")
|
||||
sentry.CaptureException(err)
|
||||
return
|
||||
}
|
||||
if !added {
|
||||
@ -270,6 +278,7 @@ func (h *Handler) OnReceipt(userID, roomID, ephEventType string, ephEvent json.R
|
||||
newReceipts, err := h.Store.ReceiptTable.Insert(roomID, ephEvent)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("room", roomID).Msg("failed to store receipts")
|
||||
sentry.CaptureException(err)
|
||||
return
|
||||
}
|
||||
if len(newReceipts) == 0 {
|
||||
@ -285,6 +294,7 @@ func (h *Handler) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMe
|
||||
_, err := h.Store.ToDeviceTable.InsertMessages(deviceID, msgs)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", userID).Str("device", deviceID).Int("msgs", len(msgs)).Msg("V2: failed to store to-device messages")
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2DeviceMessages{
|
||||
UserID: userID,
|
||||
@ -319,6 +329,7 @@ func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, noti
|
||||
err := h.Store.UnreadTable.UpdateUnreadCounters(userID, roomID, highlightCount, notifCount)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update unread counters")
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2UnreadCounts{
|
||||
RoomID: roomID,
|
||||
@ -332,6 +343,7 @@ func (h *Handler) OnAccountData(userID, roomID string, events []json.RawMessage)
|
||||
data, err := h.Store.InsertAccountData(userID, roomID, events)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update account data")
|
||||
sentry.CaptureException(err)
|
||||
return
|
||||
}
|
||||
var types []string
|
||||
@ -349,6 +361,7 @@ func (h *Handler) OnInvite(userID, roomID string, inviteState []json.RawMessage)
|
||||
err := h.Store.InvitesTable.InsertInvite(userID, roomID, inviteState)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to insert invite")
|
||||
sentry.CaptureException(err)
|
||||
return
|
||||
}
|
||||
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2InviteRoom{
|
||||
@ -362,6 +375,7 @@ func (h *Handler) OnLeftRoom(userID, roomID string) {
|
||||
err := h.Store.InvitesTable.RemoveInvite(userID, roomID)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to retire invite")
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2LeaveRoom{
|
||||
UserID: userID,
|
||||
@ -377,6 +391,7 @@ func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) {
|
||||
dev, err := h.v2Store.Device(p.DeviceID)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", p.UserID).Str("device", p.DeviceID).Msg("V3Sub: EnsurePolling unknown device")
|
||||
sentry.CaptureException(err)
|
||||
return
|
||||
}
|
||||
// don't block us from consuming more pubsub messages just because someone wants to sync
|
||||
|
@ -3,6 +3,8 @@ package caches
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
@ -87,7 +89,9 @@ func (c *GlobalCache) LoadRooms(roomIDs ...string) map[string]*internal.RoomMeta
|
||||
roomID := roomIDs[i]
|
||||
sr := c.roomIDToMetadata[roomID]
|
||||
if sr == nil {
|
||||
logger.Error().Str("room", roomID).Msg("GlobalCache.LoadRoom: no metadata for this room")
|
||||
const errMsg = "GlobalCache.LoadRoom: no metadata for this room"
|
||||
logger.Error().Str("room", roomID).Msg(errMsg)
|
||||
sentry.CaptureException(fmt.Errorf(errMsg))
|
||||
continue
|
||||
}
|
||||
srCopy := *sr
|
||||
@ -126,6 +130,7 @@ func (c *GlobalCache) LoadStateEvent(ctx context.Context, roomID string, loadPos
|
||||
})
|
||||
if err != nil {
|
||||
logger.Err(err).Str("room", roomID).Int64("pos", loadPosition).Msg("failed to load room state")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return nil
|
||||
}
|
||||
events := roomIDToStateEvents[roomID]
|
||||
@ -147,6 +152,7 @@ func (c *GlobalCache) LoadRoomState(ctx context.Context, roomIDs []string, loadP
|
||||
roomIDToStateEvents, err := c.store.RoomStateAfterEventPosition(ctx, roomIDs, loadPosition, requiredStateMap.QueryStateMap())
|
||||
if err != nil {
|
||||
logger.Err(err).Strs("rooms", roomIDs).Int64("pos", loadPosition).Msg("failed to load room state")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return nil
|
||||
}
|
||||
for roomID, stateEvents := range roomIDToStateEvents {
|
||||
@ -295,6 +301,7 @@ func (c *GlobalCache) OnNewEvent(
|
||||
err := c.store.InvitesTable.RemoveInvite(*ed.StateKey, ed.RoomID)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", *ed.StateKey).Str("room", ed.RoomID).Msg("failed to remove accepted invite")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ type InviteData struct {
|
||||
IsDM bool
|
||||
}
|
||||
|
||||
func NewInviteData(userID, roomID string, inviteState []json.RawMessage) *InviteData {
|
||||
func NewInviteData(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) *InviteData {
|
||||
// work out metadata for this invite. There's an origin_server_ts on the invite m.room.member event
|
||||
id := InviteData{
|
||||
roomID: roomID,
|
||||
@ -138,9 +138,9 @@ func NewInviteData(userID, roomID string, inviteState []json.RawMessage) *Invite
|
||||
}
|
||||
}
|
||||
if id.InviteEvent == nil {
|
||||
logger.Error().Str("invitee", userID).Str("room", roomID).Int("num_invite_state", len(inviteState)).Msg(
|
||||
"cannot make invite, missing invite event for user",
|
||||
)
|
||||
const errMsg = "cannot make invite, missing invite event for user"
|
||||
logger.Error().Str("invitee", userID).Str("room", roomID).Int("num_invite_state", len(inviteState)).Msg(errMsg)
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(fmt.Errorf(errMsg))
|
||||
return nil
|
||||
}
|
||||
return &id
|
||||
@ -270,7 +270,7 @@ func (c *UserCache) OnRegistered(_ int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *UserCache) LazyLoadTimelines(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
|
||||
func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
|
||||
if c.LazyRoomDataOverride != nil {
|
||||
return c.LazyRoomDataOverride(loadPos, roomIDs, maxTimelineEvents)
|
||||
}
|
||||
@ -306,6 +306,7 @@ func (c *UserCache) LazyLoadTimelines(loadPos int64, roomIDs []string, maxTimeli
|
||||
prevBatch, err := c.store.EventsTable.SelectClosestPrevBatchByID(roomID, eventID)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("room", roomID).Str("event_id", eventID).Msg("failed to get prev batch token for room")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
}
|
||||
urd.SetPrevBatch(eventID, prevBatch)
|
||||
}
|
||||
@ -335,6 +336,7 @@ func (c *UserCache) LazyLoadTimelines(loadPos int64, roomIDs []string, maxTimeli
|
||||
roomIDToEvents, roomIDToPrevBatch, err := c.store.LatestEventsInRooms(c.UserID, lazyRoomIDs, loadPos, maxTimelineEvents)
|
||||
if err != nil {
|
||||
logger.Err(err).Strs("rooms", lazyRoomIDs).Msg("failed to get LatestEventsInRooms")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return nil
|
||||
}
|
||||
c.roomToDataMu.Lock()
|
||||
@ -428,7 +430,7 @@ func (c *UserCache) Invites() map[string]UserRoomData {
|
||||
// events are globally scoped, so if Alice sends a message, Bob might receive it first on his v2 loop
|
||||
// which would cause the transaction ID to be missing from the event. Instead, we always look for txn
|
||||
// IDs in the v2 poller, and then set them appropriately at request time.
|
||||
func (c *UserCache) AnnotateWithTransactionIDs(deviceID string, roomIDToEvents map[string][]json.RawMessage) map[string][]json.RawMessage {
|
||||
func (c *UserCache) AnnotateWithTransactionIDs(ctx context.Context, deviceID string, roomIDToEvents map[string][]json.RawMessage) map[string][]json.RawMessage {
|
||||
var eventIDs []string
|
||||
eventIDToEvent := make(map[string]struct {
|
||||
roomID string
|
||||
@ -458,6 +460,7 @@ func (c *UserCache) AnnotateWithTransactionIDs(deviceID string, roomIDToEvents m
|
||||
newJSON, err := sjson.SetBytes(event, "unsigned.transaction_id", txnID)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", c.UserID).Msg("AnnotateWithTransactionIDs: sjson failed")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
} else {
|
||||
events[data.i] = newJSON
|
||||
roomIDToEvents[data.roomID] = events
|
||||
@ -600,7 +603,7 @@ func (c *UserCache) OnNewEvent(ctx context.Context, eventData *EventData) {
|
||||
}
|
||||
|
||||
func (c *UserCache) OnInvite(ctx context.Context, roomID string, inviteStateEvents []json.RawMessage) {
|
||||
inviteData := NewInviteData(c.UserID, roomID, inviteStateEvents)
|
||||
inviteData := NewInviteData(ctx, c.UserID, roomID, inviteStateEvents)
|
||||
if inviteData == nil {
|
||||
return // malformed invite
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package caches_test
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"golang.org/x/net/context"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
@ -82,7 +83,7 @@ func TestAnnotateWithTransactionIDs(t *testing.T) {
|
||||
data: tc.eventIDToTxnIDs,
|
||||
}
|
||||
uc := caches.NewUserCache(userID, nil, nil, fetcher)
|
||||
got := uc.AnnotateWithTransactionIDs("DEVICE", convertIDToEventStub(tc.roomIDToEvents))
|
||||
got := uc.AnnotateWithTransactionIDs(context.Background(), "DEVICE", convertIDToEventStub(tc.roomIDToEvents))
|
||||
want := convertIDTxnToEventStub(tc.wantRoomIDToEvents)
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("%s : got %v want %v", tc.name, js(got), js(want))
|
||||
|
@ -3,7 +3,7 @@ package extensions
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/sliding-sync/internal"
|
||||
"github.com/matrix-org/sliding-sync/state"
|
||||
"github.com/matrix-org/sliding-sync/sync3/caches"
|
||||
)
|
||||
@ -57,6 +57,7 @@ func (r *AccountDataRequest) AppendLive(ctx context.Context, res *Response, extC
|
||||
roomAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID, update.RoomID())
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", extCtx.UserID).Str("room", update.RoomID()).Msg("failed to fetch room account data")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
} else {
|
||||
if len(roomAccountData) > 0 { // else we can end up with `null` not `[]`
|
||||
roomToMsgs[update.RoomID()] = accountEventsAsJSON(roomAccountData)
|
||||
@ -96,6 +97,7 @@ func (r *AccountDataRequest) ProcessInitial(ctx context.Context, res *Response,
|
||||
roomsAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID, roomIDs...)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", extCtx.UserID).Strs("rooms", roomIDs).Msg("failed to fetch room account data")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
} else {
|
||||
extRes.Rooms = make(map[string][]json.RawMessage)
|
||||
for _, ad := range roomsAccountData {
|
||||
@ -108,6 +110,7 @@ func (r *AccountDataRequest) ProcessInitial(ctx context.Context, res *Response,
|
||||
globalAccountData, err := extCtx.Store.AccountDatas(extCtx.UserID)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", extCtx.UserID).Msg("failed to fetch global account data")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
} else {
|
||||
extRes.Global = accountEventsAsJSON(globalAccountData)
|
||||
}
|
||||
|
@ -9,7 +9,7 @@ import (
|
||||
|
||||
// Fetcher used by the E2EE extension
|
||||
type E2EEFetcher interface {
|
||||
DeviceData(userID, deviceID string, isInitial bool) *internal.DeviceData
|
||||
DeviceData(context context.Context, userID, deviceID string, isInitial bool) *internal.DeviceData
|
||||
}
|
||||
|
||||
// Client created request params
|
||||
@ -56,7 +56,7 @@ func (r *E2EERequest) AppendLive(ctx context.Context, res *Response, extCtx Cont
|
||||
|
||||
func (r *E2EERequest) ProcessInitial(ctx context.Context, res *Response, extCtx Context) {
|
||||
// pull OTK counts and changed/left from device data
|
||||
dd := extCtx.E2EEFetcher.DeviceData(extCtx.UserID, extCtx.DeviceID, extCtx.IsInitial)
|
||||
dd := extCtx.E2EEFetcher.DeviceData(ctx, extCtx.UserID, extCtx.DeviceID, extCtx.IsInitial)
|
||||
if dd == nil {
|
||||
return // unknown device?
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ package extensions
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/sliding-sync/internal"
|
||||
"github.com/matrix-org/sliding-sync/state"
|
||||
"github.com/matrix-org/sliding-sync/sync3/caches"
|
||||
@ -43,6 +42,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx
|
||||
edu, err := state.PackReceiptsIntoEDU([]internal.Receipt{update.Receipt})
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into new edu")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return
|
||||
}
|
||||
res.Receipts = &ReceiptsResponse{
|
||||
@ -55,6 +55,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx
|
||||
edu, err := state.PackReceiptsIntoEDU([]internal.Receipt{update.Receipt})
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return
|
||||
}
|
||||
res.Receipts.Rooms[update.RoomID()] = edu
|
||||
@ -64,6 +65,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx
|
||||
pub, priv, err := state.UnpackReceiptsFromEDU(update.RoomID(), res.Receipts.Rooms[update.RoomID()])
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return
|
||||
}
|
||||
receipts := append(pub, priv...)
|
||||
@ -72,6 +74,7 @@ func (r *ReceiptsRequest) AppendLive(ctx context.Context, res *Response, extCtx
|
||||
edu, err := state.PackReceiptsIntoEDU(receipts)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return
|
||||
}
|
||||
res.Receipts.Rooms[update.RoomID()] = edu
|
||||
@ -89,12 +92,14 @@ func (r *ReceiptsRequest) ProcessInitial(ctx context.Context, res *Response, ext
|
||||
receipts, err := extCtx.Store.ReceiptTable.SelectReceiptsForEvents(roomID, timeline)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", extCtx.UserID).Str("room", roomID).Msg("failed to SelectReceiptsForEvents")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
continue
|
||||
}
|
||||
// always include your own receipts
|
||||
ownReceipts, err := extCtx.Store.ReceiptTable.SelectReceiptsForUser(roomID, extCtx.UserID)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", extCtx.UserID).Str("room", roomID).Msg("failed to SelectReceiptsForUser")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
continue
|
||||
}
|
||||
if len(receipts) == 0 && len(ownReceipts) == 0 {
|
||||
|
@ -195,7 +195,7 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *sync3.Request, i
|
||||
func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBuilder, listKey string, prevReqList, nextReqList *sync3.RequestList) sync3.ResponseList {
|
||||
ctx, span := internal.StartSpan(ctx, "onIncomingListRequest")
|
||||
defer span.End()
|
||||
roomList, overwritten := s.lists.AssignList(listKey, nextReqList.Filters, nextReqList.Sort, sync3.DoNotOverwrite)
|
||||
roomList, overwritten := s.lists.AssignList(ctx, listKey, nextReqList.Filters, nextReqList.Sort, sync3.DoNotOverwrite)
|
||||
|
||||
if nextReqList.ShouldGetAllRooms() {
|
||||
if overwritten || prevReqList.FiltersChanged(nextReqList) {
|
||||
@ -249,11 +249,12 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBui
|
||||
}
|
||||
if filtersChanged {
|
||||
// we need to re-create the list as the rooms may have completely changed
|
||||
roomList, _ = s.lists.AssignList(listKey, nextReqList.Filters, nextReqList.Sort, sync3.Overwrite)
|
||||
roomList, _ = s.lists.AssignList(ctx, listKey, nextReqList.Filters, nextReqList.Sort, sync3.Overwrite)
|
||||
}
|
||||
// resort as either we changed the sort order or we added/removed a bunch of rooms
|
||||
if err := roomList.Sort(nextReqList.Sort); err != nil {
|
||||
logger.Err(err).Str("key", listKey).Msg("cannot sort list")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
}
|
||||
addedRanges = nextReqList.Ranges
|
||||
removedRanges = nil
|
||||
@ -427,7 +428,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
|
||||
defer span.End()
|
||||
rooms := make(map[string]sync3.Room, len(roomIDs))
|
||||
// We want to grab the user room data and the room metadata for each room ID.
|
||||
roomIDToUserRoomData := s.userCache.LazyLoadTimelines(s.loadPosition, roomIDs, int(roomSub.TimelineLimit))
|
||||
roomIDToUserRoomData := s.userCache.LazyLoadTimelines(ctx, s.loadPosition, roomIDs, int(roomSub.TimelineLimit))
|
||||
roomMetadatas := s.globalCache.LoadRooms(roomIDs...)
|
||||
// prepare lazy loading data structures, txn IDs
|
||||
roomToUsersInTimeline := make(map[string][]string, len(roomIDToUserRoomData))
|
||||
@ -446,7 +447,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
|
||||
roomToUsersInTimeline[roomID] = userIDs
|
||||
roomToTimeline[roomID] = urd.Timeline
|
||||
}
|
||||
roomToTimeline = s.userCache.AnnotateWithTransactionIDs(s.deviceID, roomToTimeline)
|
||||
roomToTimeline = s.userCache.AnnotateWithTransactionIDs(ctx, s.deviceID, roomToTimeline)
|
||||
rsm := roomSub.RequiredStateMap(s.userID)
|
||||
roomIDToState := s.globalCache.LoadRoomState(ctx, roomIDs, s.loadPosition, rsm, roomToUsersInTimeline)
|
||||
if roomIDToState == nil { // e.g no required_state
|
||||
|
@ -209,7 +209,7 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
|
||||
// - the initial:true room from BuildSubscriptions contains the latest live events in the timeline as it's pulled from the DB
|
||||
// - we then process the live events in turn which adds them again.
|
||||
if !advancedPastEvent {
|
||||
roomIDtoTimeline := s.userCache.AnnotateWithTransactionIDs(s.deviceID, map[string][]json.RawMessage{
|
||||
roomIDtoTimeline := s.userCache.AnnotateWithTransactionIDs(ctx, s.deviceID, map[string][]json.RawMessage{
|
||||
roomEventUpdate.RoomID(): {roomEventUpdate.EventData.Event},
|
||||
})
|
||||
r.Timeline = append(r.Timeline, roomIDtoTimeline[roomEventUpdate.RoomID()]...)
|
||||
@ -383,7 +383,7 @@ func (s *connStateLive) resort(
|
||||
return nil, true
|
||||
}
|
||||
|
||||
ops, subs := sync3.CalculateListOps(reqList, intList, roomID, listOp)
|
||||
ops, subs := sync3.CalculateListOps(ctx, reqList, intList, roomID, listOp)
|
||||
if len(subs) > 0 { // handle rooms which have just come into the window
|
||||
subID := builder.AddSubscription(reqList.RoomSubscription)
|
||||
builder.AddRoomsToSubscription(subID, subs)
|
||||
|
@ -1,9 +1,11 @@
|
||||
package handler
|
||||
|
||||
import "C"
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
@ -111,6 +113,7 @@ func (h *SyncLiveHandler) Listen() {
|
||||
err := h.V2Sub.Listen()
|
||||
if err != nil {
|
||||
logger.Err(err).Msg("Failed to listen for v2 messages")
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@ -443,7 +446,7 @@ func (h *SyncLiveHandler) userCache(userID string) (*caches.UserCache, error) {
|
||||
// Implements E2EEFetcher
|
||||
// DeviceData returns the latest device data for this user. isInitial should be set if this is for
|
||||
// an initial /sync request.
|
||||
func (h *SyncLiveHandler) DeviceData(userID, deviceID string, isInitial bool) *internal.DeviceData {
|
||||
func (h *SyncLiveHandler) DeviceData(ctx context.Context, userID, deviceID string, isInitial bool) *internal.DeviceData {
|
||||
// We have 2 sources of DeviceData:
|
||||
// - pubsub updates stored in deviceDataMap
|
||||
// - the database itself
|
||||
@ -479,6 +482,7 @@ func (h *SyncLiveHandler) DeviceData(userID, deviceID string, isInitial bool) *i
|
||||
dd, err := h.Storage.DeviceDataTable.Select(userID, deviceID, shouldSwap)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", userID).Msg("failed to SelectAndSwap device data")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -505,6 +509,7 @@ func (h *SyncLiveHandler) Accumulate(p *pubsub.V2Accumulate) {
|
||||
events, err := h.Storage.EventNIDs(p.EventNIDs)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("room", p.RoomID).Msg("Accumulate: failed to EventNIDs")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return
|
||||
}
|
||||
if len(events) == 0 {
|
||||
@ -522,6 +527,7 @@ func (h *SyncLiveHandler) Initialise(p *pubsub.V2Initialise) {
|
||||
state, err := h.Storage.StateSnapshot(p.SnapshotNID)
|
||||
if err != nil {
|
||||
logger.Err(err).Int64("snap", p.SnapshotNID).Str("room", p.RoomID).Msg("Initialise: failed to get StateSnapshot")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return
|
||||
}
|
||||
// we have new state, notify caches
|
||||
@ -573,6 +579,7 @@ func (h *SyncLiveHandler) OnInvite(p *pubsub.V2InviteRoom) {
|
||||
inviteState, err := h.Storage.InvitesTable.SelectInviteState(p.UserID, p.RoomID)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("failed to get invite state")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return
|
||||
}
|
||||
userCache.(*caches.UserCache).OnInvite(ctx, p.RoomID, inviteState)
|
||||
@ -642,6 +649,7 @@ func (h *SyncLiveHandler) OnAccountData(p *pubsub.V2AccountData) {
|
||||
data, err := h.Storage.AccountData(p.UserID, p.RoomID, p.Types)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", p.UserID).Str("room", p.RoomID).Msg("OnAccountData: failed to lookup")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return
|
||||
}
|
||||
userCache.(*caches.UserCache).OnAccountData(ctx, data)
|
||||
|
@ -1,6 +1,7 @@
|
||||
package sync3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/sliding-sync/internal"
|
||||
@ -175,7 +176,7 @@ func (s *InternalRequestLists) ListsByVisibleRoomIDs(muxedReqLists map[string]Re
|
||||
|
||||
// Assign a new list at the given key. If Overwrite, any existing list is replaced. If DoNotOverwrite, the existing
|
||||
// list is returned if one exists, else a new list is created. Returns the list and true if the list was overwritten.
|
||||
func (s *InternalRequestLists) AssignList(listKey string, filters *RequestFilters, sort []string, shouldOverwrite OverwriteVal) (*FilteredSortableRooms, bool) {
|
||||
func (s *InternalRequestLists) AssignList(ctx context.Context, listKey string, filters *RequestFilters, sort []string, shouldOverwrite OverwriteVal) (*FilteredSortableRooms, bool) {
|
||||
if shouldOverwrite == DoNotOverwrite {
|
||||
_, exists := s.lists[listKey]
|
||||
if exists {
|
||||
@ -194,6 +195,7 @@ func (s *InternalRequestLists) AssignList(listKey string, filters *RequestFilter
|
||||
err := roomList.Sort(sort)
|
||||
if err != nil {
|
||||
logger.Err(err).Strs("sort_by", sort).Msg("failed to sort")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
}
|
||||
}
|
||||
s.lists[listKey] = roomList
|
||||
|
@ -1,6 +1,7 @@
|
||||
package sync3_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
@ -23,7 +24,7 @@ func BenchmarkSortRooms(b *testing.B) {
|
||||
func sortRooms(n int) {
|
||||
list := sync3.NewInternalRequestLists()
|
||||
addRooms(list, n)
|
||||
list.AssignList("benchmark", &sync3.RequestFilters{}, []string{sync3.SortByRecency}, sync3.Overwrite)
|
||||
list.AssignList(context.Background(), "benchmark", &sync3.RequestFilters{}, []string{sync3.SortByRecency}, sync3.Overwrite)
|
||||
}
|
||||
|
||||
func addRooms(list *sync3.InternalRequestLists, n int) {
|
||||
|
@ -1,5 +1,10 @@
|
||||
package sync3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/matrix-org/sliding-sync/internal"
|
||||
)
|
||||
|
||||
type List interface {
|
||||
IndexOf(roomID string) (int, bool)
|
||||
Len() int64
|
||||
@ -24,7 +29,7 @@ type List interface {
|
||||
// [ "A" ] <--- []string, new room subscriptions, if it wasn't in the window before
|
||||
//
|
||||
// This function will modify List to Add/Delete/Sort appropriately.
|
||||
func CalculateListOps(reqList *RequestList, list List, roomID string, listOp ListOp) (ops []ResponseOp, subs []string) {
|
||||
func CalculateListOps(ctx context.Context, reqList *RequestList, list List, roomID string, listOp ListOp) (ops []ResponseOp, subs []string) {
|
||||
fromIndex, ok := list.IndexOf(roomID)
|
||||
if !ok {
|
||||
if listOp == ListOpAdd {
|
||||
@ -45,6 +50,7 @@ func CalculateListOps(reqList *RequestList, list List, roomID string, listOp Lis
|
||||
// this should only move exactly 1 room at most as this is called for every single update
|
||||
if err := list.Sort(reqList.Sort); err != nil {
|
||||
logger.Err(err).Msg("cannot sort list")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
}
|
||||
// find the new position of this room
|
||||
toIndex, _ = list.IndexOf(roomID)
|
||||
@ -62,6 +68,7 @@ func CalculateListOps(reqList *RequestList, list List, roomID string, listOp Lis
|
||||
// this should only move exactly 1 room at most as this is called for every single update
|
||||
if err := list.Sort(reqList.Sort); err != nil {
|
||||
logger.Err(err).Msg("cannot sort list")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
}
|
||||
// find the new position of this room
|
||||
toIndex, _ = list.IndexOf(roomID)
|
||||
|
@ -2,6 +2,7 @@ package sync3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"math/rand"
|
||||
"testing"
|
||||
@ -141,7 +142,7 @@ func TestCalculateListOps_BasicOperations(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
sl := newStringList(tc.before)
|
||||
sl.sortedRoomIDs = tc.after
|
||||
gotOps, gotSubs := CalculateListOps(&RequestList{
|
||||
gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{
|
||||
Ranges: tc.ranges,
|
||||
}, sl, tc.roomID, tc.listOp)
|
||||
assertEqualOps(t, tc.name, gotOps, tc.wantOps)
|
||||
@ -301,7 +302,7 @@ func TestCalculateListOps_SingleWindowOperations(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
sl := newStringList(tc.before)
|
||||
sl.sortedRoomIDs = tc.after
|
||||
gotOps, gotSubs := CalculateListOps(&RequestList{
|
||||
gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{
|
||||
Ranges: tc.ranges,
|
||||
}, sl, tc.roomID, tc.listOp)
|
||||
assertEqualOps(t, tc.name, gotOps, tc.wantOps)
|
||||
@ -501,7 +502,7 @@ func TestCalculateListOps_MultipleWindowOperations(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
sl := newStringList(tc.before)
|
||||
sl.sortedRoomIDs = tc.after
|
||||
gotOps, gotSubs := CalculateListOps(&RequestList{
|
||||
gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{
|
||||
Ranges: tc.ranges,
|
||||
}, sl, tc.roomID, tc.listOp)
|
||||
assertEqualOps(t, tc.name, gotOps, tc.wantOps)
|
||||
@ -522,7 +523,7 @@ func TestCalculateListOps_TortureSingleWindow_Move(t *testing.T) {
|
||||
|
||||
sl := newStringList(before)
|
||||
sl.sortedRoomIDs = after
|
||||
gotOps, gotSubs := CalculateListOps(&RequestList{
|
||||
gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{
|
||||
Ranges: ranges,
|
||||
}, sl, roomID, ListOpChange)
|
||||
|
||||
@ -589,7 +590,7 @@ func TestCalculateListOps_TortureSingleWindowMiddle_Move(t *testing.T) {
|
||||
|
||||
sl := newStringList(before)
|
||||
sl.sortedRoomIDs = after
|
||||
gotOps, gotSubs := CalculateListOps(&RequestList{
|
||||
gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{
|
||||
Ranges: ranges,
|
||||
}, sl, roomID, ListOpChange)
|
||||
|
||||
@ -664,7 +665,7 @@ func TestCalculateListOpsTortureMultipleWindowsMove(t *testing.T) {
|
||||
|
||||
sl := newStringList(before)
|
||||
sl.sortedRoomIDs = after
|
||||
gotOps, gotSubs := CalculateListOps(&RequestList{
|
||||
gotOps, gotSubs := CalculateListOps(context.Background(), &RequestList{
|
||||
Ranges: ranges,
|
||||
}, sl, roomID, ListOpChange)
|
||||
for _, sub := range gotSubs {
|
||||
|
Loading…
x
Reference in New Issue
Block a user