sliding-sync/state/invites_table.go

141 lines
4.6 KiB
Go
Raw Permalink Normal View History

package state
import (
add extensions for typing and receipts; bugfixes and additional perf improvements Features: - Add `typing` extension. - Add `receipts` extension. - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`. - Add `SYNCV3_PPROF` support. - Add `by_notification_level` sort order. - Add `include_old_rooms` support. - Add support for `$ME` and `$LAZY`. - Add correct filtering when `*,*` is used as `required_state`. - Add `num_live` to each room response to indicate how many timeline entries are live. Bug fixes: - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm. - Send back an `errcode` on HTTP errors (e.g expired sessions). - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :( - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic. - Send HTTP 400 for invalid range requests. - Don't publish no-op unread counts which just adds extra noise. - Fix leaking DB connections which could eventually consume all available connections. - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever. Improvements: - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s. - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s. - Add `SlidingSyncUntil...` in tests to reduce races. - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s. - Add trace task for initial syncs. - Include the proxy version in UA strings. 
- HTTP errors now wait 1s before returning to stop clients tight-looping on error. - Pending event buffer is now 2000. - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8. - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00
"database/sql"
"encoding/json"
"github.com/lib/pq"
"github.com/jmoiron/sqlx"
)
// InvitesTable stores invites for each user.
// Originally, invites were stored with the main events in a room. We ignored stripped state and
// just kept the m.room.member invite event. This had many problems though:
// - The room would be initialised by the invite event, causing the room to not populate with state
// correctly when the user joined the room.
// - The user could read room data in the room without being joined to the room e.g. could pull
// `required_state` and `timeline` as they would be authorised by the invite to see this data.
//
// Instead, we now completely split out invites from the normal event flow. This fixes the issues
// outlined above but introduces more problems:
// - How do you sort the invite with rooms?
// - How do you calculate the room name when you lack heroes?
//
// For now, we say that invites:
// - are treated as a highlightable event for the purposes of sorting by highlight count.
// - are given the timestamp of when the invite arrived.
// - calculate the room name on a best-effort basis given the lack of heroes (same as element-web).
//
// When an invite is rejected, it appears in the `leave` section which then causes the invite to be
// removed from this table.
type InvitesTable struct {
// db is the database handle used by the methods that are not handed an explicit transaction.
db *sqlx.DB
}
// NewInvitesTable returns an InvitesTable backed by db. Before returning it
// issues a CREATE TABLE IF NOT EXISTS for syncv3_invites via MustExec (which,
// per sqlx's Must convention, panics rather than returning an error).
func NewInvitesTable(db *sqlx.DB) *InvitesTable {
	// One row per (user_id, room_id) pair; invite_state holds the JSON blob
	// written by InsertInvite.
	const schema = `
CREATE TABLE IF NOT EXISTS syncv3_invites (
room_id TEXT NOT NULL,
user_id TEXT NOT NULL,
-- JSON array. The contents of 'rooms.invite.$room_id.invite_state.events'
invite_state BYTEA NOT NULL,
UNIQUE(user_id, room_id)
);
`
	db.MustExec(schema)
	table := &InvitesTable{db: db}
	return table
}
// RemoveInvite deletes the invite row for the given user in the given room.
// It returns whatever error the DELETE statement produced, or nil.
func (t *InvitesTable) RemoveInvite(userID, roomID string) error {
	const stmt = `DELETE FROM syncv3_invites WHERE user_id = $1 AND room_id = $2`
	if _, err := t.db.Exec(stmt, userID, roomID); err != nil {
		return err
	}
	return nil
}
// RemoveSupersededInvites deletes outstanding invites to roomID that have been
// overtaken by later membership changes found in newEvents.
//
// The events should either
//   - contain at most one membership event per user, or else
//   - be in timeline order (most recent last)
//
// (corresponding to an Accumulate and an Initialise call, respectively).
//
// The events are walked in order and, for every m.room.member event, the
// membership is recorded against its state key, so the last event seen for a
// user determines that user's "final" membership. Users whose final membership
// is not "invite" have their invites to this room deleted, using the supplied
// transaction. If no user qualifies, no statement is executed.
func (t *InvitesTable) RemoveSupersededInvites(txn *sqlx.Tx, roomID string, newEvents []Event) error {
	// user ID -> last membership seen; later events overwrite earlier ones.
	finalMembership := make(map[string]string)
	for i := range newEvents {
		if newEvents[i].Type == "m.room.member" {
			finalMembership[newEvents[i].StateKey] = newEvents[i].Membership
		}
	}

	var superseded []string
	for user, membership := range finalMembership {
		switch membership {
		case "invite", "_invite":
			// Still invited: leave the row alone.
			// NOTE(review): "_invite" is presumably an invite-variant membership
			// value produced elsewhere in the package — confirm its meaning.
		default:
			superseded = append(superseded, user)
		}
	}
	if len(superseded) == 0 {
		return nil
	}

	_, err := txn.Exec(`
DELETE FROM syncv3_invites
WHERE user_id = ANY($1) AND room_id = $2
`, pq.StringArray(superseded), roomID)
	return err
}
// InsertInvite stores inviteRoomState, JSON-encoded as a single blob, as the
// invite for userID in roomID. If a row already exists for that (user, room)
// pair, its invite_state is overwritten with the new blob.
// It returns the JSON encoding error or the database error, if any.
func (t *InvitesTable) InsertInvite(userID, roomID string, inviteRoomState []json.RawMessage) error {
	encoded, marshalErr := json.Marshal(inviteRoomState)
	if marshalErr != nil {
		return marshalErr
	}
	const upsert = `INSERT INTO syncv3_invites(user_id, room_id, invite_state) VALUES($1,$2,$3)
ON CONFLICT (user_id, room_id) DO UPDATE SET invite_state = $3`
	_, execErr := t.db.Exec(upsert, userID, roomID, encoded)
	return execErr
}
add extensions for typing and receipts; bugfixes and additional perf improvements Features: - Add `typing` extension. - Add `receipts` extension. - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`. - Add `SYNCV3_PPROF` support. - Add `by_notification_level` sort order. - Add `include_old_rooms` support. - Add support for `$ME` and `$LAZY`. - Add correct filtering when `*,*` is used as `required_state`. - Add `num_live` to each room response to indicate how many timeline entries are live. Bug fixes: - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm. - Send back an `errcode` on HTTP errors (e.g expired sessions). - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :( - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic. - Send HTTP 400 for invalid range requests. - Don't publish no-op unread counts which just adds extra noise. - Fix leaking DB connections which could eventually consume all available connections. - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever. Improvements: - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s. - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s. - Add `SlidingSyncUntil...` in tests to reduce races. - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s. - Add trace task for initial syncs. - Include the proxy version in UA strings. 
- HTTP errors now wait 1s before returning to stop clients tight-looping on error. - Pending event buffer is now 2000. - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8. - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00
// SelectInviteState loads and decodes the stored invite_state for userID in
// roomID. When no row matches (sql.ErrNoRows) it returns (nil, nil) rather
// than an error. Any other scan error, or a failure to decode the stored
// JSON, is returned to the caller.
func (t *InvitesTable) SelectInviteState(userID, roomID string) (inviteState []json.RawMessage, err error) {
	var raw json.RawMessage
	row := t.db.QueryRow(`SELECT invite_state FROM syncv3_invites WHERE user_id=$1 AND room_id=$2`, userID, roomID)
	switch scanErr := row.Scan(&raw); scanErr {
	case nil, sql.ErrNoRows:
		// Either we have a blob, or there is simply no invite; both fall through.
	default:
		return nil, scanErr
	}
	if raw == nil {
		// Nothing was scanned: report "no invite" without an error.
		return nil, nil
	}
	var decoded []json.RawMessage
	if decodeErr := json.Unmarshal(raw, &decoded); decodeErr != nil {
		return nil, decodeErr
	}
	return decoded, nil
}
// SelectAllInvitesForUser selects all invites for this user. It returns a map
// of room ID to the decoded invite_state (a JSON array) for that room. The map
// is empty, not nil, when the user has no invites.
//
// An error is returned if the query fails, if a row cannot be scanned or its
// invite_state cannot be decoded, or if row iteration stops early because of
// an error; in all of those cases the returned map is nil.
func (t *InvitesTable) SelectAllInvitesForUser(userID string) (map[string][]json.RawMessage, error) {
	rows, err := t.db.Query(`SELECT room_id, invite_state FROM syncv3_invites WHERE user_id = $1`, userID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := make(map[string][]json.RawMessage)
	var roomID string
	var blob json.RawMessage
	for rows.Next() {
		if err := rows.Scan(&roomID, &blob); err != nil {
			return nil, err
		}
		var inviteState []json.RawMessage
		if err := json.Unmarshal(blob, &inviteState); err != nil {
			return nil, err
		}
		result[roomID] = inviteState
	}
	// rows.Next returns false both at end-of-results and on failure; without
	// this check a mid-iteration error would yield a silently truncated map.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}