Merge pull request #333 from matrix-org/dmr/cleanup-invites-in-poller

This commit is contained in:
David Robertson 2023-10-11 17:55:45 +01:00 committed by GitHub
commit de9006de9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 212 additions and 15 deletions

View File

@ -28,6 +28,7 @@ type Accumulator struct {
eventsTable *EventTable
snapshotTable *SnapshotTable
spacesTable *SpacesTable
invitesTable *InvitesTable
entityName string
}
@ -38,6 +39,7 @@ func NewAccumulator(db *sqlx.DB) *Accumulator {
eventsTable: NewEventTable(db),
snapshotTable: NewSnapshotsTable(db),
spacesTable: NewSpacesTable(db),
invitesTable: NewInvitesTable(db),
entityName: "server",
}
}
@ -295,6 +297,10 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
}
}
if err = a.invitesTable.RemoveSupersededInvites(txn, roomID, events); err != nil {
return fmt.Errorf("RemoveSupersededInvites: %w", err)
}
if err = a.spacesTable.HandleSpaceUpdates(txn, events); err != nil {
return fmt.Errorf("HandleSpaceUpdates: %s", err)
}
@ -431,7 +437,9 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
}
var latestNID int64
newEventsByID := make([]Event, 0, len(eventIDToNID))
// postInsertEvents matches newEvents, but a) it has NIDs, and b) state events have
// the IsState field hacked to true so we don't insert them into the timeline.
postInsertEvents := make([]Event, 0, len(eventIDToNID))
redactTheseEventIDs := make(map[string]*Event)
for i, ev := range newEvents {
nid, ok := eventIDToNID[ev.ID]
@ -459,7 +467,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
redactTheseEventIDs[redactsEventID] = &newEvents[i]
}
}
newEventsByID = append(newEventsByID, ev)
postInsertEvents = append(postInsertEvents, ev)
result.TimelineNIDs = append(result.TimelineNIDs, ev.NID)
}
}
@ -485,7 +493,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
}
}
for _, ev := range newEventsByID {
for _, ev := range postInsertEvents {
var replacesNID int64
// the snapshot ID we assign to this event is unaffected by whether /this/ event is state or not,
// as this is the before snapshot ID.
@ -541,12 +549,16 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
result.RequiresReload = currentStateRedactions > 0
}
if err = a.spacesTable.HandleSpaceUpdates(txn, newEventsByID); err != nil {
if err = a.invitesTable.RemoveSupersededInvites(txn, roomID, postInsertEvents); err != nil {
return AccumulateResult{}, fmt.Errorf("RemoveSupersededInvites: %w", err)
}
if err = a.spacesTable.HandleSpaceUpdates(txn, postInsertEvents); err != nil {
return AccumulateResult{}, fmt.Errorf("HandleSpaceUpdates: %s", err)
}
// the last fetched snapshot ID is the current one
info := a.roomInfoDelta(roomID, newEventsByID)
info := a.roomInfoDelta(roomID, postInsertEvents)
if err = a.roomsTable.Upsert(txn, info, snapID, latestNID); err != nil {
return AccumulateResult{}, fmt.Errorf("failed to UpdateCurrentSnapshotID to %d: %w", snapID, err)
}

View File

@ -3,6 +3,7 @@ package state
import (
"database/sql"
"encoding/json"
"github.com/lib/pq"
"github.com/jmoiron/sqlx"
)
@ -14,14 +15,17 @@ import (
// correctly when the user joined the room.
// - The user could read room data in the room without being joined to the room e.g could pull
// `required_state` and `timeline` as they would be authorised by the invite to see this data.
//
// Instead, we now completely split out invites from the normal event flow. This fixes the issues
// outlined above but introduce more problems:
// - How do you sort the invite with rooms?
// - How do you calculate the room name when you lack heroes?
//
// For now, we say that invites:
// - are treated as a highlightable event for the purposes of sorting by highlight count.
// - are given the timestamp of when the invite arrived.
// - calculate the room name on a best-effort basis given the lack of heroes (same as element-web).
//
// When an invite is rejected, it appears in the `leave` section which then causes the invite to be
// removed from this table.
type InvitesTable struct {
@ -47,6 +51,44 @@ func (t *InvitesTable) RemoveInvite(userID, roomID string) error {
return err
}
// RemoveSupersededInvites accepts a list of events in the given room. The events should
// either
// - contain at most one membership event per user, or else
// - be in timeline order (most recent last)
//
// (corresponding to an Accumulate and an Initialise call, respectively).
//
// The events are scanned in order for membership changes, to determine the "final"
// memberships. Users who final membership is not "invite" have their outstanding
// invites to this room deleted.
func (t *InvitesTable) RemoveSupersededInvites(txn *sqlx.Tx, roomID string, newEvents []Event) error {
memberships := map[string]string{} // user ID -> memberships
for _, ev := range newEvents {
if ev.Type != "m.room.member" {
continue
}
memberships[ev.StateKey] = ev.Membership
}
var usersToRemove []string
for userID, membership := range memberships {
if membership != "invite" && membership != "_invite" {
usersToRemove = append(usersToRemove, userID)
}
}
if len(usersToRemove) == 0 {
return nil
}
_, err := txn.Exec(`
DELETE FROM syncv3_invites
WHERE user_id = ANY($1) AND room_id = $2
`, pq.StringArray(usersToRemove), roomID)
return err
}
func (t *InvitesTable) InsertInvite(userID, roomID string, inviteRoomState []json.RawMessage) error {
blob, err := json.Marshal(inviteRoomState)
if err != nil {

View File

@ -2,6 +2,8 @@ package state
import (
"encoding/json"
"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/sqlutil"
"reflect"
"testing"
)
@ -128,6 +130,155 @@ func TestInviteTable(t *testing.T) {
}
}
func TestInviteTable_RemoveSupersededInvites(t *testing.T) {
db, close := connectToDB(t)
defer close()
alice := "@alice:localhost"
bob := "@bob:localhost"
roomA := "!a:localhost"
roomB := "!b:localhost"
inviteState := []json.RawMessage{[]byte(`{"foo":"bar"}`)}
table := NewInvitesTable(db)
t.Log("Invite Alice and Bob to both rooms.")
// Add some invites
if err := table.InsertInvite(alice, roomA, inviteState); err != nil {
t.Fatalf("failed to InsertInvite: %s", err)
}
if err := table.InsertInvite(bob, roomA, inviteState); err != nil {
t.Fatalf("failed to InsertInvite: %s", err)
}
if err := table.InsertInvite(alice, roomB, inviteState); err != nil {
t.Fatalf("failed to InsertInvite: %s", err)
}
if err := table.InsertInvite(bob, roomB, inviteState); err != nil {
t.Fatalf("failed to InsertInvite: %s", err)
}
t.Log("Alice joins room A. Remove her superseded invite.")
newEvents := []Event{
{
Type: "m.room.member",
StateKey: alice,
Membership: "join",
RoomID: roomA,
},
}
err := sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
return table.RemoveSupersededInvites(txn, roomA, newEvents)
})
if err != nil {
t.Fatalf("failed to RemoveSupersededInvites: %s", err)
}
t.Log("Alice should still be invited to room B.")
assertInvites(t, table, alice, map[string][]json.RawMessage{roomB: inviteState})
t.Log("Bob should still be invited to rooms A and B.")
assertInvites(t, table, bob, map[string][]json.RawMessage{roomA: inviteState, roomB: inviteState})
t.Log("Bob declines his invitation to room B.")
newEvents = []Event{
{
Type: "m.room.member",
StateKey: bob,
Membership: "leave",
RoomID: roomB,
},
}
err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
return table.RemoveSupersededInvites(txn, roomB, newEvents)
})
if err != nil {
t.Fatalf("failed to RemoveSupersededInvites: %s", err)
}
t.Log("Alice should still be invited to room B.")
assertInvites(t, table, alice, map[string][]json.RawMessage{roomB: inviteState})
t.Log("Bob should still be invited to room A.")
assertInvites(t, table, bob, map[string][]json.RawMessage{roomA: inviteState})
// Now try multiple membership changes in one call.
t.Log("Alice joins, changes profile, leaves and is re-invited to room B.")
newEvents = []Event{
{
Type: "m.room.member",
StateKey: alice,
Membership: "join",
RoomID: roomB,
},
{
Type: "m.room.member",
StateKey: alice,
Membership: "_join",
RoomID: roomB,
},
{
Type: "m.room.member",
StateKey: alice,
Membership: "leave",
RoomID: roomB,
},
{
Type: "m.room.member",
StateKey: alice,
Membership: "invite",
RoomID: roomB,
},
}
err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
return table.RemoveSupersededInvites(txn, roomB, newEvents)
})
if err != nil {
t.Fatalf("failed to RemoveSupersededInvites: %s", err)
}
t.Log("Alice should still be invited to room B.")
assertInvites(t, table, alice, map[string][]json.RawMessage{roomB: inviteState})
t.Log("Bob declines, is reinvited to and joins room A.")
newEvents = []Event{
{
Type: "m.room.member",
StateKey: bob,
Membership: "leave",
RoomID: roomA,
},
{
Type: "m.room.member",
StateKey: bob,
Membership: "invite",
RoomID: roomA,
},
{
Type: "m.room.member",
StateKey: bob,
Membership: "join",
RoomID: roomA,
},
}
err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
return table.RemoveSupersededInvites(txn, roomA, newEvents)
})
if err != nil {
t.Fatalf("failed to RemoveSupersededInvites: %s", err)
}
assertInvites(t, table, bob, map[string][]json.RawMessage{})
}
func assertInvites(t *testing.T, table *InvitesTable, user string, expected map[string][]json.RawMessage) {
invites, err := table.SelectAllInvitesForUser(user)
if err != nil {
t.Fatalf("failed to SelectAllInvitesForUser: %s", err)
}
if !reflect.DeepEqual(invites, expected) {
t.Fatalf("got %v invites, want %v", invites, expected)
}
}
func jsonArrStr(a []json.RawMessage) (result string) {
for _, e := range a {
result += string(e) + "\n"

View File

@ -85,6 +85,7 @@ func NewStorageWithDB(db *sqlx.DB, addPrometheusMetrics bool) *Storage {
eventsTable: NewEventTable(db),
snapshotTable: NewSnapshotsTable(db),
spacesTable: NewSpacesTable(db),
invitesTable: NewInvitesTable(db),
entityName: "server",
}
@ -94,7 +95,7 @@ func NewStorageWithDB(db *sqlx.DB, addPrometheusMetrics bool) *Storage {
UnreadTable: NewUnreadTable(db),
EventsTable: acc.eventsTable,
AccountDataTable: NewAccountDataTable(db),
InvitesTable: NewInvitesTable(db),
InvitesTable: acc.invitesTable,
TransactionsTable: NewTransactionsTable(db),
DeviceDataTable: NewDeviceDataTable(db),
ReceiptTable: NewReceiptTable(db),

View File

@ -345,15 +345,6 @@ func (c *GlobalCache) OnNewEvent(
// remove this user as a hero
metadata.RemoveHero(*ed.StateKey)
}
if membership == "join" && eventJSON.Get("unsigned.prev_content.membership").Str == "invite" {
// invite -> join, retire any outstanding invites
err := c.store.InvitesTable.RemoveInvite(*ed.StateKey, ed.RoomID)
if err != nil {
logger.Err(err).Str("user", *ed.StateKey).Str("room", ed.RoomID).Msg("failed to remove accepted invite")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
}
}
if len(metadata.Heroes) < 6 && (membership == "join" || membership == "invite") {
// try to find the existing hero e.g they changed their display name