2022-12-15 11:08:50 +00:00
|
|
|
package slidingsync
|
2021-05-14 16:49:33 +01:00
|
|
|
|
|
|
|
import (
|
2023-07-28 15:10:15 +02:00
|
|
|
"embed"
|
2021-09-20 17:21:02 +01:00
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
2021-05-14 16:49:33 +01:00
|
|
|
"net/http"
|
|
|
|
"os"
|
2022-08-16 14:23:05 +01:00
|
|
|
"strings"
|
2021-05-14 16:49:33 +01:00
|
|
|
"time"
|
|
|
|
|
2023-06-08 16:59:37 +01:00
|
|
|
"github.com/getsentry/sentry-go"
|
2021-05-14 16:49:33 +01:00
|
|
|
"github.com/gorilla/mux"
|
2023-08-02 14:00:42 +02:00
|
|
|
"github.com/jmoiron/sqlx"
|
2022-12-15 11:08:50 +00:00
|
|
|
"github.com/matrix-org/sliding-sync/internal"
|
|
|
|
"github.com/matrix-org/sliding-sync/pubsub"
|
|
|
|
"github.com/matrix-org/sliding-sync/state"
|
2023-09-19 11:48:49 +02:00
|
|
|
_ "github.com/matrix-org/sliding-sync/state/migrations"
|
2022-12-15 11:08:50 +00:00
|
|
|
"github.com/matrix-org/sliding-sync/sync2"
|
|
|
|
"github.com/matrix-org/sliding-sync/sync2/handler2"
|
|
|
|
"github.com/matrix-org/sliding-sync/sync3/handler"
|
2023-08-02 14:00:42 +02:00
|
|
|
"github.com/pressly/goose/v3"
|
2021-05-14 16:49:33 +01:00
|
|
|
"github.com/rs/zerolog"
|
|
|
|
"github.com/rs/zerolog/hlog"
|
|
|
|
)
|
|
|
|
|
2023-07-28 15:10:15 +02:00
|
|
|
//go:embed state/migrations/*
|
|
|
|
var EmbedMigrations embed.FS
|
|
|
|
|
2021-09-24 14:30:00 +01:00
|
|
|
var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
|
|
|
|
Out: os.Stderr,
|
|
|
|
TimeFormat: "15:04:05",
|
|
|
|
})
|
2022-08-16 14:23:05 +01:00
|
|
|
var Version string
|
2021-09-24 14:30:00 +01:00
|
|
|
|
2022-12-14 18:53:55 +00:00
|
|
|
// Opts collects the optional settings consumed by Setup.
type Opts struct {
	// AddPrometheusMetrics is passed by Setup to the storage, poller map and
	// handler constructors.
	AddPrometheusMetrics bool
	// MaxPendingEventUpdates is the max number of events the client is eligible
	// to read (unfiltered) which we are willing to buffer on this connection.
	// Too large and we consume lots of memory. Too small and busy accounts will
	// trip the connection knifing. Customisable as tests might want to test
	// filling the buffer. Setup substitutes 2000 when this is left at 0.
	MaxPendingEventUpdates int
	// TestingSynchronousPubsub, if true, makes publishing messages block until
	// the consumer has consumed them. Assumes a single producer and a single
	// consumer.
	TestingSynchronousPubsub bool
	// MaxTransactionIDDelay is the longest amount of time that we will wait for
	// confirmation of an event's transaction_id before sending it to its sender.
	// Set to 0 to disable this delay mechanism entirely.
	MaxTransactionIDDelay time.Duration

	// DBMaxConns, when greater than 0, is applied by Setup as both the maximum
	// number of open and the maximum number of idle database connections.
	DBMaxConns int
	// DBConnMaxIdleTime, when greater than 0, is applied by Setup as the
	// connection pool's maximum idle time.
	DBConnMaxIdleTime time.Duration

	// HTTPTimeout is used for "normal" HTTP requests
	HTTPTimeout time.Duration
	// HTTPLongTimeout is used for initial sync requests
	HTTPLongTimeout time.Duration
}
|
|
|
|
|
2021-07-21 12:12:57 +01:00
|
|
|
// server is an http.Handler that runs a request through a list of middleware
// before handing it to the final handler.
type server struct {
	// chain holds the middleware constructors; chain[0] ends up outermost.
	chain []func(next http.Handler) http.Handler
	// final is the handler invoked once every middleware has run.
	final http.Handler
}

// ServeHTTP wraps s.final with each middleware in s.chain and serves the
// request through the result. Wrapping proceeds from the last entry to the
// first, so the first entry in the chain sees the request first.
func (s *server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	wrapped := s.final
	for idx := len(s.chain) - 1; idx >= 0; idx-- {
		wrapped = s.chain[idx](wrapped)
	}
	wrapped.ServeHTTP(w, req)
}
|
|
|
|
|
2021-09-28 13:48:42 +01:00
|
|
|
// allowCORS returns a handler that adds permissive CORS headers to every
// response. OPTIONS (preflight) requests are answered with 200 and are not
// passed on; every other request is forwarded to next.
func allowCORS(next http.Handler) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		hdr := w.Header()
		hdr.Set("Access-Control-Allow-Origin", "*")
		hdr.Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
		hdr.Set("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, Authorization")

		if req.Method != http.MethodOptions {
			next.ServeHTTP(w, req)
			return
		}
		// Preflight: the headers above are the whole answer.
		w.WriteHeader(http.StatusOK)
	}
}
|
|
|
|
|
2022-12-14 18:53:55 +00:00
|
|
|
// Setup the proxy
|
2023-02-21 10:50:39 +00:00
|
|
|
func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Handler, http.Handler) {
|
2022-12-14 18:53:55 +00:00
|
|
|
// Setup shared DB and HTTP client
|
2023-09-19 11:48:49 +02:00
|
|
|
v2Client := sync2.NewHTTPClient(opts.HTTPTimeout, opts.HTTPLongTimeout, destHomeserver)
|
2023-07-12 17:36:59 +01:00
|
|
|
db, err := sqlx.Open("postgres", postgresURI)
|
|
|
|
if err != nil {
|
|
|
|
sentry.CaptureException(err)
|
|
|
|
// TODO: if we panic(), will sentry have a chance to flush the event?
|
|
|
|
logger.Panic().Err(err).Str("uri", postgresURI).Msg("failed to open SQL DB")
|
|
|
|
}
|
2023-07-28 15:10:15 +02:00
|
|
|
|
2023-07-12 17:36:59 +01:00
|
|
|
if opts.DBMaxConns > 0 {
|
|
|
|
// https://github.com/go-sql-driver/mysql#important-settings
|
|
|
|
// "db.SetMaxIdleConns() is recommended to be set same to db.SetMaxOpenConns(). When it is smaller
|
|
|
|
// than SetMaxOpenConns(), connections can be opened and closed much more frequently than you expect."
|
|
|
|
db.SetMaxOpenConns(opts.DBMaxConns)
|
|
|
|
db.SetMaxIdleConns(opts.DBMaxConns)
|
|
|
|
}
|
|
|
|
if opts.DBConnMaxIdleTime > 0 {
|
|
|
|
db.SetConnMaxIdleTime(opts.DBConnMaxIdleTime)
|
2023-06-19 15:56:22 +01:00
|
|
|
}
|
2023-09-07 10:02:50 +02:00
|
|
|
store := state.NewStorageWithDB(db, opts.AddPrometheusMetrics)
|
2023-07-12 17:36:59 +01:00
|
|
|
storev2 := sync2.NewStoreWithDB(db, secret)
|
2023-08-02 14:00:42 +02:00
|
|
|
|
|
|
|
// Automatically execute migrations
|
|
|
|
goose.SetBaseFS(EmbedMigrations)
|
|
|
|
err = goose.Up(db.DB, "state/migrations", goose.WithAllowMissing())
|
|
|
|
if err != nil {
|
|
|
|
logger.Panic().Err(err).Msg("failed to execute migrations")
|
|
|
|
}
|
|
|
|
|
2022-12-14 18:53:55 +00:00
|
|
|
bufferSize := 50
|
2023-06-26 21:04:02 -07:00
|
|
|
deviceDataUpdateFrequency := time.Second
|
2022-12-14 18:53:55 +00:00
|
|
|
if opts.TestingSynchronousPubsub {
|
|
|
|
bufferSize = 0
|
2023-06-26 21:04:02 -07:00
|
|
|
deviceDataUpdateFrequency = 0 // don't batch
|
2022-12-14 18:53:55 +00:00
|
|
|
}
|
2023-02-03 10:00:45 +00:00
|
|
|
if opts.MaxPendingEventUpdates == 0 {
|
|
|
|
opts.MaxPendingEventUpdates = 2000
|
|
|
|
}
|
2022-12-14 18:53:55 +00:00
|
|
|
pubSub := pubsub.NewPubSub(bufferSize)
|
|
|
|
|
2023-06-08 16:59:37 +01:00
|
|
|
pMap := sync2.NewPollerMap(v2Client, opts.AddPrometheusMetrics)
|
2022-12-14 18:53:55 +00:00
|
|
|
// create v2 handler
|
2023-07-12 03:41:06 -07:00
|
|
|
h2, err := handler2.NewHandler(pMap, storev2, store, pubSub, pubSub, opts.AddPrometheusMetrics, deviceDataUpdateFrequency)
|
2022-12-14 18:53:55 +00:00
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
2023-06-08 16:59:37 +01:00
|
|
|
pMap.SetCallbacks(h2)
|
2022-12-14 18:53:55 +00:00
|
|
|
|
|
|
|
// create v3 handler
|
2023-07-26 12:23:38 +01:00
|
|
|
h3, err := handler.NewSync3Handler(store, storev2, v2Client, secret, pubSub, pubSub, opts.AddPrometheusMetrics, opts.MaxPendingEventUpdates, opts.MaxTransactionIDDelay)
|
2022-12-14 18:53:55 +00:00
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
storeSnapshot, err := store.GlobalSnapshot()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
logger.Info().Msg("retrieved global snapshot from database")
|
|
|
|
h3.Startup(&storeSnapshot)
|
|
|
|
|
|
|
|
// begin consuming from these positions
|
|
|
|
h2.Listen()
|
|
|
|
h3.Listen()
|
|
|
|
return h2, h3
|
|
|
|
}
|
|
|
|
|
2021-05-14 16:49:33 +01:00
|
|
|
// RunSyncV3Server is the main entry point to the server
|
2023-03-23 22:15:23 +00:00
|
|
|
func RunSyncV3Server(h http.Handler, bindAddr, destV2Server, tlsCert, tlsKey string) {
|
2021-05-14 16:49:33 +01:00
|
|
|
// HTTP path routing
|
|
|
|
r := mux.NewRouter()
|
2021-11-24 16:27:33 +00:00
|
|
|
r.Handle("/_matrix/client/v3/sync", allowCORS(h))
|
2021-12-22 18:20:02 +00:00
|
|
|
r.Handle("/_matrix/client/unstable/org.matrix.msc3575/sync", allowCORS(h))
|
2022-02-24 14:14:59 +00:00
|
|
|
|
|
|
|
serverJSON, _ := json.Marshal(struct {
|
2022-08-16 14:23:05 +01:00
|
|
|
Server string `json:"server"`
|
|
|
|
Version string `json:"version"`
|
|
|
|
}{
|
|
|
|
Server: destV2Server,
|
|
|
|
Version: Version,
|
|
|
|
})
|
2022-02-24 14:14:59 +00:00
|
|
|
r.Handle("/client/server.json", allowCORS(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
|
|
|
rw.Header().Set("Content-Type", "application/json")
|
|
|
|
rw.WriteHeader(200)
|
|
|
|
rw.Write(serverJSON)
|
|
|
|
})))
|
2021-09-28 13:48:42 +01:00
|
|
|
r.PathPrefix("/client/").HandlerFunc(
|
|
|
|
allowCORS(
|
|
|
|
http.StripPrefix("/client/", http.FileServer(http.Dir("./client"))),
|
|
|
|
),
|
|
|
|
)
|
2021-07-21 12:12:57 +01:00
|
|
|
|
|
|
|
srv := &server{
|
|
|
|
chain: []func(next http.Handler) http.Handler{
|
2021-07-23 11:07:03 +01:00
|
|
|
hlog.NewHandler(logger),
|
2022-08-16 14:23:05 +01:00
|
|
|
func(next http.Handler) http.Handler {
|
|
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
r = r.WithContext(internal.RequestContext(r.Context()))
|
|
|
|
next.ServeHTTP(w, r)
|
|
|
|
})
|
|
|
|
},
|
2021-07-21 12:12:57 +01:00
|
|
|
hlog.AccessHandler(func(r *http.Request, status, size int, duration time.Duration) {
|
2022-07-14 10:48:45 +01:00
|
|
|
if r.Method == "OPTIONS" {
|
|
|
|
return
|
|
|
|
}
|
2022-08-16 14:23:05 +01:00
|
|
|
entry := internal.DecorateLogger(r.Context(), hlog.FromRequest(r).Info())
|
|
|
|
if !strings.HasSuffix(r.URL.Path, "/sync") {
|
|
|
|
entry.Str("path", r.URL.Path)
|
|
|
|
}
|
2023-08-31 11:46:09 +01:00
|
|
|
durStr := fmt.Sprintf("%.3f", duration.Seconds())
|
|
|
|
setupDur, processingDur := internal.RequestContextDurations(r.Context())
|
|
|
|
if setupDur != 0 || processingDur != 0 {
|
2023-08-31 12:04:59 +01:00
|
|
|
durStr += fmt.Sprintf("(%.3f+%.3f)", setupDur.Seconds(), processingDur.Seconds())
|
2023-08-31 11:46:09 +01:00
|
|
|
}
|
2022-08-16 14:23:05 +01:00
|
|
|
entry.Int("status", status).
|
2021-07-21 12:12:57 +01:00
|
|
|
Int("size", size).
|
2023-08-31 11:46:09 +01:00
|
|
|
Str("duration", durStr).
|
2021-07-21 12:12:57 +01:00
|
|
|
Msg("")
|
|
|
|
}),
|
|
|
|
},
|
|
|
|
final: r,
|
|
|
|
}
|
2021-05-14 16:49:33 +01:00
|
|
|
|
|
|
|
// Block forever
|
2023-03-23 22:15:23 +00:00
|
|
|
var err error
|
2023-03-29 12:17:26 +01:00
|
|
|
if tlsCert != "" && tlsKey != "" {
|
2023-03-23 22:15:23 +00:00
|
|
|
logger.Info().Msgf("listening TLS on %s", bindAddr)
|
|
|
|
err = http.ListenAndServeTLS(bindAddr, tlsCert, tlsKey, srv)
|
|
|
|
} else {
|
|
|
|
logger.Info().Msgf("listening on %s", bindAddr)
|
|
|
|
err = http.ListenAndServe(bindAddr, srv)
|
|
|
|
}
|
|
|
|
if err != nil {
|
2023-04-05 18:39:51 +01:00
|
|
|
sentry.CaptureException(err)
|
2023-04-13 15:02:17 +01:00
|
|
|
// TODO: Fatal() calls os.Exit. Will that give time for sentry.Flush() to run?
|
|
|
|
logger.Fatal().Err(err).Msg("failed to listen and serve")
|
2021-05-14 16:49:33 +01:00
|
|
|
}
|
|
|
|
}
|
2021-09-20 17:21:02 +01:00
|
|
|
|
|
|
|
// HandlerError pairs an HTTP status code with the underlying error that
// caused it.
type HandlerError struct {
	// StatusCode is the HTTP status associated with the error.
	StatusCode int
	// Err is the underlying cause. It may be nil.
	Err error
}

// Error implements the error interface. The message is
// "HTTP <status> : <cause>", or just "HTTP <status>" when Err is nil, so a
// HandlerError without a cause does not panic.
func (e *HandlerError) Error() string {
	if e.Err == nil {
		return fmt.Sprintf("HTTP %d", e.StatusCode)
	}
	return fmt.Sprintf("HTTP %d : %s", e.StatusCode, e.Err.Error())
}

// Unwrap returns the underlying error so that errors.Is and errors.As can
// inspect the cause.
func (e *HandlerError) Unwrap() error {
	return e.Err
}

// jsonError is the JSON body shape produced by HandlerError.JSON.
type jsonError struct {
	Err string `json:"error"`
}

// JSON renders the error as a JSON object of the form {"error": "<message>"},
// where <message> is the output of Error.
func (e HandlerError) JSON() []byte {
	je := jsonError{e.Error()}
	// Marshalling a struct holding a single string cannot fail, so the error
	// is deliberately ignored.
	b, _ := json.Marshal(je)
	return b
}
|