Mirror of https://github.com/matrix-org/sliding-sync.git, synced 2025-03-10 13:37:11 +00:00
Merge pull request #405 from matrix-org/kegan/spans
Add more spans for live updates; change when we early return from buffered events
Commit 30f8c5b308
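The pattern this PR spreads across the live-update path is the usual start/end span instrumentation: internal.StartSpan(ctx, name) returns a derived context plus a span handle, and span.End() closes it. Below is a minimal, self-contained sketch of that calling shape; the Span type and StartSpan body are stand-ins for illustration only, not the repo's actual internal package, which returns a real tracing span.

// Sketch only: a stand-in for internal.StartSpan, showing the calling pattern
// used throughout this change.
package tracingexample

import "context"

// Span is a hypothetical minimal span handle; the real helper returns a
// tracing span, not this stub.
type Span struct{ name string }

// End marks the span as finished (a real implementation records the end time).
func (s Span) End() {}

// StartSpan mirrors the shape of internal.StartSpan(ctx, name): it returns a
// derived context (so callees can nest their spans under this one) and the span.
func StartSpan(ctx context.Context, name string) (context.Context, Span) {
	return ctx, Span{name: name}
}

// doWork shows the usage added in this diff: thread ctx in, open a span,
// defer End, and pass ctx on to callees.
func doWork(ctx context.Context) {
	ctx, span := StartSpan(ctx, "doWork")
	defer span.End()
	_ = ctx
}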
@@ -428,7 +428,9 @@ func (c *UserCache) Invites() map[string]UserRoomData {
 }
 
 // AttemptToFetchPrevBatch tries to find a prev_batch value for the given event. This may not always succeed.
-func (c *UserCache) AttemptToFetchPrevBatch(roomID string, firstTimelineEvent *EventData) (prevBatch string) {
+func (c *UserCache) AttemptToFetchPrevBatch(ctx context.Context, roomID string, firstTimelineEvent *EventData) (prevBatch string) {
+	_, span := internal.StartSpan(ctx, "AttemptToFetchPrevBatch")
+	defer span.End()
 	return c.store.GetClosestPrevBatch(roomID, firstTimelineEvent.NID)
 }

@@ -135,6 +135,7 @@ func (c *Conn) isOutstanding(pos int64) bool {
 // client. It will NOT be reported to Sentry---this should happen as close as possible
 // to the creation of the error (or else Sentry cannot provide a meaningful traceback.)
 func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.Time) (resp *Response, herr *internal.HandlerError) {
+	ctx, span := internal.StartSpan(ctx, "OnIncomingRequest.AcquireMutex")
 	c.cancelOutstandingRequestMu.Lock()
 	if c.cancelOutstandingRequest != nil {
 		c.cancelOutstandingRequest()

@@ -149,6 +150,7 @@ func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.T
 	// it's intentional for the lock to be held whilst inside HandleIncomingRequest
 	// as it guarantees linearisation of data within a single connection
 	defer c.mu.Unlock()
+	span.End()
 
 	isFirstRequest := req.pos == 0
 	isRetransmit := !isFirstRequest && c.lastClientRequest.pos == req.pos

@@ -318,7 +318,9 @@ func (h *Handler) HandleLiveUpdate(ctx context.Context, update caches.Update, re
 	extCtx.Handler = h
 	exts := req.EnabledExtensions()
 	for _, ext := range exts {
-		ext.AppendLive(ctx, res, extCtx, update)
+		childCtx, region := internal.StartSpan(ctx, "extension_live_"+ext.Name())
+		ext.AppendLive(childCtx, res, extCtx, update)
+		region.End()
 	}
 }

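One detail worth noting in the hunk above: region.End() is called at the bottom of each loop iteration rather than deferred. In Go a defer inside a loop only runs when the enclosing function returns, so a deferred End here would make every extension's span appear to cover the whole HandleLiveUpdate call instead of just its own AppendLive. A small illustrative sketch of the per-iteration shape, reusing the stand-in StartSpan from the sketch above (the extension names and callback are hypothetical):

// Illustrative only: give each loop iteration its own child span and close it
// before moving on, so the span covers just that iteration's work.
func appendLiveForAll(ctx context.Context, extNames []string, appendLive func(context.Context, string)) {
	for _, name := range extNames {
		childCtx, span := StartSpan(ctx, "extension_live_"+name)
		appendLive(childCtx, name) // the callee's own spans nest under childCtx
		span.End()                 // end immediately, not via defer
	}
}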
@@ -61,6 +61,7 @@ func (s *connStateLive) liveUpdate(
 	// block until we get a new event, with appropriate timeout
 	startTime := time.Now()
 	hasLiveStreamed := false
+	numProcessedUpdates := 0
 	for response.ListOps() == 0 && len(response.Rooms) == 0 && !response.Extensions.HasData(isInitial) {
 		hasLiveStreamed = true
 		timeToWait := time.Duration(req.TimeoutMSecs()) * time.Millisecond

@@ -82,10 +83,12 @@ func (s *connStateLive) liveUpdate(
 			return
 		case update := <-s.updates:
 			s.processUpdate(ctx, update, response, ex)
+			numProcessedUpdates++
 			// if there's more updates and we don't have lots stacked up already, go ahead and process another
-			for len(s.updates) > 0 && response.ListOps() < 50 {
+			for len(s.updates) > 0 && numProcessedUpdates < 100 {
				update = <-s.updates
				s.processUpdate(ctx, update, response, ex)
+				numProcessedUpdates++
 			}
 		}
 	}

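This hunk is the "change when we early return from buffered events" half of the PR. Previously the drain loop kept pulling buffered updates while response.ListOps() < 50, a cap that does not trip for updates that generate no list operations, so a large backlog could be processed in one go before anything was returned to the client. The loop now counts every processed update and stops at 100, bounding the work done per response. A rough standalone sketch of that drain shape, with hypothetical names and a plain string channel standing in for s.updates:

// Sketch of the bounded drain: after handling one update, keep pulling from the
// buffered channel until it is empty or a hard cap on processed updates is hit,
// then let the caller return the accumulated response to the client.
func drainUpdates(first string, updates chan string, handle func(string)) {
	const maxPerResponse = 100 // mirrors the numProcessedUpdates < 100 cap
	handle(first)
	processed := 1
	for len(updates) > 0 && processed < maxPerResponse {
		handle(<-updates) // len() on a buffered channel reports queued items
		processed++
	}
}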
@@ -128,6 +131,8 @@ func (s *connStateLive) processUpdate(ctx context.Context, update caches.Update,
 }
 
 func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, response *sync3.Response) bool {
+	_, span := internal.StartSpan(ctx, "processLiveUpdate")
+	defer span.End()
 	internal.AssertWithContext(ctx, "processLiveUpdate: response list length != internal list length", s.lists.Len() == len(response.Lists))
 	internal.AssertWithContext(ctx, "processLiveUpdate: request list length != internal list length", s.lists.Len() == len(s.muxedReq.Lists))
 	roomUpdate, _ := up.(caches.RoomUpdate)

@@ -236,7 +241,7 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
 		})
 		if len(r.Timeline) == 0 && r.PrevBatch == "" {
 			// attempt to fill in the prev_batch value for this room
-			prevBatch := s.userCache.AttemptToFetchPrevBatch(roomEventUpdate.RoomID(), roomEventUpdate.EventData)
+			prevBatch := s.userCache.AttemptToFetchPrevBatch(ctx, roomEventUpdate.RoomID(), roomEventUpdate.EventData)
 			if prevBatch != "" {
 				r.PrevBatch = prevBatch
 			}

@@ -246,11 +251,13 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
 			sender := roomEventUpdate.EventData.Sender
 			if s.lazyCache.IsLazyLoading(roomID) && !s.lazyCache.IsSet(roomID, sender) {
 				// load the state event
+				_, span := internal.StartSpan(ctx, "LazyLoadingMemberEvent")
 				memberEvent := s.globalCache.LoadStateEvent(context.Background(), roomID, s.loadPositions[roomID], "m.room.member", sender)
 				if memberEvent != nil {
 					r.RequiredState = append(r.RequiredState, memberEvent)
 					s.lazyCache.AddUser(roomID, sender)
 				}
+				span.End()
 			}
 		}
 	}