diff --git a/state/device_data_table.go b/state/device_data_table.go index 1ed3e87..2c5576d 100644 --- a/state/device_data_table.go +++ b/state/device_data_table.go @@ -21,7 +21,8 @@ type DeviceDataRow struct { } type DeviceDataTable struct { - db *sqlx.DB + db *sqlx.DB + deviceListTable *DeviceListTable } func NewDeviceDataTable(db *sqlx.DB) *DeviceDataTable { @@ -37,7 +38,8 @@ func NewDeviceDataTable(db *sqlx.DB) *DeviceDataTable { ALTER TABLE syncv3_device_data SET (fillfactor = 90); `) return &DeviceDataTable{ - db: db, + db: db, + deviceListTable: NewDeviceListTable(db), } } diff --git a/state/device_data_table_test.go b/state/device_data_table_test.go index d099eda..b4fe6ad 100644 --- a/state/device_data_table_test.go +++ b/state/device_data_table_test.go @@ -118,160 +118,6 @@ func TestDeviceDataTableOTKCountAndFallbackKeyTypes(t *testing.T) { assertDeviceData(t, *got, want) } -// Tests the DeviceLists field -func TestDeviceDataTableDeviceList(t *testing.T) { - db, close := connectToDB(t) - defer close() - table := NewDeviceDataTable(db) - userID := "@TestDeviceDataTableDeviceList" - deviceID := "BOB" - - // these are individual updates from Synapse from /sync v2 - deltas := []internal.DeviceData{ - { - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - New: internal.ToDeviceListChangesMap([]string{"alice"}, nil), - }, - }, - { - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - New: internal.ToDeviceListChangesMap([]string{"💣"}, nil), - }, - }, - } - // apply them - for _, dd := range deltas { - err := table.Upsert(&dd) - assertNoError(t, err) - } - - // check we can read-only select. This doesn't modify any fields. - for i := 0; i < 3; i++ { - got, err := table.Select(userID, deviceID, false) - assertNoError(t, err) - assertDeviceData(t, *got, internal.DeviceData{ - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - Sent: internal.MapStringInt{}, // until we "swap" we don't consume the New entries - }, - }) - } - // now swap-er-roo, which shifts everything from New into Sent. - got, err := table.Select(userID, deviceID, true) - assertNoError(t, err) - assertDeviceData(t, *got, internal.DeviceData{ - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - Sent: internal.ToDeviceListChangesMap([]string{"alice", "💣"}, nil), - }, - }) - - // this is permanent, read-only views show this too. - got, err = table.Select(userID, deviceID, false) - assertNoError(t, err) - assertDeviceData(t, *got, internal.DeviceData{ - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - Sent: internal.ToDeviceListChangesMap([]string{"alice", "💣"}, nil), - }, - }) - - // We now expect empty DeviceLists, as we swapped twice. - got, err = table.Select(userID, deviceID, true) - assertNoError(t, err) - assertDeviceData(t, *got, internal.DeviceData{ - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - Sent: internal.MapStringInt{}, - }, - }) - - // get back the original state - assertNoError(t, err) - for _, dd := range deltas { - err = table.Upsert(&dd) - assertNoError(t, err) - } - // Move original state to Sent by swapping - got, err = table.Select(userID, deviceID, true) - assertNoError(t, err) - assertDeviceData(t, *got, internal.DeviceData{ - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - Sent: internal.ToDeviceListChangesMap([]string{"alice", "💣"}, nil), - }, - }) - // Add new entries to New before acknowledging Sent - err = table.Upsert(&internal.DeviceData{ - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - New: internal.ToDeviceListChangesMap([]string{"💣"}, []string{"charlie"}), - }, - }) - assertNoError(t, err) - - // Reading without swapping does not move New->Sent, so returns the previous value - got, err = table.Select(userID, deviceID, false) - assertNoError(t, err) - assertDeviceData(t, *got, internal.DeviceData{ - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - Sent: internal.ToDeviceListChangesMap([]string{"alice", "💣"}, nil), - }, - }) - - // Append even more items to New - err = table.Upsert(&internal.DeviceData{ - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - New: internal.ToDeviceListChangesMap([]string{"dave"}, []string{"dave"}), - }, - }) - assertNoError(t, err) - - // Now swap: all the combined items in New go into Sent - got, err = table.Select(userID, deviceID, true) - assertNoError(t, err) - assertDeviceData(t, *got, internal.DeviceData{ - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - Sent: internal.ToDeviceListChangesMap([]string{"💣", "dave"}, []string{"charlie", "dave"}), - }, - }) - - // Swapping again clears Sent out, and since nothing is in New we get an empty list - got, err = table.Select(userID, deviceID, true) - assertNoError(t, err) - assertDeviceData(t, *got, internal.DeviceData{ - UserID: userID, - DeviceID: deviceID, - DeviceLists: internal.DeviceLists{ - Sent: internal.MapStringInt{}, - }, - }) - - // delete everything, no data returned - assertNoError(t, table.DeleteDevice(userID, deviceID)) - got, err = table.Select(userID, deviceID, false) - assertNoError(t, err) - if got != nil { - t.Errorf("wanted no data, got %v", got) - } -} - func TestDeviceDataTableBitset(t *testing.T) { db, close := connectToDB(t) defer close() diff --git a/state/device_list_table.go b/state/device_list_table.go new file mode 100644 index 0000000..5baae56 --- /dev/null +++ b/state/device_list_table.go @@ -0,0 +1,110 @@ +package state + +import ( + "fmt" + + "github.com/getsentry/sentry-go" + "github.com/jmoiron/sqlx" + "github.com/matrix-org/sliding-sync/internal" + "github.com/matrix-org/sliding-sync/sqlutil" +) + +const ( + BucketNew = 1 + BucketSent = 2 +) + +type DeviceListTable struct { + db *sqlx.DB +} + +func NewDeviceListTable(db *sqlx.DB) *DeviceListTable { + db.MustExec(` + CREATE TABLE IF NOT EXISTS syncv3_device_list_updates ( + user_id TEXT NOT NULL, + device_id TEXT NOT NULL, + target_user_id TEXT NOT NULL, + target_state SMALLINT NOT NULL, + bucket SMALLINT NOT NULL, + UNIQUE(user_id, device_id, target_user_id, bucket) + ); + -- make an index so selecting all the rows is faster + CREATE INDEX IF NOT EXISTS syncv3_device_list_updates_bucket_idx ON syncv3_device_list_updates(user_id, device_id, bucket); + -- Set the fillfactor to 90%, to allow for HOT updates (e.g. we only + -- change the data, not anything indexed like the id) + ALTER TABLE syncv3_device_list_updates SET (fillfactor = 90); + `) + return &DeviceListTable{ + db: db, + } +} + +// Upsert new device list changes. +func (t *DeviceListTable) Upsert(userID, deviceID string, deviceListChanges map[string]int) (err error) { + err = sqlutil.WithTransaction(t.db, func(txn *sqlx.Tx) error { + for targetUserID, targetState := range deviceListChanges { + if targetState != internal.DeviceListChanged && targetState != internal.DeviceListLeft { + sentry.CaptureException(fmt.Errorf("DeviceListTable.Upsert invalid target_state: %d this is a programming error", targetState)) + continue + } + _, err = txn.Exec( + `INSERT INTO syncv3_device_list_updates(user_id, device_id, target_user_id, target_state, bucket) VALUES($1,$2,$3,$4,$5) + ON CONFLICT (user_id, device_id, target_user_id, bucket) DO UPDATE SET target_state=$4`, + userID, deviceID, targetUserID, targetState, BucketNew, + ) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + sentry.CaptureException(err) + } + return +} + +// Select device list changes for this client. Returns a map of user_id => change enum. +func (t *DeviceListTable) Select(userID, deviceID string, swap bool) (result internal.MapStringInt, err error) { + err = sqlutil.WithTransaction(t.db, func(txn *sqlx.Tx) error { + if !swap { + // read only view, just return what we previously sent and don't do anything else. + result, err = t.selectDeviceListChangesInBucket(txn, userID, deviceID, BucketSent) + return err + } + + // delete the now acknowledged 'sent' data + _, err = txn.Exec(`DELETE FROM syncv3_device_list_updates WHERE user_id=$1 AND device_id=$2 AND bucket=$3`, userID, deviceID, BucketSent) + if err != nil { + return err + } + // grab any 'new' updates + result, err = t.selectDeviceListChangesInBucket(txn, userID, deviceID, BucketNew) + if err != nil { + return err + } + + // mark these 'new' updates as 'sent' + _, err = txn.Exec(`UPDATE syncv3_device_list_updates SET bucket=$1 WHERE user_id=$2 AND device_id=$3 AND bucket=$4`, BucketSent, userID, deviceID, BucketNew) + return err + }) + return +} + +func (t *DeviceListTable) selectDeviceListChangesInBucket(txn *sqlx.Tx, userID, deviceID string, bucket int) (result internal.MapStringInt, err error) { + rows, err := txn.Query(`SELECT target_user_id, target_state FROM syncv3_device_list_updates WHERE user_id=$1 AND device_id=$2 AND bucket=$3`, userID, deviceID, bucket) + if err != nil { + return nil, err + } + defer rows.Close() + result = make(internal.MapStringInt) + var targetUserID string + var targetState int + for rows.Next() { + if err := rows.Scan(&targetUserID, &targetState); err != nil { + return nil, err + } + result[targetUserID] = targetState + } + return result, rows.Err() +} diff --git a/state/device_list_table_test.go b/state/device_list_table_test.go new file mode 100644 index 0000000..79cf843 --- /dev/null +++ b/state/device_list_table_test.go @@ -0,0 +1,108 @@ +package state + +import ( + "testing" + + "github.com/matrix-org/sliding-sync/internal" +) + +// Tests the DeviceLists table +func TestDeviceListTable(t *testing.T) { + db, close := connectToDB(t) + defer close() + table := NewDeviceListTable(db) + userID := "@TestDeviceListTable" + deviceID := "BOB" + + // these are individual updates from Synapse from /sync v2 + deltas := []internal.MapStringInt{ + { + "alice": internal.DeviceListChanged, + }, + { + "💣": internal.DeviceListChanged, + }, + } + // apply them + for _, dd := range deltas { + err := table.Upsert(userID, deviceID, dd) + assertNoError(t, err) + } + + // check we can read-only select. This doesn't modify any fields. + for i := 0; i < 3; i++ { + got, err := table.Select(userID, deviceID, false) + assertNoError(t, err) + // until we "swap" we don't consume the New entries + assertVal(t, "unexpected data on swapless select", got, internal.MapStringInt{}) + } + // now swap-er-roo, which shifts everything from New into Sent. + got, err := table.Select(userID, deviceID, true) + assertNoError(t, err) + assertVal(t, "did not select what was upserted on swap select", got, internal.MapStringInt{ + "alice": internal.DeviceListChanged, + "💣": internal.DeviceListChanged, + }) + + // this is permanent, read-only views show this too. + got, err = table.Select(userID, deviceID, false) + assertNoError(t, err) + assertVal(t, "swapless select did not return the same data as before", got, internal.MapStringInt{ + "alice": internal.DeviceListChanged, + "💣": internal.DeviceListChanged, + }) + + // We now expect empty DeviceLists, as we swapped twice. + got, err = table.Select(userID, deviceID, true) + assertNoError(t, err) + assertVal(t, "swap select did not return nothing", got, internal.MapStringInt{}) + + // get back the original state + assertNoError(t, err) + for _, dd := range deltas { + err = table.Upsert(userID, deviceID, dd) + assertNoError(t, err) + } + // Move original state to Sent by swapping + got, err = table.Select(userID, deviceID, true) + assertNoError(t, err) + assertVal(t, "did not select what was upserted on swap select", got, internal.MapStringInt{ + "alice": internal.DeviceListChanged, + "💣": internal.DeviceListChanged, + }) + // Add new entries to New before acknowledging Sent + err = table.Upsert(userID, deviceID, internal.MapStringInt{ + "💣": internal.DeviceListChanged, + "charlie": internal.DeviceListLeft, + }) + assertNoError(t, err) + + // Reading without swapping does not move New->Sent, so returns the previous value + got, err = table.Select(userID, deviceID, false) + assertNoError(t, err) + assertVal(t, "swapless select did not return the same data as before", got, internal.MapStringInt{ + "alice": internal.DeviceListChanged, + "💣": internal.DeviceListChanged, + }) + + // Append even more items to New + err = table.Upsert(userID, deviceID, internal.MapStringInt{ + "charlie": internal.DeviceListChanged, // we previously said "left" for charlie, so as "changed" is newer, we should see "changed" + "dave": internal.DeviceListLeft, + }) + assertNoError(t, err) + + // Now swap: all the combined items in New go into Sent + got, err = table.Select(userID, deviceID, true) + assertNoError(t, err) + assertVal(t, "swap select did not return combined new items", got, internal.MapStringInt{ + "💣": internal.DeviceListChanged, + "charlie": internal.DeviceListChanged, + "dave": internal.DeviceListLeft, + }) + + // Swapping again clears Sent out, and since nothing is in New we get an empty list + got, err = table.Select(userID, deviceID, true) + assertNoError(t, err) + assertVal(t, "swap select did not return combined new items", got, internal.MapStringInt{}) +}