mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
Read unread counts on startup; cache counts when live streaming
With a few more tests
This commit is contained in:
parent
c3486d7ff3
commit
4b377b3b6d
@ -23,6 +23,27 @@ func NewUnreadTable(db *sqlx.DB) *UnreadTable {
|
||||
return &UnreadTable{db}
|
||||
}
|
||||
|
||||
func (t *UnreadTable) SelectAllNonZeroCounts(callback func(roomID, userID string, highlightCount, notificationCount int)) error {
|
||||
rows, err := t.db.Query(
|
||||
`SELECT user_id, room_id, notification_count, highlight_count FROM syncv3_unread WHERE notification_count > 0 OR highlight_count > 0`,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var roomID string
|
||||
var userID string
|
||||
var highlightCount int
|
||||
var notifCount int
|
||||
if err := rows.Scan(&userID, &roomID, ¬ifCount, &highlightCount); err != nil {
|
||||
return err
|
||||
}
|
||||
callback(roomID, userID, highlightCount, notifCount)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *UnreadTable) SelectUnreadCounters(userID, roomID string) (highlightCount, notificationCount int, err error) {
|
||||
err = t.db.QueryRow(
|
||||
`SELECT notification_count, highlight_count FROM syncv3_unread WHERE user_id=$1 AND room_id=$2`, userID, roomID,
|
||||
|
@ -30,12 +30,41 @@ func TestUnreadTable(t *testing.T) {
|
||||
assertUnread(t, table, userID, roomC, 0, 2)
|
||||
|
||||
// try all kinds of updates
|
||||
assertNoError(t, table.UpdateUnreadCounters(userID, roomA, &zero, nil)) // one
|
||||
assertNoError(t, table.UpdateUnreadCounters(userID, roomB, nil, &two)) // one
|
||||
assertNoError(t, table.UpdateUnreadCounters(userID, roomC, &two, &two)) // both
|
||||
assertNoError(t, table.UpdateUnreadCounters(userID, roomA, &zero, nil)) // one
|
||||
assertNoError(t, table.UpdateUnreadCounters(userID, roomB, nil, &two)) // one
|
||||
assertNoError(t, table.UpdateUnreadCounters(userID, roomC, &zero, &zero)) // both
|
||||
assertUnread(t, table, userID, roomA, 0, 1)
|
||||
assertUnread(t, table, userID, roomB, 2, 2)
|
||||
assertUnread(t, table, userID, roomC, 2, 2)
|
||||
assertUnread(t, table, userID, roomC, 0, 0)
|
||||
|
||||
wantHighlights := map[string]int{
|
||||
roomB: 2,
|
||||
}
|
||||
wantNotifs := map[string]int{
|
||||
roomA: 1,
|
||||
roomB: 2,
|
||||
}
|
||||
assertNoError(t, table.SelectAllNonZeroCounts(func(gotRoomID string, gotUserID string, gotHighlight int, gotNotif int) {
|
||||
if userID != gotUserID {
|
||||
t.Errorf("SelectAllNonZeroCounts: got user %v want %v", gotUserID, userID)
|
||||
}
|
||||
wantHighlight := wantHighlights[gotRoomID]
|
||||
if wantHighlight != gotHighlight {
|
||||
t.Errorf("SelectAllNonZeroCounts for %v got %d highlights, want %d", gotRoomID, gotHighlight, wantHighlight)
|
||||
}
|
||||
wantNotif := wantNotifs[gotRoomID]
|
||||
if wantNotif != gotNotif {
|
||||
t.Errorf("SelectAllNonZeroCounts for %v got %d notifs, want %d", gotRoomID, gotNotif, wantNotif)
|
||||
}
|
||||
delete(wantHighlights, gotRoomID)
|
||||
delete(wantNotifs, gotRoomID)
|
||||
}))
|
||||
if len(wantHighlights) != 0 {
|
||||
t.Errorf("SelectAllNonZeroCounts missed highlight rooms: %+v", wantHighlights)
|
||||
}
|
||||
if len(wantNotifs) != 0 {
|
||||
t.Errorf("SelectAllNonZeroCounts missed notif rooms: %+v", wantNotifs)
|
||||
}
|
||||
}
|
||||
|
||||
func assertUnread(t *testing.T, table *UnreadTable, userID, roomID string, wantHighight, wantNotif int) {
|
||||
|
@ -38,7 +38,8 @@ type ConnMap struct {
|
||||
jrt *JoinedRoomsTracker
|
||||
|
||||
// TODO: this can be pulled out of here and invoked from handler?
|
||||
globalRoomInfo map[string]*SortableRoom
|
||||
globalRoomInfo map[string]*SortableRoom
|
||||
perUserPerRoomData map[string]userRoomData
|
||||
|
||||
store *state.Storage
|
||||
|
||||
@ -47,13 +48,14 @@ type ConnMap struct {
|
||||
|
||||
func NewConnMap(store *state.Storage) *ConnMap {
|
||||
cm := &ConnMap{
|
||||
userIDToConn: make(map[string][]*Conn),
|
||||
connIDToConn: make(map[string]*Conn),
|
||||
cache: ttlcache.NewCache(),
|
||||
mu: &sync.Mutex{},
|
||||
jrt: NewJoinedRoomsTracker(),
|
||||
store: store,
|
||||
globalRoomInfo: make(map[string]*SortableRoom),
|
||||
userIDToConn: make(map[string][]*Conn),
|
||||
connIDToConn: make(map[string]*Conn),
|
||||
cache: ttlcache.NewCache(),
|
||||
mu: &sync.Mutex{},
|
||||
jrt: NewJoinedRoomsTracker(),
|
||||
store: store,
|
||||
globalRoomInfo: make(map[string]*SortableRoom),
|
||||
perUserPerRoomData: make(map[string]userRoomData),
|
||||
}
|
||||
cm.cache.SetTTL(30 * time.Minute) // TODO: customisable
|
||||
cm.cache.SetExpirationCallback(cm.closeConn)
|
||||
@ -93,6 +95,7 @@ func (m *ConnMap) GetOrCreateConn(cid ConnID, userID string) (*Conn, bool) {
|
||||
// - LoadBaseline loads the latest NID=50 due to LatestEventNID, processes this join event in the process
|
||||
// - OnNewEvents is called with the join event
|
||||
// - join event is processed twice.
|
||||
// TODO: Move to cache struct
|
||||
func (m *ConnMap) LoadBaseline(roomIDToUserIDs map[string][]string) error {
|
||||
// TODO: load last N events as a sliding window?
|
||||
latestEvents, err := m.store.SelectLatestEventInAllRooms()
|
||||
@ -136,9 +139,17 @@ func (m *ConnMap) LoadBaseline(roomIDToUserIDs map[string][]string) error {
|
||||
m.jrt.UserJoinedRoom(userID, roomID)
|
||||
}
|
||||
}
|
||||
// select all non-zero highlight or notif counts and set them, as this is less costly than looping every room/user pair
|
||||
err = m.store.UnreadTable.SelectAllNonZeroCounts(func(roomID, userID string, highlightCount, notificationCount int) {
|
||||
m.OnUnreadCounts(roomID, userID, &highlightCount, ¬ificationCount)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load unread counts: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: Move to cache struct
|
||||
func (m *ConnMap) LoadRoom(roomID string) *SortableRoom {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
@ -218,6 +229,20 @@ func (m *ConnMap) closeConn(connID string, value interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Move to cache struct
|
||||
func (m *ConnMap) OnUnreadCounts(roomID, userID string, highlightCount, notifCount *int) {
|
||||
key := userID + " " + roomID
|
||||
userData := m.perUserPerRoomData[key]
|
||||
if highlightCount != nil {
|
||||
userData.highlightCount = *highlightCount
|
||||
}
|
||||
if notifCount != nil {
|
||||
userData.notificationCount = *notifCount
|
||||
}
|
||||
m.perUserPerRoomData[key] = userData
|
||||
}
|
||||
|
||||
// TODO: Move to cache struct
|
||||
// Call this when there is a new event received on a v2 stream.
|
||||
// This event must be globally unique, i.e indicated so by the state store.
|
||||
func (m *ConnMap) OnNewEvents(
|
||||
@ -227,6 +252,8 @@ func (m *ConnMap) OnNewEvents(
|
||||
m.onNewEvent(roomID, event, latestPos)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Move to cache struct
|
||||
func (m *ConnMap) onNewEvent(
|
||||
roomID string, event json.RawMessage, latestPos int64,
|
||||
) {
|
||||
|
@ -270,4 +270,5 @@ func (h *SyncLiveHandler) UpdateUnreadCounts(roomID, userID string, highlightCou
|
||||
if err != nil {
|
||||
logger.Err(err).Str("user", userID).Str("room", roomID).Msg("failed to update unread counters")
|
||||
}
|
||||
h.ConnMap.OnUnreadCounts(roomID, userID, highlightCount, notifCount)
|
||||
}
|
||||
|
@ -30,3 +30,8 @@ func (s SortableRooms) Len() int64 {
|
||||
func (s SortableRooms) Subslice(i, j int64) Subslicer {
|
||||
return s[i:j]
|
||||
}
|
||||
|
||||
type userRoomData struct {
|
||||
notificationCount int
|
||||
highlightCount int
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user