Merge branch 'main' into dmr/oidc

This commit is contained in:
David Robertson 2023-05-16 12:25:29 +01:00
commit 14a3370da6
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
12 changed files with 264 additions and 56 deletions

View File

@ -44,6 +44,7 @@ Using a Docker image:
docker run --rm -e "SYNCV3_SERVER=https://matrix-client.matrix.org" -e "SYNCV3_SECRET=$(cat .secret)" -e "SYNCV3_BINDADDR=:8008" -e "SYNCV3_DB=user=$(whoami) dbname=syncv3 sslmode=disable host=host.docker.internal" -p 8008:8008 ghcr.io/matrix-org/sliding-sync:v0.98.0
```
Optionally also set `SYNCV3_TLS_CERT=path/to/cert.pem` and `SYNCV3_TLS_KEY=path/to/key.pem` to listen on HTTPS instead of HTTP.
Make sure to tweak the `SYNCV3_DB` environment variable if the Postgres database isn't running on the host.
Regular users may now log in with their sliding-sync compatible Matrix client. If developing sliding-sync, a simple client is provided (although it is not included in the Docker image).
@ -57,6 +58,8 @@ INF Poller: v2 poll loop started ip=::1 since= user_id=@kegan:matrix.org
Wait for the initial v2 sync to be processed (this can take minutes!) and then v3 APIs will be responsive.
Note that some clients might require that your homeserver advertises support for sliding-sync in the `.well-known/matrix/client` endpoint; details are in [the work-in-progress specification document](https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#unstable-prefix).
### Prometheus
To enable metrics, pass `SYNCV3_PROM=:2112` to listen on that port and expose a scraping endpoint `GET /metrics`.
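For context, the metrics endpoint is plain Prometheus plumbing: the promhttp handler (already imported by the proxy) bound to the `SYNCV3_PROM` address. A minimal standalone sketch of that wiring, not the proxy's actual code:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Serve the default Prometheus registry at GET /metrics on :2112,
	// roughly what SYNCV3_PROM=:2112 asks the proxy to do.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}
```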

View File

@ -8,6 +8,7 @@ import (
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"net/http"
_ "net/http/pprof"
@ -37,6 +38,7 @@ const (
EnvDebug = "SYNCV3_DEBUG"
EnvJaeger = "SYNCV3_JAEGER_URL"
EnvSentryDsn = "SYNCV3_SENTRY_DSN"
EnvLogLevel = "SYNCV3_LOG_LEVEL"
)
var helpMsg = fmt.Sprintf(`
@ -51,7 +53,8 @@ Environment var
%s Default: unset. The bind addr for Prometheus metrics, which will be accessible at /metrics at this address.
%s Default: unset. The Jaeger URL to send spans to e.g. http://localhost:14268/api/traces - if unset, does not send OTLP traces.
%s Default: unset. The Sentry DSN to report events to e.g. https://sliding-sync@sentry.example.com/123 - if unset, does not send Sentry events.
`, EnvServer, EnvDB, EnvSecret, EnvBindAddr, EnvTLSCert, EnvTLSKey, EnvPPROF, EnvPrometheus, EnvJaeger, EnvSentryDsn)
%s Default: info. The level of verbosity for messages logged. Available values are trace, debug, info, warn, error and fatal
`, EnvServer, EnvDB, EnvSecret, EnvBindAddr, EnvTLSCert, EnvTLSKey, EnvPPROF, EnvPrometheus, EnvJaeger, EnvSentryDsn, EnvLogLevel)
func defaulting(in, dft string) string {
if in == "" {
@ -76,6 +79,7 @@ func main() {
EnvDebug: os.Getenv(EnvDebug),
EnvJaeger: os.Getenv(EnvJaeger),
EnvSentryDsn: os.Getenv(EnvSentryDsn),
EnvLogLevel: os.Getenv(EnvLogLevel),
}
requiredEnvVars := []string{EnvServer, EnvDB, EnvSecret, EnvBindAddr}
for _, requiredEnvVar := range requiredEnvVars {
@ -131,13 +135,33 @@ func main() {
}
}
if args[EnvDebug] == "1" {
zerolog.SetGlobalLevel(zerolog.TraceLevel)
} else {
switch strings.ToLower(args[EnvLogLevel]) {
case "trace":
zerolog.SetGlobalLevel(zerolog.TraceLevel)
case "debug":
zerolog.SetGlobalLevel(zerolog.DebugLevel)
case "info":
zerolog.SetGlobalLevel(zerolog.InfoLevel)
case "warn":
zerolog.SetGlobalLevel(zerolog.WarnLevel)
case "err", "error":
zerolog.SetGlobalLevel(zerolog.ErrorLevel)
case "fatal":
zerolog.SetGlobalLevel(zerolog.FatalLevel)
default:
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}
}
err := sync2.MigrateDeviceIDs(args[EnvServer], args[EnvDB], args[EnvSecret], true)
if err != nil {
panic(err)
}
h2, h3 := syncv3.Setup(args[EnvServer], args[EnvDB], args[EnvSecret], syncv3.Opts{
Debug: args[EnvDebug] == "1",
AddPrometheusMetrics: args[EnvPrometheus] != "",
})
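As an aside, zerolog also ships a `ParseLevel` helper that covers the same level names; a hedged alternative sketch (not what this commit does — the explicit switch above additionally accepts an "err" alias and silently falls back to info on unknown values):

```go
package main

import (
	"os"
	"strings"

	"github.com/rs/zerolog"
)

func main() {
	// Hypothetical alternative: let zerolog parse the level name itself.
	lvl, err := zerolog.ParseLevel(strings.ToLower(os.Getenv("SYNCV3_LOG_LEVEL")))
	if err != nil || lvl == zerolog.NoLevel {
		lvl = zerolog.InfoLevel // default when unset or unrecognised
	}
	zerolog.SetGlobalLevel(lvl)
}
```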

View File

@ -25,6 +25,13 @@ type RoomMetadata struct {
TypingEvent json.RawMessage
}
func NewRoomMetadata(roomID string) *RoomMetadata {
return &RoomMetadata{
RoomID: roomID,
ChildSpaceRooms: make(map[string]struct{}),
}
}
// SameRoomName checks if the fields relevant for room names have changed between the two metadatas.
// Returns true if there are no changes.
func (m *RoomMetadata) SameRoomName(other *RoomMetadata) bool {

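The constructor matters because assigning into a nil map panics in Go, which is the crash the new TestBecomingASpaceDoesntCrash regression test below guards against. A tiny standalone illustration:

```go
package main

import "fmt"

func main() {
	// Writing to a nil map panics:
	var nilMap map[string]struct{}
	_ = nilMap // nilMap["!space:example.org"] = struct{}{} would panic: assignment to entry in nil map

	// ...so the constructor initialises it up front:
	childSpaceRooms := make(map[string]struct{})
	childSpaceRooms["!space:example.org"] = struct{}{} // safe
	fmt.Println(len(childSpaceRooms))
}
```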
View File

@ -4,10 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/getsentry/sentry-go"
"os"
"strings"
"github.com/getsentry/sentry-go"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/matrix-org/sliding-sync/internal"
@ -173,7 +174,7 @@ func (s *Storage) MetadataForAllRooms(txn *sqlx.Tx, result map[string]internal.R
metadata := result[ev.RoomID]
metadata.LastMessageTimestamp = gjson.ParseBytes(ev.JSON).Get("origin_server_ts").Uint()
// it's possible the latest event is a brand new room not caught by the first SELECT for joined
// rooms e.g when you're invited to a room so we need to make sure to se the metadata again here
// rooms e.g when you're invited to a room so we need to make sure to set the metadata again here
metadata.RoomID = ev.RoomID
result[ev.RoomID] = metadata
}
@ -766,9 +767,9 @@ func (s *Storage) AllJoinedMembers(txn *sqlx.Tx) (result map[string][]string, me
}
metadata = make(map[string]internal.RoomMetadata)
for roomID, joinedMembers := range result {
metadata[roomID] = internal.RoomMetadata{
JoinCount: len(joinedMembers),
}
m := internal.NewRoomMetadata(roomID)
m.JoinCount = len(joinedMembers)
metadata[roomID] = *m
}
return result, metadata, nil
}

View File

@ -638,6 +638,7 @@ func TestGlobalSnapshot(t *testing.T) {
Heroes: []internal.Hero{{ID: alice}},
Encrypted: true,
PredecessorRoomID: &oldRoomID,
ChildSpaceRooms: make(map[string]struct{}),
},
roomBob: {
RoomID: roomBob,
@ -646,6 +647,7 @@ func TestGlobalSnapshot(t *testing.T) {
Heroes: []internal.Hero{{ID: bob}},
NameEvent: "My Room",
RoomType: &roomType,
ChildSpaceRooms: make(map[string]struct{}),
},
roomAliceBob: {
RoomID: roomAliceBob,
@ -654,6 +656,7 @@ func TestGlobalSnapshot(t *testing.T) {
Heroes: []internal.Hero{{ID: bob}, {ID: alice}},
CanonicalAlias: "#alias",
UpgradedRoomID: &newRoomID,
ChildSpaceRooms: make(map[string]struct{}),
},
roomSpace: {
RoomID: roomSpace,

View File

@ -208,10 +208,7 @@ func (c *GlobalCache) OnEphemeralEvent(ctx context.Context, roomID string, ephEv
defer c.roomIDToMetadataMu.Unlock()
metadata := c.roomIDToMetadata[roomID]
if metadata == nil {
metadata = &internal.RoomMetadata{
RoomID: roomID,
ChildSpaceRooms: make(map[string]struct{}),
}
metadata = internal.NewRoomMetadata(roomID)
}
switch evType {
@ -233,10 +230,7 @@ func (c *GlobalCache) OnNewEvent(
defer c.roomIDToMetadataMu.Unlock()
metadata := c.roomIDToMetadata[ed.RoomID]
if metadata == nil {
metadata = &internal.RoomMetadata{
RoomID: ed.RoomID,
ChildSpaceRooms: make(map[string]struct{}),
}
metadata = internal.NewRoomMetadata(ed.RoomID)
}
switch ed.EventType {
case "m.room.name":

View File

@ -164,17 +164,16 @@ func (i *InviteData) RoomMetadata() *internal.RoomMetadata {
if i.RoomType != "" {
roomType = &i.RoomType
}
return &internal.RoomMetadata{
RoomID: i.roomID,
Heroes: i.Heroes,
NameEvent: i.NameEvent,
CanonicalAlias: i.CanonicalAlias,
InviteCount: 1,
JoinCount: 1,
LastMessageTimestamp: i.LastMessageTimestamp,
Encrypted: i.Encrypted,
RoomType: roomType,
}
metadata := internal.NewRoomMetadata(i.roomID)
metadata.Heroes = i.Heroes
metadata.NameEvent = i.NameEvent
metadata.CanonicalAlias = i.CanonicalAlias
metadata.InviteCount = 1
metadata.JoinCount = 1
metadata.LastMessageTimestamp = i.LastMessageTimestamp
metadata.Encrypted = i.Encrypted
metadata.RoomType = roomType
return metadata
}
type UserCacheListener interface {
@ -363,6 +362,7 @@ func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomID
if !ok {
urd = NewUserRoomData()
}
oldLoadPos := urd.LoadPos
urd.Timeline = events
urd.LoadPos = loadPos
if len(events) > 0 {
@ -371,7 +371,13 @@ func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomID
}
result[roomID] = urd
c.roomToData[roomID] = urd
// only replace our knowledge if it is in the future
// TODO FIXME: this is an incomplete solution as all the non-Timeline fields we return in this
// function will be AHEAD of the load pos provided, meaning you could see a stale timeline with
// incorrect notif counts (until you've consumed the channel)
if oldLoadPos < loadPos {
c.roomToData[roomID] = urd
}
}
c.roomToDataMu.Unlock()
return result
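The new guard makes cache writes monotonic in the load position: data computed at an older position must never overwrite newer state. A generic sketch of the same pattern with illustrative names (not the proxy's types):

```go
package main

import (
	"fmt"
	"sync"
)

type cacheEntry struct {
	loadPos  int64
	timeline []string
}

type cache struct {
	mu    sync.Mutex
	rooms map[string]cacheEntry
}

// maybeStore only replaces the cached entry if the new one is at least as new.
func (c *cache) maybeStore(roomID string, e cacheEntry) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if old, ok := c.rooms[roomID]; ok && old.loadPos >= e.loadPos {
		return // stale: keep the newer snapshot we already have
	}
	c.rooms[roomID] = e
}

func main() {
	c := &cache{rooms: make(map[string]cacheEntry)}
	c.maybeStore("!r", cacheEntry{loadPos: 10, timeline: []string{"E", "F", "G"}})
	c.maybeStore("!r", cacheEntry{loadPos: 5, timeline: []string{"A", "B", "C"}}) // ignored
	fmt.Println(c.rooms["!r"].timeline)                                           // [E F G]
}
```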
@ -416,9 +422,7 @@ func (c *UserCache) newRoomUpdate(ctx context.Context, roomID string) RoomUpdate
// this can happen when we join a room we didn't know about because we process unread counts
// before the timeline events. Warn and send a stub
logger.Warn().Str("room", roomID).Msg("UserCache update: room doesn't exist in global cache yet, generating stub")
r = &internal.RoomMetadata{
RoomID: roomID,
}
r = internal.NewRoomMetadata(roomID)
} else {
r = globalRooms[roomID]
}
@ -664,10 +668,8 @@ func (c *UserCache) OnLeftRoom(ctx context.Context, roomID string) {
roomID: roomID,
// do NOT pull from the global cache as it is a snapshot of the room at the point of
// the invite: don't leak additional data!!!
globalRoomData: &internal.RoomMetadata{
RoomID: roomID,
},
userRoomData: &urd,
globalRoomData: internal.NewRoomMetadata(roomID),
userRoomData: &urd,
},
}
c.emitOnRoomUpdate(ctx, up)

View File

@ -64,14 +64,9 @@ type SyncLiveHandler struct {
func NewSync3Handler(
store *state.Storage, storev2 *sync2.Storage, v2Client sync2.Client, postgresDBURI, secret string,
debug bool, pub pubsub.Notifier, sub pubsub.Listener, enablePrometheus bool, maxPendingEventUpdates int,
pub pubsub.Notifier, sub pubsub.Listener, enablePrometheus bool, maxPendingEventUpdates int,
) (*SyncLiveHandler, error) {
logger.Info().Msg("creating handler")
if debug {
zerolog.SetGlobalLevel(zerolog.TraceLevel)
} else {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}
sh := &SyncLiveHandler{
V2: v2Client,
Storage: store,
@ -217,11 +212,19 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
}
}
conn, err := h.setupConnection(req, &requestBody, req.URL.Query().Get("pos") != "")
if err != nil {
hlog.FromRequest(req).Err(err).Msg("failed to get or create Conn")
internal.GetSentryHubFromContextOrDefault(req.Context()).CaptureException(err)
return err
logErrorAndReport500s := func(msg string, herr *internal.HandlerError) {
if herr.StatusCode >= 500 {
hlog.FromRequest(req).Err(herr).Msg(msg)
internal.GetSentryHubFromContextOrDefault(req.Context()).CaptureException(herr)
} else {
hlog.FromRequest(req).Warn().Err(herr).Msg(msg)
}
}
conn, herr := h.setupConnection(req, &requestBody, req.URL.Query().Get("pos") != "")
if herr != nil {
logErrorAndReport500s("failed to get or create Conn", herr)
return herr
}
// set pos and timeout if specified
cpos, herr := parseIntFromQuery(req.URL, "pos")
@ -248,12 +251,7 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
resp, herr := conn.OnIncomingRequest(req.Context(), &requestBody)
if herr != nil {
if herr.StatusCode >= 500 {
log.Err(herr).Msg("failed to OnIncomingRequest")
internal.GetSentryHubFromContextOrDefault(req.Context()).CaptureException(herr)
} else {
log.Warn().Err(herr).Msg("failed to OnIncomingRequest")
}
logErrorAndReport500s("failed to OnIncomingRequest", herr)
return herr
}
// for logging
@ -278,10 +276,12 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
if err := json.NewEncoder(w).Encode(resp); err != nil {
return &internal.HandlerError{
herr = &internal.HandlerError{
StatusCode: 500,
Err: err,
}
logErrorAndReport500s("failed to JSON-encode result", herr)
return herr
}
return nil
}
@ -289,7 +289,8 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
// setupConnection associates this request with an existing connection or makes a new connection.
// It also sets a v2 sync poll loop going if one didn't exist already for this user.
// When this function returns, the connection is alive and active.
func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Request, containsPos bool) (*sync3.Conn, error) {
func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Request, containsPos bool) (*sync3.Conn, *internal.HandlerError) {
var conn *sync3.Conn
// Extract an access token
accessToken, err := internal.ExtractAccessToken(req)
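The logErrorAndReport500s closure centralises severity routing: 5xx errors are logged as errors and reported to Sentry, anything else is downgraded to a warning. The same pattern in isolation, with illustrative names and no Sentry dependency:

```go
package main

import (
	"errors"
	"log"
	"net/http"
)

type handlerError struct {
	StatusCode int
	Err        error
}

func (e *handlerError) Error() string { return e.Err.Error() }

// logAndMaybeReport treats only 5xx responses as reportable server faults.
func logAndMaybeReport(msg string, herr *handlerError, report func(error)) {
	if herr.StatusCode >= 500 {
		log.Printf("ERROR %s: %v", msg, herr)
		report(herr) // e.g. hand off to Sentry or another error tracker
		return
	}
	log.Printf("WARN %s: %v", msg, herr)
}

func main() {
	report := func(err error) { /* reporting hook goes here */ }
	logAndMaybeReport("failed to OnIncomingRequest",
		&handlerError{StatusCode: http.StatusBadRequest, Err: errors.New("bad pos")}, report)
	logAndMaybeReport("failed to JSON-encode result",
		&handlerError{StatusCode: http.StatusInternalServerError, Err: errors.New("boom")}, report)
}
```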

View File

@ -67,3 +67,112 @@ func TestRoomSubscriptionJoinRoomRace(t *testing.T) {
},
}))
}
// Regression test for https://github.com/vector-im/element-x-ios-rageshakes/issues/314
Caused by: the user cache getting corrupted and missing events, because it incorrectly replaced
its timeline with an older one.
// To the end user, it manifests as missing messages in the timeline, because the proxy incorrectly
// said the events are C,F,G and not E,F,G.
func TestRoomSubscriptionMisorderedTimeline(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// setup code
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
roomState := createRoomState(t, alice, time.Now())
abcInitialEvents := []json.RawMessage{
testutils.NewMessageEvent(t, alice, "A"),
testutils.NewMessageEvent(t, alice, "B"),
testutils.NewMessageEvent(t, alice, "C"),
}
room := roomEvents{
roomID: "!room:localhost",
events: append(roomState, abcInitialEvents...),
}
v2.addAccount(alice, aliceToken)
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(room),
},
})
res := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"list": {
Ranges: sync3.SliceRanges{{0, 10}},
RoomSubscription: sync3.RoomSubscription{TimelineLimit: 1},
},
},
})
// test that we get 1 event initially due to the timeline limit
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
room.roomID: {
m.MatchRoomTimeline(abcInitialEvents[len(abcInitialEvents)-1:]),
},
}))
// now live stream 2 events
deLiveEvents := []json.RawMessage{
testutils.NewMessageEvent(t, alice, "D"),
testutils.NewMessageEvent(t, alice, "E"),
}
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: room.roomID,
events: deLiveEvents,
}),
},
})
v2.waitUntilEmpty(t, alice)
// now add a room sub with timeline limit = 5, we will need to hit the DB to satisfy this.
// We might destroy caches in a bad way. We might not return the most recent 5 events.
res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{
RoomSubscriptions: map[string]sync3.RoomSubscription{
room.roomID: {
TimelineLimit: 5,
},
},
})
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
room.roomID: {
// TODO: this is the correct result, but due to how timeline loading works currently
// it will be returning the last 5 events BEFORE D,E, which isn't ideal but also isn't
// incorrect per se due to the fact that clients don't know when D,E have been processed
// on the server.
// m.MatchRoomTimeline(append(abcInitialEvents, deLiveEvents...)),
m.MatchRoomTimeline(append(roomState[len(roomState)-2:], abcInitialEvents...)),
},
}), m.LogResponse(t))
// live stream the final 2 events
fgLiveEvents := []json.RawMessage{
testutils.NewMessageEvent(t, alice, "F"),
testutils.NewMessageEvent(t, alice, "G"),
}
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: room.roomID,
events: fgLiveEvents,
}),
},
})
v2.waitUntilEmpty(t, alice)
// now ask for timeline limit = 3, which may miss events if the caches got corrupted.
// Do this on a fresh connection to force loadPos to update.
res = v3.mustDoV3Request(t, aliceToken, sync3.Request{
RoomSubscriptions: map[string]sync3.RoomSubscription{
room.roomID: {
TimelineLimit: 3,
},
},
})
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
room.roomID: {
m.MatchRoomTimeline(append(deLiveEvents[1:], fgLiveEvents...)),
},
}), m.LogResponse(t))
}

View File

@ -0,0 +1,66 @@
package syncv3
import (
"encoding/json"
"testing"
"time"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/matrix-org/sliding-sync/testutils/m"
)
// Regression test for a panic in the wild when we tried to write to internal.RoomMetadata.ChildSpaceRooms and the map didn't exist.
func TestBecomingASpaceDoesntCrash(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
roomID := "!foo:bar"
v2.addAccount(alice, aliceToken)
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
events: createRoomState(t, alice, time.Now()),
}),
},
})
// let the proxy store the room
v3.mustDoV3Request(t, aliceToken, sync3.Request{})
// restart the proxy: at this point we may have a nil ChildSpaceRooms map
v3.restart(t, v2, pqString)
// check it by injecting a space child
spaceChildEvent := testutils.NewStateEvent(t, "m.space.child", "!somewhere:else", alice, map[string]interface{}{
"via": []string{"example.com"},
})
// TODO: we inject bob here because alice's sync stream seems to discard this response post-restart for unknown reasons
v2.addAccount(bob, bobToken)
v2.queueResponse(bob, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
events: []json.RawMessage{
spaceChildEvent,
},
}),
},
})
// we should be able to request the room without crashing
v3.mustDoV3Request(t, bobToken, sync3.Request{})
// we should see the data
res := v3.mustDoV3Request(t, aliceToken, sync3.Request{
RoomSubscriptions: map[string]sync3.RoomSubscription{
roomID: {TimelineLimit: 1},
},
})
m.MatchResponse(t, res, m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
roomID: {
m.MatchRoomTimeline([]json.RawMessage{spaceChildEvent}),
},
}))
}

View File

@ -373,7 +373,6 @@ func runTestServer(t testutils.TestBenchInterface, v2Server *testV2Server, postg
}
}
h2, h3 := syncv3.Setup(v2Server.url(), postgresConnectionString, os.Getenv("SYNCV3_SECRET"), syncv3.Opts{
Debug: true,
TestingSynchronousPubsub: true, // critical to avoid flakey tests
MaxPendingEventUpdates: maxPendingEventUpdates,
AddPrometheusMetrics: metricsEnabled,

v3.go
View File

@ -27,7 +27,6 @@ var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.C
var Version string
type Opts struct {
Debug bool
AddPrometheusMetrics bool
// The max number of events the client is eligible to read (unfiltered) which we are willing to
// buffer on this connection. Too large and we consume lots of memory. Too small and busy accounts
@ -91,7 +90,7 @@ func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Han
}
// create v3 handler
h3, err := handler.NewSync3Handler(store, storev2, v2Client, postgresURI, secret, opts.Debug, pubSub, pubSub, opts.AddPrometheusMetrics, opts.MaxPendingEventUpdates)
h3, err := handler.NewSync3Handler(store, storev2, v2Client, postgresURI, secret, pubSub, pubSub, opts.AddPrometheusMetrics, opts.MaxPendingEventUpdates)
if err != nil {
panic(err)
}