Add stub room_member stream

This commit is contained in:
Kegan Dougal 2021-08-05 17:28:38 +01:00
parent cb3e974d01
commit 7d42f1413a
9 changed files with 84 additions and 57 deletions

View File

@ -3,6 +3,8 @@
Run an experimental sync v3 server using an existing Matrix account. This is possible because, for the most part,
v3 sync is a strict subset of v2 sync.
**UNDER ACTIVE DEVELOPMENT, BREAKING CHANGES ARE FREQUENT.**
## Usage
```bash

View File

@ -10,9 +10,10 @@ import (
// A request is made by the combination of the client HTTP request parameters and the stored filters
// on the server.
type Request struct {
RoomList *FilterRoomList `json:"room_list,omitempty"`
Typing *FilterTyping `json:"typing,omitempty"`
ToDevice *FilterToDevice `json:"to_device,omitempty"`
RoomList *FilterRoomList `json:"room_list,omitempty"`
Typing *FilterTyping `json:"typing,omitempty"`
ToDevice *FilterToDevice `json:"to_device,omitempty"`
RoomMember *FilterRoomMember `json:"room_member,omitempty"`
}
// ApplyDeltas updates Request with the values in req2. Returns true if there were deltas.
@ -37,3 +38,10 @@ func (r *Request) ApplyDeltas(req2 *Request) (bool, error) {
}
return !bytes.Equal(original, combined), nil
}
// P is the pagination struct for streams
type P struct {
Limit int64 `json:"limit,omitempty"`
Sort string `json:"sort,omitempty"`
Next string `json:"next,omitempty"`
}

View File

@ -30,35 +30,6 @@ type FilterRoomList struct {
IncludeRoomAvatarMXC *bool
}
// Combine two filters together. A new filter is returned.
func (f *FilterRoomList) Combine(new *FilterRoomList) *FilterRoomList {
combined := &FilterRoomList{}
// nil slice != 0 length slice
if new.SummaryEventTypes != nil {
combined.SummaryEventTypes = new.SummaryEventTypes
} else {
combined.SummaryEventTypes = f.SummaryEventTypes
}
if new.EntriesPerBatch != 0 {
combined.EntriesPerBatch = new.EntriesPerBatch
} else {
combined.EntriesPerBatch = f.EntriesPerBatch
}
if new.RoomNameSize != 0 {
combined.RoomNameSize = new.RoomNameSize
} else {
combined.RoomNameSize = f.RoomNameSize
}
// pointer to bool to indicate absence
if new.IncludeRoomAvatarMXC != nil {
combined.IncludeRoomAvatarMXC = new.IncludeRoomAvatarMXC
} else {
combined.IncludeRoomAvatarMXC = f.IncludeRoomAvatarMXC
}
return combined
}
type ControlMessageRoomList struct {
EntriesPerBatch *int `json:"entries_per_batch,omitempty"`
Upcoming string `json:"upcoming"`

View File

@ -0,0 +1,58 @@
package streams
import (
"encoding/json"
"github.com/matrix-org/sync-v3/state"
"github.com/matrix-org/sync-v3/sync3"
)
const (
defaultRoomMemberLimit = 50
maxRoomMemberLimit = 1000
)
type RoomMemberSortOrder string
var (
SortRoomMemberByPL RoomMemberSortOrder = "by_pl"
SortRoomMemberByName RoomMemberSortOrder = "by_pl"
)
type FilterRoomMember struct {
Limit int64 `json:"limit"`
SortBy RoomMemberSortOrder `json:"sort_by"`
P P `json:"p"`
}
type RoomMemberResponse struct {
Limit int64 `json:"limit"`
Events []json.RawMessage `json:"events"`
}
// RoomMember represents a stream of room members.
type RoomMember struct {
storage *state.Storage
}
func NewRoomMember(s *state.Storage) *RoomMember {
return &RoomMember{s}
}
func (s *RoomMember) Position(tok *sync3.Token) int64 {
return tok.RoomMemberPosition()
}
func (s *RoomMember) SetPosition(tok *sync3.Token, pos int64) {
tok.SetRoomMemberPosition(pos)
}
func (s *RoomMember) SessionConfirmed(session *sync3.Session, confirmedPos int64, allSessions bool) {
}
func (s *RoomMember) DataInRange(session *sync3.Session, fromExcl, toIncl int64, request *Request, resp *Response) (int64, error) {
if request.RoomMember == nil {
return 0, ErrNotRequested
}
return toIncl, nil
}

View File

@ -16,16 +16,6 @@ type FilterToDevice struct {
Limit int64 `json:"limit"`
}
func (f *FilterToDevice) Combine(other *FilterToDevice) *FilterToDevice {
combined := &FilterToDevice{
Limit: f.Limit,
}
if other.Limit != 0 {
combined.Limit = other.Limit
}
return combined
}
type ToDeviceResponse struct {
Limit int64 `json:"limit"`
Events []json.RawMessage `json:"events"`

View File

@ -11,16 +11,6 @@ type FilterTyping struct {
RoomID string `json:"room_id"`
}
func (f *FilterTyping) Combine(other *FilterTyping) *FilterTyping {
combined := &FilterTyping{
RoomID: f.RoomID,
}
if other.RoomID != "" {
combined.RoomID = other.RoomID
}
return combined
}
type TypingResponse struct {
UserIDs []string `json:"user_ids"`
}

View File

@ -11,10 +11,11 @@ import (
const (
IndexEventPosition = iota
IndexRoomMemberPosition
IndexTypingPosition
IndexToDevicePosition
)
const totalStreamPositions = 3
const totalStreamPositions = 4
// V3_S1_F9_57423_123_5183
// "V3_S" $SESSION "_F" $FILTER "_" $A "_" $B "_" $C
@ -36,6 +37,9 @@ func (t *Token) TypingPosition() int64 {
func (t *Token) ToDevicePosition() int64 {
return t.positions[IndexToDevicePosition]
}
func (t *Token) RoomMemberPosition() int64 {
return t.positions[IndexRoomMemberPosition]
}
func (t *Token) SetEventPosition(pos int64) {
t.positions[IndexEventPosition] = pos
}
@ -45,6 +49,9 @@ func (t *Token) SetTypingPosition(pos int64) {
func (t *Token) SetToDevicePosition(pos int64) {
t.positions[IndexToDevicePosition] = pos
}
func (t *Token) SetRoomMemberPosition(pos int64) {
t.positions[IndexRoomMemberPosition] = pos
}
func (t *Token) IsAfter(x Token) bool {
for i := range t.positions {

View File

@ -22,20 +22,20 @@ func TestNewSyncToken(t *testing.T) {
},
{
// with filter
in: "V3_S1_F6_12_19_11",
in: "V3_S1_F6_12_19_11_1",
outToken: &Token{
SessionID: 1,
FilterID: 6,
positions: [totalStreamPositions]int64{12, 19, 11},
positions: [totalStreamPositions]int64{12, 19, 11, 1},
},
},
{
// without filter
in: "V3_S1_33_100_1313",
in: "V3_S1_33_100_1313_1",
outToken: &Token{
SessionID: 1,
FilterID: 0,
positions: [totalStreamPositions]int64{33, 100, 1313},
positions: [totalStreamPositions]int64{33, 100, 1313, 1},
},
},
}

1
v3.go
View File

@ -113,6 +113,7 @@ func NewSyncV3Handler(v2Client sync2.Client, postgresDBURI string) *SyncV3Handle
}
sh.streams = append(sh.streams, streams.NewTyping(sh.Storage))
sh.streams = append(sh.streams, streams.NewToDevice(sh.Storage))
sh.streams = append(sh.streams, streams.NewRoomMember(sh.Storage))
latestToken := sync3.NewBlankSyncToken(0, 0)
nid, err := sh.Storage.LatestEventNID()