mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
Make transaction id delay time configurable
This commit is contained in:
parent
f74794bcb4
commit
5904e5a3c7
@ -173,9 +173,10 @@ func main() {
|
||||
panic("invalid value for " + EnvMaxConns + ": " + args[EnvMaxConns])
|
||||
}
|
||||
h2, h3 := syncv3.Setup(args[EnvServer], args[EnvDB], args[EnvSecret], syncv3.Opts{
|
||||
AddPrometheusMetrics: args[EnvPrometheus] != "",
|
||||
DBMaxConns: maxConnsInt,
|
||||
DBConnMaxIdleTime: time.Hour,
|
||||
AddPrometheusMetrics: args[EnvPrometheus] != "",
|
||||
DBMaxConns: maxConnsInt,
|
||||
DBConnMaxIdleTime: time.Hour,
|
||||
MaxTransactionIDDelay: time.Second,
|
||||
})
|
||||
|
||||
go h2.StartV2Pollers()
|
||||
|
@ -60,7 +60,7 @@ type ConnState struct {
|
||||
func NewConnState(
|
||||
userID, deviceID string, userCache *caches.UserCache, globalCache *caches.GlobalCache,
|
||||
ex extensions.HandlerInterface, joinChecker JoinChecker, setupHistVec *prometheus.HistogramVec, histVec *prometheus.HistogramVec,
|
||||
maxPendingEventUpdates int,
|
||||
maxPendingEventUpdates int, maxTransactionIDDelay time.Duration,
|
||||
) *ConnState {
|
||||
cs := &ConnState{
|
||||
globalCache: globalCache,
|
||||
@ -81,7 +81,7 @@ func NewConnState(
|
||||
ConnState: cs,
|
||||
updates: make(chan caches.Update, maxPendingEventUpdates),
|
||||
}
|
||||
cs.txnIDWaiter = NewTxnIDWaiter(userID, cs.live.onUpdate, cs.subscribedOrVisible)
|
||||
cs.txnIDWaiter = NewTxnIDWaiter(userID, maxTransactionIDDelay, cs.live.onUpdate, cs.subscribedOrVisible)
|
||||
// subscribe for updates before loading. We risk seeing dupes but that's fine as load positions
|
||||
// will stop us double-processing.
|
||||
cs.userCacheID = cs.userCache.Subsribe(cs)
|
||||
|
@ -58,6 +58,7 @@ type SyncLiveHandler struct {
|
||||
|
||||
GlobalCache *caches.GlobalCache
|
||||
maxPendingEventUpdates int
|
||||
maxTransactionIDDelay time.Duration
|
||||
|
||||
setupHistVec *prometheus.HistogramVec
|
||||
histVec *prometheus.HistogramVec
|
||||
@ -67,6 +68,7 @@ type SyncLiveHandler struct {
|
||||
func NewSync3Handler(
|
||||
store *state.Storage, storev2 *sync2.Storage, v2Client sync2.Client, secret string,
|
||||
pub pubsub.Notifier, sub pubsub.Listener, enablePrometheus bool, maxPendingEventUpdates int,
|
||||
maxTransactionIDDelay time.Duration,
|
||||
) (*SyncLiveHandler, error) {
|
||||
logger.Info().Msg("creating handler")
|
||||
sh := &SyncLiveHandler{
|
||||
@ -78,6 +80,7 @@ func NewSync3Handler(
|
||||
Dispatcher: sync3.NewDispatcher(),
|
||||
GlobalCache: caches.NewGlobalCache(store),
|
||||
maxPendingEventUpdates: maxPendingEventUpdates,
|
||||
maxTransactionIDDelay: maxTransactionIDDelay,
|
||||
}
|
||||
sh.Extensions = &extensions.Handler{
|
||||
Store: store,
|
||||
@ -411,7 +414,7 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ
|
||||
// to check for an existing connection though, as it's possible for the client to call /sync
|
||||
// twice for a new connection.
|
||||
conn, created := h.ConnMap.CreateConn(connID, func() sync3.ConnHandler {
|
||||
return NewConnState(token.UserID, token.DeviceID, userCache, h.GlobalCache, h.Extensions, h.Dispatcher, h.setupHistVec, h.histVec, h.maxPendingEventUpdates)
|
||||
return NewConnState(token.UserID, token.DeviceID, userCache, h.GlobalCache, h.Extensions, h.Dispatcher, h.setupHistVec, h.histVec, h.maxPendingEventUpdates, h.maxTransactionIDDelay)
|
||||
})
|
||||
if created {
|
||||
log.Info().Msg("created new connection")
|
||||
|
@ -5,22 +5,22 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var maxDelay = 1 * time.Second
|
||||
|
||||
type TxnIDWaiter struct {
|
||||
userID string
|
||||
publish func(update caches.Update)
|
||||
subscribedOrVisible func(roomID string) bool
|
||||
// TODO: probably need a mutex around t.queues so the expiry won't race with enqueuing
|
||||
queues map[string][]*caches.RoomEventUpdate
|
||||
queues map[string][]*caches.RoomEventUpdate
|
||||
maxDelay time.Duration
|
||||
}
|
||||
|
||||
func NewTxnIDWaiter(userID string, publish func(caches.Update), subscribedOrVisible func(string) bool) *TxnIDWaiter {
|
||||
func NewTxnIDWaiter(userID string, maxDelay time.Duration, publish func(caches.Update), subscribedOrVisible func(string) bool) *TxnIDWaiter {
|
||||
return &TxnIDWaiter{
|
||||
userID: userID,
|
||||
publish: publish,
|
||||
subscribedOrVisible: subscribedOrVisible,
|
||||
queues: make(map[string][]*caches.RoomEventUpdate),
|
||||
maxDelay: maxDelay,
|
||||
// TODO: metric that tracks how long events were queued for.
|
||||
}
|
||||
}
|
||||
@ -58,7 +58,7 @@ func (t *TxnIDWaiter) Ingest(up caches.Update) {
|
||||
// TODO: bound the queue size?
|
||||
t.queues[roomID] = append(queue, eventUpdate)
|
||||
|
||||
time.AfterFunc(maxDelay, func() { t.publishUpToNID(roomID, eventUpdate.EventData.NID) })
|
||||
time.AfterFunc(t.maxDelay, func() { t.publishUpToNID(roomID, eventUpdate.EventData.NID) })
|
||||
}
|
||||
|
||||
func (t *TxnIDWaiter) publishUpToNID(roomID string, publishNID int64) {
|
||||
|
@ -370,6 +370,7 @@ func runTestServer(t testutils.TestBenchInterface, v2Server *testV2Server, postg
|
||||
TestingSynchronousPubsub: true, // critical to avoid flakey tests
|
||||
AddPrometheusMetrics: false,
|
||||
MaxPendingEventUpdates: 200,
|
||||
MaxTransactionIDDelay: 1 * time.Millisecond,
|
||||
}
|
||||
if len(opts) > 0 {
|
||||
opt := opts[0]
|
||||
@ -380,6 +381,9 @@ func runTestServer(t testutils.TestBenchInterface, v2Server *testV2Server, postg
|
||||
combinedOpts.MaxPendingEventUpdates = opt.MaxPendingEventUpdates
|
||||
handler.BufferWaitTime = 5 * time.Millisecond
|
||||
}
|
||||
if opt.MaxTransactionIDDelay > 0 {
|
||||
combinedOpts.MaxTransactionIDDelay = opt.MaxTransactionIDDelay
|
||||
}
|
||||
}
|
||||
h2, h3 := syncv3.Setup(v2Server.url(), postgresConnectionString, os.Getenv("SYNCV3_SECRET"), combinedOpts)
|
||||
// for ease of use we don't start v2 pollers at startup in tests
|
||||
|
5
v3.go
5
v3.go
@ -37,6 +37,9 @@ type Opts struct {
|
||||
// if true, publishing messages will block until the consumer has consumed it.
|
||||
// Assumes a single producer and a single consumer.
|
||||
TestingSynchronousPubsub bool
|
||||
// MaxTransactionIDDelay is the longest amount of time that we will wait for
|
||||
// confirmation of an event's transaction_id before sending it to its sender.
|
||||
MaxTransactionIDDelay time.Duration
|
||||
|
||||
DBMaxConns int
|
||||
DBConnMaxIdleTime time.Duration
|
||||
@ -115,7 +118,7 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han
|
||||
pMap.SetCallbacks(h2)
|
||||
|
||||
// create v3 handler
|
||||
h3, err := handler.NewSync3Handler(store, storev2, v2Client, secret, pubSub, pubSub, opts.AddPrometheusMetrics, opts.MaxPendingEventUpdates)
|
||||
h3, err := handler.NewSync3Handler(store, storev2, v2Client, secret, pubSub, pubSub, opts.AddPrometheusMetrics, opts.MaxPendingEventUpdates, opts.MaxTransactionIDDelay)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user