BREAKING: Modify syncv3_to_device_messages to include columns for type/sender

We want to be able to query based on these fields.
This commit is contained in:
Kegan Dougal 2022-02-22 17:04:31 +00:00
parent 8884b2c215
commit 064095c899
2 changed files with 11 additions and 26 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/matrix-org/sync-v3/sqlutil"
"github.com/tidwall/gjson"
)
// ToDeviceTable stores to_device messages for devices.
@ -18,6 +19,8 @@ type ToDeviceRow struct {
Position int64 `db:"position"`
DeviceID string `db:"device_id"`
Message string `db:"message"`
Type string `db:"event_type"`
Sender string `db:"sender"`
}
type ToDeviceRowChunker []ToDeviceRow
@ -36,6 +39,8 @@ func NewToDeviceTable(db *sqlx.DB) *ToDeviceTable {
CREATE TABLE IF NOT EXISTS syncv3_to_device_messages (
position BIGINT NOT NULL PRIMARY KEY DEFAULT nextval('syncv3_to_device_messages_seq'),
device_id TEXT NOT NULL,
event_type TEXT NOT NULL,
sender TEXT NOT NULL,
message TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS syncv3_to_device_messages_device_idx ON syncv3_to_device_messages(device_id);
@ -80,16 +85,19 @@ func (t *ToDeviceTable) InsertMessages(deviceID string, msgs []json.RawMessage)
err = sqlutil.WithTransaction(t.db, func(txn *sqlx.Tx) error {
rows := make([]ToDeviceRow, len(msgs))
for i := range msgs {
m := gjson.ParseBytes(msgs[i])
rows[i] = ToDeviceRow{
DeviceID: deviceID,
Message: string(msgs[i]),
Type: m.Get("type").Str,
Sender: m.Get("sender").Str,
}
}
chunks := sqlutil.Chunkify(2, 65535, ToDeviceRowChunker(rows))
chunks := sqlutil.Chunkify(4, 65535, ToDeviceRowChunker(rows))
for _, chunk := range chunks {
result, err := t.db.NamedQuery(`INSERT INTO syncv3_to_device_messages (device_id, message)
VALUES (:device_id, :message) RETURNING position`, chunk)
result, err := t.db.NamedQuery(`INSERT INTO syncv3_to_device_messages (device_id, message, event_type, sender)
VALUES (:device_id, :message, :event_type, :sender) RETURNING position`, chunk)
if err != nil {
return err
}

View File

@ -49,28 +49,5 @@ func ProcessToDevice(store *state.Storage, userID, deviceID string, req *ToDevic
NextBatch: fmt.Sprintf("%d", upTo),
Events: msgs,
}
// Make H use since tokens correctly
// TEST: it probably won't work due to OTK counts / device lists:
/*
{
"next_batch": "s72595_4483_1934",
"rooms": {"leave": {}, "join": {}, "invite": {}},
"device_lists": {
"changed": [
"@alice:example.com",
],
"left": [
"@bob:example.com",
],
},
"device_one_time_keys_count": {
"curve25519": 10,
"signed_curve25519": 20
}
}
*/
// need to persist OTK counts (like unread notifications, clobber)
// need to persist device lists (how?)
return
}