sliding-sync/sync2/device_data_ticker.go
Kegan Dougal f36c038cf8 Rate limit pubsub.V2DeviceData updates to be at most 1 per second
The db writes are still instant, but the notifications are now delayed
by up to 1 second, in order to not swamp the pubsub channels.
2023-06-26 21:04:02 -07:00

91 lines
2.3 KiB
Go

package sync2
import (
"sync"
"time"
"github.com/matrix-org/sliding-sync/pubsub"
)
// This struct remembers user+device IDs to notify for then periodically
// emits them all to the caller. Use to rate limit the frequency of device list
// updates.
type DeviceDataTicker struct {
// data structures to periodically notify downstream about device data updates
// The ticker controls the frequency of updates. The done channel is used to stop ticking
// and clean up the goroutine. The notify map contains the values to notify for.
ticker *time.Ticker
done chan struct{}
notifyMap *sync.Map // map of PollerID to bools, unwrapped when notifying
fn func(payload *pubsub.V2DeviceData)
}
// Create a new device data ticker, which batches calls to Remember and invokes a callback every
// d duration. If d is 0, no batching is performed and the callback is invoked synchronously, which
// is useful for testing.
func NewDeviceDataTicker(d time.Duration) *DeviceDataTicker {
ddt := &DeviceDataTicker{
done: make(chan struct{}),
notifyMap: &sync.Map{},
}
if d != 0 {
ddt.ticker = time.NewTicker(d)
}
return ddt
}
// Stop ticking.
func (t *DeviceDataTicker) Stop() {
if t.ticker != nil {
t.ticker.Stop()
}
close(t.done)
}
// Set the function which should be called when the tick happens.
func (t *DeviceDataTicker) SetCallback(fn func(payload *pubsub.V2DeviceData)) {
t.fn = fn
}
// Remember this user/device ID, and emit it later on.
func (t *DeviceDataTicker) Remember(pid PollerID) {
t.notifyMap.Store(pid, true)
if t.ticker == nil {
t.emitUpdate()
}
}
func (t *DeviceDataTicker) emitUpdate() {
var p pubsub.V2DeviceData
p.UserIDToDeviceIDs = make(map[string][]string)
// populate the pubsub payload
t.notifyMap.Range(func(key, value any) bool {
pid := key.(PollerID)
devices := p.UserIDToDeviceIDs[pid.UserID]
devices = append(devices, pid.DeviceID)
p.UserIDToDeviceIDs[pid.UserID] = devices
// clear the map of this value
t.notifyMap.Delete(key)
return true // keep enumerating
})
// notify if we have entries
if len(p.UserIDToDeviceIDs) > 0 {
t.fn(&p)
}
}
// Blocks forever, ticking until Stop() is called.
func (t *DeviceDataTicker) Run() {
if t.ticker == nil {
return
}
for {
select {
case <-t.done:
return
case <-t.ticker.C:
t.emitUpdate()
}
}
}