mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
170 lines
4.5 KiB
Go
170 lines
4.5 KiB
Go
package state
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/lib/pq"
|
|
"github.com/matrix-org/sliding-sync/sqlutil"
|
|
"github.com/tidwall/gjson"
|
|
)
|
|
|
|
// Relation kinds for a SpaceRelation. The numeric values are written to the
// `relation` column of syncv3_spaces, so they must stay stable.
const (
	// RelationMSpaceParent marks a relation derived from an m.space.parent event.
	RelationMSpaceParent = 1
	// RelationMSpaceChild marks a relation derived from an m.space.child event.
	RelationMSpaceChild = 2
)
|
|
|
|
// SpaceRelation is one parent/child edge of the space graph. The `db` tags
// name the syncv3_spaces columns each field is read from and written to.
type SpaceRelation struct {
	// Parent is the room ID of the space.
	Parent string `db:"parent"`
	// Child is the room ID of the room inside the space.
	Child string `db:"child"`
	// Relation is RelationMSpaceParent or RelationMSpaceChild, depending on
	// which event type declared the edge.
	Relation int `db:"relation"`
	// Ordering is the `ordering` string from the event content, "" when unset.
	Ordering string `db:"ordering"`
	// IsSuggested is the `suggested` flag from the event content.
	IsSuggested bool `db:"suggested"`
}

// Key returns "<parent>-<child>-<relation>", the identity of this relation.
// Ordering and IsSuggested are deliberately not part of the key, so two
// relations describing the same edge produce the same key.
func (sr *SpaceRelation) Key() string {
	parent, child, kind := sr.Parent, sr.Child, sr.Relation
	return fmt.Sprintf("%s-%s-%d", parent, child, kind)
}
|
|
|
|
// Returns a space relation from a compatible event, else nil.
|
|
func NewSpaceRelationFromEvent(ev Event) (sr *SpaceRelation, isDeleted bool) {
|
|
event := gjson.ParseBytes(ev.JSON)
|
|
if !event.Get("state_key").Exists() {
|
|
return nil, false
|
|
}
|
|
switch ev.Type {
|
|
case "m.space.child":
|
|
return &SpaceRelation{
|
|
Parent: ev.RoomID,
|
|
Child: ev.StateKey,
|
|
Relation: RelationMSpaceChild,
|
|
Ordering: event.Get("content.ordering").Str,
|
|
IsSuggested: event.Get("content.suggested").Bool(),
|
|
}, !event.Get("content.via").IsArray()
|
|
case "m.space.parent":
|
|
return &SpaceRelation{
|
|
Parent: ev.StateKey,
|
|
Child: ev.RoomID,
|
|
Relation: RelationMSpaceParent,
|
|
// parent events have a Canonical field but we don't care?
|
|
}, !event.Get("content.via").IsArray()
|
|
default:
|
|
return nil, false
|
|
}
|
|
}
|
|
|
|
// SpacesTable stores the space graph for all users in the syncv3_spaces table.
// It holds no state of its own: every method is handed the transaction it
// should run in.
type SpacesTable struct{}
|
|
|
|
func NewSpacesTable(db *sqlx.DB) *SpacesTable {
|
|
// make sure tables are made
|
|
db.MustExec(`
|
|
CREATE TABLE IF NOT EXISTS syncv3_spaces (
|
|
parent TEXT NOT NULL,
|
|
child TEXT NOT NULL,
|
|
relation SMALLINT NOT NULL,
|
|
suggested BOOL NOT NULL,
|
|
ordering TEXT NOT NULL, -- "" for unset
|
|
UNIQUE(parent, child, relation)
|
|
);
|
|
`)
|
|
return &SpacesTable{}
|
|
}
|
|
|
|
// Insert space relations by (parent, child, relation)
|
|
func (t *SpacesTable) BulkInsert(txn *sqlx.Tx, relations []SpaceRelation) error {
|
|
if len(relations) == 0 {
|
|
return nil
|
|
}
|
|
chunks := sqlutil.Chunkify(5, MaxPostgresParameters, SpaceRelationChunker(relations))
|
|
for _, chunk := range chunks {
|
|
_, err := txn.NamedExec(`
|
|
INSERT INTO syncv3_spaces (parent, child, relation, ordering, suggested)
|
|
VALUES (:parent, :child, :relation, :ordering, :suggested) ON CONFLICT (parent, child, relation) DO UPDATE SET ordering = EXCLUDED.ordering, suggested = EXCLUDED.suggested`, chunk)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Delete space relations by (parent, child, relation)
|
|
func (t *SpacesTable) BulkDelete(txn *sqlx.Tx, relations []SpaceRelation) error {
|
|
if len(relations) == 0 {
|
|
return nil
|
|
}
|
|
for _, r := range relations {
|
|
_, err := txn.Exec(
|
|
`DELETE FROM syncv3_spaces WHERE parent=$1 AND child=$2 AND relation=$3`, r.Parent, r.Child, r.Relation,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Select all children for these spaces
|
|
func (t *SpacesTable) SelectChildren(txn *sqlx.Tx, spaces []string) (map[string][]SpaceRelation, error) {
|
|
result := make(map[string][]SpaceRelation)
|
|
var data []SpaceRelation
|
|
err := txn.Select(&data, `SELECT parent, child, relation, ordering, suggested FROM syncv3_spaces WHERE parent = ANY($1)`, pq.StringArray(spaces))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// bucket by space
|
|
for _, d := range data {
|
|
result[d.Parent] = append(result[d.Parent], d)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (t *SpacesTable) HandleSpaceUpdates(txn *sqlx.Tx, events []Event) error {
|
|
// pull out relations, and bucket them so the last event wins to ensure we always use the latest
|
|
// values in case someone repeatedly adds/removes the same space
|
|
relations := make(map[string]struct {
|
|
relation *SpaceRelation
|
|
isDeleted bool
|
|
})
|
|
for _, ev := range events {
|
|
r, isDeleted := NewSpaceRelationFromEvent(ev)
|
|
if r != nil {
|
|
relations[r.Key()] = struct {
|
|
relation *SpaceRelation
|
|
isDeleted bool
|
|
}{
|
|
relation: r,
|
|
isDeleted: isDeleted,
|
|
}
|
|
}
|
|
}
|
|
|
|
// now bucket by add/remove for bulk operations
|
|
var added, removed []SpaceRelation
|
|
for _, r := range relations {
|
|
if r.isDeleted {
|
|
removed = append(removed, *r.relation)
|
|
} else {
|
|
added = append(added, *r.relation)
|
|
}
|
|
}
|
|
// update the database
|
|
if err := t.BulkInsert(txn, added); err != nil {
|
|
return fmt.Errorf("failed to BulkInsert: %s", err)
|
|
}
|
|
if err := t.BulkDelete(txn, removed); err != nil {
|
|
return fmt.Errorf("failed to BulkDelete: %s", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type SpaceRelationChunker []SpaceRelation
|
|
|
|
func (c SpaceRelationChunker) Len() int {
|
|
return len(c)
|
|
}
|
|
func (c SpaceRelationChunker) Subslice(i, j int) sqlutil.Chunker {
|
|
return c[i:j]
|
|
}
|