Add DeviceListTable

Shift over unit tests from DeviceDataTable
This commit is contained in:
Kegan Dougal 2024-05-17 09:37:38 +01:00
parent 693587ef7e
commit 2cd9a81ab2
4 changed files with 222 additions and 156 deletions

View File

@ -21,7 +21,8 @@ type DeviceDataRow struct {
}
type DeviceDataTable struct {
db *sqlx.DB
db *sqlx.DB
deviceListTable *DeviceListTable
}
func NewDeviceDataTable(db *sqlx.DB) *DeviceDataTable {
@ -37,7 +38,8 @@ func NewDeviceDataTable(db *sqlx.DB) *DeviceDataTable {
ALTER TABLE syncv3_device_data SET (fillfactor = 90);
`)
return &DeviceDataTable{
db: db,
db: db,
deviceListTable: NewDeviceListTable(db),
}
}

View File

@ -118,160 +118,6 @@ func TestDeviceDataTableOTKCountAndFallbackKeyTypes(t *testing.T) {
assertDeviceData(t, *got, want)
}
// Tests the DeviceLists field
func TestDeviceDataTableDeviceList(t *testing.T) {
db, close := connectToDB(t)
defer close()
table := NewDeviceDataTable(db)
userID := "@TestDeviceDataTableDeviceList"
deviceID := "BOB"
// these are individual updates from Synapse from /sync v2
deltas := []internal.DeviceData{
{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
New: internal.ToDeviceListChangesMap([]string{"alice"}, nil),
},
},
{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
New: internal.ToDeviceListChangesMap([]string{"💣"}, nil),
},
},
}
// apply them
for _, dd := range deltas {
err := table.Upsert(&dd)
assertNoError(t, err)
}
// check we can read-only select. This doesn't modify any fields.
for i := 0; i < 3; i++ {
got, err := table.Select(userID, deviceID, false)
assertNoError(t, err)
assertDeviceData(t, *got, internal.DeviceData{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
Sent: internal.MapStringInt{}, // until we "swap" we don't consume the New entries
},
})
}
// now swap-er-roo, which shifts everything from New into Sent.
got, err := table.Select(userID, deviceID, true)
assertNoError(t, err)
assertDeviceData(t, *got, internal.DeviceData{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
Sent: internal.ToDeviceListChangesMap([]string{"alice", "💣"}, nil),
},
})
// this is permanent, read-only views show this too.
got, err = table.Select(userID, deviceID, false)
assertNoError(t, err)
assertDeviceData(t, *got, internal.DeviceData{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
Sent: internal.ToDeviceListChangesMap([]string{"alice", "💣"}, nil),
},
})
// We now expect empty DeviceLists, as we swapped twice.
got, err = table.Select(userID, deviceID, true)
assertNoError(t, err)
assertDeviceData(t, *got, internal.DeviceData{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
Sent: internal.MapStringInt{},
},
})
// get back the original state
assertNoError(t, err)
for _, dd := range deltas {
err = table.Upsert(&dd)
assertNoError(t, err)
}
// Move original state to Sent by swapping
got, err = table.Select(userID, deviceID, true)
assertNoError(t, err)
assertDeviceData(t, *got, internal.DeviceData{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
Sent: internal.ToDeviceListChangesMap([]string{"alice", "💣"}, nil),
},
})
// Add new entries to New before acknowledging Sent
err = table.Upsert(&internal.DeviceData{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
New: internal.ToDeviceListChangesMap([]string{"💣"}, []string{"charlie"}),
},
})
assertNoError(t, err)
// Reading without swapping does not move New->Sent, so returns the previous value
got, err = table.Select(userID, deviceID, false)
assertNoError(t, err)
assertDeviceData(t, *got, internal.DeviceData{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
Sent: internal.ToDeviceListChangesMap([]string{"alice", "💣"}, nil),
},
})
// Append even more items to New
err = table.Upsert(&internal.DeviceData{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
New: internal.ToDeviceListChangesMap([]string{"dave"}, []string{"dave"}),
},
})
assertNoError(t, err)
// Now swap: all the combined items in New go into Sent
got, err = table.Select(userID, deviceID, true)
assertNoError(t, err)
assertDeviceData(t, *got, internal.DeviceData{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
Sent: internal.ToDeviceListChangesMap([]string{"💣", "dave"}, []string{"charlie", "dave"}),
},
})
// Swapping again clears Sent out, and since nothing is in New we get an empty list
got, err = table.Select(userID, deviceID, true)
assertNoError(t, err)
assertDeviceData(t, *got, internal.DeviceData{
UserID: userID,
DeviceID: deviceID,
DeviceLists: internal.DeviceLists{
Sent: internal.MapStringInt{},
},
})
// delete everything, no data returned
assertNoError(t, table.DeleteDevice(userID, deviceID))
got, err = table.Select(userID, deviceID, false)
assertNoError(t, err)
if got != nil {
t.Errorf("wanted no data, got %v", got)
}
}
func TestDeviceDataTableBitset(t *testing.T) {
db, close := connectToDB(t)
defer close()

110
state/device_list_table.go Normal file
View File

@ -0,0 +1,110 @@
package state
import (
"fmt"
"github.com/getsentry/sentry-go"
"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sqlutil"
)
const (
BucketNew = 1
BucketSent = 2
)
type DeviceListTable struct {
db *sqlx.DB
}
func NewDeviceListTable(db *sqlx.DB) *DeviceListTable {
db.MustExec(`
CREATE TABLE IF NOT EXISTS syncv3_device_list_updates (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
target_user_id TEXT NOT NULL,
target_state SMALLINT NOT NULL,
bucket SMALLINT NOT NULL,
UNIQUE(user_id, device_id, target_user_id, bucket)
);
-- make an index so selecting all the rows is faster
CREATE INDEX IF NOT EXISTS syncv3_device_list_updates_bucket_idx ON syncv3_device_list_updates(user_id, device_id, bucket);
-- Set the fillfactor to 90%, to allow for HOT updates (e.g. we only
-- change the data, not anything indexed like the id)
ALTER TABLE syncv3_device_list_updates SET (fillfactor = 90);
`)
return &DeviceListTable{
db: db,
}
}
// Upsert new device list changes.
func (t *DeviceListTable) Upsert(userID, deviceID string, deviceListChanges map[string]int) (err error) {
err = sqlutil.WithTransaction(t.db, func(txn *sqlx.Tx) error {
for targetUserID, targetState := range deviceListChanges {
if targetState != internal.DeviceListChanged && targetState != internal.DeviceListLeft {
sentry.CaptureException(fmt.Errorf("DeviceListTable.Upsert invalid target_state: %d this is a programming error", targetState))
continue
}
_, err = txn.Exec(
`INSERT INTO syncv3_device_list_updates(user_id, device_id, target_user_id, target_state, bucket) VALUES($1,$2,$3,$4,$5)
ON CONFLICT (user_id, device_id, target_user_id, bucket) DO UPDATE SET target_state=$4`,
userID, deviceID, targetUserID, targetState, BucketNew,
)
if err != nil {
return err
}
}
return nil
})
if err != nil {
sentry.CaptureException(err)
}
return
}
// Select device list changes for this client. Returns a map of user_id => change enum.
func (t *DeviceListTable) Select(userID, deviceID string, swap bool) (result internal.MapStringInt, err error) {
err = sqlutil.WithTransaction(t.db, func(txn *sqlx.Tx) error {
if !swap {
// read only view, just return what we previously sent and don't do anything else.
result, err = t.selectDeviceListChangesInBucket(txn, userID, deviceID, BucketSent)
return err
}
// delete the now acknowledged 'sent' data
_, err = txn.Exec(`DELETE FROM syncv3_device_list_updates WHERE user_id=$1 AND device_id=$2 AND bucket=$3`, userID, deviceID, BucketSent)
if err != nil {
return err
}
// grab any 'new' updates
result, err = t.selectDeviceListChangesInBucket(txn, userID, deviceID, BucketNew)
if err != nil {
return err
}
// mark these 'new' updates as 'sent'
_, err = txn.Exec(`UPDATE syncv3_device_list_updates SET bucket=$1 WHERE user_id=$2 AND device_id=$3 AND bucket=$4`, BucketSent, userID, deviceID, BucketNew)
return err
})
return
}
func (t *DeviceListTable) selectDeviceListChangesInBucket(txn *sqlx.Tx, userID, deviceID string, bucket int) (result internal.MapStringInt, err error) {
rows, err := txn.Query(`SELECT target_user_id, target_state FROM syncv3_device_list_updates WHERE user_id=$1 AND device_id=$2 AND bucket=$3`, userID, deviceID, bucket)
if err != nil {
return nil, err
}
defer rows.Close()
result = make(internal.MapStringInt)
var targetUserID string
var targetState int
for rows.Next() {
if err := rows.Scan(&targetUserID, &targetState); err != nil {
return nil, err
}
result[targetUserID] = targetState
}
return result, rows.Err()
}

View File

@ -0,0 +1,108 @@
package state
import (
"testing"
"github.com/matrix-org/sliding-sync/internal"
)
// Tests the DeviceLists table
func TestDeviceListTable(t *testing.T) {
db, close := connectToDB(t)
defer close()
table := NewDeviceListTable(db)
userID := "@TestDeviceListTable"
deviceID := "BOB"
// these are individual updates from Synapse from /sync v2
deltas := []internal.MapStringInt{
{
"alice": internal.DeviceListChanged,
},
{
"💣": internal.DeviceListChanged,
},
}
// apply them
for _, dd := range deltas {
err := table.Upsert(userID, deviceID, dd)
assertNoError(t, err)
}
// check we can read-only select. This doesn't modify any fields.
for i := 0; i < 3; i++ {
got, err := table.Select(userID, deviceID, false)
assertNoError(t, err)
// until we "swap" we don't consume the New entries
assertVal(t, "unexpected data on swapless select", got, internal.MapStringInt{})
}
// now swap-er-roo, which shifts everything from New into Sent.
got, err := table.Select(userID, deviceID, true)
assertNoError(t, err)
assertVal(t, "did not select what was upserted on swap select", got, internal.MapStringInt{
"alice": internal.DeviceListChanged,
"💣": internal.DeviceListChanged,
})
// this is permanent, read-only views show this too.
got, err = table.Select(userID, deviceID, false)
assertNoError(t, err)
assertVal(t, "swapless select did not return the same data as before", got, internal.MapStringInt{
"alice": internal.DeviceListChanged,
"💣": internal.DeviceListChanged,
})
// We now expect empty DeviceLists, as we swapped twice.
got, err = table.Select(userID, deviceID, true)
assertNoError(t, err)
assertVal(t, "swap select did not return nothing", got, internal.MapStringInt{})
// get back the original state
assertNoError(t, err)
for _, dd := range deltas {
err = table.Upsert(userID, deviceID, dd)
assertNoError(t, err)
}
// Move original state to Sent by swapping
got, err = table.Select(userID, deviceID, true)
assertNoError(t, err)
assertVal(t, "did not select what was upserted on swap select", got, internal.MapStringInt{
"alice": internal.DeviceListChanged,
"💣": internal.DeviceListChanged,
})
// Add new entries to New before acknowledging Sent
err = table.Upsert(userID, deviceID, internal.MapStringInt{
"💣": internal.DeviceListChanged,
"charlie": internal.DeviceListLeft,
})
assertNoError(t, err)
// Reading without swapping does not move New->Sent, so returns the previous value
got, err = table.Select(userID, deviceID, false)
assertNoError(t, err)
assertVal(t, "swapless select did not return the same data as before", got, internal.MapStringInt{
"alice": internal.DeviceListChanged,
"💣": internal.DeviceListChanged,
})
// Append even more items to New
err = table.Upsert(userID, deviceID, internal.MapStringInt{
"charlie": internal.DeviceListChanged, // we previously said "left" for charlie, so as "changed" is newer, we should see "changed"
"dave": internal.DeviceListLeft,
})
assertNoError(t, err)
// Now swap: all the combined items in New go into Sent
got, err = table.Select(userID, deviceID, true)
assertNoError(t, err)
assertVal(t, "swap select did not return combined new items", got, internal.MapStringInt{
"💣": internal.DeviceListChanged,
"charlie": internal.DeviceListChanged,
"dave": internal.DeviceListLeft,
})
// Swapping again clears Sent out, and since nothing is in New we get an empty list
got, err = table.Select(userID, deviceID, true)
assertNoError(t, err)
assertVal(t, "swap select did not return combined new items", got, internal.MapStringInt{})
}