bugfix: ensure typing/receipts updates aggregate correctly

Found these by adding a bunch of tests which is now possible
with the refactoring!
This commit is contained in:
Kegan Dougal 2023-02-08 18:27:44 +00:00
parent b25f5bd129
commit 8f1366af67
11 changed files with 266 additions and 28 deletions

View File

@ -58,7 +58,7 @@ func NewReceiptTable(db *sqlx.DB) *ReceiptTable {
// Returns newly inserted receipts, or nil if there are no new receipts.
// These newly inserted receipts can then be sent to the API processes for live updates.
func (t *ReceiptTable) Insert(roomID string, ephEvent json.RawMessage) (receipts []internal.Receipt, err error) {
readReceipts, privateReceipts, err := unpackReceiptsFromEDU(roomID, ephEvent)
readReceipts, privateReceipts, err := UnpackReceiptsFromEDU(roomID, ephEvent)
if err != nil {
return nil, err
}
@ -184,7 +184,7 @@ func PackReceiptsIntoEDU(receipts []internal.Receipt) (json.RawMessage, error) {
return json.Marshal(newReceiptEDU)
}
func unpackReceiptsFromEDU(roomID string, ephEvent json.RawMessage) (readReceipts, privateReceipts []internal.Receipt, err error) {
func UnpackReceiptsFromEDU(roomID string, ephEvent json.RawMessage) (readReceipts, privateReceipts []internal.Receipt, err error) {
// unpack the receipts, of the form:
// {
// "content": {

View File

@ -219,6 +219,10 @@ func (c *GlobalCache) OnEphemeralEvent(ctx context.Context, roomID string, ephEv
c.roomIDToMetadata[roomID] = metadata
}
func (c *GlobalCache) OnReceipt(ctx context.Context, receipt internal.Receipt) {
// nothing to do but we need it because the Dispatcher demands it.
}
func (c *GlobalCache) OnNewEvent(
ctx context.Context, ed *EventData,
) {

View File

@ -1,7 +1,6 @@
package caches
import (
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
@ -55,7 +54,7 @@ func (u *TypingUpdate) Type() string {
type ReceiptUpdate struct {
RoomUpdate
EphemeralEvent json.RawMessage
Receipt internal.Receipt
}
func (u *ReceiptUpdate) Type() string {

View File

@ -478,11 +478,6 @@ func (c *UserCache) OnEphemeralEvent(ctx context.Context, roomID string, ephEven
update = &TypingUpdate{
RoomUpdate: c.newRoomUpdate(roomID),
}
case "m.receipt":
update = &ReceiptUpdate{
RoomUpdate: c.newRoomUpdate(roomID),
EphemeralEvent: ephEvent,
}
}
if update == nil {
return
@ -491,6 +486,13 @@ func (c *UserCache) OnEphemeralEvent(ctx context.Context, roomID string, ephEven
c.emitOnRoomUpdate(ctx, update)
}
func (c *UserCache) OnReceipt(ctx context.Context, receipt internal.Receipt) {
c.emitOnRoomUpdate(ctx, &ReceiptUpdate{
RoomUpdate: c.newRoomUpdate(receipt.RoomID),
Receipt: receipt,
})
}
func (c *UserCache) emitOnRoomUpdate(ctx context.Context, update RoomUpdate) {
c.listenersMu.RLock()
var listeners []UserCacheListener

View File

@ -8,6 +8,7 @@ import (
"runtime/trace"
"sync"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync3/caches"
"github.com/rs/zerolog"
"github.com/tidwall/gjson"
@ -22,6 +23,7 @@ const DispatcherAllUsers = "-"
type Receiver interface {
OnNewEvent(ctx context.Context, event *caches.EventData)
OnReceipt(ctx context.Context, receipt internal.Receipt)
OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage)
OnRegistered(latestPos int64) error
}
@ -223,6 +225,34 @@ func (d *Dispatcher) OnEphemeralEvent(ctx context.Context, roomID string, ephEve
}
}
func (d *Dispatcher) OnReceipt(ctx context.Context, receipt internal.Receipt) {
notifyUserIDs, _ := d.jrt.JoinedUsersForRoom(receipt.RoomID, func(userID string) bool {
if userID == DispatcherAllUsers {
return false // safety guard to prevent dupe global callbacks
}
_, exists := d.userToReceiver[userID]
return exists
})
d.userToReceiverMu.RLock()
defer d.userToReceiverMu.RUnlock()
// global listeners (invoke before per-user listeners so caches can update)
listener := d.userToReceiver[DispatcherAllUsers]
if listener != nil {
listener.OnReceipt(ctx, receipt) // FIXME: redundant, it doesn't care about receipts
}
// poke user caches OnReceipt which then pokes ConnState
for _, userID := range notifyUserIDs {
l := d.userToReceiver[userID]
if l == nil {
continue
}
l.OnReceipt(ctx, receipt)
}
}
func (d *Dispatcher) notifyListeners(ctx context.Context, ed *caches.EventData, userIDs []string, targetUser string, shouldForceInitial bool, membership string) {
trace.Log(ctx, "dispatcher", fmt.Sprintf("%s: notify %d users (nid=%d,join_count=%d)", ed.RoomID, len(userIDs), ed.LatestPos, ed.JoinCount))
// invoke listeners

View File

@ -0,0 +1,43 @@
package extensions
import (
"context"
"testing"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync3/caches"
)
var (
boolTrue = true
roomA = "!a:localhost"
roomB = "!b:localhost"
ctx = context.Background()
)
type dummyRoomUpdate struct {
roomID string
userRoomData *caches.UserRoomData
globalMetadata *internal.RoomMetadata
}
func (u *dummyRoomUpdate) Type() string {
return "dummy"
}
func (u *dummyRoomUpdate) RoomID() string {
return u.roomID
}
func (u *dummyRoomUpdate) GlobalRoomMetadata() *internal.RoomMetadata {
return u.globalMetadata
}
func (u *dummyRoomUpdate) UserRoomMetadata() *caches.UserRoomData {
return u.userRoomData
}
func assertNoError(t *testing.T, err error) {
t.Helper()
if err == nil {
return
}
t.Fatalf(err.Error())
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/state"
"github.com/matrix-org/sliding-sync/sync3/caches"
)
@ -35,16 +36,42 @@ func (r *ReceiptsRequest) ProcessLive(ctx context.Context, res *Response, extCtx
case *caches.ReceiptUpdate:
// a live receipt event happened, send this back
if res.Receipts == nil {
edu, err := state.PackReceiptsIntoEDU([]internal.Receipt{update.Receipt})
if err != nil {
logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into new edu")
return
}
res.Receipts = &ReceiptsResponse{
Rooms: map[string]json.RawMessage{
update.RoomID(): update.EphemeralEvent,
update.RoomID(): edu,
},
}
} else if res.Receipts.Rooms[update.RoomID()] == nil {
// we have receipts already, but not for this room
edu, err := state.PackReceiptsIntoEDU([]internal.Receipt{update.Receipt})
if err != nil {
logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
return
}
res.Receipts.Rooms[update.RoomID()] = edu
} else {
// aggregate receipts
res.Receipts.Rooms[update.RoomID()] = update.EphemeralEvent
// we have receipts already for this room.
// aggregate receipts: we need to unpack then repack annoyingly.
pub, priv, err := state.UnpackReceiptsFromEDU(update.RoomID(), res.Receipts.Rooms[update.RoomID()])
if err != nil {
logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
return
}
receipts := append(pub, priv...)
// add the live one
receipts = append(receipts, update.Receipt)
edu, err := state.PackReceiptsIntoEDU(receipts)
if err != nil {
logger.Err(err).Str("user", extCtx.UserID).Str("room", update.Receipt.RoomID).Msg("failed to pack receipt into edu")
return
}
res.Receipts.Rooms[update.RoomID()] = edu
}
return
}
}

View File

@ -0,0 +1,76 @@
package extensions
import (
"encoding/json"
"reflect"
"testing"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/state"
"github.com/matrix-org/sliding-sync/sync3/caches"
)
// Test that aggregation works, which is hard to assert in integration tests
func TestLiveReceiptsAggregation(t *testing.T) {
boolTrue := true
ext := &ReceiptsRequest{
Enableable: Enableable{
Enabled: &boolTrue,
},
}
var res Response
var extCtx Context
receiptA1 := &caches.ReceiptUpdate{
Receipt: internal.Receipt{
RoomID: roomA,
EventID: "$aaa",
UserID: "@someone:here",
TS: 12345,
},
RoomUpdate: &dummyRoomUpdate{
roomID: roomA,
},
}
receiptB1 := &caches.ReceiptUpdate{
Receipt: internal.Receipt{
RoomID: roomB,
EventID: "$bbb",
UserID: "@someone:here",
TS: 45678,
},
RoomUpdate: &dummyRoomUpdate{
roomID: roomB,
},
}
receiptA2 := &caches.ReceiptUpdate{ // this should aggregate with receiptA1 - not replace it.
Receipt: internal.Receipt{
RoomID: roomA,
EventID: "$aaa",
UserID: "@someone2:here",
TS: 45678,
},
RoomUpdate: &dummyRoomUpdate{
roomID: roomA,
},
}
// tests that the receipt response is made
ext.ProcessLive(ctx, &res, extCtx, receiptA1)
// test that aggregations work in different rooms
ext.ProcessLive(ctx, &res, extCtx, receiptB1)
// test that aggregation work in the same room (aggregate not replace)
ext.ProcessLive(ctx, &res, extCtx, receiptA2)
if res.Receipts == nil {
t.Fatalf("receipts response is empty")
}
eduA, err := state.PackReceiptsIntoEDU([]internal.Receipt{receiptA1.Receipt, receiptA2.Receipt})
assertNoError(t, err)
eduB, err := state.PackReceiptsIntoEDU([]internal.Receipt{receiptB1.Receipt})
assertNoError(t, err)
want := map[string]json.RawMessage{
roomA: eduA,
roomB: eduB,
}
if !reflect.DeepEqual(res.Receipts.Rooms, want) {
t.Fatalf("got %+v\nwant %+v", res.Receipts.Rooms, want)
}
}

View File

@ -31,12 +31,13 @@ func (r *TypingResponse) HasData(isInitial bool) bool {
func (r *TypingRequest) ProcessLive(ctx context.Context, res *Response, extCtx Context, up caches.Update) {
switch update := up.(type) {
case *caches.TypingUpdate:
// a live typing event happened, send this back
res.Typing = &TypingResponse{ // TODO aggregate
Rooms: map[string]json.RawMessage{
update.RoomID(): update.GlobalRoomMetadata().TypingEvent,
},
// a live typing event happened, send this back. Allow for aggregation (>1 typing event in same room => replace)
if res.Typing == nil {
res.Typing = &TypingResponse{
Rooms: make(map[string]json.RawMessage),
}
}
res.Typing.Rooms[update.RoomID()] = update.GlobalRoomMetadata().TypingEvent
case caches.RoomUpdate:
// if this is a room update which is included in the response, send typing notifs for this room
if _, exists := extCtx.RoomIDToTimeline[update.RoomID()]; !exists {

View File

@ -0,0 +1,62 @@
package extensions
import (
"encoding/json"
"reflect"
"testing"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync3/caches"
)
// Test that aggregation works, which is hard to assert in integration tests
func TestLiveTypingAggregation(t *testing.T) {
boolTrue := true
ext := &TypingRequest{
Enableable: Enableable{
Enabled: &boolTrue,
},
}
var res Response
var extCtx Context
typingA1 := &caches.TypingUpdate{
RoomUpdate: &dummyRoomUpdate{
roomID: roomA,
globalMetadata: &internal.RoomMetadata{
RoomID: roomA,
TypingEvent: json.RawMessage(`{"type":"m.typing","content":{"user_ids":["@alice:localhost"]}}`),
},
},
}
typingB1 := &caches.TypingUpdate{
RoomUpdate: &dummyRoomUpdate{
roomID: roomB,
globalMetadata: &internal.RoomMetadata{
RoomID: roomB,
TypingEvent: json.RawMessage(`{"type":"m.typing","content":{"user_ids":["@bob:localhost"]}}`),
},
},
}
typingA2 := &caches.TypingUpdate{ // this should replace typingA1 as it clobbers on roomID
RoomUpdate: &dummyRoomUpdate{
roomID: roomA,
globalMetadata: &internal.RoomMetadata{
RoomID: roomA,
TypingEvent: json.RawMessage(`{"type":"m.typing","content":{"user_ids":["@charlie:localhost"]}}`),
},
},
}
ext.ProcessLive(ctx, &res, extCtx, typingA1)
ext.ProcessLive(ctx, &res, extCtx, typingB1)
ext.ProcessLive(ctx, &res, extCtx, typingA2)
if res.Typing == nil {
t.Fatalf("typing response is empty")
}
want := map[string]json.RawMessage{
roomA: typingA2.GlobalRoomMetadata().TypingEvent,
roomB: typingB1.GlobalRoomMetadata().TypingEvent,
}
if !reflect.DeepEqual(res.Typing.Rooms, want) {
t.Fatalf("got %+v\nwant %+v", res.Typing.Rooms, want)
}
}

View File

@ -602,23 +602,17 @@ func (h *SyncLiveHandler) OnReceipt(p *pubsub.V2Receipt) {
if !ok {
continue
}
ephEvent, err := state.PackReceiptsIntoEDU(privateReceipts)
if err != nil {
logger.Err(err).Str("room", p.RoomID).Str("user", userID).Msg("unable to pack private receipts into EDU")
continue
for _, pr := range privateReceipts {
userCache.(*caches.UserCache).OnReceipt(ctx, pr)
}
userCache.(*caches.UserCache).OnEphemeralEvent(ctx, p.RoomID, ephEvent)
}
if len(publicReceipts) == 0 {
return
}
// inform the dispatcher of global receipts
ephEvent, err := state.PackReceiptsIntoEDU(publicReceipts)
if err != nil {
logger.Err(err).Str("room", p.RoomID).Msg("unable to pack receipts into EDU")
return
for _, pr := range publicReceipts {
h.Dispatcher.OnReceipt(ctx, pr)
}
h.Dispatcher.OnEphemeralEvent(ctx, p.RoomID, ephEvent)
}
func (h *SyncLiveHandler) OnTyping(p *pubsub.V2Typing) {