Merge branch 'main' of github.com:matrix-org/sliding-sync into s7evink/devicedata

This commit is contained in:
Till Faelligen 2023-08-03 09:24:13 +02:00
commit 414fa865db
No known key found for this signature in database
GPG Key ID: ACCDC9606D472758
4 changed files with 158 additions and 11 deletions

View File

@ -26,7 +26,7 @@ import (
var GitCommit string
const version = "0.99.4"
const version = "0.99.5"
var (
flags = flag.NewFlagSet("goose", flag.ExitOnError)

View File

@ -3,6 +3,8 @@ package handler2
import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"os"
"sync"
"time"
@ -30,12 +32,14 @@ var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.C
// processing v2 data (as a sync2.V2DataReceiver) and publishing updates (pubsub.Payload to V2Listeners);
// and receiving and processing EnsurePolling events.
type Handler struct {
pMap sync2.IPollerMap
v2Store *sync2.Storage
Store *state.Storage
v2Pub pubsub.Notifier
v3Sub *pubsub.V3Sub
unreadMap map[string]struct {
pMap sync2.IPollerMap
v2Store *sync2.Storage
Store *state.Storage
v2Pub pubsub.Notifier
v3Sub *pubsub.V3Sub
// user_id|room_id|event_type => fnv_hash(last_event_bytes)
accountDataMap *sync.Map
unreadMap map[string]struct {
Highlight int
Notif int
}
@ -64,6 +68,7 @@ func NewHandler(
Highlight int
Notif int
}),
accountDataMap: &sync.Map{},
typingMu: &sync.Mutex{},
typingHandler: make(map[string]sync2.PollerID),
PendingTxnIDs: sync2.NewPendingTransactionIDs(pMap.DeviceIDs),
@ -451,7 +456,28 @@ func (h *Handler) UpdateUnreadCounts(ctx context.Context, roomID, userID string,
}
func (h *Handler) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) {
data, err := h.Store.InsertAccountData(userID, roomID, events)
// duplicate suppression for multiple devices on the same account.
// We suppress by remembering the last bytes for a given account data, and if they match we ignore.
dedupedEvents := make([]json.RawMessage, 0, len(events))
for i := range events {
evType := gjson.GetBytes(events[i], "type").Str
key := fmt.Sprintf("%s|%s|%s", userID, roomID, evType)
thisHash := fnvHash(events[i])
last, _ := h.accountDataMap.Load(key)
if last != nil {
lastHash := last.(uint64)
if lastHash == thisHash {
continue // skip this event
}
}
dedupedEvents = append(dedupedEvents, events[i])
h.accountDataMap.Store(key, thisHash)
}
if len(dedupedEvents) == 0 {
return
}
data, err := h.Store.InsertAccountData(userID, roomID, dedupedEvents)
if err != nil {
logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update account data")
sentry.CaptureException(err)
@ -531,3 +557,9 @@ func (h *Handler) EnsurePolling(p *pubsub.V3EnsurePolling) {
})
}()
}
func fnvHash(event json.RawMessage) uint64 {
h := fnv.New64a()
h.Write(event)
return h.Sum64()
}

View File

@ -2,11 +2,13 @@ package syncv3_test
import (
"encoding/json"
"testing"
"time"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/sync3/extensions"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/matrix-org/sliding-sync/testutils/m"
"testing"
)
func TestAccountDataRespectsExtensionScope(t *testing.T) {
@ -99,14 +101,14 @@ func TestAccountDataRespectsExtensionScope(t *testing.T) {
alice,
room1,
"com.example.room",
map[string]interface{}{"room": 1, "version": 1},
map[string]interface{}{"room": 1, "version": 11},
)
room2AccountDataEvent := putRoomAccountData(
t,
alice,
room2,
"com.example.room",
map[string]interface{}{"room": 2, "version": 2},
map[string]interface{}{"room": 2, "version": 22},
)
t.Log("Alice syncs until she sees the account data for room 2. She shouldn't see account data for room 1")
@ -122,7 +124,90 @@ func TestAccountDataRespectsExtensionScope(t *testing.T) {
return m.MatchAccountData(nil, map[string][]json.RawMessage{room2: {room2AccountDataEvent}})(response)
},
)
}
// Regression test for https://github.com/matrix-org/sliding-sync/issues/189
func TestAccountDataDoesntDupe(t *testing.T) {
alice := registerNewUser(t)
alice2 := *alice
alice2.Login(t, "password", "device2")
// send some initial account data
putGlobalAccountData(t, alice, "initial", map[string]interface{}{"foo": "bar"})
// no devices are polling.
// syncing with both devices => only shows 1 copy of this event per connection
for _, client := range []*CSAPI{alice, &alice2} {
res := client.SlidingSync(t, sync3.Request{
Extensions: extensions.Request{
AccountData: &extensions.AccountDataRequest{
Core: extensions.Core{
Enabled: &boolTrue,
},
},
},
})
m.MatchResponse(t, res, MatchGlobalAccountData([]Event{
{
Type: "m.push_rules",
},
{
Type: "initial",
Content: map[string]interface{}{"foo": "bar"},
},
}))
}
// now both devices are polling, we're going to do the same thing to make sure we only see only 1 copy still.
putGlobalAccountData(t, alice, "initial2", map[string]interface{}{"foo2": "bar2"})
time.Sleep(time.Second) // TODO: we need to make sure the pollers have seen this and explciitly don't want to use SlidingSyncUntil...
var responses []*sync3.Response
for _, client := range []*CSAPI{alice, &alice2} {
res := client.SlidingSync(t, sync3.Request{
Extensions: extensions.Request{
AccountData: &extensions.AccountDataRequest{
Core: extensions.Core{
Enabled: &boolTrue,
},
},
},
})
m.MatchResponse(t, res, MatchGlobalAccountData([]Event{
{
Type: "m.push_rules",
},
{
Type: "initial",
Content: map[string]interface{}{"foo": "bar"},
},
{
Type: "initial2",
Content: map[string]interface{}{"foo2": "bar2"},
},
}))
responses = append(responses, res) // we need the pos values later
}
// now we're going to do an incremental sync with account data to make sure we don't see dupes either.
putGlobalAccountData(t, alice, "incremental", map[string]interface{}{"foo3": "bar3"})
time.Sleep(time.Second) // TODO: we need to make sure the pollers have seen this and explciitly don't want to use SlidingSyncUntil...
for i, client := range []*CSAPI{alice, &alice2} {
res := client.SlidingSync(t, sync3.Request{
Extensions: extensions.Request{
AccountData: &extensions.AccountDataRequest{
Core: extensions.Core{
Enabled: &boolTrue,
},
},
},
}, WithPos(responses[i].Pos))
m.MatchResponse(t, res, MatchGlobalAccountData([]Event{
{
Type: "incremental",
Content: map[string]interface{}{"foo3": "bar3"},
},
}))
}
}
// putAccountData is a wrapper around SetGlobalAccountData. It returns the account data

View File

@ -6,6 +6,7 @@ import (
"net/url"
"os"
"reflect"
"sort"
"strings"
"sync/atomic"
"testing"
@ -13,6 +14,7 @@ import (
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils/m"
"github.com/tidwall/gjson"
)
var (
@ -166,6 +168,34 @@ func MatchRoomInviteState(events []Event, partial bool) m.RoomMatcher {
}
}
// MatchGlobalAccountData builds a matcher which asserts that the account data in a sync
// response matches the given `globals`, with any ordering.
// If there is no account data extension in the response, the match fails.
func MatchGlobalAccountData(globals []Event) m.RespMatcher {
// sort want list by type
sort.Slice(globals, func(i, j int) bool {
return globals[i].Type < globals[j].Type
})
return func(res *sync3.Response) error {
if res.Extensions.AccountData == nil {
return fmt.Errorf("MatchGlobalAccountData: no account_data extension")
}
if len(globals) != len(res.Extensions.AccountData.Global) {
return fmt.Errorf("MatchGlobalAccountData: got %v global account data, want %v", len(res.Extensions.AccountData.Global), len(globals))
}
// sort the got list by type
got := res.Extensions.AccountData.Global
sort.Slice(got, func(i, j int) bool {
return gjson.GetBytes(got[i], "type").Str < gjson.GetBytes(got[j], "type").Str
})
if err := eventsEqual(globals, got); err != nil {
return fmt.Errorf("MatchGlobalAccountData: %s", err)
}
return nil
}
}
func registerNewUser(t *testing.T) *CSAPI {
return registerNamedUser(t, "user")
}