Pass TimelineResponse struct around

This commit is contained in:
David Robertson 2023-09-13 12:46:57 +01:00
parent 16796db033
commit df01e50438
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
17 changed files with 85 additions and 67 deletions

View File

@ -1,5 +1,7 @@
package internal
import "encoding/json"
type Receipt struct {
RoomID string `db:"room_id"`
EventID string `db:"event_id"`
@ -8,3 +10,9 @@ type Receipt struct {
ThreadID string `db:"thread_id"`
IsPrivate bool
}
type TimelineResponse struct {
Events []json.RawMessage `json:"events"`
Limited bool `json:"limited"`
PrevBatch string `json:"prev_batch,omitempty"`
}

View File

@ -4,7 +4,6 @@ import (
"database/sql"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
"github.com/prometheus/client_golang/prometheus"
@ -328,13 +327,13 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
// to exist in the database, and the sync stream is already linearised for us.
// - Else it creates a new room state snapshot if the timeline contains state events (as this now represents the current state)
// - It adds entries to the membership log for membership events.
func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch string, timeline []json.RawMessage) (numNew int, timelineNIDs []int64, err error) {
func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline internal.TimelineResponse) (numNew int, timelineNIDs []int64, err error) {
// The first stage of accumulating events is mostly around validation around what the upstream HS sends us. For accumulation to work correctly
// we expect:
// - there to be no duplicate events
// - if there are new events, they are always new.
// Both of these assumptions can be false for different reasons
dedupedEvents, err := a.filterAndParseTimelineEvents(txn, roomID, timeline, prevBatch)
dedupedEvents, err := a.filterAndParseTimelineEvents(txn, roomID, timeline.Events, timeline.PrevBatch)
if err != nil {
err = fmt.Errorf("filterTimelineEvents: %w", err)
return

View File

@ -3,6 +3,7 @@ package state
import (
"context"
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/testutils"
"reflect"
"sort"
@ -138,7 +139,7 @@ func TestAccumulatorAccumulate(t *testing.T) {
var numNew int
var latestNIDs []int64
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
numNew, latestNIDs, err = accumulator.Accumulate(txn, userID, roomID, "", newEvents)
numNew, latestNIDs, err = accumulator.Accumulate(txn, userID, roomID, internal.TimelineResponse{Events: newEvents})
return err
})
if err != nil {
@ -212,7 +213,7 @@ func TestAccumulatorAccumulate(t *testing.T) {
// subsequent calls do nothing and are not an error
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
_, _, err = accumulator.Accumulate(txn, userID, roomID, "", newEvents)
_, _, err = accumulator.Accumulate(txn, userID, roomID, internal.TimelineResponse{Events: newEvents})
return err
})
if err != nil {
@ -248,7 +249,7 @@ func TestAccumulatorMembershipLogs(t *testing.T) {
[]byte(`{"event_id":"` + roomEventIDs[7] + `", "type":"m.room.member", "state_key":"@me:localhost","unsigned":{"prev_content":{"membership":"join", "displayname":"Me"}}, "content":{"membership":"leave"}}`),
}
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
_, _, err = accumulator.Accumulate(txn, userID, roomID, "", roomEvents)
_, _, err = accumulator.Accumulate(txn, userID, roomID, internal.TimelineResponse{Events: roomEvents})
return err
})
if err != nil {
@ -384,7 +385,7 @@ func TestAccumulatorDupeEvents(t *testing.T) {
}
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
_, _, err = accumulator.Accumulate(txn, userID, roomID, "", joinRoom.Timeline.Events)
_, _, err = accumulator.Accumulate(txn, userID, roomID, joinRoom.Timeline)
return err
})
if err != nil {
@ -584,7 +585,7 @@ func TestAccumulatorConcurrency(t *testing.T) {
defer wg.Done()
subset := newEvents[:(i + 1)] // i=0 => [1], i=1 => [1,2], etc
err := sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
numNew, _, err := accumulator.Accumulate(txn, userID, roomID, "", subset)
numNew, _, err := accumulator.Accumulate(txn, userID, roomID, internal.TimelineResponse{Events: subset})
totalNumNew += numNew
return err
})

View File

@ -39,6 +39,8 @@ type Event struct {
PrevBatch sql.NullString `db:"prev_batch"`
// stripped events will be missing this field
JSON []byte `db:"event"`
// MissingPrevious is true iff the previous timeline event is not known to the proxy.
MissingPrevious bool `db:"missing_previous"`
}
func (ev *Event) ensureFieldsSetOnEvent() error {
@ -178,8 +180,10 @@ func (t *EventTable) Insert(txn *sqlx.Tx, events []Event, checkFields bool) (map
var eventNID int64
for _, chunk := range chunks {
rows, err := txn.NamedQuery(`
INSERT INTO syncv3_events (event_id, event, event_type, state_key, room_id, membership, prev_batch, is_state)
VALUES (:event_id, :event, :event_type, :state_key, :room_id, :membership, :prev_batch, :is_state) ON CONFLICT (event_id) DO NOTHING RETURNING event_id, event_nid`, chunk)
INSERT INTO syncv3_events (event_id, event, event_type, state_key, room_id, membership, prev_batch, is_state, missing_previous)
VALUES (:event_id, :event, :event_type, :state_key, :room_id, :membership, :prev_batch, :is_state, :missing_previous)
ON CONFLICT (event_id) DO NOTHING
RETURNING event_id, event_nid`, chunk)
if err != nil {
return nil, err
}

View File

@ -336,12 +336,12 @@ func (s *Storage) currentNotMembershipStateEventsInAllRooms(txn *sqlx.Tx, eventT
return result, nil
}
func (s *Storage) Accumulate(userID, roomID, prevBatch string, timeline []json.RawMessage) (numNew int, timelineNIDs []int64, err error) {
if len(timeline) == 0 {
func (s *Storage) Accumulate(userID, roomID string, timeline internal.TimelineResponse) (numNew int, timelineNIDs []int64, err error) {
if len(timeline.Events) == 0 {
return 0, nil, nil
}
err = sqlutil.WithTransaction(s.Accumulator.db, func(txn *sqlx.Tx) error {
numNew, timelineNIDs, err = s.Accumulator.Accumulate(txn, userID, roomID, prevBatch, timeline)
numNew, timelineNIDs, err = s.Accumulator.Accumulate(txn, userID, roomID, timeline)
return err
})
return

View File

@ -31,7 +31,7 @@ func TestStorageRoomStateBeforeAndAfterEventPosition(t *testing.T) {
testutils.NewStateEvent(t, "m.room.join_rules", "", alice, map[string]interface{}{"join_rule": "invite"}),
testutils.NewStateEvent(t, "m.room.member", bob, alice, map[string]interface{}{"membership": "invite"}),
}
_, latestNIDs, err := store.Accumulate(userID, roomID, "", events)
_, latestNIDs, err := store.Accumulate(userID, roomID, internal.TimelineResponse{Events: events})
if err != nil {
t.Fatalf("Accumulate returned error: %s", err)
}
@ -161,7 +161,7 @@ func TestStorageJoinedRoomsAfterPosition(t *testing.T) {
var latestNIDs []int64
var err error
for roomID, eventMap := range roomIDToEventMap {
_, latestNIDs, err = store.Accumulate(userID, roomID, "", eventMap)
_, latestNIDs, err = store.Accumulate(userID, roomID, internal.TimelineResponse{Events: eventMap})
if err != nil {
t.Fatalf("Accumulate on %s failed: %s", roomID, err)
}
@ -351,7 +351,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
},
}
for _, tl := range timelineInjections {
numNew, _, err := store.Accumulate(userID, tl.RoomID, "", tl.Events)
numNew, _, err := store.Accumulate(userID, tl.RoomID, internal.TimelineResponse{Events: tl.Events})
if err != nil {
t.Fatalf("Accumulate on %s failed: %s", tl.RoomID, err)
}
@ -454,7 +454,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
t.Fatalf("LatestEventNID: %s", err)
}
for _, tl := range timelineInjections {
numNew, _, err := store.Accumulate(userID, tl.RoomID, "", tl.Events)
numNew, _, err := store.Accumulate(userID, tl.RoomID, internal.TimelineResponse{Events: tl.Events})
if err != nil {
t.Fatalf("Accumulate on %s failed: %s", tl.RoomID, err)
}
@ -534,7 +534,7 @@ func TestStorageLatestEventsInRoomsPrevBatch(t *testing.T) {
}
eventIDs := []string{}
for _, timeline := range timelines {
_, _, err = store.Accumulate(userID, roomID, timeline.prevBatch, timeline.timeline)
_, _, err = store.Accumulate(userID, roomID, internal.TimelineResponse{Events: timeline.timeline, PrevBatch: timeline.prevBatch})
if err != nil {
t.Fatalf("failed to accumulate: %s", err)
}
@ -776,7 +776,10 @@ func TestAllJoinedMembers(t *testing.T) {
}, serialise(tc.InitMemberships)...))
assertNoError(t, err)
_, _, err = store.Accumulate(userID, roomID, "foo", serialise(tc.AccumulateMemberships))
_, _, err = store.Accumulate(userID, roomID, internal.TimelineResponse{
Events: serialise(tc.AccumulateMemberships),
PrevBatch: "foo",
})
assertNoError(t, err)
testCases[i].RoomID = roomID // remember this for later
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
"io/ioutil"
"net/http"
"net/url"
@ -159,11 +160,11 @@ type SyncRoomsResponse struct {
// JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
type SyncV2JoinResponse struct {
State EventsResponse `json:"state"`
Timeline TimelineResponse `json:"timeline"`
Ephemeral EventsResponse `json:"ephemeral"`
AccountData EventsResponse `json:"account_data"`
UnreadNotifications UnreadNotifications `json:"unread_notifications"`
State EventsResponse `json:"state"`
Timeline internal.TimelineResponse `json:"timeline"`
Ephemeral EventsResponse `json:"ephemeral"`
AccountData EventsResponse `json:"account_data"`
UnreadNotifications UnreadNotifications `json:"unread_notifications"`
}
type UnreadNotifications struct {
@ -171,12 +172,6 @@ type UnreadNotifications struct {
NotificationCount *int `json:"notification_count,omitempty"`
}
type TimelineResponse struct {
Events []json.RawMessage `json:"events"`
Limited bool `json:"limited"`
PrevBatch string `json:"prev_batch,omitempty"`
}
type EventsResponse struct {
Events []json.RawMessage `json:"events"`
}

View File

@ -262,14 +262,14 @@ func (h *Handler) OnBulkDeviceDataUpdate(payload *pubsub.V2DeviceData) {
h.v2Pub.Notify(pubsub.ChanV2, payload)
}
func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) error {
func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline internal.TimelineResponse) error {
// Remember any transaction IDs that may be unique to this user
eventIDsWithTxns := make([]string, 0, len(timeline)) // in timeline order
eventIDToTxnID := make(map[string]string, len(timeline)) // event_id -> txn_id
eventIDsWithTxns := make([]string, 0, len(timeline.Events)) // in timeline order
eventIDToTxnID := make(map[string]string, len(timeline.Events)) // event_id -> txn_id
// Also remember events which were sent by this user but lack a transaction ID.
eventIDsLackingTxns := make([]string, 0, len(timeline))
eventIDsLackingTxns := make([]string, 0, len(timeline.Events))
for _, e := range timeline {
for _, e := range timeline.Events {
parsed := gjson.ParseBytes(e)
eventID := parsed.Get("event_id").Str
@ -294,9 +294,9 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prev
}
// Insert new events
numNew, latestNIDs, err := h.Store.Accumulate(userID, roomID, prevBatch, timeline)
numNew, latestNIDs, err := h.Store.Accumulate(userID, roomID, timeline)
if err != nil {
logger.Err(err).Int("timeline", len(timeline)).Str("room", roomID).Msg("V2: failed to accumulate room")
logger.Err(err).Int("timeline", len(timeline.Events)).Str("room", roomID).Msg("V2: failed to accumulate room")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
return err
}
@ -305,7 +305,7 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prev
if numNew != 0 {
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Accumulate{
RoomID: roomID,
PrevBatch: prevBatch,
PrevBatch: timeline.PrevBatch,
EventNIDs: latestNIDs,
})
}
@ -323,7 +323,7 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prev
})
if err != nil {
logger.Err(err).
Int("timeline", len(timeline)).
Int("timeline", len(timeline.Events)).
Int("num_transaction_ids", len(eventIDsWithTxns)).
Int("num_missing_transaction_ids", len(eventIDsLackingTxns)).
Str("room", roomID).

View File

@ -36,7 +36,7 @@ type V2DataReceiver interface {
UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)
// Accumulate data for this room. This means the timeline section of the v2 response.
// Return an error to stop the since token advancing.
Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) error // latest pos with event nids of timeline entries
Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline internal.TimelineResponse) error // latest pos with event nids of timeline entries
// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
// If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB.
// Return an error to stop the since token advancing.
@ -310,11 +310,11 @@ func (h *PollerMap) execute() {
func (h *PollerMap) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) {
h.callbacks.UpdateDeviceSince(ctx, userID, deviceID, since)
}
func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) (err error) {
func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline internal.TimelineResponse) (err error) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
err = h.callbacks.Accumulate(ctx, userID, deviceID, roomID, prevBatch, timeline)
err = h.callbacks.Accumulate(ctx, userID, deviceID, roomID, timeline)
wg.Done()
}
wg.Wait()
@ -818,7 +818,8 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
if len(roomData.Timeline.Events) > 0 {
timelineCalls++
p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline)
if err != nil {
return fmt.Errorf("Accumulate[%s]: %w", roomID, err)
}
@ -834,7 +835,7 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
for roomID, roomData := range res.Rooms.Leave {
if len(roomData.Timeline.Events) > 0 {
p.trackTimelineSize(len(roomData.Timeline.Events), roomData.Timeline.Limited)
err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline.PrevBatch, roomData.Timeline.Events)
err := p.receiver.Accumulate(ctx, p.userID, p.deviceID, roomID, roomData.Timeline)
if err != nil {
return fmt.Errorf("Accumulate_Leave[%s]: %w", roomID, err)
}

View File

@ -782,7 +782,7 @@ func TestPollerResendsOnCallbackError(t *testing.T) {
Rooms: SyncRoomsResponse{
Join: map[string]SyncV2JoinResponse{
"!foo:bar": {
Timeline: TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{
[]byte(`{"type":"m.room.message","content":{},"sender":"@alice:localhost","event_id":"$222"}`),
},
@ -876,7 +876,7 @@ func TestPollerResendsOnCallbackError(t *testing.T) {
Rooms: SyncRoomsResponse{
Leave: map[string]SyncV2LeaveResponse{
"!foo:bar": {
Timeline: TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{
[]byte(`{"type":"m.room.member","state_key":"` + pid.UserID + `","content":{"membership":"leave"}}`),
},
@ -984,7 +984,7 @@ func TestPollerDoesNotResendOnDataError(t *testing.T) {
Rooms: SyncRoomsResponse{
Join: map[string]SyncV2JoinResponse{
"!foo:bar": {
Timeline: TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{
[]byte(`{"type":"m.room.message","content":{},"sender":"@alice:localhost","event_id":"$222"}`),
},
@ -1056,8 +1056,8 @@ type mockDataReceiver struct {
updateSinceCalled chan struct{}
}
func (a *mockDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) error {
a.timelines[roomID] = append(a.timelines[roomID], timeline...)
func (a *mockDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline internal.TimelineResponse) error {
a.timelines[roomID] = append(a.timelines[roomID], timeline.Events...)
return nil
}
func (a *mockDataReceiver) Initialise(ctx context.Context, roomID string, state []json.RawMessage) ([]json.RawMessage, error) {
@ -1097,11 +1097,11 @@ type overrideDataReceiver struct {
onExpiredToken func(ctx context.Context, accessTokenHash, userID, deviceID string)
}
func (s *overrideDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID, prevBatch string, timeline []json.RawMessage) error {
func (s *overrideDataReceiver) Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline internal.TimelineResponse) error {
if s.accumulate == nil {
return nil
}
return s.accumulate(ctx, userID, deviceID, roomID, prevBatch, timeline)
return s.accumulate(ctx, userID, deviceID, roomID, timeline.PrevBatch, timeline.Events)
}
func (s *overrideDataReceiver) Initialise(ctx context.Context, roomID string, state []json.RawMessage) ([]json.RawMessage, error) {
if s.initialise == nil {

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
"testing"
"github.com/matrix-org/sliding-sync/state"
@ -38,12 +39,12 @@ func TestGlobalCacheLoadState(t *testing.T) {
testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": "The Room Name"}),
testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": "The Updated Room Name"}),
}
_, _, err := store.Accumulate(alice, roomID2, "", eventsRoom2)
_, _, err := store.Accumulate(alice, roomID2, internal.TimelineResponse{Events: eventsRoom2})
if err != nil {
t.Fatalf("Accumulate: %s", err)
}
_, latestNIDs, err := store.Accumulate(alice, roomID, "", events)
_, latestNIDs, err := store.Accumulate(alice, roomID, internal.TimelineResponse{Events: events})
if err != nil {
t.Fatalf("Accumulate: %s", err)
}

View File

@ -2,6 +2,7 @@ package syncv3
import (
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
"testing"
"time"
@ -418,7 +419,7 @@ func TestExtensionAccountData(t *testing.T) {
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomA: {
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: createRoomState(t, alice, time.Now()),
},
AccountData: sync2.EventsResponse{
@ -426,7 +427,7 @@ func TestExtensionAccountData(t *testing.T) {
},
},
roomB: {
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: createRoomState(t, alice, time.Now().Add(-1*time.Minute)),
},
AccountData: sync2.EventsResponse{
@ -434,7 +435,7 @@ func TestExtensionAccountData(t *testing.T) {
},
},
roomC: {
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: createRoomState(t, alice, time.Now().Add(-2*time.Minute)),
},
AccountData: sync2.EventsResponse{
@ -632,7 +633,7 @@ func TestTypingMultiplePoller(t *testing.T) {
State: sync2.EventsResponse{
Events: roomState,
},
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{joinEv},
},
Ephemeral: sync2.EventsResponse{
@ -651,7 +652,7 @@ func TestTypingMultiplePoller(t *testing.T) {
State: sync2.EventsResponse{
Events: roomState,
},
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{joinEv},
},
Ephemeral: sync2.EventsResponse{
@ -708,7 +709,7 @@ func TestTypingMultiplePoller(t *testing.T) {
State: sync2.EventsResponse{
Events: roomState,
},
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{joinEv},
},
Ephemeral: sync2.EventsResponse{
@ -729,7 +730,7 @@ func TestTypingMultiplePoller(t *testing.T) {
State: sync2.EventsResponse{
Events: roomState,
},
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{joinEv},
},
Ephemeral: sync2.EventsResponse{

View File

@ -2,6 +2,7 @@ package syncv3
import (
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
"testing"
"time"
@ -207,7 +208,7 @@ func TestFiltersInvite(t *testing.T) {
State: sync2.EventsResponse{
Events: createRoomState(t, "@creator:other", time.Now()),
},
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{testutils.NewJoinEvent(t, alice)},
},
},

View File

@ -3,6 +3,7 @@ package syncv3
import (
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
"testing"
"time"
@ -80,7 +81,7 @@ func TestNotificationsOnTop(t *testing.T) {
UnreadNotifications: sync2.UnreadNotifications{
HighlightCount: ptr(1),
},
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{
bingEvent,
},
@ -105,7 +106,7 @@ func TestNotificationsOnTop(t *testing.T) {
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
noBingRoomID: {
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{
noBingEvent,
},

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sqlutil"
"net/http"
"os"
@ -177,7 +178,7 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) {
State: sync2.EventsResponse{
Events: []json.RawMessage{nameEvent, powerLevelsEvent},
},
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{messageEvent},
Limited: true,
PrevBatch: "batchymcbatchface",
@ -302,7 +303,7 @@ func TestPollerUpdatesRoomMemberTrackerOnGappySyncStateBlock(t *testing.T) {
State: sync2.EventsResponse{
Events: []json.RawMessage{bobLeave},
},
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: []json.RawMessage{aliceMessage},
Limited: true,
PrevBatch: "batchymcbatchface",

View File

@ -2,6 +2,7 @@ package syncv3
import (
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
"sort"
"testing"
"time"
@ -119,7 +120,7 @@ func (r *testRig) SetupV2RoomsForUser(t *testing.T, v2UserID string, f FlushEnum
State: sync2.EventsResponse{
Events: stateBlock,
},
Timeline: sync2.TimelineResponse{
Timeline: internal.TimelineResponse{
Events: timeline,
},
}

View File

@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
"io/ioutil"
"log"
"net/http"
@ -471,7 +472,7 @@ func v2JoinTimeline(joinEvents ...roomEvents) map[string]sync2.SyncV2JoinRespons
result := make(map[string]sync2.SyncV2JoinResponse)
for _, re := range joinEvents {
var data sync2.SyncV2JoinResponse
data.Timeline = sync2.TimelineResponse{
data.Timeline = internal.TimelineResponse{
Events: re.events,
}
if re.state != nil {