Merge pull request #140 from matrix-org/dmr/bump-event-types-3

Preparatory work to make bump event types work on startup
This commit is contained in:
David Robertson 2023-06-06 11:15:40 +01:00 committed by GitHub
commit b055979768
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 248 additions and 114 deletions

View File

@ -6,6 +6,13 @@ import (
"strings"
)
// EventMetadata holds timing information about an event, to be used when sorting room
// lists by recency.
type EventMetadata struct {
NID int64
Timestamp uint64
}
// RoomMetadata holds room-scoped data. It is primarily used in two places:
// - in the caches.GlobalCache, to hold the latest version of data that is consistent
// between all users in the room; and
@ -21,15 +28,15 @@ type RoomMetadata struct {
CanonicalAlias string
JoinCount int
InviteCount int
// LastMessageTimestamp is the origin_server_ts of the event most recently seen in
// this room. Because events arrive at the upstream homeserver out-of-order (and
// because origin_server_ts is an untrusted event field), this timestamp can
// _decrease_ as new events come in.
// TODO removeme
LastMessageTimestamp uint64
Encrypted bool
PredecessorRoomID *string
UpgradedRoomID *string
RoomType *string
// LatestEventsByType tracks timing information for the latest event in the room,
// grouped by event type.
LatestEventsByType map[string]EventMetadata
Encrypted bool
PredecessorRoomID *string
UpgradedRoomID *string
RoomType *string
// if this room is a space, which rooms are m.space.child state events. This is the same for all users hence is global.
ChildSpaceRooms map[string]struct{}
// The latest m.typing ephemeral event for this room.
@ -38,8 +45,9 @@ type RoomMetadata struct {
func NewRoomMetadata(roomID string) *RoomMetadata {
return &RoomMetadata{
RoomID: roomID,
ChildSpaceRooms: make(map[string]struct{}),
RoomID: roomID,
LatestEventsByType: make(map[string]EventMetadata),
ChildSpaceRooms: make(map[string]struct{}),
}
}

View File

@ -328,10 +328,20 @@ func (t *EventTable) SelectLatestEventsBetween(txn *sqlx.Tx, roomID string, lowe
return events, err
}
func (t *EventTable) selectLatestEventInAllRooms(txn *sqlx.Tx) ([]Event, error) {
func (t *EventTable) selectLatestEventByTypeInAllRooms(txn *sqlx.Tx) ([]Event, error) {
result := []Event{}
// TODO: this query ends up doing a sequential scan on the events table. We have
// an index on (event_type, room_id, event_nid) so I'm a little surprised that PG
// decides to do so. Can we do something better here? Ideas:
// - Find a better query for selecting the newest event of each type in a room.
// - At present we only care about the _timestamps_ of these events. Perhaps we
// could store those in the DB (and even in an index) as a column and select
// those, to avoid having to parse the event bodies.
// - We could have the application maintain a `latest_events` table so that the
// rows can be directly read. Assuming a mostly-static set of event types, reads
// are then linear in the number of rooms.
rows, err := txn.Query(
`SELECT room_id, event FROM syncv3_events WHERE event_nid in (SELECT MAX(event_nid) FROM syncv3_events GROUP BY room_id)`,
`SELECT room_id, event_nid, event FROM syncv3_events WHERE event_nid in (SELECT MAX(event_nid) FROM syncv3_events GROUP BY room_id, event_type)`,
)
if err != nil {
return nil, err
@ -339,7 +349,7 @@ func (t *EventTable) selectLatestEventInAllRooms(txn *sqlx.Tx) ([]Event, error)
defer rows.Close()
for rows.Next() {
var ev Event
if err := rows.Scan(&ev.RoomID, &ev.JSON); err != nil {
if err := rows.Scan(&ev.RoomID, &ev.NID, &ev.JSON); err != nil {
return nil, err
}
result = append(result, ev)

View File

@ -182,13 +182,22 @@ func (s *Storage) MetadataForAllRooms(txn *sqlx.Tx, tempTableName string, result
}
// work out latest timestamps
events, err := s.accumulator.eventsTable.selectLatestEventInAllRooms(txn)
events, err := s.accumulator.eventsTable.selectLatestEventByTypeInAllRooms(txn)
if err != nil {
return err
}
for _, ev := range events {
metadata := result[ev.RoomID]
metadata, ok := result[ev.RoomID]
metadata.LastMessageTimestamp = gjson.ParseBytes(ev.JSON).Get("origin_server_ts").Uint()
if !ok {
metadata = *internal.NewRoomMetadata(ev.RoomID)
}
parsed := gjson.ParseBytes(ev.JSON)
eventMetadata := internal.EventMetadata{
NID: ev.NID,
Timestamp: parsed.Get("origin_server_ts").Uint(),
}
metadata.LatestEventsByType[parsed.Get("type").Str] = eventMetadata
// it's possible the latest event is a brand new room not caught by the first SELECT for joined
// rooms e.g when you're invited to a room so we need to make sure to set the metadata again here
metadata.RoomID = ev.RoomID
@ -601,7 +610,7 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin
return nil, fmt.Errorf("VisibleEventNIDsBetweenForRooms.SelectEventsWithTypeStateKeyInRooms: %s", err)
}
}
joinedRoomIDs, err := s.joinedRoomsAfterPositionWithEvents(membershipEvents, userID, from)
joinNIDsByRoomID, err := s.determineJoinedRoomsFromMemberships(membershipEvents)
if err != nil {
return nil, fmt.Errorf("failed to work out joined rooms for %s at pos %d: %s", userID, from, err)
}
@ -612,7 +621,7 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin
return nil, fmt.Errorf("failed to load membership events: %s", err)
}
return s.visibleEventNIDsWithData(joinedRoomIDs, membershipEvents, userID, from, to)
return s.visibleEventNIDsWithData(joinNIDsByRoomID, membershipEvents, userID, from, to)
}
// Work out the NID ranges to pull events from for this user. Given a from and to event nid stream position,
@ -642,7 +651,7 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin
// - For Room E: from=1, to=15 returns { RoomE: [ [3,3], [13,15] ] } (tests invites)
func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[string][][2]int64, error) {
// load *ALL* joined rooms for this user at from (inclusive)
joinedRoomIDs, err := s.JoinedRoomsAfterPosition(userID, from)
joinNIDsByRoomID, err := s.JoinedRoomsAfterPosition(userID, from)
if err != nil {
return nil, fmt.Errorf("failed to work out joined rooms for %s at pos %d: %s", userID, from, err)
}
@ -653,10 +662,10 @@ func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[st
return nil, fmt.Errorf("failed to load membership events: %s", err)
}
return s.visibleEventNIDsWithData(joinedRoomIDs, membershipEvents, userID, from, to)
return s.visibleEventNIDsWithData(joinNIDsByRoomID, membershipEvents, userID, from, to)
}
func (s *Storage) visibleEventNIDsWithData(joinedRoomIDs []string, membershipEvents []Event, userID string, from, to int64) (map[string][][2]int64, error) {
func (s *Storage) visibleEventNIDsWithData(joinNIDsByRoomID map[string]int64, membershipEvents []Event, userID string, from, to int64) (map[string][][2]int64, error) {
// load membership events in order and bucket based on room ID
roomIDToLogs := make(map[string][]membershipEvent)
for _, ev := range membershipEvents {
@ -718,7 +727,7 @@ func (s *Storage) visibleEventNIDsWithData(joinedRoomIDs []string, membershipEve
// For each joined room, perform the algorithm and delete the logs afterwards
result := make(map[string][][2]int64)
for _, joinedRoomID := range joinedRoomIDs {
for joinedRoomID, _ := range joinNIDsByRoomID {
roomResult := calculateVisibleEventNIDs(true, from, to, roomIDToLogs[joinedRoomID])
result[joinedRoomID] = roomResult
delete(roomIDToLogs, joinedRoomID)
@ -785,38 +794,49 @@ func (s *Storage) AllJoinedMembers(txn *sqlx.Tx, tempTableName string) (result m
return result, metadata, nil
}
func (s *Storage) JoinedRoomsAfterPosition(userID string, pos int64) ([]string, error) {
func (s *Storage) JoinedRoomsAfterPosition(userID string, pos int64) (
joinedRoomsWithJoinNIDs map[string]int64, err error,
) {
// fetch all the membership events up to and including pos
membershipEvents, err := s.accumulator.eventsTable.SelectEventsWithTypeStateKey("m.room.member", userID, 0, pos)
if err != nil {
return nil, fmt.Errorf("JoinedRoomsAfterPosition.SelectEventsWithTypeStateKey: %s", err)
}
return s.joinedRoomsAfterPositionWithEvents(membershipEvents, userID, pos)
return s.determineJoinedRoomsFromMemberships(membershipEvents)
}
func (s *Storage) joinedRoomsAfterPositionWithEvents(membershipEvents []Event, userID string, pos int64) ([]string, error) {
joinedRoomsSet := make(map[string]bool)
// determineJoinedRoomsFromMemberships scans a slice of membership events from multiple
// rooms, to determine which rooms a user is currently joined to. Those events MUST be
// - sorted by ascending NIDs, and
// - only memberships for the given user;
// neither of these preconditions are checked by this function.
//
// Returns a slice of joined room IDs and a slice of joined event NIDs, whose entries
// correspond to one another. Rooms appear in these slices in no particular order.
func (s *Storage) determineJoinedRoomsFromMemberships(membershipEvents []Event) (
joinNIDsByRoomID map[string]int64, err error,
) {
joinNIDsByRoomID = make(map[string]int64, len(membershipEvents))
for _, ev := range membershipEvents {
// some of these events will be profile changes but that's ok as we're just interested in the
// end result, not the deltas
membership := gjson.GetBytes(ev.JSON, "content.membership").Str
switch membership {
// These are "join" and the only memberships that you can transition to after
// a join: see e.g. the transition diagram in
// https://spec.matrix.org/v1.7/client-server-api/#room-membership
case "join":
joinedRoomsSet[ev.RoomID] = true
// Only remember a join NID if we are not joined to this room according to
// the state before ev.
if _, currentlyJoined := joinNIDsByRoomID[ev.RoomID]; !currentlyJoined {
joinNIDsByRoomID[ev.RoomID] = ev.NID
}
case "ban":
fallthrough
case "leave":
joinedRoomsSet[ev.RoomID] = false
}
}
joinedRooms := make([]string, 0, len(joinedRoomsSet))
for roomID, joined := range joinedRoomsSet {
if joined {
joinedRooms = append(joinedRooms, roomID)
delete(joinNIDsByRoomID, ev.RoomID)
}
}
return joinedRooms, nil
return joinNIDsByRoomID, nil
}
func (s *Storage) Teardown() {

View File

@ -113,6 +113,12 @@ func TestStorageRoomStateBeforeAndAfterEventPosition(t *testing.T) {
}
func TestStorageJoinedRoomsAfterPosition(t *testing.T) {
// Clean DB. If we don't, other tests' events will be in the DB, but we won't
// provide keys in the metadata dict we pass to MetadataForAllRooms, leading to a
// panic.
if err := cleanDB(t); err != nil {
t.Fatalf("failed to wipe DB: %s", err)
}
store := NewStorage(postgresConnectionString)
defer store.Teardown()
joinedRoomID := "!joined:bar"
@ -160,19 +166,24 @@ func TestStorageJoinedRoomsAfterPosition(t *testing.T) {
}
latestPos = latestNIDs[len(latestNIDs)-1]
}
aliceJoinedRooms, err := store.JoinedRoomsAfterPosition(alice, latestPos)
aliceJoinNIDsByRoomID, err := store.JoinedRoomsAfterPosition(alice, latestPos)
if err != nil {
t.Fatalf("failed to JoinedRoomsAfterPosition: %s", err)
}
if len(aliceJoinedRooms) != 1 || aliceJoinedRooms[0] != joinedRoomID {
t.Fatalf("JoinedRoomsAfterPosition at %v for %s got %v want %v", latestPos, alice, aliceJoinedRooms, joinedRoomID)
if len(aliceJoinNIDsByRoomID) != 1 {
t.Fatalf("JoinedRoomsAfterPosition at %v for %s got %v, want room %s only", latestPos, alice, aliceJoinNIDsByRoomID, joinedRoomID)
}
bobJoinedRooms, err := store.JoinedRoomsAfterPosition(bob, latestPos)
for gotRoomID, _ := range aliceJoinNIDsByRoomID {
if gotRoomID != joinedRoomID {
t.Fatalf("JoinedRoomsAfterPosition at %v for %s got %v want %v", latestPos, alice, gotRoomID, joinedRoomID)
}
}
bobJoinNIDsByRoomID, err := store.JoinedRoomsAfterPosition(bob, latestPos)
if err != nil {
t.Fatalf("failed to JoinedRoomsAfterPosition: %s", err)
}
if len(bobJoinedRooms) != 3 {
t.Fatalf("JoinedRoomsAfterPosition for %s got %v rooms want %v", bob, len(bobJoinedRooms), 3)
if len(bobJoinNIDsByRoomID) != 3 {
t.Fatalf("JoinedRoomsAfterPosition for %s got %v rooms want %v", bob, len(bobJoinNIDsByRoomID), 3)
}
// also test currentNotMembershipStateEventsInAllRooms
@ -199,21 +210,20 @@ func TestStorageJoinedRoomsAfterPosition(t *testing.T) {
}
}
newMetadata := func(roomID string, joinCount int) internal.RoomMetadata {
m := internal.NewRoomMetadata(roomID)
m.JoinCount = joinCount
return *m
}
// also test MetadataForAllRooms
roomIDToMetadata := map[string]internal.RoomMetadata{
joinedRoomID: {
JoinCount: 1,
},
invitedRoomID: {
JoinCount: 1,
},
banRoomID: {
JoinCount: 1,
},
bobJoinedRoomID: {
JoinCount: 2,
},
joinedRoomID: newMetadata(joinedRoomID, 1),
invitedRoomID: newMetadata(invitedRoomID, 1),
banRoomID: newMetadata(banRoomID, 1),
bobJoinedRoomID: newMetadata(bobJoinedRoomID, 2),
}
tempTableName, err := store.PrepareSnapshot(txn)
if err != nil {
t.Fatalf("PrepareSnapshot: %s", err)
@ -606,17 +616,10 @@ func TestGlobalSnapshot(t *testing.T) {
testutils.NewStateEvent(t, "m.room.member", alice, bob, map[string]interface{}{"membership": "invite"}),
},
}
// make a fresh DB which is unpolluted from other tests
db, close := connectToDB(t)
_, err := db.Exec(`
DROP TABLE IF EXISTS syncv3_rooms;
DROP TABLE IF EXISTS syncv3_invites;
DROP TABLE IF EXISTS syncv3_snapshots;
DROP TABLE IF EXISTS syncv3_spaces;`)
if err != nil {
if err := cleanDB(t); err != nil {
t.Fatalf("failed to wipe DB: %s", err)
}
close()
store := NewStorage(postgresConnectionString)
defer store.Teardown()
for roomID, stateEvents := range roomIDToEventMap {
@ -679,6 +682,18 @@ func TestGlobalSnapshot(t *testing.T) {
}
}
func cleanDB(t *testing.T) error {
// make a fresh DB which is unpolluted from other tests
db, close := connectToDB(t)
_, err := db.Exec(`
DROP TABLE IF EXISTS syncv3_rooms;
DROP TABLE IF EXISTS syncv3_invites;
DROP TABLE IF EXISTS syncv3_snapshots;
DROP TABLE IF EXISTS syncv3_spaces;`)
close()
return err
}
func assertRoomMetadata(t *testing.T, got, want internal.RoomMetadata) {
t.Helper()
assertValue(t, "CanonicalAlias", got.CanonicalAlias, want.CanonicalAlias)

View File

@ -52,7 +52,8 @@ var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.C
// information is populated at startup from the database and then kept up-to-date by hooking into the
// Dispatcher for new events.
type GlobalCache struct {
LoadJoinedRoomsOverride func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, err error)
// LoadJoinedRoomsOverride allows tests to mock out the behaviour of LoadJoinedRooms.
LoadJoinedRoomsOverride func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinNIDs map[string]int64, err error)
// inserts are done by v2 poll loops, selects are done by v3 request threads
// there are lots of overlapping keys as many users (threads) can be joined to the same room (key)
@ -76,49 +77,72 @@ func (c *GlobalCache) OnRegistered(_ context.Context, _ int64) error {
return nil
}
// Load the current room metadata for the given room IDs. Races unless you call this in a dispatcher loop.
// LoadRooms loads the current room metadata for the given room IDs. Races unless you call this in a dispatcher loop.
// Always returns copies of the room metadata so ownership can be passed to other threads.
// Keeps the ordering of the room IDs given.
func (c *GlobalCache) LoadRooms(ctx context.Context, roomIDs ...string) map[string]*internal.RoomMetadata {
c.roomIDToMetadataMu.RLock()
defer c.roomIDToMetadataMu.RUnlock()
result := make(map[string]*internal.RoomMetadata, len(roomIDs))
for i := range roomIDs {
roomID := roomIDs[i]
sr := c.roomIDToMetadata[roomID]
if sr == nil {
logger.Warn().Str("room", roomID).Msg("GlobalCache.LoadRoom: no metadata for this room, generating stub")
c.roomIDToMetadata[roomID] = internal.NewRoomMetadata(roomID)
sr = c.roomIDToMetadata[roomID]
}
srCopy := *sr
// copy the heroes or else we may modify the same slice which would be bad :(
srCopy.Heroes = make([]internal.Hero, len(sr.Heroes))
for i := range sr.Heroes {
srCopy.Heroes[i] = sr.Heroes[i]
}
result[roomID] = &srCopy
result[roomID] = c.copyRoom(roomID)
}
return result
}
// Load all current joined room metadata for the user given. Returns the absolute database position along
// with the results. TODO: remove with LoadRoomState?
func (c *GlobalCache) LoadJoinedRooms(ctx context.Context, userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, err error) {
// LoadRoomsFromMap is like LoadRooms, except it is given a map with room IDs as keys.
// The values in that map are completely ignored.
func (c *GlobalCache) LoadRoomsFromMap(ctx context.Context, joinNIDsByRoomID map[string]int64) map[string]*internal.RoomMetadata {
c.roomIDToMetadataMu.RLock()
defer c.roomIDToMetadataMu.RUnlock()
result := make(map[string]*internal.RoomMetadata, len(joinNIDsByRoomID))
for roomID, _ := range joinNIDsByRoomID {
result[roomID] = c.copyRoom(roomID)
}
return result
}
// copyRoom returns a copy of the internal.RoomMetadata stored for this room.
// This is an internal implementation detail of LoadRooms and LoadRoomsFromMap.
// If the room is not present in the global cache, returns a stub metadata entry.
// The caller MUST acquire a read lock on roomIDToMetadataMu before calling this.
func (c *GlobalCache) copyRoom(roomID string) *internal.RoomMetadata {
sr := c.roomIDToMetadata[roomID]
if sr == nil {
logger.Warn().Str("room", roomID).Msg("GlobalCache.LoadRoom: no metadata for this room, returning stub")
return internal.NewRoomMetadata(roomID)
}
srCopy := *sr
// copy the heroes or else we may modify the same slice which would be bad :(
srCopy.Heroes = make([]internal.Hero, len(sr.Heroes))
for i := range sr.Heroes {
srCopy.Heroes[i] = sr.Heroes[i]
}
return &srCopy
}
// LoadJoinedRooms loads all current joined room metadata for the user given, together
// with the NID of the user's latest join (excluding profile changes) to the room.
// Returns the absolute database position (the latest event NID across the whole DB),
// along with the results.
// TODO: remove with LoadRoomState?
func (c *GlobalCache) LoadJoinedRooms(ctx context.Context, userID string) (
pos int64, joinedRooms map[string]*internal.RoomMetadata, joinNIDsByRoomID map[string]int64, err error,
) {
if c.LoadJoinedRoomsOverride != nil {
return c.LoadJoinedRoomsOverride(userID)
}
initialLoadPosition, err := c.store.LatestEventNID()
if err != nil {
return 0, nil, err
return 0, nil, nil, err
}
joinedRoomIDs, err := c.store.JoinedRoomsAfterPosition(userID, initialLoadPosition)
joinNIDsByRoomID, err = c.store.JoinedRoomsAfterPosition(userID, initialLoadPosition)
if err != nil {
return 0, nil, err
return 0, nil, nil, err
}
// TODO: no guarantee that this state is the same as latest unless called in a dispatcher loop
rooms := c.LoadRooms(ctx, joinedRoomIDs...)
return initialLoadPosition, rooms, nil
rooms := c.LoadRoomsFromMap(ctx, joinNIDsByRoomID)
return initialLoadPosition, rooms, joinNIDsByRoomID, nil
}
func (c *GlobalCache) LoadStateEvent(ctx context.Context, roomID string, loadPosition int64, evType, stateKey string) json.RawMessage {

View File

@ -56,6 +56,8 @@ type UserRoomData struct {
Tags map[string]float64
// LoadPos is an event NID. UserRoomData instances represent the status of this room after the corresponding event, as seen by this user.
LoadPos int64
// JoinNID is the NID of our latest join to the room, excluding profile changes.
JoinNID int64
}
func NewUserRoomData() UserRoomData {
@ -210,10 +212,16 @@ func (c *UserCache) Unsubscribe(id int) {
delete(c.listeners, id)
}
// OnRegistered is called after the sync3.Dispatcher has successfully registered this
// cache to receive updates. We use this to run some final initialisation logic that
// is sensitive to race conditions; confusingly, most of the initialisation is driven
// externally by sync3.SyncLiveHandler.userCache. It's importatn that we don't spend too
// long inside this function, because it is called within a global lock on the
// sync3.Dispatcher (see sync3.Dispatcher.Register).
func (c *UserCache) OnRegistered(ctx context.Context, _ int64) error {
// select all spaces the user is a part of to seed the cache correctly. This has to be done in
// the OnRegistered callback which has locking guarantees. This is why...
latestPos, joinedRooms, err := c.globalCache.LoadJoinedRooms(ctx, c.UserID)
latestPos, joinedRooms, joinNIDs, err := c.globalCache.LoadJoinedRooms(ctx, c.UserID)
if err != nil {
return fmt.Errorf("failed to load joined rooms: %s", err)
}
@ -249,7 +257,8 @@ func (c *UserCache) OnRegistered(ctx context.Context, _ int64) error {
// |<--------new space event---------------|
//
// the db pos is _always_ equal to or ahead of the dispatcher, so we will discard any position less than this.
// the db pos is _always_ equal to or ahead of the dispatcher, so we will discard
// any updates from the dispatcher with position less than this.
c.latestPos = latestPos
for _, room := range joinedRooms {
// inject space children events
@ -263,6 +272,18 @@ func (c *UserCache) OnRegistered(ctx context.Context, _ int64) error {
})
}
}
// Record when we joined the room. We've just had to scan the history of our
// membership in this room to produce joinedRooms above, so we may as well
// do this here too.
c.roomToDataMu.Lock()
urd, ok := c.roomToData[room.RoomID]
if !ok {
urd = NewUserRoomData()
}
urd.JoinNID = joinNIDs[room.RoomID]
c.roomToData[room.RoomID] = urd
c.roomToDataMu.Unlock()
}
return nil
}

View File

@ -19,10 +19,14 @@ var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.C
const DispatcherAllUsers = "-"
// Receiver represents the callbacks that a Dispatcher may fire.
type Receiver interface {
OnNewEvent(ctx context.Context, event *caches.EventData)
OnReceipt(ctx context.Context, receipt internal.Receipt)
OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage)
// OnRegistered is called after a successful call to Dispatcher.Register. After
// this call, the receiver will be told about events whose NID is greater than
// latestPos.
OnRegistered(ctx context.Context, latestPos int64) error
}
@ -31,7 +35,8 @@ type Dispatcher struct {
jrt *JoinedRoomsTracker
userToReceiver map[string]Receiver
userToReceiverMu *sync.RWMutex
latestPos int64
// latestPos is an eventNID, the largest that this dispatcher has seen.
latestPos int64
}
func NewDispatcher() *Dispatcher {

View File

@ -84,7 +84,7 @@ func NewConnState(
// - load() bases its current state based on the latest position, which includes processing of these N events.
// - post load() we read N events, processing them a 2nd time.
func (s *ConnState) load(ctx context.Context, req *sync3.Request) error {
initialLoadPosition, joinedRooms, err := s.globalCache.LoadJoinedRooms(ctx, s.userID)
initialLoadPosition, joinedRooms, joinNIDs, err := s.globalCache.LoadJoinedRooms(ctx, s.userID)
if err != nil {
return err
}
@ -93,6 +93,8 @@ func (s *ConnState) load(ctx context.Context, req *sync3.Request) error {
for _, metadata := range joinedRooms {
metadata.RemoveHero(s.userID)
urd := s.userCache.LoadRoomData(metadata.RoomID)
urd.JoinNID = joinNIDs[metadata.RoomID]
interestedEventTimestampsByList := make(map[string]uint64, len(req.Lists))
for listKey, _ := range req.Lists {
// Best-effort only: we're not going to scan the database for all events in
@ -101,8 +103,8 @@ func (s *ConnState) load(ctx context.Context, req *sync3.Request) error {
interestedEventTimestampsByList[listKey] = metadata.LastMessageTimestamp
}
rooms[i] = sync3.RoomConnMetadata{
RoomMetadata: *metadata,
UserRoomData: urd,
RoomMetadata: *metadata,
UserRoomData: urd,
LastInterestedEventTimestamps: interestedEventTimestampsByList,
}
i++

View File

@ -85,12 +85,16 @@ func TestConnStateInitial(t *testing.T) {
roomB.RoomID: {userID},
roomC.RoomID: {userID},
})
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, err error) {
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinNIDs map[string]int64, err error) {
return 1, map[string]*internal.RoomMetadata{
roomA.RoomID: &roomA,
roomB.RoomID: &roomB,
roomC.RoomID: &roomC,
}, nil
roomA.RoomID: &roomA,
roomB.RoomID: &roomB,
roomC.RoomID: &roomC,
}, map[string]int64{
roomA.RoomID: 123,
roomB.RoomID: 456,
roomC.RoomID: 789,
}, nil
}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
dispatcher.Register(context.Background(), userCache.UserID, userCache)
@ -253,12 +257,14 @@ func TestConnStateMultipleRanges(t *testing.T) {
roomID: {userID},
})
}
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, err error) {
res := make(map[string]*internal.RoomMetadata)
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinNIDs map[string]int64, err error) {
roomMetadata := make(map[string]*internal.RoomMetadata)
joinNIDs = make(map[string]int64)
for i, r := range rooms {
res[r.RoomID] = rooms[i]
roomMetadata[r.RoomID] = rooms[i]
joinNIDs[r.RoomID] = 123456 // Dummy value
}
return 1, res, nil
return 1, roomMetadata, joinNIDs, nil
}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache.LazyRoomDataOverride = mockLazyRoomOverride
@ -425,13 +431,19 @@ func TestBumpToOutsideRange(t *testing.T) {
roomC.RoomID: {userID},
roomD.RoomID: {userID},
})
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, err error) {
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinNIDs map[string]int64, err error) {
return 1, map[string]*internal.RoomMetadata{
roomA.RoomID: &roomA,
roomB.RoomID: &roomB,
roomC.RoomID: &roomC,
roomD.RoomID: &roomD,
}, nil
roomA.RoomID: &roomA,
roomB.RoomID: &roomB,
roomC.RoomID: &roomC,
roomD.RoomID: &roomD,
}, map[string]int64{
roomA.RoomID: 1,
roomB.RoomID: 2,
roomC.RoomID: 3,
roomD.RoomID: 4,
}, nil
}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache.LazyRoomDataOverride = mockLazyRoomOverride
@ -523,13 +535,19 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
roomC.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "c"}),
roomD.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "d"}),
}
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, err error) {
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms map[string]*internal.RoomMetadata, joinNIDs map[string]int64, err error) {
return 1, map[string]*internal.RoomMetadata{
roomA.RoomID: &roomA,
roomB.RoomID: &roomB,
roomC.RoomID: &roomC,
roomD.RoomID: &roomD,
}, nil
roomA.RoomID: &roomA,
roomB.RoomID: &roomB,
roomC.RoomID: &roomC,
roomD.RoomID: &roomD,
}, map[string]int64{
roomA.RoomID: 1,
roomB.RoomID: 2,
roomC.RoomID: 3,
roomD.RoomID: 4,
}, nil
}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {

View File

@ -451,6 +451,17 @@ func (h *SyncLiveHandler) CacheForUser(userID string) *caches.UserCache {
return nil
}
// userCache fetches an existing caches.UserCache for this user if one exists. If not,
// it
// - creates a blank caches.UserCache struct,
// - fires callbacks on that struct as necessary to populate it with initial state,
// - stores the struct so it will not be recreated in the future, and
// - registers the cache with the Dispatcher.
//
// Some extra initialisation takes place in caches.UserCache.OnRegister.
// TODO: the calls to uc.OnBlahBlah etc can be moved into NewUserCache, now that the
//
// UserCache holds a reference to the storage layer.
func (h *SyncLiveHandler) userCache(userID string) (*caches.UserCache, error) {
// bail if we already have a cache
c, ok := h.userCaches.Load(userID)