2021-05-27 19:20:36 +01:00
|
|
|
package state
|
|
|
|
|
|
|
|
import (
|
2022-03-31 14:29:26 +01:00
|
|
|
"database/sql"
|
2021-05-27 19:20:36 +01:00
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
2023-10-02 09:45:40 +01:00
|
|
|
|
2023-08-17 18:55:00 +01:00
|
|
|
"github.com/matrix-org/sliding-sync/internal"
|
2023-09-19 12:41:25 +01:00
|
|
|
"github.com/matrix-org/sliding-sync/sync2"
|
2023-06-08 13:54:46 +01:00
|
|
|
|
2023-04-05 14:11:05 +01:00
|
|
|
"github.com/getsentry/sentry-go"
|
2021-05-27 19:20:36 +01:00
|
|
|
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
|
|
"github.com/lib/pq"
|
2022-12-15 11:08:50 +00:00
|
|
|
"github.com/matrix-org/sliding-sync/sqlutil"
|
2021-05-28 12:08:10 +01:00
|
|
|
"github.com/tidwall/gjson"
|
2021-05-27 19:20:36 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
// Accumulator tracks room state and timelines.
|
|
|
|
//
|
|
|
|
// In order for it to remain simple(ish), the accumulator DOES NOT SUPPORT arbitrary timeline gaps.
|
|
|
|
// There is an Initialise function for new rooms (with some pre-determined state) and then a constant
|
|
|
|
// Accumulate function for timeline events. v2 sync must be called with a large enough timeline.limit
|
|
|
|
// for this to work!
|
|
|
|
type Accumulator struct {
	// db is the connection pool shared by all the table helpers below.
	db *sqlx.DB
	// Table helpers, all backed by db (see NewAccumulator).
	roomsTable    *RoomsTable
	eventsTable   *EventTable
	snapshotTable *SnapshotTable
	spacesTable   *SpacesTable
	invitesTable  *InvitesTable
	// entityName identifies who is accumulating; NewAccumulator sets this to "server".
	entityName string
}
|
|
|
|
|
2021-06-16 17:18:04 +01:00
|
|
|
func NewAccumulator(db *sqlx.DB) *Accumulator {
|
2021-05-27 19:20:36 +01:00
|
|
|
return &Accumulator{
|
2021-08-23 15:21:51 +01:00
|
|
|
db: db,
|
|
|
|
roomsTable: NewRoomsTable(db),
|
|
|
|
eventsTable: NewEventTable(db),
|
|
|
|
snapshotTable: NewSnapshotsTable(db),
|
2022-07-28 13:44:52 +01:00
|
|
|
spacesTable: NewSpacesTable(db),
|
2023-10-10 17:06:10 +01:00
|
|
|
invitesTable: NewInvitesTable(db),
|
2021-08-23 15:21:51 +01:00
|
|
|
entityName: "server",
|
2021-05-27 19:20:36 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-06 15:39:36 +01:00
|
|
|
func (a *Accumulator) strippedEventsForSnapshot(txn *sqlx.Tx, snapID int64) (StrippedEvents, error) {
|
2021-05-28 12:08:10 +01:00
|
|
|
snapshot, err := a.snapshotTable.Select(txn, snapID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// pull stripped events as this may be huge (think Matrix HQ)
|
2023-01-12 17:11:09 +00:00
|
|
|
return a.eventsTable.SelectStrippedEventsByNIDs(txn, true, append(snapshot.MembershipEvents, snapshot.OtherEvents...))
|
2021-05-28 12:08:10 +01:00
|
|
|
}
|
|
|
|
|
2021-08-18 16:34:44 +01:00
|
|
|
// calculateNewSnapshot works out the new snapshot by combining an old snapshot and a new state event. Events get replaced
|
2021-08-18 18:21:40 +01:00
|
|
|
// if the tuple of event type/state_key match. A new slice is returned (the inputs are not modified) along with the NID
|
|
|
|
// that got replaced.
|
2022-06-08 18:20:10 +01:00
|
|
|
func (a *Accumulator) calculateNewSnapshot(old StrippedEvents, new Event) (StrippedEvents, int64, error) {
|
2021-08-18 16:34:44 +01:00
|
|
|
// TODO: implement dendrite's binary tree diff algorithm?
|
2021-08-20 15:56:17 +01:00
|
|
|
tupleKey := func(e Event) string {
|
2021-05-28 16:07:28 +01:00
|
|
|
// 0x1f = unit separator
|
|
|
|
return e.Type + "\x1f" + e.StateKey
|
|
|
|
}
|
2021-08-18 16:34:44 +01:00
|
|
|
newTuple := tupleKey(new)
|
|
|
|
added := false
|
2021-08-18 18:21:40 +01:00
|
|
|
var replacedNID int64
|
2022-06-08 18:20:10 +01:00
|
|
|
seenTuples := make(map[string]struct{}, len(old))
|
2022-04-22 12:34:08 +01:00
|
|
|
result := make(StrippedEvents, 0, len(old)+1)
|
2021-05-28 16:07:28 +01:00
|
|
|
for _, e := range old {
|
2021-08-18 16:34:44 +01:00
|
|
|
existingTuple := tupleKey(e)
|
2022-06-08 18:20:10 +01:00
|
|
|
if _, seen := seenTuples[existingTuple]; seen {
|
|
|
|
return nil, 0, fmt.Errorf("seen this tuple before: %v", existingTuple)
|
|
|
|
}
|
|
|
|
seenTuples[existingTuple] = struct{}{}
|
2021-10-01 16:55:50 +01:00
|
|
|
if e.NID == new.NID && existingTuple != newTuple {
|
|
|
|
// ruh roh. This should be impossible, but it can happen if the v2 response sends the same
|
|
|
|
// event in both state and timeline. We need to alert the operator and whine badly as it means
|
|
|
|
// we have lost an event by now.
|
2022-12-22 15:08:42 +00:00
|
|
|
logger.Warn().Str("new_event_id", new.ID).Str("old_event_id", e.ID).Str("room_id", new.RoomID).Str("type", new.Type).Str("state_key", new.StateKey).Msg(
|
2022-02-18 17:10:35 +00:00
|
|
|
"Detected different event IDs with the same NID when rolling forward state. This has resulted in data loss in this room (1 event). " +
|
2021-10-01 16:55:50 +01:00
|
|
|
"This can happen when the v2 /sync response sends the same event in both state and timeline sections. " +
|
|
|
|
"The event in this log line has been dropped!",
|
|
|
|
)
|
2022-06-08 18:20:10 +01:00
|
|
|
return nil, 0, fmt.Errorf("new event has same nid value '%d' as event in snapshot but has a different tuple: %v != %v", new.NID, existingTuple, newTuple)
|
2021-10-01 16:55:50 +01:00
|
|
|
}
|
2021-08-18 16:34:44 +01:00
|
|
|
if existingTuple == newTuple {
|
|
|
|
// use the new event
|
|
|
|
result = append(result, new)
|
|
|
|
added = true
|
2021-08-18 18:21:40 +01:00
|
|
|
replacedNID = e.NID
|
2021-08-18 16:34:44 +01:00
|
|
|
continue
|
2021-05-28 16:07:28 +01:00
|
|
|
}
|
2021-08-18 16:34:44 +01:00
|
|
|
// use the old event
|
|
|
|
result = append(result, e)
|
2021-05-28 16:07:28 +01:00
|
|
|
}
|
2021-08-18 16:34:44 +01:00
|
|
|
if !added {
|
|
|
|
result = append(result, new)
|
2021-05-28 16:07:28 +01:00
|
|
|
}
|
2022-06-08 18:20:10 +01:00
|
|
|
return result, replacedNID, nil
|
2021-05-28 12:08:10 +01:00
|
|
|
}
|
|
|
|
|
2022-07-28 13:44:52 +01:00
|
|
|
// roomInfoDelta calculates what the RoomInfo should be given a list of new events.
|
|
|
|
func (a *Accumulator) roomInfoDelta(roomID string, events []Event) RoomInfo {
|
|
|
|
isEncrypted := false
|
2022-09-07 17:34:49 +01:00
|
|
|
var upgradedRoomID *string
|
2022-07-28 13:44:52 +01:00
|
|
|
var roomType *string
|
2022-09-08 15:44:48 +01:00
|
|
|
var pred *string
|
2022-07-28 13:44:52 +01:00
|
|
|
for _, ev := range events {
|
|
|
|
if ev.Type == "m.room.encryption" && ev.StateKey == "" {
|
|
|
|
isEncrypted = true
|
|
|
|
}
|
|
|
|
if ev.Type == "m.room.tombstone" && ev.StateKey == "" {
|
2022-09-07 17:34:49 +01:00
|
|
|
contentType := gjson.GetBytes(ev.JSON, "content.replacement_room")
|
|
|
|
if contentType.Exists() && contentType.Type == gjson.String {
|
|
|
|
upgradedRoomID = &contentType.Str
|
|
|
|
}
|
2022-07-28 13:44:52 +01:00
|
|
|
}
|
|
|
|
if ev.Type == "m.room.create" && ev.StateKey == "" {
|
|
|
|
contentType := gjson.GetBytes(ev.JSON, "content.type")
|
|
|
|
if contentType.Exists() && contentType.Type == gjson.String {
|
|
|
|
roomType = &contentType.Str
|
|
|
|
}
|
2022-09-08 15:44:48 +01:00
|
|
|
predecessorRoomID := gjson.GetBytes(ev.JSON, "content.predecessor.room_id").Str
|
|
|
|
if predecessorRoomID != "" {
|
|
|
|
pred = &predecessorRoomID
|
|
|
|
}
|
|
|
|
|
2022-07-28 13:44:52 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return RoomInfo{
|
2022-09-08 15:44:48 +01:00
|
|
|
ID: roomID,
|
|
|
|
IsEncrypted: isEncrypted,
|
|
|
|
UpgradedRoomID: upgradedRoomID,
|
|
|
|
Type: roomType,
|
|
|
|
PredecessorRoomID: pred,
|
2022-07-28 13:44:52 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-04-17 18:21:58 +01:00
|
|
|
// InitialiseResult summarises what a call to Accumulator.Initialise did, so
// the caller knows which payloads (if any) to emit.
type InitialiseResult struct {
	// AddedEvents is true iff this call to Initialise added new state events to the DB.
	AddedEvents bool
	// SnapshotID is the ID of the snapshot which incorporates all added events.
	// It has no meaning if AddedEvents is false.
	SnapshotID int64
	// ReplacedExistingSnapshot is true when we created a new snapshot for the room and
	// there was a pre-existing room snapshot. It has no meaning if AddedEvents is false.
	ReplacedExistingSnapshot bool
}
|
|
|
|
|
2023-11-03 15:41:09 +00:00
|
|
|
// Initialise processes the state block of a V2 sync response for a particular room. If
|
|
|
|
// the state of the room has changed, we persist any new state events and create a new
|
|
|
|
// "snapshot" of its entire state.
|
2023-04-18 15:46:47 +01:00
|
|
|
//
|
2023-11-03 15:41:09 +00:00
|
|
|
// Summary of the logic:
|
2023-04-18 15:46:47 +01:00
|
|
|
//
|
2023-11-03 15:41:09 +00:00
|
|
|
// 0. Ensure the state block is not empty.
|
2023-04-18 15:46:47 +01:00
|
|
|
//
|
2023-11-03 15:41:09 +00:00
|
|
|
// 1. Capture the current snapshot ID, possibly zero. If it is zero, ensure that the
|
|
|
|
// state block contains a `create event`.
|
|
|
|
//
|
|
|
|
// 2. Insert the events. If there are no newly inserted events, bail. If there are new
|
|
|
|
// events, then the state block has definitely changed. Note: we ignore cases where
|
|
|
|
// the state has only changed to a known subset of state events (i.e in the case of
|
|
|
|
// state resets, slow pollers) as it is impossible to then reconcile that state with
|
|
|
|
// any new events, as any "catchup" state will be ignored due to the events already
|
|
|
|
// existing.
|
|
|
|
//
|
|
|
|
// 3. Fetch the current state of the room, as a map from (type, state_key) to event.
|
|
|
|
// If there is no existing state snapshot, this map is the empty map.
|
|
|
|
// If the state hasn't altered, bail.
|
|
|
|
//
|
|
|
|
// 4. Create new snapshot. Update the map from (3) with the events in `state`.
|
|
|
|
// (There is similar logic for this in Accumulate.)
|
|
|
|
// Store the snapshot. Mark the room's current state as being this snapshot.
|
|
|
|
//
|
|
|
|
// 5. Any other processing of the new state events.
|
|
|
|
//
|
|
|
|
// 6. Return an "AddedEvents" bool (if true, emit an Initialise payload) and a
|
|
|
|
// "ReplacedSnapshot" bool (if true, emit a cache invalidation payload).
|
|
|
|
|
2023-04-18 14:56:10 +01:00
|
|
|
func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) {
|
|
|
|
var res InitialiseResult
|
2023-11-03 15:41:09 +00:00
|
|
|
var startingSnapshotID int64
|
|
|
|
|
|
|
|
// 0. Ensure the state block is not empty.
|
2021-05-27 19:20:36 +01:00
|
|
|
if len(state) == 0 {
|
2023-04-17 18:21:58 +01:00
|
|
|
return res, nil
|
2021-05-27 19:20:36 +01:00
|
|
|
}
|
2023-11-03 15:41:09 +00:00
|
|
|
err := sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) (err error) {
|
|
|
|
// 1. Capture the current snapshot ID, checking for a create event if this is our first snapshot.
|
|
|
|
|
2023-04-18 14:56:10 +01:00
|
|
|
// Attempt to short-circuit. This has to be done inside a transaction to make sure
|
2021-05-27 19:20:36 +01:00
|
|
|
// we don't race with multiple calls to Initialise with the same room ID.
|
2023-11-03 15:41:09 +00:00
|
|
|
startingSnapshotID, err = a.roomsTable.CurrentAfterSnapshotID(txn, roomID)
|
2021-05-27 19:20:36 +01:00
|
|
|
if err != nil {
|
2023-11-03 15:41:09 +00:00
|
|
|
return fmt.Errorf("error fetching snapshot id for room %s: %w", roomID, err)
|
2021-05-27 19:20:36 +01:00
|
|
|
}
|
2023-11-03 15:41:09 +00:00
|
|
|
// Start by parsing the events in the state block.
|
2021-05-27 19:20:36 +01:00
|
|
|
events := make([]Event, len(state))
|
|
|
|
for i := range events {
|
|
|
|
events[i] = Event{
|
2022-07-21 16:20:59 +01:00
|
|
|
JSON: state[i],
|
|
|
|
RoomID: roomID,
|
|
|
|
IsState: true,
|
2021-05-27 19:20:36 +01:00
|
|
|
}
|
|
|
|
}
|
2023-07-25 14:41:23 +01:00
|
|
|
events = filterAndEnsureFieldsSet(events)
|
|
|
|
if len(events) == 0 {
|
2023-11-03 15:41:09 +00:00
|
|
|
return fmt.Errorf("failed to parse state block, all events were filtered out: %w", err)
|
2022-03-18 10:59:38 +00:00
|
|
|
}
|
2023-08-22 15:56:38 +01:00
|
|
|
|
2023-11-03 15:41:09 +00:00
|
|
|
if startingSnapshotID == 0 {
|
|
|
|
// Ensure that we have "proper" state and not "stray" events from Synapse.
|
|
|
|
if err = ensureStateHasCreateEvent(events); err != nil {
|
|
|
|
return err
|
2023-08-22 15:56:38 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-11-03 15:41:09 +00:00
|
|
|
// 2. Insert the events and determine which ones are new.
|
|
|
|
newEventIDToNID, err := a.eventsTable.Insert(txn, events, false)
|
2021-05-27 19:20:36 +01:00
|
|
|
if err != nil {
|
2021-06-04 13:02:28 +01:00
|
|
|
return fmt.Errorf("failed to insert events: %w", err)
|
2021-05-27 19:20:36 +01:00
|
|
|
}
|
2023-11-03 15:41:09 +00:00
|
|
|
if len(newEventIDToNID) == 0 {
|
|
|
|
if startingSnapshotID == 0 {
|
|
|
|
// we don't have a current snapshot for this room but yet no events are new,
|
|
|
|
// no idea how this should be handled.
|
|
|
|
const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug."
|
|
|
|
logger.Error().Str("room_id", roomID).Msg(errMsg)
|
|
|
|
sentry.CaptureException(fmt.Errorf(errMsg))
|
|
|
|
}
|
|
|
|
// Note: we otherwise ignore cases where the state has only changed to a
|
|
|
|
// known subset of state events (i.e in the case of state resets, slow
|
|
|
|
// pollers) as it is impossible to then reconcile that state with
|
|
|
|
// any new events, as any "catchup" state will be ignored due to the events
|
|
|
|
// already existing.
|
2021-05-27 19:20:36 +01:00
|
|
|
return nil
|
|
|
|
}
|
2023-11-03 15:41:09 +00:00
|
|
|
newEvents := make([]Event, 0, len(newEventIDToNID))
|
2023-04-18 14:56:10 +01:00
|
|
|
for _, event := range events {
|
2023-11-03 15:41:09 +00:00
|
|
|
newNid, isNew := newEventIDToNID[event.ID]
|
|
|
|
if isNew {
|
|
|
|
event.NID = newNid
|
|
|
|
newEvents = append(newEvents, event)
|
2023-01-12 17:11:09 +00:00
|
|
|
}
|
2023-04-18 14:56:10 +01:00
|
|
|
}
|
2023-11-03 15:41:09 +00:00
|
|
|
|
|
|
|
// 3. Fetch the current state of the room.
|
|
|
|
var currentState stateMap
|
|
|
|
if startingSnapshotID > 0 {
|
|
|
|
currentState, err = a.stateMapAtSnapshot(txn, startingSnapshotID)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to load state map: %w", err)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
currentState = stateMap{
|
2023-11-07 13:28:16 +00:00
|
|
|
// Typically expect Other to be small, but Memberships may be large (think: Matrix HQ.)
|
2023-11-03 15:41:09 +00:00
|
|
|
Memberships: make(map[string]int64, len(events)),
|
2023-11-07 13:28:16 +00:00
|
|
|
Other: make(map[[2]string]int64),
|
2023-04-17 19:13:12 +01:00
|
|
|
}
|
2022-06-08 18:20:10 +01:00
|
|
|
}
|
2021-05-27 19:20:36 +01:00
|
|
|
|
2023-11-03 15:41:09 +00:00
|
|
|
// 4. Update the map from (3) with the new events to create a new snapshot.
|
|
|
|
for _, ev := range newEvents {
|
|
|
|
currentState.Ingest(ev)
|
|
|
|
}
|
|
|
|
memberNIDs, otherNIDs := currentState.NIDs()
|
2021-05-27 19:20:36 +01:00
|
|
|
snapshot := &SnapshotRow{
|
2023-01-12 17:11:09 +00:00
|
|
|
RoomID: roomID,
|
2023-11-03 15:41:09 +00:00
|
|
|
MembershipEvents: memberNIDs,
|
|
|
|
OtherEvents: otherNIDs,
|
2021-05-27 19:20:36 +01:00
|
|
|
}
|
|
|
|
err = a.snapshotTable.Insert(txn, snapshot)
|
|
|
|
if err != nil {
|
2021-06-04 13:02:28 +01:00
|
|
|
return fmt.Errorf("failed to insert snapshot: %w", err)
|
2021-05-27 19:20:36 +01:00
|
|
|
}
|
2023-04-17 18:21:58 +01:00
|
|
|
res.AddedEvents = true
|
2023-11-03 15:41:09 +00:00
|
|
|
|
|
|
|
// 5. Any other processing of new state events.
|
2022-04-26 11:15:21 +01:00
|
|
|
latestNID := int64(0)
|
2023-01-12 17:11:09 +00:00
|
|
|
for _, nid := range otherNIDs {
|
|
|
|
if nid > latestNID {
|
|
|
|
latestNID = nid
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for _, nid := range memberNIDs {
|
2022-04-26 11:15:21 +01:00
|
|
|
if nid > latestNID {
|
|
|
|
latestNID = nid
|
|
|
|
}
|
|
|
|
}
|
2021-08-06 15:39:36 +01:00
|
|
|
|
2023-10-10 18:27:45 +01:00
|
|
|
if err = a.invitesTable.RemoveSupersededInvites(txn, roomID, events); err != nil {
|
|
|
|
return fmt.Errorf("RemoveSupersededInvites: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-07-28 16:41:40 +01:00
|
|
|
if err = a.spacesTable.HandleSpaceUpdates(txn, events); err != nil {
|
|
|
|
return fmt.Errorf("HandleSpaceUpdates: %s", err)
|
2021-11-04 13:40:16 +00:00
|
|
|
}
|
|
|
|
|
2022-07-28 13:44:52 +01:00
|
|
|
// check for metadata events
|
|
|
|
info := a.roomInfoDelta(roomID, events)
|
|
|
|
|
2021-08-18 16:34:44 +01:00
|
|
|
// these events do not have a state snapshot ID associated with them as we don't know what
|
|
|
|
// order the state events came down in, it's only a snapshot. This means only timeline events
|
|
|
|
// will have an associated state snapshot ID on the event.
|
2021-08-06 15:39:36 +01:00
|
|
|
|
2021-05-27 19:20:36 +01:00
|
|
|
// Set the snapshot ID as the current state
|
2023-11-03 15:41:09 +00:00
|
|
|
err = a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// 6. Tell the caller what happened, so they know what payloads to emit.
|
2023-04-17 19:13:12 +01:00
|
|
|
res.SnapshotID = snapshot.SnapshotID
|
2023-11-03 15:41:09 +00:00
|
|
|
res.AddedEvents = true
|
|
|
|
res.ReplacedExistingSnapshot = startingSnapshotID > 0
|
|
|
|
return nil
|
2021-05-27 19:20:36 +01:00
|
|
|
})
|
2023-04-18 14:56:10 +01:00
|
|
|
return res, err
|
2021-05-27 19:20:36 +01:00
|
|
|
}
|
|
|
|
|
2023-09-07 19:21:44 +01:00
|
|
|
// AccumulateResult summarises what a call to Accumulator.Accumulate did.
type AccumulateResult struct {
	// NumNew is the number of events in timeline NIDs that were not previously known
	// to the proxy.
	NumNew int
	// TimelineNIDs is the list of event nids seen in a sync v2 timeline. Some of these
	// may already be known to the proxy.
	TimelineNIDs []int64
	// IncludesStateRedaction is set to true when we have accumulated a redaction to a
	// piece of room state.
	IncludesStateRedaction bool
}
|
|
|
|
|
2021-05-28 12:08:10 +01:00
|
|
|
// Accumulate internal state from a user's sync response. The timeline order MUST be in the order
// received from the server. Returns an AccumulateResult describing the number of new events in the
// timeline, the new timeline event NIDs, and whether any current room state was redacted — or an error.
//
// This function does several things:
//   - It ensures all events are persisted in the database. This is shared amongst users.
//   - If all events have been stored before, then it short circuits and returns.
//     This is because we must have already processed this part of the timeline in order for the event
//     to exist in the database, and the sync stream is already linearised for us.
//   - Else it creates a new room state snapshot if the timeline contains state events (as this now represents the current state)
//   - NOTE(review): an older comment claimed this also adds entries to a membership log; no such
//     write is visible in this function — confirm before relying on it.
func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline sync2.TimelineResponse) (AccumulateResult, error) {
	// The first stage of accumulating events is mostly around validation around what the upstream HS sends us. For accumulation to work correctly
	// we expect:
	//  - there to be no duplicate events
	//  - if there are new events, they are always new.
	// Both of these assumptions can be false for different reasons
	incomingEvents := parseAndDeduplicateTimelineEvents(roomID, timeline)
	newEvents, err := a.filterToNewTimelineEvents(txn, incomingEvents)
	if err != nil {
		err = fmt.Errorf("filterTimelineEvents: %w", err)
		return AccumulateResult{}, err
	}
	if len(newEvents) == 0 {
		return AccumulateResult{}, nil // nothing to do
	}

	// If this timeline was limited and we don't recognise its first event E, mark it
	// as not knowing its previous timeline event.
	//
	// NB: some other poller may have already learned about E from a non-limited sync.
	// If so, E will be present in the DB and marked as not missing_previous. This will
	// remain the case as the upsert of E to the events table has ON CONFLICT DO
	// NOTHING.
	if timeline.Limited {
		firstTimelineEventUnknown := newEvents[0].ID == incomingEvents[0].ID
		incomingEvents[0].MissingPrevious = firstTimelineEventUnknown
	}

	// Given a timeline of [E1, E2, S3, E4, S5, S6, E7] (E=message event, S=state event)
	// And a prior state snapshot of SNAP0 then the BEFORE snapshot IDs are grouped as:
	//    E1,E2,S3 => SNAP0
	//    E4, S5 => (SNAP0 + S3)
	//    S6 => (SNAP0 + S3 + S5)
	//    E7 => (SNAP0 + S3 + S5 + S6)
	// We can track this by loading the current snapshot ID (after snapshot) then rolling forward
	// the timeline until we hit a state event, at which point we make a new snapshot but critically
	// do NOT assign the new state event in the snapshot so as to represent the state before the event.
	snapID, err := a.roomsTable.CurrentAfterSnapshotID(txn, roomID)
	if err != nil {
		return AccumulateResult{}, err
	}

	// The only situation where no prior snapshot should exist is if this timeline is
	// the very start of the room. If we get here in some other way, we should ignore
	// this timeline completely.
	//
	// For example: we can sometimes get stray events from the homeserver down the
	// timeline, often leaves after an invite rejection. Ignoring these ensures we
	// don't create a false (e.g. lacking m.room.create) record of the room state.
	if snapID == 0 {
		if len(newEvents) > 0 && newEvents[0].Type == "m.room.create" && newEvents[0].StateKey == "" {
			// All okay, continue on.
		} else {
			// Bail out and complain loudly.
			const msg = "Accumulator: skipping processing of timeline, as no snapshot exists"
			logger.Warn().
				Str("event_id", newEvents[0].ID).
				Str("event_type", newEvents[0].Type).
				Str("event_state_key", newEvents[0].StateKey).
				Str("room_id", roomID).
				Str("user_id", userID).
				Int("len_timeline", len(newEvents)).
				Msg(msg)
			sentry.WithScope(func(scope *sentry.Scope) {
				scope.SetUser(sentry.User{ID: userID})
				scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
					"event_id":        newEvents[0].ID,
					"event_type":      newEvents[0].Type,
					"event_state_key": newEvents[0].StateKey,
					"room_id":         roomID,
					"len_timeline":    len(newEvents),
				})
				sentry.CaptureMessage(msg)
			})
			// the HS gave us bad data so there's no point retrying
			// by not returning an error, we are telling the poller it is fine to not retry this request.
			return AccumulateResult{}, nil
		}
	}

	// Persist the new events; the returned map only contains events not previously known.
	eventIDToNID, err := a.eventsTable.Insert(txn, newEvents, false)
	if err != nil {
		return AccumulateResult{}, err
	}
	if len(eventIDToNID) == 0 {
		// nothing to do, we already know about these events
		return AccumulateResult{}, nil
	}

	result := AccumulateResult{
		NumNew: len(eventIDToNID),
	}

	var latestNID int64
	// postInsertEvents matches newEvents, but a) it has NIDs, and b) state events have
	// the IsState field hacked to true so we don't insert them into the timeline.
	postInsertEvents := make([]Event, 0, len(eventIDToNID))
	// map from redacted event ID -> the redaction event which targets it
	redactTheseEventIDs := make(map[string]*Event)
	for i, ev := range newEvents {
		nid, ok := eventIDToNID[ev.ID]
		if ok {
			ev.NID = int64(nid)
			parsedEv := gjson.ParseBytes(ev.JSON)
			if parsedEv.Get("state_key").Exists() {
				// XXX: reusing this to mean "it's a state event" as well as "it's part of the state v2 response"
				// its important that we don't insert 'ev' at this point as this should be False in the DB.
				ev.IsState = true
			}
			// assign the highest nid value to the latest nid.
			// we'll return this to the caller so they can stay in-sync
			if ev.NID > latestNID {
				latestNID = ev.NID
			}
			// if this is a redaction, try to redact the referenced event (best effort)
			if !ev.IsState && ev.Type == "m.room.redaction" {
				// look for top-level redacts then content.redacts (room version 11+)
				redactsEventID := parsedEv.Get("redacts").Str
				if redactsEventID == "" {
					redactsEventID = parsedEv.Get("content.redacts").Str
				}
				if redactsEventID != "" {
					redactTheseEventIDs[redactsEventID] = &newEvents[i]
				}
			}
			postInsertEvents = append(postInsertEvents, ev)
			result.TimelineNIDs = append(result.TimelineNIDs, ev.NID)
		}
	}

	// if we are going to redact things, we need the room version to know the redaction algorithm
	// so pull it out once now.
	var roomVersion string
	if len(redactTheseEventIDs) > 0 {
		createEventJSON, err := a.eventsTable.SelectCreateEvent(txn, roomID)
		if err != nil {
			return AccumulateResult{}, fmt.Errorf("SelectCreateEvent: %w", err)
		}
		roomVersion = gjson.GetBytes(createEventJSON, "content.room_version").Str
		if roomVersion == "" {
			// Defaults to "1" if the key does not exist.
			roomVersion = "1"
			logger.Warn().Str("room", roomID).Err(err).Msg(
				"Redact: no content.room_version in create event, defaulting to v1",
			)
		}
		if err = a.eventsTable.Redact(txn, roomVersion, redactTheseEventIDs); err != nil {
			return AccumulateResult{}, err
		}
	}

	// Roll the snapshot forward through the timeline, event by event. Each event is
	// stamped with the snapshot that was current BEFORE it; state events then produce
	// a new snapshot which becomes current for subsequent events.
	for _, ev := range postInsertEvents {
		var replacesNID int64
		// the snapshot ID we assign to this event is unaffected by whether /this/ event is state or not,
		// as this is the before snapshot ID.
		beforeSnapID := snapID

		if ev.IsState {
			// make a new snapshot and update the snapshot ID
			var oldStripped StrippedEvents
			if snapID != 0 {
				oldStripped, err = a.strippedEventsForSnapshot(txn, snapID)
				if err != nil {
					return AccumulateResult{}, fmt.Errorf("failed to load stripped state events for snapshot %d: %s", snapID, err)
				}
			}
			newStripped, replacedNID, err := a.calculateNewSnapshot(oldStripped, ev)
			if err != nil {
				return AccumulateResult{}, fmt.Errorf("failed to calculateNewSnapshot: %s", err)
			}
			replacesNID = replacedNID
			memNIDs, otherNIDs := newStripped.NIDs()
			newSnapshot := &SnapshotRow{
				RoomID:           roomID,
				MembershipEvents: memNIDs,
				OtherEvents:      otherNIDs,
			}
			if err = a.snapshotTable.Insert(txn, newSnapshot); err != nil {
				return AccumulateResult{}, fmt.Errorf("failed to insert new snapshot: %w", err)
			}
			snapID = newSnapshot.SnapshotID
		}
		if err := a.eventsTable.UpdateBeforeSnapshotID(txn, ev.NID, beforeSnapID, replacesNID); err != nil {
			return AccumulateResult{}, err
		}
	}

	if len(redactTheseEventIDs) > 0 {
		// We need to emit a cache invalidation if we have redacted some state in the
		// current snapshot ID. Note that we run this _after_ persisting any new snapshots.
		redactedEventIDs := make([]string, 0, len(redactTheseEventIDs))
		for eventID := range redactTheseEventIDs {
			redactedEventIDs = append(redactedEventIDs, eventID)
		}
		var currentStateRedactions int
		err = txn.Get(&currentStateRedactions, `
			SELECT COUNT(*)
			FROM syncv3_events
			JOIN syncv3_snapshots ON event_nid = ANY (ARRAY_CAT(events, membership_events))
			WHERE snapshot_id = $1 AND event_id = ANY($2)
		`, snapID, pq.StringArray(redactedEventIDs))
		if err != nil {
			return AccumulateResult{}, err
		}
		result.IncludesStateRedaction = currentStateRedactions > 0
	}

	if err = a.invitesTable.RemoveSupersededInvites(txn, roomID, postInsertEvents); err != nil {
		return AccumulateResult{}, fmt.Errorf("RemoveSupersededInvites: %w", err)
	}

	if err = a.spacesTable.HandleSpaceUpdates(txn, postInsertEvents); err != nil {
		return AccumulateResult{}, fmt.Errorf("HandleSpaceUpdates: %s", err)
	}

	// the last fetched snapshot ID is the current one
	info := a.roomInfoDelta(roomID, postInsertEvents)
	if err = a.roomsTable.Upsert(txn, info, snapID, latestNID); err != nil {
		return AccumulateResult{}, fmt.Errorf("failed to UpdateCurrentSnapshotID to %d: %w", snapID, err)
	}
	return result, nil
}
|
|
|
|
|
2023-07-11 19:08:32 +01:00
|
|
|
// - parses it and returns Event structs.
|
2023-09-13 13:28:22 +01:00
|
|
|
// - removes duplicate events: this is just a bug which has been seen on Synapse on matrix.org
|
2023-09-19 12:41:25 +01:00
|
|
|
func parseAndDeduplicateTimelineEvents(roomID string, timeline sync2.TimelineResponse) []Event {
|
2023-09-13 13:28:22 +01:00
|
|
|
dedupedEvents := make([]Event, 0, len(timeline.Events))
|
2023-07-11 19:08:32 +01:00
|
|
|
seenEvents := make(map[string]struct{})
|
2023-09-13 13:28:22 +01:00
|
|
|
for i, rawEvent := range timeline.Events {
|
2023-07-11 19:08:32 +01:00
|
|
|
e := Event{
|
2023-09-13 13:28:22 +01:00
|
|
|
JSON: rawEvent,
|
2023-07-11 19:08:32 +01:00
|
|
|
RoomID: roomID,
|
|
|
|
}
|
|
|
|
if err := e.ensureFieldsSetOnEvent(); err != nil {
|
2023-07-25 14:25:30 +01:00
|
|
|
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Err(err).Msg(
|
2023-09-13 13:28:22 +01:00
|
|
|
"Accumulator.filterToNewTimelineEvents: failed to parse event, ignoring",
|
2023-07-25 14:25:30 +01:00
|
|
|
)
|
|
|
|
continue
|
2023-07-11 19:08:32 +01:00
|
|
|
}
|
|
|
|
if _, ok := seenEvents[e.ID]; ok {
|
|
|
|
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg(
|
2023-09-13 13:28:22 +01:00
|
|
|
"Accumulator.filterToNewTimelineEvents: seen the same event ID twice, ignoring",
|
2023-07-11 19:08:32 +01:00
|
|
|
)
|
|
|
|
continue
|
|
|
|
}
|
2023-09-13 13:28:22 +01:00
|
|
|
if i == 0 && timeline.PrevBatch != "" {
|
2023-07-11 19:08:32 +01:00
|
|
|
// tag the first timeline event with the prev batch token
|
|
|
|
e.PrevBatch = sql.NullString{
|
2023-09-13 13:28:22 +01:00
|
|
|
String: timeline.PrevBatch,
|
2023-07-11 19:08:32 +01:00
|
|
|
Valid: true,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
dedupedEvents = append(dedupedEvents, e)
|
|
|
|
seenEvents[e.ID] = struct{}{}
|
|
|
|
}
|
2023-09-13 13:28:22 +01:00
|
|
|
return dedupedEvents
|
|
|
|
}
|
2023-07-11 19:08:32 +01:00
|
|
|
|
2023-09-13 13:28:22 +01:00
|
|
|
// filterToNewTimelineEvents takes a raw timeline array from sync v2 and applies sanity to it:
|
|
|
|
// - removes old events: this is an edge case when joining rooms over federation, see https://github.com/matrix-org/sliding-sync/issues/192
|
|
|
|
// - check which events are unknown. If all events are known, filter them all out.
|
|
|
|
func (a *Accumulator) filterToNewTimelineEvents(txn *sqlx.Tx, dedupedEvents []Event) ([]Event, error) {
|
2023-07-11 19:08:32 +01:00
|
|
|
// if we only have a single timeline event we cannot determine if it is old or not, as we rely on already seen events
|
|
|
|
// being after (higher index) than it.
|
|
|
|
if len(dedupedEvents) <= 1 {
|
|
|
|
return dedupedEvents, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Figure out which of these events are unseen and hence brand new live events.
|
|
|
|
// In some cases, we may have unseen OLD events - see https://github.com/matrix-org/sliding-sync/issues/192
|
|
|
|
// in which case we need to drop those events.
|
2023-09-13 13:28:22 +01:00
|
|
|
dedupedEventIDs := make([]string, 0, len(dedupedEvents))
|
|
|
|
for _, ev := range dedupedEvents {
|
|
|
|
dedupedEventIDs = append(dedupedEventIDs, ev.ID)
|
2023-07-11 19:08:32 +01:00
|
|
|
}
|
|
|
|
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, dedupedEventIDs)
|
|
|
|
if err != nil {
|
2023-09-13 13:28:22 +01:00
|
|
|
return nil, fmt.Errorf("filterToNewTimelineEvents: failed to SelectUnknownEventIDs: %w", err)
|
2023-07-11 19:08:32 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
if len(unknownEventIDs) == 0 {
|
|
|
|
// every event has been seen already, no work to do
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// In the happy case, we expect to see timeline arrays like this: (SEEN=S, UNSEEN=U)
|
|
|
|
// [S,S,U,U] -> want last 2
|
|
|
|
// [U,U,U] -> want all
|
|
|
|
// In the backfill edge case, we might see:
|
|
|
|
// [U,S,S,S] -> want none
|
|
|
|
// [U,S,S,U] -> want last 1
|
|
|
|
// We should never see scenarios like:
|
|
|
|
// [U,S,S,U,S,S] <- we should only see 1 contiguous block of seen events.
|
|
|
|
// If we do, we'll just ignore all unseen events less than the highest seen event.
|
|
|
|
|
|
|
|
// The algorithm starts at the end and just looks for the first S event, returning the subslice after that S event (which may be [])
|
|
|
|
seenIndex := -1
|
|
|
|
for i := len(dedupedEvents) - 1; i >= 0; i-- {
|
|
|
|
_, unseen := unknownEventIDs[dedupedEvents[i].ID]
|
|
|
|
if !unseen {
|
|
|
|
seenIndex = i
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// seenIndex can be -1 if all are unseen, or len-1 if all are seen, either way if we +1 this slices correctly:
|
|
|
|
// no seen events s[A,B,C] => s[-1+1:] => [A,B,C]
|
|
|
|
// C is seen event s[A,B,C] => s[2+1:] => []
|
|
|
|
// B is seen event s[A,B,C] => s[1+1:] => [C]
|
|
|
|
// A is seen event s[A,B,C] => s[0+1:] => [B,C]
|
|
|
|
return dedupedEvents[seenIndex+1:], nil
|
|
|
|
}
|
2023-11-03 15:41:09 +00:00
|
|
|
|
|
|
|
func ensureStateHasCreateEvent(events []Event) error {
|
|
|
|
hasCreate := false
|
|
|
|
for _, e := range events {
|
|
|
|
if e.Type == "m.room.create" && e.StateKey == "" {
|
|
|
|
hasCreate = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !hasCreate {
|
|
|
|
const errMsg = "cannot create first snapshot without a create event"
|
|
|
|
sentry.WithScope(func(scope *sentry.Scope) {
|
|
|
|
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
|
|
|
|
"room_id": events[0].RoomID,
|
|
|
|
"len_state": len(events),
|
|
|
|
})
|
|
|
|
sentry.CaptureMessage(errMsg)
|
|
|
|
})
|
|
|
|
logger.Warn().
|
|
|
|
Str("room_id", events[0].RoomID).
|
|
|
|
Int("len_state", len(events)).
|
|
|
|
Msg(errMsg)
|
|
|
|
// the HS gave us bad data so there's no point retrying => return DataError
|
|
|
|
return internal.NewDataError(errMsg)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// stateMap is an in-memory index of a room state snapshot, mapping each piece
// of state to the event NID currently occupying that position. Membership
// events are kept separately from other state, mirroring how snapshots store
// them in separate columns (see stateMapAtSnapshot / snapshot tables).
type stateMap struct {
	// state_key (user id) -> NID
	Memberships map[string]int64
	// type, state_key -> NID
	Other map[[2]string]int64
}
|
|
|
|
|
|
|
|
func (s *stateMap) Ingest(e Event) (replacedNID int64) {
|
|
|
|
if e.Type == "m.room.member" {
|
|
|
|
replacedNID = s.Memberships[e.StateKey]
|
|
|
|
s.Memberships[e.StateKey] = e.NID
|
|
|
|
} else {
|
|
|
|
key := [2]string{e.Type, e.StateKey}
|
|
|
|
replacedNID = s.Other[key]
|
|
|
|
s.Other[key] = e.NID
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *stateMap) NIDs() (membershipNIDs, otherNIDs []int64) {
|
|
|
|
membershipNIDs = make([]int64, 0, len(s.Memberships))
|
|
|
|
otherNIDs = make([]int64, 0, len(s.Other))
|
|
|
|
for _, nid := range s.Memberships {
|
|
|
|
membershipNIDs = append(membershipNIDs, nid)
|
|
|
|
}
|
|
|
|
for _, nid := range s.Other {
|
|
|
|
otherNIDs = append(otherNIDs, nid)
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *Accumulator) stateMapAtSnapshot(txn *sqlx.Tx, snapID int64) (stateMap, error) {
|
|
|
|
snapshot, err := a.snapshotTable.Select(txn, snapID)
|
|
|
|
if err != nil {
|
|
|
|
return stateMap{}, err
|
|
|
|
}
|
|
|
|
// pull stripped events as this may be huge (think Matrix HQ)
|
|
|
|
events, err := a.eventsTable.SelectStrippedEventsByNIDs(txn, true, append(snapshot.MembershipEvents, snapshot.OtherEvents...))
|
|
|
|
if err != nil {
|
|
|
|
return stateMap{}, err
|
|
|
|
}
|
|
|
|
|
|
|
|
state := stateMap{
|
|
|
|
Memberships: make(map[string]int64, len(snapshot.MembershipEvents)),
|
|
|
|
Other: make(map[[2]string]int64, len(snapshot.OtherEvents)),
|
|
|
|
}
|
|
|
|
for _, e := range events {
|
|
|
|
state.Ingest(e)
|
|
|
|
}
|
|
|
|
return state, nil
|
|
|
|
}
|