247 lines
6.4 KiB
Go
Raw Permalink Normal View History

package sync3
import (
"fmt"
"sort"
"github.com/matrix-org/sliding-sync/internal"
)
// RoomFinder looks up room metadata by room ID. SortableRooms uses it to resolve rooms when
// comparing them during Sort, and the filtered list uses it when deciding whether to include a room.
type RoomFinder interface {
	// ReadOnlyRoom returns the metadata for roomID.
	// NOTE(review): the name suggests callers must not mutate the result, and whether nil can be
	// returned for an unknown room is not visible here — confirm against implementations.
	ReadOnlyRoom(roomID string) *RoomConnMetadata
}
// SortableRooms represents a list of rooms which can be sorted and updated. Maintains mappings of
// room IDs to current index positions after sorting.
type SortableRooms struct {
finder RoomFinder
2023-05-24 15:20:49 +01:00
listKey string
roomIDs []string
roomIDToIndex map[string]int // room_id -> index in rooms
}
2023-05-24 15:20:49 +01:00
func NewSortableRooms(finder RoomFinder, listKey string, rooms []string) *SortableRooms {
return &SortableRooms{
roomIDs: rooms,
finder: finder,
2023-05-24 15:20:49 +01:00
listKey: listKey,
roomIDToIndex: make(map[string]int),
}
}
// IndexOf reports the position of roomID in the list and whether the room is present.
// When the room is absent the position is 0.
func (s *SortableRooms) IndexOf(roomID string) (int, bool) {
	if pos, found := s.roomIDToIndex[roomID]; found {
		return pos, true
	}
	return 0, false
}
// RoomIDs returns a fresh copy of the room IDs in their current order, so callers can modify the
// result without affecting the list.
func (s *SortableRooms) RoomIDs() []string {
	snapshot := make([]string, len(s.roomIDs))
	copy(snapshot, s.roomIDs)
	return snapshot
}
// Add a room to the list. Returns true if the room was added.
func (s *SortableRooms) Add(roomID string) bool {
_, exists := s.roomIDToIndex[roomID]
if exists {
return false
}
s.roomIDs = append(s.roomIDs, roomID)
s.roomIDToIndex[roomID] = len(s.roomIDs) - 1
return true
}
2023-04-05 15:36:05 +01:00
func (s *SortableRooms) Get(index int) string {
2023-04-05 18:31:03 +01:00
// TODO: find a way to plumb a context into this assert
2023-04-05 15:36:05 +01:00
internal.Assert(fmt.Sprintf("index is within len(rooms) %v < %v", index, len(s.roomIDs)), index < len(s.roomIDs))
return s.roomIDs[index]
}
// Remove deletes roomID from the list. Every room after it shifts down one position and has its
// index updated. It returns the position the room occupied, or -1 if the room was not in the list.
func (s *SortableRooms) Remove(roomID string) int {
	pos, found := s.roomIDToIndex[roomID]
	if !found {
		return -1
	}
	delete(s.roomIDToIndex, roomID)

	// Shift the tail left over the removed slot, then shrink the slice by one.
	shifted := copy(s.roomIDs[pos:], s.roomIDs[pos+1:])
	s.roomIDs = s.roomIDs[:pos+shifted]

	// Only rooms at or after the removed position moved.
	for offset, id := range s.roomIDs[pos:] {
		s.roomIDToIndex[id] = pos + offset
	}
	return pos
}
// Len returns the number of rooms in the list.
func (s *SortableRooms) Len() int64 {
	count := len(s.roomIDs)
	return int64(count)
}
func (s *SortableRooms) Subslice(i, j int64) Subslicer {
2023-04-05 18:31:03 +01:00
// TODO: find a way to plumb a context.Context through to this assert
internal.Assert("i < j and are within len(rooms)", i < j && i < int64(len(s.roomIDs)) && j <= int64(len(s.roomIDs)))
return &SortableRooms{
roomIDs: s.roomIDs[i:j],
roomIDToIndex: s.roomIDToIndex,
}
}
2023-04-05 15:36:05 +01:00
func (s *SortableRooms) Sort(sortBy []string) error {
2023-04-05 18:31:03 +01:00
// TODO: find a way to plumb a context into this assert
2023-04-05 15:36:05 +01:00
internal.Assert("sortBy is not empty", len(sortBy) != 0)
comparators := []func(i, j int) int{}
for _, sort := range sortBy {
switch sort {
case SortByHighlightCount:
comparators = append(comparators, s.comparatorSortByHighlightCount)
case SortByNotificationCount:
comparators = append(comparators, s.comparatorSortByNotificationCount)
case SortByName:
comparators = append(comparators, s.comparatorSortByName)
case SortByRecency:
comparators = append(comparators, s.comparatorSortByRecency)
add extensions for typing and receipts; bugfixes and additional perf improvements Features: - Add `typing` extension. - Add `receipts` extension. - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`. - Add `SYNCV3_PPROF` support. - Add `by_notification_level` sort order. - Add `include_old_rooms` support. - Add support for `$ME` and `$LAZY`. - Add correct filtering when `*,*` is used as `required_state`. - Add `num_live` to each room response to indicate how many timeline entries are live. Bug fixes: - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm. - Send back an `errcode` on HTTP errors (e.g expired sessions). - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :( - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic. - Send HTTP 400 for invalid range requests. - Don't publish no-op unread counts which just adds extra noise. - Fix leaking DB connections which could eventually consume all available connections. - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever. Improvements: - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s. - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s. - Add `SlidingSyncUntil...` in tests to reduce races. - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s. - Add trace task for initial syncs. - Include the proxy version in UA strings. 
- HTTP errors now wait 1s before returning to stop clients tight-looping on error. - Pending event buffer is now 2000. - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8. - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00
case SortByNotificationLevel:
comparators = append(comparators, s.comparatorSortByNotificationLevel)
default:
return fmt.Errorf("unknown sort order: %s", sort)
}
}
sort.SliceStable(s.roomIDs, func(i, j int) bool {
for _, fn := range comparators {
val := fn(i, j)
if val == 1 {
return true
} else if val == -1 {
return false
}
// continue to next comparator as these are equal
}
// the two items are identical
return false
})
for i := range s.roomIDs {
s.roomIDToIndex[s.roomIDs[i]] = i
}
return nil
}
// Comparator functions: each takes two positions i and j in roomIDs and returns
//   +1 if room i sorts before room j,
//   -1 if room i sorts after room j,
//    0 if the two rooms tie on this criterion, so Sort falls through to the next comparator.

// resolveRooms fetches the metadata for the rooms at positions i and j via the finder.
func (s *SortableRooms) resolveRooms(i, j int) (ri, rj *RoomConnMetadata) {
	return s.finder.ReadOnlyRoom(s.roomIDs[i]), s.finder.ReadOnlyRoom(s.roomIDs[j])
}
// comparatorSortByName orders rooms by CanonicalisedName, with smaller strings (Go's byte-wise
// string ordering) first.
func (s *SortableRooms) comparatorSortByName(i, j int) int {
	ri, rj := s.resolveRooms(i, j)
	switch a, b := ri.CanonicalisedName, rj.CanonicalisedName; {
	case a == b:
		return 0
	case a < b:
		return 1
	}
	return -1
}
// comparatorSortByRecency puts the room with the larger last-interested-event timestamp for this
// list (s.listKey) first.
func (s *SortableRooms) comparatorSortByRecency(i, j int) int {
	ri, rj := s.resolveRooms(i, j)
	switch a, b := ri.GetLastInterestedEventTimestamp(s.listKey), rj.GetLastInterestedEventTimestamp(s.listKey); {
	case a == b:
		return 0
	case a > b:
		return 1
	}
	return -1
}
func (s *SortableRooms) comparatorSortByHighlightCount(i, j int) int {
ri, rj := s.resolveRooms(i, j)
if ri.HighlightCount == rj.HighlightCount {
return 0
}
if ri.HighlightCount > rj.HighlightCount {
return 1
}
return -1
}
add extensions for typing and receipts; bugfixes and additional perf improvements Features: - Add `typing` extension. - Add `receipts` extension. - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`. - Add `SYNCV3_PPROF` support. - Add `by_notification_level` sort order. - Add `include_old_rooms` support. - Add support for `$ME` and `$LAZY`. - Add correct filtering when `*,*` is used as `required_state`. - Add `num_live` to each room response to indicate how many timeline entries are live. Bug fixes: - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm. - Send back an `errcode` on HTTP errors (e.g expired sessions). - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :( - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic. - Send HTTP 400 for invalid range requests. - Don't publish no-op unread counts which just adds extra noise. - Fix leaking DB connections which could eventually consume all available connections. - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever. Improvements: - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s. - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s. - Add `SlidingSyncUntil...` in tests to reduce races. - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s. - Add trace task for initial syncs. - Include the proxy version in UA strings. 
- HTTP errors now wait 1s before returning to stop clients tight-looping on error. - Pending event buffer is now 2000. - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8. - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00
// comparatorSortByNotificationLevel groups rooms into three tiers. Rooms with highlights come
// first, then rooms with notifications, then everything else. Rooms within a tier tie, except in
// the notification tier, where encrypted rooms come before unencrypted ones.
func (s *SortableRooms) comparatorSortByNotificationLevel(i, j int) int {
	ri, rj := s.resolveRooms(i, j)

	// Tier 1: any highlight.
	iHighlighted, jHighlighted := ri.HighlightCount > 0, rj.HighlightCount > 0
	switch {
	case iHighlighted && jHighlighted:
		return 0
	case iHighlighted:
		return 1
	case jHighlighted:
		return -1
	}

	// Tier 2: any notification.
	iNotified, jNotified := ri.NotificationCount > 0, rj.NotificationCount > 0
	switch {
	case iNotified && jNotified:
		// Encrypted rooms go above unencrypted ones because the client has to calculate highlight
		// counts itself, so a notification count may really hide highlights (the "Lite" description
		// in MSC3575).
		switch {
		case ri.Encrypted && !rj.Encrypted:
			return 1
		case rj.Encrypted && !ri.Encrypted:
			return -1
		}
		return 0
	case iNotified:
		return 1
	case jNotified:
		return -1
	}

	// Tier 3: no highlights and no notifications; all such rooms tie.
	return 0
}
func (s *SortableRooms) comparatorSortByNotificationCount(i, j int) int {
ri, rj := s.resolveRooms(i, j)
if ri.NotificationCount == rj.NotificationCount {
return 0
}
if ri.NotificationCount > rj.NotificationCount {
return 1
}
return -1
}
2022-03-23 14:13:59 +00:00
// FilteredSortableRooms is SortableRooms but where rooms are filtered before being added to the list.
// Updates to room metadata may result in rooms being added/removed.
type FilteredSortableRooms struct {
*SortableRooms
filter *RequestFilters
}
2023-05-24 15:20:49 +01:00
func NewFilteredSortableRooms(finder RoomFinder, listKey string, roomIDs []string, filter *RequestFilters) *FilteredSortableRooms {
var filteredRooms []string
if filter == nil {
filter = &RequestFilters{}
}
for _, roomID := range roomIDs {
r := finder.ReadOnlyRoom(roomID)
add extensions for typing and receipts; bugfixes and additional perf improvements Features: - Add `typing` extension. - Add `receipts` extension. - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`. - Add `SYNCV3_PPROF` support. - Add `by_notification_level` sort order. - Add `include_old_rooms` support. - Add support for `$ME` and `$LAZY`. - Add correct filtering when `*,*` is used as `required_state`. - Add `num_live` to each room response to indicate how many timeline entries are live. Bug fixes: - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm. - Send back an `errcode` on HTTP errors (e.g expired sessions). - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :( - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic. - Send HTTP 400 for invalid range requests. - Don't publish no-op unread counts which just adds extra noise. - Fix leaking DB connections which could eventually consume all available connections. - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever. Improvements: - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s. - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s. - Add `SlidingSyncUntil...` in tests to reduce races. - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s. - Add trace task for initial syncs. - Include the proxy version in UA strings. 
- HTTP errors now wait 1s before returning to stop clients tight-looping on error. - Pending event buffer is now 2000. - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8. - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00
if filter.Include(r, finder) {
filteredRooms = append(filteredRooms, roomID)
}
}
return &FilteredSortableRooms{
2023-05-24 15:20:49 +01:00
SortableRooms: NewSortableRooms(finder, listKey, filteredRooms),
filter: filter,
}
}
func (f *FilteredSortableRooms) Add(roomID string) bool {
r := f.finder.ReadOnlyRoom(roomID)
add extensions for typing and receipts; bugfixes and additional perf improvements Features: - Add `typing` extension. - Add `receipts` extension. - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`. - Add `SYNCV3_PPROF` support. - Add `by_notification_level` sort order. - Add `include_old_rooms` support. - Add support for `$ME` and `$LAZY`. - Add correct filtering when `*,*` is used as `required_state`. - Add `num_live` to each room response to indicate how many timeline entries are live. Bug fixes: - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm. - Send back an `errcode` on HTTP errors (e.g expired sessions). - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :( - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic. - Send HTTP 400 for invalid range requests. - Don't publish no-op unread counts which just adds extra noise. - Fix leaking DB connections which could eventually consume all available connections. - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever. Improvements: - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s. - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s. - Add `SlidingSyncUntil...` in tests to reduce races. - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s. - Add trace task for initial syncs. - Include the proxy version in UA strings. 
- HTTP errors now wait 1s before returning to stop clients tight-looping on error. - Pending event buffer is now 2000. - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8. - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00
if !f.filter.Include(r, f.finder) {
return false
}
return f.SortableRooms.Add(roomID)
}