sliding-sync/v3.go
Kegan Dougal be8543a21a add extensions for typing and receipts; bugfixes and additional perf improvements
Features:
 - Add `typing` extension.
 - Add `receipts` extension.
 - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`.
 - Add `SYNCV3_PPROF` support.
 - Add `by_notification_level` sort order.
 - Add `include_old_rooms` support.
 - Add support for `$ME` and `$LAZY`.
 - Add correct filtering when `*,*` is used as `required_state`.
 - Add `num_live` to each room response to indicate how many timeline entries are live.

Bug fixes:
 - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm.
 - Send back an `errcode` on HTTP errors (e.g expired sessions).
 - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :(
 - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic.
 - Send HTTP 400 for invalid range requests.
 - Don't publish no-op unread counts which just adds extra noise.
 - Fix leaking DB connections which could eventually consume all available connections.
 - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever.

Improvements:
 - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s.
 - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s.
 - Add `SlidingSyncUntil...` in tests to reduce races.
 - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s.
 - Add trace task for initial syncs.
 - Include the proxy version in UA strings.
 - HTTP errors now wait 1s before returning to stop clients tight-looping on error.
 - Pending event buffer is now 2000.
 - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8.
 - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00

179 lines
4.7 KiB
Go

package syncv3
import (
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
"time"
"github.com/gorilla/mux"
"github.com/matrix-org/sync-v3/internal"
"github.com/matrix-org/sync-v3/pubsub"
"github.com/matrix-org/sync-v3/state"
"github.com/matrix-org/sync-v3/sync2"
"github.com/matrix-org/sync-v3/sync2/handler2"
"github.com/matrix-org/sync-v3/sync3/handler"
"github.com/rs/zerolog"
"github.com/rs/zerolog/hlog"
)
var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: "15:04:05",
})
var Version string
type Opts struct {
Debug bool
AddPrometheusMetrics bool
// if true, publishing messages will block until the consumer has consumed it.
// Assumes a single producer and a single consumer.
TestingSynchronousPubsub bool
}
type server struct {
chain []func(next http.Handler) http.Handler
final http.Handler
}
func (s *server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h := s.final
for i := range s.chain {
h = s.chain[len(s.chain)-1-i](h)
}
h.ServeHTTP(w, req)
}
func allowCORS(next http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, Authorization")
if req.Method == "OPTIONS" {
w.WriteHeader(200)
return
}
next.ServeHTTP(w, req)
}
}
// Setup the proxy
func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Handler, *handler.SyncLiveHandler) {
// Setup shared DB and HTTP client
v2Client := &sync2.HTTPClient{
Client: &http.Client{
Timeout: 5 * time.Minute,
},
DestinationServer: destHomeserver,
}
store := state.NewStorage(postgresURI)
storev2 := sync2.NewStore(postgresURI, secret)
bufferSize := 50
if opts.TestingSynchronousPubsub {
bufferSize = 0
}
pubSub := pubsub.NewPubSub(bufferSize)
// create v2 handler
h2, err := handler2.NewHandler(postgresURI, sync2.NewPollerMap(v2Client, opts.AddPrometheusMetrics), storev2, store, v2Client, pubSub, pubSub, opts.AddPrometheusMetrics)
if err != nil {
panic(err)
}
// create v3 handler
h3, err := handler.NewSync3Handler(store, storev2, v2Client, postgresURI, secret, opts.Debug, pubSub, pubSub, opts.AddPrometheusMetrics)
if err != nil {
panic(err)
}
storeSnapshot, err := store.GlobalSnapshot()
if err != nil {
panic(err)
}
logger.Info().Msg("retrieved global snapshot from database")
h3.Startup(&storeSnapshot)
// begin consuming from these positions
h2.Listen()
h3.Listen()
return h2, h3
}
// RunSyncV3Server is the main entry point to the server
func RunSyncV3Server(h http.Handler, bindAddr, destV2Server string) {
// HTTP path routing
r := mux.NewRouter()
r.Handle("/_matrix/client/v3/sync", allowCORS(h))
r.Handle("/_matrix/client/unstable/org.matrix.msc3575/sync", allowCORS(h))
serverJSON, _ := json.Marshal(struct {
Server string `json:"server"`
Version string `json:"version"`
}{
Server: destV2Server,
Version: Version,
})
r.Handle("/client/server.json", allowCORS(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Type", "application/json")
rw.WriteHeader(200)
rw.Write(serverJSON)
})))
r.PathPrefix("/client/").HandlerFunc(
allowCORS(
http.StripPrefix("/client/", http.FileServer(http.Dir("./client"))),
),
)
srv := &server{
chain: []func(next http.Handler) http.Handler{
hlog.NewHandler(logger),
func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(internal.RequestContext(r.Context()))
next.ServeHTTP(w, r)
})
},
hlog.AccessHandler(func(r *http.Request, status, size int, duration time.Duration) {
if r.Method == "OPTIONS" {
return
}
entry := internal.DecorateLogger(r.Context(), hlog.FromRequest(r).Info())
if !strings.HasSuffix(r.URL.Path, "/sync") {
entry.Str("path", r.URL.Path)
}
entry.Int("status", status).
Int("size", size).
Dur("duration", duration).
Msg("")
}),
},
final: r,
}
// Block forever
logger.Info().Msgf("listening on %s", bindAddr)
if err := http.ListenAndServe(bindAddr, srv); err != nil {
logger.Fatal().Err(err).Msg("failed to listen and serve")
}
}
type HandlerError struct {
StatusCode int
Err error
}
func (e *HandlerError) Error() string {
return fmt.Sprintf("HTTP %d : %s", e.StatusCode, e.Err.Error())
}
type jsonError struct {
Err string `json:"error"`
}
func (e HandlerError) JSON() []byte {
je := jsonError{e.Error()}
b, _ := json.Marshal(je)
return b
}