mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
Add more spans for live updates; change when we early return from buffered events
- Add more spans to live updates to account for more time spent in various functions. - When there are a lot of stacked updates in the buffer, return after processing 100 of them rather than relying on >=50 list operations. List operations isn't a good proxy for the amount of work being done, as the majority of work updates are things like: receipts, typing, device list updates. This means we will return faster than before when we have stacked updates, reducing perceived latency, despite having to still go through the entire buffer.
This commit is contained in:
parent
a95f3c78c0
commit
dcb8854001
@ -428,7 +428,9 @@ func (c *UserCache) Invites() map[string]UserRoomData {
|
||||
}
|
||||
|
||||
// AttemptToFetchPrevBatch tries to find a prev_batch value for the given event. This may not always succeed.
|
||||
func (c *UserCache) AttemptToFetchPrevBatch(roomID string, firstTimelineEvent *EventData) (prevBatch string) {
|
||||
func (c *UserCache) AttemptToFetchPrevBatch(ctx context.Context, roomID string, firstTimelineEvent *EventData) (prevBatch string) {
|
||||
_, span := internal.StartSpan(ctx, "AttemptToFetchPrevBatch")
|
||||
defer span.End()
|
||||
return c.store.GetClosestPrevBatch(roomID, firstTimelineEvent.NID)
|
||||
}
|
||||
|
||||
|
@ -318,7 +318,9 @@ func (h *Handler) HandleLiveUpdate(ctx context.Context, update caches.Update, re
|
||||
extCtx.Handler = h
|
||||
exts := req.EnabledExtensions()
|
||||
for _, ext := range exts {
|
||||
ext.AppendLive(ctx, res, extCtx, update)
|
||||
childCtx, region := internal.StartSpan(ctx, "extension_live_"+ext.Name())
|
||||
ext.AppendLive(childCtx, res, extCtx, update)
|
||||
region.End()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -61,6 +61,7 @@ func (s *connStateLive) liveUpdate(
|
||||
// block until we get a new event, with appropriate timeout
|
||||
startTime := time.Now()
|
||||
hasLiveStreamed := false
|
||||
numProcessedUpdates := 0
|
||||
for response.ListOps() == 0 && len(response.Rooms) == 0 && !response.Extensions.HasData(isInitial) {
|
||||
hasLiveStreamed = true
|
||||
timeToWait := time.Duration(req.TimeoutMSecs()) * time.Millisecond
|
||||
@ -82,10 +83,12 @@ func (s *connStateLive) liveUpdate(
|
||||
return
|
||||
case update := <-s.updates:
|
||||
s.processUpdate(ctx, update, response, ex)
|
||||
numProcessedUpdates++
|
||||
// if there's more updates and we don't have lots stacked up already, go ahead and process another
|
||||
for len(s.updates) > 0 && response.ListOps() < 50 {
|
||||
for len(s.updates) > 0 && numProcessedUpdates < 100 {
|
||||
update = <-s.updates
|
||||
s.processUpdate(ctx, update, response, ex)
|
||||
numProcessedUpdates++
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -128,6 +131,8 @@ func (s *connStateLive) processUpdate(ctx context.Context, update caches.Update,
|
||||
}
|
||||
|
||||
func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update, response *sync3.Response) bool {
|
||||
_, span := internal.StartSpan(ctx, "processLiveUpdate")
|
||||
defer span.End()
|
||||
internal.AssertWithContext(ctx, "processLiveUpdate: response list length != internal list length", s.lists.Len() == len(response.Lists))
|
||||
internal.AssertWithContext(ctx, "processLiveUpdate: request list length != internal list length", s.lists.Len() == len(s.muxedReq.Lists))
|
||||
roomUpdate, _ := up.(caches.RoomUpdate)
|
||||
@ -236,7 +241,7 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
|
||||
})
|
||||
if len(r.Timeline) == 0 && r.PrevBatch == "" {
|
||||
// attempt to fill in the prev_batch value for this room
|
||||
prevBatch := s.userCache.AttemptToFetchPrevBatch(roomEventUpdate.RoomID(), roomEventUpdate.EventData)
|
||||
prevBatch := s.userCache.AttemptToFetchPrevBatch(ctx, roomEventUpdate.RoomID(), roomEventUpdate.EventData)
|
||||
if prevBatch != "" {
|
||||
r.PrevBatch = prevBatch
|
||||
}
|
||||
@ -246,11 +251,13 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
|
||||
sender := roomEventUpdate.EventData.Sender
|
||||
if s.lazyCache.IsLazyLoading(roomID) && !s.lazyCache.IsSet(roomID, sender) {
|
||||
// load the state event
|
||||
_, span := internal.StartSpan(ctx, "LazyLoadingMemberEvent")
|
||||
memberEvent := s.globalCache.LoadStateEvent(context.Background(), roomID, s.loadPositions[roomID], "m.room.member", sender)
|
||||
if memberEvent != nil {
|
||||
r.RequiredState = append(r.RequiredState, memberEvent)
|
||||
s.lazyCache.AddUser(roomID, sender)
|
||||
}
|
||||
span.End()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user