mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
164 lines
5.9 KiB
Go
164 lines
5.9 KiB
Go
package state
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/getsentry/sentry-go"
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/matrix-org/sliding-sync/internal"
|
|
"github.com/matrix-org/sliding-sync/sqlutil"
|
|
)
|
|
|
|
const (
|
|
BucketNew = 1
|
|
BucketSent = 2
|
|
)
|
|
|
|
type DeviceListRow struct {
|
|
UserID string `db:"user_id"`
|
|
DeviceID string `db:"device_id"`
|
|
TargetUserID string `db:"target_user_id"`
|
|
TargetState int `db:"target_state"`
|
|
Bucket int `db:"bucket"`
|
|
}
|
|
|
|
type DeviceListTable struct {
|
|
db *sqlx.DB
|
|
}
|
|
|
|
func NewDeviceListTable(db *sqlx.DB) *DeviceListTable {
|
|
db.MustExec(`
|
|
CREATE TABLE IF NOT EXISTS syncv3_device_list_updates (
|
|
user_id TEXT NOT NULL,
|
|
device_id TEXT NOT NULL,
|
|
target_user_id TEXT NOT NULL,
|
|
target_state SMALLINT NOT NULL,
|
|
bucket SMALLINT NOT NULL,
|
|
UNIQUE(user_id, device_id, target_user_id, bucket)
|
|
);
|
|
-- make an index so selecting all the rows is faster
|
|
CREATE INDEX IF NOT EXISTS syncv3_device_list_updates_bucket_idx ON syncv3_device_list_updates(user_id, device_id, bucket);
|
|
-- Set the fillfactor to 90%, to allow for HOT updates (e.g. we only
|
|
-- change the data, not anything indexed like the id)
|
|
ALTER TABLE syncv3_device_list_updates SET (fillfactor = 90);
|
|
`)
|
|
return &DeviceListTable{
|
|
db: db,
|
|
}
|
|
}
|
|
|
|
func (t *DeviceListTable) Upsert(userID, deviceID string, deviceListChanges map[string]int) (err error) {
|
|
err = sqlutil.WithTransaction(t.db, func(txn *sqlx.Tx) error {
|
|
return t.UpsertTx(txn, userID, deviceID, deviceListChanges)
|
|
})
|
|
if err != nil {
|
|
sentry.CaptureException(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Upsert new device list changes.
|
|
func (t *DeviceListTable) UpsertTx(txn *sqlx.Tx, userID, deviceID string, deviceListChanges map[string]int) (err error) {
|
|
if len(deviceListChanges) == 0 {
|
|
return nil
|
|
}
|
|
var deviceListRows []DeviceListRow
|
|
for targetUserID, targetState := range deviceListChanges {
|
|
if targetState != internal.DeviceListChanged && targetState != internal.DeviceListLeft {
|
|
sentry.CaptureException(fmt.Errorf("DeviceListTable.Upsert invalid target_state: %d this is a programming error", targetState))
|
|
continue
|
|
}
|
|
deviceListRows = append(deviceListRows, DeviceListRow{
|
|
UserID: userID,
|
|
DeviceID: deviceID,
|
|
TargetUserID: targetUserID,
|
|
TargetState: targetState,
|
|
Bucket: BucketNew,
|
|
})
|
|
}
|
|
chunks := sqlutil.Chunkify(5, MaxPostgresParameters, DeviceListChunker(deviceListRows))
|
|
for _, chunk := range chunks {
|
|
_, err := txn.NamedExec(`
|
|
INSERT INTO syncv3_device_list_updates(user_id, device_id, target_user_id, target_state, bucket)
|
|
VALUES(:user_id, :device_id, :target_user_id, :target_state, :bucket)
|
|
ON CONFLICT (user_id, device_id, target_user_id, bucket) DO UPDATE SET target_state = EXCLUDED.target_state`, chunk)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *DeviceListTable) Select(userID, deviceID string, swap bool) (result internal.MapStringInt, err error) {
|
|
err = sqlutil.WithTransaction(t.db, func(txn *sqlx.Tx) error {
|
|
result, err = t.SelectTx(txn, userID, deviceID, swap)
|
|
return err
|
|
})
|
|
return
|
|
}
|
|
|
|
// Select device list changes for this client. Returns a map of user_id => change enum.
|
|
func (t *DeviceListTable) SelectTx(txn *sqlx.Tx, userID, deviceID string, swap bool) (result internal.MapStringInt, err error) {
|
|
if !swap {
|
|
// read only view, just return what we previously sent and don't do anything else.
|
|
return t.selectDeviceListChangesInBucket(txn, userID, deviceID, BucketSent)
|
|
}
|
|
|
|
// delete the now acknowledged 'sent' data
|
|
_, err = txn.Exec(`DELETE FROM syncv3_device_list_updates WHERE user_id=$1 AND device_id=$2 AND bucket=$3`, userID, deviceID, BucketSent)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// grab any 'new' updates and atomically mark these as 'sent'.
|
|
// NB: we must not SELECT then UPDATE, because a 'new' row could be inserted after the SELECT and before the UPDATE, which
|
|
// would then be incorrectly moved to 'sent' without being returned to the client, dropping the data. This happens because
|
|
// the default transaction level is 'read committed', which /allows/ nonrepeatable reads which is:
|
|
// > A transaction re-reads data it has previously read and finds that data has been modified by another transaction (that committed since the initial read).
|
|
// We could change the isolation level but this incurs extra performance costs in addition to serialisation errors which
|
|
// need to be handled. It's easier to just use UPDATE .. RETURNING. Note that we don't require UPDATE .. RETURNING to be
|
|
// atomic in any way, it's just that we need to guarantee each things SELECTed is also UPDATEd (so in the scenario above,
|
|
// we don't care if the SELECT includes or excludes the 'new' row, but if it is SELECTed it MUST be UPDATEd).
|
|
rows, err := txn.Query(`UPDATE syncv3_device_list_updates SET bucket=$1 WHERE user_id=$2 AND device_id=$3 AND bucket=$4 RETURNING target_user_id, target_state`, BucketSent, userID, deviceID, BucketNew)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
result = make(internal.MapStringInt)
|
|
var targetUserID string
|
|
var targetState int
|
|
for rows.Next() {
|
|
if err := rows.Scan(&targetUserID, &targetState); err != nil {
|
|
return nil, err
|
|
}
|
|
result[targetUserID] = targetState
|
|
}
|
|
return result, rows.Err()
|
|
}
|
|
|
|
func (t *DeviceListTable) selectDeviceListChangesInBucket(txn *sqlx.Tx, userID, deviceID string, bucket int) (result internal.MapStringInt, err error) {
|
|
rows, err := txn.Query(`SELECT target_user_id, target_state FROM syncv3_device_list_updates WHERE user_id=$1 AND device_id=$2 AND bucket=$3`, userID, deviceID, bucket)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
result = make(internal.MapStringInt)
|
|
var targetUserID string
|
|
var targetState int
|
|
for rows.Next() {
|
|
if err := rows.Scan(&targetUserID, &targetState); err != nil {
|
|
return nil, err
|
|
}
|
|
result[targetUserID] = targetState
|
|
}
|
|
return result, rows.Err()
|
|
}
|
|
|
|
type DeviceListChunker []DeviceListRow
|
|
|
|
func (c DeviceListChunker) Len() int {
|
|
return len(c)
|
|
}
|
|
func (c DeviceListChunker) Subslice(i, j int) sqlutil.Chunker {
|
|
return c[i:j]
|
|
}
|