Move TimelineResponse back to sync2

This commit is contained in:
David Robertson 2023-09-19 12:41:25 +01:00
parent d9ecbbb412
commit d3ba1f1c30
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
16 changed files with 55 additions and 62 deletions

View File

@ -1,7 +1,5 @@
package internal
import "encoding/json"
type Receipt struct {
RoomID string `db:"room_id"`
EventID string `db:"event_id"`
@ -10,9 +8,3 @@ type Receipt struct {
ThreadID string `db:"thread_id"`
IsPrivate bool
}
type TimelineResponse struct {
Events []json.RawMessage `json:"events"`
Limited bool `json:"limited"`
PrevBatch string `json:"prev_batch,omitempty"`
}

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/prometheus/client_golang/prometheus"
"github.com/getsentry/sentry-go"
@ -340,7 +341,7 @@ type AccumulateResult struct {
// to exist in the database, and the sync stream is already linearised for us.
// - Else it creates a new room state snapshot if the timeline contains state events (as this now represents the current state)
// - It adds entries to the membership log for membership events.
func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline internal.TimelineResponse) (AccumulateResult, error) {
func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline sync2.TimelineResponse) (AccumulateResult, error) {
// The first stage of accumulating events is mostly around validation around what the upstream HS sends us. For accumulation to work correctly
// we expect:
// - there to be no duplicate events
@ -564,7 +565,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline i
// - parses it and returns Event structs.
// - removes duplicate events: this is just a bug which has been seen on Synapse on matrix.org
func parseAndDeduplicateTimelineEvents(roomID string, timeline internal.TimelineResponse) []Event {
func parseAndDeduplicateTimelineEvents(roomID string, timeline sync2.TimelineResponse) []Event {
dedupedEvents := make([]Event, 0, len(timeline.Events))
seenEvents := make(map[string]struct{})
for i, rawEvent := range timeline.Events {

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/testutils"
"reflect"
"sort"
@ -139,7 +138,7 @@ func TestAccumulatorAccumulate(t *testing.T) {
}
var result AccumulateResult
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
result, err = accumulator.Accumulate(txn, userID, roomID, internal.TimelineResponse{Events: newEvents})
result, err = accumulator.Accumulate(txn, userID, roomID, sync2.TimelineResponse{Events: newEvents})
return err
})
if err != nil {
@ -213,7 +212,7 @@ func TestAccumulatorAccumulate(t *testing.T) {
// subsequent calls do nothing and are not an error
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
_, err = accumulator.Accumulate(txn, userID, roomID, internal.TimelineResponse{Events: newEvents})
_, err = accumulator.Accumulate(txn, userID, roomID, sync2.TimelineResponse{Events: newEvents})
return err
})
if err != nil {
@ -323,7 +322,7 @@ func TestAccumulatorMembershipLogs(t *testing.T) {
[]byte(`{"event_id":"` + roomEventIDs[7] + `", "type":"m.room.member", "state_key":"@me:localhost","unsigned":{"prev_content":{"membership":"join", "displayname":"Me"}}, "content":{"membership":"leave"}}`),
}
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
_, err = accumulator.Accumulate(txn, userID, roomID, internal.TimelineResponse{Events: roomEvents})
_, err = accumulator.Accumulate(txn, userID, roomID, sync2.TimelineResponse{Events: roomEvents})
return err
})
if err != nil {
@ -659,7 +658,7 @@ func TestAccumulatorConcurrency(t *testing.T) {
defer wg.Done()
subset := newEvents[:(i + 1)] // i=0 => [1], i=1 => [1,2], etc
err := sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
result, err := accumulator.Accumulate(txn, userID, roomID, internal.TimelineResponse{Events: subset})
result, err := accumulator.Accumulate(txn, userID, roomID, sync2.TimelineResponse{Events: subset})
totalNumNew += result.NumNew
return err
})
@ -793,7 +792,7 @@ func TestAccumulatorMissingPreviousMarkers(t *testing.T) {
for _, step := range steps {
t.Log(step.Desc)
timeline := internal.TimelineResponse{
timeline := sync2.TimelineResponse{
Events: step.Events,
Limited: step.Limited,
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/sync2"
"os"
"strings"
@ -384,7 +385,7 @@ func (s *Storage) currentNotMembershipStateEventsInAllRooms(txn *sqlx.Tx, eventT
`SELECT syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.event FROM syncv3_events
WHERE syncv3_events.event_type IN (?)
AND syncv3_events.event_nid IN (
SELECT unnest(events) FROM syncv3_snapshots WHERE syncv3_snapshots.snapshot_id IN (SELECT current_snapshot_id FROM syncv3_rooms)
SELECT UNNEST(events) FROM syncv3_snapshots WHERE syncv3_snapshots.snapshot_id IN (SELECT current_snapshot_id FROM syncv3_rooms)
)`,
eventTypes,
)
@ -407,7 +408,7 @@ func (s *Storage) currentNotMembershipStateEventsInAllRooms(txn *sqlx.Tx, eventT
return result, nil
}
func (s *Storage) Accumulate(userID, roomID string, timeline internal.TimelineResponse) (result AccumulateResult, err error) {
func (s *Storage) Accumulate(userID, roomID string, timeline sync2.TimelineResponse) (result AccumulateResult, err error) {
if len(timeline.Events) == 0 {
return AccumulateResult{}, nil
}
@ -880,7 +881,7 @@ func (s *Storage) AllJoinedMembers(txn *sqlx.Tx, tempTableName string) (joinedMe
// Unclear if this is the first 5 *most recent* (backwards) or forwards. For now we'll use the most recent
// ones, and select 6 of them so we can always use 5 no matter who is requesting the room name.
rows, err := txn.Query(
`SELECT membership_nid, room_id, state_key, membership from ` + tempTableName + ` INNER JOIN syncv3_events
`SELECT membership_nid, room_id, state_key, membership FROM ` + tempTableName + ` INNER JOIN syncv3_events
on membership_nid = event_nid WHERE membership='join' OR membership='_join' OR membership='invite' OR membership='_invite' ORDER BY event_nid ASC`,
)
if err != nil {

View File

@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/sync2"
"reflect"
"sort"
"testing"
@ -31,7 +32,7 @@ func TestStorageRoomStateBeforeAndAfterEventPosition(t *testing.T) {
testutils.NewStateEvent(t, "m.room.join_rules", "", alice, map[string]interface{}{"join_rule": "invite"}),
testutils.NewStateEvent(t, "m.room.member", bob, alice, map[string]interface{}{"membership": "invite"}),
}
accResult, err := store.Accumulate(userID, roomID, internal.TimelineResponse{Events: events})
accResult, err := store.Accumulate(userID, roomID, sync2.TimelineResponse{Events: events})
if err != nil {
t.Fatalf("Accumulate returned error: %s", err)
}
@ -160,7 +161,7 @@ func TestStorageJoinedRoomsAfterPosition(t *testing.T) {
var latestPos int64
var err error
for roomID, eventMap := range roomIDToEventMap {
accResult, err := store.Accumulate(userID, roomID, internal.TimelineResponse{Events: eventMap})
accResult, err := store.Accumulate(userID, roomID, sync2.TimelineResponse{Events: eventMap})
if err != nil {
t.Fatalf("Accumulate on %s failed: %s", roomID, err)
}
@ -350,7 +351,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
},
}
for _, tl := range timelineInjections {
accResult, err := store.Accumulate(userID, tl.RoomID, internal.TimelineResponse{Events: tl.Events})
accResult, err := store.Accumulate(userID, tl.RoomID, sync2.TimelineResponse{Events: tl.Events})
if err != nil {
t.Fatalf("Accumulate on %s failed: %s", tl.RoomID, err)
}
@ -453,7 +454,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
t.Fatalf("LatestEventNID: %s", err)
}
for _, tl := range timelineInjections {
accResult, err := store.Accumulate(userID, tl.RoomID, internal.TimelineResponse{Events: tl.Events})
accResult, err := store.Accumulate(userID, tl.RoomID, sync2.TimelineResponse{Events: tl.Events})
if err != nil {
t.Fatalf("Accumulate on %s failed: %s", tl.RoomID, err)
}
@ -533,7 +534,7 @@ func TestStorageLatestEventsInRoomsPrevBatch(t *testing.T) {
}
eventIDs := []string{}
for _, timeline := range timelines {
_, err = store.Accumulate(userID, roomID, internal.TimelineResponse{Events: timeline.timeline, PrevBatch: timeline.prevBatch})
_, err = store.Accumulate(userID, roomID, sync2.TimelineResponse{Events: timeline.timeline, PrevBatch: timeline.prevBatch})
if err != nil {
t.Fatalf("failed to accumulate: %s", err)
}
@ -775,7 +776,7 @@ func TestAllJoinedMembers(t *testing.T) {
}, serialise(tc.InitMemberships)...))
assertNoError(t, err)
_, err = store.Accumulate(userID, roomID, internal.TimelineResponse{
_, err = store.Accumulate(userID, roomID, sync2.TimelineResponse{
Events: serialise(tc.AccumulateMemberships),
PrevBatch: "foo",
})

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"io/ioutil"
"net/http"
@ -162,11 +161,11 @@ type SyncRoomsResponse struct {
// JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
type SyncV2JoinResponse struct {
State EventsResponse `json:"state"`
Timeline internal.TimelineResponse `json:"timeline"`
Ephemeral EventsResponse `json:"ephemeral"`
AccountData EventsResponse `json:"account_data"`
UnreadNotifications UnreadNotifications `json:"unread_notifications"`
State EventsResponse `json:"state"`
Timeline TimelineResponse `json:"timeline"`
Ephemeral EventsResponse `json:"ephemeral"`
AccountData EventsResponse `json:"account_data"`
UnreadNotifications UnreadNotifications `json:"unread_notifications"`
}
type UnreadNotifications struct {
@ -174,6 +173,12 @@ type UnreadNotifications struct {
NotificationCount *int `json:"notification_count,omitempty"`
}
type TimelineResponse struct {
Events []json.RawMessage `json:"events"`
Limited bool `json:"limited"`
PrevBatch string `json:"prev_batch,omitempty"`
}
type EventsResponse struct {
Events []json.RawMessage `json:"events"`
}

View File

@ -262,7 +262,7 @@ func (h *Handler) OnBulkDeviceDataUpdate(payload *pubsub.V2DeviceData) {
h.v2Pub.Notify(pubsub.ChanV2, payload)
}
func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline internal.TimelineResponse) error {
func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline sync2.TimelineResponse) error {
// Remember any transaction IDs that may be unique to this user
eventIDsWithTxns := make([]string, 0, len(timeline.Events)) // in timeline order
eventIDToTxnID := make(map[string]string, len(timeline.Events)) // event_id -> txn_id

View File

@ -36,7 +36,7 @@ type V2DataReceiver interface {
UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)
// Accumulate data for this room. This means the timeline section of the v2 response.
// Return an error to stop the since token advancing.
Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline internal.TimelineResponse) error // latest pos with event nids of timeline entries
Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline TimelineResponse) error // latest pos with event nids of timeline entries
// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
// If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB.
// Return an error to stop the since token advancing.
@ -310,7 +310,7 @@ func (h *PollerMap) execute() {
func (h *PollerMap) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
h.callbacks.UpdateDeviceSince(ctx, userID, deviceID, since)
}
func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline internal.TimelineResponse) (err error) {
func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline TimelineResponse) (err error) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {

View File

@ -782,7 +782,7 @@ func TestPollerResendsOnCallbackError(t *testing.T) {
Rooms: SyncRoomsResponse{
Join: map[string]SyncV2JoinResponse{
"!foo:bar": {
Timeline: internal.TimelineResponse{
Timeline: TimelineResponse{
Events: []json.RawMessage{
[]byte(`{"type":"m.room.message","content":{},"sender":"@alice:localhost","event_id":"$222"}`),
},
@ -876,7 +876,7 @@ func TestPollerResendsOnCallbackError(t *testing.T) {
Rooms: SyncRoomsResponse{
Leave: map[string]SyncV2LeaveResponse{
"!foo:bar": {
Timeline: internal.TimelineResponse{
Timeline: TimelineResponse{
Events: []json.RawMessage{
[]byte(`{"type":"m.room.member","state_key":"` + pid.UserID + `","content":{"membership":"leave"}}`),
},
@ -984,7 +984,7 @@ func TestPollerDoesNotResendOnDataError(t *testing.T) {
Rooms: SyncRoomsResponse{
Join: map[string]SyncV2JoinResponse{
"!foo:bar": {
Timeline: internal.TimelineResponse{
Timeline: TimelineResponse{
Events: []json.RawMessage{
[]byte(`{"type":"m.room.message","content":{},"sender":"@alice:localhost","event_id":"$222"}`),
},
@ -1056,7 +1056,7 @@ type mockDataReceiver struct {
updateSinceCalled chan struct{}
}
func (a *mockDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline internal.TimelineResponse) error {
func (a *mockDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline TimelineResponse) error {
a.timelines[roomID] = append(a.timelines[roomID], timeline.Events...)
return nil
}
@ -1097,7 +1097,7 @@ type overrideDataReceiver struct {
onExpiredToken func(ctx context.Context, accessTokenHash, userID, deviceID string)
}
func (s *overrideDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline internal.TimelineResponse) error {
func (s *overrideDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline TimelineResponse) error {
if s.accumulate == nil {
return nil
}

View File

@ -4,7 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync2"
"testing"
"github.com/matrix-org/sliding-sync/state"
@ -39,12 +39,12 @@ func TestGlobalCacheLoadState(t *testing.T) {
testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": "The Room Name"}),
testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": "The Updated Room Name"}),
}
_, err := store.Accumulate(alice, roomID2, internal.TimelineResponse{Events: eventsRoom2})
_, err := store.Accumulate(alice, roomID2, sync2.TimelineResponse{Events: eventsRoom2})
if err != nil {
t.Fatalf("Accumulate: %s", err)
}
accResult, err := store.Accumulate(alice, roomID, internal.TimelineResponse{Events: events})
accResult, err := store.Accumulate(alice, roomID, sync2.TimelineResponse{Events: events})
if err != nil {
t.Fatalf("Accumulate: %s", err)
}

View File

@ -2,7 +2,6 @@ package syncv3
import (
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
"testing"
"time"
@ -419,7 +418,7 @@ func TestExtensionAccountData(t *testing.T) {
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomA: {
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: createRoomState(t, alice, time.Now()),
},
AccountData: sync2.EventsResponse{
@ -427,7 +426,7 @@ func TestExtensionAccountData(t *testing.T) {
},
},
roomB: {
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: createRoomState(t, alice, time.Now().Add(-1*time.Minute)),
},
AccountData: sync2.EventsResponse{
@ -435,7 +434,7 @@ func TestExtensionAccountData(t *testing.T) {
},
},
roomC: {
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: createRoomState(t, alice, time.Now().Add(-2*time.Minute)),
},
AccountData: sync2.EventsResponse{
@ -633,7 +632,7 @@ func TestTypingMultiplePoller(t *testing.T) {
State: sync2.EventsResponse{
Events: roomState,
},
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{joinEv},
},
Ephemeral: sync2.EventsResponse{
@ -652,7 +651,7 @@ func TestTypingMultiplePoller(t *testing.T) {
State: sync2.EventsResponse{
Events: roomState,
},
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{joinEv},
},
Ephemeral: sync2.EventsResponse{
@ -709,7 +708,7 @@ func TestTypingMultiplePoller(t *testing.T) {
State: sync2.EventsResponse{
Events: roomState,
},
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{joinEv},
},
Ephemeral: sync2.EventsResponse{
@ -730,7 +729,7 @@ func TestTypingMultiplePoller(t *testing.T) {
State: sync2.EventsResponse{
Events: roomState,
},
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{joinEv},
},
Ephemeral: sync2.EventsResponse{

View File

@ -2,7 +2,6 @@ package syncv3
import (
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
"testing"
"time"
@ -208,7 +207,7 @@ func TestFiltersInvite(t *testing.T) {
State: sync2.EventsResponse{
Events: createRoomState(t, "@creator:other", time.Now()),
},
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{testutils.NewJoinEvent(t, alice)},
},
},

View File

@ -3,7 +3,6 @@ package syncv3
import (
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
"testing"
"time"
@ -81,7 +80,7 @@ func TestNotificationsOnTop(t *testing.T) {
UnreadNotifications: sync2.UnreadNotifications{
HighlightCount: ptr(1),
},
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{
bingEvent,
},
@ -106,7 +105,7 @@ func TestNotificationsOnTop(t *testing.T) {
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
noBingRoomID: {
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{
noBingEvent,
},

View File

@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sqlutil"
"net/http"
"os"
@ -179,7 +178,7 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) {
State: sync2.EventsResponse{
Events: []json.RawMessage{nameEvent, powerLevelsEvent},
},
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{messageEvent},
Limited: true,
PrevBatch: "batchymcbatchface",
@ -304,7 +303,7 @@ func TestPollerUpdatesRoomMemberTrackerOnGappySyncStateBlock(t *testing.T) {
State: sync2.EventsResponse{
Events: []json.RawMessage{bobLeave},
},
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{aliceMessage},
Limited: true,
PrevBatch: "batchymcbatchface",

View File

@ -2,7 +2,6 @@ package syncv3
import (
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
"sort"
"testing"
"time"
@ -120,7 +119,7 @@ func (r *testRig) SetupV2RoomsForUser(t *testing.T, v2UserID string, f FlushEnum
State: sync2.EventsResponse{
Events: stateBlock,
},
Timeline: internal.TimelineResponse{
Timeline: sync2.TimelineResponse{
Events: timeline,
},
}

View File

@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
"io/ioutil"
"log"
"net/http"
@ -472,7 +471,7 @@ func v2JoinTimeline(joinEvents ...roomEvents) map[string]sync2.SyncV2JoinRespons
result := make(map[string]sync2.SyncV2JoinResponse)
for _, re := range joinEvents {
var data sync2.SyncV2JoinResponse
data.Timeline = internal.TimelineResponse{
data.Timeline = sync2.TimelineResponse{
Events: re.events,
}
if re.state != nil {