From 93bac84007220d5d3f8d976bc701440f0e641ad0 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 10 Aug 2023 12:05:47 +0100 Subject: [PATCH] Rearrange and rename cleanup ticker --- sync2/devices_table.go | 6 +++--- sync2/handler2/handler.go | 24 ++++++++++++++---------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/sync2/devices_table.go b/sync2/devices_table.go index 20d1fe7..f8c0f5a 100644 --- a/sync2/devices_table.go +++ b/sync2/devices_table.go @@ -52,9 +52,9 @@ func (t *DevicesTable) UpdateDeviceSince(userID, deviceID, since string) error { // no particular order. // // This is determined using the syncv3_sync2_tokens.last_seen column, which is updated -// at most once per day to save DB throughtput (see MaybeUpdateLastSeen). The caller -// should therefore use an inactivityPeriod of at least two days to avoid considering -// a recently-used device as old. +// at most once per day to save DB throughtput (see TokensTable.MaybeUpdateLastSeen). +// The caller should therefore use an inactivityPeriod of at least two days to avoid +// considering a recently-used device as old. func (t *DevicesTable) FindOldDevices(inactivityPeriod time.Duration) (devices []Device, err error) { err = t.db.Select(&devices, ` SELECT user_id, device_id diff --git a/sync2/handler2/handler.go b/sync2/handler2/handler.go index 3850e93..6379bcd 100644 --- a/sync2/handler2/handler.go +++ b/sync2/handler2/handler.go @@ -48,8 +48,9 @@ type Handler struct { typingMu *sync.Mutex PendingTxnIDs *sync2.PendingTransactionIDs - deviceDataTicker *sync2.DeviceDataTicker - e2eeWorkerPool *internal.WorkerPool + deviceDataTicker *sync2.DeviceDataTicker + pollerExpiryTicker *time.Ticker + e2eeWorkerPool *internal.WorkerPool numPollers prometheus.Gauge subSystem string @@ -111,6 +112,7 @@ func (h *Handler) Teardown() { h.v2Store.Teardown() h.pMap.Terminate() h.deviceDataTicker.Stop() + h.pollerExpiryTicker.Stop() if h.numPollers != nil { prometheus.Unregister(h.numPollers) } @@ -163,7 +165,7 @@ func (h *Handler) StartV2Pollers() { wg.Wait() logger.Info().Msg("StartV2Pollers finished") h.updateMetrics() - h.startCleanupTicker() + h.startPollerExpiryTicker() } func (h *Handler) updateMetrics() { @@ -559,17 +561,19 @@ func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) { }() } -func (h *Handler) startCleanupTicker() { - // TODO: would be nice to make this method idempotent so we don't run >1 ticker. - ticker := time.NewTicker(time.Hour) +func (h *Handler) startPollerExpiryTicker() { + if h.pollerExpiryTicker != nil { + return + } + h.pollerExpiryTicker = time.NewTicker(time.Hour) go func() { - for range ticker.C { - h.cleanup() + for range h.pollerExpiryTicker.C { + h.expireOldPollers() } }() } -func (h *Handler) cleanup() { +func (h *Handler) expireOldPollers() { devices, err := h.v2Store.DevicesTable.FindOldDevices(30 * 24 * time.Hour) if err != nil { logger.Err(err).Msg("Error fetching old devices") @@ -581,7 +585,7 @@ func (h *Handler) cleanup() { pids[i].DeviceID = devices[i].DeviceID } numExpired := h.pMap.ExpirePollers(pids) - logger.Info().Int("old", len(devices)).Int("expired", numExpired).Msg("poller cleanup") + logger.Info().Int("old", len(devices)).Int("expired", numExpired).Msg("poller cleanup old devices") } func fnvHash(event json.RawMessage) uint64 {