mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
perf: immediately send to-device messages to listening conns
This commit is contained in:
parent
48f28f9f6c
commit
95a5af3abe
10
pubsub/v2.go
10
pubsub/v2.go
@ -20,6 +20,7 @@ type V2Listener interface {
|
||||
OnDeviceData(p *V2DeviceData)
|
||||
OnTyping(p *V2Typing)
|
||||
OnReceipt(p *V2Receipt)
|
||||
OnDeviceMessages(p *V2DeviceMessages)
|
||||
}
|
||||
|
||||
type V2Initialise struct {
|
||||
@ -96,6 +97,13 @@ type V2Receipt struct {
|
||||
|
||||
func (*V2Receipt) Type() string { return "V2Receipt" }
|
||||
|
||||
type V2DeviceMessages struct {
|
||||
UserID string
|
||||
DeviceID string
|
||||
}
|
||||
|
||||
func (*V2DeviceMessages) Type() string { return "V2DeviceMessages" }
|
||||
|
||||
type V2Sub struct {
|
||||
listener Listener
|
||||
receiver V2Listener
|
||||
@ -134,6 +142,8 @@ func (v *V2Sub) onMessage(p Payload) {
|
||||
v.receiver.OnDeviceData(pl)
|
||||
case *V2Typing:
|
||||
v.receiver.OnTyping(pl)
|
||||
case *V2DeviceMessages:
|
||||
v.receiver.OnDeviceMessages(pl)
|
||||
default:
|
||||
logger.Warn().Str("type", p.Type()).Msg("V2Sub: unhandled payload type")
|
||||
}
|
||||
|
@ -261,12 +261,15 @@ func (h *Handler) OnReceipt(userID, roomID, ephEventType string, ephEvent json.R
|
||||
})
|
||||
}
|
||||
|
||||
// Send nothing, the v3 API will pull from the DB directly; no in-memory shenanigans
|
||||
func (h *Handler) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) {
|
||||
_, err := h.Store.ToDeviceTable.InsertMessages(deviceID, msgs)
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", userID).Str("device", deviceID).Int("msgs", len(msgs)).Msg("V2: failed to store to-device messages")
|
||||
}
|
||||
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2DeviceMessages{
|
||||
UserID: userID,
|
||||
DeviceID: deviceID,
|
||||
})
|
||||
}
|
||||
|
||||
func (h *Handler) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int) {
|
||||
|
@ -58,3 +58,8 @@ type DeviceDataUpdate struct {
|
||||
// no data; just wakes up the connection
|
||||
// data comes via sidechannels e.g the database
|
||||
}
|
||||
|
||||
type DeviceEventsUpdate struct {
|
||||
// no data; just wakes up the connection
|
||||
// data comes via sidechannels e.g the database
|
||||
}
|
||||
|
@ -78,6 +78,9 @@ func (h *Handler) HandleLiveUpdate(update caches.Update, req Request, res *Respo
|
||||
if req.Receipts != nil && req.Receipts.Enabled {
|
||||
res.Receipts = ProcessLiveReceipts(update, updateWillReturnResponse, req.UserID, req.Receipts)
|
||||
}
|
||||
if req.ToDevice != nil && req.ToDevice.Enabled != nil && *req.ToDevice.Enabled {
|
||||
res.ToDevice = ProcessLiveToDeviceEvents(update, h.Store, req.UserID, req.DeviceID, req.ToDevice)
|
||||
}
|
||||
// only process 'live' e2ee when we aren't going to return data as we need to ensure that we don't calculate this twice
|
||||
// e.g once on incoming request then again due to wakeup
|
||||
if req.E2EE != nil && req.E2EE.Enabled {
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/matrix-org/sliding-sync/state"
|
||||
"github.com/matrix-org/sliding-sync/sync3/caches"
|
||||
)
|
||||
|
||||
// used to remember since positions to warn when they are not incremented. This can happen
|
||||
@ -46,6 +47,14 @@ func (r *ToDeviceResponse) HasData(isInitial bool) bool {
|
||||
return len(r.Events) > 0
|
||||
}
|
||||
|
||||
func ProcessLiveToDeviceEvents(up caches.Update, store *state.Storage, userID, deviceID string, req *ToDeviceRequest) (res *ToDeviceResponse) {
|
||||
_, ok := up.(caches.DeviceEventsUpdate)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return ProcessToDevice(store, userID, deviceID, req, false)
|
||||
}
|
||||
|
||||
func ProcessToDevice(store *state.Storage, userID, deviceID string, req *ToDeviceRequest, isInitial bool) (res *ToDeviceResponse) {
|
||||
if req.Limit == 0 {
|
||||
req.Limit = 100 // default to 100
|
||||
|
@ -533,6 +533,16 @@ func (h *SyncLiveHandler) OnDeviceData(p *pubsub.V2DeviceData) {
|
||||
conn.OnUpdate(caches.DeviceDataUpdate{})
|
||||
}
|
||||
|
||||
func (h *SyncLiveHandler) OnDeviceMessages(p *pubsub.V2DeviceMessages) {
|
||||
conn := h.ConnMap.Conn(sync3.ConnID{
|
||||
DeviceID: p.DeviceID,
|
||||
})
|
||||
if conn == nil {
|
||||
return
|
||||
}
|
||||
conn.OnUpdate(caches.DeviceEventsUpdate{})
|
||||
}
|
||||
|
||||
func (h *SyncLiveHandler) OnInvite(p *pubsub.V2InviteRoom) {
|
||||
userCache, ok := h.userCaches.Load(p.UserID)
|
||||
if !ok {
|
||||
|
@ -341,6 +341,37 @@ func TestExtensionToDevice(t *testing.T) {
|
||||
},
|
||||
})
|
||||
m.MatchResponse(t, res, m.MatchList(0, m.MatchV3Count(0)), m.MatchToDeviceMessages([]json.RawMessage{}))
|
||||
|
||||
// live stream and block, then send a to-device msg which should go through immediately
|
||||
start := time.Now()
|
||||
go func() {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
t.Logf("sending to-device msgs %v", time.Now())
|
||||
v2.queueResponse(alice, sync2.SyncResponse{
|
||||
ToDevice: sync2.EventsResponse{
|
||||
Events: newToDeviceMsgs,
|
||||
},
|
||||
})
|
||||
}()
|
||||
req := sync3.Request{
|
||||
Lists: []sync3.RequestList{{
|
||||
Ranges: sync3.SliceRanges{
|
||||
[2]int64{0, 10}, // doesn't matter
|
||||
},
|
||||
}},
|
||||
Extensions: extensions.Request{
|
||||
ToDevice: &extensions.ToDeviceRequest{
|
||||
Since: sinceBeforeMsgs,
|
||||
},
|
||||
},
|
||||
}
|
||||
req.SetTimeoutMSecs(1000)
|
||||
t.Logf("sending sync request %v", time.Now())
|
||||
res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, req)
|
||||
if time.Since(start) >= time.Second {
|
||||
t.Fatalf("new to-device msg did not unblock sync request, took: %v", time.Since(start))
|
||||
}
|
||||
m.MatchResponse(t, res, m.MatchList(0, m.MatchV3Count(0)), m.MatchToDeviceMessages(newToDeviceMsgs))
|
||||
}
|
||||
|
||||
// tests that the account data extension works:
|
||||
|
Loading…
x
Reference in New Issue
Block a user