Load highlight/notification counts for sorting

This commit is contained in:
Kegan Dougal 2021-11-02 15:59:03 +00:00
parent 9e68f53009
commit f581d44440
5 changed files with 153 additions and 104 deletions

View File

@ -339,6 +339,7 @@ const doSyncLoop = async(accessToken, sessionId) => {
if (!currentPos) {
reqBody.required_state = requiredStateEventsInList;
reqBody.timeline_limit = 20;
reqBody.sort = ["by_highlight_count", "by_notification_count", "by_recency"];
}
// check if we are (un)subscribing to a room and modify request this one time for it
let subscribingToRoom;

View File

@ -25,6 +25,12 @@ type RoomConnMetadata struct {
NotificationCount int
}
type ConnEvent struct {
roomID string
msg *EventData
userMsg *UserRoomData
}
// ConnState tracks all high-level connection state for this connection, like the combined request
// and the underlying sorted room list. It doesn't track session IDs or positions of the connection.
type ConnState struct {
@ -34,10 +40,10 @@ type ConnState struct {
sortedJoinedRoomsPositions map[string]int // room_id -> index in sortedJoinedRooms
roomSubscriptions map[string]RoomSubscription
loadPosition int64
// A channel which v2 poll loops use to send updates to, via the ConnMap.
// A channel which the dispatcher uses to send updates to.
// Consumed when the conn is read. There is a limit to how many updates we will store before
// saying the client is ded and cleaning up the conn.
updateEvents chan *EventData
updateEvents chan *ConnEvent
globalCache *GlobalCache
userCache *UserCache
@ -51,7 +57,7 @@ func NewConnState(userID string, userCache *UserCache, globalCache *GlobalCache)
userID: userID,
roomSubscriptions: make(map[string]RoomSubscription),
sortedJoinedRoomsPositions: make(map[string]int),
updateEvents: make(chan *EventData, MaxPendingEventUpdates), // TODO: customisable
updateEvents: make(chan *ConnEvent, MaxPendingEventUpdates), // TODO: customisable
}
cs.userCacheID = cs.userCache.Subsribe(cs)
return cs
@ -77,11 +83,14 @@ func (s *ConnState) load(req *Request) error {
for i := range joinedRooms {
metadata := joinedRooms[i]
metadata.RemoveHero(s.userID)
urd := s.userCache.loadRoomData(metadata.RoomID)
rooms[i] = RoomConnMetadata{
RoomMetadata: metadata,
CanonicalisedName: strings.ToLower(
strings.Trim(internal.CalculateRoomName(&metadata, 5), "#!()):_"),
),
HighlightCount: urd.HighlightCount,
NotificationCount: urd.NotificationCount,
}
}
@ -96,10 +105,13 @@ func (s *ConnState) sort(sortBy []string) {
if sortBy == nil {
sortBy = []string{SortByRecency}
}
s.sortedJoinedRooms.Sort(sortBy)
err := s.sortedJoinedRooms.Sort(sortBy)
for i := range s.sortedJoinedRooms {
s.sortedJoinedRoomsPositions[s.sortedJoinedRooms[i].RoomID] = i
}
if err != nil {
logger.Warn().Err(err).Strs("sort", sortBy).Msg("failed to sort")
}
}
// HandleIncomingRequest is guaranteed to be called sequentially (it's protected by a mutex in conn.go)
@ -200,88 +212,22 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *Request) (*Respo
break blockloop
case <-time.After(10 * time.Second): // TODO configurable
break blockloop
case updateEvent := <-s.updateEvents: // TODO: keep reading until it is empty before responding.
if updateEvent.latestPos > s.loadPosition {
s.loadPosition = updateEvent.latestPos
}
// TODO: Add filters to check if this event should cause a response or should be dropped (e.g filtering out messages)
// this is why this select is in a while loop as not all update event will wake up the stream
// TODO: Implement sorting by something other than recency. With recency sorting,
// most operations are DELETE/INSERT to bump rooms to the top of the list. We only
// do an UPDATE if the most recent room gets a 2nd event.
var targetRoom RoomConnMetadata
fromIndex, ok := s.sortedJoinedRoomsPositions[updateEvent.roomID]
var lastTimestamp uint64
if !ok {
// the user may have just joined the room hence not have an entry in this list yet.
fromIndex = len(s.sortedJoinedRooms)
newRoom := s.globalCache.LoadRoom(updateEvent.roomID)
newRoom.LastMessageTimestamp = updateEvent.timestamp
newRoom.RemoveHero(s.userID)
newRoomConn := RoomConnMetadata{
RoomMetadata: *newRoom,
CanonicalisedName: strings.ToLower(
strings.Trim(internal.CalculateRoomName(newRoom, 5), "#!()):_"),
),
case connEvent := <-s.updateEvents: // TODO: keep reading until it is empty before responding.
if connEvent.msg != nil {
subs, ops := s.processIncomingEvent(connEvent.msg)
responseOperations = append(responseOperations, ops...)
for _, sub := range subs {
response.RoomSubscriptions[sub.RoomID] = sub
}
s.sortedJoinedRooms = append(s.sortedJoinedRooms, newRoomConn)
targetRoom = newRoomConn
} else {
targetRoom = s.sortedJoinedRooms[fromIndex]
lastTimestamp = targetRoom.LastMessageTimestamp
targetRoom.LastMessageTimestamp = updateEvent.timestamp
s.sortedJoinedRooms[fromIndex] = targetRoom
}
// re-sort
s.sort(s.muxedReq.Sort)
isSubscribedToRoom := false
if _, ok := s.roomSubscriptions[updateEvent.roomID]; ok {
// there is a subscription for this room, so update the room subscription field
response.RoomSubscriptions[updateEvent.roomID] = *s.getDeltaRoomData(updateEvent)
isSubscribedToRoom = true
}
toIndex := s.sortedJoinedRoomsPositions[updateEvent.roomID]
logger.Info().Int("from", fromIndex).Int("to", toIndex).
Uint64("prev_ts", lastTimestamp).Uint64("event_ts", updateEvent.timestamp).
Interface("room", targetRoom.RoomID).Msg("moved!")
// the toIndex may not be inside a tracked range. If it isn't, we actually need to notify about a
// different room
if !s.muxedReq.Rooms.Inside(int64(toIndex)) {
logger.Info().Msg("room isn't inside tracked range")
toIndex = int(s.muxedReq.Rooms.UpperClamp(int64(toIndex)))
if toIndex >= len(s.sortedJoinedRooms) {
// no room exists
logger.Warn().Int("to", toIndex).Int("size", len(s.sortedJoinedRooms)).Msg(
"cannot move to index, it's greater than the list of sorted rooms",
)
continue
}
if toIndex == -1 {
logger.Warn().Int("from", fromIndex).Int("to", toIndex).Interface("ranges", s.muxedReq.Rooms).Msg(
"room moved but not in tracked ranges, ignoring",
)
continue
}
// TODO inject last event if never seen before, else just room ID updateEvent = s.sortedJoinedRooms[toIndex].LastEvent
toRoom := s.sortedJoinedRooms[toIndex]
// fake an update event for this room.
// We do this because we are introducing a new room in the list because of this situation:
// tracking [10,20] and room 24 jumps to position 0, so now we are tracking [9,19] as all rooms
// have been shifted to the right
rooms := s.userCache.lazilyLoadRoomDatas(s.loadPosition, []string{toRoom.RoomID}, int(s.muxedReq.TimelineLimit)) // TODO: per-room timeline limit
urd := rooms[toRoom.RoomID]
updateEvent = &EventData{
event: urd.Timeline[len(urd.Timeline)-1],
roomID: toRoom.RoomID,
if connEvent.userMsg != nil {
subs, ops := s.processIncomingUserEvent(connEvent.roomID, connEvent.userMsg)
responseOperations = append(responseOperations, ops...)
for _, sub := range subs {
response.RoomSubscriptions[sub.RoomID] = sub
}
}
responseOperations = append(
responseOperations, s.moveRoom(updateEvent, fromIndex, toIndex, s.muxedReq.Rooms, isSubscribedToRoom)...,
)
break blockloop
}
}
@ -292,6 +238,100 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *Request) (*Respo
return response, nil
}
func (s *ConnState) processIncomingUserEvent(roomID string, userEvent *UserRoomData) ([]Room, []ResponseOp) {
// modify notification counts
fromIndex, ok := s.sortedJoinedRoomsPositions[roomID]
if !ok {
return nil, nil
}
targetRoom := s.sortedJoinedRooms[fromIndex]
targetRoom.HighlightCount = userEvent.HighlightCount
targetRoom.NotificationCount = userEvent.NotificationCount
s.sortedJoinedRooms[fromIndex] = targetRoom
return s.resort(roomID, fromIndex, nil)
}
func (s *ConnState) processIncomingEvent(updateEvent *EventData) ([]Room, []ResponseOp) {
if updateEvent.latestPos > s.loadPosition {
s.loadPosition = updateEvent.latestPos
}
// TODO: Add filters to check if this event should cause a response or should be dropped (e.g filtering out messages)
// this is why this select is in a while loop as not all update event will wake up the stream
var targetRoom RoomConnMetadata
fromIndex, ok := s.sortedJoinedRoomsPositions[updateEvent.roomID]
if !ok {
// the user may have just joined the room hence not have an entry in this list yet.
fromIndex = len(s.sortedJoinedRooms)
newRoom := s.globalCache.LoadRoom(updateEvent.roomID)
newRoom.LastMessageTimestamp = updateEvent.timestamp
newRoom.RemoveHero(s.userID)
newRoomConn := RoomConnMetadata{
RoomMetadata: *newRoom,
CanonicalisedName: strings.ToLower(
strings.Trim(internal.CalculateRoomName(newRoom, 5), "#!()):_"),
),
}
s.sortedJoinedRooms = append(s.sortedJoinedRooms, newRoomConn)
targetRoom = newRoomConn
} else {
targetRoom = s.sortedJoinedRooms[fromIndex]
targetRoom.LastMessageTimestamp = updateEvent.timestamp
s.sortedJoinedRooms[fromIndex] = targetRoom
}
return s.resort(updateEvent.roomID, fromIndex, updateEvent.event)
}
// Resort should be called after a specific room has been modified in `sortedJoinedRooms`.
func (s *ConnState) resort(roomID string, fromIndex int, newEvent json.RawMessage) ([]Room, []ResponseOp) {
s.sort(s.muxedReq.Sort)
var subs []Room
isSubscribedToRoom := false
if _, ok := s.roomSubscriptions[roomID]; ok {
// there is a subscription for this room, so update the room subscription field
subs = append(subs, *s.getDeltaRoomData(roomID, newEvent))
isSubscribedToRoom = true
}
toIndex := s.sortedJoinedRoomsPositions[roomID]
logger = logger.With().Str("room", roomID).Int("from", fromIndex).Int("to", toIndex).Logger()
logger.Info().Msg("moved!")
// the toIndex may not be inside a tracked range. If it isn't, we actually need to notify about a
// different room
if !s.muxedReq.Rooms.Inside(int64(toIndex)) {
logger.Info().Msg("room isn't inside tracked range")
toIndex = int(s.muxedReq.Rooms.UpperClamp(int64(toIndex)))
if toIndex >= len(s.sortedJoinedRooms) {
// no room exists
logger.Warn().Int("to", toIndex).Int("size", len(s.sortedJoinedRooms)).Msg(
"cannot move to index, it's greater than the list of sorted rooms",
)
return subs, nil
}
if toIndex == -1 {
logger.Warn().Int("from", fromIndex).Int("to", toIndex).Interface("ranges", s.muxedReq.Rooms).Msg(
"room moved but not in tracked ranges, ignoring",
)
return subs, nil
}
toRoom := s.sortedJoinedRooms[toIndex]
// fake an update event for this room.
// We do this because we are introducing a new room in the list because of this situation:
// tracking [10,20] and room 24 jumps to position 0, so now we are tracking [9,19] as all rooms
// have been shifted to the right
rooms := s.userCache.lazyLoadTimelines(s.loadPosition, []string{toRoom.RoomID}, int(s.muxedReq.TimelineLimit)) // TODO: per-room timeline limit
urd := rooms[toRoom.RoomID]
// clobber before falling through
roomID = toRoom.RoomID
newEvent = urd.Timeline[len(urd.Timeline)-1]
}
return subs, s.moveRoom(roomID, newEvent, fromIndex, toIndex, s.muxedReq.Rooms, isSubscribedToRoom)
}
func (s *ConnState) updateRoomSubscriptions(subs, unsubs []string) map[string]Room {
result := make(map[string]Room)
for _, roomID := range subs {
@ -313,23 +353,23 @@ func (s *ConnState) updateRoomSubscriptions(subs, unsubs []string) map[string]Ro
return result
}
func (s *ConnState) getDeltaRoomData(updateEvent *EventData) *Room {
userRoomData := s.userCache.loadRoomData(updateEvent.roomID)
func (s *ConnState) getDeltaRoomData(roomID string, event json.RawMessage) *Room {
userRoomData := s.userCache.loadRoomData(roomID)
room := &Room{
RoomID: updateEvent.roomID,
RoomID: roomID,
NotificationCount: int64(userRoomData.NotificationCount),
HighlightCount: int64(userRoomData.HighlightCount),
}
if updateEvent.event != nil {
if event != nil {
room.Timeline = []json.RawMessage{
updateEvent.event,
event,
}
}
return room
}
func (s *ConnState) getInitialRoomData(roomIDs ...string) []Room {
roomIDToUserRoomData := s.userCache.lazilyLoadRoomDatas(s.loadPosition, roomIDs, int(s.muxedReq.TimelineLimit)) // TODO: per-room timeline limit
roomIDToUserRoomData := s.userCache.lazyLoadTimelines(s.loadPosition, roomIDs, int(s.muxedReq.TimelineLimit)) // TODO: per-room timeline limit
rooms := make([]Room, len(roomIDs))
for i, roomID := range roomIDs {
userRoomData := roomIDToUserRoomData[roomID]
@ -352,18 +392,19 @@ func (s *ConnState) getInitialRoomData(roomIDs ...string) []Room {
// interested in it (e.g the client is joined to the room or it's an invite, etc). Each callback can fire
// from different v2 poll loops, and there is no locking in order to prevent a slow ConnState from wedging the poll loop.
// We need to move this data onto a channel for onIncomingRequest to consume later.
func (s *ConnState) OnNewEvent(eventData *EventData) {
func (s *ConnState) onNewConnectionEvent(connEvent *ConnEvent) {
eventData := connEvent.msg
// TODO: remove 0 check when Initialise state returns sensible positions
if eventData.latestPos != 0 && eventData.latestPos < s.loadPosition {
if eventData != nil && eventData.latestPos != 0 && eventData.latestPos < s.loadPosition {
// do not push this event down the stream as we have already processed it when we loaded
// the room list initially.
return
}
select {
case s.updateEvents <- eventData:
case s.updateEvents <- connEvent:
case <-time.After(5 * time.Second):
// TODO: kill the connection
logger.Warn().Interface("event", *eventData).Str("user", s.userID).Msg(
logger.Warn().Interface("event", *connEvent).Str("user", s.userID).Msg(
"cannot send event to connection, buffer exceeded",
)
}
@ -382,14 +423,14 @@ func (s *ConnState) UserID() string {
// 1,2,3,4,5
// 3 bumps to top -> 3,1,2,4,5 -> DELETE index=2, INSERT val=3 index=0
// 7 bumps to top -> 7,1,2,3,4 -> DELETE index=4, INSERT val=7 index=0
func (s *ConnState) moveRoom(updateEvent *EventData, fromIndex, toIndex int, ranges SliceRanges, onlySendRoomID bool) []ResponseOp {
func (s *ConnState) moveRoom(roomID string, event json.RawMessage, fromIndex, toIndex int, ranges SliceRanges, onlySendRoomID bool) []ResponseOp {
if fromIndex == toIndex {
// issue an UPDATE, nice and easy because we don't need to move entries in the list
room := &Room{
RoomID: updateEvent.roomID,
RoomID: roomID,
}
if !onlySendRoomID {
room = s.getDeltaRoomData(updateEvent)
room = s.getDeltaRoomData(roomID, event)
}
return []ResponseOp{
&ResponseOpSingle{
@ -410,10 +451,10 @@ func (s *ConnState) moveRoom(updateEvent *EventData, fromIndex, toIndex int, ran
deleteIndex = int(ranges.LowerClamp(int64(fromIndex)))
}
room := &Room{
RoomID: updateEvent.roomID,
RoomID: roomID,
}
if !onlySendRoomID {
rooms := s.getInitialRoomData(updateEvent.roomID)
rooms := s.getInitialRoomData(roomID)
room = &rooms[0]
}
return []ResponseOp{
@ -430,15 +471,23 @@ func (s *ConnState) moveRoom(updateEvent *EventData, fromIndex, toIndex int, ran
}
// Called by the user cache when events arrive
func (s *ConnState) OnNewEvent(event *EventData) {
s.onNewConnectionEvent(&ConnEvent{
roomID: event.roomID,
msg: event,
})
}
// Called by the user cache when unread counts have changed
func (s *ConnState) OnUnreadCountsChanged(userID, roomID string, urd UserRoomData, hasCountDecreased bool) {
if !hasCountDecreased {
// if the count increases then we'll notify the user for the event which increases the count, hence
// do nothing. We only care to notify the user when the counts decrease.
return
}
room := s.globalCache.LoadRoom(roomID)
s.OnNewEvent(&EventData{
roomID: roomID,
userRoomData: &urd,
timestamp: room.LastMessageTimestamp,
s.onNewConnectionEvent(&ConnEvent{
roomID: roomID,
userMsg: &urd,
})
}

View File

@ -323,6 +323,7 @@ func TestConnStateMultipleRanges(t *testing.T) {
dispatcher.OnNewEvents(roomIDs[9], []json.RawMessage{
newEvent,
}, 1)
t.Logf("new event %s : %s", roomIDs[9], string(newEvent))
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{

View File

@ -18,8 +18,6 @@ type EventData struct {
content gjson.Result
timestamp uint64
// TODO: remove or factor out
userRoomData *UserRoomData
// the absolute latest position for this event data. The NID for this event is guaranteed to
// be <= this value.
latestPos int64

View File

@ -58,7 +58,7 @@ func (c *UserCache) Unsubscribe(id int) {
delete(c.listeners, id)
}
func (c *UserCache) lazilyLoadRoomDatas(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
func (c *UserCache) lazyLoadTimelines(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
if c.LazyRoomDataOverride != nil {
return c.LazyRoomDataOverride(loadPos, roomIDs, maxTimelineEvents)
}