// sliding-sync/sync3/handler/connstate.go

package handler
import (
"context"
"encoding/json"
"time"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/sync3/caches"
"github.com/matrix-org/sliding-sync/sync3/extensions"
"github.com/prometheus/client_golang/prometheus"
"github.com/tidwall/gjson"
)
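// JoinChecker reports whether a user is currently joined to a room. It is used to
// verify that client-supplied room IDs (direct subscriptions, old/predecessor rooms)
// refer to rooms the user is actually joined to before any data is returned for them.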
type JoinChecker interface {
IsUserJoined(userID, roomID string) bool
}
// ConnState tracks all high-level connection state for this connection, like the combined request
// and the underlying sorted room list. It doesn't track positions of the connection.
type ConnState struct {
userID string
deviceID string
// the only thing that can touch these data structures is the conn goroutine
muxedReq *sync3.Request
cancelLatestReq context.CancelFunc
lists *sync3.InternalRequestLists
// Confirmed room subscriptions. Entries in this list have been checked for things like
// "is the user joined to this room?" whereas subscriptions in muxedReq are untrusted.
roomSubscriptions map[string]sync3.RoomSubscription // room_id -> subscription
// This is some event NID which is used to anchor any requests for room data from the database
// to their per-room latest NIDs. It does this by selecting the latest NID for each requested room
// where the NID is <= this anchor value. Note that there are no ordering guarantees here: it's
// possible for the anchor to be higher than room X's latest NID and for this connection to have
// not yet seen room X's latest NID (it'll be sitting in the live buffer). This is why it's important
// that ConnState DOES NOT ignore events based on this value - it must ignore events based on the real
// load position for the room.
// If this value is negative or 0, it means that this connection has not been loaded yet.
anchorLoadPosition int64
// roomID -> latest load pos
loadPositions map[string]int64
txnIDWaiter *TxnIDWaiter
live *connStateLive
globalCache *caches.GlobalCache
userCache *caches.UserCache
userCacheID int
lazyCache *LazyCache
joinChecker JoinChecker
extensionsHandler extensions.HandlerInterface
setupHistogramVec *prometheus.HistogramVec
processHistogramVec *prometheus.HistogramVec
}
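// NewConnState creates a ConnState for the given user and device. Incoming updates
// are funnelled through the TxnIDWaiter into the live update buffer. The ConnState
// subscribes to the user cache immediately, before load() has run; see the comment
// on userCache.Subscribe below for why duplicates are safe.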
func NewConnState(
userID, deviceID string, userCache *caches.UserCache, globalCache *caches.GlobalCache,
ex extensions.HandlerInterface, joinChecker JoinChecker, setupHistVec *prometheus.HistogramVec, histVec *prometheus.HistogramVec,
maxPendingEventUpdates int, maxTransactionIDDelay time.Duration,
) *ConnState {
cs := &ConnState{
globalCache: globalCache,
userCache: userCache,
userID: userID,
deviceID: deviceID,
anchorLoadPosition: -1,
loadPositions: make(map[string]int64),
roomSubscriptions: make(map[string]sync3.RoomSubscription),
lists: sync3.NewInternalRequestLists(),
extensionsHandler: ex,
joinChecker: joinChecker,
lazyCache: NewLazyCache(),
setupHistogramVec: setupHistVec,
processHistogramVec: histVec,
}
cs.live = &connStateLive{
ConnState: cs,
updates: make(chan caches.Update, maxPendingEventUpdates),
}
cs.txnIDWaiter = NewTxnIDWaiter(
userID,
maxTransactionIDDelay,
func(delayed bool, update caches.Update) {
cs.live.onUpdate(update)
},
)
// subscribe for updates before loading. We risk seeing dupes but that's fine as load positions
// will stop us double-processing.
cs.userCacheID = cs.userCache.Subscribe(cs)
return cs
}
// load the initial joined room list, unfiltered and unsorted, and cache up the fields we care about
// like the room name. We have synchronisation issues here similar to the ConnMap's initial Load.
// However, unlike the ConnMap, we cannot just say "don't start any v2 poll loops yet". To guard
// against duplicate event processing, this function remembers the latest NID it used
// to load the initial state, then ignores all incoming events until a syncPosition > the load position
// is received. This guards against the following race condition:
// - Conn is made. It is atomically added to the ConnMap, making it immediately eligible to be pushed new events.
// - Between the Conn being added to the ConnMap and the call to load() (done when we get the first HandleIncomingRequest call)
// N events arrive and get buffered.
// - load() bases its current state on the latest position, which includes processing of these N events.
// - post load(), we read those N buffered events again, processing them a 2nd time.
func (s *ConnState) load(ctx context.Context, req *sync3.Request) error {
initialLoadPosition, joinedRooms, joinTimings, loadPositions, err := s.globalCache.LoadJoinedRooms(ctx, s.userID)
if err != nil {
return err
}
for roomID, pos := range loadPositions {
s.loadPositions[roomID] = pos
}
rooms := make([]sync3.RoomConnMetadata, len(joinedRooms))
i := 0
for _, metadata := range joinedRooms {
metadata.RemoveHero(s.userID)
urd := s.userCache.LoadRoomData(metadata.RoomID)
timing, ok := joinTimings[metadata.RoomID]
internal.AssertWithContext(ctx, "LoadJoinedRooms returned room with timing info", ok)
urd.JoinTiming = timing
interestedEventTimestampsByList := make(map[string]uint64, len(req.Lists))
for listKey, listReq := range req.Lists {
interestingActivityTs := metadata.LastMessageTimestamp
if len(listReq.BumpEventTypes) > 0 {
// Use the global cache to find the timestamp of the latest interesting
// event we can see. If there is no such event, fall back to the
// LastMessageTimestamp.
joinEvent := joinTimings[metadata.RoomID]
interestingActivityTs = joinEvent.Timestamp
for _, eventType := range listReq.BumpEventTypes {
timing := metadata.LatestEventsByType[eventType]
// we found a later event which we are authorised to see, use it instead
if joinEvent.NID < timing.NID && interestingActivityTs < timing.Timestamp {
interestingActivityTs = timing.Timestamp
}
}
}
interestedEventTimestampsByList[listKey] = interestingActivityTs
}
rooms[i] = sync3.RoomConnMetadata{
RoomMetadata: *metadata,
UserRoomData: urd,
LastInterestedEventTimestamps: interestedEventTimestampsByList,
}
i++
}
invites := s.userCache.Invites()
for _, urd := range invites {
metadata := urd.Invite.RoomMetadata()
inviteTimestampsByList := make(map[string]uint64, len(req.Lists))
for listKey := range req.Lists {
inviteTimestampsByList[listKey] = metadata.LastMessageTimestamp
}
rooms = append(rooms, sync3.RoomConnMetadata{
RoomMetadata: *metadata,
UserRoomData: urd,
LastInterestedEventTimestamps: inviteTimestampsByList,
})
}
for _, r := range rooms {
s.lists.SetRoom(r)
}
s.anchorLoadPosition = initialLoadPosition
return nil
}
// OnIncomingRequest is guaranteed to be called sequentially (it's protected by a mutex in conn.go)
func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req *sync3.Request, isInitial bool, start time.Time) (*sync3.Response, error) {
if s.anchorLoadPosition <= 0 {
// load() doesn't need the span-scoped ctx, so discard it
_, region := internal.StartSpan(ctx, "load")
err := s.load(ctx, req)
if err != nil {
// in practice this means DB failures. If we try again later it may work, and we will retry
// because anchorLoadPosition remains unset.
logger.Err(err).Str("conn", cid.String()).Msg("failed to load initial data")
}
region.End()
}
setupTime := time.Since(start)
s.trackSetupDuration(ctx, setupTime, isInitial)
return s.onIncomingRequest(ctx, req, isInitial)
}
// onIncomingRequest is a callback which fires when the client makes a request to the server. Whilst each request may
// be on its own goroutine, the requests are linearised for us by Conn so it is safe to modify ConnState without
// additional locking mechanisms.
func (s *ConnState) onIncomingRequest(reqCtx context.Context, req *sync3.Request, isInitial bool) (*sync3.Response, error) {
start := time.Now()
// ApplyDelta works fine if s.muxedReq is nil
var delta *sync3.RequestDelta
s.muxedReq, delta = s.muxedReq.ApplyDelta(req)
internal.Logf(reqCtx, "connstate", "new subs=%v unsubs=%v num_lists=%v", len(delta.Subs), len(delta.Unsubs), len(delta.Lists))
for key, l := range delta.Lists {
listData := ""
if l.Curr != nil {
listDataBytes, _ := json.Marshal(l.Curr)
listData = string(listDataBytes)
}
internal.Logf(reqCtx, "connstate", "list[%v] prev_empty=%v curr=%v", key, l.Prev == nil, listData)
}
for roomID, sub := range s.muxedReq.RoomSubscriptions {
internal.Logf(reqCtx, "connstate", "room sub[%v] %v", roomID, sub)
}
// work out which rooms we'll return data for and add their relevant subscriptions to the builder
// for it to mix together
builder := NewRoomsBuilder()
// works out which rooms are subscribed to but doesn't pull room data
s.buildRoomSubscriptions(reqCtx, builder, delta.Subs, delta.Unsubs)
// works out how rooms get moved about but doesn't pull room data
respLists := s.buildListSubscriptions(reqCtx, builder, delta.Lists)
// pull room data and set changes on the response
response := &sync3.Response{
Rooms: s.buildRooms(reqCtx, builder.BuildSubscriptions()), // pull room data
Lists: respLists,
}
// Handle extensions AFTER processing lists as extensions may need to know which rooms the client
// is being notified about (e.g. for room account data)
extCtx, region := internal.StartSpan(reqCtx, "extensions")
response.Extensions = s.extensionsHandler.Handle(extCtx, s.muxedReq.Extensions, extensions.Context{
UserID: s.userID,
DeviceID: s.deviceID,
RoomIDToTimeline: response.RoomIDsToTimelineEventIDs(),
IsInitial: isInitial,
RoomIDsToLists: s.lists.ListsByVisibleRoomIDs(s.muxedReq.Lists),
AllSubscribedRooms: internal.Keys(s.roomSubscriptions),
AllLists: s.muxedReq.ListKeys(),
})
region.End()
if response.ListOps() > 0 || len(response.Rooms) > 0 || response.Extensions.HasData(isInitial) {
// we're going to immediately return, so track how long this took. We don't do this for long
// polling requests as high numbers mean nothing. We need to check if we will block as otherwise
// we will have tons of fast requests logged (as they get tracked and then hit live streaming)
// In other words, this metric tracks the time it takes to process _changes_ in the client
// requests (initial connection, modifying index positions, etc) which should always be fast.
s.trackProcessDuration(reqCtx, time.Since(start), isInitial)
}
// do live tracking if we have nothing to tell the client yet
updateCtx, region := internal.StartSpan(reqCtx, "liveUpdate")
s.live.liveUpdate(updateCtx, req, s.muxedReq.Extensions, isInitial, response)
region.End()
// counts are AFTER events are applied, hence after liveUpdate
for listKey := range response.Lists {
l := response.Lists[listKey]
l.Count = s.lists.Count(listKey)
response.Lists[listKey] = l
}
// Add membership events for users sending typing notifications. We do this after live update
// and initial room loading code so we lazy-load room members in all cases.
if response.Extensions.Typing != nil && response.Extensions.Typing.HasData(isInitial) {
s.lazyLoadTypingMembers(reqCtx, response)
}
return response, nil
}
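// onIncomingListRequest computes the response for a single list. It diffs the previous
// and next request for that list (ranges, sort order, filters) to produce SYNC/INVALIDATE
// operations, and registers the rooms in the requested ranges with the builder so their
// data can be loaded later.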
func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBuilder, listKey string, prevReqList, nextReqList *sync3.RequestList) sync3.ResponseList {
ctx, span := internal.StartSpan(ctx, "onIncomingListRequest")
defer span.End()
roomList, overwritten := s.lists.AssignList(ctx, listKey, nextReqList.Filters, nextReqList.Sort, sync3.DoNotOverwrite)
if nextReqList.ShouldGetAllRooms() {
if overwritten || prevReqList.FiltersChanged(nextReqList) {
// this is either a new list or the filters changed, so we need to splat all the rooms to the client.
subID := builder.AddSubscription(nextReqList.RoomSubscription)
allRoomIDs := roomList.RoomIDs()
builder.AddRoomsToSubscription(ctx, subID, allRoomIDs)
return sync3.ResponseList{
// send all the room IDs initially so the user knows which rooms in the top-level rooms map
// correspond to this list.
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: sync3.OpSync,
Range: [2]int64{0, int64(len(allRoomIDs) - 1)},
RoomIDs: allRoomIDs,
},
},
}
}
}
// TODO: list deltas
var responseOperations []sync3.ResponseOp
var prevRange sync3.SliceRanges
if prevReqList != nil {
prevRange = prevReqList.Ranges
}
// Handle SYNC / INVALIDATE ranges
var addedRanges, removedRanges sync3.SliceRanges
if prevRange != nil {
addedRanges, removedRanges, _ = prevRange.Delta(nextReqList.Ranges)
} else {
addedRanges = nextReqList.Ranges
}
sortChanged := prevReqList.SortOrderChanged(nextReqList)
filtersChanged := prevReqList.FiltersChanged(nextReqList)
if sortChanged || filtersChanged {
// the sort/filter operations have changed, invalidate everything (if there were previous syncs), re-sort and re-SYNC
if prevReqList != nil {
// there were previous syncs for this list, INVALIDATE the lot
logger.Trace().Interface("range", prevRange).Msg("INVALIDATEing because sort/filter ops have changed")
allRoomIDs := roomList.RoomIDs()
for _, r := range prevRange {
if r[0] >= roomList.Len() {
// This range will not have been echoed to the client because it is outside
// the total length of the room list; do not try to invalidate it.
continue
}
responseOperations = append(responseOperations, &sync3.ResponseOpRange{
Operation: sync3.OpInvalidate,
Range: clampSliceRangeToListSize(ctx, r, int64(len(allRoomIDs))),
})
}
}
if filtersChanged {
// we need to re-create the list as the rooms may have completely changed
roomList, _ = s.lists.AssignList(ctx, listKey, nextReqList.Filters, nextReqList.Sort, sync3.Overwrite)
}
// resort as either we changed the sort order or we added/removed a bunch of rooms
if err := roomList.Sort(nextReqList.Sort); err != nil {
logger.Err(err).Str("key", listKey).Msg("cannot sort list")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
addedRanges = nextReqList.Ranges
removedRanges = nil
}
// send INVALIDATE for these ranges
if len(removedRanges) > 0 {
logger.Trace().Interface("range", removedRanges).Msg("INVALIDATEing because ranges were removed")
}
for i := range removedRanges {
if removedRanges[i][0] >= roomList.Len() {
// This range will not have been echoed to the client because it is outside
// the total length of the room list; do not try to invalidate it.
continue
}
responseOperations = append(responseOperations, &sync3.ResponseOpRange{
Operation: sync3.OpInvalidate,
Range: clampSliceRangeToListSize(ctx, removedRanges[i], roomList.Len()),
})
}
// inform the builder about this list
subID := builder.AddSubscription(nextReqList.RoomSubscription)
// send full room data for these ranges
for i := range addedRanges {
sr := sync3.SliceRanges([][2]int64{addedRanges[i]})
subslice := sr.SliceInto(roomList)
if len(subslice) == 0 {
continue
}
sortableRooms := subslice[0].(*sync3.SortableRooms)
roomIDs := sortableRooms.RoomIDs()
// the builder will populate this with the right room data
builder.AddRoomsToSubscription(ctx, subID, roomIDs)
responseOperations = append(responseOperations, &sync3.ResponseOpRange{
Operation: sync3.OpSync,
Range: clampSliceRangeToListSize(ctx, addedRanges[i], roomList.Len()),
RoomIDs: roomIDs,
})
}
if prevReqList != nil {
// If nothing has changed ordering wise in this list (sort/filter) but the timeline limit / req_state has,
// we need to make a new subscription registering this change to include the new data.
timelineChanged := prevReqList.TimelineLimitChanged(nextReqList)
reqStateChanged := prevReqList.RoomSubscription.RequiredStateChanged(nextReqList.RoomSubscription)
if !sortChanged && !filtersChanged && (timelineChanged || reqStateChanged) {
var newRS sync3.RoomSubscription
if timelineChanged {
newRS.TimelineLimit = nextReqList.TimelineLimit
}
if reqStateChanged {
newRS.RequiredState = nextReqList.RequiredState
}
newSubID := builder.AddSubscription(newRS)
// all the current rooms need to be added to this subscription
subslice := nextReqList.Ranges.SliceInto(roomList)
for _, ss := range subslice {
sortableRooms := ss.(*sync3.SortableRooms)
roomIDs := sortableRooms.RoomIDs()
// it's important that we filter out rooms the user is no longer joined to. Specifically,
// there is a race condition exercised in the security test TestSecurityLiveStreamEventLeftLeak
// whereby Eve syncs whilst still joined to the room, then she gets kicked, then syncs again
// with an existing session but with changed req state / timeline params. In this scenario,
// this code executes BEFORE the kick event has been processed on liveUpdate. This will cause
// the subscription to fetch the CURRENT state (though not timeline as the load position has
// not been updated) which Eve should not be able to see as she is no longer joined.
joinedRoomIDs := make([]string, 0, len(roomIDs))
for _, roomID := range roomIDs {
if !s.joinChecker.IsUserJoined(s.userID, roomID) {
continue
}
joinedRoomIDs = append(joinedRoomIDs, roomID)
}
// the builder will populate this with the right room data
builder.AddRoomsToSubscription(ctx, newSubID, joinedRoomIDs)
}
}
}
return sync3.ResponseList{
Ops: responseOperations,
// count will be filled in later
}
}
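// buildListSubscriptions processes the list deltas in the request: deleted lists are
// removed from the internal state, and each remaining list is handled by onIncomingListRequest.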
func (s *ConnState) buildListSubscriptions(ctx context.Context, builder *RoomsBuilder, listDeltas map[string]sync3.RequestListDelta) map[string]sync3.ResponseList {
ctx, span := internal.StartSpan(ctx, "buildListSubscriptions")
defer span.End()
result := make(map[string]sync3.ResponseList, len(s.muxedReq.Lists))
// loop each list and handle each independently
for listKey, list := range listDeltas {
if list.Curr == nil {
// they deleted this list
logger.Debug().Str("key", listKey).Msg("list deleted")
s.lists.DeleteList(listKey)
continue
}
result[listKey] = s.onIncomingListRequest(ctx, builder, listKey, list.Prev, list.Curr)
}
return result
}
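// buildRoomSubscriptions processes direct room subscriptions and unsubscriptions.
// Subscriptions to rooms the user is not joined to are dropped, as clients can request
// arbitrary room IDs.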
func (s *ConnState) buildRoomSubscriptions(ctx context.Context, builder *RoomsBuilder, subs, unsubs []string) {
ctx, span := internal.StartSpan(ctx, "buildRoomSubscriptions")
defer span.End()
for _, roomID := range subs {
// check that the user is allowed to see these rooms as they can set arbitrary room IDs
if !s.joinChecker.IsUserJoined(s.userID, roomID) {
continue
}
sub, ok := s.muxedReq.RoomSubscriptions[roomID]
if !ok {
logger.Warn().Str("room_id", roomID).Msg(
"room listed in subscriptions but there is no subscription information in the request, ignoring room subscription.",
)
continue
}
s.roomSubscriptions[roomID] = sub
subID := builder.AddSubscription(sub)
builder.AddRoomsToSubscription(ctx, subID, []string{roomID})
}
for _, roomID := range unsubs {
delete(s.roomSubscriptions, roomID)
}
}
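// buildRooms loads room data for every subscription assembled by the builder, following
// chains of predecessor rooms when IncludeOldRooms is set on the subscription.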
func (s *ConnState) buildRooms(ctx context.Context, builtSubs []BuiltSubscription) map[string]sync3.Room {
ctx, span := internal.StartSpan(ctx, "buildRooms")
defer span.End()
result := make(map[string]sync3.Room)
var bumpEventTypes []string
for _, x := range s.muxedReq.Lists {
bumpEventTypes = append(bumpEventTypes, x.BumpEventTypes...)
}
for _, bs := range builtSubs {
roomIDs := bs.RoomIDs
if bs.RoomSubscription.IncludeOldRooms != nil {
var oldRoomIDs []string
for _, currRoomID := range bs.RoomIDs { // <- the list of subs we definitely are including
// append old rooms if we are joined to them
currRoom := s.lists.ReadOnlyRoom(currRoomID)
var prevRoomID *string
if currRoom != nil {
prevRoomID = currRoom.PredecessorRoomID
}
for prevRoomID != nil { // <- the chain of old rooms
// if not joined, bail
if !s.joinChecker.IsUserJoined(s.userID, *prevRoomID) {
break
}
oldRoomIDs = append(oldRoomIDs, *prevRoomID)
// keep checking
prevRoom := s.lists.ReadOnlyRoom(*prevRoomID)
if prevRoom != nil {
prevRoomID = prevRoom.PredecessorRoomID
}
}
}
// If we have old rooms to fetch, do so.
if len(oldRoomIDs) > 0 {
// old rooms use a different subscription
oldRooms := s.getInitialRoomData(ctx, *bs.RoomSubscription.IncludeOldRooms, bumpEventTypes, oldRoomIDs...)
for oldRoomID, oldRoom := range oldRooms {
result[oldRoomID] = oldRoom
}
}
}
// If there's nothing to fetch, try the next subscription.
if len(roomIDs) == 0 {
continue
}
rooms := s.getInitialRoomData(ctx, bs.RoomSubscription, bumpEventTypes, roomIDs...)
for roomID, room := range rooms {
result[roomID] = room
}
}
return result
}
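// lazyLoadTypingMembers adds m.room.member events to required_state for users who are
// currently typing in lazily-loaded rooms, but only if their membership has not already
// been sent on this connection.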
func (s *ConnState) lazyLoadTypingMembers(ctx context.Context, response *sync3.Response) {
for roomID, typingEvent := range response.Extensions.Typing.Rooms {
if !s.lazyCache.IsLazyLoading(roomID) {
continue
}
room, ok := response.Rooms[roomID]
if !ok {
room = sync3.Room{}
}
typingUsers := gjson.GetBytes(typingEvent, "content.user_ids")
for _, typingUserID := range typingUsers.Array() {
if s.lazyCache.IsSet(roomID, typingUserID.Str) {
// client should already know about this member
continue
}
// load the state event
memberEvent := s.globalCache.LoadStateEvent(ctx, roomID, s.loadPositions[roomID], "m.room.member", typingUserID.Str)
if memberEvent != nil {
room.RequiredState = append(room.RequiredState, memberEvent)
s.lazyCache.AddUser(roomID, typingUserID.Str)
}
}
// only add the room if we have membership events
if len(room.RequiredState) > 0 {
response.Rooms[roomID] = room
}
}
}
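// getInitialRoomData builds the sync3.Room payloads (timeline, required state, name,
// counts, etc.) for the given room IDs under the given subscription, anchored at this
// connection's load position.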
func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSubscription, bumpEventTypes []string, roomIDs ...string) map[string]sync3.Room {
ctx, span := internal.StartSpan(ctx, "getInitialRoomData")
defer span.End()
// 0. Load room metadata and timelines.
// We want to grab the user room data and the room metadata for each room ID. We use the globally
// highest NID we've seen to act as an anchor for the request. This anchor does not guarantee that
// events returned here have already been seen - the position is not globally ordered - so because
// room A has a position of 6 and B has 7 (so the highest is 7) does not mean that this connection
// has seen 6, as concurrent room updates cause A and B to race. This is why we then go through the
// response to this call to assign new load positions for each room.
roomMetadatas := s.globalCache.LoadRooms(ctx, roomIDs...)
userRoomDatas := s.userCache.LoadRooms(roomIDs...)
timelines := s.userCache.LazyLoadTimelines(ctx, s.anchorLoadPosition, roomIDs, int(roomSub.TimelineLimit))
// 1. Prepare lazy loading data structures, txn IDs.
roomToUsersInTimeline := make(map[string][]string, len(timelines))
roomToTimeline := make(map[string][]json.RawMessage)
for roomID, latestEvents := range timelines {
senders := make(map[string]struct{})
for _, ev := range latestEvents.Timeline {
senders[gjson.GetBytes(ev, "sender").Str] = struct{}{}
}
roomToUsersInTimeline[roomID] = internal.Keys(senders)
roomToTimeline[roomID] = latestEvents.Timeline
// remember what we just loaded so if we see these events down the live stream we know to ignore them.
// This means that requesting a direct room subscription causes the connection to jump ahead to whatever
// is in the database at the time of the call, rather than gradually converging by consuming live data.
// This is fine, so long as we jump ahead on a per-room basis. We need to make sure (ideally) that the
// room state is also pinned to the load position here, else you could see weird things in individual
// responses such as an updated room.name without the associated m.room.name event (though this will
// come through on the next request -> it converges to the right state so it isn't critical).
s.loadPositions[roomID] = latestEvents.LatestNID
}
roomToTimeline = s.userCache.AnnotateWithTransactionIDs(ctx, s.userID, s.deviceID, roomToTimeline)
// 2. Load required state events.
rsm := roomSub.RequiredStateMap(s.userID)
if rsm.IsLazyLoading() {
for roomID, userIDs := range roomToUsersInTimeline {
s.lazyCache.Add(roomID, userIDs...)
}
}
internal.Logf(ctx, "connstate", "getInitialRoomData for %d rooms, RequiredStateMap: %#v", len(roomIDs), rsm)
// Filter out rooms we are only invited to, as we don't need to fetch the state
// since we'll be using the invite_state only.
loadRoomIDs := make([]string, 0, len(roomIDs))
for _, roomID := range roomIDs {
userRoomData, ok := userRoomDatas[roomID]
if !ok || !userRoomData.IsInvite {
loadRoomIDs = append(loadRoomIDs, roomID)
}
}
// by reusing the same global load position anchor here, we can be sure that the state returned here
// matches the timeline we loaded earlier - the race conditions happen around pubsub updates and not
// the events table itself, so whatever position is picked based on this anchor is immutable.
roomIDToState := s.globalCache.LoadRoomState(ctx, loadRoomIDs, s.anchorLoadPosition, rsm, roomToUsersInTimeline)
if roomIDToState == nil { // e.g no required_state
roomIDToState = make(map[string][]json.RawMessage)
}
// 3. Build sync3.Room structs to return to clients.
rooms := make(map[string]sync3.Room, len(roomIDs))
for _, roomID := range roomIDs {
userRoomData, ok := userRoomDatas[roomID]
if !ok {
userRoomData = caches.NewUserRoomData()
}
metadata := roomMetadatas[roomID]
var inviteState []json.RawMessage
// handle invites specially as we do not want to leak additional data beyond the invite_state, which
// we would do if we happened to have this room in the global cache.
// Furthermore, rooms the proxy has been invited to for the first time ever will not be in the global cache yet,
// which would cause errors below when we try calling functions on a nil metadata.
if userRoomData.IsInvite {
metadata = userRoomData.Invite.RoomMetadata()
inviteState = userRoomData.Invite.InviteState
}
metadata.RemoveHero(s.userID)
var requiredState []json.RawMessage
if !userRoomData.IsInvite {
requiredState = roomIDToState[roomID]
if requiredState == nil {
requiredState = make([]json.RawMessage, 0)
}
}
// Get the highest timestamp, determined by bumpEventTypes,
// for this room
roomListsMeta := s.lists.ReadOnlyRoom(roomID)
var maxTs uint64
for _, t := range bumpEventTypes {
if roomListsMeta == nil {
break
}
evMeta := roomListsMeta.LatestEventsByType[t]
if evMeta.Timestamp > maxTs {
maxTs = evMeta.Timestamp
}
}
// If we didn't find any events which would update the timestamp,
// use the join event timestamp instead. Also don't leak
// timestamps from before we joined.
if roomListsMeta != nil && (maxTs == 0 || maxTs < roomListsMeta.JoinTiming.Timestamp) {
maxTs = roomListsMeta.JoinTiming.Timestamp
// If no bumpEventTypes are specified, use the
// LastMessageTimestamp so clients are still able
// to correctly sort on it.
if len(bumpEventTypes) == 0 {
maxTs = roomListsMeta.LastMessageTimestamp
}
}
roomName, calculated := internal.CalculateRoomName(metadata, 5) // TODO: customisable?
room := sync3.Room{
Name: roomName,
AvatarChange: sync3.NewAvatarChange(internal.CalculateAvatar(metadata, userRoomData.IsDM)),
NotificationCount: int64(userRoomData.NotificationCount),
HighlightCount: int64(userRoomData.HighlightCount),
Timeline: roomToTimeline[roomID],
RequiredState: requiredState,
InviteState: inviteState,
Initial: true,
IsDM: userRoomData.IsDM,
JoinedCount: metadata.JoinCount,
InvitedCount: &metadata.InviteCount,
PrevBatch: timelines[roomID].PrevBatch,
Timestamp: maxTs,
}
if roomSub.IncludeHeroes() && calculated {
room.Heroes = metadata.Heroes
}
rooms[roomID] = room
}
return rooms
}
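// trackSetupDuration records how long the setup phase of a request took (e.g. the initial
// load), both on the request context and, if configured, in Prometheus.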
func (s *ConnState) trackSetupDuration(ctx context.Context, dur time.Duration, isInitial bool) {
internal.SetRequestContextSetupDuration(ctx, dur)
if s.setupHistogramVec == nil {
return
}
val := "0"
if isInitial {
val = "1"
}
s.setupHistogramVec.WithLabelValues(val).Observe(float64(dur.Seconds()))
}
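// trackProcessDuration records how long it took to compute the changes for a request,
// both on the request context and, if configured, in Prometheus.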
func (s *ConnState) trackProcessDuration(ctx context.Context, dur time.Duration, isInitial bool) {
internal.SetRequestContextProcessingDuration(ctx, dur)
if s.processHistogramVec == nil {
return
}
val := "0"
if isInitial {
val = "1"
}
s.processHistogramVec.WithLabelValues(val).Observe(float64(dur.Seconds()))
}
// Called when the connection is torn down
func (s *ConnState) Destroy() {
s.userCache.Unsubscribe(s.userCacheID)
logger.Debug().Str("user_id", s.userID).Str("device_id", s.deviceID).Msg("cancelling any in-flight requests")
if s.cancelLatestReq != nil {
s.cancelLatestReq()
}
}
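// Alive reports whether this connection can still process live updates; it returns false
// once the live update buffer has overflowed.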
func (s *ConnState) Alive() bool {
return !s.live.bufferFull
}
func (s *ConnState) UserID() string {
return s.userID
}
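// OnUpdate ingests a generic update destined for this connection via the TxnIDWaiter.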
func (s *ConnState) OnUpdate(ctx context.Context, up caches.Update) {
// will eventually call s.live.onUpdate
s.txnIDWaiter.Ingest(up)
}
// Called by the user cache when updates arrive
func (s *ConnState) OnRoomUpdate(ctx context.Context, up caches.RoomUpdate) {
switch update := up.(type) {
case *caches.RoomEventUpdate:
if !update.EventData.AlwaysProcess && update.EventData.NID == 0 {
// 0 -> this event was from a 'state' block, do not poke active connections.
// This is not the same as checking if we have already processed this event: NID=0 means
// it's part of initial room state. If we sent these events, we'd send them to clients in
// the timeline section which is wrong.
return
}
internal.AssertWithContext(ctx, "missing global room metadata", update.GlobalRoomMetadata() != nil)
internal.Logf(ctx, "connstate", "queued update %d", update.EventData.NID)
s.OnUpdate(ctx, update)
case caches.RoomUpdate:
internal.AssertWithContext(ctx, "missing global room metadata", update.GlobalRoomMetadata() != nil)
s.OnUpdate(ctx, update)
default:
logger.Warn().Str("room_id", up.RoomID()).Msg("OnRoomUpdate unknown update type")
}
}
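// PublishEventsUpTo flushes any updates queued in the TxnIDWaiter for roomID up to and
// including the given event NID.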
func (s *ConnState) PublishEventsUpTo(roomID string, nid int64) {
s.txnIDWaiter.PublishUpToNID(roomID, nid)
}
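// SetCancelCallback stores the cancel function for the latest in-flight request so that
// Destroy can cancel it when the connection is torn down.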
func (s *ConnState) SetCancelCallback(cancel context.CancelFunc) {
s.cancelLatestReq = cancel
}
// clampSliceRangeToListSize helps us to send client-friendly SYNC and INVALIDATE ranges.
//
// Suppose the client asks for a window on positions [10, 19]. If the list
// has exactly 13 rooms, the window will see 3: those at positions 10, 11
// and 12. In this situation, it is helpful for clients to be given the
// "effective" range [10, 12] rather than the "conceptual" range [10, 19].
//
// The "full" room list occupies positions [0, totalRooms - 1]. The given range r
// must start within the full room list (this is asserted); the returned range is
// the intersection of r with the full room list.
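// For example (illustrative only):
//
//	clampSliceRangeToListSize(ctx, [2]int64{10, 19}, 13) // -> [10, 12]
//	clampSliceRangeToListSize(ctx, [2]int64{0, 5}, 13)   // -> [0, 5], unchanged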
func clampSliceRangeToListSize(ctx context.Context, r [2]int64, totalRooms int64) [2]int64 {
lastIndexWithRoom := totalRooms - 1
internal.AssertWithContext(ctx, "Start of range exceeds last room index in list", r[0] <= lastIndexWithRoom)
if r[1] <= lastIndexWithRoom {
return r
}
return [2]int64{r[0], lastIndexWithRoom}
}