2022-03-28 15:19:42 +01:00
|
|
|
package sync2
|
|
|
|
|
|
|
|
import (
|
2023-07-25 14:37:53 +01:00
|
|
|
"fmt"
|
|
|
|
"sync"
|
2022-03-28 15:19:42 +01:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/ReneKroon/ttlcache/v2"
|
|
|
|
)
|
|
|
|
|
2023-07-25 14:37:53 +01:00
|
|
|
// loaderFunc returns the device IDs that belong to userID. PendingTransactionIDs
// calls it to seed the list of devices whose pollers have yet to report on an
// event.
type loaderFunc func(userID string) []string
|
|
|
|
|
|
|
|
// PendingTransactionIDs is (conceptually) a map from event IDs to a list of device IDs.
// Its keys are the IDs of events we've seen which a) lack a transaction ID, and b) were
// sent by one of the users we are polling for. The values are the list of the sender's
// devices whose pollers are yet to see a transaction ID.
//
// If another poller sees the same event
//
//   - with a transaction ID, it emits a V2TransactionID payload with that ID and
//     replaces the event's device list with an empty one (see SeenTxnID), so
//     no "all clear" will subsequently be reported for it.
//
//   - without a transaction ID, it removes the polling device ID from the values
//     list. If the device ID list is now empty, the poller emits an "all clear"
//     V2TransactionID payload.
//
// This is a best-effort affair to ensure that the rest of the proxy can wait for
// transaction IDs to appear before transmitting an event down /sync to its sender.
//
// It's possible that we add an entry to this map and then the list of remaining
// device IDs becomes out of date, either due to a new device creation or an
// existing device expiring. We choose not to handle this case, because it is relatively
// rare.
//
// To avoid the map growing without bound, we use a ttlcache and drop entries
// after a short period of time.
type PendingTransactionIDs struct {
	// mu guards the pending field. See MissingTxnID for rationale.
	mu sync.Mutex
	// pending maps an event ID to a []string of device IDs still expected to
	// report on that event. Entries expire after the TTL configured in
	// NewPendingTransactionIDs.
	pending *ttlcache.Cache
	// loader returns the device IDs for a user. MissingTxnID calls it on a
	// cache miss to seed a new entry.
	loader loaderFunc
}
|
|
|
|
|
2023-07-25 14:37:53 +01:00
|
|
|
func NewPendingTransactionIDs(loader loaderFunc) *PendingTransactionIDs {
|
2022-03-28 15:19:42 +01:00
|
|
|
c := ttlcache.NewCache()
|
|
|
|
c.SetTTL(5 * time.Minute) // keep transaction IDs for 5 minutes before forgetting about them
|
|
|
|
c.SkipTTLExtensionOnHit(true) // we don't care how many times they ask for the item, 5min is the limit.
|
2023-07-25 14:37:53 +01:00
|
|
|
return &PendingTransactionIDs{
|
|
|
|
mu: sync.Mutex{},
|
|
|
|
pending: c,
|
|
|
|
loader: loader,
|
2022-03-28 15:19:42 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-25 14:37:53 +01:00
|
|
|
// MissingTxnID should be called to report that this device ID did not see a
|
|
|
|
// transaction ID for this event ID. Returns true if this is the first time we know
|
|
|
|
// for sure that we'll never see a txn ID for this event.
|
|
|
|
func (c *PendingTransactionIDs) MissingTxnID(eventID, userID, myDeviceID string) (bool, error) {
|
2023-07-27 12:01:37 +01:00
|
|
|
// While ttlcache is threadsafe, it does not provide a way to atomically update
|
|
|
|
// (get+set) a value, which means we are still open to races. For example:
|
|
|
|
//
|
|
|
|
// - We have three pollers A, B, C.
|
|
|
|
// - Poller A sees an event without txn id and calls MissingTxnID.
|
|
|
|
// - `c.pending.Get()` fails, so we load up all device IDs: [A, B, C].
|
|
|
|
// - Then `c.pending.Set()` with [B, C].
|
|
|
|
// - Poller B sees the same event, also missing txn ID and calls MissingTxnID.
|
|
|
|
// - Poller C does the same concurrently.
|
|
|
|
//
|
|
|
|
// If the Get+Set isn't atomic, then we might do e.g.
|
|
|
|
// - B gets [B, C] and prepares to write [C].
|
|
|
|
// - C gets [B, C] and prepares to write [B].
|
|
|
|
// - Last writer wins. Either way, we never write [] and so never return true
|
|
|
|
// (the all-clear signal.)
|
|
|
|
//
|
|
|
|
// This wouldn't be the end of the world (the API process has a maximum delay, and
|
|
|
|
// the ttlcache will expire the entry), but it would still be nice to avoid it.
|
2023-07-25 14:37:53 +01:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
2022-03-28 15:19:42 +01:00
|
|
|
|
2023-07-25 14:37:53 +01:00
|
|
|
data, err := c.pending.Get(eventID)
|
|
|
|
if err == ttlcache.ErrNotFound {
|
|
|
|
data = c.loader(userID)
|
|
|
|
} else if err != nil {
|
|
|
|
return false, fmt.Errorf("PendingTransactionIDs: failed to get device ids: %w", err)
|
2022-03-28 15:19:42 +01:00
|
|
|
}
|
2023-07-25 14:37:53 +01:00
|
|
|
|
|
|
|
deviceIDs, ok := data.([]string)
|
|
|
|
if !ok {
|
|
|
|
return false, fmt.Errorf("PendingTransactionIDs: failed to cast device IDs")
|
|
|
|
}
|
|
|
|
|
|
|
|
deviceIDs, changed := removeDevice(myDeviceID, deviceIDs)
|
|
|
|
if changed {
|
|
|
|
err = c.pending.Set(eventID, deviceIDs)
|
|
|
|
if err != nil {
|
|
|
|
return false, fmt.Errorf("PendingTransactionIDs: failed to set device IDs: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return changed && len(deviceIDs) == 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SeenTxnID should be called to report that this device saw a transaction ID
|
|
|
|
// for this event.
|
|
|
|
func (c *PendingTransactionIDs) SeenTxnID(eventID string) error {
|
|
|
|
c.mu.Lock()
|
2023-07-27 12:01:37 +01:00
|
|
|
defer c.mu.Unlock()
|
2023-07-25 14:37:53 +01:00
|
|
|
return c.pending.Set(eventID, []string{})
|
2022-03-28 15:19:42 +01:00
|
|
|
}
|
|
|
|
|
2023-07-26 14:06:05 +01:00
|
|
|
// removeDevice returns a copy of devices with the first occurrence of device
// removed, and reports whether device was found. It assumes devices contains no
// duplicates.
//
// The input slice is never modified, so callers may pass a slice they do not
// own (for example one returned by a loaderFunc). When device is absent,
// devices itself is returned unchanged together with false.
func removeDevice(device string, devices []string) ([]string, bool) {
	for i, otherDevice := range devices {
		if otherDevice != device {
			continue
		}
		// Build a fresh slice. The tempting append(devices[:i], devices[i+1:]...)
		// shifts elements within devices' backing array, silently corrupting
		// the caller's slice and any other holder of that array.
		remaining := make([]string, 0, len(devices)-1)
		remaining = append(remaining, devices[:i]...)
		remaining = append(remaining, devices[i+1:]...)
		return remaining, true
	}
	return devices, false
}
|