Nuke connections after a room is invalidated

This commit is contained in:
David Robertson 2023-11-03 15:44:59 +00:00
parent eb1ada2f95
commit c6fb96ac70
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
3 changed files with 72 additions and 19 deletions

View File

@ -181,6 +181,19 @@ func (m *ConnMap) connIDsForDevice(userID, deviceID string) []ConnID {
return connIDs
}
// CloseConnsForUser closes all conns for a given user. Returns the number of conns closed.
func (m *ConnMap) CloseConnsForUser(userID string) int {
m.mu.Lock()
defer m.mu.Unlock()
conns := m.userIDToConn[userID]
logger.Trace().Str("user", userID).Int("num_conns", len(conns)).Msg("closing all device connections due to CloseConn()")
for _, cid := range conns {
m.cache.Remove(cid.String()) // this will fire TTL callbacks which calls closeConn
}
return len(conns)
}
func (m *ConnMap) closeConnExpires(connID string, value interface{}) {
m.mu.Lock()
defer m.mu.Unlock()

View File

@ -275,22 +275,7 @@ func (d *Dispatcher) notifyListeners(ctx context.Context, ed *caches.EventData,
}
}
func (d *Dispatcher) OnInvalidateRoom(ctx context.Context, roomID string) {
// First dispatch to the global cache.
receiver, ok := d.userToReceiver[DispatcherAllUsers]
if !ok {
logger.Error().Msgf("No receiver for global cache")
}
receiver.OnInvalidateRoom(ctx, roomID)
// Then dispatch to any users who are joined to that room.
joinedUsers, _ := d.jrt.JoinedUsersForRoom(roomID, nil)
d.userToReceiverMu.RLock()
defer d.userToReceiverMu.RUnlock()
for _, userID := range joinedUsers {
receiver = d.userToReceiver[userID]
if receiver != nil {
receiver.OnInvalidateRoom(ctx, roomID)
}
}
func (d *Dispatcher) OnInvalidateRoom(roomID string, joins, invites []string) {
// Reset the joined room tracker.
d.jrt.ReloadMembershipsForRoom(roomID, joins, invites)
}

View File

@ -63,6 +63,11 @@ type SyncLiveHandler struct {
setupHistVec *prometheus.HistogramVec
histVec *prometheus.HistogramVec
slowReqs prometheus.Counter
// destroyedConns is the number of connections that have been destoryed after
// a room invalidation payload.
// TODO: could make this a CounterVec labelled by reason, to track expiry due
// to update buffer filling, expiry due to inactivity, etc.
destroyedConns prometheus.Counter
}
func NewSync3Handler(
@ -139,6 +144,9 @@ func (h *SyncLiveHandler) Teardown() {
if h.slowReqs != nil {
prometheus.Unregister(h.slowReqs)
}
if h.destroyedConns != nil {
prometheus.Unregister(h.destroyedConns)
}
}
func (h *SyncLiveHandler) addPrometheusMetrics() {
@ -162,9 +170,17 @@ func (h *SyncLiveHandler) addPrometheusMetrics() {
Name: "slow_requests",
Help: "Counter of slow (>=50s) requests, initial or otherwise.",
})
h.destroyedConns = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "sliding_sync",
Subsystem: "api",
Name: "destroyed_conns",
Help: "Counter of conns that were destroyed.",
})
prometheus.MustRegister(h.setupHistVec)
prometheus.MustRegister(h.histVec)
prometheus.MustRegister(h.slowReqs)
prometheus.MustRegister(h.destroyedConns)
}
func (h *SyncLiveHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@ -818,7 +834,46 @@ func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom) {
ctx, task := internal.StartTask(context.Background(), "OnInvalidateRoom")
defer task.End()
h.Dispatcher.OnInvalidateRoom(ctx, p.RoomID)
// 1. Reload the global cache.
h.GlobalCache.OnInvalidateRoom(ctx, p.RoomID)
// Work out who is affected.
joins, invites, leaves, err := h.Storage.FetchMemberships(p.RoomID)
involvedUsers := make([]string, 0, len(joins)+len(invites)+len(leaves))
involvedUsers = append(involvedUsers, joins...)
involvedUsers = append(involvedUsers, invites...)
involvedUsers = append(involvedUsers, leaves...)
// 2. Reload the joined-room tracker.
if err != nil {
hub := internal.GetSentryHubFromContextOrDefault(ctx)
hub.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]any{
"room_id": p.RoomID,
})
hub.CaptureException(err)
})
logger.Err(err).
Str("room_id", p.RoomID).
Msg("Failed to fetch members after cache invalidation")
}
h.Dispatcher.OnInvalidateRoom(p.RoomID, joins, invites)
// 3. Destroy involved users' caches.
for _, userID := range involvedUsers {
h.Dispatcher.Unregister(userID)
h.userCaches.Delete(userID)
}
// 4. Destroy involved users' connections.
var destroyed int
for _, userID := range involvedUsers {
destroyed += h.ConnMap.CloseConnsForUser(userID)
}
if h.destroyedConns != nil {
h.destroyedConns.Add(float64(destroyed))
}
}
func parseIntFromQuery(u *url.URL, param string) (result int64, err *internal.HandlerError) {