device data: use CBOR instead of JSONB

Using JSONB columns adds too much DB load. Prefer a slightly
faster serialisation format instead, and use the old system of
handling BYTEA, which is about 2x faster.
```
BenchmarkSerialiseDeviceDataJSON-12    	    1770	    576646 ns/op	  426297 B/op	    6840 allocs/op
BenchmarkSerialiseDeviceDataCBOR-12    	    4635	    247509 ns/op	  253971 B/op	    4796 allocs/op
```
This was using a growing list of 1000 device list changes.
This commit is contained in:
Kegan Dougal 2023-08-14 18:53:45 +01:00
parent f73c8e4604
commit 4714088231
5 changed files with 247 additions and 89 deletions

2
go.mod
View File

@ -27,6 +27,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/fxamacker/cbor/v2 v2.5.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
@ -41,6 +42,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect

4
go.sum
View File

@ -64,6 +64,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fxamacker/cbor/v2 v2.5.0 h1:oHsG0V/Q6E/wqTS2O1Cozzsy69nqCiguo5Q1a1ADivE=
github.com/fxamacker/cbor/v2 v2.5.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/getsentry/sentry-go v0.20.0 h1:bwXW98iMRIWxn+4FgPW7vMrjmbym6HblXALmhjHmQaQ=
github.com/getsentry/sentry-go v0.20.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
@ -259,6 +261,8 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

View File

@ -2,14 +2,11 @@ package state
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"reflect"
"github.com/fxamacker/cbor/v2"
"github.com/getsentry/sentry-go"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sqlutil"
)
@ -32,7 +29,7 @@ func NewDeviceDataTable(db *sqlx.DB) *DeviceDataTable {
CREATE TABLE IF NOT EXISTS syncv3_device_data (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
data JSONB NOT NULL,
data BYTEA NOT NULL,
UNIQUE(user_id, device_id)
);
-- Set the fillfactor to 90%, to allow for HOT updates (e.g. we only
@ -58,7 +55,7 @@ func (t *DeviceDataTable) Select(userID, deviceID string, swap bool) (result *in
return err
}
// unmarshal to swap
if err = json.Unmarshal(row.Data, &result); err != nil {
if err = cbor.Unmarshal(row.Data, &result); err != nil {
return err
}
result.UserID = userID
@ -72,21 +69,19 @@ func (t *DeviceDataTable) Select(userID, deviceID string, swap bool) (result *in
writeBack.DeviceLists.New = make(map[string]int)
writeBack.ChangedBits = 0
// DeepEqual uses fewer allocations and is faster than
// json.Marshal and comparing bytes
if reflect.DeepEqual(writeBack, *result) {
if reflect.DeepEqual(result, &writeBack) {
// The update to the DB would be a no-op; don't bother with it.
// This helps reduce write usage and the contention on the unique index for
// the device_data table.
return nil
}
// re-marshal and write
data, err := cbor.Marshal(writeBack)
if err != nil {
return err
}
// Some JSON juggling in Postgres ahead. This is to avoid pushing
// DeviceLists.Sent -> DeviceLists.New over the wire again.
_, err = txn.Exec(`UPDATE syncv3_device_data SET data = jsonb_set(
jsonb_set(data, '{dl,s}', data->'dl'->'n', false), -- move 'dl.n' -> 'dl.s'
'{dl,n}', '{}', false) || '{"c":0}' -- clear 'dl.n' and set the changed bits to 0
WHERE user_id = $1 AND device_id = $2`, userID, deviceID)
_, err = txn.Exec(`UPDATE syncv3_device_data SET data=$1 WHERE user_id=$2 AND device_id=$3`, data, userID, deviceID)
return err
})
return
@ -100,89 +95,39 @@ func (t *DeviceDataTable) DeleteDevice(userID, deviceID string) error {
// Upsert combines what is in the database for this user|device with the partial entry `dd`
func (t *DeviceDataTable) Upsert(dd *internal.DeviceData) (err error) {
err = sqlutil.WithTransaction(t.db, func(txn *sqlx.Tx) error {
// see if anything already exists. We do this by pulling out the changed bit because we need it to calculate the new bitset,
// so it's a useful proxy for "does this data exist for this user/device?"
// We use FOR UPDATE to block any potential changes to this JSON structure through the lifetime of the transaction.
var changedBits int
err = txn.QueryRow(`SELECT data->>'c' FROM syncv3_device_data WHERE user_id=$1 AND device_id=$2 FOR UPDATE`, dd.UserID, dd.DeviceID).Scan(&changedBits)
// select what already exists
var row DeviceDataRow
err = txn.Get(&row, `SELECT data FROM syncv3_device_data WHERE user_id=$1 AND device_id=$2`, dd.UserID, dd.DeviceID)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("failed to check if device data exists: %w", err)
}
// brand new insert, do it and return
if errors.Is(err, sql.ErrNoRows) {
// we need to tell postgres these fields are objects not arrays, else || won't behave as we want
if dd.DeviceLists.New == nil {
dd.DeviceLists.New = make(internal.MapStringInt)
}
if dd.DeviceLists.Sent == nil {
dd.DeviceLists.Sent = make(internal.MapStringInt)
}
newInsert := internal.DeviceData{
DeviceLists: dd.DeviceLists,
}
if dd.FallbackKeyTypes != nil {
newInsert.FallbackKeyTypes = dd.FallbackKeyTypes
newInsert.SetFallbackKeysChanged()
}
if dd.OTKCounts != nil {
newInsert.OTKCounts = dd.OTKCounts
newInsert.SetOTKCountChanged()
}
data, err := json.Marshal(newInsert)
if err != nil {
return fmt.Errorf("failed to marshal new device data: %w", err)
}
_, err = txn.Exec(
`INSERT INTO syncv3_device_data(user_id, device_id, data) VALUES($1,$2,$3)
ON CONFLICT (user_id, device_id) DO UPDATE SET data=$3`,
dd.UserID, dd.DeviceID, data,
)
return err
}
// There are changes and we already have a JSON object in the DB. Rather than updating the entire JSON object,
// we'll just update the keys that have changes.
// remember what the old changed bits were, as we might need to set additional bits.
dd.ChangedBits = changedBits
// if there are fallback key changes, update that key
// unmarshal and combine
var tempDD internal.DeviceData
if len(row.Data) > 0 {
if err = cbor.Unmarshal(row.Data, &tempDD); err != nil {
return err
}
}
if dd.FallbackKeyTypes != nil {
_, err = txn.Exec(`UPDATE syncv3_device_data SET data = jsonb_set(data, '{fallback}', to_jsonb($3::text[]), true) WHERE user_id = $1 AND device_id = $2`, dd.UserID, dd.DeviceID, pq.StringArray(dd.FallbackKeyTypes))
if err != nil {
return fmt.Errorf("failed to set fallback keys: %w", err)
}
// mark it as changed on the changed bits
dd.SetFallbackKeysChanged()
tempDD.FallbackKeyTypes = dd.FallbackKeyTypes
tempDD.SetFallbackKeysChanged()
}
// if there are otk count changes, update that key
if dd.OTKCounts != nil {
_, err = txn.Exec(`UPDATE syncv3_device_data SET data = jsonb_set(data, '{otk}', $3, true) WHERE user_id = $1 AND device_id = $2`, dd.UserID, dd.DeviceID, dd.OTKCounts)
if err != nil {
return fmt.Errorf("failed to set otk counts: %w", err)
}
// mark it as changed on the changed bits
dd.SetOTKCountChanged()
}
// if there are device list changes, combine what is already in the map with dd.DeviceLists.New
// This does the same thing as the old Combine function.
if len(dd.DeviceLists.New) > 0 {
// set dl.n to what was already in dl.n concatenated with our new JSON object (cast it to JSON so postgres knows it's an object)
_, err = txn.Exec(`UPDATE syncv3_device_data SET data = jsonb_set(data, '{dl,n}', data->'dl'->'n' || $3::jsonb, true) WHERE user_id = $1 AND device_id = $2`,
dd.UserID, dd.DeviceID, dd.DeviceLists.New)
if err != nil {
return fmt.Errorf("failed to update device list changes: %w", err)
}
tempDD.OTKCounts = dd.OTKCounts
tempDD.SetOTKCountChanged()
}
tempDD.DeviceLists = tempDD.DeviceLists.Combine(dd.DeviceLists)
// if we have modified fallback keys or otk counts, the bits will be different so update it.
if changedBits != dd.ChangedBits {
_, err = txn.Exec(`UPDATE syncv3_device_data SET data = jsonb_set(data, '{c}', $3) WHERE user_id = $1 AND device_id = $2`, dd.UserID, dd.DeviceID, dd.ChangedBits)
if err != nil {
return fmt.Errorf("failed to set changed bit: %w", err)
}
data, err := cbor.Marshal(tempDD)
if err != nil {
return err
}
return nil
_, err = txn.Exec(
`INSERT INTO syncv3_device_data(user_id, device_id, data) VALUES($1,$2,$3)
ON CONFLICT (user_id, device_id) DO UPDATE SET data=$3`,
dd.UserID, dd.DeviceID, data,
)
return err
})
if err != nil && err != sql.ErrNoRows {
sentry.CaptureException(err)

View File

@ -0,0 +1,142 @@
package migrations
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/fxamacker/cbor/v2"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/pressly/goose/v3"
)
func init() {
goose.AddMigrationContext(upCborDeviceData, downCborDeviceData)
}
func upCborDeviceData(ctx context.Context, tx *sql.Tx) error {
// check if we even need to do anything
var dataType string
err := tx.QueryRow("select data_type from information_schema.columns where table_name = 'syncv3_device_data' AND column_name = 'data'").Scan(&dataType)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
// The table/column doesn't exist in is likely going to be created soon with the
// correct schema
return nil
}
return err
}
if strings.ToLower(dataType) == "bytea" {
return nil
}
_, err = tx.ExecContext(ctx, "ALTER TABLE IF EXISTS syncv3_device_data ADD COLUMN IF NOT EXISTS datab BYTEA;")
if err != nil {
return err
}
rows, err := tx.Query("SELECT user_id, device_id, data FROM syncv3_device_data")
if err != nil {
return err
}
defer rows.Close()
// abusing PollerID here
var deviceData sync2.PollerID
var data []byte
// map from PollerID -> deviceData
deviceDatas := make(map[sync2.PollerID][]byte)
for rows.Next() {
if err = rows.Scan(&deviceData.UserID, &deviceData.DeviceID, &data); err != nil {
return err
}
deviceDatas[deviceData] = data
}
for dd, jsonBytes := range deviceDatas {
var data internal.DeviceData
if err := json.Unmarshal(jsonBytes, &data); err != nil {
return fmt.Errorf("failed to unmarshal JSON: %v -> %v", string(jsonBytes), err)
}
cborBytes, err := cbor.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal as CBOR: %v", err)
}
_, err = tx.ExecContext(ctx, "UPDATE syncv3_device_data SET datab = $1 WHERE user_id = $2 AND device_id = $3;", cborBytes, dd.UserID, dd.DeviceID)
if err != nil {
return err
}
}
if rows.Err() != nil {
return rows.Err()
}
_, err = tx.ExecContext(ctx, "ALTER TABLE IF EXISTS syncv3_device_data DROP COLUMN IF EXISTS data;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE IF EXISTS syncv3_device_data RENAME COLUMN datab TO data;")
if err != nil {
return err
}
return nil
}
func downCborDeviceData(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, "ALTER TABLE IF EXISTS syncv3_device_data ADD COLUMN IF NOT EXISTS dataj JSONB;")
if err != nil {
return err
}
rows, err := tx.Query("SELECT user_id, device_id, data FROM syncv3_device_data")
if err != nil {
return err
}
defer rows.Close()
// abusing PollerID here
var deviceData sync2.PollerID
var data []byte
// map from PollerID -> deviceData
deviceDatas := make(map[sync2.PollerID][]byte)
for rows.Next() {
if err = rows.Scan(&deviceData.UserID, &deviceData.DeviceID, &data); err != nil {
return err
}
deviceDatas[deviceData] = data
}
for dd, cborBytes := range deviceDatas {
var data internal.DeviceData
if err := cbor.Unmarshal(cborBytes, &data); err != nil {
return fmt.Errorf("failed to unmarshal CBOR: %v", err)
}
jsonBytes, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal as JSON: %v", err)
}
_, err = tx.ExecContext(ctx, "UPDATE syncv3_device_data SET dataj = $1 WHERE user_id = $2 AND device_id = $3;", jsonBytes, dd.UserID, dd.DeviceID)
if err != nil {
return err
}
}
_, err = tx.ExecContext(ctx, "ALTER TABLE IF EXISTS syncv3_device_data DROP COLUMN IF EXISTS data;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE IF EXISTS syncv3_device_data RENAME COLUMN dataj TO data;")
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,65 @@
package migrations
import (
"context"
"encoding/json"
"testing"
_ "github.com/lib/pq"
"github.com/matrix-org/sliding-sync/internal"
)
func TestCBORBMigration(t *testing.T) {
ctx := context.Background()
db, close := connectToDB(t)
defer close()
// Create the table in the old format (data = JSONB instead of BYTEA)
_, err := db.Exec(`CREATE TABLE syncv3_device_data (
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
data JSONB NOT NULL,
UNIQUE(user_id, device_id)
);`)
if err != nil {
t.Fatal(err)
}
tx, err := db.Begin()
if err != nil {
t.Fatal(err)
}
defer tx.Commit()
// insert some "invalid" data
dd := internal.DeviceData{
DeviceLists: internal.DeviceLists{
New: map[string]int{"@💣:localhost": 1},
Sent: map[string]int{},
},
OTKCounts: map[string]int{},
FallbackKeyTypes: []string{},
}
data, err := json.Marshal(dd)
if err != nil {
t.Fatal(err)
}
_, err = tx.ExecContext(ctx, `INSERT INTO syncv3_device_data (user_id, device_id, data) VALUES ($1, $2, $3)`, "bob", "bobDev", data)
if err != nil {
t.Fatal(err)
}
// validate that invalid data can be migrated upwards
err = upCborDeviceData(ctx, tx)
if err != nil {
t.Fatal(err)
}
// and downgrade again
err = downCborDeviceData(ctx, tx)
if err != nil {
t.Fatal(err)
}
}