mirror of https://github.com/matrix-org/sliding-sync.git
Commit 38ee47c1fd: Merge remote-tracking branch 'origin/main' into dmr/bump-event-types-3
@@ -26,6 +26,10 @@ func (e *HandlerError) Error() string {
    return fmt.Sprintf("HTTP %d : %s", e.StatusCode, e.Err.Error())
}

func (e *HandlerError) Unwrap() error {
    return e.Err
}

type jsonError struct {
    Err string `json:"error"`
    Code string `json:"errcode,omitempty"`

@@ -6,6 +6,10 @@ import (
    "time"
)

// SentryCtxKey is a namespace under which we can report arbitrary key-value pairs to
// sentry.
const SentryCtxKey = "sliding-sync"

// GetSentryHubFromContextOrDefault is a version of sentry.GetHubFromContext which
// automatically falls back to sentry.CurrentHub if the given context has not been
// attached a hub.
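Note: only the doc comment for GetSentryHubFromContextOrDefault appears in this hunk. As a rough sketch (not necessarily the repository's exact code), a helper with that contract built on the sentry-go API would look like:

func GetSentryHubFromContextOrDefault(ctx context.Context) *sentry.Hub {
    // sentry.GetHubFromContext returns nil when no hub has been attached to ctx.
    if hub := sentry.GetHubFromContext(ctx); hub != nil {
        return hub
    }
    // Fall back to the process-wide hub so callers can always capture events.
    return sentry.CurrentHub()
}

This is why the call sites below can unconditionally write internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err): they get the per-poller hub when one is attached to the context, and the global hub otherwise.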
@@ -1,6 +1,7 @@
package handler2

import (
    "context"
    "encoding/json"
    "github.com/getsentry/sentry-go"
    "hash/fnv"

@@ -151,12 +152,16 @@ func (h *Handler) updateMetrics() {
    h.numPollers.Set(float64(h.pMap.NumPollers()))
}

func (h *Handler) OnTerminated(userID, deviceID string) {
func (h *Handler) OnTerminated(ctx context.Context, userID, deviceID string) {
    h.updateMetrics()
}

func (h *Handler) OnExpiredToken(accessTokenHash, userID, deviceID string) {
    h.v2Store.TokensTable.Delete(accessTokenHash)
func (h *Handler) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) {
    err := h.v2Store.TokensTable.Delete(accessTokenHash)
    if err != nil {
        logger.Err(err).Str("user", userID).Str("device", deviceID).Str("access_token_hash", accessTokenHash).Msg("V2: failed to expire token")
        internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
    }
    // Notify v3 side so it can remove the connection from ConnMap
    h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2ExpiredToken{
        UserID: userID,

@@ -175,15 +180,15 @@ func (h *Handler) addPrometheusMetrics() {
}

// Emits nothing as no downstream components need it.
func (h *Handler) UpdateDeviceSince(userID, deviceID, since string) {
func (h *Handler) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
    err := h.v2Store.DevicesTable.UpdateDeviceSince(userID, deviceID, since)
    if err != nil {
        logger.Err(err).Str("user", userID).Str("device", deviceID).Str("since", since).Msg("V2: failed to persist since token")
        sentry.CaptureException(err)
        internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
    }
}

func (h *Handler) OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
func (h *Handler) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
    // some of these fields may be set
    partialDD := internal.DeviceData{
        UserID: userID,

@@ -197,7 +202,7 @@ func (h *Handler) OnE2EEData(userID, deviceID string, otkCounts map[string]int,
    nextPos, err := h.Store.DeviceDataTable.Upsert(&partialDD)
    if err != nil {
        logger.Err(err).Str("user", userID).Msg("failed to upsert device data")
        sentry.CaptureException(err)
        internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
        return
    }
    h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2DeviceData{

@@ -207,7 +212,7 @@ func (h *Handler) OnE2EEData(userID, deviceID string, otkCounts map[string]int,
    })
}

func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
    // Remember any transaction IDs that may be unique to this user
    eventIDToTxnID := make(map[string]string, len(timeline)) // event_id -> txn_id
    for _, e := range timeline {

@@ -223,7 +228,7 @@ func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timelin
        err := h.Store.TransactionsTable.Insert(userID, deviceID, eventIDToTxnID)
        if err != nil {
            logger.Err(err).Str("user", userID).Str("device", deviceID).Int("num_txns", len(eventIDToTxnID)).Msg("failed to persist txn IDs for user")
            sentry.CaptureException(err)
            internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
        }
    }

@@ -231,7 +236,7 @@ func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timelin
    numNew, latestNIDs, err := h.Store.Accumulate(roomID, prevBatch, timeline)
    if err != nil {
        logger.Err(err).Int("timeline", len(timeline)).Str("room", roomID).Msg("V2: failed to accumulate room")
        sentry.CaptureException(err)
        internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
        return
    }
    if numNew == 0 {

@@ -245,11 +250,11 @@ func (h *Handler) Accumulate(userID, deviceID, roomID, prevBatch string, timelin
    })
}

func (h *Handler) Initialise(roomID string, state []json.RawMessage) []json.RawMessage {
func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.RawMessage) []json.RawMessage {
    res, err := h.Store.Initialise(roomID, state)
    if err != nil {
        logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room")
        sentry.CaptureException(err)
        internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
        return nil
    }
    if res.AddedEvents {

@@ -261,7 +266,7 @@ func (h *Handler) Initialise(roomID string, state []json.RawMessage) []json.RawM
    return res.PrependTimelineEvents
}

func (h *Handler) SetTyping(roomID string, ephEvent json.RawMessage) {
func (h *Handler) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) {
    next := typingHash(ephEvent)
    existing := h.typingMap[roomID]
    if existing == next {
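SetTyping deduplicates typing notifications by hashing the raw ephemeral event and comparing it with the last hash stored for the room in typingMap. The typingHash helper is not part of this diff; given the hash/fnv import above, a plausible sketch (hypothetical, the real implementation may differ) is:

func typingHash(ephEvent json.RawMessage) uint64 {
    // FNV-1a is a cheap, non-cryptographic hash; it only needs to detect
    // whether the typing EDU changed since the last poll.
    h := fnv.New64a()
    h.Write(ephEvent)
    return h.Sum64()
}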
@@ -276,13 +281,13 @@ func (h *Handler) SetTyping(roomID string, ephEvent json.RawMessage) {
    })
}

func (h *Handler) OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage) {
func (h *Handler) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage) {
    // update our records - we make an artifically new RR event if there are genuine changes
    // else it returns nil
    newReceipts, err := h.Store.ReceiptTable.Insert(roomID, ephEvent)
    if err != nil {
        logger.Err(err).Str("room", roomID).Msg("failed to store receipts")
        sentry.CaptureException(err)
        internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
        return
    }
    if len(newReceipts) == 0 {

@@ -294,11 +299,11 @@ func (h *Handler) OnReceipt(userID, roomID, ephEventType string, ephEvent json.R
    })
}

func (h *Handler) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) {
func (h *Handler) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) {
    _, err := h.Store.ToDeviceTable.InsertMessages(userID, deviceID, msgs)
    if err != nil {
        logger.Err(err).Str("user", userID).Str("device", deviceID).Int("msgs", len(msgs)).Msg("V2: failed to store to-device messages")
        sentry.CaptureException(err)
        internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
    }
    h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2DeviceMessages{
        UserID: userID,

@@ -306,7 +311,7 @@ func (h *Handler) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMe
    })
}

func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int) {
func (h *Handler) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int) {
    // only touch the DB and notify if they have changed. sync v2 will alwyas include the counts
    // even if they haven't changed :(
    key := roomID + userID

@@ -333,7 +338,7 @@ func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, noti
    err := h.Store.UnreadTable.UpdateUnreadCounters(userID, roomID, highlightCount, notifCount)
    if err != nil {
        logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update unread counters")
        sentry.CaptureException(err)
        internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
    }
    h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2UnreadCounts{
        RoomID: roomID,

@@ -343,7 +348,7 @@ func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, noti
    })
}

func (h *Handler) OnAccountData(userID, roomID string, events []json.RawMessage) {
func (h *Handler) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) {
    data, err := h.Store.InsertAccountData(userID, roomID, events)
    if err != nil {
        logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update account data")

@@ -361,11 +366,11 @@ func (h *Handler) OnAccountData(userID, roomID string, events []json.RawMessage)
    })
}

func (h *Handler) OnInvite(userID, roomID string, inviteState []json.RawMessage) {
func (h *Handler) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) {
    err := h.Store.InvitesTable.InsertInvite(userID, roomID, inviteState)
    if err != nil {
        logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to insert invite")
        sentry.CaptureException(err)
        internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
        return
    }
    h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2InviteRoom{

@@ -374,12 +379,12 @@ func (h *Handler) OnInvite(userID, roomID string, inviteState []json.RawMessage)
    })
}

func (h *Handler) OnLeftRoom(userID, roomID string) {
func (h *Handler) OnLeftRoom(ctx context.Context, userID, roomID string) {
    // remove any invites for this user if they are rejecting an invite
    err := h.Store.InvitesTable.RemoveInvite(userID, roomID)
    if err != nil {
        logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to retire invite")
        sentry.CaptureException(err)
        internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
    }
    h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2LeaveRoom{
        UserID: userID,
148 sync2/poller.go
@@ -25,32 +25,32 @@ var timeSleep = time.Sleep
// V2DataReceiver is the receiver for all the v2 sync data the poller gets
type V2DataReceiver interface {
    // Update the since token for this device. Called AFTER all other data in this sync response has been processed.
    UpdateDeviceSince(userID, deviceID, since string)
    UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)
    // Accumulate data for this room. This means the timeline section of the v2 response.
    Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries
    Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries
    // Initialise the room, if it hasn't been already. This means the state section of the v2 response.
    // If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB.
    Initialise(roomID string, state []json.RawMessage) []json.RawMessage // snapshot ID?
    Initialise(ctx context.Context, roomID string, state []json.RawMessage) []json.RawMessage // snapshot ID?
    // SetTyping indicates which users are typing.
    SetTyping(roomID string, ephEvent json.RawMessage)
    SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage)
    // Sent when there is a new receipt
    OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage)
    OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage)
    // AddToDeviceMessages adds this chunk of to_device messages. Preserve the ordering.
    AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) // start/end stream pos
    AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) // start/end stream pos
    // UpdateUnreadCounts sets the highlight_count and notification_count for this user in this room.
    UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int)
    UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int)
    // Set the latest account data for this user.
    OnAccountData(userID, roomID string, events []json.RawMessage) // ping update with types? Can you race when re-querying?
    OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) // ping update with types? Can you race when re-querying?
    // Sent when there is a room in the `invite` section of the v2 response.
    OnInvite(userID, roomID string, inviteState []json.RawMessage) // invitestate in db
    OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) // invitestate in db
    // Sent when there is a room in the `leave` section of the v2 response.
    OnLeftRoom(userID, roomID string)
    OnLeftRoom(ctx context.Context, userID, roomID string)
    // Sent when there is a _change_ in E2EE data, not all the time
    OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)
    OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)
    // Sent when the poll loop terminates
    OnTerminated(userID, deviceID string)
    OnTerminated(ctx context.Context, userID, deviceID string)
    // Sent when the token gets a 401 response
    OnExpiredToken(accessTokenHash, userID, deviceID string)
    OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string)
}

// PollerMap is a map of device ID to Poller

@@ -211,52 +211,52 @@ func (h *PollerMap) execute() {
    }
}

func (h *PollerMap) UpdateDeviceSince(userID, deviceID, since string) {
    h.callbacks.UpdateDeviceSince(userID, deviceID, since)
func (h *PollerMap) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
    h.callbacks.UpdateDeviceSince(ctx, userID, deviceID, since)
}
func (h *PollerMap) Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
    var wg sync.WaitGroup
    wg.Add(1)
    h.executor <- func() {
        h.callbacks.Accumulate(userID, deviceID, roomID, prevBatch, timeline)
        h.callbacks.Accumulate(ctx, userID, deviceID, roomID, prevBatch, timeline)
        wg.Done()
    }
    wg.Wait()
}
func (h *PollerMap) Initialise(roomID string, state []json.RawMessage) (result []json.RawMessage) {
func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (result []json.RawMessage) {
    var wg sync.WaitGroup
    wg.Add(1)
    h.executor <- func() {
        result = h.callbacks.Initialise(roomID, state)
        result = h.callbacks.Initialise(ctx, roomID, state)
        wg.Done()
    }
    wg.Wait()
    return
}
func (h *PollerMap) SetTyping(roomID string, ephEvent json.RawMessage) {
func (h *PollerMap) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) {
    var wg sync.WaitGroup
    wg.Add(1)
    h.executor <- func() {
        h.callbacks.SetTyping(roomID, ephEvent)
        h.callbacks.SetTyping(ctx, roomID, ephEvent)
        wg.Done()
    }
    wg.Wait()
}
func (h *PollerMap) OnInvite(userID, roomID string, inviteState []json.RawMessage) {
func (h *PollerMap) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) {
    var wg sync.WaitGroup
    wg.Add(1)
    h.executor <- func() {
        h.callbacks.OnInvite(userID, roomID, inviteState)
        h.callbacks.OnInvite(ctx, userID, roomID, inviteState)
        wg.Done()
    }
    wg.Wait()
}

func (h *PollerMap) OnLeftRoom(userID, roomID string) {
func (h *PollerMap) OnLeftRoom(ctx context.Context, userID, roomID string) {
    var wg sync.WaitGroup
    wg.Add(1)
    h.executor <- func() {
        h.callbacks.OnLeftRoom(userID, roomID)
        h.callbacks.OnLeftRoom(ctx, userID, roomID)
        wg.Done()
    }
    wg.Wait()

@@ -264,53 +264,53 @@ func (h *PollerMap) OnLeftRoom(userID, roomID string) {

// Add messages for this device. If an error is returned, the poll loop is terminated as continuing
// would implicitly acknowledge these messages.
func (h *PollerMap) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) {
    h.callbacks.AddToDeviceMessages(userID, deviceID, msgs)
func (h *PollerMap) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) {
    h.callbacks.AddToDeviceMessages(ctx, userID, deviceID, msgs)
}

func (h *PollerMap) OnTerminated(userID, deviceID string) {
    h.callbacks.OnTerminated(userID, deviceID)
func (h *PollerMap) OnTerminated(ctx context.Context, userID, deviceID string) {
    h.callbacks.OnTerminated(ctx, userID, deviceID)
}

func (h *PollerMap) OnExpiredToken(accessTokenHash, userID, deviceID string) {
    h.callbacks.OnExpiredToken(accessTokenHash, userID, deviceID)
func (h *PollerMap) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) {
    h.callbacks.OnExpiredToken(ctx, accessTokenHash, userID, deviceID)
}

func (h *PollerMap) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int) {
func (h *PollerMap) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int) {
    var wg sync.WaitGroup
    wg.Add(1)
    h.executor <- func() {
        h.callbacks.UpdateUnreadCounts(roomID, userID, highlightCount, notifCount)
        h.callbacks.UpdateUnreadCounts(ctx, roomID, userID, highlightCount, notifCount)
        wg.Done()
    }
    wg.Wait()
}

func (h *PollerMap) OnAccountData(userID, roomID string, events []json.RawMessage) {
func (h *PollerMap) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) {
    var wg sync.WaitGroup
    wg.Add(1)
    h.executor <- func() {
        h.callbacks.OnAccountData(userID, roomID, events)
        h.callbacks.OnAccountData(ctx, userID, roomID, events)
        wg.Done()
    }
    wg.Wait()
}

func (h *PollerMap) OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage) {
func (h *PollerMap) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage) {
    var wg sync.WaitGroup
    wg.Add(1)
    h.executor <- func() {
        h.callbacks.OnReceipt(userID, roomID, ephEventType, ephEvent)
        h.callbacks.OnReceipt(ctx, userID, roomID, ephEventType, ephEvent)
        wg.Done()
    }
    wg.Wait()
}

func (h *PollerMap) OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
func (h *PollerMap) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
    var wg sync.WaitGroup
    wg.Add(1)
    h.executor <- func() {
        h.callbacks.OnE2EEData(userID, deviceID, otkCounts, fallbackKeyTypes, deviceListChanges)
        h.callbacks.OnE2EEData(ctx, userID, deviceID, otkCounts, fallbackKeyTypes, deviceListChanges)
        wg.Done()
    }
    wg.Wait()
@@ -369,9 +369,22 @@ func (p *poller) Terminate() {
// Returns if the access token gets invalidated or if there was a fatal error processing v2 responses.
// Use WaitUntilInitialSync() to wait until the first poll has been processed.
func (p *poller) Poll(since string) {
    // Basing the sentry-wrangling on the sentry-go net/http integration, see e.g.
    // https://github.com/getsentry/sentry-go/blob/02e712a638c40cd9701ad52d5d1309d65d556ef9/http/sentryhttp.go#L84
    // TODO is this the correct way to create hub? Should the cloning be done by the
    // caller and passed down?
    hub := sentry.CurrentHub().Clone()
    hub.ConfigureScope(func(scope *sentry.Scope) {
        scope.SetUser(sentry.User{Username: p.userID})
        scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
            "device_id": p.deviceID,
        })
    })
    ctx := sentry.SetHubOnContext(context.Background(), hub)

    p.logger.Info().Str("since", since).Msg("Poller: v2 poll loop started")
    defer func() {
        p.receiver.OnTerminated(p.userID, p.deviceID)
        p.receiver.OnTerminated(ctx, p.userID, p.deviceID)
    }()
    failCount := 0
    firstTime := true

@@ -388,7 +401,7 @@ func (p *poller) Poll(since string) {
        break
    }
    start := time.Now()
    resp, statusCode, err := p.client.DoSyncV2(context.Background(), p.accessToken, since, firstTime, p.initialToDeviceOnly)
    resp, statusCode, err := p.client.DoSyncV2(ctx, p.accessToken, since, firstTime, p.initialToDeviceOnly)
    p.trackRequestDuration(time.Since(start), since == "", firstTime)
    if p.terminated.Load() {
        break

@@ -401,7 +414,7 @@ func (p *poller) Poll(since string) {
        continue
    } else {
        p.logger.Warn().Msg("Poller: access token has been invalidated, terminating loop")
        p.receiver.OnExpiredToken(hashToken(p.accessToken), p.userID, p.deviceID)
        p.receiver.OnExpiredToken(ctx, hashToken(p.accessToken), p.userID, p.deviceID)
        p.Terminate()
        break
    }

@@ -412,17 +425,17 @@ func (p *poller) Poll(since string) {
    p.initialToDeviceOnly = false
    start = time.Now()
    failCount = 0
    p.parseE2EEData(resp)
    p.parseGlobalAccountData(resp)
    p.parseRoomsResponse(resp)
    p.parseToDeviceMessages(resp)
    p.parseE2EEData(ctx, resp)
    p.parseGlobalAccountData(ctx, resp)
    p.parseRoomsResponse(ctx, resp)
    p.parseToDeviceMessages(ctx, resp)

    wasInitial := since == ""
    wasFirst := firstTime

    since = resp.NextBatch
    // persist the since token (TODO: this could get slow if we hammer the DB too much)
    p.receiver.UpdateDeviceSince(p.userID, p.deviceID, since)
    p.receiver.UpdateDeviceSince(ctx, p.userID, p.deviceID, since)

    if firstTime {
        firstTime = false
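OnExpiredToken receives hashToken(p.accessToken) rather than the raw token, so only a digest of the credential is persisted and logged. hashToken is not shown in this diff; a hypothetical sketch (the actual implementation and hash function may differ) using crypto/sha256 and encoding/hex would be:

func hashToken(accessToken string) string {
    // Hash the token so the raw credential never reaches the database or logs.
    sum := sha256.Sum256([]byte(accessToken))
    return hex.EncodeToString(sum[:])
}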
@@ -466,14 +479,14 @@ func labels(isInitial, isFirst bool) []string {
    return l
}

func (p *poller) parseToDeviceMessages(res *SyncResponse) {
func (p *poller) parseToDeviceMessages(ctx context.Context, res *SyncResponse) {
    if len(res.ToDevice.Events) == 0 {
        return
    }
    p.receiver.AddToDeviceMessages(p.userID, p.deviceID, res.ToDevice.Events)
    p.receiver.AddToDeviceMessages(ctx, p.userID, p.deviceID, res.ToDevice.Events)
}

func (p *poller) parseE2EEData(res *SyncResponse) {
func (p *poller) parseE2EEData(ctx context.Context, res *SyncResponse) {
    var changedOTKCounts map[string]int
    if res.DeviceListsOTKCount != nil && len(res.DeviceListsOTKCount) > 0 {
        if len(p.otkCounts) != len(res.DeviceListsOTKCount) {

@@ -506,18 +519,18 @@ func (p *poller) parseE2EEData(res *SyncResponse) {
    deviceListChanges := internal.ToDeviceListChangesMap(res.DeviceLists.Changed, res.DeviceLists.Left)

    if deviceListChanges != nil || changedFallbackTypes != nil || changedOTKCounts != nil {
        p.receiver.OnE2EEData(p.userID, p.deviceID, changedOTKCounts, changedFallbackTypes, deviceListChanges)
        p.receiver.OnE2EEData(ctx, p.userID, p.deviceID, changedOTKCounts, changedFallbackTypes, deviceListChanges)
    }
}

func (p *poller) parseGlobalAccountData(res *SyncResponse) {
func (p *poller) parseGlobalAccountData(ctx context.Context, res *SyncResponse) {
    if len(res.AccountData.Events) == 0 {
        return
    }
    p.receiver.OnAccountData(p.userID, AccountDataGlobalRoom, res.AccountData.Events)
    p.receiver.OnAccountData(ctx, p.userID, AccountDataGlobalRoom, res.AccountData.Events)
}

func (p *poller) parseRoomsResponse(res *SyncResponse) {
func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) {
    stateCalls := 0
    timelineCalls := 0
    typingCalls := 0

@@ -525,7 +538,7 @@ func (p *poller) parseRoomsResponse(res *SyncResponse) {
    for roomID, roomData := range res.Rooms.Join {
        if len(roomData.State.Events) > 0 {
            stateCalls++
            prependStateEvents := p.receiver.Initialise(roomID, roomData.State.Events)
            prependStateEvents := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
            if len(prependStateEvents) > 0 {
                // The poller has just learned of these state events due to an
                // incremental poller sync; we must have missed the opportunity to see

@@ -534,12 +547,13 @@ func (p *poller) parseRoomsResponse(res *SyncResponse) {
                // correct room state.
                const warnMsg = "parseRoomsResponse: prepending state events to timeline after gappy poll"
                logger.Warn().Str("room_id", roomID).Int("prependStateEvents", len(prependStateEvents)).Msg(warnMsg)
                sentry.WithScope(func(scope *sentry.Scope) {
                    scope.SetContext("sliding-sync", map[string]interface{}{
                hub := internal.GetSentryHubFromContextOrDefault(ctx)
                hub.WithScope(func(scope *sentry.Scope) {
                    scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
                        "room_id": roomID,
                        "num_prepend_state_events": len(prependStateEvents),
                    })
                    sentry.CaptureMessage(warnMsg)
                    hub.CaptureMessage(warnMsg)
                })
                roomData.Timeline.Events = append(prependStateEvents, roomData.Timeline.Events...)
            }

@@ -550,42 +564,40 @@ func (p *poller) parseRoomsResponse(res *SyncResponse) {
            switch ephEventType {
            case "m.typing":
                typingCalls++
                p.receiver.SetTyping(roomID, ephEvent)
                p.receiver.SetTyping(ctx, roomID, ephEvent)
            case "m.receipt":
                receiptCalls++
                p.receiver.OnReceipt(p.userID, roomID, ephEventType, ephEvent)
                p.receiver.OnReceipt(ctx, p.userID, roomID, ephEventType, ephEvent)
            }
        }

        // process account data
        if len(roomData.AccountData.Events) > 0 {
            p.receiver.OnAccountData(p.userID, roomID, roomData.AccountData.Events)
            p.receiver.OnAccountData(ctx, p.userID, roomID, roomData.AccountData.Events)
        }
        if len(roomData.Timeline.Events) > 0 {
            timelineCalls++
            p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
            p.receiver.Accumulate(p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
            p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
        }

        // process unread counts AFTER events so global caches have been updated by the time this metadata is added.
        // Previously we did this BEFORE events so we atomically showed the event and the unread count in one go, but
        // this could cause clients to de-sync: see TestUnreadCountMisordering integration test.
        if roomData.UnreadNotifications.HighlightCount != nil || roomData.UnreadNotifications.NotificationCount != nil {
            p.receiver.UpdateUnreadCounts(
                roomID, p.userID, roomData.UnreadNotifications.HighlightCount, roomData.UnreadNotifications.NotificationCount,
            )
            p.receiver.UpdateUnreadCounts(ctx, roomID, p.userID, roomData.UnreadNotifications.HighlightCount, roomData.UnreadNotifications.NotificationCount)
        }
    }
    for roomID, roomData := range res.Rooms.Leave {
        // TODO: do we care about state?
        if len(roomData.Timeline.Events) > 0 {
            p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
            p.receiver.Accumulate(p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
            p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
        }
        p.receiver.OnLeftRoom(p.userID, roomID)
        p.receiver.OnLeftRoom(ctx, p.userID, roomID)
    }
    for roomID, roomData := range res.Rooms.Invite {
        p.receiver.OnInvite(p.userID, roomID, roomData.InviteState.Events)
        p.receiver.OnInvite(ctx, p.userID, roomID, roomData.InviteState.Events)
    }
    var l *zerolog.Event
    if len(res.Rooms.Invite) > 0 || len(res.Rooms.Join) > 0 {
@@ -460,10 +460,10 @@ type mockDataReceiver struct {
    unblockProcess chan struct{}
}

func (a *mockDataReceiver) Accumulate(userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
func (a *mockDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) {
    a.timelines[roomID] = append(a.timelines[roomID], timeline...)
}
func (a *mockDataReceiver) Initialise(roomID string, state []json.RawMessage) []json.RawMessage {
func (a *mockDataReceiver) Initialise(ctx context.Context, roomID string, state []json.RawMessage) []json.RawMessage {
    a.states[roomID] = state
    if a.incomingProcess != nil {
        a.incomingProcess <- struct{}{}

@@ -475,24 +475,28 @@ func (a *mockDataReceiver) Initialise(roomID string, state []json.RawMessage) []
    // timeline. Untested here---return nil for now.
    return nil
}
func (a *mockDataReceiver) SetTyping(roomID string, ephEvent json.RawMessage) {
func (a *mockDataReceiver) SetTyping(ctx context.Context, roomID string, ephEvent json.RawMessage) {
}
func (s *mockDataReceiver) UpdateDeviceSince(userID, deviceID, since string) {
func (s *mockDataReceiver) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
    s.pollerIDToSince[PollerID{UserID: userID, DeviceID: deviceID}] = since
}
func (s *mockDataReceiver) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) {
func (s *mockDataReceiver) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) {
}

func (s *mockDataReceiver) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int) {
func (s *mockDataReceiver) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int) {
}
func (s *mockDataReceiver) OnAccountData(userID, roomID string, events []json.RawMessage) {}
func (s *mockDataReceiver) OnReceipt(userID, roomID, ephEvenType string, ephEvent json.RawMessage) {}
func (s *mockDataReceiver) OnInvite(userID, roomID string, inviteState []json.RawMessage) {}
func (s *mockDataReceiver) OnLeftRoom(userID, roomID string) {}
func (s *mockDataReceiver) OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
func (s *mockDataReceiver) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) {
}
func (s *mockDataReceiver) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage) {
}
func (s *mockDataReceiver) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) {
}
func (s *mockDataReceiver) OnLeftRoom(ctx context.Context, userID, roomID string) {}
func (s *mockDataReceiver) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) {
}
func (s *mockDataReceiver) OnTerminated(ctx context.Context, userID, deviceID string) {}
func (s *mockDataReceiver) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) {
}
func (s *mockDataReceiver) OnTerminated(userID, deviceID string) {}
func (s *mockDataReceiver) OnExpiredToken(accessTokenHash, userID, deviceID string) {}

func newMocks(doSyncV2 func(authHeader, since string) (*SyncResponse, int, error)) (*mockDataReceiver, *mockClient) {
    client := &mockClient{
@@ -104,13 +104,14 @@ func (c *GlobalCache) LoadRoomsFromMap(ctx context.Context, joinNIDsByRoomID map

// copyRoom returns a copy of the internal.RoomMetadata stored for this room.
// This is an internal implementation detail of LoadRooms and LoadRoomsFromMap.
// If the room is not present in the global cache, returns nil.
// If the room is not present in the global cache, returns a stub metadata entry.
// The caller MUST acquire a read lock on roomIDToMetadataMu before calling this.
func (c *GlobalCache) copyRoom(roomID string) *internal.RoomMetadata {
    sr := c.roomIDToMetadata[roomID]
    if sr == nil {
        logger.Warn().Str("room", roomID).Msg("GlobalCache.LoadRoom: no metadata for this room")
        return nil
        logger.Warn().Str("room", roomID).Msg("GlobalCache.LoadRoom: no metadata for this room, generating stub")
        c.roomIDToMetadata[roomID] = internal.NewRoomMetadata(roomID)
        sr = c.roomIDToMetadata[roomID]
    }
    srCopy := *sr
    // copy the heroes or else we may modify the same slice which would be bad :(
@@ -129,7 +129,7 @@ func NewInviteData(ctx context.Context, userID, roomID string, inviteState []jso
    logger.Error().Str("invitee", userID).Str("room", roomID).Int("num_invite_state", len(inviteState)).Msg(errMsg)
    hub := internal.GetSentryHubFromContextOrDefault(ctx)
    hub.WithScope(func(scope *sentry.Scope) {
        scope.SetContext("sliding-sync", map[string]interface{}{
        scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
            "invitee": userID,
            "room": roomID,
            "num_invite_state": len(inviteState),

@@ -264,7 +264,7 @@ func (c *UserCache) OnRegistered(ctx context.Context, _ int64) error {
    // inject space children events
    if room.IsSpace() {
        for childRoomID := range room.ChildSpaceRooms {
            c.OnSpaceUpdate(context.Background(), room.RoomID, childRoomID, false, &EventData{
            c.OnSpaceUpdate(ctx, room.RoomID, childRoomID, false, &EventData{
                RoomID: room.RoomID,
                EventType: "m.space.child",
                StateKey: &childRoomID,
@@ -80,8 +80,17 @@ func (c *Conn) OnUpdate(ctx context.Context, update caches.Update) {
    c.handler.OnUpdate(ctx, update)
}

// tryRequest is a wrapper around ConnHandler.OnIncomingRequest which automatically
// starts and closes a tracing task.
//
// If the wrapped call panics, it is recovered from, reported to Sentry, and an error
// is passed to the caller. If the wrapped call returns an error, that error is passed
// upwards but will NOT be logged to Sentry (neither here nor by the caller). Errors
// should be reported to Sentry as close as possible to the point of creating the error,
// to provide the best possible Sentry traceback.
func (c *Conn) tryRequest(ctx context.Context, req *Request) (res *Response, err error) {
    // TODO: include useful information from the request in the sentry hub/context
    // Might be better done in the caller though?
    defer func() {
        panicErr := recover()
        if panicErr != nil {

@@ -98,8 +107,6 @@ func (c *Conn) tryRequest(ctx context.Context, req *Request) (res *Response, err
            // I'm guessing that Sentry will use the former to display panicErr as
            // having come from a panic.
            internal.GetSentryHubFromContextOrDefault(ctx).RecoverWithContext(ctx, panicErr)
        } else if err != nil {
            internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
        }
    }()
    taskType := "OnIncomingRequest"

@@ -121,7 +128,10 @@ func (c *Conn) isOutstanding(pos int64) bool {
    return false
}

// OnIncomingRequest advances the clients position in the stream, returning the response position and data.
// OnIncomingRequest advances the client's position in the stream, returning the response position and data.
// If an error is returned, it will be logged by the caller and transmitted to the
// client. It will NOT be reported to Sentry---this should happen as close as possible
// to the creation of the error (or else Sentry cannot provide a meaningful traceback.)
func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request) (resp *Response, herr *internal.HandlerError) {
    c.cancelOutstandingRequestMu.Lock()
    if c.cancelOutstandingRequest != nil {
@@ -257,7 +257,7 @@ type Context struct {

type HandlerInterface interface {
    Handle(ctx context.Context, req Request, extCtx Context) (res Response)
    HandleLiveUpdate(update caches.Update, req Request, res *Response, extCtx Context)
    HandleLiveUpdate(ctx context.Context, update caches.Update, req Request, res *Response, extCtx Context)
}

type Handler struct {

@@ -266,11 +266,11 @@ type Handler struct {
    GlobalCache *caches.GlobalCache
}

func (h *Handler) HandleLiveUpdate(update caches.Update, req Request, res *Response, extCtx Context) {
func (h *Handler) HandleLiveUpdate(ctx context.Context, update caches.Update, req Request, res *Response, extCtx Context) {
    extCtx.Handler = h
    exts := req.EnabledExtensions()
    for _, ext := range exts {
        ext.AppendLive(context.Background(), res, extCtx, update)
        ext.AppendLive(ctx, res, extCtx, update)
    }
}
@@ -86,12 +86,9 @@ func (r *ToDeviceRequest) ProcessInitial(ctx context.Context, res *Response, ext
    mapMu.Unlock()
    if from < lastSentPos {
        // we told the client about a newer position, but yet they are using an older position, yell loudly
        const errMsg = "Client did not increment since token: possibly sending back duplicate to-device events!"
        l.Warn().Int64("last_sent", lastSentPos).Int64("recv", from).Bool("initial", extCtx.IsInitial).Msg(
            errMsg,
            "Client did not increment since token: possibly sending back duplicate to-device events!",
        )
        // TODO add context to sentry
        internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(fmt.Errorf(errMsg))
    }

    msgs, upTo, err := extCtx.Store.ToDeviceTable.Messages(extCtx.UserID, extCtx.DeviceID, from, int64(r.Limit))

@@ -88,7 +88,7 @@ func (s *connStateLive) liveUpdate(
    s.processLiveUpdate(ctx, update, response)
    // pass event to extensions AFTER processing
    roomIDsToLists := s.lists.ListsByVisibleRoomIDs(s.muxedReq.Lists)
    s.extensionsHandler.HandleLiveUpdate(update, ex, &response.Extensions, extensions.Context{
    s.extensionsHandler.HandleLiveUpdate(ctx, update, ex, &response.Extensions, extensions.Context{
        IsInitial: false,
        RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(),
        UserID: s.userID,

@@ -99,7 +99,7 @@ func (s *connStateLive) liveUpdate(
    for len(s.updates) > 0 && response.ListOps() < 50 {
        update = <-s.updates
        s.processLiveUpdate(ctx, update, response)
        s.extensionsHandler.HandleLiveUpdate(update, ex, &response.Extensions, extensions.Context{
        s.extensionsHandler.HandleLiveUpdate(ctx, update, ex, &response.Extensions, extensions.Context{
            IsInitial: false,
            RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(),
            UserID: s.userID,

@@ -22,7 +22,7 @@ func (h *NopExtensionHandler) Handle(ctx context.Context, req extensions.Request
    return
}

func (h *NopExtensionHandler) HandleLiveUpdate(u caches.Update, req extensions.Request, res *extensions.Response, extCtx extensions.Context) {
func (h *NopExtensionHandler) HandleLiveUpdate(ctx context.Context, update caches.Update, req extensions.Request, res *extensions.Response, extCtx extensions.Context) {
}

type NopJoinTracker struct{}

@@ -5,6 +5,7 @@ import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "fmt"
    "net/http"
    "net/url"

@@ -12,6 +13,7 @@ import (
    "reflect"
    "strconv"
    "sync"
    "syscall"
    "time"

    "github.com/getsentry/sentry-go"
|
||||
}
|
||||
}
|
||||
|
||||
logErrorAndReport500s := func(msg string, herr *internal.HandlerError) {
|
||||
logErrorOrWarning := func(msg string, herr *internal.HandlerError) {
|
||||
if herr.StatusCode >= 500 {
|
||||
hlog.FromRequest(req).Err(herr).Msg(msg)
|
||||
internal.GetSentryHubFromContextOrDefault(req.Context()).CaptureException(herr)
|
||||
} else {
|
||||
hlog.FromRequest(req).Warn().Err(herr).Msg(msg)
|
||||
}
|
||||
@ -224,7 +225,7 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
|
||||
|
||||
conn, herr := h.setupConnection(req, &requestBody, req.URL.Query().Get("pos") != "")
|
||||
if herr != nil {
|
||||
logErrorAndReport500s("failed to get or create Conn", herr)
|
||||
logErrorOrWarning("failed to get or create Conn", herr)
|
||||
return herr
|
||||
}
|
||||
// set pos and timeout if specified
|
||||
@ -252,7 +253,7 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
|
||||
|
||||
resp, herr := conn.OnIncomingRequest(req.Context(), &requestBody)
|
||||
if herr != nil {
|
||||
logErrorAndReport500s("failed to OnIncomingRequest", herr)
|
||||
logErrorOrWarning("failed to OnIncomingRequest", herr)
|
||||
return herr
|
||||
}
|
||||
// for logging
|
||||
@ -281,7 +282,17 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
|
||||
StatusCode: 500,
|
||||
Err: err,
|
||||
}
|
||||
logErrorAndReport500s("failed to JSON-encode result", herr)
|
||||
if errors.Is(err, syscall.EPIPE) {
|
||||
// Client closed the connection. Use a 499 status code internally so that
|
||||
// we consider this a warning rather than an error. 499 is nonstandard,
|
||||
// but a) the client has already gone, so this status code will only show
|
||||
// up in our logs; and b) nginx uses 499 to mean "Client Closed Request",
|
||||
// see e.g.
|
||||
// https://www.nginx.com/resources/wiki/extending/api/http/#http-return-codes
|
||||
herr.StatusCode = 499
|
||||
}
|
||||
|
||||
logErrorOrWarning("failed to JSON-encode result", herr)
|
||||
return herr
|
||||
}
|
||||
return nil
|
||||
|
@ -180,7 +180,8 @@ func (c *CSAPI) CreateRoom(t *testing.T, creationContent interface{}) string {
|
||||
return GetJSONFieldStr(t, body, "room_id")
|
||||
}
|
||||
|
||||
// JoinRoom joins the room ID or alias given, else fails the test. Returns the room ID.
|
||||
// JoinRoom joins the room ID or alias given, else fails the test. Returns the room ID,
|
||||
// NOT the join event ID! (c.f. https://github.com/matrix-org/matrix-spec/issues/1545)
|
||||
func (c *CSAPI) JoinRoom(t *testing.T, roomIDOrAlias string, serverNames []string) string {
|
||||
t.Helper()
|
||||
// construct URL query parameters
|
||||
|
@ -3,6 +3,7 @@ package syncv3_test
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/matrix-org/sliding-sync/sync3/extensions"
|
||||
"github.com/tidwall/gjson"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -949,7 +950,7 @@ func TestBumpEventTypesHandling(t *testing.T) {
|
||||
m.MatchResponse(t, bobRes, m.MatchList("bob_list", matchRoom1ThenRoom2...))
|
||||
|
||||
t.Log("Charlie joins room 2.")
|
||||
charlieJoinEventID := charlie.JoinRoom(t, room2, nil)
|
||||
charlie.JoinRoom(t, room2, nil)
|
||||
|
||||
t.Log("Alice syncs until she sees Charlie's membership.")
|
||||
aliceRes = alice.SlidingSyncUntilMembership(t, aliceRes.Pos, room2, charlie, "join")
|
||||
@ -978,8 +979,14 @@ func TestBumpEventTypesHandling(t *testing.T) {
|
||||
|
||||
// The read receipt stuff here specifically checks for the bug in
|
||||
// https://github.com/matrix-org/sliding-sync/issues/83
|
||||
aliceRoom2Timeline := aliceRes.Rooms[room2].Timeline
|
||||
aliceLastSeenEvent := aliceRoom2Timeline[len(aliceRoom2Timeline)-1]
|
||||
aliceLastSeenEventID := gjson.ParseBytes(aliceLastSeenEvent).Get("event_id").Str
|
||||
if aliceLastSeenEventID == "" {
|
||||
t.Error("Could not find event ID for the last event in Alice's timeline.")
|
||||
}
|
||||
t.Log("Alice marks herself as having seen Charlie's join.")
|
||||
alice.SendReceipt(t, room2, charlieJoinEventID, "m.read")
|
||||
alice.SendReceipt(t, room2, aliceLastSeenEventID, "m.read")
|
||||
|
||||
t.Log("Alice syncs until she sees her receipt. At no point should see see any room list operations.")
|
||||
alice.SlidingSyncUntil(
|
||||
@ -995,7 +1002,7 @@ func TestBumpEventTypesHandling(t *testing.T) {
|
||||
t.Fatalf("expected no ops while waiting for receipt: %s", err)
|
||||
}
|
||||
matchReceipt := m.MatchReceipts(room2, []m.Receipt{{
|
||||
EventID: charlieJoinEventID,
|
||||
EventID: aliceLastSeenEventID,
|
||||
UserID: alice.UserID,
|
||||
Type: "m.read",
|
||||
}})
|
||||
|
@ -198,3 +198,40 @@ func TestSpacesFilterInvite(t *testing.T) {
|
||||
m.MatchV3SyncOp(0, 0, []string{normalRoomID}),
|
||||
)))
|
||||
}
|
||||
|
||||
// Regression test to catch https://github.com/matrix-org/sliding-sync/issues/85
|
||||
func TestAddingUnknownChildToSpace(t *testing.T) {
|
||||
alice := registerNewUser(t)
|
||||
bob := registerNewUser(t)
|
||||
|
||||
t.Log("Alice creates a space and invites Bob.")
|
||||
parentID := alice.CreateRoom(t, map[string]interface{}{
|
||||
"type": "m.space",
|
||||
"invite": []string{bob.UserID},
|
||||
})
|
||||
|
||||
t.Log("Bob accepts the invite.")
|
||||
bob.JoinRoom(t, parentID, nil)
|
||||
|
||||
t.Log("Bob requests a new sliding sync.")
|
||||
res := bob.SlidingSync(t, sync3.Request{
|
||||
Lists: map[string]sync3.RequestList{
|
||||
"bob_list": {
|
||||
RoomSubscription: sync3.RoomSubscription{
|
||||
TimelineLimit: 10,
|
||||
},
|
||||
Ranges: sync3.SliceRanges{{0, 10}},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
t.Log("Alice creates a room and marks it as a child of the space.")
|
||||
childID := alice.CreateRoom(t, map[string]interface{}{"preset": "public_chat"})
|
||||
childEventID := alice.SetState(t, parentID, "m.space.child", childID, map[string]interface{}{
|
||||
"via": []string{"localhost"},
|
||||
})
|
||||
|
||||
t.Log("Bob syncs until he sees the m.space.child event in the space.")
|
||||
// Before the fix, this would panic inside getInitialRoomData, resulting in a 500
|
||||
res = bob.SlidingSyncUntilEventID(t, res.Pos, parentID, childEventID)
|
||||
}
|
||||
|