Merge branch 'main' into dmr/resnapshot

This commit is contained in:
David Robertson 2023-10-02 13:13:13 +01:00 committed by GitHub
commit b11ed531b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 74 additions and 36 deletions

View File

@ -167,6 +167,15 @@ jobs:
./tests-e2e/test-e2e-server.log
if-no-files-found: error
element_web:
# Cypress seems consistently unable to connect to its browser:
#
# Still waiting to connect to Chrome, retrying in 1 second (attempt 18/62)
# read ECONNRESET
# Error: read ECONNRESET
# at TCP.onStreamRead (node:internal/stream_base_commons:217:20)
#
# Disable this job for now---it's not useful at present.
if: false
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

View File

@ -4,9 +4,9 @@ import (
"database/sql"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/prometheus/client_golang/prometheus"
"github.com/getsentry/sentry-go"
@ -23,13 +23,12 @@ import (
// Accumulate function for timeline events. v2 sync must be called with a large enough timeline.limit
// for this to work!
type Accumulator struct {
db *sqlx.DB
roomsTable *RoomsTable
eventsTable *EventTable
snapshotTable *SnapshotTable
spacesTable *SpacesTable
entityName string
snapshotMemberCountVec *prometheus.HistogramVec // TODO: Remove, this is temporary to get a feeling how often a new snapshot is created
db *sqlx.DB
roomsTable *RoomsTable
eventsTable *EventTable
snapshotTable *SnapshotTable
spacesTable *SpacesTable
entityName string
}
func NewAccumulator(db *sqlx.DB) *Accumulator {
@ -273,12 +272,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
if err != nil {
return fmt.Errorf("failed to insert snapshot: %w", err)
}
if a.snapshotMemberCountVec != nil {
logger.Trace().Str("room_id", roomID).Int("members", len(memberNIDs)).Msg("Inserted new snapshot")
a.snapshotMemberCountVec.WithLabelValues(roomID).Observe(float64(len(memberNIDs)))
}
// 5. Any other processing of new state events.
res.AddedEvents = true
latestNID := int64(0)
for _, nid := range otherNIDs {
if nid > latestNID {
@ -518,10 +512,6 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline s
if err = a.snapshotTable.Insert(txn, newSnapshot); err != nil {
return AccumulateResult{}, fmt.Errorf("failed to insert new snapshot: %w", err)
}
if a.snapshotMemberCountVec != nil {
logger.Trace().Str("room_id", roomID).Int("members", len(memNIDs)).Msg("Inserted new snapshot")
a.snapshotMemberCountVec.WithLabelValues(roomID).Observe(float64(len(memNIDs)))
}
snapID = newSnapshot.SnapshotID
}
if err := a.eventsTable.UpdateBeforeSnapshotID(txn, ev.NID, beforeSnapID, replacesNID); err != nil {

View File

@ -4,13 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/sync2"
"os"
"strings"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/getsentry/sentry-go"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/internal"
@ -88,17 +88,6 @@ func NewStorageWithDB(db *sqlx.DB, addPrometheusMetrics bool) *Storage {
entityName: "server",
}
if addPrometheusMetrics {
acc.snapshotMemberCountVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "sliding_sync",
Subsystem: "poller",
Name: "snapshot_size",
Help: "Number of membership events in a snapshot",
Buckets: []float64{100.0, 500.0, 1000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 150000.0},
}, []string{"room_id"})
prometheus.MustRegister(acc.snapshotMemberCountVec)
}
return &Storage{
Accumulator: acc,
ToDeviceTable: NewToDeviceTable(db),
@ -1119,9 +1108,6 @@ func (s *Storage) Teardown() {
if err != nil {
panic("Storage.Teardown: " + err.Error())
}
if s.Accumulator.snapshotMemberCountVec != nil {
prometheus.Unregister(s.Accumulator.snapshotMemberCountVec)
}
}
// circularSlice is a slice which can be appended to which will wraparound at `max`.

View File

@ -20,6 +20,9 @@ var ProxyVersion = ""
var HTTP401 error = fmt.Errorf("HTTP 401")
type Client interface {
// Versions fetches and parses the list of Matrix versions that the homeserver
// advertises itself as supporting.
Versions(ctx context.Context) (version []string, err error)
// WhoAmI asks the homeserver to lookup the access token using the CSAPI /whoami
// endpoint. The response must contain a device ID (meaning that we assume the
// homeserver supports Matrix >= 1.1.)
@ -49,6 +52,34 @@ func NewHTTPClient(shortTimeout, longTimeout time.Duration, destHomeServer strin
}
}
func (v *HTTPClient) Versions(ctx context.Context) ([]string, error) {
req, err := http.NewRequestWithContext(ctx, "GET", v.DestinationServer+"/_matrix/client/versions", nil)
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", "sync-v3-proxy-"+ProxyVersion)
res, err := v.Client.Do(req)
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("/versions returned HTTP %d", res.StatusCode)
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
var parsedRes struct {
Result []string `json:"versions"`
}
err = json.Unmarshal(body, &parsedRes)
if err != nil {
return nil, fmt.Errorf("could not parse /versions response: %w", err)
}
return parsedRes.Result, nil
}
// Return sync2.HTTP401 if this request returns 401
func (v *HTTPClient) WhoAmI(ctx context.Context, accessToken string) (string, string, error) {
req, err := http.NewRequestWithContext(ctx, "GET", v.DestinationServer+"/_matrix/client/r0/account/whoami", nil)

View File

@ -159,6 +159,8 @@ func (h *Handler) StartV2Pollers() {
)
if err != nil {
logger.Err(err).Str("user_id", t.UserID).Str("device_id", t.DeviceID).Msg("Failed to start poller")
} else {
h.updateMetrics()
}
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2InitialSyncComplete{
UserID: t.UserID,
@ -170,7 +172,6 @@ func (h *Handler) StartV2Pollers() {
}
wg.Wait()
logger.Info().Msg("StartV2Pollers finished")
h.updateMetrics()
h.startPollerExpiryTicker()
}

View File

@ -222,19 +222,25 @@ func (h *PollerMap) DeviceIDs(userID string) []string {
func (h *PollerMap) ExpirePollers(pids []PollerID) int {
h.pollerMu.Lock()
defer h.pollerMu.Unlock()
numTerminated := 0
var pollersToTerminate []*poller
for _, pid := range pids {
p, ok := h.Pollers[pid]
if !ok || p.terminated.Load() {
continue
}
pollersToTerminate = append(pollersToTerminate, p)
}
h.pollerMu.Unlock()
// now terminate the pollers.
for _, p := range pollersToTerminate {
p.Terminate()
// Ensure that we won't recreate this poller on startup. If it reappears later,
// we'll make another EnsurePolling call which will recreate the poller.
h.callbacks.OnExpiredToken(context.Background(), hashToken(p.accessToken), p.userID, p.deviceID)
numTerminated++
}
return numTerminated
}

View File

@ -1188,6 +1188,9 @@ type mockClient struct {
fn func(authHeader, since string) (*SyncResponse, int, error)
}
func (c *mockClient) Versions(ctx context.Context) ([]string, error) {
return []string{"v1.1"}, nil
}
func (c *mockClient) DoSyncV2(ctx context.Context, authHeader, since string, isFirst, toDeviceOnly bool) (*SyncResponse, int, error) {
return c.fn(authHeader, since)
}

View File

@ -245,6 +245,10 @@ func runTestV2Server(t testutils.TestBenchInterface) *testV2Server {
timeToWaitForV2Response: time.Second,
}
r := mux.NewRouter()
r.HandleFunc("/_matrix/client/versions", func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(200)
w.Write([]byte(`{"versions": ["v1.1"]}`))
})
r.HandleFunc("/_matrix/client/r0/account/whoami", func(w http.ResponseWriter, req *http.Request) {
token := strings.TrimPrefix(req.Header.Get("Authorization"), "Bearer ")
userID := server.userID(token)

8
v3.go
View File

@ -1,6 +1,7 @@
package slidingsync
import (
"context"
"embed"
"encoding/json"
"fmt"
@ -86,6 +87,13 @@ func allowCORS(next http.Handler) http.HandlerFunc {
func Setup(destHomeserver, postgresURI, secret string, opts Opts) (*handler2.Handler, http.Handler) {
// Setup shared DB and HTTP client
v2Client := sync2.NewHTTPClient(opts.HTTPTimeout, opts.HTTPLongTimeout, destHomeserver)
// Sanity check that we can contact the upstream homeserver.
_, err := v2Client.Versions(context.Background())
if err != nil {
logger.Warn().Err(err).Str("dest", destHomeserver).Msg("Could not contact upstream homeserver. Is SYNCV3_SERVER set correctly?")
}
db, err := sqlx.Open("postgres", postgresURI)
if err != nil {
sentry.CaptureException(err)