Split sync3 into sync3 and sync3/handler

`sync3` contains data structures and logic which is very isolated and
testable (think ConnMap, Room, Request, SortableRooms, etc) whereas
`sync3/handler` contains control flow which calls into `sync3` data
structures.

This has numerous benefits:
 - Gnarly complicated structs like `ConnState` are now more isolated
   from the codebase, forcing better API design on `sync3` structs.
 - The inability to do import cycles forces structs in `sync3` to remain
   simple: they cannot pull in control flow logic from `sync3/handler`
   without causing a compile error.
 - It's significantly easier to figure out where to start looking for
   code that executes when a new request is received, for new developers.
 - It simplifies the number of things that `ConnState` can touch. Previously
   we were gut wrenching out of convenience but now we're forced to move
   more logic from `ConnState` into `sync3` (depending on the API design).
   For example, adding `SortableRooms.RoomIDs()`.
This commit is contained in:
Kegan Dougal 2021-11-05 15:45:04 +00:00
parent 12bdf20f61
commit 11b1260d07
14 changed files with 410 additions and 354 deletions

View File

@ -9,7 +9,7 @@ import (
syncv3 "github.com/matrix-org/sync-v3"
"github.com/matrix-org/sync-v3/sync2"
"github.com/matrix-org/sync-v3/sync3"
"github.com/matrix-org/sync-v3/sync3/handler"
)
var (
@ -30,7 +30,7 @@ func main() {
panic(err)
}
}()
h, err := sync3.NewSync3Handler(&sync2.HTTPClient{
h, err := handler.NewSync3Handler(&sync2.HTTPClient{
Client: &http.Client{
Timeout: 5 * time.Minute,
},

View File

@ -17,17 +17,22 @@ func (c *ConnID) String() string {
return c.SessionID + "-" + c.DeviceID
}
type HandlerIncomingReqFunc func(ctx context.Context, cid ConnID, req *Request) (*Response, error)
type ConnHandler interface {
// Callback which is allowed to block as long as the context is active. Return the response
// to send back or an error. Errors of type *internal.HandlerError are inspected for the correct
// status code to send back.
OnIncomingRequest(ctx context.Context, cid ConnID, req *Request) (*Response, error)
UserID() string
Destroy()
}
// Conn is an abstraction of a long-poll connection. It automatically handles the position values
// of the /sync request, including sending cached data in the event of retries. It does not handle
// the contents of the data at all.
type Conn struct {
ConnID ConnID
// Callback which is allowed to block as long as the context is active. Return the response
// to send back or an error. Errors of type *internal.HandlerError are inspected for the correct
// status code to send back.
HandleIncomingRequest HandlerIncomingReqFunc
handler ConnHandler
// The position/data in the stream last sent by the client
lastClientRequest Request
@ -38,16 +43,13 @@ type Conn struct {
// ensure only 1 incoming request is handled per connection
mu *sync.Mutex
connState *ConnState
}
func NewConn(connID ConnID, connState *ConnState, fn HandlerIncomingReqFunc) *Conn {
func NewConn(connID ConnID, h ConnHandler) *Conn {
return &Conn{
ConnID: connID,
HandleIncomingRequest: fn,
mu: &sync.Mutex{},
connState: connState,
ConnID: connID,
handler: h,
mu: &sync.Mutex{},
}
}
@ -78,7 +80,7 @@ func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request) (resp *Respo
}
c.lastClientRequest = *req
resp, err := c.HandleIncomingRequest(ctx, c.ConnID, req)
resp, err := c.handler.OnIncomingRequest(ctx, c.ConnID, req)
if err != nil {
herr, ok := err.(*internal.HandlerError)
if !ok {

View File

@ -10,6 +10,18 @@ import (
"github.com/matrix-org/sync-v3/internal"
)
type connHandlerMock struct {
fn func(ctx context.Context, cid ConnID, req *Request) (*Response, error)
}
func (c *connHandlerMock) OnIncomingRequest(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
return c.fn(ctx, cid, req)
}
func (c *connHandlerMock) UserID() string {
return "dummy"
}
func (c *connHandlerMock) Destroy() {}
// Test that Conn can send and receive requests based on positions
func TestConn(t *testing.T) {
ctx := context.Background()
@ -18,12 +30,12 @@ func TestConn(t *testing.T) {
SessionID: "s",
}
count := int64(100)
c := NewConn(connID, nil, func(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
c := NewConn(connID, &connHandlerMock{func(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
count += 1
return &Response{
Count: count,
}, nil
})
}})
// initial request
resp, err := c.OnIncomingRequest(ctx, &Request{
@ -62,13 +74,13 @@ func TestConnBlocking(t *testing.T) {
SessionID: "s",
}
ch := make(chan string)
c := NewConn(connID, nil, func(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
c := NewConn(connID, &connHandlerMock{func(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
if req.Sort[0] == "hi" {
time.Sleep(10 * time.Millisecond)
}
ch <- req.Sort[0]
return &Response{}, nil
})
}})
// two connection call the incoming request function at the same time, they should get queued up
// and processed in series.
@ -109,10 +121,10 @@ func TestConnRetries(t *testing.T) {
SessionID: "s",
}
callCount := int64(0)
c := NewConn(connID, nil, func(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
c := NewConn(connID, &connHandlerMock{func(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
callCount += 1
return &Response{Count: 20}, nil
})
}})
resp, err := c.OnIncomingRequest(ctx, &Request{})
assertInt(t, resp.Pos, 1)
assertInt(t, resp.Count, 20)
@ -144,9 +156,9 @@ func TestConnErrors(t *testing.T) {
SessionID: "s",
}
errCh := make(chan error, 1)
c := NewConn(connID, nil, func(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
c := NewConn(connID, &connHandlerMock{func(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
return nil, <-errCh
})
}})
// random errors = 500
errCh <- errors.New("oops")
@ -172,14 +184,14 @@ func TestConnErrorsNoCache(t *testing.T) {
SessionID: "s",
}
errCh := make(chan error, 1)
c := NewConn(connID, nil, func(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
c := NewConn(connID, &connHandlerMock{func(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
select {
case e := <-errCh:
return nil, e
default:
return &Response{}, nil
}
})
}})
// errors should not be cached
resp, herr := c.OnIncomingRequest(ctx, &Request{})
if herr != nil {

View File

@ -40,8 +40,8 @@ func (m *ConnMap) Conn(cid ConnID) *Conn {
return cint.(*Conn)
}
// Atomically gets or creates a connection with this connection ID.
func (m *ConnMap) GetOrCreateConn(cid ConnID, globalCache *GlobalCache, userID string, userCache *UserCache) (*Conn, bool) {
// Atomically gets or creates a connection with this connection ID. Calls newConn if a new connection is required.
func (m *ConnMap) GetOrCreateConn(cid ConnID, newConnHandler func() ConnHandler) (*Conn, bool) {
// atomically check if a conn exists already and return that if so
m.mu.Lock()
defer m.mu.Unlock()
@ -49,11 +49,11 @@ func (m *ConnMap) GetOrCreateConn(cid ConnID, globalCache *GlobalCache, userID s
if conn != nil {
return conn, false
}
state := NewConnState(userID, userCache, globalCache)
conn = NewConn(cid, state, state.HandleIncomingRequest)
h := newConnHandler()
conn = NewConn(cid, h)
m.cache.Set(cid.String(), conn)
m.connIDToConn[cid.String()] = conn
m.userIDToConn[userID] = append(m.userIDToConn[userID], conn)
m.userIDToConn[h.UserID()] = append(m.userIDToConn[h.UserID()], conn)
return conn, true
}
@ -63,17 +63,15 @@ func (m *ConnMap) closeConn(connID string, value interface{}) {
// remove conn from all the maps
conn := value.(*Conn)
delete(m.connIDToConn, connID)
state := conn.connState
if state != nil {
conns := m.userIDToConn[state.UserID()]
for i := 0; i < len(conns); i++ {
if conns[i].ConnID.String() == connID {
// delete without preserving order
conns[i] = conns[len(conns)-1]
conns = conns[:len(conns)-1]
}
h := conn.handler
conns := m.userIDToConn[h.UserID()]
for i := 0; i < len(conns); i++ {
if conns[i].ConnID.String() == connID {
// delete without preserving order
conns[i] = conns[len(conns)-1]
conns = conns[:len(conns)-1]
}
m.userIDToConn[state.UserID()] = conns
state.Destroy()
}
m.userIDToConn[h.UserID()] = conns
h.Destroy()
}

View File

@ -4,23 +4,22 @@ import (
"encoding/json"
"sync"
"github.com/matrix-org/sync-v3/state"
"github.com/tidwall/gjson"
)
const DispatcherAllUsers = "-"
type EventData struct {
event json.RawMessage
roomID string
eventType string
stateKey *string
content gjson.Result
timestamp uint64
Event json.RawMessage
RoomID string
EventType string
StateKey *string
Content gjson.Result
Timestamp uint64
// the absolute latest position for this event data. The NID for this event is guaranteed to
// be <= this value.
latestPos int64
LatestPos int64
}
type Receiver interface {
@ -44,12 +43,8 @@ func NewDispatcher() *Dispatcher {
// Load joined members into the dispatcher.
// MUST BE CALLED BEFORE V2 POLL LOOPS START.
func (d *Dispatcher) Startup(store *state.Storage) error {
func (d *Dispatcher) Startup(roomToJoinedUsers map[string][]string) error {
// populate joined rooms tracker
roomToJoinedUsers, err := store.AllJoinedMembers()
if err != nil {
return err
}
for roomID, userIDs := range roomToJoinedUsers {
for _, userID := range userIDs {
d.jrt.UserJoinedRoom(userID, roomID)
@ -94,33 +89,33 @@ func (d *Dispatcher) onNewEvent(
eventType := ev.Get("type").Str
ed := &EventData{
event: event,
roomID: roomID,
eventType: eventType,
stateKey: stateKey,
content: ev.Get("content"),
latestPos: latestPos,
timestamp: ev.Get("origin_server_ts").Uint(),
Event: event,
RoomID: roomID,
EventType: eventType,
StateKey: stateKey,
Content: ev.Get("content"),
LatestPos: latestPos,
Timestamp: ev.Get("origin_server_ts").Uint(),
}
// update the tracker
targetUser := ""
if ed.eventType == "m.room.member" && ed.stateKey != nil {
targetUser = *ed.stateKey
if ed.EventType == "m.room.member" && ed.StateKey != nil {
targetUser = *ed.StateKey
// TODO: de-dupe joins in jrt else profile changes will results in 2x room IDs
membership := ed.content.Get("membership").Str
membership := ed.Content.Get("membership").Str
switch membership {
case "join":
d.jrt.UserJoinedRoom(targetUser, ed.roomID)
d.jrt.UserJoinedRoom(targetUser, ed.RoomID)
case "ban":
fallthrough
case "leave":
d.jrt.UserLeftRoom(targetUser, ed.roomID)
d.jrt.UserLeftRoom(targetUser, ed.RoomID)
}
}
// notify all people in this room
userIDs := d.jrt.JoinedUsersForRoom(ed.roomID)
userIDs := d.jrt.JoinedUsersForRoom(ed.RoomID)
// invoke listeners
d.userToReceiverMu.RLock()

View File

@ -3,14 +3,21 @@ package sync3
import (
"encoding/json"
"fmt"
"os"
"sync"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/sync-v3/internal"
"github.com/matrix-org/sync-v3/state"
"github.com/rs/zerolog"
"github.com/tidwall/gjson"
)
var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: "15:04:05",
})
// The purpose of global cache is to store global-level information about all rooms the server is aware of.
// Global-level information is represented as internal.RoomMetadata and includes things like Heroes, join/invite
// counts, if the room is encrypted, etc. Basically anything that is the same for all users of the system. This
@ -160,29 +167,29 @@ func (c *GlobalCache) OnNewEvent(
// update global state
c.roomIDToMetadataMu.Lock()
defer c.roomIDToMetadataMu.Unlock()
metadata := c.roomIDToMetadata[ed.roomID]
metadata := c.roomIDToMetadata[ed.RoomID]
if metadata == nil {
metadata = &internal.RoomMetadata{
RoomID: ed.roomID,
RoomID: ed.RoomID,
}
}
switch ed.eventType {
switch ed.EventType {
case "m.room.name":
if ed.stateKey != nil && *ed.stateKey == "" {
metadata.NameEvent = ed.content.Get("name").Str
if ed.StateKey != nil && *ed.StateKey == "" {
metadata.NameEvent = ed.Content.Get("name").Str
}
case "m.room.encryption":
if ed.stateKey != nil && *ed.stateKey == "" {
if ed.StateKey != nil && *ed.StateKey == "" {
metadata.Encrypted = true
}
case "m.room.canonical_alias":
if ed.stateKey != nil && *ed.stateKey == "" {
metadata.CanonicalAlias = ed.content.Get("alias").Str
if ed.StateKey != nil && *ed.StateKey == "" {
metadata.CanonicalAlias = ed.Content.Get("alias").Str
}
case "m.room.member":
if ed.stateKey != nil {
membership := ed.content.Get("membership").Str
eventJSON := gjson.ParseBytes(ed.event)
if ed.StateKey != nil {
membership := ed.Content.Get("membership").Str
eventJSON := gjson.ParseBytes(ed.Event)
if internal.IsMembershipChange(eventJSON) {
if membership == "invite" {
metadata.InviteCount += 1
@ -191,7 +198,7 @@ func (c *GlobalCache) OnNewEvent(
} else if membership == "leave" || membership == "ban" {
metadata.JoinCount -= 1
// remove this user as a hero
metadata.RemoveHero(*ed.stateKey)
metadata.RemoveHero(*ed.StateKey)
}
if eventJSON.Get("unsigned.prev_content.membership").Str == "invite" {
@ -202,21 +209,21 @@ func (c *GlobalCache) OnNewEvent(
// try to find the existing hero e.g they changed their display name
found := false
for i := range metadata.Heroes {
if metadata.Heroes[i].ID == *ed.stateKey {
metadata.Heroes[i].Name = ed.content.Get("displayname").Str
if metadata.Heroes[i].ID == *ed.StateKey {
metadata.Heroes[i].Name = ed.Content.Get("displayname").Str
found = true
break
}
}
if !found {
metadata.Heroes = append(metadata.Heroes, internal.Hero{
ID: *ed.stateKey,
Name: ed.content.Get("displayname").Str,
ID: *ed.StateKey,
Name: ed.Content.Get("displayname").Str,
})
}
}
}
}
metadata.LastMessageTimestamp = ed.timestamp
c.roomIDToMetadata[ed.roomID] = metadata
metadata.LastMessageTimestamp = ed.Timestamp
c.roomIDToMetadata[ed.RoomID] = metadata
}

View File

@ -1,4 +1,4 @@
package sync3
package handler
import (
"context"
@ -8,6 +8,7 @@ import (
"time"
"github.com/matrix-org/sync-v3/internal"
"github.com/matrix-org/sync-v3/sync3"
)
var (
@ -17,20 +18,12 @@ var (
MaxPendingEventUpdates = 200
)
type RoomConnMetadata struct {
internal.RoomMetadata
CanonicalisedName string // stripped leading symbols like #, all in lower case
HighlightCount int
NotificationCount int
}
type ConnEvent struct {
roomMetadata *internal.RoomMetadata
roomID string
msg *EventData
msg *sync3.EventData
userMsg struct {
msg *UserRoomData
msg *sync3.UserRoomData
hasCountDecreased bool
}
}
@ -40,9 +33,9 @@ type ConnEvent struct {
type ConnState struct {
userID string
// the only thing that can touch these data structures is the conn goroutine
muxedReq *Request
sortedJoinedRooms *SortableRooms
roomSubscriptions map[string]RoomSubscription
muxedReq *sync3.Request
sortedJoinedRooms *sync3.SortableRooms
roomSubscriptions map[string]sync3.RoomSubscription
loadPosition int64
// A channel which the dispatcher uses to send updates to the conn goroutine
@ -50,17 +43,17 @@ type ConnState struct {
// saying the client is dead and clean up the conn.
updateEvents chan *ConnEvent
globalCache *GlobalCache
userCache *UserCache
globalCache *sync3.GlobalCache
userCache *sync3.UserCache
userCacheID int
}
func NewConnState(userID string, userCache *UserCache, globalCache *GlobalCache) *ConnState {
func NewConnState(userID string, userCache *sync3.UserCache, globalCache *sync3.GlobalCache) *ConnState {
cs := &ConnState{
globalCache: globalCache,
userCache: userCache,
userID: userID,
roomSubscriptions: make(map[string]RoomSubscription),
roomSubscriptions: make(map[string]sync3.RoomSubscription),
updateEvents: make(chan *ConnEvent, MaxPendingEventUpdates), // TODO: customisable
}
cs.userCacheID = cs.userCache.Subsribe(cs)
@ -78,17 +71,17 @@ func NewConnState(userID string, userCache *UserCache, globalCache *GlobalCache)
// N events arrive and get buffered.
// - load() bases its current state based on the latest position, which includes processing of these N events.
// - post load() we read N events, processing them a 2nd time.
func (s *ConnState) load(req *Request) error {
func (s *ConnState) load(req *sync3.Request) error {
initialLoadPosition, joinedRooms, err := s.globalCache.LoadJoinedRooms(s.userID)
if err != nil {
return err
}
rooms := make([]RoomConnMetadata, len(joinedRooms))
rooms := make([]sync3.RoomConnMetadata, len(joinedRooms))
for i := range joinedRooms {
metadata := joinedRooms[i]
metadata.RemoveHero(s.userID)
urd := s.userCache.loadRoomData(metadata.RoomID)
rooms[i] = RoomConnMetadata{
urd := s.userCache.LoadRoomData(metadata.RoomID)
rooms[i] = sync3.RoomConnMetadata{
RoomMetadata: *metadata,
CanonicalisedName: strings.ToLower(
strings.Trim(internal.CalculateRoomName(metadata, 5), "#!()):_@"),
@ -99,7 +92,7 @@ func (s *ConnState) load(req *Request) error {
}
s.loadPosition = initialLoadPosition
s.sortedJoinedRooms = NewSortableRooms(rooms)
s.sortedJoinedRooms = sync3.NewSortableRooms(rooms)
s.sort(req.Sort)
return nil
@ -107,7 +100,7 @@ func (s *ConnState) load(req *Request) error {
func (s *ConnState) sort(sortBy []string) {
if sortBy == nil {
sortBy = []string{SortByRecency}
sortBy = []string{sync3.SortByRecency}
}
err := s.sortedJoinedRooms.Sort(sortBy)
if err != nil {
@ -115,8 +108,8 @@ func (s *ConnState) sort(sortBy []string) {
}
}
// HandleIncomingRequest is guaranteed to be called sequentially (it's protected by a mutex in conn.go)
func (s *ConnState) HandleIncomingRequest(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
// OnIncomingRequest is guaranteed to be called sequentially (it's protected by a mutex in conn.go)
func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req *sync3.Request) (*sync3.Response, error) {
if s.loadPosition == 0 {
s.load(req)
}
@ -126,8 +119,8 @@ func (s *ConnState) HandleIncomingRequest(ctx context.Context, cid ConnID, req *
// onIncomingRequest is a callback which fires when the client makes a request to the server. Whilst each request may
// be on their own goroutine, the requests are linearised for us by Conn so it is safe to modify ConnState without
// additional locking mechanisms.
func (s *ConnState) onIncomingRequest(ctx context.Context, req *Request) (*Response, error) {
var prevRange SliceRanges
func (s *ConnState) onIncomingRequest(ctx context.Context, req *sync3.Request) (*sync3.Response, error) {
var prevRange sync3.SliceRanges
var prevSort []string
if s.muxedReq != nil {
prevRange = s.muxedReq.Rooms
@ -148,16 +141,16 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *Request) (*Respo
}
// start forming the response
response := &Response{
response := &sync3.Response{
RoomSubscriptions: s.updateRoomSubscriptions(newSubs, newUnsubs),
Count: int64(s.sortedJoinedRooms.Len()),
}
// TODO: calculate the M values for N < M calcs
var responseOperations []ResponseOp
var responseOperations []sync3.ResponseOp
var added, removed, same SliceRanges
var added, removed, same sync3.SliceRanges
if prevRange != nil {
added, removed, same = prevRange.Delta(s.muxedReq.Rooms)
} else {
@ -168,7 +161,7 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *Request) (*Respo
// the sort operations have changed, invalidate everything (if there were previous syncs), re-sort and re-SYNC
if prevSort != nil {
for _, r := range s.muxedReq.Rooms {
responseOperations = append(responseOperations, &ResponseOpRange{
responseOperations = append(responseOperations, &sync3.ResponseOpRange{
Operation: "INVALIDATE",
Range: r[:],
})
@ -182,22 +175,19 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *Request) (*Respo
// send INVALIDATE for these ranges
for _, r := range removed {
responseOperations = append(responseOperations, &ResponseOpRange{
responseOperations = append(responseOperations, &sync3.ResponseOpRange{
Operation: "INVALIDATE",
Range: r[:],
})
}
// send full room data for these ranges
for _, r := range added {
sr := SliceRanges([][2]int64{r})
sr := sync3.SliceRanges([][2]int64{r})
subslice := sr.SliceInto(s.sortedJoinedRooms)
sortableRooms := subslice[0].(*SortableRooms)
roomIDs := make([]string, sortableRooms.Len())
for i := range sortableRooms.rooms {
roomIDs[i] = sortableRooms.rooms[i].RoomID
}
sortableRooms := subslice[0].(*sync3.SortableRooms)
roomIDs := sortableRooms.RoomIDs()
responseOperations = append(responseOperations, &ResponseOpRange{
responseOperations = append(responseOperations, &sync3.ResponseOpRange{
Operation: "SYNC",
Range: r[:],
Rooms: s.getInitialRoomData(roomIDs...),
@ -211,7 +201,7 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *Request) (*Respo
select {
case <-ctx.Done(): // client has given up
break blockloop
case <-time.After(time.Duration(req.timeoutSecs) * time.Second):
case <-time.After(time.Duration(req.TimeoutSecs()) * time.Second):
break blockloop
case connEvent := <-s.updateEvents: // TODO: keep reading until it is empty before responding.
if connEvent.roomMetadata != nil {
@ -241,7 +231,7 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *Request) (*Respo
return response, nil
}
func (s *ConnState) processIncomingUserEvent(roomID string, userEvent *UserRoomData, hasCountDecreased bool) ([]Room, []ResponseOp) {
func (s *ConnState) processIncomingUserEvent(roomID string, userEvent *sync3.UserRoomData, hasCountDecreased bool) ([]sync3.Room, []sync3.ResponseOp) {
// modify notification counts
s.sortedJoinedRooms.UpdateUserRoomMetadata(roomID, userEvent, hasCountDecreased)
@ -258,42 +248,35 @@ func (s *ConnState) processIncomingUserEvent(roomID string, userEvent *UserRoomD
return s.resort(roomID, fromIndex, nil)
}
func (s *ConnState) processIncomingEvent(updateEvent *EventData) ([]Room, []ResponseOp) {
if updateEvent.latestPos > s.loadPosition {
s.loadPosition = updateEvent.latestPos
func (s *ConnState) processIncomingEvent(updateEvent *sync3.EventData) ([]sync3.Room, []sync3.ResponseOp) {
if updateEvent.LatestPos > s.loadPosition {
s.loadPosition = updateEvent.LatestPos
}
// TODO: Add filters to check if this event should cause a response or should be dropped (e.g filtering out messages)
// this is why this select is in a while loop as not all update event will wake up the stream
var targetRoom RoomConnMetadata
fromIndex, ok := s.sortedJoinedRooms.IndexOf(updateEvent.roomID)
fromIndex, ok := s.sortedJoinedRooms.IndexOf(updateEvent.RoomID)
if !ok {
// the user may have just joined the room hence not have an entry in this list yet.
fromIndex = int(s.sortedJoinedRooms.Len())
roomMetadatas := s.globalCache.LoadRooms(updateEvent.roomID)
roomMetadatas := s.globalCache.LoadRooms(updateEvent.RoomID)
roomMetadata := roomMetadatas[0]
roomMetadata.RemoveHero(s.userID)
newRoomConn := RoomConnMetadata{
newRoomConn := sync3.RoomConnMetadata{
RoomMetadata: *roomMetadata,
CanonicalisedName: strings.ToLower(
strings.Trim(internal.CalculateRoomName(roomMetadata, 5), "#!()):_@"),
),
}
// TODO: don't gut wrench
s.sortedJoinedRooms.rooms = append(s.sortedJoinedRooms.rooms, newRoomConn)
targetRoom = newRoomConn
} else {
targetRoom = s.sortedJoinedRooms.rooms[fromIndex]
targetRoom.LastMessageTimestamp = updateEvent.timestamp
s.sortedJoinedRooms.rooms[fromIndex] = targetRoom
s.sortedJoinedRooms.Add(newRoomConn)
}
return s.resort(updateEvent.roomID, fromIndex, updateEvent.event)
return s.resort(updateEvent.RoomID, fromIndex, updateEvent.Event)
}
// Resort should be called after a specific room has been modified in `sortedJoinedRooms`.
func (s *ConnState) resort(roomID string, fromIndex int, newEvent json.RawMessage) ([]Room, []ResponseOp) {
func (s *ConnState) resort(roomID string, fromIndex int, newEvent json.RawMessage) ([]sync3.Room, []sync3.ResponseOp) {
s.sort(s.muxedReq.Sort)
var subs []Room
var subs []sync3.Room
isSubscribedToRoom := false
if _, ok := s.roomSubscriptions[roomID]; ok {
@ -323,13 +306,13 @@ func (s *ConnState) resort(roomID string, fromIndex int, newEvent json.RawMessag
)
return subs, nil
}
toRoom := s.sortedJoinedRooms.rooms[toIndex]
toRoom := s.sortedJoinedRooms.Get(toIndex)
// fake an update event for this room.
// We do this because we are introducing a new room in the list because of this situation:
// tracking [10,20] and room 24 jumps to position 0, so now we are tracking [9,19] as all rooms
// have been shifted to the right
rooms := s.userCache.lazyLoadTimelines(s.loadPosition, []string{toRoom.RoomID}, int(s.muxedReq.TimelineLimit)) // TODO: per-room timeline limit
rooms := s.userCache.LazyLoadTimelines(s.loadPosition, []string{toRoom.RoomID}, int(s.muxedReq.TimelineLimit)) // TODO: per-room timeline limit
urd := rooms[toRoom.RoomID]
// clobber before falling through
@ -340,8 +323,8 @@ func (s *ConnState) resort(roomID string, fromIndex int, newEvent json.RawMessag
return subs, s.moveRoom(roomID, newEvent, fromIndex, toIndex, s.muxedReq.Rooms, isSubscribedToRoom)
}
func (s *ConnState) updateRoomSubscriptions(subs, unsubs []string) map[string]Room {
result := make(map[string]Room)
func (s *ConnState) updateRoomSubscriptions(subs, unsubs []string) map[string]sync3.Room {
result := make(map[string]sync3.Room)
for _, roomID := range subs {
sub, ok := s.muxedReq.RoomSubscriptions[roomID]
if !ok {
@ -361,9 +344,9 @@ func (s *ConnState) updateRoomSubscriptions(subs, unsubs []string) map[string]Ro
return result
}
func (s *ConnState) getDeltaRoomData(roomID string, event json.RawMessage) *Room {
userRoomData := s.userCache.loadRoomData(roomID)
room := &Room{
func (s *ConnState) getDeltaRoomData(roomID string, event json.RawMessage) *sync3.Room {
userRoomData := s.userCache.LoadRoomData(roomID)
room := &sync3.Room{
RoomID: roomID,
NotificationCount: int64(userRoomData.NotificationCount),
HighlightCount: int64(userRoomData.HighlightCount),
@ -376,16 +359,16 @@ func (s *ConnState) getDeltaRoomData(roomID string, event json.RawMessage) *Room
return room
}
func (s *ConnState) getInitialRoomData(roomIDs ...string) []Room {
roomIDToUserRoomData := s.userCache.lazyLoadTimelines(s.loadPosition, roomIDs, int(s.muxedReq.TimelineLimit)) // TODO: per-room timeline limit
rooms := make([]Room, len(roomIDs))
func (s *ConnState) getInitialRoomData(roomIDs ...string) []sync3.Room {
roomIDToUserRoomData := s.userCache.LazyLoadTimelines(s.loadPosition, roomIDs, int(s.muxedReq.TimelineLimit)) // TODO: per-room timeline limit
rooms := make([]sync3.Room, len(roomIDs))
roomMetadatas := s.globalCache.LoadRooms(roomIDs...)
for i, roomID := range roomIDs {
userRoomData := roomIDToUserRoomData[roomID]
metadata := roomMetadatas[i]
metadata.RemoveHero(s.userID)
rooms[i] = Room{
rooms[i] = sync3.Room{
RoomID: roomID,
Name: internal.CalculateRoomName(metadata, 5), // TODO: customisable?
NotificationCount: int64(userRoomData.NotificationCount),
@ -404,7 +387,7 @@ func (s *ConnState) getInitialRoomData(roomIDs ...string) []Room {
func (s *ConnState) onNewConnectionEvent(connEvent *ConnEvent) {
eventData := connEvent.msg
// TODO: remove 0 check when Initialise state returns sensible positions
if eventData != nil && eventData.latestPos != 0 && eventData.latestPos < s.loadPosition {
if eventData != nil && eventData.LatestPos != 0 && eventData.LatestPos < s.loadPosition {
// do not push this event down the stream as we have already processed it when we loaded
// the room list initially.
return
@ -432,17 +415,17 @@ func (s *ConnState) UserID() string {
// 1,2,3,4,5
// 3 bumps to top -> 3,1,2,4,5 -> DELETE index=2, INSERT val=3 index=0
// 7 bumps to top -> 7,1,2,3,4 -> DELETE index=4, INSERT val=7 index=0
func (s *ConnState) moveRoom(roomID string, event json.RawMessage, fromIndex, toIndex int, ranges SliceRanges, onlySendRoomID bool) []ResponseOp {
func (s *ConnState) moveRoom(roomID string, event json.RawMessage, fromIndex, toIndex int, ranges sync3.SliceRanges, onlySendRoomID bool) []sync3.ResponseOp {
if fromIndex == toIndex {
// issue an UPDATE, nice and easy because we don't need to move entries in the list
room := &Room{
room := &sync3.Room{
RoomID: roomID,
}
if !onlySendRoomID {
room = s.getDeltaRoomData(roomID, event)
}
return []ResponseOp{
&ResponseOpSingle{
return []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "UPDATE",
Index: &fromIndex,
Room: room,
@ -459,19 +442,19 @@ func (s *ConnState) moveRoom(roomID string, event json.RawMessage, fromIndex, to
// to the highest end-range marker < index
deleteIndex = int(ranges.LowerClamp(int64(fromIndex)))
}
room := &Room{
room := &sync3.Room{
RoomID: roomID,
}
if !onlySendRoomID {
rooms := s.getInitialRoomData(roomID)
room = &rooms[0]
}
return []ResponseOp{
&ResponseOpSingle{
return []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: &deleteIndex,
},
&ResponseOpSingle{
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: &toIndex,
Room: room,
@ -481,21 +464,21 @@ func (s *ConnState) moveRoom(roomID string, event json.RawMessage, fromIndex, to
}
// Called by the user cache when events arrive
func (s *ConnState) OnNewEvent(event *EventData) {
func (s *ConnState) OnNewEvent(event *sync3.EventData) {
// pull the current room metadata from the global cache. This is safe to do without locking
// as the v2 poll loops all rely on a single poller thread to poke the dispatcher which pokes
// the caches (incl. user caches) so there cannot be any concurrent updates. We always get back
// a copy of the metadata from LoadRoom so we can pass pointers around freely.
meta := s.globalCache.LoadRooms(event.roomID)
meta := s.globalCache.LoadRooms(event.RoomID)
s.onNewConnectionEvent(&ConnEvent{
roomMetadata: meta[0],
roomID: event.roomID,
roomID: event.RoomID,
msg: event,
})
}
// Called by the user cache when unread counts have changed
func (s *ConnState) OnUnreadCountsChanged(userID, roomID string, urd UserRoomData, hasCountDecreased bool) {
func (s *ConnState) OnUnreadCountsChanged(userID, roomID string, urd sync3.UserRoomData, hasCountDecreased bool) {
var ce ConnEvent
ce.roomID = roomID
ce.userMsg.hasCountDecreased = hasCountDecreased

View File

@ -1,4 +1,4 @@
package sync3
package handler
import (
"bytes"
@ -11,6 +11,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/sync-v3/internal"
"github.com/matrix-org/sync-v3/sync3"
"github.com/matrix-org/sync-v3/testutils"
)
@ -22,10 +23,10 @@ func newRoomMetadata(roomID string, lastMsgTimestamp gomatrixserverlib.Timestamp
}
}
func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
result := make(map[string]UserRoomData)
func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]sync3.UserRoomData {
result := make(map[string]sync3.UserRoomData)
for _, roomID := range roomIDs {
result[roomID] = UserRoomData{
result[roomID] = sync3.UserRoomData{
Timeline: []json.RawMessage{
[]byte(`{}`),
},
@ -37,7 +38,7 @@ func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int
// Sync an account with 3 rooms and check that we can grab all rooms and they are sorted correctly initially. Checks
// that basic UPDATE and DELETE/INSERT works when tracking all rooms.
func TestConnStateInitial(t *testing.T) {
connID := ConnID{
ConnID := sync3.ConnID{
SessionID: "s",
DeviceID: "d",
}
@ -52,28 +53,30 @@ func TestConnStateInitial(t *testing.T) {
roomB.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "b"}, time.Now()),
roomC.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "c"}, time.Now()),
}
globalCache := NewGlobalCache(nil)
globalCache := sync3.NewGlobalCache(nil)
globalCache.Startup(map[string]internal.RoomMetadata{
roomA.RoomID: roomA,
roomB.RoomID: roomB,
roomC.RoomID: roomC,
})
dispatcher := NewDispatcher()
dispatcher.jrt.UserJoinedRoom(userID, roomA.RoomID)
dispatcher.jrt.UserJoinedRoom(userID, roomB.RoomID)
dispatcher.jrt.UserJoinedRoom(userID, roomC.RoomID)
dispatcher := sync3.NewDispatcher()
dispatcher.Startup(map[string][]string{
roomA.RoomID: {userID},
roomB.RoomID: {userID},
roomC.RoomID: {userID},
})
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms []*internal.RoomMetadata, err error) {
return 1, []*internal.RoomMetadata{
&roomA, &roomB, &roomC,
}, nil
}
userCache := NewUserCache(userID, globalCache, nil)
dispatcher.Register(userCache.userID, userCache)
dispatcher.Register(DispatcherAllUsers, globalCache)
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
result := make(map[string]UserRoomData)
userCache := sync3.NewUserCache(userID, globalCache, nil)
dispatcher.Register(userCache.UserID, userCache)
dispatcher.Register(sync3.DispatcherAllUsers, globalCache)
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]sync3.UserRoomData {
result := make(map[string]sync3.UserRoomData)
for _, roomID := range roomIDs {
result[roomID] = UserRoomData{
result[roomID] = sync3.UserRoomData{
Timeline: []json.RawMessage{timeline[roomID]},
}
}
@ -83,22 +86,22 @@ func TestConnStateInitial(t *testing.T) {
if userID != cs.UserID() {
t.Fatalf("UserID returned wrong value, got %v want %v", cs.UserID(), userID)
}
res, err := cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
res, err := cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
Rooms: sync3.SliceRanges([][2]int64{
{0, 9},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &Response{
checkResponse(t, false, res, &sync3.Response{
Count: 3,
Ops: []ResponseOp{
&ResponseOpRange{
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 9},
Rooms: []Room{
Rooms: []sync3.Room{
{
RoomID: roomB.RoomID,
Name: roomB.NameEvent,
@ -126,26 +129,26 @@ func TestConnStateInitial(t *testing.T) {
}, 1)
// request again for the diff
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
Rooms: sync3.SliceRanges([][2]int64{
{0, 9},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &Response{
checkResponse(t, true, res, &sync3.Response{
Count: 3,
Ops: []ResponseOp{
&ResponseOpSingle{
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(2),
},
&ResponseOpSingle{
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(0),
Room: &Room{
Room: &sync3.Room{
RoomID: roomA.RoomID,
},
},
@ -157,22 +160,22 @@ func TestConnStateInitial(t *testing.T) {
dispatcher.OnNewEvents(roomA.RoomID, []json.RawMessage{
newEvent,
}, 1)
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
Rooms: sync3.SliceRanges([][2]int64{
{0, 9},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &Response{
checkResponse(t, true, res, &sync3.Response{
Count: 3,
Ops: []ResponseOp{
&ResponseOpSingle{
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "UPDATE",
Index: intPtr(0),
Room: &Room{
Room: &sync3.Room{
RoomID: roomA.RoomID,
},
},
@ -182,7 +185,7 @@ func TestConnStateInitial(t *testing.T) {
// Test that multiple ranges can be tracked in a single request
func TestConnStateMultipleRanges(t *testing.T) {
connID := ConnID{
ConnID := sync3.ConnID{
SessionID: "s",
DeviceID: "d",
}
@ -190,8 +193,8 @@ func TestConnStateMultipleRanges(t *testing.T) {
timestampNow := gomatrixserverlib.Timestamp(1632131678061)
var rooms []*internal.RoomMetadata
var roomIDs []string
globalCache := NewGlobalCache(nil)
dispatcher := NewDispatcher()
globalCache := sync3.NewGlobalCache(nil)
dispatcher := sync3.NewDispatcher()
roomIDToRoom := make(map[string]internal.RoomMetadata)
for i := int64(0); i < 10; i++ {
roomID := fmt.Sprintf("!%d:localhost", i)
@ -207,34 +210,36 @@ func TestConnStateMultipleRanges(t *testing.T) {
globalCache.Startup(map[string]internal.RoomMetadata{
room.RoomID: room,
})
dispatcher.jrt.UserJoinedRoom(userID, roomID)
dispatcher.Startup(map[string][]string{
roomID: {userID},
})
}
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms []*internal.RoomMetadata, err error) {
return 1, rooms, nil
}
userCache := NewUserCache(userID, globalCache, nil)
userCache := sync3.NewUserCache(userID, globalCache, nil)
userCache.LazyRoomDataOverride = mockLazyRoomOverride
dispatcher.Register(userCache.userID, userCache)
dispatcher.Register(DispatcherAllUsers, globalCache)
dispatcher.Register(userCache.UserID, userCache)
dispatcher.Register(sync3.DispatcherAllUsers, globalCache)
cs := NewConnState(userID, userCache, globalCache)
// request first page
res, err := cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
res, err := cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
Rooms: sync3.SliceRanges([][2]int64{
{0, 2},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &Response{
checkResponse(t, true, res, &sync3.Response{
Count: int64(len(rooms)),
Ops: []ResponseOp{
&ResponseOpRange{
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 2},
Rooms: []Room{
Rooms: []sync3.Room{
{
RoomID: roomIDs[0],
},
@ -249,22 +254,22 @@ func TestConnStateMultipleRanges(t *testing.T) {
},
})
// add on a different non-overlapping range
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
Rooms: sync3.SliceRanges([][2]int64{
{0, 2}, {4, 6},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &Response{
checkResponse(t, true, res, &sync3.Response{
Count: int64(len(rooms)),
Ops: []ResponseOp{
&ResponseOpRange{
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{4, 6},
Rooms: []Room{
Rooms: []sync3.Room{
{
RoomID: roomIDs[4],
},
@ -290,26 +295,26 @@ func TestConnStateMultipleRanges(t *testing.T) {
newEvent,
}, 1)
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
Rooms: sync3.SliceRanges([][2]int64{
{0, 2}, {4, 6},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &Response{
checkResponse(t, true, res, &sync3.Response{
Count: int64(len(rooms)),
Ops: []ResponseOp{
&ResponseOpSingle{
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(6),
},
&ResponseOpSingle{
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(0),
Room: &Room{
Room: &sync3.Room{
RoomID: roomIDs[8],
},
},
@ -328,26 +333,26 @@ func TestConnStateMultipleRanges(t *testing.T) {
newEvent,
}, 1)
t.Logf("new event %s : %s", roomIDs[9], string(newEvent))
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
Rooms: sync3.SliceRanges([][2]int64{
{0, 2}, {4, 6},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &Response{
checkResponse(t, true, res, &sync3.Response{
Count: int64(len(rooms)),
Ops: []ResponseOp{
&ResponseOpSingle{
Ops: []sync3.ResponseOp{
&sync3.ResponseOpSingle{
Operation: "DELETE",
Index: intPtr(6),
},
&ResponseOpSingle{
&sync3.ResponseOpSingle{
Operation: "INSERT",
Index: intPtr(4),
Room: &Room{
Room: &sync3.Room{
RoomID: roomIDs[2],
},
},
@ -357,7 +362,7 @@ func TestConnStateMultipleRanges(t *testing.T) {
// Regression test for https://github.com/matrix-org/sync-v3/commit/732ea46f1ccde2b6a382e0f849bbd166b80900ed
func TestBumpToOutsideRange(t *testing.T) {
connID := ConnID{
ConnID := sync3.ConnID{
SessionID: "s",
DeviceID: "d",
}
@ -367,45 +372,47 @@ func TestBumpToOutsideRange(t *testing.T) {
roomB := newRoomMetadata("!b:localhost", timestampNow-1000)
roomC := newRoomMetadata("!c:localhost", timestampNow-2000)
roomD := newRoomMetadata("!d:localhost", timestampNow-3000)
globalCache := NewGlobalCache(nil)
globalCache := sync3.NewGlobalCache(nil)
globalCache.Startup(map[string]internal.RoomMetadata{
roomA.RoomID: roomA,
roomB.RoomID: roomB,
roomC.RoomID: roomC,
roomD.RoomID: roomD,
})
dispatcher := NewDispatcher()
dispatcher.jrt.UserJoinedRoom(userID, roomA.RoomID)
dispatcher.jrt.UserJoinedRoom(userID, roomB.RoomID)
dispatcher.jrt.UserJoinedRoom(userID, roomC.RoomID)
dispatcher.jrt.UserJoinedRoom(userID, roomD.RoomID)
dispatcher := sync3.NewDispatcher()
dispatcher.Startup(map[string][]string{
roomA.RoomID: {userID},
roomB.RoomID: {userID},
roomC.RoomID: {userID},
roomD.RoomID: {userID},
})
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms []*internal.RoomMetadata, err error) {
return 1, []*internal.RoomMetadata{
&roomA, &roomB, &roomC, &roomD,
}, nil
}
userCache := NewUserCache(userID, globalCache, nil)
userCache := sync3.NewUserCache(userID, globalCache, nil)
userCache.LazyRoomDataOverride = mockLazyRoomOverride
dispatcher.Register(userCache.userID, userCache)
dispatcher.Register(DispatcherAllUsers, globalCache)
dispatcher.Register(userCache.UserID, userCache)
dispatcher.Register(sync3.DispatcherAllUsers, globalCache)
cs := NewConnState(userID, userCache, globalCache)
// Ask for A,B
res, err := cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
res, err := cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
Rooms: sync3.SliceRanges([][2]int64{
{0, 1},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &Response{
checkResponse(t, true, res, &sync3.Response{
Count: int64(4),
Ops: []ResponseOp{
&ResponseOpRange{
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 1},
Rooms: []Room{
Rooms: []sync3.Room{
{
RoomID: roomA.RoomID,
},
@ -426,14 +433,14 @@ func TestBumpToOutsideRange(t *testing.T) {
// expire the context after 10ms so we don't wait forevar
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
res, err = cs.HandleIncomingRequest(ctx, connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
res, err = cs.OnIncomingRequest(ctx, ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
Rooms: sync3.SliceRanges([][2]int64{
{0, 1},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
if len(res.Ops) > 0 {
t.Errorf("response returned ops, expected none")
@ -442,7 +449,7 @@ func TestBumpToOutsideRange(t *testing.T) {
// Test that room subscriptions can be made and that events are pushed for them.
func TestConnStateRoomSubscriptions(t *testing.T) {
connID := ConnID{
ConnID := sync3.ConnID{
SessionID: "s",
DeviceID: "d",
}
@ -453,18 +460,20 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
roomC := newRoomMetadata("!c:localhost", gomatrixserverlib.Timestamp(timestampNow-2000))
roomD := newRoomMetadata("!d:localhost", gomatrixserverlib.Timestamp(timestampNow-3000))
roomIDs := []string{roomA.RoomID, roomB.RoomID, roomC.RoomID, roomD.RoomID}
globalCache := NewGlobalCache(nil)
globalCache := sync3.NewGlobalCache(nil)
globalCache.Startup(map[string]internal.RoomMetadata{
roomA.RoomID: roomA,
roomB.RoomID: roomB,
roomC.RoomID: roomC,
roomD.RoomID: roomD,
})
dispatcher := NewDispatcher()
dispatcher.jrt.UserJoinedRoom(userID, roomA.RoomID)
dispatcher.jrt.UserJoinedRoom(userID, roomB.RoomID)
dispatcher.jrt.UserJoinedRoom(userID, roomC.RoomID)
dispatcher.jrt.UserJoinedRoom(userID, roomD.RoomID)
dispatcher := sync3.NewDispatcher()
dispatcher.Startup(map[string][]string{
roomA.RoomID: {userID},
roomB.RoomID: {userID},
roomC.RoomID: {userID},
roomD.RoomID: {userID},
})
timeline := map[string]json.RawMessage{
roomA.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "a"}, time.Now()),
roomB.RoomID: testutils.NewEvent(t, "m.room.message", userID, map[string]interface{}{"body": "b"}, time.Now()),
@ -476,11 +485,11 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
&roomA, &roomB, &roomC, &roomD,
}, nil
}
userCache := NewUserCache(userID, globalCache, nil)
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
result := make(map[string]UserRoomData)
userCache := sync3.NewUserCache(userID, globalCache, nil)
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]sync3.UserRoomData {
result := make(map[string]sync3.UserRoomData)
for _, roomID := range roomIDs {
result[roomID] = UserRoomData{
result[roomID] = sync3.UserRoomData{
Timeline: []json.RawMessage{
timeline[roomID],
},
@ -488,27 +497,27 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
}
return result
}
dispatcher.Register(userCache.userID, userCache)
dispatcher.Register(DispatcherAllUsers, globalCache)
dispatcher.Register(userCache.UserID, userCache)
dispatcher.Register(sync3.DispatcherAllUsers, globalCache)
cs := NewConnState(userID, userCache, globalCache)
// subscribe to room D
res, err := cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
RoomSubscriptions: map[string]RoomSubscription{
res, err := cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
RoomSubscriptions: map[string]sync3.RoomSubscription{
roomD.RoomID: {
TimelineLimit: 20,
},
},
Rooms: SliceRanges([][2]int64{
Rooms: sync3.SliceRanges([][2]int64{
{0, 1},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &Response{
checkResponse(t, false, res, &sync3.Response{
Count: int64(len(roomIDs)),
RoomSubscriptions: map[string]Room{
RoomSubscriptions: map[string]sync3.Room{
roomD.RoomID: {
RoomID: roomD.RoomID,
Name: roomD.NameEvent,
@ -517,11 +526,11 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
},
},
},
Ops: []ResponseOp{
&ResponseOpRange{
Ops: []sync3.ResponseOp{
&sync3.ResponseOpRange{
Operation: "SYNC",
Range: []int64{0, 1},
Rooms: []Room{
Rooms: []sync3.Room{
{
RoomID: roomA.RoomID,
Name: roomA.NameEvent,
@ -546,18 +555,18 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
newEvent,
}, 1)
// we should get this message even though it's not in the range because we are subscribed to this room.
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
Rooms: sync3.SliceRanges([][2]int64{
{0, 1},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &Response{
checkResponse(t, false, res, &sync3.Response{
Count: int64(len(roomIDs)),
RoomSubscriptions: map[string]Room{
RoomSubscriptions: map[string]sync3.Room{
roomD.RoomID: {
RoomID: roomD.RoomID,
Timeline: []json.RawMessage{
@ -569,24 +578,24 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
})
// now swap to room C
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
RoomSubscriptions: map[string]RoomSubscription{
res, err = cs.OnIncomingRequest(context.Background(), ConnID, &sync3.Request{
Sort: []string{sync3.SortByRecency},
RoomSubscriptions: map[string]sync3.RoomSubscription{
roomC.RoomID: {
TimelineLimit: 20,
},
},
UnsubscribeRooms: []string{roomD.RoomID},
Rooms: SliceRanges([][2]int64{
Rooms: sync3.SliceRanges([][2]int64{
{0, 1},
}),
})
if err != nil {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
t.Fatalf("OnIncomingRequest returned error : %s", err)
}
checkResponse(t, false, res, &Response{
checkResponse(t, false, res, &sync3.Response{
Count: int64(len(roomIDs)),
RoomSubscriptions: map[string]Room{
RoomSubscriptions: map[string]sync3.Room{
roomC.RoomID: {
RoomID: roomC.RoomID,
Name: roomC.NameEvent,
@ -598,7 +607,7 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
})
}
func checkResponse(t *testing.T, checkRoomIDsOnly bool, got, want *Response) {
func checkResponse(t *testing.T, checkRoomIDsOnly bool, got, want *sync3.Response) {
t.Helper()
if want.Count > 0 {
if got.Count != want.Count {
@ -623,8 +632,8 @@ func checkResponse(t *testing.T, checkRoomIDsOnly bool, got, want *Response) {
t.Errorf("operation i=%d got '%s' want '%s'", i, gotOp.Op(), wantOpVal.Op())
}
switch wantOp := wantOpVal.(type) {
case *ResponseOpRange:
gotOpRange, ok := gotOp.(*ResponseOpRange)
case *sync3.ResponseOpRange:
gotOpRange, ok := gotOp.(*sync3.ResponseOpRange)
if !ok {
t.Fatalf("operation i=%d (%s) want type ResponseOpRange but it isn't", i, gotOp.Op())
}
@ -637,8 +646,8 @@ func checkResponse(t *testing.T, checkRoomIDsOnly bool, got, want *Response) {
for j := range wantOp.Rooms {
checkRoomsEqual(t, checkRoomIDsOnly, &gotOpRange.Rooms[j], &wantOp.Rooms[j])
}
case *ResponseOpSingle:
gotOpSingle, ok := gotOp.(*ResponseOpSingle)
case *sync3.ResponseOpSingle:
gotOpSingle, ok := gotOp.(*sync3.ResponseOpSingle)
if !ok {
t.Fatalf("operation i=%d (%s) want type ResponseOpSingle but it isn't", i, gotOp.Op())
}
@ -664,7 +673,7 @@ func checkResponse(t *testing.T, checkRoomIDsOnly bool, got, want *Response) {
}
}
func checkRoomsEqual(t *testing.T, checkRoomIDsOnly bool, got, want *Room) {
func checkRoomsEqual(t *testing.T, checkRoomIDsOnly bool, got, want *sync3.Room) {
t.Helper()
if got == nil && want == nil {
return // e.g DELETE ops

View File

@ -1,4 +1,4 @@
package sync3
package handler
import (
"encoding/json"
@ -13,6 +13,7 @@ import (
"github.com/matrix-org/sync-v3/internal"
"github.com/matrix-org/sync-v3/state"
"github.com/matrix-org/sync-v3/sync2"
"github.com/matrix-org/sync-v3/sync3"
"github.com/rs/zerolog"
"github.com/rs/zerolog/hlog"
"github.com/rs/zerolog/log"
@ -32,15 +33,15 @@ type SyncLiveHandler struct {
Storage *state.Storage
V2Store *sync2.Storage
PollerMap *sync2.PollerMap
ConnMap *ConnMap
ConnMap *sync3.ConnMap
// inserts are done by v2 poll loops, selects are done by v3 request threads
// but the v3 requests touch non-overlapping keys, which is a good use case for sync.Map
// > (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys.
userCaches *sync.Map // map[user_id]*UserCache
dispatcher *Dispatcher
Dispatcher *sync3.Dispatcher
globalCache *GlobalCache
GlobalCache *sync3.GlobalCache
}
func NewSync3Handler(v2Client sync2.Client, postgresDBURI string) (*SyncLiveHandler, error) {
@ -49,24 +50,28 @@ func NewSync3Handler(v2Client sync2.Client, postgresDBURI string) (*SyncLiveHand
V2: v2Client,
Storage: store,
V2Store: sync2.NewStore(postgresDBURI),
ConnMap: NewConnMap(),
ConnMap: sync3.NewConnMap(),
userCaches: &sync.Map{},
dispatcher: NewDispatcher(),
globalCache: NewGlobalCache(store),
Dispatcher: sync3.NewDispatcher(),
GlobalCache: sync3.NewGlobalCache(store),
}
sh.PollerMap = sync2.NewPollerMap(v2Client, sh)
if err := sh.dispatcher.Startup(sh.Storage); err != nil {
return nil, fmt.Errorf("failed to load dispatcher: %s", err)
roomToJoinedUsers, err := store.AllJoinedMembers()
if err != nil {
return nil, err
}
sh.dispatcher.Register(DispatcherAllUsers, sh.globalCache)
if err := sh.Dispatcher.Startup(roomToJoinedUsers); err != nil {
return nil, fmt.Errorf("failed to load sync3.Dispatcher: %s", err)
}
sh.Dispatcher.Register(sync3.DispatcherAllUsers, sh.GlobalCache)
// every room will be present here
roomIDToMetadata, err := store.MetadataForAllRooms()
if err != nil {
return nil, fmt.Errorf("could not get metadata for all rooms: %s", err)
}
if err := sh.globalCache.Startup(roomIDToMetadata); err != nil {
if err := sh.GlobalCache.Startup(roomIDToMetadata); err != nil {
return nil, fmt.Errorf("failed to populate global cache: %s", err)
}
@ -94,7 +99,7 @@ func (h *SyncLiveHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Entry point for sync v3
func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error {
var requestBody Request
var requestBody sync3.Request
if req.Body != nil {
defer req.Body.Close()
if err := json.NewDecoder(req.Body).Decode(&requestBody); err != nil {
@ -120,11 +125,11 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
if herr != nil {
return herr
}
requestBody.pos = cpos
requestBody.SetPos(cpos)
var timeout int
if req.URL.Query().Get("timeout") == "" {
timeout = DefaultTimeoutSecs
timeout = sync3.DefaultTimeoutSecs
} else {
timeout64, herr := parseIntFromQuery(req.URL, "timeout")
if herr != nil {
@ -133,7 +138,7 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
timeout = int(timeout64)
}
requestBody.timeoutSecs = timeout
requestBody.SetTimeoutSecs(timeout)
resp, herr := conn.OnIncomingRequest(req.Context(), &requestBody)
if herr != nil {
@ -154,9 +159,9 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
// setupConnection associates this request with an existing connection or makes a new connection.
// It also sets a v2 sync poll loop going if one didn't exist already for this user.
// When this function returns, the connection is alive and active.
func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *Request, containsPos bool) (*Conn, error) {
func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Request, containsPos bool) (*sync3.Conn, error) {
log := hlog.FromRequest(req)
var conn *Conn
var conn *sync3.Conn
// Identify the device
deviceID, err := internal.DeviceIDFromRequest(req)
@ -172,7 +177,7 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *Request, c
if containsPos {
// Lookup the connection
// we need to map based on both as the session ID isn't crypto secure but the device ID is (Auth header)
conn = h.ConnMap.Conn(ConnID{
conn = h.ConnMap.Conn(sync3.ConnID{
SessionID: syncReq.SessionID,
DeviceID: deviceID,
})
@ -236,10 +241,12 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *Request, c
// because we *either* do the existing check *or* make a new conn. It's important for CreateConn
// to check for an existing connection though, as it's possible for the client to call /sync
// twice for a new connection and get the same session ID.
conn, created := h.ConnMap.GetOrCreateConn(ConnID{
conn, created := h.ConnMap.GetOrCreateConn(sync3.ConnID{
SessionID: syncReq.SessionID,
DeviceID: deviceID,
}, h.globalCache, v2device.UserID, userCache)
}, func() sync3.ConnHandler {
return NewConnState(v2device.UserID, userCache, h.GlobalCache)
})
if created {
log.Info().Str("conn_id", conn.ConnID.String()).Msg("created new connection")
} else {
@ -248,12 +255,12 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *Request, c
return conn, nil
}
func (h *SyncLiveHandler) userCache(userID string) (*UserCache, error) {
func (h *SyncLiveHandler) userCache(userID string) (*sync3.UserCache, error) {
c, ok := h.userCaches.Load(userID)
if ok {
return c.(*UserCache), nil
return c.(*sync3.UserCache), nil
}
uc := NewUserCache(userID, h.globalCache, h.Storage)
uc := sync3.NewUserCache(userID, h.GlobalCache, h.Storage)
// select all non-zero highlight or notif counts and set them, as this is less costly than looping every room/user pair
err := h.Storage.UnreadTable.SelectAllNonZeroCountsForUser(userID, func(roomID string, highlightCount, notificationCount int) {
uc.OnUnreadCounts(roomID, &highlightCount, &notificationCount)
@ -261,7 +268,7 @@ func (h *SyncLiveHandler) userCache(userID string) (*UserCache, error) {
if err != nil {
return nil, fmt.Errorf("failed to load unread counts: %s", err)
}
h.dispatcher.Register(userID, uc)
h.Dispatcher.Register(userID, uc)
h.userCaches.Store(userID, uc)
return uc, nil
}
@ -288,7 +295,7 @@ func (h *SyncLiveHandler) Accumulate(roomID string, timeline []json.RawMessage)
newEvents := timeline[len(timeline)-numNew:]
// we have new events, notify active connections
h.dispatcher.OnNewEvents(roomID, newEvents, latestPos)
h.Dispatcher.OnNewEvents(roomID, newEvents, latestPos)
}
// Called from the v2 poller, implements V2DataReceiver
@ -303,7 +310,7 @@ func (h *SyncLiveHandler) Initialise(roomID string, state []json.RawMessage) {
return
}
// we have new events, notify active connections
h.dispatcher.OnNewEvents(roomID, state, 0)
h.Dispatcher.OnNewEvents(roomID, state, 0)
}
// Called from the v2 poller, implements V2DataReceiver
@ -333,7 +340,7 @@ func (h *SyncLiveHandler) UpdateUnreadCounts(roomID, userID string, highlightCou
if !ok {
return
}
userCache.(*UserCache).OnUnreadCounts(roomID, highlightCount, notifCount)
userCache.(*sync3.UserCache).OnUnreadCounts(roomID, highlightCount, notifCount)
}
func parseIntFromQuery(u *url.URL, param string) (result int64, err *internal.HandlerError) {

View File

@ -30,6 +30,19 @@ type Request struct {
SessionID string `json:"session_id"`
}
func (r *Request) Pos() int64 {
return r.pos
}
func (r *Request) SetPos(pos int64) {
r.pos = pos
}
func (r *Request) TimeoutSecs() int {
return r.timeoutSecs
}
func (r *Request) SetTimeoutSecs(timeout int) {
r.timeoutSecs = timeout
}
func (r *Request) Same(other *Request) bool {
serialised, err := json.Marshal(r)
if err != nil {

View File

@ -2,6 +2,8 @@ package sync3
import (
"encoding/json"
"github.com/matrix-org/sync-v3/internal"
)
type Room struct {
@ -12,3 +14,11 @@ type Room struct {
NotificationCount int64 `json:"notification_count"`
HighlightCount int64 `json:"highlight_count"`
}
type RoomConnMetadata struct {
internal.RoomMetadata
CanonicalisedName string // stripped leading symbols like #, all in lower case
HighlightCount int
NotificationCount int
}

View File

@ -7,6 +7,8 @@ import (
"github.com/matrix-org/sync-v3/internal"
)
// SortableRooms represents a list of rooms which can be sorted and updated. Maintains mappings of
// room IDs to current index positions after sorting.
type SortableRooms struct {
rooms []RoomConnMetadata
roomIDToIndex map[string]int // room_id -> index in rooms
@ -45,6 +47,23 @@ func (s *SortableRooms) IndexOf(roomID string) (int, bool) {
return index, ok
}
func (s *SortableRooms) RoomIDs() []string {
roomIDs := make([]string, len(s.rooms))
for i := range s.rooms {
roomIDs[i] = s.rooms[i].RoomID
}
return roomIDs
}
func (s *SortableRooms) Add(r RoomConnMetadata) {
s.rooms = append(s.rooms, r)
s.roomIDToIndex[r.RoomID] = len(s.rooms) - 1
}
func (s *SortableRooms) Get(index int) RoomConnMetadata {
return s.rooms[index]
}
func (s *SortableRooms) Len() int64 {
return int64(len(s.rooms))
}

View File

@ -20,7 +20,7 @@ type UserCacheListener interface {
type UserCache struct {
LazyRoomDataOverride func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData
userID string
UserID string
roomToData map[string]UserRoomData
roomToDataMu *sync.RWMutex
listeners map[int]UserCacheListener
@ -32,7 +32,7 @@ type UserCache struct {
func NewUserCache(userID string, globalCache *GlobalCache, store *state.Storage) *UserCache {
uc := &UserCache{
userID: userID,
UserID: userID,
roomToDataMu: &sync.RWMutex{},
roomToData: make(map[string]UserRoomData),
listeners: make(map[int]UserCacheListener),
@ -58,14 +58,14 @@ func (c *UserCache) Unsubscribe(id int) {
delete(c.listeners, id)
}
func (c *UserCache) lazyLoadTimelines(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
func (c *UserCache) LazyLoadTimelines(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
if c.LazyRoomDataOverride != nil {
return c.LazyRoomDataOverride(loadPos, roomIDs, maxTimelineEvents)
}
result := make(map[string]UserRoomData)
var lazyRoomIDs []string
for _, roomID := range roomIDs {
urd := c.loadRoomData(roomID)
urd := c.LoadRoomData(roomID)
if len(urd.Timeline) > 0 {
timeline := urd.Timeline
if len(timeline) > maxTimelineEvents {
@ -84,7 +84,7 @@ func (c *UserCache) lazyLoadTimelines(loadPos int64, roomIDs []string, maxTimeli
if len(lazyRoomIDs) == 0 {
return result
}
roomIDToEvents, err := c.store.LatestEventsInRooms(c.userID, lazyRoomIDs, loadPos, maxTimelineEvents)
roomIDToEvents, err := c.store.LatestEventsInRooms(c.UserID, lazyRoomIDs, loadPos, maxTimelineEvents)
if err != nil {
logger.Err(err).Strs("rooms", lazyRoomIDs).Msg("failed to get LatestEventsInRooms")
return nil
@ -104,7 +104,7 @@ func (c *UserCache) lazyLoadTimelines(loadPos int64, roomIDs []string, maxTimeli
return result
}
func (c *UserCache) loadRoomData(roomID string) UserRoomData {
func (c *UserCache) LoadRoomData(roomID string) UserRoomData {
c.roomToDataMu.RLock()
defer c.roomToDataMu.RUnlock()
data, ok := c.roomToData[roomID]
@ -119,7 +119,7 @@ func (c *UserCache) loadRoomData(roomID string) UserRoomData {
// =================================================
func (c *UserCache) OnUnreadCounts(roomID string, highlightCount, notifCount *int) {
data := c.loadRoomData(roomID)
data := c.LoadRoomData(roomID)
hasCountDecreased := false
if highlightCount != nil {
hasCountDecreased = *highlightCount < data.HighlightCount
@ -135,19 +135,19 @@ func (c *UserCache) OnUnreadCounts(roomID string, highlightCount, notifCount *in
c.roomToData[roomID] = data
c.roomToDataMu.Unlock()
for _, l := range c.listeners {
l.OnUnreadCountsChanged(c.userID, roomID, data, hasCountDecreased)
l.OnUnreadCountsChanged(c.UserID, roomID, data, hasCountDecreased)
}
}
func (c *UserCache) OnNewEvent(eventData *EventData) {
// add this to our tracked timelines if we have one
urd := c.loadRoomData(eventData.roomID)
urd := c.LoadRoomData(eventData.RoomID)
if len(urd.Timeline) > 0 {
// we're tracking timelines, add this message too
urd.Timeline = append(urd.Timeline, eventData.event)
urd.Timeline = append(urd.Timeline, eventData.Event)
}
c.roomToDataMu.Lock()
c.roomToData[eventData.roomID] = urd
c.roomToData[eventData.RoomID] = urd
c.roomToDataMu.Unlock()
for _, l := range c.listeners {

View File

@ -17,6 +17,7 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/sync-v3/sync2"
"github.com/matrix-org/sync-v3/sync3"
"github.com/matrix-org/sync-v3/sync3/handler"
"github.com/matrix-org/sync-v3/testutils"
)
@ -226,7 +227,7 @@ func runTestServer(t *testing.T, v2Server *testV2Server, postgresConnectionStrin
if postgresConnectionString == "" {
postgresConnectionString = testutils.PrepareDBConnectionString(postgresTestDatabaseName)
}
h, err := sync3.NewSync3Handler(&sync2.HTTPClient{
h, err := handler.NewSync3Handler(&sync2.HTTPClient{
Client: &http.Client{
Timeout: 5 * time.Minute,
},