poller: send all-clear

This commit is contained in:
David Robertson 2023-07-25 15:34:27 +01:00
parent c5d7570e09
commit 008157c146
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
4 changed files with 55 additions and 34 deletions

View File

@ -41,12 +41,14 @@ type V2Accumulate struct {
func (*V2Accumulate) Type() string { return "V2Accumulate" }
// V2TransactionID is emitted by a poller when it sees an event with a transaction ID.
// V2TransactionID is emitted by a poller when it sees an event with a transaction ID,
// or when it is certain that no other poller will see a transaction ID for this event
// (the "all-clear").
type V2TransactionID struct {
EventID string
UserID string
DeviceID string
TransactionID string
TransactionID string // Note: an empty transaction ID represents the all-clear.
NID int64
}

View File

@ -3,7 +3,6 @@ package handler2
import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"os"
"sync"
@ -240,15 +239,24 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prev
// Remember any transaction IDs that may be unique to this user
eventIDsWithTxns := make([]string, 0, len(timeline)) // in timeline order
eventIDToTxnID := make(map[string]string, len(timeline)) // event_id -> txn_id
// Also remember events which were sent by this user but lack a transaction ID.
eventIDsLackingTxns := make([]string, 0, len(timeline))
for _, e := range timeline {
txnID := gjson.GetBytes(e, "unsigned.transaction_id")
if !txnID.Exists() {
parsed := gjson.ParseBytes(e)
eventID := parsed.Get("event_id").Str
if txnID := parsed.Get("unsigned.transaction_id"); txnID.Exists() {
eventIDsWithTxns = append(eventIDsWithTxns, eventID)
eventIDToTxnID[eventID] = txnID.Str
continue
}
eventID := gjson.GetBytes(e, "event_id").Str
eventIDsWithTxns = append(eventIDsWithTxns, eventID)
eventIDToTxnID[eventID] = txnID.Str
if sender := parsed.Get("sender"); sender.Str == userID {
eventIDsLackingTxns = append(eventIDsLackingTxns, eventID)
}
}
if len(eventIDToTxnID) > 0 {
// persist the txn IDs
err := h.Store.TransactionsTable.Insert(userID, deviceID, eventIDToTxnID)
@ -269,57 +277,59 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prev
// no new events
return
}
// We've updated the database. Now tell any pubsub listeners what we learned.
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Accumulate{
RoomID: roomID,
PrevBatch: prevBatch,
EventNIDs: latestNIDs,
})
if len(eventIDToTxnID) > 0 {
if len(eventIDToTxnID) > 0 || len(eventIDsLackingTxns) > 0 {
// The call to h.Store.Accumulate above only tells us about new events' NIDS;
// for existing events we need to requery the database to fetch them.
// Rather than try to reuse work, keep things simple and just fetch NIDs for
// all events with txnIDs.
var nidsByIDs map[string]int64
eventIDsToFetch := append(eventIDsWithTxns, eventIDsLackingTxns...)
err = sqlutil.WithTransaction(h.Store.DB, func(txn *sqlx.Tx) error {
nidsByIDs, err = h.Store.EventsTable.SelectNIDsByIDs(txn, eventIDsWithTxns)
nidsByIDs, err = h.Store.EventsTable.SelectNIDsByIDs(txn, eventIDsToFetch)
return err
})
if err != nil {
logger.Err(err).
Int("timeline", len(timeline)).
Int("num_transaction_ids", len(eventIDsWithTxns)).
Int("num_missing_transaction_ids", len(eventIDsLackingTxns)).
Str("room", roomID).
Msg("V2: failed to fetch nids for events with transaction_ids")
Msg("V2: failed to fetch nids for event transaction_id handling")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
return
}
for _, eventID := range eventIDsWithTxns {
for eventID, nid := range nidsByIDs {
txnID, ok := eventIDToTxnID[eventID]
if !ok {
continue
if ok {
h.pMap.SeenTxnID(eventID)
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2TransactionID{
EventID: eventID,
UserID: userID,
DeviceID: deviceID,
TransactionID: txnID,
NID: nid,
})
} else {
allClear, _ := h.pMap.MissingTxnID(eventID, userID, deviceID)
if allClear {
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2TransactionID{
EventID: eventID,
UserID: userID,
DeviceID: deviceID,
TransactionID: "",
NID: nid,
})
}
}
nid, ok := nidsByIDs[eventID]
if !ok {
errMsg := "V2: failed to fetch NID for txnID"
logger.Error().Str("user", userID).Str("device", deviceID).Msg(errMsg)
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(fmt.Errorf("errMsg"))
continue
}
// TODO: if all pollers for this user's devices have seen this event, then
// we can send an "all clear" message. Maybe this is just a V2TransactionID
// with an empty string for the TransactionID. In order to do this we will
// need to keep track of which events have been seen by which devices. Maybe
// NIDs suffice?
h.pMap.SeenTxnID(eventID)
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2TransactionID{
EventID: eventID,
UserID: userID,
DeviceID: deviceID,
TransactionID: txnID,
NID: nid,
})
}
}
}

View File

@ -42,6 +42,10 @@ func (p *mockPollerMap) NumPollers() int {
}
func (p *mockPollerMap) Terminate() {}
func (p *mockPollerMap) MissingTxnID(eventID, userID, deviceID string) (bool, error) {
return false, nil
}
func (p *mockPollerMap) SeenTxnID(eventID string) error {
return nil
}

View File

@ -64,6 +64,7 @@ type IPollerMap interface {
EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger)
NumPollers() int
Terminate()
MissingTxnID(eventID, userID, deviceID string) (bool, error)
SeenTxnID(eventID string) error
}
@ -212,6 +213,10 @@ func (h *PollerMap) deviceIDs(userID string) []string {
return devices
}
func (h *PollerMap) MissingTxnID(eventID, userID, deviceID string) (bool, error) {
return h.pendingTxnIDs.MissingTxnID(eventID, userID, deviceID)
}
func (h *PollerMap) SeenTxnID(eventID string) error {
return h.pendingTxnIDs.SeenTxnID(eventID)
}