More bugfixes and tests

Specifically, /sync requests with the "latest" sync token should not
block for paginated requests.
This commit is contained in:
Kegan Dougal 2021-08-10 14:54:29 +01:00
parent 91d3126a66
commit 9266a3a834
7 changed files with 61 additions and 27 deletions

View File

@ -40,7 +40,7 @@ func (r *Request) ApplyDeltas(req2 *Request) (bool, error) {
}
// Sentinel value indicating the first page of results.
const FirstPage = "1"
const FirstPage = "0"
// P is the pagination struct for streams
type P struct {

View File

@ -91,6 +91,10 @@ func (s *RoomMember) SetPosition(tok *sync3.Token, pos int64) {
tok.SetEventPosition(pos)
}
func (s *RoomMember) IsPaginationRequest(req *Request) bool {
return req.RoomMember != nil && req.RoomMember.P != nil && req.RoomMember.P.Next != ""
}
func (s *RoomMember) SessionConfirmed(session *sync3.Session, confirmedPos int64, allSessions bool) {
}

View File

@ -18,6 +18,9 @@ type Streamer interface {
// Called when a session hits /sync with a stream position. `allSessions` is true if all sessions
// are at least as far as this position (inclusive), allowing cleanup of earlier messages.
SessionConfirmed(session *sync3.Session, confirmedPos int64, allSessions bool)
// Return true if this request contains pagination parameters for your stream. Use to detect
// whether to block or not (pagination never blocks)
IsPaginationRequest(req *Request) bool
}
// ErrNotRequested should be returned in DataInRange if the request does not ask for this stream.

View File

@ -38,6 +38,10 @@ func (s *ToDevice) SetPosition(tok *sync3.Token, pos int64) {
tok.SetToDevicePosition(pos)
}
func (s *ToDevice) IsPaginationRequest(req *Request) bool {
return false // no pagination support
}
func (s *ToDevice) SessionConfirmed(session *sync3.Session, confirmedPos int64, allSessions bool) {
if !allSessions {
return

View File

@ -32,6 +32,10 @@ func (s *Typing) SetPosition(tok *sync3.Token, pos int64) {
tok.SetTypingPosition(pos)
}
func (s *Typing) IsPaginationRequest(req *Request) bool {
return false // no pagination support
}
func (s *Typing) SessionConfirmed(session *sync3.Session, confirmedPos int64, allSessions bool) {}
func (s *Typing) DataInRange(session *sync3.Session, fromExcl, toIncl int64, request *Request, resp *Response) (int64, error) {

16
v3.go
View File

@ -197,7 +197,7 @@ func (h *SyncV3Handler) serve(w http.ResponseWriter, req *http.Request) *handler
upcoming.FilterID = filterID
}
if !shouldReturnImmediately(fromToken, &upcoming, timeoutMs) {
if !shouldReturnImmediately(syncReq, h.streams, fromToken, &upcoming, timeoutMs) {
// from == upcoming so we need to block for up to timeoutMs for a new event to come in
log.Info().Int64("timeout_ms", timeoutMs).Msg("blocking")
newUpcoming := h.waitForEvents(req.Context(), session, *fromToken, time.Duration(timeoutMs)*time.Millisecond)
@ -521,9 +521,19 @@ func deviceIDFromRequest(req *http.Request) (string, error) {
return hex.EncodeToString(hash.Sum(nil)), nil
}
func shouldReturnImmediately(fromToken, upcoming *sync3.Token, timeoutMs int64) bool {
func shouldReturnImmediately(req *streams.Request, streams []streams.Streamer, fromToken, upcoming *sync3.Token, timeoutMs int64) bool {
if timeoutMs == 0 {
return true
}
return upcoming.IsAfter(*fromToken)
newerTokenExists := upcoming.IsAfter(*fromToken)
if newerTokenExists {
return true
}
// check if there is a pagination request here
for _, s := range streams {
if s.IsPaginationRequest(req) {
return true
}
}
return false
}

View File

@ -425,11 +425,13 @@ type memlog struct { // @alice:localhost => "join"
Membership string
}
const EmptySince = -1
type memrequest struct {
Limit int
Sort string
WantUserIDs []string
UsePrevSince bool
SinceRequestIndex int // the request index to use the next batch from as a since value
UsePrevP bool
InjectMembersBeforeRequest []memlog
}
@ -463,7 +465,7 @@ func TestHandlerRoomMember(t *testing.T) {
Requests []memrequest
}{
{
Name: "Can get all room members in a DM room and an empty since token",
Name: "Can get all room members in a DM room and an empty since token and a known since token",
Creator: alice,
TimelineMemberLog: []memlog{
{
@ -475,7 +477,8 @@ func TestHandlerRoomMember(t *testing.T) {
Requests: []memrequest{
{
// default limit should be >2 and sort order should be by PL then name
WantUserIDs: []string{alice, bob},
WantUserIDs: []string{alice, bob},
SinceRequestIndex: EmptySince,
},
},
},
@ -508,15 +511,18 @@ func TestHandlerRoomMember(t *testing.T) {
},
Requests: []memrequest{
{
Limit: 4,
Sort: "by_name",
WantUserIDs: []string{alice, bob, charlie, doris},
Limit: 4,
Sort: "by_name",
WantUserIDs: []string{alice, bob, charlie, doris},
SinceRequestIndex: EmptySince,
},
{
Limit: 4,
Sort: "by_name",
UsePrevP: true,
WantUserIDs: []string{eve},
// pin the next page based on the state at since
SinceRequestIndex: 0,
},
},
},
@ -534,10 +540,11 @@ func TestHandlerRoomMember(t *testing.T) {
{
// default limit should be >2 and sort order should be by PL then name,
// so even though we injected bob then alice it should return alice then bob
WantUserIDs: []string{alice, bob},
WantUserIDs: []string{alice, bob},
SinceRequestIndex: EmptySince,
},
{
UsePrevSince: true,
SinceRequestIndex: 0,
InjectMembersBeforeRequest: []memlog{
{
Sender: charlie,
@ -548,7 +555,7 @@ func TestHandlerRoomMember(t *testing.T) {
WantUserIDs: []string{charlie},
},
{
UsePrevSince: true,
SinceRequestIndex: 1,
InjectMembersBeforeRequest: []memlog{
{
Sender: eve,
@ -588,8 +595,9 @@ func TestHandlerRoomMember(t *testing.T) {
Requests: []memrequest{
{
// @bob:localhost, YCharlie, ZAlice
Sort: "by_name",
WantUserIDs: []string{bob, charlie, alice},
Sort: "by_name",
SinceRequestIndex: EmptySince,
WantUserIDs: []string{bob, charlie, alice},
},
},
},
@ -622,9 +630,10 @@ func TestHandlerRoomMember(t *testing.T) {
},
Requests: []memrequest{
{
Limit: 4,
Sort: "by_name",
WantUserIDs: []string{alice, bob, charlie, doris},
SinceRequestIndex: EmptySince,
Limit: 4,
Sort: "by_name",
WantUserIDs: []string{alice, bob, charlie, doris},
},
{
// injecting this member should not cause it to be returned in pagination mode.
@ -635,11 +644,11 @@ func TestHandlerRoomMember(t *testing.T) {
Membership: "join",
},
},
Limit: 4,
Sort: "by_name",
UsePrevP: true,
UsePrevSince: true, // we need to stay on the same state as the previous request
WantUserIDs: []string{eve},
Limit: 4,
Sort: "by_name",
UsePrevP: true,
SinceRequestIndex: 0, // we need to stay on the same state as the previous request
WantUserIDs: []string{eve},
},
},
},
@ -696,7 +705,7 @@ func TestHandlerRoomMember(t *testing.T) {
aliceV2Stream(&v2Resp)()
// run the requests
prevSince := ""
var nextBatches []string
prevP := ""
for _, req := range tc.Requests {
if req.InjectMembersBeforeRequest != nil {
@ -728,8 +737,8 @@ func TestHandlerRoomMember(t *testing.T) {
filter["sort"] = req.Sort
}
since := ""
if req.UsePrevSince {
since = prevSince
if req.SinceRequestIndex != EmptySince {
since = nextBatches[req.SinceRequestIndex]
}
if req.UsePrevP {
filter["p"] = map[string]interface{}{
@ -740,7 +749,7 @@ func TestHandlerRoomMember(t *testing.T) {
v3resp := mustDoSync3Request(t, server, aliceBearer, since, map[string]interface{}{
"room_member": filter,
})
prevSince = v3resp.Next
nextBatches = append(nextBatches, v3resp.Next)
if v3resp.RoomMember == nil {
t.Fatalf("response did not include room_member: test case %v", tc.Name)
}