Rearrange and rename cleanup ticker

This commit is contained in:
David Robertson 2023-08-10 12:05:47 +01:00
parent 2d6777850f
commit 93bac84007
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
2 changed files with 17 additions and 13 deletions

View File

@ -52,9 +52,9 @@ func (t *DevicesTable) UpdateDeviceSince(userID, deviceID, since string) error {
// no particular order. // no particular order.
// //
// This is determined using the syncv3_sync2_tokens.last_seen column, which is updated // This is determined using the syncv3_sync2_tokens.last_seen column, which is updated
// at most once per day to save DB throughtput (see MaybeUpdateLastSeen). The caller // at most once per day to save DB throughtput (see TokensTable.MaybeUpdateLastSeen).
// should therefore use an inactivityPeriod of at least two days to avoid considering // The caller should therefore use an inactivityPeriod of at least two days to avoid
// a recently-used device as old. // considering a recently-used device as old.
func (t *DevicesTable) FindOldDevices(inactivityPeriod time.Duration) (devices []Device, err error) { func (t *DevicesTable) FindOldDevices(inactivityPeriod time.Duration) (devices []Device, err error) {
err = t.db.Select(&devices, ` err = t.db.Select(&devices, `
SELECT user_id, device_id SELECT user_id, device_id

View File

@ -48,8 +48,9 @@ type Handler struct {
typingMu *sync.Mutex typingMu *sync.Mutex
PendingTxnIDs *sync2.PendingTransactionIDs PendingTxnIDs *sync2.PendingTransactionIDs
deviceDataTicker *sync2.DeviceDataTicker deviceDataTicker *sync2.DeviceDataTicker
e2eeWorkerPool *internal.WorkerPool pollerExpiryTicker *time.Ticker
e2eeWorkerPool *internal.WorkerPool
numPollers prometheus.Gauge numPollers prometheus.Gauge
subSystem string subSystem string
@ -111,6 +112,7 @@ func (h *Handler) Teardown() {
h.v2Store.Teardown() h.v2Store.Teardown()
h.pMap.Terminate() h.pMap.Terminate()
h.deviceDataTicker.Stop() h.deviceDataTicker.Stop()
h.pollerExpiryTicker.Stop()
if h.numPollers != nil { if h.numPollers != nil {
prometheus.Unregister(h.numPollers) prometheus.Unregister(h.numPollers)
} }
@ -163,7 +165,7 @@ func (h *Handler) StartV2Pollers() {
wg.Wait() wg.Wait()
logger.Info().Msg("StartV2Pollers finished") logger.Info().Msg("StartV2Pollers finished")
h.updateMetrics() h.updateMetrics()
h.startCleanupTicker() h.startPollerExpiryTicker()
} }
func (h *Handler) updateMetrics() { func (h *Handler) updateMetrics() {
@ -559,17 +561,19 @@ func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) {
}() }()
} }
func (h *Handler) startCleanupTicker() { func (h *Handler) startPollerExpiryTicker() {
// TODO: would be nice to make this method idempotent so we don't run >1 ticker. if h.pollerExpiryTicker != nil {
ticker := time.NewTicker(time.Hour) return
}
h.pollerExpiryTicker = time.NewTicker(time.Hour)
go func() { go func() {
for range ticker.C { for range h.pollerExpiryTicker.C {
h.cleanup() h.expireOldPollers()
} }
}() }()
} }
func (h *Handler) cleanup() { func (h *Handler) expireOldPollers() {
devices, err := h.v2Store.DevicesTable.FindOldDevices(30 * 24 * time.Hour) devices, err := h.v2Store.DevicesTable.FindOldDevices(30 * 24 * time.Hour)
if err != nil { if err != nil {
logger.Err(err).Msg("Error fetching old devices") logger.Err(err).Msg("Error fetching old devices")
@ -581,7 +585,7 @@ func (h *Handler) cleanup() {
pids[i].DeviceID = devices[i].DeviceID pids[i].DeviceID = devices[i].DeviceID
} }
numExpired := h.pMap.ExpirePollers(pids) numExpired := h.pMap.ExpirePollers(pids)
logger.Info().Int("old", len(devices)).Int("expired", numExpired).Msg("poller cleanup") logger.Info().Int("old", len(devices)).Int("expired", numExpired).Msg("poller cleanup old devices")
} }
func fnvHash(event json.RawMessage) uint64 { func fnvHash(event json.RawMessage) uint64 {