Remove ConnStateStore and use Global/UserCache exclusively

Add a `LoadJoinedRoomsOverride` to allow tests to override
and bypass DB checks. We need them in the cache in order to
synchronise loading connection state with live updates to
ensure we process events exactly once.
This commit is contained in:
Kegan Dougal 2021-10-11 18:09:29 +01:00
parent ed7433691c
commit 52da56c70d
6 changed files with 100 additions and 160 deletions

View File

@ -236,30 +236,30 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
RoomID: roomA,
Events: []json.RawMessage{
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
},
},
{
RoomID: roomB,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
},
},
{
RoomID: roomA,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
},
},
{
RoomID: roomC,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "leave"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
},
},
}
@ -326,30 +326,30 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
{
RoomID: roomE,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewStateEvent(t, "m.room.member", alice, bob, map[string]interface{}{"membership": "invite"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
},
},
{
RoomID: roomD,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "leave"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "leave"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
},
},
{
RoomID: roomE,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
},
},
}

View File

@ -76,7 +76,7 @@ func (m *ConnMap) GetOrCreateConn(cid ConnID, globalCache *GlobalCache, userID s
if conn != nil {
return conn, false
}
state := NewConnState(userID, userCache, globalCache, m)
state := NewConnState(userID, userCache, globalCache)
conn = NewConn(cid, state, state.HandleIncomingRequest)
m.cache.Set(cid.String(), conn)
m.connIDToConn[cid.String()] = conn
@ -102,15 +102,6 @@ func (m *ConnMap) LoadBaseline(roomIDToUserIDs map[string][]string) error {
return nil
}
func (m *ConnMap) Load(userID string) (joinedRoomIDs []string, initialLoadPosition int64, err error) {
initialLoadPosition, err = m.store.LatestEventNID()
if err != nil {
return
}
joinedRoomIDs, err = m.store.JoinedRoomsAfterPosition(userID, initialLoadPosition)
return
}
func (m *ConnMap) closeConn(connID string, value interface{}) {
m.mu.Lock()
defer m.mu.Unlock()

View File

@ -15,14 +15,9 @@ var (
MaxPendingEventUpdates = 200
)
type ConnStateStore interface {
Load(userID string) (joinedRoomIDs []string, initialLoadPosition int64, err error)
}
// ConnState tracks all high-level connection state for this connection, like the combined request
// and the underlying sorted room list. It doesn't track session IDs or positions of the connection.
type ConnState struct {
store ConnStateStore
muxedReq *Request
userID string
sortedJoinedRooms SortableRooms
@ -38,9 +33,8 @@ type ConnState struct {
userCache *UserCache
}
func NewConnState(userID string, userCache *UserCache, globalCache *GlobalCache, store ConnStateStore) *ConnState {
func NewConnState(userID string, userCache *UserCache, globalCache *GlobalCache) *ConnState {
return &ConnState{
store: store,
globalCache: globalCache,
userCache: userCache,
userID: userID,
@ -63,18 +57,15 @@ func NewConnState(userID string, userCache *UserCache, globalCache *GlobalCache,
// - post load() we read N events, processing them a 2nd time.
func (s *ConnState) load(req *Request) error {
s.userCache.Subsribe(s)
joinedRoomIDs, initialLoadPosition, err := s.store.Load(s.userID)
s.globalCache.Subsribe(s)
initialLoadPosition, joinedRooms, err := s.globalCache.LoadJoinedRooms(s.userID)
if err != nil {
return err
}
s.loadPosition = initialLoadPosition
s.sortedJoinedRooms = make([]SortableRoom, len(joinedRoomIDs))
for i, roomID := range joinedRoomIDs {
// load global room info
sr := s.globalCache.LoadRoom(roomID)
s.sortedJoinedRooms[i] = *sr
s.sortedJoinedRoomsPositions[sr.RoomID] = i
}
s.sortedJoinedRooms = joinedRooms
s.sort(req.Sort)
return nil
@ -91,6 +82,7 @@ func (s *ConnState) sort(sortBy []string) {
//logger.Info().Interface("pos", c.sortedJoinedRoomsPositions).Msg("sorted")
}
// HandleIncomingRequest is guaranteed to be called sequentially (it's protected by a mutex in conn.go)
func (s *ConnState) HandleIncomingRequest(ctx context.Context, cid ConnID, req *Request) (*Response, error) {
if s.loadPosition == 0 {
s.load(req)
@ -98,6 +90,10 @@ func (s *ConnState) HandleIncomingRequest(ctx context.Context, cid ConnID, req *
return s.onIncomingRequest(ctx, req)
}
func (s *ConnState) OnNewEvent(event *EventData) {
s.PushNewEvent(event)
}
// PushNewEvent is a callback which fires when the server gets a new event and determines this connection MAY be
// interested in it (e.g the client is joined to the room or it's an invite, etc). Each callback can fire
// from different v2 poll loops, and there is no locking in order to prevent a slow ConnState from wedging the poll loop.

View File

@ -8,6 +8,8 @@ import (
"reflect"
"testing"
"time"
"github.com/matrix-org/sync-v3/testutils"
)
func newSortableRoom(roomID string, lastMsgTimestamp int64) SortableRoom {
@ -21,34 +23,6 @@ func newSortableRoom(roomID string, lastMsgTimestamp int64) SortableRoom {
}
}
type connStateStoreMock struct {
roomIDToRoom map[string]SortableRoom
userIDToJoinedRooms map[string][]string
userIDToPosition map[string]int64
}
func (s *connStateStoreMock) Load(userID string) (joinedRoomIDs []string, initialLoadPosition int64, err error) {
joinedRoomIDs = s.userIDToJoinedRooms[userID]
initialLoadPosition = s.userIDToPosition[userID]
if initialLoadPosition == 0 {
initialLoadPosition = 1 // so we don't continually load the same rooms
}
return
}
func (s *connStateStoreMock) LoadState(roomID string, loadPosition int64, requiredState [][2]string) []json.RawMessage {
return nil
}
func (s *connStateStoreMock) PushNewEvent(cs *ConnState, ed *EventData) {
room := s.roomIDToRoom[ed.roomID]
room.LastEventJSON = ed.event
room.LastMessageTimestamp = ed.timestamp
if ed.eventType == "m.room.name" {
room.Name = ed.content.Get("name").Str
}
s.roomIDToRoom[ed.roomID] = room
cs.PushNewEvent(ed)
}
// Sync an account with 3 rooms and check that we can grab all rooms and they are sorted correctly initially. Checks
// that basic UPDATE and DELETE/INSERT works when tracking all rooms.
func TestConnStateInitial(t *testing.T) {
@ -58,6 +32,7 @@ func TestConnStateInitial(t *testing.T) {
}
userID := "@alice:localhost"
timestampNow := int64(1632131678061)
// initial sort order B, C, A
roomA := newSortableRoom("!a:localhost", timestampNow-8000)
roomB := newSortableRoom("!b:localhost", timestampNow)
roomC := newSortableRoom("!c:localhost", timestampNow-4000)
@ -65,19 +40,12 @@ func TestConnStateInitial(t *testing.T) {
globalCache.AssignRoom(roomA)
globalCache.AssignRoom(roomB)
globalCache.AssignRoom(roomC)
// initial sort order B, C, A
csm := &connStateStoreMock{
userIDToJoinedRooms: map[string][]string{
userID: {roomA.RoomID, roomB.RoomID, roomC.RoomID},
},
roomIDToRoom: map[string]SortableRoom{
roomA.RoomID: roomA,
roomB.RoomID: roomB,
roomC.RoomID: roomC,
},
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms []SortableRoom, err error) {
return 1, []SortableRoom{
roomA, roomB, roomC,
}, nil
}
cs := NewConnState(userID, NewUserCache(userID), globalCache, csm)
cs := NewConnState(userID, NewUserCache(userID), globalCache)
if userID != cs.UserID() {
t.Fatalf("UserID returned wrong value, got %v want %v", cs.UserID(), userID)
}
@ -118,12 +86,9 @@ func TestConnStateInitial(t *testing.T) {
})
// bump A to the top
csm.PushNewEvent(cs, &EventData{
event: json.RawMessage(`{}`),
roomID: roomA.RoomID,
eventType: "unimportant",
timestamp: timestampNow + 1000,
})
globalCache.OnNewEvents(roomA.RoomID, []json.RawMessage{
testutils.NewEvent(t, "unimportant", "me", struct{}{}, timestampNow+1000),
}, 1)
// request again for the diff
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
@ -153,12 +118,9 @@ func TestConnStateInitial(t *testing.T) {
})
// another message should just update
csm.PushNewEvent(cs, &EventData{
event: json.RawMessage(`{}`),
roomID: roomA.RoomID,
eventType: "still unimportant",
timestamp: timestampNow + 2000,
})
globalCache.OnNewEvents(roomA.RoomID, []json.RawMessage{
testutils.NewEvent(t, "unimportant", "me", struct{}{}, timestampNow+2000),
}, 1)
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
@ -208,15 +170,10 @@ func TestConnStateMultipleRanges(t *testing.T) {
roomIDToRoom[roomID] = room
globalCache.AssignRoom(room)
}
// initial sort order B, C, A
csm := &connStateStoreMock{
userIDToJoinedRooms: map[string][]string{
userID: roomIDs,
},
roomIDToRoom: roomIDToRoom,
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms []SortableRoom, err error) {
return 1, rooms, nil
}
cs := NewConnState(userID, NewUserCache(userID), globalCache, csm)
cs := NewConnState(userID, NewUserCache(userID), globalCache)
// request first page
res, err := cs.HandleIncomingRequest(context.Background(), connID, &Request{
@ -285,12 +242,9 @@ func TestConnStateMultipleRanges(t *testing.T) {
// ` ` ` `
// 8,0,1,2,3,4,5,6,7,9
//
csm.PushNewEvent(cs, &EventData{
event: json.RawMessage(`{}`),
roomID: roomIDs[8],
eventType: "unimportant",
timestamp: timestampNow + 2000,
})
globalCache.OnNewEvents(roomIDs[8], []json.RawMessage{
testutils.NewEvent(t, "unimportant", "me", struct{}{}, timestampNow+2000),
}, 1)
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
@ -325,12 +279,9 @@ func TestConnStateMultipleRanges(t *testing.T) {
// ` ` ` `
// 8,0,1,9,2,3,4,5,6,7 room
middleTimestamp := int64((roomIDToRoom[roomIDs[1]].LastMessageTimestamp + roomIDToRoom[roomIDs[2]].LastMessageTimestamp) / 2)
csm.PushNewEvent(cs, &EventData{
event: json.RawMessage(`{}`),
roomID: roomIDs[9],
eventType: "unimportant",
timestamp: middleTimestamp,
})
globalCache.OnNewEvents(roomIDs[9], []json.RawMessage{
testutils.NewEvent(t, "unimportant", "me", struct{}{}, middleTimestamp),
}, 1)
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
Rooms: SliceRanges([][2]int64{
@ -375,20 +326,12 @@ func TestBumpToOutsideRange(t *testing.T) {
globalCache.AssignRoom(roomB)
globalCache.AssignRoom(roomC)
globalCache.AssignRoom(roomD)
// initial sort order A,B,C,D
csm := &connStateStoreMock{
userIDToJoinedRooms: map[string][]string{
userID: {roomA.RoomID, roomB.RoomID, roomC.RoomID, roomD.RoomID},
},
roomIDToRoom: map[string]SortableRoom{
roomA.RoomID: roomA,
roomB.RoomID: roomB,
roomC.RoomID: roomC,
roomD.RoomID: roomD,
},
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms []SortableRoom, err error) {
return 1, []SortableRoom{
roomA, roomB, roomC, roomD,
}, nil
}
cs := NewConnState(userID, NewUserCache(userID), globalCache, csm)
cs := NewConnState(userID, NewUserCache(userID), globalCache)
// Ask for A,B
res, err := cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
@ -400,7 +343,7 @@ func TestBumpToOutsideRange(t *testing.T) {
t.Fatalf("HandleIncomingRequest returned error : %s", err)
}
checkResponse(t, true, res, &Response{
Count: int64(len(csm.userIDToJoinedRooms[userID])),
Count: int64(4),
Ops: []ResponseOp{
&ResponseOpRange{
Operation: "SYNC",
@ -418,12 +361,9 @@ func TestBumpToOutsideRange(t *testing.T) {
})
// D gets bumped to C's position but it's still outside the range so nothing should happen
csm.PushNewEvent(cs, &EventData{
event: json.RawMessage(`{}`),
roomID: roomD.RoomID,
eventType: "unimportant",
timestamp: roomC.LastMessageTimestamp + 2,
})
globalCache.OnNewEvents(roomD.RoomID, []json.RawMessage{
testutils.NewEvent(t, "unimportant", "me", struct{}{}, roomC.LastMessageTimestamp+2),
}, 1)
// expire the context after 10ms so we don't wait forevar
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
@ -460,20 +400,12 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
globalCache.AssignRoom(roomB)
globalCache.AssignRoom(roomC)
globalCache.AssignRoom(roomD)
// initial sort order A,B,C,D
csm := &connStateStoreMock{
userIDToJoinedRooms: map[string][]string{
userID: roomIDs,
},
roomIDToRoom: map[string]SortableRoom{
roomA.RoomID: roomA,
roomB.RoomID: roomB,
roomC.RoomID: roomC,
roomD.RoomID: roomD,
},
globalCache.LoadJoinedRoomsOverride = func(userID string) (pos int64, joinedRooms []SortableRoom, err error) {
return 1, []SortableRoom{
roomA, roomB, roomC, roomD,
}, nil
}
cs := NewConnState(userID, NewUserCache(userID), globalCache, csm)
cs := NewConnState(userID, NewUserCache(userID), globalCache)
// subscribe to room D
res, err := cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
@ -506,17 +438,17 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
Range: []int64{0, 1},
Rooms: []Room{
{
RoomID: roomIDs[0],
Name: csm.roomIDToRoom[roomIDs[0]].Name,
RoomID: roomA.RoomID,
Name: roomA.Name,
Timeline: []json.RawMessage{
csm.roomIDToRoom[roomIDs[0]].LastEventJSON,
roomA.LastEventJSON,
},
},
{
RoomID: roomIDs[1],
Name: csm.roomIDToRoom[roomIDs[1]].Name,
RoomID: roomB.RoomID,
Name: roomB.Name,
Timeline: []json.RawMessage{
csm.roomIDToRoom[roomIDs[1]].LastEventJSON,
roomB.LastEventJSON,
},
},
},
@ -524,12 +456,10 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
},
})
// room D gets a new event
csm.PushNewEvent(cs, &EventData{
event: json.RawMessage(`{}`),
roomID: roomD.RoomID,
eventType: "unimportant",
timestamp: timestampNow + 2000,
})
newEvent := testutils.NewEvent(t, "unimportant", "me", struct{}{}, timestampNow+2000)
globalCache.OnNewEvents(roomD.RoomID, []json.RawMessage{
newEvent,
}, 1)
// we should get this message even though it's not in the range because we are subscribed to this room.
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
@ -546,7 +476,7 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
roomD.RoomID: {
RoomID: roomD.RoomID,
Timeline: []json.RawMessage{
json.RawMessage(`{}`),
newEvent,
},
},
},

View File

@ -15,6 +15,8 @@ type GlobalCacheListener interface {
}
type GlobalCache struct {
LoadJoinedRoomsOverride func(userID string) (pos int64, joinedRooms []SortableRoom, err error)
// inserts are done by v2 poll loops, selects are done by v3 request threads
// there are lots of overlapping keys as many users (threads) can be joined to the same room (key)
// hence you must lock this with `mu` before r/w
@ -71,6 +73,25 @@ func (c *GlobalCache) AssignRoom(r SortableRoom) {
c.globalRoomInfo[r.RoomID] = &r
}
func (c *GlobalCache) LoadJoinedRooms(userID string) (pos int64, joinedRooms []SortableRoom, err error) {
if c.LoadJoinedRoomsOverride != nil {
return c.LoadJoinedRoomsOverride(userID)
}
initialLoadPosition, err := c.store.LatestEventNID()
if err != nil {
return 0, nil, err
}
joinedRoomIDs, err := c.store.JoinedRoomsAfterPosition(userID, initialLoadPosition)
if err != nil {
return 0, nil, err
}
rooms := make([]SortableRoom, len(joinedRoomIDs))
for i, roomID := range joinedRoomIDs {
rooms[i] = *c.LoadRoom(roomID)
}
return initialLoadPosition, rooms, nil
}
func (c *GlobalCache) LoadRoomState(roomID string, loadPosition int64, requiredState [][2]string) []json.RawMessage {
if len(requiredState) == 0 {
return nil

View File

@ -43,18 +43,20 @@ func NewStateEvent(t *testing.T, evType, stateKey, sender string, content interf
return j
}
func NewEvent(t *testing.T, evType, sender string, content interface{}) json.RawMessage {
func NewEvent(t *testing.T, evType, sender string, content interface{}, originServerTs int64) json.RawMessage {
t.Helper()
e := struct {
Type string `json:"type"`
Sender string `json:"sender"`
Content interface{} `json:"content"`
EventID string `json:"event_id"`
TS int64 `json:"origin_server_ts"`
}{
Type: evType,
Sender: sender,
Content: content,
EventID: generateEventID(t),
TS: originServerTs,
}
j, err := json.Marshal(&e)
if err != nil {