Dump v2 responses into the accumulator

This commit is contained in:
Kegan Dougal 2021-06-03 16:18:01 +01:00
parent cc22c99a03
commit 502274b475
4 changed files with 92 additions and 37 deletions

View File

@ -247,19 +247,33 @@ func (a *Accumulator) Accumulate(roomID string, timeline []json.RawMessage) erro
// State events exist in this timeline, so make a new snapshot
// by pulling out the current snapshot and adding these state events
var oldStripped []StrippedEvent
snapID, err := a.roomsTable.CurrentSnapshotID(txn, roomID)
if err != nil {
return err
}
if snapID == 0 {
log.Error().Str("room_id", roomID).Msg(
"Accumulator.Accumulate: room has no current snapshot, probably because Initialise was never called. This is a bug.",
)
return fmt.Errorf("room not initialised yet!")
}
oldStripped, err := a.strippedEventsForSnapshot(txn, snapID)
if err != nil {
return err
// a missing snapshot is only okay if this is the start of the room, so we should have a create
// event in this list somewhere: verify it.
hasCreateEvent := false
for _, stateEvent := range newStateEvents {
if gjson.GetBytes(stateEvent, "type").Str == "m.room.create" {
hasCreateEvent = true
break
}
}
if !hasCreateEvent {
log.Error().Str("room_id", roomID).Msg(
"Accumulator.Accumulate: room has no current snapshot, and the timeline provided has no create event. " +
"Either Initialise should be called OR Accumulate with a create event to set up the snapshot. This is a bug.",
)
return fmt.Errorf("room not initialised yet!")
}
} else {
oldStripped, err = a.strippedEventsForSnapshot(txn, snapID)
if err != nil {
return err
}
}
// pull stripped events for the state we just inserted
newStripped, err := a.eventsTable.SelectStrippedEventsByIDs(txn, newStateEventIDs)
@ -298,6 +312,9 @@ func (a *Accumulator) Delta(roomID string, lastEventNID int64, limit int) (event
if err != nil {
return nil, 0, err
}
if len(events) == 0 {
return nil, lastEventNID, nil
}
eventsJSON = make([]json.RawMessage, len(events))
for i := range events {
eventsJSON[i] = events[i].JSON

View File

@ -152,30 +152,49 @@ func TestAccumulatorAccumulate(t *testing.T) {
}
}
/*
func TestAccumulatorDelta(t *testing.T) {
roomID := "!TestAccumulatorAccumulate:localhost"
roomEvents := []json.RawMessage{
[]byte(`{"event_id":"D", "type":"m.room.create", "state_key":"", "content":{"creator":"@me:localhost"}}`),
[]byte(`{"event_id":"E", "type":"m.room.member", "state_key":"@me:localhost", "content":{"membership":"join"}}`),
[]byte(`{"event_id":"F", "type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"public"}}`),
}
roomID := "!TestAccumulatorDelta:localhost"
accumulator := NewAccumulator(postgresConnectionString)
err := accumulator.Initialise(roomID, roomEvents)
err := accumulator.Initialise(roomID, nil)
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}
// accumulate new state makes a new snapshot and removes the old snapshot
newEvents := []json.RawMessage{
// non-state event does nothing
[]byte(`{"event_id":"G", "type":"m.room.message","content":{"body":"Hello World","msgtype":"m.text"}}`),
// join_rules should clobber the one from initialise
[]byte(`{"event_id":"H", "type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"public"}}`),
// new state event should be added to the snapshot
[]byte(`{"event_id":"I", "type":"m.room.history_visibility", "state_key":"", "content":{"visibility":"public"}}`),
roomEvents := []json.RawMessage{
[]byte(`{"event_id":"aD", "type":"m.room.create", "state_key":"", "content":{"creator":"@me:localhost"}}`),
[]byte(`{"event_id":"aE", "type":"m.room.member", "state_key":"@me:localhost", "content":{"membership":"join"}}`),
[]byte(`{"event_id":"aF", "type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"public"}}`),
[]byte(`{"event_id":"aG", "type":"m.room.message","content":{"body":"Hello World","msgtype":"m.text"}}`),
[]byte(`{"event_id":"aH", "type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"public"}}`),
[]byte(`{"event_id":"aI", "type":"m.room.history_visibility", "state_key":"", "content":{"visibility":"public"}}`),
}
if err = accumulator.Accumulate(roomID, newEvents); err != nil {
if err = accumulator.Accumulate(roomID, roomEvents); err != nil {
t.Fatalf("failed to Accumulate: %s", err)
}
} */
// Draw the create event, tests limits
events, position, err := accumulator.Delta(roomID, EventsStart, 1)
if err != nil {
t.Fatalf("failed to Delta: %s", err)
}
if len(events) != 1 {
t.Fatalf("failed to get events from Delta, got %d want 1", len(events))
}
if gjson.GetBytes(events[0], "event_id").Str != gjson.GetBytes(roomEvents[0], "event_id").Str {
t.Fatalf("failed to draw first event, got %s want %s", string(events[0]), string(roomEvents[0]))
}
if position == 0 {
t.Errorf("Delta returned zero position")
}
// Draw up to the end
events, position, err = accumulator.Delta(roomID, position, 1000)
if err != nil {
t.Fatalf("failed to Delta: %s", err)
}
if len(events) != len(roomEvents)-1 {
t.Fatalf("failed to get events from Delta, got %d want %d", len(events), len(roomEvents)-1)
}
if position == 0 {
t.Errorf("Delta returned zero position")
}
}

12
v2.go
View File

@ -67,18 +67,18 @@ type SyncV2Response struct {
// JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
type SyncV2JoinResponse struct {
State struct {
Events []gomatrixserverlib.ClientEvent `json:"events"`
Events []json.RawMessage `json:"events"`
} `json:"state"`
Timeline struct {
Events []gomatrixserverlib.ClientEvent `json:"events"`
Limited bool `json:"limited"`
PrevBatch string `json:"prev_batch,omitempty"`
Events []json.RawMessage `json:"events"`
Limited bool `json:"limited"`
PrevBatch string `json:"prev_batch,omitempty"`
} `json:"timeline"`
Ephemeral struct {
Events []gomatrixserverlib.ClientEvent `json:"events"`
Events []json.RawMessage `json:"events"`
} `json:"ephemeral"`
AccountData struct {
Events []gomatrixserverlib.ClientEvent `json:"events"`
Events []json.RawMessage `json:"events"`
} `json:"account_data"`
}

27
v3.go
View File

@ -12,6 +12,7 @@ import (
"github.com/gorilla/mux"
"github.com/justinas/alice"
"github.com/matrix-org/sync-v3/state"
"github.com/rs/zerolog"
"github.com/rs/zerolog/hlog"
)
@ -45,7 +46,8 @@ func RunSyncV3Server(destinationServer, bindAddr, postgresDBURI string) {
},
DestinationServer: destinationServer,
},
Sessions: NewSessions(postgresDBURI),
Sessions: NewSessions(postgresDBURI),
Accumulator: state.NewAccumulator(postgresDBURI),
}
// HTTP path routing
@ -61,8 +63,9 @@ func RunSyncV3Server(destinationServer, bindAddr, postgresDBURI string) {
}
type SyncV3Handler struct {
V2 *V2
Sessions *Sessions
V2 *V2
Sessions *Sessions
Accumulator *state.Accumulator
}
func (h *SyncV3Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@ -112,7 +115,7 @@ func (h *SyncV3Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
// populate tables
h.accumulate(v2res)
// return data based on filters
@ -131,6 +134,22 @@ func (h *SyncV3Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.Write(v3res)
}
func (h *SyncV3Handler) accumulate(res *SyncV2Response) {
for roomID, roomData := range res.Rooms.Join {
if len(roomData.State.Events) > 0 {
err := h.Accumulator.Initialise(roomID, roomData.State.Events)
if err != nil {
log.Err(err).Str("room_id", roomID).Int("num_state_events", len(roomData.State.Events)).Msg("Accumulator.Initialise failed")
}
}
err := h.Accumulator.Accumulate(roomID, roomData.Timeline.Events)
if err != nil {
log.Err(err).Str("room_id", roomID).Int("num_timeline_events", len(roomData.Timeline.Events)).Msg("Accumulator.Accumulate failed")
}
}
log.Info().Int("num_rooms", len(res.Rooms.Join)).Msg("accumulated data")
}
type jsonError struct {
Err string `json:"error"`
}