mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
Conn: handle positions, retries and blocking operations
This abstracts the long-pollness of the HTTP connection. Note that we cannot just maintain a server-side buffer of events to feed down the connection because the client can drastically alter _which_ events should be fed to the client. There still needs to be a request/response cycle, except we can factor out retry handling (duplicate request detection) and incrementing of the positions.
This commit is contained in:
parent
d1a14b38d9
commit
62f1eb0ee6
@ -8,7 +8,7 @@ import (
|
||||
|
||||
syncv3 "github.com/matrix-org/sync-v3"
|
||||
"github.com/matrix-org/sync-v3/sync2"
|
||||
"github.com/matrix-org/sync-v3/sync3/handler"
|
||||
"github.com/matrix-org/sync-v3/synclive"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -23,7 +23,7 @@ func main() {
|
||||
flag.Usage()
|
||||
os.Exit(1)
|
||||
}
|
||||
syncv3.RunSyncV3Server(handler.NewSyncV3Handler(&sync2.HTTPClient{
|
||||
syncv3.RunSyncV3Server(synclive.NewSyncLiveHandler(&sync2.HTTPClient{
|
||||
Client: &http.Client{
|
||||
Timeout: 5 * time.Minute,
|
||||
},
|
||||
|
1
go.mod
1
go.mod
@ -3,6 +3,7 @@ module github.com/matrix-org/sync-v3
|
||||
go 1.14
|
||||
|
||||
require (
|
||||
github.com/ReneKroon/ttlcache/v2 v2.8.1
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/jmoiron/sqlx v1.3.3
|
||||
github.com/lib/pq v1.10.1
|
||||
|
17
go.sum
17
go.sum
@ -1,3 +1,6 @@
|
||||
github.com/ReneKroon/ttlcache v1.7.0 h1:8BkjFfrzVFXyrqnMtezAaJ6AHPSsVV10m6w28N/Fgkk=
|
||||
github.com/ReneKroon/ttlcache/v2 v2.8.1 h1:0Exdyt5+vEsdRoFO1T7qDIYM3gq/ETbeYV+vjgcPxZk=
|
||||
github.com/ReneKroon/ttlcache/v2 v2.8.1/go.mod h1:mBxvsNY+BT8qLLd6CuAJubbKo6r0jh3nb5et22bbfGY=
|
||||
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
@ -53,6 +56,7 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/tidwall/gjson v1.6.0 h1:9VEQWz6LLMUsUl6PueE49ir4Ka6CzLymOAZDxpFsTDc=
|
||||
github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
|
||||
github.com/tidwall/gjson v1.8.1 h1:8j5EE9Hrh3l9Od1OIEDAb7IpezNA20UdRngNAj5N0WU=
|
||||
@ -69,13 +73,18 @@ github.com/tidwall/pretty v1.1.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV
|
||||
github.com/tidwall/sjson v1.0.3 h1:DeF+0LZqvIt4fKYw41aPB29ZGlvwVkHKktoXJ1YW9Y8=
|
||||
github.com/tidwall/sjson v1.0.3/go.mod h1:bURseu1nuBkFpIES5cz6zBtjmYeOQmEESshn7VpF15Y=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
|
||||
golang.org/x/crypto v0.0.0-20180723164146-c126467f60eb/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
|
||||
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
@ -85,6 +94,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
@ -95,13 +106,18 @@ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/h2non/gock.v1 v1.0.14 h1:fTeu9fcUvSnLNacYvYI54h+1/XEteDyHvrVCZEEEYNM=
|
||||
@ -110,3 +126,4 @@ gopkg.in/macaroon.v2 v2.1.0/go.mod h1:OUb+TQP/OP0WOerC2Jp/3CwhIKyIa9kQjuc7H24e6/
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
@ -1,33 +1,87 @@
|
||||
package observables
|
||||
package synclive
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/matrix-org/sync-v3/internal"
|
||||
)
|
||||
|
||||
type ConnID struct {
|
||||
SessionID string
|
||||
DeviceID string
|
||||
}
|
||||
|
||||
func (c *ConnID) String() string {
|
||||
return c.SessionID + "-" + c.DeviceID
|
||||
}
|
||||
|
||||
// Conn is an abstraction of a long-poll connection. It automatically handles the position values
|
||||
// of the /sync request, including sending cached data in the event of retries. It does not handle
|
||||
// the contents of the data at all.
|
||||
type Conn struct {
|
||||
ConnID ConnID
|
||||
// The position in the stream last sent to the client
|
||||
ServerPosition int64
|
||||
// The position in the stream last confirmed by the client
|
||||
ClientPosition int64
|
||||
// Callback which is allowed to block as long as the context is active. Return the response
|
||||
// to send back or an error. Errors of type *internal.HandlerError are inspected for the correct
|
||||
// status code to send back.
|
||||
HandleIncomingRequest func(ctx context.Context, connID ConnID, reqBody []byte) ([]byte, error)
|
||||
|
||||
// The position/data in the stream last sent by the client
|
||||
lastClientRequest dataFrame
|
||||
|
||||
// A buffer of the last response sent to the client.
|
||||
// Can be resent as-is if the server response was lost
|
||||
lastResponse dataFrame
|
||||
|
||||
// ensure only 1 incoming request is handled per connection
|
||||
mu *sync.Mutex
|
||||
}
|
||||
|
||||
type ConnMap struct {
|
||||
mu *sync.Mutex
|
||||
cmap map[ConnID]*Conn
|
||||
type dataFrame struct {
|
||||
pos int64 // The first position sent back is 1, so 0 means there was a problem.
|
||||
data []byte
|
||||
}
|
||||
|
||||
func (m *ConnMap) Conn(cid ConnID) *Conn {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.cmap[cid]
|
||||
func NewConn(connID ConnID) *Conn {
|
||||
return &Conn{
|
||||
ConnID: connID,
|
||||
mu: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func (m *ConnMap) SetConn(conn *Conn) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.cmap[conn.ConnID] = conn
|
||||
// OnIncomingRequest advances the clients position in the stream, returning the response position and data.
|
||||
func (c *Conn) OnIncomingRequest(ctx context.Context, pos int64, data []byte) (nextPos int64, nextData []byte, herr *internal.HandlerError) {
|
||||
c.mu.Lock()
|
||||
// it's intentional for the lock to be held whilst inside HandleIncomingRequest
|
||||
// as it guarantees linearisation of data within a single connection
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if pos != 0 && c.lastClientRequest.pos == pos {
|
||||
// if the request bodies match up then this is a retry, else it could be the client modifying
|
||||
// their filter params, so fallthrough
|
||||
if bytes.Equal(data, c.lastClientRequest.data) {
|
||||
// this is the 2nd+ time we've seen this request, meaning the client likely retried this
|
||||
// request. Send the response we sent before.
|
||||
return c.lastResponse.pos, c.lastResponse.data, nil
|
||||
}
|
||||
}
|
||||
c.lastClientRequest.pos = pos
|
||||
// notify handler as it may need to recalcualte or invalidate stuff
|
||||
responseBytes, err := c.HandleIncomingRequest(ctx, c.ConnID, data)
|
||||
if err != nil {
|
||||
herr, ok := err.(*internal.HandlerError)
|
||||
if !ok {
|
||||
herr = &internal.HandlerError{
|
||||
StatusCode: 500,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
responseBytes = herr.JSON()
|
||||
}
|
||||
c.lastResponse.pos += 1
|
||||
c.lastResponse.data = responseBytes
|
||||
|
||||
return c.lastResponse.pos, c.lastResponse.data, nil
|
||||
|
||||
}
|
||||
|
162
synclive/conn_test.go
Normal file
162
synclive/conn_test.go
Normal file
@ -0,0 +1,162 @@
|
||||
package synclive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/sync-v3/internal"
|
||||
)
|
||||
|
||||
// Test that Conn can send and receive arbitrary bytes based on positions
|
||||
// The handler simply prefixes a string based on the request body and then increments with
|
||||
// a basic counter. This ensures that we can modify the handler based on client data (which is
|
||||
// required for ranges to work correctly). Note that 'positions' are entirely hidden from the handler.
|
||||
func TestConn(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
connID := ConnID{
|
||||
DeviceID: "d",
|
||||
SessionID: "s",
|
||||
}
|
||||
c := NewConn(connID)
|
||||
lastPrefix := "a"
|
||||
pos := 100
|
||||
c.HandleIncomingRequest = func(_ context.Context, _ ConnID, reqBody []byte) ([]byte, error) {
|
||||
if reqBody != nil {
|
||||
lastPrefix = string(reqBody)
|
||||
}
|
||||
pos += 1
|
||||
return []byte(fmt.Sprintf("%s-%d", lastPrefix, pos)), nil
|
||||
}
|
||||
|
||||
// initial request
|
||||
nextPos, nextData, err := c.OnIncomingRequest(ctx, 0, []byte(`some_prefix`))
|
||||
assertPos(t, nextPos, 1)
|
||||
assertData(t, string(nextData), "some_prefix-101")
|
||||
assertNoError(t, err)
|
||||
// happy case, pos=1 and no prefix, so should use the last one
|
||||
nextPos, nextData, err = c.OnIncomingRequest(ctx, 1, nil)
|
||||
assertPos(t, nextPos, 2)
|
||||
assertData(t, string(nextData), "some_prefix-102")
|
||||
assertNoError(t, err)
|
||||
// updates work too
|
||||
nextPos, nextData, err = c.OnIncomingRequest(ctx, 2, []byte(`more`))
|
||||
assertPos(t, nextPos, 3)
|
||||
assertData(t, string(nextData), "more-103")
|
||||
assertNoError(t, err)
|
||||
}
|
||||
|
||||
// Test that Conn is blocking and linearises requests to OnIncomingRequest
|
||||
// It does this by triggering 2 OnIncomingRequest calls one after the other with a 1ms delay
|
||||
// The first request will "process" for 10ms whereas the 2nd request will process immediately.
|
||||
// If we don't block, we'll get "hi2" then "hi". If we block, we should see "hi" then "hi2".
|
||||
func TestConnBlocking(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
connID := ConnID{
|
||||
DeviceID: "d",
|
||||
SessionID: "s",
|
||||
}
|
||||
c := NewConn(connID)
|
||||
ch := make(chan string)
|
||||
c.HandleIncomingRequest = func(_ context.Context, _ ConnID, reqBody []byte) ([]byte, error) {
|
||||
if string(reqBody) == "hi" {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
ch <- string(reqBody)
|
||||
return reqBody, nil // echo bot
|
||||
}
|
||||
// two connection call the incoming request function at the same time, they should get queued up
|
||||
// and processed in series.
|
||||
// this should block until we read from ch
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c.OnIncomingRequest(ctx, 0, []byte(`hi`))
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(1 * time.Millisecond) // this req happens 2nd
|
||||
c.OnIncomingRequest(ctx, 0, []byte(`hi2`))
|
||||
}()
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(ch)
|
||||
}()
|
||||
want := []string{"hi", "hi2"}
|
||||
for resp := range ch {
|
||||
if resp != want[0] {
|
||||
t.Fatalf("got %v want %v", resp, want[0])
|
||||
}
|
||||
want = want[1:]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestConnRetries(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
connID := ConnID{
|
||||
DeviceID: "d",
|
||||
SessionID: "s",
|
||||
}
|
||||
c := NewConn(connID)
|
||||
callCount := 0
|
||||
c.HandleIncomingRequest = func(_ context.Context, _ ConnID, _ []byte) ([]byte, error) {
|
||||
callCount += 1
|
||||
return []byte(`yep`), nil
|
||||
}
|
||||
nextPos, nextData, err := c.OnIncomingRequest(ctx, 0, nil)
|
||||
assertPos(t, nextPos, 1)
|
||||
assertData(t, string(nextData), "yep")
|
||||
assertNoError(t, err)
|
||||
if callCount != 1 {
|
||||
t.Fatalf("wanted HandleIncomingRequest called 1 times, got %d", callCount)
|
||||
}
|
||||
nextPos, nextData, err = c.OnIncomingRequest(ctx, 1, nil)
|
||||
assertPos(t, nextPos, 2)
|
||||
assertData(t, string(nextData), "yep")
|
||||
assertNoError(t, err)
|
||||
if callCount != 2 {
|
||||
t.Fatalf("wanted HandleIncomingRequest called 2 times, got %d", callCount)
|
||||
}
|
||||
// retry! Shouldn't invoke handler again
|
||||
nextPos, nextData, err = c.OnIncomingRequest(ctx, 1, nil)
|
||||
assertPos(t, nextPos, 2)
|
||||
assertData(t, string(nextData), "yep")
|
||||
assertNoError(t, err)
|
||||
if callCount != 2 {
|
||||
t.Fatalf("wanted HandleIncomingRequest called 2 times, got %d", callCount)
|
||||
}
|
||||
// retry! but with modified request body, so should invoke handler again
|
||||
nextPos, nextData, err = c.OnIncomingRequest(ctx, 1, []byte(`data`))
|
||||
assertPos(t, nextPos, 3)
|
||||
assertData(t, string(nextData), "yep")
|
||||
assertNoError(t, err)
|
||||
if callCount != 3 {
|
||||
t.Fatalf("wanted HandleIncomingRequest called 3 times, got %d", callCount)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func assertPos(t *testing.T, nextPos, wantPos int64) {
|
||||
t.Helper()
|
||||
if nextPos != wantPos {
|
||||
t.Fatalf("got pos %d want pos %d", nextPos, wantPos)
|
||||
}
|
||||
}
|
||||
|
||||
func assertData(t *testing.T, nextData, wantData string) {
|
||||
t.Helper()
|
||||
if nextData != wantData {
|
||||
t.Fatalf("got data %v want data %v", nextData, wantData)
|
||||
}
|
||||
}
|
||||
|
||||
func assertNoError(t *testing.T, err *internal.HandlerError) {
|
||||
t.Helper()
|
||||
if err != nil {
|
||||
t.Fatalf("got error: %v", err)
|
||||
}
|
||||
}
|
@ -1,8 +1,9 @@
|
||||
package observables
|
||||
package synclive
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
@ -18,7 +19,7 @@ type SyncLiveHandler struct {
|
||||
Storage *state.Storage
|
||||
V2Store *sync2.Storage
|
||||
PollerMap *sync2.PollerMap
|
||||
ConnMap *ConnMap
|
||||
Notifier *Notifier
|
||||
}
|
||||
|
||||
func NewSyncLiveHandler(v2Client sync2.Client, postgresDBURI string) *SyncLiveHandler {
|
||||
@ -76,7 +77,7 @@ func (h *SyncLiveHandler) getOrCreateConnection(req *http.Request) (*Conn, error
|
||||
if sessionID != "" {
|
||||
// Lookup the connection
|
||||
// we need to map based on both as the session ID isn't crypto secure but the device ID is (Auth header)
|
||||
conn = h.ConnMap.Conn(ConnID{
|
||||
conn = h.Notifier.Conn(ConnID{
|
||||
SessionID: sessionID,
|
||||
DeviceID: deviceID,
|
||||
})
|
||||
@ -89,13 +90,26 @@ func (h *SyncLiveHandler) getOrCreateConnection(req *http.Request) (*Conn, error
|
||||
}
|
||||
if conn != nil {
|
||||
// conn exists
|
||||
conn.ClientPosition, err = strconv.ParseInt(req.URL.Query().Get("pos"), 10, 64)
|
||||
|
||||
// TODO: Wrap in OnIncomingRequest(http.Request)?
|
||||
cpos, err := strconv.ParseInt(req.URL.Query().Get("pos"), 10, 64)
|
||||
if err != nil {
|
||||
return nil, &internal.HandlerError{
|
||||
StatusCode: 400,
|
||||
Err: fmt.Errorf("invalid position: %s", req.URL.Query().Get("pos")),
|
||||
}
|
||||
}
|
||||
var body []byte
|
||||
if req.Body != nil {
|
||||
body, err = ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
return nil, &internal.HandlerError{
|
||||
StatusCode: 400,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
conn.OnIncomingRequest(req.Context(), cpos, body)
|
||||
return conn, nil
|
||||
}
|
||||
// conn doesn't exist, we probably nuked it.
|
||||
@ -143,10 +157,8 @@ func (h *SyncLiveHandler) getOrCreateConnection(req *http.Request) (*Conn, error
|
||||
|
||||
func (h *SyncLiveHandler) createConn(connID ConnID) (*Conn, error) {
|
||||
// TODO register the connection with the notifier
|
||||
conn := &Conn{
|
||||
ConnID: connID,
|
||||
}
|
||||
h.ConnMap.SetConn(conn)
|
||||
conn := NewConn(connID)
|
||||
h.Notifier.SetConn(conn)
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
package observables
|
||||
package synclive
|
||||
|
||||
import "github.com/matrix-org/sync-v3/state"
|
||||
|
||||
|
62
synclive/notifier.go
Normal file
62
synclive/notifier.go
Normal file
@ -0,0 +1,62 @@
|
||||
package synclive
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ReneKroon/ttlcache/v2"
|
||||
)
|
||||
|
||||
type Notifier struct {
|
||||
cache *ttlcache.Cache
|
||||
|
||||
// map of user_id to active connections. Inspect the ConnID to find the device ID.
|
||||
userIDToConn map[string][]*Conn
|
||||
// map of room_id to joined user IDs.
|
||||
joinedUsers map[string][]string
|
||||
mu *sync.Mutex
|
||||
}
|
||||
|
||||
func NewNotifier() *Notifier {
|
||||
cm := &Notifier{
|
||||
userIDToConn: make(map[string][]*Conn),
|
||||
joinedUsers: make(map[string][]string),
|
||||
mu: &sync.Mutex{},
|
||||
}
|
||||
cm.cache = ttlcache.NewCache()
|
||||
cm.cache.SetTTL(30 * time.Minute) // TODO: customisable
|
||||
return cm
|
||||
}
|
||||
|
||||
func (m *Notifier) Conn(cid ConnID) *Conn {
|
||||
cint, _ := m.cache.Get(cid.String())
|
||||
if cint == nil {
|
||||
return nil
|
||||
}
|
||||
return cint.(*Conn)
|
||||
}
|
||||
|
||||
func (m *Notifier) SetConn(conn *Conn) {
|
||||
m.cache.Set(conn.ConnID.String(), conn)
|
||||
}
|
||||
|
||||
func (m *Notifier) connsForRoom(roomID string) []*Conn {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Notifier) OnNewEvent(
|
||||
roomID, sender, eventType, stateKey, membership string, userIDs []string,
|
||||
) {
|
||||
notifyConns := m.connsForRoom(roomID)
|
||||
for _, uid := range userIDs {
|
||||
conns := m.userIDToConn[uid]
|
||||
if len(conns) > 0 {
|
||||
notifyConns = append(notifyConns, conns...)
|
||||
}
|
||||
}
|
||||
// de-duplicate the conns
|
||||
|
||||
// check if conn isn't blocking this event in their filter / ranges
|
||||
|
||||
// send message
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package observables
|
||||
package synclive
|
||||
|
||||
var (
|
||||
SortByName = "by_name"
|
||||
|
@ -1,4 +1,4 @@
|
||||
package observables
|
||||
package synclive
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
|
@ -1,4 +1,4 @@
|
||||
package observables
|
||||
package synclive
|
||||
|
||||
type Response struct {
|
||||
Ops []ResponseOp `json:"ops"`
|
||||
|
@ -1,4 +1,4 @@
|
||||
package observables
|
||||
package synclive
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user