Store membership changes in events table

This commit is contained in:
Kegan Dougal 2021-10-26 18:39:22 +01:00
parent 594723e0c6
commit 0b58b0f62b
6 changed files with 114 additions and 45 deletions

View File

@ -329,19 +329,3 @@ func (a *Accumulator) Delta(roomID string, lastEventNID int64, limit int) (event
}
return eventsJSON, int64(events[len(events)-1].NID), nil
}
func isMembershipChange(eventJSON gjson.Result) bool {
// membership event possibly, make sure the membership has changed else
// things like display name changes will count as membership events :(
prevMembership := "leave"
pm := eventJSON.Get("unsigned.prev_content.membership")
if pm.Exists() && pm.Str != "" {
prevMembership = pm.Str
}
currMembership := "leave"
cm := eventJSON.Get("content.membership")
if cm.Exists() && cm.Str != "" {
currMembership = cm.Str
}
return prevMembership != currMembership // membership was changed
}

View File

@ -16,9 +16,10 @@ const (
)
type Event struct {
NID int64 `db:"event_nid"`
Type string `db:"event_type"`
StateKey string `db:"state_key"`
NID int64 `db:"event_nid"`
Type string `db:"event_type"`
StateKey string `db:"state_key"`
Membership string `db:"membership"`
// This is a snapshot ID which corresponds to some room state BEFORE this event has been applied.
BeforeStateSnapshotID int `db:"before_state_snapshot_id"`
ID string `db:"event_id"`
@ -61,6 +62,7 @@ func NewEventTable(db *sqlx.DB) *EventTable {
room_id TEXT NOT NULL,
event_type TEXT NOT NULL,
state_key TEXT NOT NULL,
membership TEXT,
event BYTEA NOT NULL
);
-- index for querying all joined rooms for a given user
@ -114,6 +116,16 @@ func (t *EventTable) Insert(txn *sqlx.Tx, events []Event) (int, error) {
// valid for this to be "" on message events
ev.StateKey = evJSON.Get("state_key").Str
}
if ev.StateKey != "" && ev.Type == "m.room.member" {
membershipResult := evJSON.Get("content.membership")
if !membershipResult.Exists() || membershipResult.Str == "" {
return 0, fmt.Errorf("membership event missing membership key")
}
// we only set this if it is a genuine change e.g not a profile change
if isMembershipChange(evJSON) {
ev.Membership = membershipResult.Str
}
}
events[i] = ev
}
@ -121,8 +133,8 @@ func (t *EventTable) Insert(txn *sqlx.Tx, events []Event) (int, error) {
var rowsAffected int64
for _, chunk := range chunks {
result, err := txn.NamedExec(`
INSERT INTO syncv3_events (event_id, event, event_type, state_key, room_id)
VALUES (:event_id, :event, :event_type, :state_key, :room_id) ON CONFLICT (event_id) DO NOTHING`, chunk)
INSERT INTO syncv3_events (event_id, event, event_type, state_key, room_id, membership)
VALUES (:event_id, :event, :event_type, :state_key, :room_id, :membership) ON CONFLICT (event_id) DO NOTHING`, chunk)
if err != nil {
return 0, err
}
@ -325,3 +337,19 @@ func (c EventChunker) Len() int {
func (c EventChunker) Subslice(i, j int) sqlutil.Chunker {
return c[i:j]
}
func isMembershipChange(eventJSON gjson.Result) bool {
// membership event possibly, make sure the membership has changed else
// things like display name changes will count as membership events :(
prevMembership := "leave"
pm := eventJSON.Get("unsigned.prev_content.membership")
if pm.Exists() && pm.Str != "" {
prevMembership = pm.Str
}
currMembership := "leave"
cm := eventJSON.Get("content.membership")
if cm.Exists() && cm.Str != "" {
currMembership = cm.Str
}
return prevMembership != currMembership // membership was changed
}

View File

@ -428,6 +428,47 @@ func TestEventTableSelectEventsBetween(t *testing.T) {
})
}
func TestEventTableMembershipDetection(t *testing.T) {
db, err := sqlx.Open("postgres", postgresConnectionString)
if err != nil {
t.Fatalf("failed to open SQL db: %s", err)
}
txn, err := db.Beginx()
if err != nil {
t.Fatalf("failed to start txn: %s", err)
}
defer txn.Commit()
roomID := "!TestEventTableMembershipDetection:localhost"
alice := "@TestEventTableMembershipDetection_alice:localhost"
bob := "@TestEventTableMembershipDetection_bob:localhost"
table := NewEventTable(db)
_, err = table.Insert(txn, []Event{
{
RoomID: roomID,
JSON: testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}),
},
{
RoomID: roomID,
JSON: testutils.NewStateEvent(t, "m.room.member", bob, alice, map[string]interface{}{"membership": "invite"}),
},
{
RoomID: roomID,
JSON: testutils.NewStateEvent(
t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"},
testutils.WithUnsigned(map[string]interface{}{
"prev_content": map[string]interface{}{
"membership": "join",
},
}),
),
},
})
if err != nil {
t.Fatalf("failed to insert: %s", err)
}
// TODO: assertions?
}
func TestChunkify(t *testing.T) {
// Make 100 dummy events
events := make([]Event, 100)

View File

@ -15,6 +15,16 @@ var (
eventIDMu sync.Mutex
)
type eventMock struct {
Type string `json:"type"`
StateKey string `json:"state_key"`
Sender string `json:"sender"`
Content interface{} `json:"content"`
EventID string `json:"event_id"`
OriginServerTS int64 `json:"origin_server_ts"`
Unsigned interface{} `json:"unsigned,omitempty"`
}
func generateEventID(t *testing.T) string {
eventIDMu.Lock()
defer eventIDMu.Unlock()
@ -24,26 +34,32 @@ func generateEventID(t *testing.T) string {
return fmt.Sprintf("$event_%d_%s", eventIDCounter, t.Name())
}
func NewStateEvent(t *testing.T, evType, stateKey, sender string, content interface{}, ts ...time.Time) json.RawMessage {
t.Helper()
e := struct {
Type string `json:"type"`
StateKey string `json:"state_key"`
Sender string `json:"sender"`
Content interface{} `json:"content"`
EventID string `json:"event_id"`
OriginServerTS int64 `json:"origin_server_ts"`
}{
Type: evType,
StateKey: stateKey,
Sender: sender,
Content: content,
EventID: generateEventID(t),
type eventMockModifier func(e *eventMock)
func WithTimestamp(ts time.Time) eventMockModifier {
return func(e *eventMock) {
e.OriginServerTS = int64(gomatrixserverlib.AsTimestamp(ts))
}
if len(ts) == 0 {
e.OriginServerTS = int64(gomatrixserverlib.AsTimestamp(time.Now()))
} else {
e.OriginServerTS = int64(gomatrixserverlib.AsTimestamp(ts[0]))
}
func WithUnsigned(unsigned interface{}) eventMockModifier {
return func(e *eventMock) {
e.Unsigned = unsigned
}
}
func NewStateEvent(t *testing.T, evType, stateKey, sender string, content interface{}, modifiers ...eventMockModifier) json.RawMessage {
t.Helper()
e := &eventMock{
Type: evType,
StateKey: stateKey,
Sender: sender,
Content: content,
EventID: generateEventID(t),
OriginServerTS: int64(gomatrixserverlib.AsTimestamp(time.Now())),
}
for _, m := range modifiers {
m(e)
}
j, err := json.Marshal(&e)
if err != nil {

View File

@ -34,7 +34,7 @@ func TestTimelines(t *testing.T) {
roomID: fmt.Sprintf("!TestTimelines_%d:localhost", i),
name: roomName,
events: append(createRoomState(t, alice, ts), []json.RawMessage{
testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": roomName}, ts.Add(3*time.Second)),
testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": roomName}, testutils.WithTimestamp(ts.Add(3*time.Second))),
testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "A"}, ts.Add(4*time.Second)),
testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "B"}, ts.Add(5*time.Second)),
testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "C"}, ts.Add(6*time.Second)),
@ -117,7 +117,7 @@ func TestTimelinesLiveStream(t *testing.T) {
roomID: fmt.Sprintf("!TestTimelinesLiveStream_%d:localhost", i),
name: roomName,
events: append(createRoomState(t, alice, ts), []json.RawMessage{
testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": roomName}, ts.Add(3*time.Second)),
testutils.NewStateEvent(t, "m.room.name", "", alice, map[string]interface{}{"name": roomName}, testutils.WithTimestamp(ts.Add(3*time.Second))),
testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "A"}, ts.Add(4*time.Second)),
testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "B"}, ts.Add(5*time.Second)),
testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "C"}, ts.Add(6*time.Second)),

View File

@ -245,9 +245,9 @@ func createRoomState(t *testing.T, creator string, baseTimestamp time.Time) []js
t.Helper()
// all with the same timestamp as they get made atomically
return []json.RawMessage{
testutils.NewStateEvent(t, "m.room.create", "", creator, map[string]interface{}{"creator": creator}, baseTimestamp),
testutils.NewStateEvent(t, "m.room.member", creator, creator, map[string]interface{}{"membership": "join"}, baseTimestamp),
testutils.NewStateEvent(t, "m.room.join_rules", "", creator, map[string]interface{}{"join_rule": "public"}, baseTimestamp),
testutils.NewStateEvent(t, "m.room.create", "", creator, map[string]interface{}{"creator": creator}, testutils.WithTimestamp(baseTimestamp)),
testutils.NewStateEvent(t, "m.room.member", creator, creator, map[string]interface{}{"membership": "join"}, testutils.WithTimestamp(baseTimestamp)),
testutils.NewStateEvent(t, "m.room.join_rules", "", creator, map[string]interface{}{"join_rule": "public"}, testutils.WithTimestamp(baseTimestamp)),
}
}