Merge pull request #134 from matrix-org/dmr/sentry-pollers

Associate user IDs and device IDs to poller sentry reports
This commit is contained in:
David Robertson 2023-06-05 11:34:21 +01:00 committed by GitHub
commit 2c497008e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 131 additions and 106 deletions

View File

@ -6,6 +6,10 @@ import (
"time"
)
// SentryCtxKey is a namespace under which we can report arbitrary key-value pairs to
// sentry.
const SentryCtxKey = "sliding-sync"
// GetSentryHubFromContextOrDefault is a version of sentry.GetHubFromContext which
// automatically falls back to sentry.CurrentHub if the given context has not been
// attached a hub.

View File

@ -1,6 +1,7 @@
package handler2
import (
"context"
"encoding/json"
"github.com/getsentry/sentry-go"
"hash/fnv"
@ -151,12 +152,16 @@ func (h *Handler) updateMetrics() {
h.numPollers.Set(float64(h.pMap.NumPollers()))
}
func (h *Handler) OnTerminated(userID, deviceID string) {
func (h *Handler) OnTerminated(ctx context.Context, userID, deviceID string) {
h.updateMetrics()
}
func (h *Handler) OnExpiredToken(accessTokenHash, userID, deviceID string) {
h.v2Store.TokensTable.Delete(accessTokenHash)
func (h *Handler) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) {
err := h.v2Store.TokensTable.Delete(accessTokenHash)
if err != nil {
logger.Err(err).Str("user", userID).Str("device", deviceID).Str("access_token_hash", accessTokenHash).Msg("V2: failed to expire token")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
// Notify v3 side so it can remove the connection from ConnMap
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2ExpiredToken{
UserID: userID,
@ -175,15 +180,15 @@ func (h *Handler) addPrometheusMetrics() {
}
// Emits nothing as no downstream components need it.
func (h *Handler) UpdateDeviceSince(userID, deviceID, since string) {
func (h *Handler) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
err := h.v2Store.DevicesTable.UpdateDeviceSince(userID, deviceID, since)
if err != nil {
logger.Err(err).Str("user", userID).Str("device", deviceID).Str("since", since).Msg("V2: failed to persist since token")
sentry.CaptureException(err)
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
}
func (h *Handler) OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
func (h *Handler) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
// some of these fields may be set
partialDD := internal.DeviceData{
UserID: userID,
@ -197,7 +202,7 @@ func (h *Handler) OnE2EEData(userID, deviceID string, otkCounts map[string]int,
nextPos, err := h.Store.DeviceDataTable.Upsert(&partialDD)
if err != nil {
logger.Err(err).Str("user", userID).Msg("failed to upsert device data")
sentry.CaptureException(err)
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
return
}
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2DeviceData{
@ -207,7 +212,7 @@ func (h *Handler) OnE2EEData(userID, deviceID string, otkCounts map[string]int,
})
}
func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
// Remember any transaction IDs that may be unique to this user
eventIDToTxnID := make(map[string]string, len(timeline)) // event_id -> txn_id
for _, e := range timeline {
@ -223,7 +228,7 @@ func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timelin
err := h.Store.TransactionsTable.Insert(userID, deviceID, eventIDToTxnID)
if err != nil {
logger.Err(err).Str("user", userID).Str("device", deviceID).Int("num_txns", len(eventIDToTxnID)).Msg("failed to persist txn IDs for user")
sentry.CaptureException(err)
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
}
@ -231,7 +236,7 @@ func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timelin
numNew, latestNIDs, err := h.Store.Accumulate(roomID, prevBatch, timeline)
if err != nil {
logger.Err(err).Int("timeline", len(timeline)).Str("room", roomID).Msg("V2: failed to accumulate room")
sentry.CaptureException(err)
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
return
}
if numNew == 0 {
@ -245,11 +250,11 @@ func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timelin
})
}
func (h *Handler) Initialise(roomID string, state []json.RawMessage) []json.RawMessage {
func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.RawMessage) []json.RawMessage {
res, err := h.Store.Initialise(roomID, state)
if err != nil {
logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room")
sentry.CaptureException(err)
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
return nil
}
if res.AddedEvents {
@ -261,7 +266,7 @@ func (h *Handler) Initialise(roomID string, state []json.RawMessage) []json.RawM
return res.PrependTimelineEvents
}
func (h *Handler) SetTyping(roomID string, ephEvent json.RawMessage) {
func (h *Handler) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) {
next := typingHash(ephEvent)
existing := h.typingMap[roomID]
if existing == next {
@ -276,13 +281,13 @@ func (h *Handler) SetTyping(roomID string, ephEvent json.RawMessage) {
})
}
func (h *Handler) OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage) {
func (h *Handler) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage) {
// update our records - we make an artifically new RR event if there are genuine changes
// else it returns nil
newReceipts, err := h.Store.ReceiptTable.Insert(roomID, ephEvent)
if err != nil {
logger.Err(err).Str("room", roomID).Msg("failed to store receipts")
sentry.CaptureException(err)
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
return
}
if len(newReceipts) == 0 {
@ -294,11 +299,11 @@ func (h *Handler) OnReceipt(userID, roomID, ephEventType string, ephEvent json.R
})
}
func (h *Handler) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) {
func (h *Handler) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) {
_, err := h.Store.ToDeviceTable.InsertMessages(userID, deviceID, msgs)
if err != nil {
logger.Err(err).Str("user", userID).Str("device", deviceID).Int("msgs", len(msgs)).Msg("V2: failed to store to-device messages")
sentry.CaptureException(err)
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2DeviceMessages{
UserID: userID,
@ -306,7 +311,7 @@ func (h *Handler) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMe
})
}
func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int) {
func (h *Handler) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int) {
// only touch the DB and notify if they have changed. sync v2 will alwyas include the counts
// even if they haven't changed :(
key := roomID + userID
@ -333,7 +338,7 @@ func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, noti
err := h.Store.UnreadTable.UpdateUnreadCounters(userID, roomID, highlightCount, notifCount)
if err != nil {
logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update unread counters")
sentry.CaptureException(err)
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2UnreadCounts{
RoomID: roomID,
@ -343,7 +348,7 @@ func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, noti
})
}
func (h *Handler) OnAccountData(userID, roomID string, events []json.RawMessage) {
func (h *Handler) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) {
data, err := h.Store.InsertAccountData(userID, roomID, events)
if err != nil {
logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update account data")
@ -361,11 +366,11 @@ func (h *Handler) OnAccountData(userID, roomID string, events []json.RawMessage)
})
}
func (h *Handler) OnInvite(userID, roomID string, inviteState []json.RawMessage) {
func (h *Handler) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) {
err := h.Store.InvitesTable.InsertInvite(userID, roomID, inviteState)
if err != nil {
logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to insert invite")
sentry.CaptureException(err)
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
return
}
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2InviteRoom{
@ -374,12 +379,12 @@ func (h *Handler) OnInvite(userID, roomID string, inviteState []json.RawMessage)
})
}
func (h *Handler) OnLeftRoom(userID, roomID string) {
func (h *Handler) OnLeftRoom(ctx context.Context, userID, roomID string) {
// remove any invites for this user if they are rejecting an invite
err := h.Store.InvitesTable.RemoveInvite(userID, roomID)
if err != nil {
logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to retire invite")
sentry.CaptureException(err)
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2LeaveRoom{
UserID: userID,

View File

@ -25,32 +25,32 @@ var timeSleep = time.Sleep
// V2DataReceiver is the receiver for all the v2 sync data the poller gets
type V2DataReceiver interface {
// Update the since token for this device. Called AFTER all other data in this sync response has been processed.
UpdateDeviceSince(userID, deviceID, since string)
UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)
// Accumulate data for this room. This means the timeline section of the v2 response.
Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries
Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries
// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
// If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB.
Initialise(roomID string, state []json.RawMessage) []json.RawMessage // snapshot ID?
Initialise(ctx context.Context, roomID string, state []json.RawMessage) []json.RawMessage // snapshot ID?
// SetTyping indicates which users are typing.
SetTyping(roomID string, ephEvent json.RawMessage)
SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage)
// Sent when there is a new receipt
OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage)
OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage)
// AddToDeviceMessages adds this chunk of to_device messages. Preserve the ordering.
AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) // start/end stream pos
AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) // start/end stream pos
// UpdateUnreadCounts sets the highlight_count and notification_count for this user in this room.
UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int)
UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int)
// Set the latest account data for this user.
OnAccountData(userID, roomID string, events []json.RawMessage) // ping update with types? Can you race when re-querying?
OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) // ping update with types? Can you race when re-querying?
// Sent when there is a room in the `invite` section of the v2 response.
OnInvite(userID, roomID string, inviteState []json.RawMessage) // invitestate in db
OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) // invitestate in db
// Sent when there is a room in the `leave` section of the v2 response.
OnLeftRoom(userID, roomID string)
OnLeftRoom(ctx context.Context, userID, roomID string)
// Sent when there is a _change_ in E2EE data, not all the time
OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)
OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)
// Sent when the poll loop terminates
OnTerminated(userID, deviceID string)
OnTerminated(ctx context.Context, userID, deviceID string)
// Sent when the token gets a 401 response
OnExpiredToken(accessTokenHash, userID, deviceID string)
OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string)
}
// PollerMap is a map of device ID to Poller
@ -211,52 +211,52 @@ func (h *PollerMap) execute() {
}
}
func (h *PollerMap) UpdateDeviceSince(userID, deviceID, since string) {
h.callbacks.UpdateDeviceSince(userID, deviceID, since)
func (h *PollerMap) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
h.callbacks.UpdateDeviceSince(ctx, userID, deviceID, since)
}
func (h *PollerMap) Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
h.callbacks.Accumulate(userID, deviceID, roomID, prevBatch, timeline)
h.callbacks.Accumulate(ctx, userID, deviceID, roomID, prevBatch, timeline)
wg.Done()
}
wg.Wait()
}
func (h *PollerMap) Initialise(roomID string, state []json.RawMessage) (result []json.RawMessage) {
func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (result []json.RawMessage) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
result = h.callbacks.Initialise(roomID, state)
result = h.callbacks.Initialise(ctx, roomID, state)
wg.Done()
}
wg.Wait()
return
}
func (h *PollerMap) SetTyping(roomID string, ephEvent json.RawMessage) {
func (h *PollerMap) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
h.callbacks.SetTyping(roomID, ephEvent)
h.callbacks.SetTyping(ctx, roomID, ephEvent)
wg.Done()
}
wg.Wait()
}
func (h *PollerMap) OnInvite(userID, roomID string, inviteState []json.RawMessage) {
func (h *PollerMap) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
h.callbacks.OnInvite(userID, roomID, inviteState)
h.callbacks.OnInvite(ctx, userID, roomID, inviteState)
wg.Done()
}
wg.Wait()
}
func (h *PollerMap) OnLeftRoom(userID, roomID string) {
func (h *PollerMap) OnLeftRoom(ctx context.Context, userID, roomID string) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
h.callbacks.OnLeftRoom(userID, roomID)
h.callbacks.OnLeftRoom(ctx, userID, roomID)
wg.Done()
}
wg.Wait()
@ -264,53 +264,53 @@ func (h *PollerMap) OnLeftRoom(userID, roomID string) {
// Add messages for this device. If an error is returned, the poll loop is terminated as continuing
// would implicitly acknowledge these messages.
func (h *PollerMap) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) {
h.callbacks.AddToDeviceMessages(userID, deviceID, msgs)
func (h *PollerMap) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) {
h.callbacks.AddToDeviceMessages(ctx, userID, deviceID, msgs)
}
func (h *PollerMap) OnTerminated(userID, deviceID string) {
h.callbacks.OnTerminated(userID, deviceID)
func (h *PollerMap) OnTerminated(ctx context.Context, userID, deviceID string) {
h.callbacks.OnTerminated(ctx, userID, deviceID)
}
func (h *PollerMap) OnExpiredToken(accessTokenHash, userID, deviceID string) {
h.callbacks.OnExpiredToken(accessTokenHash, userID, deviceID)
func (h *PollerMap) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) {
h.callbacks.OnExpiredToken(ctx, accessTokenHash, userID, deviceID)
}
func (h *PollerMap) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int) {
func (h *PollerMap) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
h.callbacks.UpdateUnreadCounts(roomID, userID, highlightCount, notifCount)
h.callbacks.UpdateUnreadCounts(ctx, roomID, userID, highlightCount, notifCount)
wg.Done()
}
wg.Wait()
}
func (h *PollerMap) OnAccountData(userID, roomID string, events []json.RawMessage) {
func (h *PollerMap) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
h.callbacks.OnAccountData(userID, roomID, events)
h.callbacks.OnAccountData(ctx, userID, roomID, events)
wg.Done()
}
wg.Wait()
}
func (h *PollerMap) OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage) {
func (h *PollerMap) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
h.callbacks.OnReceipt(userID, roomID, ephEventType, ephEvent)
h.callbacks.OnReceipt(ctx, userID, roomID, ephEventType, ephEvent)
wg.Done()
}
wg.Wait()
}
func (h *PollerMap) OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
func (h *PollerMap) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
h.callbacks.OnE2EEData(userID, deviceID, otkCounts, fallbackKeyTypes, deviceListChanges)
h.callbacks.OnE2EEData(ctx, userID, deviceID, otkCounts, fallbackKeyTypes, deviceListChanges)
wg.Done()
}
wg.Wait()
@ -369,9 +369,22 @@ func (p *poller) Terminate() {
// Returns if the access token gets invalidated or if there was a fatal error processing v2 responses.
// Use WaitUntilInitialSync() to wait until the first poll has been processed.
func (p *poller) Poll(since string) {
// Basing the sentry-wrangling on the sentry-go net/http integration, see e.g.
// https://github.com/getsentry/sentry-go/blob/02e712a638c40cd9701ad52d5d1309d65d556ef9/http/sentryhttp.go#L84
// TODO is this the correct way to create hub? Should the cloning be done by the
// caller and passed down?
hub := sentry.CurrentHub().Clone()
hub.ConfigureScope(func(scope *sentry.Scope) {
scope.SetUser(sentry.User{Username: p.userID})
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"device_id": p.deviceID,
})
})
ctx := sentry.SetHubOnContext(context.Background(), hub)
p.logger.Info().Str("since", since).Msg("Poller: v2 poll loop started")
defer func() {
p.receiver.OnTerminated(p.userID, p.deviceID)
p.receiver.OnTerminated(ctx, p.userID, p.deviceID)
}()
failCount := 0
firstTime := true
@ -388,7 +401,7 @@ func (p *poller) Poll(since string) {
break
}
start := time.Now()
resp, statusCode, err := p.client.DoSyncV2(context.Background(), p.accessToken, since, firstTime, p.initialToDeviceOnly)
resp, statusCode, err := p.client.DoSyncV2(ctx, p.accessToken, since, firstTime, p.initialToDeviceOnly)
p.trackRequestDuration(time.Since(start), since == "", firstTime)
if p.terminated.Load() {
break
@ -401,7 +414,7 @@ func (p *poller) Poll(since string) {
continue
} else {
p.logger.Warn().Msg("Poller: access token has been invalidated, terminating loop")
p.receiver.OnExpiredToken(hashToken(p.accessToken), p.userID, p.deviceID)
p.receiver.OnExpiredToken(ctx, hashToken(p.accessToken), p.userID, p.deviceID)
p.Terminate()
break
}
@ -412,17 +425,17 @@ func (p *poller) Poll(since string) {
p.initialToDeviceOnly = false
start = time.Now()
failCount = 0
p.parseE2EEData(resp)
p.parseGlobalAccountData(resp)
p.parseRoomsResponse(resp)
p.parseToDeviceMessages(resp)
p.parseE2EEData(ctx, resp)
p.parseGlobalAccountData(ctx, resp)
p.parseRoomsResponse(ctx, resp)
p.parseToDeviceMessages(ctx, resp)
wasInitial := since == ""
wasFirst := firstTime
since = resp.NextBatch
// persist the since token (TODO: this could get slow if we hammer the DB too much)
p.receiver.UpdateDeviceSince(p.userID, p.deviceID, since)
p.receiver.UpdateDeviceSince(ctx, p.userID, p.deviceID, since)
if firstTime {
firstTime = false
@ -466,14 +479,14 @@ func labels(isInitial, isFirst bool) []string {
return l
}
func (p *poller) parseToDeviceMessages(res *SyncResponse) {
func (p *poller) parseToDeviceMessages(ctx context.Context, res *SyncResponse) {
if len(res.ToDevice.Events) == 0 {
return
}
p.receiver.AddToDeviceMessages(p.userID, p.deviceID, res.ToDevice.Events)
p.receiver.AddToDeviceMessages(ctx, p.userID, p.deviceID, res.ToDevice.Events)
}
func (p *poller) parseE2EEData(res *SyncResponse) {
func (p *poller) parseE2EEData(ctx context.Context, res *SyncResponse) {
var changedOTKCounts map[string]int
if res.DeviceListsOTKCount != nil && len(res.DeviceListsOTKCount) > 0 {
if len(p.otkCounts) != len(res.DeviceListsOTKCount) {
@ -506,18 +519,18 @@ func (p *poller) parseE2EEData(res *SyncResponse) {
deviceListChanges := internal.ToDeviceListChangesMap(res.DeviceLists.Changed, res.DeviceLists.Left)
if deviceListChanges != nil || changedFallbackTypes != nil || changedOTKCounts != nil {
p.receiver.OnE2EEData(p.userID, p.deviceID, changedOTKCounts, changedFallbackTypes, deviceListChanges)
p.receiver.OnE2EEData(ctx, p.userID, p.deviceID, changedOTKCounts, changedFallbackTypes, deviceListChanges)
}
}
func (p *poller) parseGlobalAccountData(res *SyncResponse) {
func (p *poller) parseGlobalAccountData(ctx context.Context, res *SyncResponse) {
if len(res.AccountData.Events) == 0 {
return
}
p.receiver.OnAccountData(p.userID, AccountDataGlobalRoom, res.AccountData.Events)
p.receiver.OnAccountData(ctx, p.userID, AccountDataGlobalRoom, res.AccountData.Events)
}
func (p *poller) parseRoomsResponse(res *SyncResponse) {
func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) {
stateCalls := 0
timelineCalls := 0
typingCalls := 0
@ -525,7 +538,7 @@ func (p *poller) parseRoomsResponse(res *SyncResponse) {
for roomID, roomData := range res.Rooms.Join {
if len(roomData.State.Events) > 0 {
stateCalls++
prependStateEvents := p.receiver.Initialise(roomID, roomData.State.Events)
prependStateEvents := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
if len(prependStateEvents) > 0 {
// The poller has just learned of these state events due to an
// incremental poller sync; we must have missed the opportunity to see
@ -534,12 +547,13 @@ func (p *poller) parseRoomsResponse(res *SyncResponse) {
// correct room state.
const warnMsg = "parseRoomsResponse: prepending state events to timeline after gappy poll"
logger.Warn().Str("room_id", roomID).Int("prependStateEvents", len(prependStateEvents)).Msg(warnMsg)
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetContext("sliding-sync", map[string]interface{}{
hub := internal.GetSentryHubFromContextOrDefault(ctx)
hub.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"room_id": roomID,
"num_prepend_state_events": len(prependStateEvents),
})
sentry.CaptureMessage(warnMsg)
hub.CaptureMessage(warnMsg)
})
roomData.Timeline.Events = append(prependStateEvents, roomData.Timeline.Events...)
}
@ -550,42 +564,40 @@ func (p *poller) parseRoomsResponse(res *SyncResponse) {
switch ephEventType {
case "m.typing":
typingCalls++
p.receiver.SetTyping(roomID, ephEvent)
p.receiver.SetTyping(ctx, roomID, ephEvent)
case "m.receipt":
receiptCalls++
p.receiver.OnReceipt(p.userID, roomID, ephEventType, ephEvent)
p.receiver.OnReceipt(ctx, p.userID, roomID, ephEventType, ephEvent)
}
}
// process account data
if len(roomData.AccountData.Events) > 0 {
p.receiver.OnAccountData(p.userID, roomID, roomData.AccountData.Events)
p.receiver.OnAccountData(ctx, p.userID, roomID, roomData.AccountData.Events)
}
if len(roomData.Timeline.Events) > 0 {
timelineCalls++
p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
p.receiver.Accumulate(p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
}
// process unread counts AFTER events so global caches have been updated by the time this metadata is added.
// Previously we did this BEFORE events so we atomically showed the event and the unread count in one go, but
// this could cause clients to de-sync: see TestUnreadCountMisordering integration test.
if roomData.UnreadNotifications.HighlightCount != nil || roomData.UnreadNotifications.NotificationCount != nil {
p.receiver.UpdateUnreadCounts(
roomID, p.userID, roomData.UnreadNotifications.HighlightCount, roomData.UnreadNotifications.NotificationCount,
)
p.receiver.UpdateUnreadCounts(ctx, roomID, p.userID, roomData.UnreadNotifications.HighlightCount, roomData.UnreadNotifications.NotificationCount)
}
}
for roomID, roomData := range res.Rooms.Leave {
// TODO: do we care about state?
if len(roomData.Timeline.Events) > 0 {
p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
p.receiver.Accumulate(p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
}
p.receiver.OnLeftRoom(p.userID, roomID)
p.receiver.OnLeftRoom(ctx, p.userID, roomID)
}
for roomID, roomData := range res.Rooms.Invite {
p.receiver.OnInvite(p.userID, roomID, roomData.InviteState.Events)
p.receiver.OnInvite(ctx, p.userID, roomID, roomData.InviteState.Events)
}
var l *zerolog.Event
if len(res.Rooms.Invite) > 0 || len(res.Rooms.Join) > 0 {

View File

@ -460,10 +460,10 @@ type mockDataReceiver struct {
unblockProcess chan struct{}
}
func (a *mockDataReceiver) Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
func (a *mockDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
a.timelines[roomID] = append(a.timelines[roomID], timeline...)
}
func (a *mockDataReceiver) Initialise(roomID string, state []json.RawMessage) []json.RawMessage {
func (a *mockDataReceiver) Initialise(ctx context.Context, roomID string, state []json.RawMessage) []json.RawMessage {
a.states[roomID] = state
if a.incomingProcess != nil {
a.incomingProcess <- struct{}{}
@ -475,24 +475,28 @@ func (a *mockDataReceiver) Initialise(roomID string, state []json.RawMessage) []
// timeline. Untested here---return nil for now.
return nil
}
func (a *mockDataReceiver) SetTyping(roomID string, ephEvent json.RawMessage) {
func (a *mockDataReceiver) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) {
}
func (s *mockDataReceiver) UpdateDeviceSince(userID, deviceID, since string) {
func (s *mockDataReceiver) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
s.pollerIDToSince[PollerID{UserID: userID, DeviceID: deviceID}] = since
}
func (s *mockDataReceiver) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) {
func (s *mockDataReceiver) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) {
}
func (s *mockDataReceiver) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int) {
func (s *mockDataReceiver) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int) {
}
func (s *mockDataReceiver) OnAccountData(userID, roomID string, events []json.RawMessage) {}
func (s *mockDataReceiver) OnReceipt(userID, roomID, ephEvenType string, ephEvent json.RawMessage) {}
func (s *mockDataReceiver) OnInvite(userID, roomID string, inviteState []json.RawMessage) {}
func (s *mockDataReceiver) OnLeftRoom(userID, roomID string) {}
func (s *mockDataReceiver) OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
func (s *mockDataReceiver) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) {
}
func (s *mockDataReceiver) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage) {
}
func (s *mockDataReceiver) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) {
}
func (s *mockDataReceiver) OnLeftRoom(ctx context.Context, userID, roomID string) {}
func (s *mockDataReceiver) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
}
func (s *mockDataReceiver) OnTerminated(ctx context.Context, userID, deviceID string) {}
func (s *mockDataReceiver) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) {
}
func (s *mockDataReceiver) OnTerminated(userID, deviceID string) {}
func (s *mockDataReceiver) OnExpiredToken(accessTokenHash, userID, deviceID string) {}
func newMocks(doSyncV2 func(authHeader, since string) (*SyncResponse, int, error)) (*mockDataReceiver, *mockClient) {
client := &mockClient{

View File

@ -127,7 +127,7 @@ func NewInviteData(ctx context.Context, userID, roomID string, inviteState []jso
logger.Error().Str("invitee", userID).Str("room", roomID).Int("num_invite_state", len(inviteState)).Msg(errMsg)
hub := internal.GetSentryHubFromContextOrDefault(ctx)
hub.WithScope(func(scope *sentry.Scope) {
scope.SetContext("sliding-sync", map[string]interface{}{
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"invitee": userID,
"room": roomID,
"num_invite_state": len(inviteState),