Invalidation: don't bother propagating a snapshot

This commit is contained in:
David Robertson 2023-09-08 12:12:59 +01:00
parent c1cc0eba25
commit afe589921e
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
7 changed files with 17 additions and 19 deletions

View File

@ -131,8 +131,7 @@ type V2ExpiredToken struct {
func (*V2ExpiredToken) Type() string { return "V2ExpiredToken" }
type V2InvalidateRoom struct {
RoomID string
SnapshotID int64
RoomID string
}
func (*V2InvalidateRoom) Type() string { return "V2InvalidateRoom" }

View File

@ -321,10 +321,10 @@ type AccumulateResult struct {
// TODO: is this redundant---identical to len(TimelineNIDs)?
NumNew int
TimelineNIDs []int64
// ReloadSnapshot is set to the snapshot ID representing the current state of the room, if this Accumulate has
// changed state in a non-incremental fashion. If so, the application should reload its view of this room from the
// given snapshot. Otherwise this is 0.
ReloadFromSnapshot int64
// RequiresReload is set to true when we have accumulated a non-incremental state
// change (typically a redaction) that requires consumers to reload the room state
// from the latest snapshot.
RequiresReload bool
}
// Accumulate internal state from a user's sync response. The timeline order MUST be in the order
@ -515,7 +515,7 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, prevBatch
// TODO: this is going to send out a cache invalidation when someone redacts a
// message event, which is unnecessary. Can we get eventsTable.Redact to tell us
// if it redacted any state events, and only include a snapID here if so?
result.ReloadFromSnapshot = snapID
result.RequiresReload = true
}
if err = a.spacesTable.HandleSpaceUpdates(txn, newEvents); err != nil {

View File

@ -302,10 +302,9 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID, prev
}
// TODO: does the ordering of this versus V2Accumulate matter here?
if accResult.ReloadFromSnapshot > 0 {
if accResult.RequiresReload {
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2InvalidateRoom{
RoomID: roomID,
SnapshotID: accResult.ReloadFromSnapshot,
RoomID: roomID,
})
}

View File

@ -386,6 +386,6 @@ func (c *GlobalCache) OnNewEvent(
c.roomIDToMetadata[ed.RoomID] = metadata
}
func (c *GlobalCache) OnInvalidateRoom(ctx context.Context, roomID string, fromSnapshotID int64) {
func (c *GlobalCache) OnInvalidateRoom(ctx context.Context, roomID string) {
logger.Warn().Msgf("DMR: invalidate GLOBALLLLLL room %s new snapshot %d", roomID, fromSnapshotID)
}

View File

@ -751,6 +751,6 @@ func (u *UserCache) ShouldIgnore(userID string) bool {
return ignored
}
func (u *UserCache) OnInvalidateRoom(ctx context.Context, roomID string, fromSnapshotID int64) {
logger.Warn().Msgf("DMR: invalidate USERRRRRRR room %s new snapshot %d", roomID, fromSnapshotID)
func (u *UserCache) OnInvalidateRoom(ctx context.Context, roomID string) {
logger.Warn().Msgf("DMR: invalidate USERRRRRRR room %s new snapshot %d", roomID)
}

View File

@ -24,7 +24,7 @@ type Receiver interface {
OnNewEvent(ctx context.Context, event *caches.EventData)
OnReceipt(ctx context.Context, receipt internal.Receipt)
OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage)
OnInvalidateRoom(ctx context.Context, roomID string, fromSnapshotID int64)
OnInvalidateRoom(ctx context.Context, roomID string)
// OnRegistered is called after a successful call to Dispatcher.Register
OnRegistered(ctx context.Context) error
}
@ -287,7 +287,7 @@ func (d *Dispatcher) notifyListeners(ctx context.Context, ed *caches.EventData,
}
}
func (d *Dispatcher) OnInvalidateRoom(ctx context.Context, roomID string, snapshotID int64) {
func (d *Dispatcher) OnInvalidateRoom(ctx context.Context, roomID string) {
joinedUsers, _ := d.jrt.JoinedUsersForRoom(roomID, nil)
d.userToReceiverMu.RLock()
defer d.userToReceiverMu.RUnlock()
@ -297,6 +297,6 @@ func (d *Dispatcher) OnInvalidateRoom(ctx context.Context, roomID string, snapsh
logger.Warn().Str("user_id", userID).Msgf("User has no receiver")
continue
}
receiver.OnInvalidateRoom(ctx, roomID, snapshotID)
receiver.OnInvalidateRoom(ctx, roomID)
}
}

View File

@ -806,9 +806,9 @@ func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom) {
ctx, task := internal.StartTask(context.Background(), "OnInvalidateRoom")
defer task.End()
logger.Warn().Msgf("DMR: invalidate room %s new snapshot %d", p.RoomID, p.SnapshotID)
h.GlobalCache.OnInvalidateRoom(ctx, p.RoomID, p.SnapshotID)
h.Dispatcher.OnInvalidateRoom(ctx, p.RoomID, p.SnapshotID)
logger.Warn().Msgf("DMR: invalidate room %s", p.RoomID)
h.GlobalCache.OnInvalidateRoom(ctx, p.RoomID)
h.Dispatcher.OnInvalidateRoom(ctx, p.RoomID)
}
func parseIntFromQuery(u *url.URL, param string) (result int64, err *internal.HandlerError) {