Mirror of https://github.com/matrix-org/sliding-sync.git
Plumb a ctx through to sync2
Thank God for Goland's refactoring tools. This will (untested) associate sentry events from the sync2 part of the code with User IDs and Device IDs, without having to constantly invoke sentry.WithScope(). (Not all of the handler methods currently have that information.) It also leaves the door open for us to include more data on poller sentry reports (e.g. access token hash, time of last token activity on the sync3 side, ...)
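The handler and poller changes below lean on an internal helper, internal.GetSentryHubFromContextOrDefault, whose implementation is not part of this diff. As a rough sketch of what such a helper might look like (an assumption for illustration, not the repository's actual code), it would return the hub attached to the context if one exists and fall back to the global hub otherwise:

package internal // assumed location, for illustration only

import (
	"context"

	"github.com/getsentry/sentry-go"
)

// GetSentryHubFromContextOrDefault (sketch): prefer the request-scoped hub on
// ctx so captured events inherit its tags; otherwise fall back to the global
// hub so callers never get nil back.
func GetSentryHubFromContextOrDefault(ctx context.Context) *sentry.Hub {
	if hub := sentry.GetHubFromContext(ctx); hub != nil {
		return hub
	}
	return sentry.CurrentHub()
}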
commit e5eb4f12ba
parent cb75672132
@@ -1,6 +1,7 @@
 package handler2
 
 import (
+	"context"
 	"encoding/json"
 	"github.com/getsentry/sentry-go"
 	"hash/fnv"
@@ -151,12 +152,16 @@ func (h *Handler) updateMetrics() {
 	h.numPollers.Set(float64(h.pMap.NumPollers()))
 }
 
-func (h *Handler) OnTerminated(userID, deviceID string) {
+func (h *Handler) OnTerminated(ctx context.Context, userID, deviceID string) {
 	h.updateMetrics()
 }
 
-func (h *Handler) OnExpiredToken(accessTokenHash, userID, deviceID string) {
-	h.v2Store.TokensTable.Delete(accessTokenHash)
+func (h *Handler) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) {
+	err := h.v2Store.TokensTable.Delete(accessTokenHash)
+	if err != nil {
+		logger.Err(err).Str("user", userID).Str("device", deviceID).Str("access_token_hash", accessTokenHash).Msg("V2: failed to expire token")
+		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
+	}
 	// Notify v3 side so it can remove the connection from ConnMap
 	h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2ExpiredToken{
 		UserID: userID,
@@ -175,15 +180,15 @@ func (h *Handler) addPrometheusMetrics() {
 }
 
 // Emits nothing as no downstream components need it.
-func (h *Handler) UpdateDeviceSince(userID, deviceID, since string) {
+func (h *Handler) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
 	err := h.v2Store.DevicesTable.UpdateDeviceSince(userID, deviceID, since)
 	if err != nil {
 		logger.Err(err).Str("user", userID).Str("device", deviceID).Str("since", since).Msg("V2: failed to persist since token")
-		sentry.CaptureException(err)
+		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 	}
 }
 
-func (h *Handler) OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
+func (h *Handler) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
 	// some of these fields may be set
 	partialDD := internal.DeviceData{
 		UserID: userID,
@@ -197,7 +202,7 @@ func (h *Handler) OnE2EEData(userID, deviceID string, otkCounts map[string]int,
 	nextPos, err := h.Store.DeviceDataTable.Upsert(&partialDD)
 	if err != nil {
 		logger.Err(err).Str("user", userID).Msg("failed to upsert device data")
-		sentry.CaptureException(err)
+		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return
 	}
 	h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2DeviceData{
@@ -207,7 +212,7 @@ func (h *Handler) OnE2EEData(userID, deviceID string, otkCounts map[string]int,
 	})
 }
 
-func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
+func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
 	// Remember any transaction IDs that may be unique to this user
 	eventIDToTxnID := make(map[string]string, len(timeline)) // event_id -> txn_id
 	for _, e := range timeline {
@@ -223,7 +228,7 @@ func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timelin
 		err := h.Store.TransactionsTable.Insert(userID, deviceID, eventIDToTxnID)
 		if err != nil {
 			logger.Err(err).Str("user", userID).Str("device", deviceID).Int("num_txns", len(eventIDToTxnID)).Msg("failed to persist txn IDs for user")
-			sentry.CaptureException(err)
+			internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		}
 	}
 
@@ -231,7 +236,7 @@ func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timelin
 	numNew, latestNIDs, err := h.Store.Accumulate(roomID, prevBatch, timeline)
 	if err != nil {
 		logger.Err(err).Int("timeline", len(timeline)).Str("room", roomID).Msg("V2: failed to accumulate room")
-		sentry.CaptureException(err)
+		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return
 	}
 	if numNew == 0 {
@@ -245,11 +250,11 @@ func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timelin
 	})
 }
 
-func (h *Handler) Initialise(roomID string, state []json.RawMessage) []json.RawMessage {
+func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.RawMessage) []json.RawMessage {
 	res, err := h.Store.Initialise(roomID, state)
 	if err != nil {
 		logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room")
-		sentry.CaptureException(err)
+		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return nil
 	}
 	if res.AddedEvents {
@@ -261,7 +266,7 @@ func (h *Handler) Initialise(roomID string, state []json.RawMessage) []json.RawM
 	return res.PrependTimelineEvents
 }
 
-func (h *Handler) SetTyping(roomID string, ephEvent json.RawMessage) {
+func (h *Handler) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) {
 	next := typingHash(ephEvent)
 	existing := h.typingMap[roomID]
 	if existing == next {
@@ -276,13 +281,13 @@ func (h *Handler) SetTyping(roomID string, ephEvent json.RawMessage) {
 	})
 }
 
-func (h *Handler) OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage) {
+func (h *Handler) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage) {
 	// update our records - we make an artifically new RR event if there are genuine changes
 	// else it returns nil
 	newReceipts, err := h.Store.ReceiptTable.Insert(roomID, ephEvent)
 	if err != nil {
 		logger.Err(err).Str("room", roomID).Msg("failed to store receipts")
-		sentry.CaptureException(err)
+		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return
 	}
 	if len(newReceipts) == 0 {
@@ -294,11 +299,11 @@ func (h *Handler) OnReceipt(userID, roomID, ephEventType string, ephEvent json.R
 	})
 }
 
-func (h *Handler) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) {
+func (h *Handler) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) {
 	_, err := h.Store.ToDeviceTable.InsertMessages(userID, deviceID, msgs)
 	if err != nil {
 		logger.Err(err).Str("user", userID).Str("device", deviceID).Int("msgs", len(msgs)).Msg("V2: failed to store to-device messages")
-		sentry.CaptureException(err)
+		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 	}
 	h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2DeviceMessages{
 		UserID: userID,
@@ -306,7 +311,7 @@ func (h *Handler) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMe
 	})
 }
 
-func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int) {
+func (h *Handler) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int) {
 	// only touch the DB and notify if they have changed. sync v2 will alwyas include the counts
 	// even if they haven't changed :(
 	key := roomID + userID
@@ -333,7 +338,7 @@ func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, noti
 	err := h.Store.UnreadTable.UpdateUnreadCounters(userID, roomID, highlightCount, notifCount)
 	if err != nil {
 		logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update unread counters")
-		sentry.CaptureException(err)
+		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 	}
 	h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2UnreadCounts{
 		RoomID: roomID,
@@ -343,7 +348,7 @@ func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, noti
 	})
 }
 
-func (h *Handler) OnAccountData(userID, roomID string, events []json.RawMessage) {
+func (h *Handler) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) {
 	data, err := h.Store.InsertAccountData(userID, roomID, events)
 	if err != nil {
 		logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update account data")
@@ -361,11 +366,11 @@ func (h *Handler) OnAccountData(userID, roomID string, events []json.RawMessage)
 	})
 }
 
-func (h *Handler) OnInvite(userID, roomID string, inviteState []json.RawMessage) {
+func (h *Handler) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) {
 	err := h.Store.InvitesTable.InsertInvite(userID, roomID, inviteState)
 	if err != nil {
 		logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to insert invite")
-		sentry.CaptureException(err)
+		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return
 	}
 	h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2InviteRoom{
@@ -374,12 +379,12 @@ func (h *Handler) OnInvite(userID, roomID string, inviteState []json.RawMessage)
 	})
 }
 
-func (h *Handler) OnLeftRoom(userID, roomID string) {
+func (h *Handler) OnLeftRoom(ctx context.Context, userID, roomID string) {
 	// remove any invites for this user if they are rejecting an invite
 	err := h.Store.InvitesTable.RemoveInvite(userID, roomID)
 	if err != nil {
 		logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to retire invite")
-		sentry.CaptureException(err)
+		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 	}
 	h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2LeaveRoom{
 		UserID: userID,
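Every error path above now reports through the hub found on the ctx rather than the global Sentry client. For those reports to actually carry the user and device IDs mentioned in the commit message, the context handed to these callbacks needs a hub that was tagged when the poller was set up; that wiring is outside this diff. A hedged sketch using sentry-go's public API (the helper name newPollerContext is invented for illustration, and its placement in package sync2 is an assumption):

package sync2 // assumed placement, illustration only

import (
	"context"

	"github.com/getsentry/sentry-go"
)

// newPollerContext is a hypothetical helper, not part of this commit: derive a
// per-poller context whose Sentry hub is pre-tagged, so anything captured via
// internal.GetSentryHubFromContextOrDefault(ctx) carries user/device metadata.
func newPollerContext(parent context.Context, userID, deviceID string) context.Context {
	hub := sentry.CurrentHub().Clone()
	hub.Scope().SetUser(sentry.User{ID: userID})
	hub.Scope().SetTag("device_id", deviceID)
	return sentry.SetHubOnContext(parent, hub)
}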
sync2/poller.go
@@ -25,32 +25,32 @@ var timeSleep = time.Sleep
 // V2DataReceiver is the receiver for all the v2 sync data the poller gets
 type V2DataReceiver interface {
 	// Update the since token for this device. Called AFTER all other data in this sync response has been processed.
-	UpdateDeviceSince(userID, deviceID, since string)
+	UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)
 	// Accumulate data for this room. This means the timeline section of the v2 response.
-	Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries
+	Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries
 	// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
 	// If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB.
-	Initialise(roomID string, state []json.RawMessage) []json.RawMessage // snapshot ID?
+	Initialise(ctx context.Context, roomID string, state []json.RawMessage) []json.RawMessage // snapshot ID?
 	// SetTyping indicates which users are typing.
-	SetTyping(roomID string, ephEvent json.RawMessage)
+	SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage)
 	// Sent when there is a new receipt
-	OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage)
+	OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage)
 	// AddToDeviceMessages adds this chunk of to_device messages. Preserve the ordering.
-	AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) // start/end stream pos
+	AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) // start/end stream pos
 	// UpdateUnreadCounts sets the highlight_count and notification_count for this user in this room.
-	UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int)
+	UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int)
 	// Set the latest account data for this user.
-	OnAccountData(userID, roomID string, events []json.RawMessage) // ping update with types? Can you race when re-querying?
+	OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) // ping update with types? Can you race when re-querying?
 	// Sent when there is a room in the `invite` section of the v2 response.
-	OnInvite(userID, roomID string, inviteState []json.RawMessage) // invitestate in db
+	OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) // invitestate in db
 	// Sent when there is a room in the `leave` section of the v2 response.
-	OnLeftRoom(userID, roomID string)
+	OnLeftRoom(ctx context.Context, userID, roomID string)
 	// Sent when there is a _change_ in E2EE data, not all the time
-	OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)
+	OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)
 	// Sent when the poll loop terminates
-	OnTerminated(userID, deviceID string)
+	OnTerminated(ctx context.Context, userID, deviceID string)
 	// Sent when the token gets a 401 response
-	OnExpiredToken(accessTokenHash, userID, deviceID string)
+	OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string)
 }
 
 // PollerMap is a map of device ID to Poller
@@ -211,52 +211,52 @@ func (h *PollerMap) execute() {
 	}
 }
 
-func (h *PollerMap) UpdateDeviceSince(userID, deviceID, since string) {
-	h.callbacks.UpdateDeviceSince(userID, deviceID, since)
+func (h *PollerMap) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
+	h.callbacks.UpdateDeviceSince(ctx, userID, deviceID, since)
 }
-func (h *PollerMap) Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
+func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 	h.executor <- func() {
-		h.callbacks.Accumulate(userID, deviceID, roomID, prevBatch, timeline)
+		h.callbacks.Accumulate(ctx, userID, deviceID, roomID, prevBatch, timeline)
 		wg.Done()
 	}
 	wg.Wait()
 }
-func (h *PollerMap) Initialise(roomID string, state []json.RawMessage) (result []json.RawMessage) {
+func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (result []json.RawMessage) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 	h.executor <- func() {
-		result = h.callbacks.Initialise(roomID, state)
+		result = h.callbacks.Initialise(ctx, roomID, state)
 		wg.Done()
 	}
 	wg.Wait()
 	return
 }
-func (h *PollerMap) SetTyping(roomID string, ephEvent json.RawMessage) {
+func (h *PollerMap) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 	h.executor <- func() {
-		h.callbacks.SetTyping(roomID, ephEvent)
+		h.callbacks.SetTyping(ctx, roomID, ephEvent)
 		wg.Done()
 	}
 	wg.Wait()
 }
-func (h *PollerMap) OnInvite(userID, roomID string, inviteState []json.RawMessage) {
+func (h *PollerMap) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 	h.executor <- func() {
-		h.callbacks.OnInvite(userID, roomID, inviteState)
+		h.callbacks.OnInvite(ctx, userID, roomID, inviteState)
 		wg.Done()
 	}
 	wg.Wait()
 }
 
-func (h *PollerMap) OnLeftRoom(userID, roomID string) {
+func (h *PollerMap) OnLeftRoom(ctx context.Context, userID, roomID string) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 	h.executor <- func() {
-		h.callbacks.OnLeftRoom(userID, roomID)
+		h.callbacks.OnLeftRoom(ctx, userID, roomID)
 		wg.Done()
 	}
 	wg.Wait()
@@ -264,53 +264,53 @@ func (h *PollerMap) OnLeftRoom(userID, roomID string) {
 
 // Add messages for this device. If an error is returned, the poll loop is terminated as continuing
 // would implicitly acknowledge these messages.
-func (h *PollerMap) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) {
-	h.callbacks.AddToDeviceMessages(userID, deviceID, msgs)
+func (h *PollerMap) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) {
+	h.callbacks.AddToDeviceMessages(ctx, userID, deviceID, msgs)
 }
 
-func (h *PollerMap) OnTerminated(userID, deviceID string) {
-	h.callbacks.OnTerminated(userID, deviceID)
+func (h *PollerMap) OnTerminated(ctx context.Context, userID, deviceID string) {
+	h.callbacks.OnTerminated(ctx, userID, deviceID)
 }
 
-func (h *PollerMap) OnExpiredToken(accessTokenHash, userID, deviceID string) {
-	h.callbacks.OnExpiredToken(accessTokenHash, userID, deviceID)
+func (h *PollerMap) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) {
+	h.callbacks.OnExpiredToken(ctx, accessTokenHash, userID, deviceID)
 }
 
-func (h *PollerMap) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int) {
+func (h *PollerMap) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int) {
	var wg sync.WaitGroup
 	wg.Add(1)
 	h.executor <- func() {
-		h.callbacks.UpdateUnreadCounts(roomID, userID, highlightCount, notifCount)
+		h.callbacks.UpdateUnreadCounts(ctx, roomID, userID, highlightCount, notifCount)
 		wg.Done()
 	}
 	wg.Wait()
 }
 
-func (h *PollerMap) OnAccountData(userID, roomID string, events []json.RawMessage) {
+func (h *PollerMap) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 	h.executor <- func() {
-		h.callbacks.OnAccountData(userID, roomID, events)
+		h.callbacks.OnAccountData(ctx, userID, roomID, events)
 		wg.Done()
 	}
 	wg.Wait()
 }
 
-func (h *PollerMap) OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage) {
+func (h *PollerMap) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 	h.executor <- func() {
-		h.callbacks.OnReceipt(userID, roomID, ephEventType, ephEvent)
+		h.callbacks.OnReceipt(ctx, userID, roomID, ephEventType, ephEvent)
 		wg.Done()
 	}
 	wg.Wait()
 }
 
-func (h *PollerMap) OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
+func (h *PollerMap) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 	h.executor <- func() {
-		h.callbacks.OnE2EEData(userID, deviceID, otkCounts, fallbackKeyTypes, deviceListChanges)
+		h.callbacks.OnE2EEData(ctx, userID, deviceID, otkCounts, fallbackKeyTypes, deviceListChanges)
 		wg.Done()
 	}
 	wg.Wait()
@@ -384,7 +384,7 @@ func (p *poller) Poll(since string) {
 
 	p.logger.Info().Str("since", since).Msg("Poller: v2 poll loop started")
 	defer func() {
-		p.receiver.OnTerminated(p.userID, p.deviceID)
+		p.receiver.OnTerminated(ctx, p.userID, p.deviceID)
 	}()
 	failCount := 0
 	firstTime := true
@@ -414,7 +414,7 @@ func (p *poller) Poll(since string) {
 				continue
 			} else {
 				p.logger.Warn().Msg("Poller: access token has been invalidated, terminating loop")
-				p.receiver.OnExpiredToken(hashToken(p.accessToken), p.userID, p.deviceID)
+				p.receiver.OnExpiredToken(ctx, hashToken(p.accessToken), p.userID, p.deviceID)
 				p.Terminate()
 				break
 			}
@@ -425,17 +425,17 @@ func (p *poller) Poll(since string) {
 		p.initialToDeviceOnly = false
 		start = time.Now()
 		failCount = 0
-		p.parseE2EEData(resp)
-		p.parseGlobalAccountData(resp)
-		p.parseRoomsResponse(resp)
-		p.parseToDeviceMessages(resp)
+		p.parseE2EEData(ctx, resp)
+		p.parseGlobalAccountData(ctx, resp)
+		p.parseRoomsResponse(ctx, resp)
+		p.parseToDeviceMessages(ctx, resp)
 
 		wasInitial := since == ""
 		wasFirst := firstTime
 
 		since = resp.NextBatch
 		// persist the since token (TODO: this could get slow if we hammer the DB too much)
-		p.receiver.UpdateDeviceSince(p.userID, p.deviceID, since)
+		p.receiver.UpdateDeviceSince(ctx, p.userID, p.deviceID, since)
 
 		if firstTime {
 			firstTime = false
@@ -479,14 +479,14 @@ func labels(isInitial, isFirst bool) []string {
 	return l
 }
 
-func (p *poller) parseToDeviceMessages(res *SyncResponse) {
+func (p *poller) parseToDeviceMessages(ctx context.Context, res *SyncResponse) {
 	if len(res.ToDevice.Events) == 0 {
 		return
 	}
-	p.receiver.AddToDeviceMessages(p.userID, p.deviceID, res.ToDevice.Events)
+	p.receiver.AddToDeviceMessages(ctx, p.userID, p.deviceID, res.ToDevice.Events)
 }
 
-func (p *poller) parseE2EEData(res *SyncResponse) {
+func (p *poller) parseE2EEData(ctx context.Context, res *SyncResponse) {
 	var changedOTKCounts map[string]int
 	if res.DeviceListsOTKCount != nil && len(res.DeviceListsOTKCount) > 0 {
 		if len(p.otkCounts) != len(res.DeviceListsOTKCount) {
@@ -519,18 +519,18 @@ func (p *poller) parseE2EEData(res *SyncResponse) {
 	deviceListChanges := internal.ToDeviceListChangesMap(res.DeviceLists.Changed, res.DeviceLists.Left)
 
 	if deviceListChanges != nil || changedFallbackTypes != nil || changedOTKCounts != nil {
-		p.receiver.OnE2EEData(p.userID, p.deviceID, changedOTKCounts, changedFallbackTypes, deviceListChanges)
+		p.receiver.OnE2EEData(ctx, p.userID, p.deviceID, changedOTKCounts, changedFallbackTypes, deviceListChanges)
 	}
 }
 
-func (p *poller) parseGlobalAccountData(res *SyncResponse) {
+func (p *poller) parseGlobalAccountData(ctx context.Context, res *SyncResponse) {
 	if len(res.AccountData.Events) == 0 {
 		return
 	}
-	p.receiver.OnAccountData(p.userID, AccountDataGlobalRoom, res.AccountData.Events)
+	p.receiver.OnAccountData(ctx, p.userID, AccountDataGlobalRoom, res.AccountData.Events)
 }
 
-func (p *poller) parseRoomsResponse(res *SyncResponse) {
+func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) {
 	stateCalls := 0
 	timelineCalls := 0
 	typingCalls := 0
@@ -538,7 +538,7 @@ func (p *poller) parseRoomsResponse(res *SyncResponse) {
 	for roomID, roomData := range res.Rooms.Join {
 		if len(roomData.State.Events) > 0 {
 			stateCalls++
-			prependStateEvents := p.receiver.Initialise(roomID, roomData.State.Events)
+			prependStateEvents := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
 			if len(prependStateEvents) > 0 {
 				// The poller has just learned of these state events due to an
 				// incremental poller sync; we must have missed the opportunity to see
@@ -547,12 +547,13 @@ func (p *poller) parseRoomsResponse(res *SyncResponse) {
 				// correct room state.
 				const warnMsg = "parseRoomsResponse: prepending state events to timeline after gappy poll"
 				logger.Warn().Str("room_id", roomID).Int("prependStateEvents", len(prependStateEvents)).Msg(warnMsg)
-				sentry.WithScope(func(scope *sentry.Scope) {
+				hub := internal.GetSentryHubFromContextOrDefault(ctx)
+				hub.WithScope(func(scope *sentry.Scope) {
 					scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
 						"room_id":                  roomID,
 						"num_prepend_state_events": len(prependStateEvents),
 					})
-					sentry.CaptureMessage(warnMsg)
+					hub.CaptureMessage(warnMsg)
 				})
 				roomData.Timeline.Events = append(prependStateEvents, roomData.Timeline.Events...)
 			}
@@ -563,42 +564,40 @@ func (p *poller) parseRoomsResponse(res *SyncResponse) {
 			switch ephEventType {
 			case "m.typing":
 				typingCalls++
-				p.receiver.SetTyping(roomID, ephEvent)
+				p.receiver.SetTyping(ctx, roomID, ephEvent)
 			case "m.receipt":
 				receiptCalls++
-				p.receiver.OnReceipt(p.userID, roomID, ephEventType, ephEvent)
+				p.receiver.OnReceipt(ctx, p.userID, roomID, ephEventType, ephEvent)
 			}
 		}
 
 		// process account data
 		if len(roomData.AccountData.Events) > 0 {
-			p.receiver.OnAccountData(p.userID, roomID, roomData.AccountData.Events)
+			p.receiver.OnAccountData(ctx, p.userID, roomID, roomData.AccountData.Events)
 		}
 		if len(roomData.Timeline.Events) > 0 {
 			timelineCalls++
 			p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
-			p.receiver.Accumulate(p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
+			p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
 		}
 
 		// process unread counts AFTER events so global caches have been updated by the time this metadata is added.
 		// Previously we did this BEFORE events so we atomically showed the event and the unread count in one go, but
 		// this could cause clients to de-sync: see TestUnreadCountMisordering integration test.
 		if roomData.UnreadNotifications.HighlightCount != nil || roomData.UnreadNotifications.NotificationCount != nil {
-			p.receiver.UpdateUnreadCounts(
-				roomID, p.userID, roomData.UnreadNotifications.HighlightCount, roomData.UnreadNotifications.NotificationCount,
-			)
+			p.receiver.UpdateUnreadCounts(ctx, roomID, p.userID, roomData.UnreadNotifications.HighlightCount, roomData.UnreadNotifications.NotificationCount)
 		}
 	}
 	for roomID, roomData := range res.Rooms.Leave {
 		// TODO: do we care about state?
 		if len(roomData.Timeline.Events) > 0 {
 			p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
-			p.receiver.Accumulate(p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
+			p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
 		}
-		p.receiver.OnLeftRoom(p.userID, roomID)
+		p.receiver.OnLeftRoom(ctx, p.userID, roomID)
 	}
 	for roomID, roomData := range res.Rooms.Invite {
-		p.receiver.OnInvite(p.userID, roomID, roomData.InviteState.Events)
+		p.receiver.OnInvite(ctx, p.userID, roomID, roomData.InviteState.Events)
 	}
 	var l *zerolog.Event
 	if len(res.Rooms.Invite) > 0 || len(res.Rooms.Join) > 0 {
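A note on the PollerMap methods above: they all follow the same shape, capturing ctx in a closure, pushing the closure onto the single executor channel, and blocking on a WaitGroup until it has run. Callbacks therefore still execute one at a time, but each one reports against the context (and hence the Sentry hub) of the poller that submitted it. A stripped-down, standalone illustration of that pattern, not project code:

package main

import (
	"context"
	"fmt"
	"sync"
)

type ctxKey struct{}

func main() {
	executor := make(chan func(), 16)
	go func() { // single worker goroutine: closures run strictly one at a time
		for fn := range executor {
			fn()
		}
	}()

	// Anything hung off the context (a Sentry hub, user/device IDs, ...) travels
	// with the closure into the worker.
	ctx := context.WithValue(context.Background(), ctxKey{}, "@alice:example.org")

	var wg sync.WaitGroup
	wg.Add(1)
	executor <- func() {
		fmt.Println("processing callback for", ctx.Value(ctxKey{}))
		wg.Done()
	}
	wg.Wait() // the submitter blocks until its callback has executed
	close(executor)
}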
@@ -460,10 +460,10 @@ type mockDataReceiver struct {
 	unblockProcess chan struct{}
 }
 
-func (a *mockDataReceiver) Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
+func (a *mockDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
 	a.timelines[roomID] = append(a.timelines[roomID], timeline...)
 }
-func (a *mockDataReceiver) Initialise(roomID string, state []json.RawMessage) []json.RawMessage {
+func (a *mockDataReceiver) Initialise(ctx context.Context, roomID string, state []json.RawMessage) []json.RawMessage {
 	a.states[roomID] = state
 	if a.incomingProcess != nil {
 		a.incomingProcess <- struct{}{}
@@ -475,24 +475,28 @@ func (a *mockDataReceiver) Initialise(roomID string, state []json.RawMessage) []
 	// timeline. Untested here---return nil for now.
 	return nil
 }
-func (a *mockDataReceiver) SetTyping(roomID string, ephEvent json.RawMessage) {
+func (a *mockDataReceiver) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) {
 }
-func (s *mockDataReceiver) UpdateDeviceSince(userID, deviceID, since string) {
+func (s *mockDataReceiver) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
 	s.pollerIDToSince[PollerID{UserID: userID, DeviceID: deviceID}] = since
 }
-func (s *mockDataReceiver) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) {
+func (s *mockDataReceiver) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) {
 }
 
-func (s *mockDataReceiver) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int) {
+func (s *mockDataReceiver) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int) {
 }
-func (s *mockDataReceiver) OnAccountData(userID, roomID string, events []json.RawMessage) {}
-func (s *mockDataReceiver) OnReceipt(userID, roomID, ephEvenType string, ephEvent json.RawMessage) {}
-func (s *mockDataReceiver) OnInvite(userID, roomID string, inviteState []json.RawMessage) {}
-func (s *mockDataReceiver) OnLeftRoom(userID, roomID string) {}
-func (s *mockDataReceiver) OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
+func (s *mockDataReceiver) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) {
+}
+func (s *mockDataReceiver) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage) {
+}
+func (s *mockDataReceiver) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) {
+}
+func (s *mockDataReceiver) OnLeftRoom(ctx context.Context, userID, roomID string) {}
+func (s *mockDataReceiver) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
 }
-func (s *mockDataReceiver) OnTerminated(userID, deviceID string) {}
-func (s *mockDataReceiver) OnExpiredToken(accessTokenHash, userID, deviceID string) {}
+func (s *mockDataReceiver) OnTerminated(ctx context.Context, userID, deviceID string) {}
+func (s *mockDataReceiver) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) {
+}
 
 func newMocks(doSyncV2 func(authHeader, since string) (*SyncResponse, int, error)) (*mockDataReceiver, *mockClient) {
 	client := &mockClient{