package sync3
import (
"context"
"fmt"
"runtime/debug"
"sync"
"time"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync3/caches"
)
// The amount of time to artificially wait if the server detects spamming clients. This time will
// be added to responses when the server detects the same request being sent repeatedly, e.g.
// /sync?pos=5 followed by /sync?pos=5 again. Likewise /sync without a ?pos= sent over and over.
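// For example (illustrative): a client that sends GET /sync?pos=5, receives a response, and then
// immediately re-sends an identical GET /sync?pos=5 will have the previously generated response
// re-sent, delayed by SpamProtectionInterval, rather than triggering any new work.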
var SpamProtectionInterval = 10 * time.Millisecond
type ConnID struct {
UserID string
DeviceID string
CID string // client-supplied conn_id
}
func (c *ConnID) String() string {
return fmt.Sprintf("%s|%s|%s", c.UserID, c.DeviceID, c.CID)
}
type ConnHandler interface {
// Callback which is allowed to block as long as the context is active. Return the response
// to send back or an error. Errors of type *internal.HandlerError are inspected for the correct
// status code to send back.
OnIncomingRequest(ctx context.Context, cid ConnID, req *Request, isInitial bool, start time.Time) (*Response, error)
OnUpdate(ctx context.Context, update caches.Update)
PublishEventsUpTo(roomID string, nid int64)
Destroy()
Alive() bool
SetCancelCallback(cancel context.CancelFunc)
}
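// A minimal sketch of a ConnHandler implementation, for illustration only: the real
// handlers live elsewhere in this package, and the "noopHandler" type below is
// hypothetical.
//
//    type noopHandler struct{ destroyed bool }
//
//    func (h *noopHandler) OnIncomingRequest(ctx context.Context, cid ConnID, req *Request, isInitial bool, start time.Time) (*Response, error) {
//        // A real handler would block here until it has data or ctx is cancelled.
//        return &Response{}, nil
//    }
//    func (h *noopHandler) OnUpdate(ctx context.Context, update caches.Update) {}
//    func (h *noopHandler) PublishEventsUpTo(roomID string, nid int64)         {}
//    func (h *noopHandler) Destroy()                                           { h.destroyed = true }
//    func (h *noopHandler) Alive() bool                                        { return !h.destroyed }
//    func (h *noopHandler) SetCancelCallback(cancel context.CancelFunc)        {}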
// Conn is an abstraction of a long-poll connection. It automatically handles the position values
// of the /sync request, including sending cached data in the event of retries. It does not handle
// the contents of the data at all.
type Conn struct {
ConnID
handler ConnHandler
// The position/data in the stream last sent by the client
lastClientRequest Request
// A buffer of the last responses sent to the client.
// Can be resent as-is if the server response was lost.
// This array has 3 categories: ACKed messages, the ACKing message, unACKed messages, e.g.
// [ACK, ACK, ACKing, NACK, NACK]
// - The ACKing message is always the response with the same pos as req.pos
// - Everything before it is old and can be deleted
// - Everything after that is new and unseen, and the first element is the one we want to return.
serverResponses []Response
lastPos int64
// ensure only 1 incoming request is handled per connection
mu *sync.Mutex
cancelOutstandingRequest func()
cancelOutstandingRequestMu *sync.Mutex
}
func NewConn(connID ConnID, h ConnHandler) *Conn {
return &Conn{
ConnID: connID,
handler: h,
mu: &sync.Mutex{},
cancelOutstandingRequestMu: &sync.Mutex{},
}
}
func (c *Conn) Alive() bool {
return c.handler.Alive()
}
func (c *Conn) OnUpdate(ctx context.Context, update caches.Update) {
c.handler.OnUpdate(ctx, update)
}
// tryRequest is a wrapper around ConnHandler.OnIncomingRequest which automatically
// starts and closes a tracing task.
//
// If the wrapped call panics, it is recovered from, reported to Sentry, and an error
// is passed to the caller. If the wrapped call returns an error, that error is passed
// upwards but will NOT be logged to Sentry (neither here nor by the caller). Errors
// should be reported to Sentry as close as possible to the point of creating the error,
// to provide the best possible Sentry traceback.
func (c *Conn) tryRequest(ctx context.Context, req *Request, start time.Time) (res *Response, err error) {
// TODO: include useful information from the request in the sentry hub/context
// Might be better done in the caller though?
defer func() {
panicErr := recover()
if panicErr != nil {
err = fmt.Errorf("panic: %s", panicErr)
logger.Error().Msg(string(debug.Stack()))
// Note: as we've captured the panicErr ourselves, there isn't much
// difference between RecoverWithContext and CaptureException. But
// there /is/ a small difference:
//
// - RecoverWithContext will generate a Sentry event marked with
// a "RecoveredException" hint
// - CaptureException instead uses an "OriginalException" hint.
//
// I'm guessing that Sentry will use the former to display panicErr as
// having come from a panic.
internal.GetSentryHubFromContextOrDefault(ctx).RecoverWithContext(ctx, panicErr)
}
}()
taskType := "OnIncomingRequest"
if req.pos == 0 {
taskType = "OnIncomingRequestInitial"
}
ctx, task := internal.StartTask(ctx, taskType)
defer task.End()
internal.Logf(ctx, "connstate", "starting user=%v device=%v pos=%v", c.UserID, c.ConnID.DeviceID, req.pos)
return c.handler.OnIncomingRequest(ctx, c.ConnID, req, req.pos == 0, start)
}
func (c *Conn) isOutstanding(pos int64) bool {
for _, r := range c.serverResponses {
if r.PosInt() == pos {
return true
}
}
return false
}
// OnIncomingRequest advances the client's position in the stream, returning the response position and data.
// If an error is returned, it will be logged by the caller and transmitted to the
// client. It will NOT be reported to Sentry---this should happen as close as possible
// to the creation of the error (or else Sentry cannot provide a meaningful traceback.)
func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request, start time.Time) (resp *Response, herr *internal.HandlerError) {
ctx, span := internal.StartSpan(ctx, "OnIncomingRequest.AcquireMutex")
c.cancelOutstandingRequestMu.Lock()
if c.cancelOutstandingRequest != nil {
c.cancelOutstandingRequest()
}
c.cancelOutstandingRequestMu.Unlock()
c.mu.Lock()
ctx, cancel := context.WithCancel(ctx)
c.cancelOutstandingRequestMu.Lock()
c.cancelOutstandingRequest = cancel
c.cancelOutstandingRequestMu.Unlock()
// it's intentional for the lock to be held whilst inside HandleIncomingRequest
// as it guarantees linearisation of data within a single connection
defer c.mu.Unlock()
span.End()
isFirstRequest := req.pos == 0
isRetransmit := !isFirstRequest && c.lastClientRequest.pos == req.pos
isSameRequest := !isFirstRequest && c.lastClientRequest.Same(req)
// if there is a position and it isn't something we've told the client nor a retransmit, they
// are playing games
if !isFirstRequest && !isRetransmit && !c.isOutstanding(req.pos) {
// the client made up a position, reject them
logger.Trace().Int64("pos", req.pos).Msg("unknown pos")
return nil, internal.ExpiredSessionError()
}
// purge the response buffer based on the client's new position. Higher pos values are later.
var nextUnACKedResponse *Response
delIndex := -1
for i := range c.serverResponses { // sorted so low pos are first
if req.pos > c.serverResponses[i].PosInt() {
// the client has advanced _beyond_ this position so it is safe to delete it, we won't
// see it again as a retransmit
delIndex = i
} else if req.pos < c.serverResponses[i].PosInt() {
// the client has not seen this response before, so we'll send it to them next no matter what.
nextUnACKedResponse = &c.serverResponses[i]
break
}
}
c.serverResponses = c.serverResponses[delIndex+1:] // slice out the first delIndex+1 elements
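// Worked example (illustrative): if the buffer holds responses with pos [3, 4, 5, 6, 7] and the
// client sends ?pos=5, the loop above leaves delIndex pointing at the entry for pos=4 and sets
// nextUnACKedResponse to the response for pos=6 (the first one the client has not yet seen),
// so [5, 6, 7] remain in the buffer after the slice.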
defer func() {
l := logger.Trace().Int("num_res_acks", delIndex+1).Bool("is_retransmit", isRetransmit).Bool("is_first", isFirstRequest).Bool("is_same", isSameRequest).Int64("pos", req.pos).Str("user", c.UserID)
if nextUnACKedResponse != nil {
l.Int64("new_pos", nextUnACKedResponse.PosInt())
}
l.Msg("OnIncomingRequest finished")
}()
if !isFirstRequest {
if isRetransmit {
// if the request bodies match up then this is a retry, else it could be the client modifying
// their filter params, so fallthrough
if isSameRequest {
// this is the 2nd+ time we've seen this request, meaning the client likely retried this
// request. Send the response we sent before.
logger.Trace().Int64("pos", req.pos).Msg("returning cached response for pos, with delay")
// apply a small artificial wait to protect the proxy in case this is caused by a buggy
// client sending the same request over and over
time.Sleep(SpamProtectionInterval)
return nextUnACKedResponse, nil
} else {
logger.Info().Int64("pos", req.pos).Msg("client has resent this pos with different request data")
// we need to fall through and process this request, as the client will not resend this request data.
}
}
}
// if the client has no new data for us but we still have buffered responses, return that rather than
// invoking the handler.
if nextUnACKedResponse != nil {
if isSameRequest {
return nextUnACKedResponse, nil
}
// we have a buffered response but we cannot return it immediately, else we'd ignore the data in this request,
// so we need to wait for this incoming request to be processed _before_ we can return the data.
// To ensure this doesn't take too long, be cheeky and inject a low timeout value to ensure we
// don't needlessly block.
req.SetTimeoutMSecs(1)
}
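// For example (illustrative): suppose the buffer still holds an unACKed pos=3 response and the
// client re-sends its last pos but with different filters. The handler is still invoked below
// (capped by the 1ms timeout set above) so the new request data takes effect and its result is
// buffered, but the already-buffered pos=3 response is what gets returned now; the fresh data is
// delivered on a later request.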
resp, err := c.tryRequest(ctx, req, start)
if err != nil {
herr, ok := err.(*internal.HandlerError)
if !ok {
herr = &internal.HandlerError{
StatusCode: 500,
Err: err,
}
}
return nil, herr
}
// assign the last client request now _after_ we have processed the request so we don't incorrectly
// cache errors or panics and result in getting wedged or tightlooping.
c.lastClientRequest = *req
// this position is the highest stored pos +1
resp.Pos = fmt.Sprintf("%d", c.lastPos+1)
resp.TxnID = req.TxnID
// buffer it
c.serverResponses = append(c.serverResponses, *resp)
c.lastPos = resp.PosInt()
if nextUnACKedResponse == nil {
nextUnACKedResponse = resp
}
// return the oldest value
return nextUnACKedResponse, nil
}
func (c *Conn) SetCancelCallback(cancel context.CancelFunc) {
c.handler.SetCancelCallback(cancel)
}
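// Illustrative end-to-end flow (a sketch, not part of this file's API): "h" is any ConnHandler,
// and reqInitial/reqPos1 are assumed to be Requests whose pos has been parsed from the client's
// ?pos= query parameter by the HTTP layer.
//
//    conn := NewConn(ConnID{UserID: "@alice:example.org", DeviceID: "DEVICE"}, h)
//
//    // 1. Initial request (no ?pos, so pos==0): the handler runs and the response
//    //    is buffered and returned with Pos="1".
//    res1, _ := conn.OnIncomingRequest(ctx, reqInitial, time.Now())
//
//    // 2. Request with ?pos=1: the handler runs again; the response is buffered
//    //    and returned with Pos="2".
//    res2, _ := conn.OnIncomingRequest(ctx, reqPos1, time.Now())
//
//    // 3. The client never receives res2 and retries ?pos=1 with an identical body:
//    //    the buffered Pos="2" response is re-sent (after SpamProtectionInterval)
//    //    without invoking the handler again.
//    res2again, _ := conn.OnIncomingRequest(ctx, reqPos1, time.Now())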