mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
Merge pull request #300 from matrix-org/dmr/invalidate-timelines
This commit is contained in:
commit
e75a462d4c
7
.github/workflows/tests.yml
vendored
7
.github/workflows/tests.yml
vendored
@ -92,6 +92,9 @@ jobs:
|
||||
matrix:
|
||||
# test with unlimited + 1 + 2 max db conns. If we end up double transacting in the tests anywhere, conn=1 tests will fail.
|
||||
max_db_conns: [0,1,2]
|
||||
# If the server fails to start, we'll wait for GHA to cancel the job after 6 hours.
|
||||
# Ensure we fail sooner than that to avoid clogging up GHA runners.
|
||||
timeout-minutes: 30
|
||||
services:
|
||||
synapse:
|
||||
# Custom image built from https://github.com/matrix-org/synapse/tree/v1.72.0/docker/complement with a dummy /complement/ca set
|
||||
@ -222,6 +225,10 @@ jobs:
|
||||
env:
|
||||
PREV_VERSION: "v0.99.4"
|
||||
|
||||
# If the server fails to start, we'll wait for GHA to cancel the job after 6 hours.
|
||||
# Ensure we fail sooner than that to avoid clogging up GHA runners.
|
||||
timeout-minutes: 30
|
||||
|
||||
# Service containers to run with `container-job`
|
||||
services:
|
||||
synapse:
|
||||
|
@ -4,8 +4,8 @@ import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/sliding-sync/internal"
|
||||
"github.com/matrix-org/sliding-sync/sync2"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
@ -342,21 +342,34 @@ type AccumulateResult struct {
|
||||
// to exist in the database, and the sync stream is already linearised for us.
|
||||
// - Else it creates a new room state snapshot if the timeline contains state events (as this now represents the current state)
|
||||
// - It adds entries to the membership log for membership events.
|
||||
func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch string, timeline []json.RawMessage) (AccumulateResult, error) {
|
||||
func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline sync2.TimelineResponse) (AccumulateResult, error) {
|
||||
// The first stage of accumulating events is mostly around validation around what the upstream HS sends us. For accumulation to work correctly
|
||||
// we expect:
|
||||
// - there to be no duplicate events
|
||||
// - if there are new events, they are always new.
|
||||
// Both of these assumptions can be false for different reasons
|
||||
dedupedEvents, err := a.filterAndParseTimelineEvents(txn, roomID, timeline, prevBatch)
|
||||
incomingEvents := parseAndDeduplicateTimelineEvents(roomID, timeline)
|
||||
newEvents, err := a.filterToNewTimelineEvents(txn, incomingEvents)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("filterTimelineEvents: %w", err)
|
||||
return AccumulateResult{}, err
|
||||
}
|
||||
if len(dedupedEvents) == 0 {
|
||||
if len(newEvents) == 0 {
|
||||
return AccumulateResult{}, nil // nothing to do
|
||||
}
|
||||
|
||||
// If this timeline was limited and we don't recognise its first event E, mark it
|
||||
// as not knowing its previous timeline event.
|
||||
//
|
||||
// NB: some other poller may have already learned about E from a non-limited sync.
|
||||
// If so, E will be present in the DB and marked as not missing_previous. This will
|
||||
// remain the case as the upsert of E to the events table has ON CONFLICT DO
|
||||
// NOTHING.
|
||||
if timeline.Limited {
|
||||
firstTimelineEventUnknown := newEvents[0].ID == incomingEvents[0].ID
|
||||
incomingEvents[0].MissingPrevious = firstTimelineEventUnknown
|
||||
}
|
||||
|
||||
// Given a timeline of [E1, E2, S3, E4, S5, S6, E7] (E=message event, S=state event)
|
||||
// And a prior state snapshot of SNAP0 then the BEFORE snapshot IDs are grouped as:
|
||||
// E1,E2,S3 => SNAP0
|
||||
@ -379,27 +392,27 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch
|
||||
// timeline, often leaves after an invite rejection. Ignoring these ensures we
|
||||
// don't create a false (e.g. lacking m.room.create) record of the room state.
|
||||
if snapID == 0 {
|
||||
if len(dedupedEvents) > 0 && dedupedEvents[0].Type == "m.room.create" && dedupedEvents[0].StateKey == "" {
|
||||
if len(newEvents) > 0 && newEvents[0].Type == "m.room.create" && newEvents[0].StateKey == "" {
|
||||
// All okay, continue on.
|
||||
} else {
|
||||
// Bail out and complain loudly.
|
||||
const msg = "Accumulator: skipping processing of timeline, as no snapshot exists"
|
||||
logger.Warn().
|
||||
Str("event_id", dedupedEvents[0].ID).
|
||||
Str("event_type", dedupedEvents[0].Type).
|
||||
Str("event_state_key", dedupedEvents[0].StateKey).
|
||||
Str("event_id", newEvents[0].ID).
|
||||
Str("event_type", newEvents[0].Type).
|
||||
Str("event_state_key", newEvents[0].StateKey).
|
||||
Str("room_id", roomID).
|
||||
Str("user_id", userID).
|
||||
Int("len_timeline", len(dedupedEvents)).
|
||||
Int("len_timeline", len(newEvents)).
|
||||
Msg(msg)
|
||||
sentry.WithScope(func(scope *sentry.Scope) {
|
||||
scope.SetUser(sentry.User{ID: userID})
|
||||
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
|
||||
"event_id": dedupedEvents[0].ID,
|
||||
"event_type": dedupedEvents[0].Type,
|
||||
"event_state_key": dedupedEvents[0].StateKey,
|
||||
"event_id": newEvents[0].ID,
|
||||
"event_type": newEvents[0].Type,
|
||||
"event_state_key": newEvents[0].StateKey,
|
||||
"room_id": roomID,
|
||||
"len_timeline": len(dedupedEvents),
|
||||
"len_timeline": len(newEvents),
|
||||
})
|
||||
sentry.CaptureMessage(msg)
|
||||
})
|
||||
@ -409,7 +422,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch
|
||||
}
|
||||
}
|
||||
|
||||
eventIDToNID, err := a.eventsTable.Insert(txn, dedupedEvents, false)
|
||||
eventIDToNID, err := a.eventsTable.Insert(txn, newEvents, false)
|
||||
if err != nil {
|
||||
return AccumulateResult{}, err
|
||||
}
|
||||
@ -423,9 +436,9 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch
|
||||
}
|
||||
|
||||
var latestNID int64
|
||||
newEvents := make([]Event, 0, len(eventIDToNID))
|
||||
newEventsByID := make([]Event, 0, len(eventIDToNID))
|
||||
redactTheseEventIDs := make(map[string]*Event)
|
||||
for i, ev := range dedupedEvents {
|
||||
for i, ev := range newEvents {
|
||||
nid, ok := eventIDToNID[ev.ID]
|
||||
if ok {
|
||||
ev.NID = int64(nid)
|
||||
@ -448,10 +461,10 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch
|
||||
redactsEventID = parsedEv.Get("content.redacts").Str
|
||||
}
|
||||
if redactsEventID != "" {
|
||||
redactTheseEventIDs[redactsEventID] = &dedupedEvents[i]
|
||||
redactTheseEventIDs[redactsEventID] = &newEvents[i]
|
||||
}
|
||||
}
|
||||
newEvents = append(newEvents, ev)
|
||||
newEventsByID = append(newEventsByID, ev)
|
||||
result.TimelineNIDs = append(result.TimelineNIDs, ev.NID)
|
||||
}
|
||||
}
|
||||
@ -477,7 +490,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch
|
||||
}
|
||||
}
|
||||
|
||||
for _, ev := range newEvents {
|
||||
for _, ev := range newEventsByID {
|
||||
var replacesNID int64
|
||||
// the snapshot ID we assign to this event is unaffected by whether /this/ event is state or not,
|
||||
// as this is the before snapshot ID.
|
||||
@ -537,56 +550,57 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch
|
||||
result.RequiresReload = currentStateRedactions > 0
|
||||
}
|
||||
|
||||
if err = a.spacesTable.HandleSpaceUpdates(txn, newEvents); err != nil {
|
||||
if err = a.spacesTable.HandleSpaceUpdates(txn, newEventsByID); err != nil {
|
||||
return AccumulateResult{}, fmt.Errorf("HandleSpaceUpdates: %s", err)
|
||||
}
|
||||
|
||||
// the last fetched snapshot ID is the current one
|
||||
info := a.roomInfoDelta(roomID, newEvents)
|
||||
info := a.roomInfoDelta(roomID, newEventsByID)
|
||||
if err = a.roomsTable.Upsert(txn, info, snapID, latestNID); err != nil {
|
||||
return AccumulateResult{}, fmt.Errorf("failed to UpdateCurrentSnapshotID to %d: %w", snapID, err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// filterAndParseTimelineEvents takes a raw timeline array from sync v2 and applies sanity to it:
|
||||
// - removes duplicate events: this is just a bug which has been seen on Synapse on matrix.org
|
||||
// - removes old events: this is an edge case when joining rooms over federation, see https://github.com/matrix-org/sliding-sync/issues/192
|
||||
// - parses it and returns Event structs.
|
||||
// - check which events are unknown. If all events are known, filter them all out.
|
||||
func (a *Accumulator) filterAndParseTimelineEvents(txn *sqlx.Tx, roomID string, timeline []json.RawMessage, prevBatch string) ([]Event, error) {
|
||||
// Check for duplicates which can happen in the real world when joining
|
||||
// Matrix HQ on Synapse, as well as when you join rooms for the first time over federation.
|
||||
dedupedEvents := make([]Event, 0, len(timeline))
|
||||
// - removes duplicate events: this is just a bug which has been seen on Synapse on matrix.org
|
||||
func parseAndDeduplicateTimelineEvents(roomID string, timeline sync2.TimelineResponse) []Event {
|
||||
dedupedEvents := make([]Event, 0, len(timeline.Events))
|
||||
seenEvents := make(map[string]struct{})
|
||||
for i := range timeline {
|
||||
for i, rawEvent := range timeline.Events {
|
||||
e := Event{
|
||||
JSON: timeline[i],
|
||||
JSON: rawEvent,
|
||||
RoomID: roomID,
|
||||
}
|
||||
if err := e.ensureFieldsSetOnEvent(); err != nil {
|
||||
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Err(err).Msg(
|
||||
"Accumulator.filterAndParseTimelineEvents: failed to parse event, ignoring",
|
||||
"Accumulator.filterToNewTimelineEvents: failed to parse event, ignoring",
|
||||
)
|
||||
continue
|
||||
}
|
||||
if _, ok := seenEvents[e.ID]; ok {
|
||||
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg(
|
||||
"Accumulator.filterAndParseTimelineEvents: seen the same event ID twice, ignoring",
|
||||
"Accumulator.filterToNewTimelineEvents: seen the same event ID twice, ignoring",
|
||||
)
|
||||
continue
|
||||
}
|
||||
if i == 0 && prevBatch != "" {
|
||||
if i == 0 && timeline.PrevBatch != "" {
|
||||
// tag the first timeline event with the prev batch token
|
||||
e.PrevBatch = sql.NullString{
|
||||
String: prevBatch,
|
||||
String: timeline.PrevBatch,
|
||||
Valid: true,
|
||||
}
|
||||
}
|
||||
dedupedEvents = append(dedupedEvents, e)
|
||||
seenEvents[e.ID] = struct{}{}
|
||||
}
|
||||
return dedupedEvents
|
||||
}
|
||||
|
||||
// filterToNewTimelineEvents takes a raw timeline array from sync v2 and applies sanity to it:
|
||||
// - removes old events: this is an edge case when joining rooms over federation, see https://github.com/matrix-org/sliding-sync/issues/192
|
||||
// - check which events are unknown. If all events are known, filter them all out.
|
||||
func (a *Accumulator) filterToNewTimelineEvents(txn *sqlx.Tx, dedupedEvents []Event) ([]Event, error) {
|
||||
// if we only have a single timeline event we cannot determine if it is old or not, as we rely on already seen events
|
||||
// being after (higher index) than it.
|
||||
if len(dedupedEvents) <= 1 {
|
||||
@ -596,13 +610,13 @@ func (a *Accumulator) filterAndParseTimelineEvents(txn *sqlx.Tx, roomID string,
|
||||
// Figure out which of these events are unseen and hence brand new live events.
|
||||
// In some cases, we may have unseen OLD events - see https://github.com/matrix-org/sliding-sync/issues/192
|
||||
// in which case we need to drop those events.
|
||||
dedupedEventIDs := make([]string, 0, len(seenEvents))
|
||||
for evID := range seenEvents {
|
||||
dedupedEventIDs = append(dedupedEventIDs, evID)
|
||||
dedupedEventIDs := make([]string, 0, len(dedupedEvents))
|
||||
for _, ev := range dedupedEvents {
|
||||
dedupedEventIDs = append(dedupedEventIDs, ev.ID)
|
||||
}
|
||||
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, dedupedEventIDs)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("filterAndParseTimelineEvents: failed to SelectUnknownEventIDs: %w", err)
|
||||
return nil, fmt.Errorf("filterToNewTimelineEvents: failed to SelectUnknownEventIDs: %w", err)
|
||||
}
|
||||
|
||||
if len(unknownEventIDs) == 0 {
|
||||
|
@ -138,7 +138,7 @@ func TestAccumulatorAccumulate(t *testing.T) {
|
||||
}
|
||||
var result AccumulateResult
|
||||
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
|
||||
result, err = accumulator.Accumulate(txn, userID, roomID, "", newEvents)
|
||||
result, err = accumulator.Accumulate(txn, userID, roomID, sync2.TimelineResponse{Events: newEvents})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -212,7 +212,7 @@ func TestAccumulatorAccumulate(t *testing.T) {
|
||||
|
||||
// subsequent calls do nothing and are not an error
|
||||
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
|
||||
_, err = accumulator.Accumulate(txn, userID, roomID, "", newEvents)
|
||||
_, err = accumulator.Accumulate(txn, userID, roomID, sync2.TimelineResponse{Events: newEvents})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -246,7 +246,7 @@ func TestAccumulatorPromptsCacheInvalidation(t *testing.T) {
|
||||
}
|
||||
var accResult AccumulateResult
|
||||
err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
|
||||
accResult, err = accumulator.Accumulate(txn, "@dummy:localhost", roomID, "prevBatch", timeline)
|
||||
accResult, err = accumulator.Accumulate(txn, "@dummy:localhost", roomID, sync2.TimelineResponse{Events: timeline, PrevBatch: "prevBatch"})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -264,7 +264,7 @@ func TestAccumulatorPromptsCacheInvalidation(t *testing.T) {
|
||||
[]byte(`{"event_id":"$i", "type":"m.room.redaction", "content":{"redacts":"$f"}}`),
|
||||
}
|
||||
err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
|
||||
accResult, err = accumulator.Accumulate(txn, "@dummy:localhost", roomID, "prevBatch2", timeline)
|
||||
accResult, err = accumulator.Accumulate(txn, "@dummy:localhost", roomID, sync2.TimelineResponse{Events: timeline, PrevBatch: "prevBatch2"})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -281,7 +281,7 @@ func TestAccumulatorPromptsCacheInvalidation(t *testing.T) {
|
||||
[]byte(`{"event_id":"$j", "type":"m.room.redaction", "content":{"redacts":"$g"}}`),
|
||||
}
|
||||
err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
|
||||
accResult, err = accumulator.Accumulate(txn, "@dummy:localhost", roomID, "prevBatch3", timeline)
|
||||
accResult, err = accumulator.Accumulate(txn, "@dummy:localhost", roomID, sync2.TimelineResponse{Events: timeline, PrevBatch: "prevBatch3"})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -322,7 +322,7 @@ func TestAccumulatorMembershipLogs(t *testing.T) {
|
||||
[]byte(`{"event_id":"` + roomEventIDs[7] + `", "type":"m.room.member", "state_key":"@me:localhost","unsigned":{"prev_content":{"membership":"join", "displayname":"Me"}}, "content":{"membership":"leave"}}`),
|
||||
}
|
||||
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
|
||||
_, err = accumulator.Accumulate(txn, userID, roomID, "", roomEvents)
|
||||
_, err = accumulator.Accumulate(txn, userID, roomID, sync2.TimelineResponse{Events: roomEvents})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -458,7 +458,7 @@ func TestAccumulatorDupeEvents(t *testing.T) {
|
||||
}
|
||||
|
||||
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
|
||||
_, err = accumulator.Accumulate(txn, userID, roomID, "", joinRoom.Timeline.Events)
|
||||
_, err = accumulator.Accumulate(txn, userID, roomID, joinRoom.Timeline)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -658,7 +658,7 @@ func TestAccumulatorConcurrency(t *testing.T) {
|
||||
defer wg.Done()
|
||||
subset := newEvents[:(i + 1)] // i=0 => [1], i=1 => [1,2], etc
|
||||
err := sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
|
||||
result, err := accumulator.Accumulate(txn, userID, roomID, "", subset)
|
||||
result, err := accumulator.Accumulate(txn, userID, roomID, sync2.TimelineResponse{Events: subset})
|
||||
totalNumNew += result.NumNew
|
||||
return err
|
||||
})
|
||||
@ -691,6 +691,147 @@ func TestAccumulatorConcurrency(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Sanity-check the accumulator's logic for inserting missing_previous markers.
|
||||
func TestAccumulatorMissingPreviousMarkers(t *testing.T) {
|
||||
db, close := connectToDB(t)
|
||||
defer close()
|
||||
accumulator := NewAccumulator(db)
|
||||
|
||||
t.Log("Initialise a room.")
|
||||
roomID := fmt.Sprintf("!%s:localhost", t.Name())
|
||||
initialEvents := []json.RawMessage{
|
||||
[]byte(`{"event_id":"$state-A", "type":"m.room.create", "state_key":"", "content":{"creator":"@me:localhost"}}`),
|
||||
[]byte(`{"event_id":"$state-B", "type":"m.room.member", "state_key":"@me:localhost", "content":{"membership":"join"}}`),
|
||||
[]byte(`{"event_id":"$state-C", "type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"public"}}`),
|
||||
}
|
||||
_, err := accumulator.Initialise(roomID, initialEvents)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to Initialise accumulator: %s", err)
|
||||
}
|
||||
|
||||
// We're going to repeatedly Accumulate different timelines and check the number
|
||||
// of new events, plus the "missing previous" field in the DB.
|
||||
steps := []struct {
|
||||
Desc string
|
||||
// Inputs: a sync2.TimelineResponse struct.
|
||||
Events []json.RawMessage
|
||||
Limited bool
|
||||
|
||||
// CheckDesc is a brief description of our expectations.
|
||||
CheckDesc string
|
||||
// NumNew is the expected value of the NumNew field in the AccumulateResult.
|
||||
NumNew int
|
||||
// MissingPrevious is a map from timeline event IDs to the missing_previous bool expected in the DB.
|
||||
MissingPrevious map[string]bool
|
||||
}{
|
||||
{
|
||||
Desc: "non-limited timeline with one event (D)",
|
||||
Events: []json.RawMessage{
|
||||
[]byte(`{"event_id":"$msg-D", "type":"m.room.message", "content": {"msgtype": "m.text", "body": "Hello, world!"}}`),
|
||||
},
|
||||
Limited: false,
|
||||
NumNew: 1,
|
||||
CheckDesc: "(D) should not be marked as missing_previous.",
|
||||
MissingPrevious: map[string]bool{"$msg-D": false},
|
||||
},
|
||||
{
|
||||
Desc: "limited timeline with one unknown event (E)",
|
||||
Events: []json.RawMessage{
|
||||
[]byte(`{"event_id":"$msg-E", "type":"m.room.message", "content": {"msgtype": "m.text", "body": "Hello, world!"}}`),
|
||||
},
|
||||
Limited: true,
|
||||
NumNew: 1,
|
||||
CheckDesc: "(E) should be marked as missing_previous.",
|
||||
MissingPrevious: map[string]bool{"$msg-E": true},
|
||||
},
|
||||
{
|
||||
Desc: "limited timeline with two events: the first known but missing previous (E), the second unknown (F)",
|
||||
Events: []json.RawMessage{
|
||||
[]byte(`{"event_id":"$msg-E", "type":"m.room.message", "content": {"msgtype": "m.text", "body": "Hello, world!"}}`),
|
||||
[]byte(`{"event_id":"$msg-F", "type":"m.room.message", "content": {"msgtype": "m.text", "body": "Hello, world!"}}`),
|
||||
},
|
||||
Limited: true,
|
||||
NumNew: 1,
|
||||
CheckDesc: "(E) should still be marked as missing_previous; (F) should not.",
|
||||
MissingPrevious: map[string]bool{"$msg-E": true, "$msg-F": false},
|
||||
},
|
||||
{
|
||||
Desc: "limited timeline with one event, known and not missing previous (F)",
|
||||
Events: []json.RawMessage{
|
||||
[]byte(`{"event_id":"$msg-F", "type":"m.room.message", "content": {"msgtype": "m.text", "body": "Hello, world!"}}`),
|
||||
},
|
||||
Limited: true,
|
||||
NumNew: 0,
|
||||
CheckDesc: "(F) should still be marked as not missing_previous.",
|
||||
MissingPrevious: map[string]bool{"$msg-F": false},
|
||||
},
|
||||
{
|
||||
Desc: "the whole timeline [D, E, F], not limited)",
|
||||
Events: []json.RawMessage{
|
||||
[]byte(`{"event_id":"$msg-D", "type":"m.room.message", "content": {"msgtype": "m.text", "body": "Hello, world!"}}`),
|
||||
[]byte(`{"event_id":"$msg-E", "type":"m.room.message", "content": {"msgtype": "m.text", "body": "Hello, world!"}}`),
|
||||
[]byte(`{"event_id":"$msg-F", "type":"m.room.message", "content": {"msgtype": "m.text", "body": "Hello, world!"}}`),
|
||||
},
|
||||
Limited: false,
|
||||
NumNew: 0,
|
||||
CheckDesc: "(D), (E) and (F) should have no changes to missing_previous.",
|
||||
MissingPrevious: map[string]bool{"$msg-D": false, "$msg-E": true, "$msg-F": false},
|
||||
},
|
||||
{
|
||||
Desc: "the whole timeline [D, E, F], limited)",
|
||||
Events: []json.RawMessage{
|
||||
[]byte(`{"event_id":"$msg-D", "type":"m.room.message", "content": {"msgtype": "m.text", "body": "Hello, world!"}}`),
|
||||
[]byte(`{"event_id":"$msg-E", "type":"m.room.message", "content": {"msgtype": "m.text", "body": "Hello, world!"}}`),
|
||||
[]byte(`{"event_id":"$msg-F", "type":"m.room.message", "content": {"msgtype": "m.text", "body": "Hello, world!"}}`),
|
||||
},
|
||||
Limited: true,
|
||||
NumNew: 0,
|
||||
CheckDesc: "(D), (E) and (F) should have no changes to missing_previous.",
|
||||
MissingPrevious: map[string]bool{"$msg-D": false, "$msg-E": true, "$msg-F": false},
|
||||
},
|
||||
}
|
||||
|
||||
txn, err := db.Beginx()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer txn.Rollback()
|
||||
|
||||
for _, step := range steps {
|
||||
t.Log(step.Desc)
|
||||
timeline := sync2.TimelineResponse{
|
||||
Events: step.Events,
|
||||
Limited: step.Limited,
|
||||
}
|
||||
|
||||
accResult, err := accumulator.Accumulate(txn, userID, roomID, timeline)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to Accumulate: %s", err)
|
||||
}
|
||||
assertValue(t, "numNew", accResult.NumNew, step.NumNew)
|
||||
|
||||
t.Log(step.CheckDesc)
|
||||
checkIDs := make([]string, 0, len(step.MissingPrevious))
|
||||
for checkID := range step.MissingPrevious {
|
||||
checkIDs = append(checkIDs, checkID)
|
||||
}
|
||||
fetchedEvents, err := accumulator.eventsTable.SelectByIDs(txn, false, checkIDs)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assertValue(t, "len(fetchedEvents)", len(fetchedEvents), len(checkIDs))
|
||||
for _, inserted := range fetchedEvents {
|
||||
wantMissingPrevious, ok := step.MissingPrevious[inserted.ID]
|
||||
if !ok {
|
||||
t.Errorf("fetched %s from the DB, but it wasn't requested", inserted.ID)
|
||||
}
|
||||
assertValue(t, fmt.Sprintf("insertedEvents[%s].MissingPrevious", inserted.ID), inserted.MissingPrevious, wantMissingPrevious)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func currentSnapshotNIDs(t *testing.T, snapshotTable *SnapshotTable, roomID string) []int64 {
|
||||
txn := snapshotTable.db.MustBeginTx(context.Background(), nil)
|
||||
defer txn.Commit()
|
||||
|
@ -7,10 +7,11 @@ import (
|
||||
"github.com/matrix-org/sliding-sync/internal"
|
||||
)
|
||||
|
||||
// Like assertValue, but this inserts newlines between got and want.
|
||||
func assertVal(t *testing.T, msg string, got, want interface{}) {
|
||||
t.Helper()
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("%s: got\n%#v want\n%#v", msg, got, want)
|
||||
t.Errorf("%s: got\n%#v\nwant\n%#v", msg, got, want)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,6 +39,8 @@ type Event struct {
|
||||
PrevBatch sql.NullString `db:"prev_batch"`
|
||||
// stripped events will be missing this field
|
||||
JSON []byte `db:"event"`
|
||||
// MissingPrevious is true iff the previous timeline event is not known to the proxy.
|
||||
MissingPrevious bool `db:"missing_previous"`
|
||||
}
|
||||
|
||||
func (ev *Event) ensureFieldsSetOnEvent() error {
|
||||
@ -125,8 +127,12 @@ func NewEventTable(db *sqlx.DB) *EventTable {
|
||||
prev_batch TEXT,
|
||||
membership TEXT,
|
||||
is_state BOOLEAN NOT NULL, -- is this event part of the v2 state response?
|
||||
event BYTEA NOT NULL
|
||||
event BYTEA NOT NULL,
|
||||
-- True iff this event was seen at the start of the timeline in a limited sync
|
||||
-- (i.e. the preceding timeline event was not known to the proxy).
|
||||
missing_previous BOOLEAN NOT NULL DEFAULT FALSE
|
||||
);
|
||||
|
||||
-- index for querying all joined rooms for a given user
|
||||
CREATE INDEX IF NOT EXISTS syncv3_events_type_sk_idx ON syncv3_events(event_type, state_key);
|
||||
-- index for querying membership deltas in particular rooms
|
||||
@ -168,13 +174,15 @@ func (t *EventTable) Insert(txn *sqlx.Tx, events []Event, checkFields bool) (map
|
||||
}
|
||||
events[i].JSON = js
|
||||
}
|
||||
chunks := sqlutil.Chunkify(8, MaxPostgresParameters, EventChunker(events))
|
||||
chunks := sqlutil.Chunkify(9, MaxPostgresParameters, EventChunker(events))
|
||||
var eventID string
|
||||
var eventNID int64
|
||||
for _, chunk := range chunks {
|
||||
rows, err := txn.NamedQuery(`
|
||||
INSERT INTO syncv3_events (event_id, event, event_type, state_key, room_id, membership, prev_batch, is_state)
|
||||
VALUES (:event_id, :event, :event_type, :state_key, :room_id, :membership, :prev_batch, :is_state) ON CONFLICT (event_id) DO NOTHING RETURNING event_id, event_nid`, chunk)
|
||||
INSERT INTO syncv3_events (event_id, event, event_type, state_key, room_id, membership, prev_batch, is_state, missing_previous)
|
||||
VALUES (:event_id, :event, :event_type, :state_key, :room_id, :membership, :prev_batch, :is_state, :missing_previous)
|
||||
ON CONFLICT (event_id) DO NOTHING
|
||||
RETURNING event_id, event_nid`, chunk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -215,7 +223,7 @@ func (t *EventTable) SelectByNIDs(txn *sqlx.Tx, verifyAll bool, nids []int64) (e
|
||||
wanted = len(nids)
|
||||
}
|
||||
return t.selectAny(txn, wanted, `
|
||||
SELECT event_nid, event_id, event, event_type, state_key, room_id, before_state_snapshot_id, membership, event_replaces_nid FROM syncv3_events
|
||||
SELECT event_nid, event_id, event, event_type, state_key, room_id, before_state_snapshot_id, membership, event_replaces_nid, missing_previous FROM syncv3_events
|
||||
WHERE event_nid = ANY ($1) ORDER BY event_nid ASC;`, pq.Int64Array(nids))
|
||||
}
|
||||
|
||||
@ -229,7 +237,7 @@ func (t *EventTable) SelectByIDs(txn *sqlx.Tx, verifyAll bool, ids []string) (ev
|
||||
wanted = len(ids)
|
||||
}
|
||||
return t.selectAny(txn, wanted, `
|
||||
SELECT event_nid, event_id, event, event_type, state_key, room_id, before_state_snapshot_id, membership FROM syncv3_events
|
||||
SELECT event_nid, event_id, event, event_type, state_key, room_id, before_state_snapshot_id, membership, missing_previous FROM syncv3_events
|
||||
WHERE event_id = ANY ($1) ORDER BY event_nid ASC;`, pq.StringArray(ids))
|
||||
}
|
||||
|
||||
@ -379,9 +387,22 @@ func (t *EventTable) Redact(txn *sqlx.Tx, roomVer string, redacteeEventIDToRedac
|
||||
func (t *EventTable) SelectLatestEventsBetween(txn *sqlx.Tx, roomID string, lowerExclusive, upperInclusive int64, limit int) ([]Event, error) {
|
||||
var events []Event
|
||||
// do not pull in events which were in the v2 state block
|
||||
err := txn.Select(&events, `SELECT event_nid, event FROM syncv3_events WHERE event_nid > $1 AND event_nid <= $2 AND room_id = $3 AND is_state=FALSE ORDER BY event_nid DESC LIMIT $4`,
|
||||
err := txn.Select(&events, `SELECT event_nid, event, missing_previous FROM syncv3_events WHERE event_nid > $1 AND event_nid <= $2 AND room_id = $3 AND is_state=FALSE ORDER BY event_nid DESC LIMIT $4`,
|
||||
lowerExclusive, upperInclusive, roomID, limit,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Look to see if there is an event missing its predecessor in the timeline.
|
||||
// Note: events[0] is the newest event, as the query is ORDERed BY event_nid DESC.
|
||||
for i, ev := range events {
|
||||
if ev.MissingPrevious {
|
||||
events = events[:i+1]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return events, err
|
||||
}
|
||||
|
||||
|
@ -1127,3 +1127,103 @@ func TestEventTableRedactMissingOK(t *testing.T) {
|
||||
}`),
|
||||
}}))
|
||||
}
|
||||
|
||||
func TestEventTable_SelectLatestEventsBetween_MissingPrevious(t *testing.T) {
|
||||
db, close := connectToDB(t)
|
||||
defer close()
|
||||
table := NewEventTable(db)
|
||||
txn, err := db.Beginx()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to start txn: %s", err)
|
||||
}
|
||||
defer txn.Rollback()
|
||||
roomID := fmt.Sprintf("!%s", t.Name())
|
||||
// Event IDs ending with `-gap` have MissingPrevious: true.
|
||||
events := []Event{
|
||||
{ID: "chunk1-event1", MissingPrevious: false},
|
||||
{ID: "chunk1-event2", MissingPrevious: false},
|
||||
{ID: "chunk1-event3", MissingPrevious: false},
|
||||
// GAP
|
||||
{ID: "chunk2-event1", MissingPrevious: true},
|
||||
{ID: "chunk2-event2", MissingPrevious: false},
|
||||
{ID: "chunk2-event3", MissingPrevious: false},
|
||||
// GAP
|
||||
{ID: "chunk3-event1", MissingPrevious: true},
|
||||
// GAP
|
||||
{ID: "chunk4-event1", MissingPrevious: true},
|
||||
{ID: "chunk4-event2", MissingPrevious: false},
|
||||
{ID: "chunk4-event3", MissingPrevious: false},
|
||||
}
|
||||
prefix := "$" + t.Name() + "-"
|
||||
for i := range events {
|
||||
// The method under test doesn't extract IDs and the NIDs are determined at runtime.
|
||||
// It does pull out the event json though, so shove the IDs in there.
|
||||
events[i].JSON = []byte(fmt.Sprintf(`{"event_id": "%s"}`, events[i].ID))
|
||||
// In syncv3_events.event_id, store IDs with some prefix to avoid clashing with other tests.
|
||||
events[i].ID = prefix + events[i].ID
|
||||
events[i].RoomID = roomID
|
||||
}
|
||||
nids, err := table.Insert(txn, events, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
testcases := []struct {
|
||||
Desc string
|
||||
FromIDExclusive string
|
||||
ToIDInclusive string
|
||||
// NB: ExpectIDs is ordered from newest to oldest. Surprising, but this is what
|
||||
// SelectLatestEventsBetween returns.
|
||||
ExpectIDs []string
|
||||
}{
|
||||
{
|
||||
Desc: "has no gaps - should omit nothing",
|
||||
FromIDExclusive: "start",
|
||||
ToIDInclusive: "chunk1-event3",
|
||||
ExpectIDs: []string{"chunk1-event3", "chunk1-event2", "chunk1-event1"},
|
||||
},
|
||||
{
|
||||
Desc: "ends with missing_previous - should include the end only",
|
||||
FromIDExclusive: "start",
|
||||
ToIDInclusive: "chunk2-event1",
|
||||
ExpectIDs: []string{"chunk2-event1"},
|
||||
},
|
||||
{
|
||||
Desc: "single event, ends with missing_previous - should include the end only",
|
||||
FromIDExclusive: "chunk1-event3",
|
||||
ToIDInclusive: "chunk2-event1",
|
||||
ExpectIDs: []string{"chunk2-event1"},
|
||||
},
|
||||
{
|
||||
Desc: "single event, start is missing_previous - should include the end only",
|
||||
FromIDExclusive: "chunk2-event1",
|
||||
ToIDInclusive: "chunk2-event2",
|
||||
ExpectIDs: []string{"chunk2-event2"},
|
||||
},
|
||||
{
|
||||
Desc: "covers two consecutive gaps - should include events from the second gap onwards.",
|
||||
FromIDExclusive: "chunk2-event2",
|
||||
ToIDInclusive: "chunk4-event3",
|
||||
ExpectIDs: []string{"chunk4-event3", "chunk4-event2", "chunk4-event1"},
|
||||
},
|
||||
{
|
||||
Desc: "covers three gaps, the latter two consecutive - should only return from the second gap onwards",
|
||||
FromIDExclusive: "chunk1-event2",
|
||||
ToIDInclusive: "chunk4-event2",
|
||||
ExpectIDs: []string{"chunk4-event2", "chunk4-event1"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
// We're using the notation (X, Y] for a half-open interval excluding X but including Y.
|
||||
idRange := fmt.Sprintf("(%s, %s]", tc.FromIDExclusive, tc.ToIDInclusive)
|
||||
t.Log(idRange + " " + tc.Desc)
|
||||
fetched, err := table.SelectLatestEventsBetween(txn, roomID, nids[prefix+tc.FromIDExclusive], nids[prefix+tc.ToIDInclusive], 10)
|
||||
assertNoError(t, err)
|
||||
fetchedIDs := make([]string, 0, len(fetched))
|
||||
for _, ev := range fetched {
|
||||
fetchedIDs = append(fetchedIDs, gjson.GetBytes(ev.JSON, "event_id").Str)
|
||||
}
|
||||
assertValue(t, "fetchedIDs "+idRange+" limit 10", fetchedIDs, tc.ExpectIDs)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,7 @@
|
||||
-- +goose Up
|
||||
ALTER TABLE IF EXISTS syncv3_events
|
||||
ADD COLUMN IF NOT EXISTS missing_previous BOOLEAN NOT NULL DEFAULT FALSE;
|
||||
|
||||
-- +goose Down
|
||||
ALTER TABLE IF EXISTS syncv3_events
|
||||
DROP COLUMN IF EXISTS missing_previous;
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/matrix-org/sliding-sync/sync2"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
@ -384,7 +385,7 @@ func (s *Storage) currentNotMembershipStateEventsInAllRooms(txn *sqlx.Tx, eventT
|
||||
`SELECT syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.event FROM syncv3_events
|
||||
WHERE syncv3_events.event_type IN (?)
|
||||
AND syncv3_events.event_nid IN (
|
||||
SELECT unnest(events) FROM syncv3_snapshots WHERE syncv3_snapshots.snapshot_id IN (SELECT current_snapshot_id FROM syncv3_rooms)
|
||||
SELECT UNNEST(events) FROM syncv3_snapshots WHERE syncv3_snapshots.snapshot_id IN (SELECT current_snapshot_id FROM syncv3_rooms)
|
||||
)`,
|
||||
eventTypes,
|
||||
)
|
||||
@ -407,12 +408,12 @@ func (s *Storage) currentNotMembershipStateEventsInAllRooms(txn *sqlx.Tx, eventT
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *Storage) Accumulate(userID, roomID, prevBatch string, timeline []json.RawMessage) (result AccumulateResult, err error) {
|
||||
if len(timeline) == 0 {
|
||||
func (s *Storage) Accumulate(userID, roomID string, timeline sync2.TimelineResponse) (result AccumulateResult, err error) {
|
||||
if len(timeline.Events) == 0 {
|
||||
return AccumulateResult{}, nil
|
||||
}
|
||||
err = sqlutil.WithTransaction(s.Accumulator.db, func(txn *sqlx.Tx) error {
|
||||
result, err = s.Accumulator.Accumulate(txn, userID, roomID, prevBatch, timeline)
|
||||
result, err = s.Accumulator.Accumulate(txn, userID, roomID, timeline)
|
||||
return err
|
||||
})
|
||||
return result, err
|
||||
@ -880,7 +881,7 @@ func (s *Storage) AllJoinedMembers(txn *sqlx.Tx, tempTableName string) (joinedMe
|
||||
// Unclear if this is the first 5 *most recent* (backwards) or forwards. For now we'll use the most recent
|
||||
// ones, and select 6 of them so we can always use 5 no matter who is requesting the room name.
|
||||
rows, err := txn.Query(
|
||||
`SELECT membership_nid, room_id, state_key, membership from ` + tempTableName + ` INNER JOIN syncv3_events
|
||||
`SELECT membership_nid, room_id, state_key, membership FROM ` + tempTableName + ` INNER JOIN syncv3_events
|
||||
on membership_nid = event_nid WHERE membership='join' OR membership='_join' OR membership='invite' OR membership='_invite' ORDER BY event_nid ASC`,
|
||||
)
|
||||
if err != nil {
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/matrix-org/sliding-sync/sync2"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
@ -31,7 +32,7 @@ func TestStorageRoomStateBeforeAndAfterEventPosition(t *testing.T) {
|
||||
testutils.NewStateEvent(t, "m.room.join_rules", "", alice, map[string]interface{}{"join_rule": "invite"}),
|
||||
testutils.NewStateEvent(t, "m.room.member", bob, alice, map[string]interface{}{"membership": "invite"}),
|
||||
}
|
||||
accResult, err := store.Accumulate(userID, roomID, "", events)
|
||||
accResult, err := store.Accumulate(userID, roomID, sync2.TimelineResponse{Events: events})
|
||||
if err != nil {
|
||||
t.Fatalf("Accumulate returned error: %s", err)
|
||||
}
|
||||
@ -160,7 +161,7 @@ func TestStorageJoinedRoomsAfterPosition(t *testing.T) {
|
||||
var latestPos int64
|
||||
var err error
|
||||
for roomID, eventMap := range roomIDToEventMap {
|
||||
accResult, err := store.Accumulate(userID, roomID, "", eventMap)
|
||||
accResult, err := store.Accumulate(userID, roomID, sync2.TimelineResponse{Events: eventMap})
|
||||
if err != nil {
|
||||
t.Fatalf("Accumulate on %s failed: %s", roomID, err)
|
||||
}
|
||||
@ -350,7 +351,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for _, tl := range timelineInjections {
|
||||
accResult, err := store.Accumulate(userID, tl.RoomID, "", tl.Events)
|
||||
accResult, err := store.Accumulate(userID, tl.RoomID, sync2.TimelineResponse{Events: tl.Events})
|
||||
if err != nil {
|
||||
t.Fatalf("Accumulate on %s failed: %s", tl.RoomID, err)
|
||||
}
|
||||
@ -453,7 +454,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
|
||||
t.Fatalf("LatestEventNID: %s", err)
|
||||
}
|
||||
for _, tl := range timelineInjections {
|
||||
accResult, err := store.Accumulate(userID, tl.RoomID, "", tl.Events)
|
||||
accResult, err := store.Accumulate(userID, tl.RoomID, sync2.TimelineResponse{Events: tl.Events})
|
||||
if err != nil {
|
||||
t.Fatalf("Accumulate on %s failed: %s", tl.RoomID, err)
|
||||
}
|
||||
@ -533,7 +534,7 @@ func TestStorageLatestEventsInRoomsPrevBatch(t *testing.T) {
|
||||
}
|
||||
eventIDs := []string{}
|
||||
for _, timeline := range timelines {
|
||||
_, err := store.Accumulate(userID, roomID, timeline.prevBatch, timeline.timeline)
|
||||
_, err = store.Accumulate(userID, roomID, sync2.TimelineResponse{Events: timeline.timeline, PrevBatch: timeline.prevBatch})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to accumulate: %s", err)
|
||||
}
|
||||
@ -775,7 +776,10 @@ func TestAllJoinedMembers(t *testing.T) {
|
||||
}, serialise(tc.InitMemberships)...))
|
||||
assertNoError(t, err)
|
||||
|
||||
_, err = store.Accumulate(userID, roomID, "foo", serialise(tc.AccumulateMemberships))
|
||||
_, err = store.Accumulate(userID, roomID, sync2.TimelineResponse{
|
||||
Events: serialise(tc.AccumulateMemberships),
|
||||
PrevBatch: "foo",
|
||||
})
|
||||
assertNoError(t, err)
|
||||
testCases[i].RoomID = roomID // remember this for later
|
||||
}
|
||||
@ -901,6 +905,7 @@ func assertRoomMetadata(t *testing.T, got, want internal.RoomMetadata) {
|
||||
}
|
||||
|
||||
func assertValue(t *testing.T, msg string, got, want interface{}) {
|
||||
t.Helper()
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Errorf("%s: got %v want %v", msg, got, want)
|
||||
}
|
||||
|
@ -262,14 +262,14 @@ func (h *Handler) OnBulkDeviceDataUpdate(payload *pubsub.V2DeviceData) {
|
||||
h.v2Pub.Notify(pubsub.ChanV2, payload)
|
||||
}
|
||||
|
||||
func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) error {
|
||||
func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline sync2.TimelineResponse) error {
|
||||
// Remember any transaction IDs that may be unique to this user
|
||||
eventIDsWithTxns := make([]string, 0, len(timeline)) // in timeline order
|
||||
eventIDToTxnID := make(map[string]string, len(timeline)) // event_id -> txn_id
|
||||
eventIDsWithTxns := make([]string, 0, len(timeline.Events)) // in timeline order
|
||||
eventIDToTxnID := make(map[string]string, len(timeline.Events)) // event_id -> txn_id
|
||||
// Also remember events which were sent by this user but lack a transaction ID.
|
||||
eventIDsLackingTxns := make([]string, 0, len(timeline))
|
||||
eventIDsLackingTxns := make([]string, 0, len(timeline.Events))
|
||||
|
||||
for _, e := range timeline {
|
||||
for _, e := range timeline.Events {
|
||||
parsed := gjson.ParseBytes(e)
|
||||
eventID := parsed.Get("event_id").Str
|
||||
|
||||
@ -294,9 +294,9 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prev
|
||||
}
|
||||
|
||||
// Insert new events
|
||||
accResult, err := h.Store.Accumulate(userID, roomID, prevBatch, timeline)
|
||||
accResult, err := h.Store.Accumulate(userID, roomID, timeline)
|
||||
if err != nil {
|
||||
logger.Err(err).Int("timeline", len(timeline)).Str("room", roomID).Msg("V2: failed to accumulate room")
|
||||
logger.Err(err).Int("timeline", len(timeline.Events)).Str("room", roomID).Msg("V2: failed to accumulate room")
|
||||
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
|
||||
return err
|
||||
}
|
||||
@ -312,7 +312,7 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prev
|
||||
if accResult.NumNew != 0 {
|
||||
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Accumulate{
|
||||
RoomID: roomID,
|
||||
PrevBatch: prevBatch,
|
||||
PrevBatch: timeline.PrevBatch,
|
||||
EventNIDs: accResult.TimelineNIDs,
|
||||
})
|
||||
}
|
||||
@ -330,7 +330,7 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prev
|
||||
})
|
||||
if err != nil {
|
||||
logger.Err(err).
|
||||
Int("timeline", len(timeline)).
|
||||
Int("timeline", len(timeline.Events)).
|
||||
Int("num_transaction_ids", len(eventIDsWithTxns)).
|
||||
Int("num_missing_transaction_ids", len(eventIDsLackingTxns)).
|
||||
Str("room", roomID).
|
||||
|
@ -36,7 +36,7 @@ type V2DataReceiver interface {
|
||||
UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)
|
||||
// Accumulate data for this room. This means the timeline section of the v2 response.
|
||||
// Return an error to stop the since token advancing.
|
||||
Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) error // latest pos with event nids of timeline entries
|
||||
Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline TimelineResponse) error // latest pos with event nids of timeline entries
|
||||
// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
|
||||
// If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB.
|
||||
// Return an error to stop the since token advancing.
|
||||
@ -310,11 +310,11 @@ func (h *PollerMap) execute() {
|
||||
func (h *PollerMap) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
|
||||
h.callbacks.UpdateDeviceSince(ctx, userID, deviceID, since)
|
||||
}
|
||||
func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) (err error) {
|
||||
func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline TimelineResponse) (err error) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
h.executor <- func() {
|
||||
err = h.callbacks.Accumulate(ctx, userID, deviceID, roomID, prevBatch, timeline)
|
||||
err = h.callbacks.Accumulate(ctx, userID, deviceID, roomID, timeline)
|
||||
wg.Done()
|
||||
}
|
||||
wg.Wait()
|
||||
@ -824,7 +824,8 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
|
||||
if len(roomData.Timeline.Events) > 0 {
|
||||
timelineCalls++
|
||||
p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
|
||||
err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
|
||||
|
||||
err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline)
|
||||
if err != nil {
|
||||
lastErrs = append(lastErrs, fmt.Errorf("Accumulate[%s]: %w", roomID, err))
|
||||
continue
|
||||
@ -841,7 +842,7 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
|
||||
for roomID, roomData := range res.Rooms.Leave {
|
||||
if len(roomData.Timeline.Events) > 0 {
|
||||
p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
|
||||
err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
|
||||
err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline)
|
||||
if err != nil {
|
||||
lastErrs = append(lastErrs, fmt.Errorf("Accumulate_Leave[%s]: %w", roomID, err))
|
||||
continue
|
||||
|
@ -1206,8 +1206,8 @@ type mockDataReceiver struct {
|
||||
updateSinceCalled chan struct{}
|
||||
}
|
||||
|
||||
func (a *mockDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) error {
|
||||
a.timelines[roomID] = append(a.timelines[roomID], timeline...)
|
||||
func (a *mockDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline TimelineResponse) error {
|
||||
a.timelines[roomID] = append(a.timelines[roomID], timeline.Events...)
|
||||
return nil
|
||||
}
|
||||
func (a *mockDataReceiver) Initialise(ctx context.Context, roomID string, state []json.RawMessage) ([]json.RawMessage, error) {
|
||||
@ -1247,11 +1247,11 @@ type overrideDataReceiver struct {
|
||||
onExpiredToken func(ctx context.Context, accessTokenHash, userID, deviceID string)
|
||||
}
|
||||
|
||||
func (s *overrideDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) error {
|
||||
func (s *overrideDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline TimelineResponse) error {
|
||||
if s.accumulate == nil {
|
||||
return nil
|
||||
}
|
||||
return s.accumulate(ctx, userID, deviceID, roomID, prevBatch, timeline)
|
||||
return s.accumulate(ctx, userID, deviceID, roomID, timeline.PrevBatch, timeline.Events)
|
||||
}
|
||||
func (s *overrideDataReceiver) Initialise(ctx context.Context, roomID string, state []json.RawMessage) ([]json.RawMessage, error) {
|
||||
if s.initialise == nil {
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/matrix-org/sliding-sync/sync2"
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/sliding-sync/state"
|
||||
@ -38,12 +39,12 @@ func TestGlobalCacheLoadState(t *testing.T) {
|
||||
testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": "The Room Name"}),
|
||||
testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": "The Updated Room Name"}),
|
||||
}
|
||||
_, err := store.Accumulate(alice, roomID2, "", eventsRoom2)
|
||||
_, err := store.Accumulate(alice, roomID2, sync2.TimelineResponse{Events: eventsRoom2})
|
||||
if err != nil {
|
||||
t.Fatalf("Accumulate: %s", err)
|
||||
}
|
||||
|
||||
accResult, err := store.Accumulate(alice, roomID, "", events)
|
||||
accResult, err := store.Accumulate(alice, roomID, sync2.TimelineResponse{Events: events})
|
||||
if err != nil {
|
||||
t.Fatalf("Accumulate: %s", err)
|
||||
}
|
||||
|
@ -10,12 +10,18 @@ SYNCV3_PID=$!
|
||||
trap "kill $SYNCV3_PID" EXIT
|
||||
|
||||
# wait for the server to be listening, we want this endpoint to 404 instead of connrefused
|
||||
attempts=0
|
||||
until [ \
|
||||
"$(curl -s -w '%{http_code}' -o /dev/null "http://localhost:8844/idonotexist")" \
|
||||
-eq 404 ]
|
||||
do
|
||||
echo 'Waiting for server to start...'
|
||||
if [ "$attempts" -gt 60 ]; then
|
||||
echo "Server did not start after $attempts seconds"
|
||||
exit 1
|
||||
fi
|
||||
echo "Waiting (total ${attempts}s) for server to start..."
|
||||
sleep 1
|
||||
attempts=$((attempts+1))
|
||||
done
|
||||
|
||||
go test "$@"
|
@ -4,15 +4,14 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/matrix-org/sliding-sync/sqlutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/matrix-org/sliding-sync/sqlutil"
|
||||
|
||||
"github.com/matrix-org/sliding-sync/sync2"
|
||||
"github.com/matrix-org/sliding-sync/sync3"
|
||||
"github.com/matrix-org/sliding-sync/sync3/extensions"
|
||||
@ -535,3 +534,69 @@ func TestPollerExpiryEnsurePollingRaceDoesntWedge(t *testing.T) {
|
||||
t.Fatalf("never saw a v2 poll with the new token")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTimelineStopsLoadingWhenMissingPrevious(t *testing.T) {
|
||||
pqString := testutils.PrepareDBConnectionString()
|
||||
v2 := runTestV2Server(t)
|
||||
v3 := runTestServer(t, v2, pqString)
|
||||
defer v2.close()
|
||||
defer v3.close()
|
||||
|
||||
const roomID = "!unimportant"
|
||||
|
||||
t.Log("Alice creates a room.")
|
||||
v2.addAccount(t, alice, aliceToken)
|
||||
v2.queueResponse(aliceToken, sync2.SyncResponse{
|
||||
Rooms: sync2.SyncRoomsResponse{
|
||||
Join: v2JoinTimeline(roomEvents{
|
||||
roomID: "!unimportant",
|
||||
events: createRoomState(t, alice, time.Now()),
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
t.Log("Alice syncs, starting a poller.")
|
||||
res := v3.mustDoV3Request(t, aliceToken, sync3.Request{
|
||||
RoomSubscriptions: map[string]sync3.RoomSubscription{
|
||||
roomID: {
|
||||
TimelineLimit: 10,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
t.Log("Her response includes the room she created..")
|
||||
m.MatchResponse(t, res, m.MatchRoomSubscription(roomID))
|
||||
|
||||
t.Log("Alice's poller receives a gappy sync with a timeline event.")
|
||||
msgAfterGap := testutils.NewMessageEvent(t, alice, "school's out for summer")
|
||||
v2.queueResponse(aliceToken, sync2.SyncResponse{
|
||||
Rooms: sync2.SyncRoomsResponse{
|
||||
Join: map[string]sync2.SyncV2JoinResponse{
|
||||
roomID: {
|
||||
Timeline: sync2.TimelineResponse{
|
||||
Events: []json.RawMessage{msgAfterGap},
|
||||
Limited: true,
|
||||
PrevBatch: "dummyPrevBatch",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
v2.waitUntilEmpty(t, aliceToken)
|
||||
|
||||
t.Log("Alice makes a new connection and syncs, requesting the last 10 timeline events.")
|
||||
res = v3.mustDoV3Request(t, aliceToken, sync3.Request{
|
||||
ConnID: "conn2",
|
||||
RoomSubscriptions: map[string]sync3.RoomSubscription{
|
||||
roomID: {
|
||||
TimelineLimit: 10,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
t.Log("The response's timeline should only include the event after the gap.")
|
||||
m.MatchResponse(t, res, m.MatchRoomSubscription(roomID,
|
||||
m.MatchRoomTimeline([]json.RawMessage{msgAfterGap}),
|
||||
m.MatchRoomPrevBatch("dummyPrevBatch"),
|
||||
))
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user