sliding-sync/sync2/client.go

251 lines
8.2 KiB
Go
Raw Permalink Normal View History

package sync2
2021-05-14 16:49:33 +01:00
import (
add extensions for typing and receipts; bugfixes and additional perf improvements Features: - Add `typing` extension. - Add `receipts` extension. - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`. - Add `SYNCV3_PPROF` support. - Add `by_notification_level` sort order. - Add `include_old_rooms` support. - Add support for `$ME` and `$LAZY`. - Add correct filtering when `*,*` is used as `required_state`. - Add `num_live` to each room response to indicate how many timeline entries are live. Bug fixes: - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm. - Send back an `errcode` on HTTP errors (e.g expired sessions). - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :( - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic. - Send HTTP 400 for invalid range requests. - Don't publish no-op unread counts which just adds extra noise. - Fix leaking DB connections which could eventually consume all available connections. - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever. Improvements: - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s. - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s. - Add `SlidingSyncUntil...` in tests to reduce races. - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s. - Add trace task for initial syncs. - Include the proxy version in UA strings. 
- HTTP errors now wait 1s before returning to stop clients tight-looping on error. - Pending event buffer is now 2000. - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8. - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00
"context"
2021-05-14 16:49:33 +01:00
"encoding/json"
"fmt"
2023-11-16 13:57:51 +00:00
"github.com/matrix-org/sliding-sync/internal"
"io"
2021-05-14 16:49:33 +01:00
"net/http"
"net/url"
"time"
2021-05-14 16:49:33 +01:00
2023-09-19 11:48:49 +02:00
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"github.com/tidwall/gjson"
2021-05-14 16:49:33 +01:00
)
// AccountDataGlobalRoom is the empty string.
// NOTE(review): presumably this is the room ID under which global
// (non-room-scoped) account data is keyed — confirm against callers.
const AccountDataGlobalRoom = ""
add extensions for typing and receipts; bugfixes and additional perf improvements Features: - Add `typing` extension. - Add `receipts` extension. - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`. - Add `SYNCV3_PPROF` support. - Add `by_notification_level` sort order. - Add `include_old_rooms` support. - Add support for `$ME` and `$LAZY`. - Add correct filtering when `*,*` is used as `required_state`. - Add `num_live` to each room response to indicate how many timeline entries are live. Bug fixes: - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm. - Send back an `errcode` on HTTP errors (e.g expired sessions). - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :( - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic. - Send HTTP 400 for invalid range requests. - Don't publish no-op unread counts which just adds extra noise. - Fix leaking DB connections which could eventually consume all available connections. - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever. Improvements: - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s. - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s. - Add `SlidingSyncUntil...` in tests to reduce races. - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s. - Add trace task for initial syncs. - Include the proxy version in UA strings. 
- HTTP errors now wait 1s before returning to stop clients tight-looping on error. - Pending event buffer is now 2000. - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8. - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00
var (
	// ProxyVersion is appended to the User-Agent header of every request made
	// by HTTPClient. It is empty unless assigned elsewhere.
	ProxyVersion = ""

	// HTTP401 is the sentinel error returned by WhoAmI when the homeserver
	// responds with HTTP 401.
	HTTP401 error = fmt.Errorf("HTTP 401")
)
add extensions for typing and receipts; bugfixes and additional perf improvements Features: - Add `typing` extension. - Add `receipts` extension. - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`. - Add `SYNCV3_PPROF` support. - Add `by_notification_level` sort order. - Add `include_old_rooms` support. - Add support for `$ME` and `$LAZY`. - Add correct filtering when `*,*` is used as `required_state`. - Add `num_live` to each room response to indicate how many timeline entries are live. Bug fixes: - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm. - Send back an `errcode` on HTTP errors (e.g expired sessions). - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :( - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic. - Send HTTP 400 for invalid range requests. - Don't publish no-op unread counts which just adds extra noise. - Fix leaking DB connections which could eventually consume all available connections. - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever. Improvements: - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s. - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s. - Add `SlidingSyncUntil...` in tests to reduce races. - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s. - Add trace task for initial syncs. - Include the proxy version in UA strings. 
- HTTP errors now wait 1s before returning to stop clients tight-looping on error. - Pending event buffer is now 2000. - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8. - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00
// Client is the set of homeserver calls this package relies on: version
// discovery, access token lookup and sync v2 requests. HTTPClient in this
// file provides an HTTP-backed implementation of these methods.
type Client interface {
// Versions fetches and parses the list of Matrix versions that the homeserver
// advertises itself as supporting.
Versions(ctx context.Context) (version []string, err error)
// WhoAmI asks the homeserver to lookup the access token using the CSAPI /whoami
// endpoint. The response must contain a device ID (meaning that we assume the
// homeserver supports Matrix >= 1.1.)
WhoAmI(ctx context.Context, accessToken string) (userID, deviceID string, err error)
// DoSyncV2 performs a sync v2 request on behalf of accessToken, starting from
// the since token. It returns the sync response and an int alongside the error;
// in the HTTPClient implementation that int is the HTTP status code.
DoSyncV2(ctx context.Context, accessToken, since string, isFirst bool, toDeviceOnly bool) (*SyncResponse, int, error)
}
// HTTPClient represents a Sync v2 Client.
// One client can be shared among many users.
type HTTPClient struct {
	// Client performs /versions, /whoami and every sync request that is not
	// flagged as the first one.
	Client *http.Client
	// LongTimeoutClient performs the sync request when DoSyncV2 is called with
	// isFirst set.
	LongTimeoutClient *http.Client
	// DestinationServer is the base URL that request paths are appended to.
	DestinationServer string
}
2023-09-19 11:48:49 +02:00
func NewHTTPClient(shortTimeout, longTimeout time.Duration, destHomeServer string) *HTTPClient {
return &HTTPClient{
2023-11-16 19:31:43 +00:00
LongTimeoutClient: newClient(longTimeout, destHomeServer),
Client: newClient(shortTimeout, destHomeServer),
DestinationServer: internal.GetBaseURL(destHomeServer),
}
}
2023-11-16 19:31:43 +00:00
func newClient(timeout time.Duration, destHomeServer string) *http.Client {
transport := http.DefaultTransport
2023-11-16 19:31:43 +00:00
if internal.IsUnixSocket(destHomeServer) {
transport = internal.UnixTransport(destHomeServer)
}
return &http.Client{
Timeout: timeout,
Transport: otelhttp.NewTransport(transport),
2023-09-19 11:48:49 +02:00
}
}
// Versions fetches /_matrix/client/versions from the destination server and
// returns the contents of the "versions" array in the response.
//
// An error is returned if the request cannot be built or sent, if the server
// answers with anything other than HTTP 200, or if the body cannot be read or
// parsed as JSON.
func (v *HTTPClient) Versions(ctx context.Context) ([]string, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", v.DestinationServer+"/_matrix/client/versions", nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("User-Agent", "sync-v3-proxy-"+ProxyVersion)
	res, err := v.Client.Do(req)
	if err != nil {
		return nil, err
	}
	// Register the close before inspecting the status code so that non-200
	// responses do not leak the body (and its underlying connection).
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("/versions returned HTTP %d", res.StatusCode)
	}
	body, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}
	var parsedRes struct {
		Result []string `json:"versions"`
	}
	if err = json.Unmarshal(body, &parsedRes); err != nil {
		return nil, fmt.Errorf("could not parse /versions response: %w", err)
	}
	return parsedRes.Result, nil
}
// Return sync2.HTTP401 if this request returns 401
func (v *HTTPClient) WhoAmI(ctx context.Context, accessToken string) (string, string, error) {
req, err := http.NewRequestWithContext(ctx, "GET", v.DestinationServer+"/_matrix/client/r0/account/whoami", nil)
if err != nil {
return "", "", err
}
add extensions for typing and receipts; bugfixes and additional perf improvements Features: - Add `typing` extension. - Add `receipts` extension. - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`. - Add `SYNCV3_PPROF` support. - Add `by_notification_level` sort order. - Add `include_old_rooms` support. - Add support for `$ME` and `$LAZY`. - Add correct filtering when `*,*` is used as `required_state`. - Add `num_live` to each room response to indicate how many timeline entries are live. Bug fixes: - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm. - Send back an `errcode` on HTTP errors (e.g expired sessions). - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :( - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic. - Send HTTP 400 for invalid range requests. - Don't publish no-op unread counts which just adds extra noise. - Fix leaking DB connections which could eventually consume all available connections. - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever. Improvements: - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s. - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s. - Add `SlidingSyncUntil...` in tests to reduce races. - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s. - Add trace task for initial syncs. - Include the proxy version in UA strings. 
- HTTP errors now wait 1s before returning to stop clients tight-looping on error. - Pending event buffer is now 2000. - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8. - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00
req.Header.Set("User-Agent", "sync-v3-proxy-"+ProxyVersion)
req.Header.Set("Authorization", "Bearer "+accessToken)
res, err := v.Client.Do(req)
if err != nil {
return "", "", err
}
if res.StatusCode != 200 {
if res.StatusCode == 401 {
return "", "", HTTP401
}
return "", "", fmt.Errorf("/whoami returned HTTP %d", res.StatusCode)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return "", "", err
}
response := gjson.ParseBytes(body)
return response.Get("user_id").Str, response.Get("device_id").Str, nil
}
// DoSyncV2 performs a sync v2 request. Returns the sync response and the response status code
// or an error. Set isFirst=true on the first sync to force a timeout=0 sync to ensure snapiness.
func (v *HTTPClient) DoSyncV2(ctx context.Context, accessToken, since string, isFirst, toDeviceOnly bool) (*SyncResponse, int, error) {
syncURL := v.createSyncURL(since, isFirst, toDeviceOnly)
req, err := http.NewRequestWithContext(ctx, "GET", syncURL, nil)
add extensions for typing and receipts; bugfixes and additional perf improvements Features: - Add `typing` extension. - Add `receipts` extension. - Add comprehensive prometheus `/metrics` activated via `SYNCV3_PROM`. - Add `SYNCV3_PPROF` support. - Add `by_notification_level` sort order. - Add `include_old_rooms` support. - Add support for `$ME` and `$LAZY`. - Add correct filtering when `*,*` is used as `required_state`. - Add `num_live` to each room response to indicate how many timeline entries are live. Bug fixes: - Use a stricter comparison function on ranges: fixes an issue whereby UTs fail on go1.19 due to change in sorting algorithm. - Send back an `errcode` on HTTP errors (e.g expired sessions). - Remove `unsigned.txn_id` on insertion into the DB. Otherwise other users would see other users txn IDs :( - Improve range delta algorithm: previously it didn't handle cases like `[0,20] -> [20,30]` and would panic. - Send HTTP 400 for invalid range requests. - Don't publish no-op unread counts which just adds extra noise. - Fix leaking DB connections which could eventually consume all available connections. - Ensure we always unblock WaitUntilInitialSync even on invalid access tokens. Other code relies on WaitUntilInitialSync() actually returning at _some_ point e.g on startup we have N workers which bound the number of concurrent pollers made at any one time, we need to not just hog a worker forever. Improvements: - Greatly improve startup times of sync3 handlers by improving `JoinedRoomsTracker`: a modest amount of data would take ~28s to create the handler, now it takes 4s. - Massively improve initial initial v3 sync times, by refactoring `JoinedRoomsTracker`, from ~47s to <1s. - Add `SlidingSyncUntil...` in tests to reduce races. - Tweak the API shape of JoinedUsersForRoom to reduce state block processing time for large rooms from 63s to 39s. - Add trace task for initial syncs. - Include the proxy version in UA strings. 
- HTTP errors now wait 1s before returning to stop clients tight-looping on error. - Pending event buffer is now 2000. - Index the room ID first to cull the most events when returning timeline entries. Speeds up `SelectLatestEventsBetween` by a factor of 8. - Remove cancelled `m.room_key_requests` from the to-device inbox. Cuts down the amount of events in the inbox by ~94% for very large (20k+) inboxes, ~50% for moderate sized (200 events) inboxes. Adds book-keeping to remember the unacked to-device position for each client.
2022-12-14 18:53:55 +00:00
req.Header.Set("User-Agent", "sync-v3-proxy-"+ProxyVersion)
req.Header.Set("Authorization", "Bearer "+accessToken)
2021-05-14 16:49:33 +01:00
if err != nil {
return nil, 0, fmt.Errorf("DoSyncV2: NewRequest failed: %w", err)
2021-05-14 16:49:33 +01:00
}
var res *http.Response
if isFirst {
2023-09-19 11:48:49 +02:00
res, err = v.LongTimeoutClient.Do(req)
} else {
res, err = v.Client.Do(req)
}
2021-05-14 16:49:33 +01:00
if err != nil {
return nil, 0, fmt.Errorf("DoSyncV2: request failed: %w", err)
2021-05-14 16:49:33 +01:00
}
switch res.StatusCode {
case 200:
var svr SyncResponse
2021-05-14 16:49:33 +01:00
if err := json.NewDecoder(res.Body).Decode(&svr); err != nil {
return nil, 0, fmt.Errorf("DoSyncV2: response body decode JSON failed: %w", err)
2021-05-14 16:49:33 +01:00
}
return &svr, 200, nil
2021-05-14 16:49:33 +01:00
default:
return nil, res.StatusCode, fmt.Errorf("DoSyncV2: response returned %s", res.Status)
2021-05-14 16:49:33 +01:00
}
}
// createSyncURL assembles the full /_matrix/client/r0/sync URL for a request.
//
// Query parameters are emitted in a fixed order: timeout (0 when isFirst,
// otherwise 30000), since (only when non-empty), set_presence=offline and a
// JSON-encoded filter. The filter drops all presence events, sets the room
// timeline limit, and, when toDeviceOnly is set, restricts rooms to an empty
// list.
func (v *HTTPClient) createSyncURL(since string, isFirst, toDeviceOnly bool) string {
	// First poll for v2-sync in this process returns immediately; later polls
	// long-poll for 30s.
	timeout := "30000"
	if isFirst {
		timeout = "0"
	}
	query := "?timeout=" + timeout
	if since != "" {
		query += "&since=" + url.QueryEscape(since)
	}
	// Set presence to offline, this potentially reduces CPU load on upstream homeservers
	query += "&set_presence=offline"

	// To reduce the likelihood of a gappy v2 sync, ask for a large timeline by default.
	// Synapse's default is 10; 50 is the maximum allowed, by my reading of
	// https://github.com/matrix-org/synapse/blob/89a71e73905ffa1c97ae8be27d521cd2ef3f3a0c/synapse/handlers/sync.py#L576-L577
	// NB: this is a stopgap to reduce the likelihood of hitting
	// https://github.com/matrix-org/sliding-sync/issues/18
	limit := 50
	if since == "" {
		// First time the poller has sync v2-ed for this user
		limit = 1
	}

	roomFilter := map[string]interface{}{
		"timeline": map[string]interface{}{"limit": limit},
	}
	if toDeviceOnly {
		// no rooms match this filter, so we get everything but room data
		roomFilter["rooms"] = []string{}
	}

	// The marshal error is ignored, as in the original.
	encoded, _ := json.Marshal(map[string]interface{}{
		"room": roomFilter,
		// filter out all presence events (remove this once/if the proxy handles presence)
		"presence": map[string]interface{}{"not_types": []string{"*"}},
	})
	query += "&filter=" + url.QueryEscape(string(encoded))

	return v.DestinationServer + "/_matrix/client/r0/sync" + query
}
type SyncResponse struct {
NextBatch string `json:"next_batch"`
AccountData EventsResponse `json:"account_data"`
Presence struct {
Events []json.RawMessage `json:"events,omitempty"`
2021-05-14 16:49:33 +01:00
} `json:"presence"`
Rooms SyncRoomsResponse `json:"rooms"`
ToDevice EventsResponse `json:"to_device"`
2021-05-14 16:49:33 +01:00
DeviceLists struct {
Changed []string `json:"changed,omitempty"`
Left []string `json:"left,omitempty"`
} `json:"device_lists"`
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
DeviceUnusedFallbackKeyTypes []string `json:"device_unused_fallback_key_types,omitempty"`
2021-05-14 16:49:33 +01:00
}
// SyncRoomsResponse is the "rooms" section of a SyncResponse, split into the
// "join", "invite" and "leave" objects.
// NOTE(review): the map keys are presumably room IDs — confirm against callers.
type SyncRoomsResponse struct {
Join map[string]SyncV2JoinResponse `json:"join"`
Invite map[string]SyncV2InviteResponse `json:"invite"`
Leave map[string]SyncV2LeaveResponse `json:"leave"`
}
2021-05-14 16:49:33 +01:00
// SyncV2JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
// It carries the room's state, timeline, ephemeral and account data sections
// plus its unread notification counts.
type SyncV2JoinResponse struct {
State EventsResponse `json:"state"`
Timeline TimelineResponse `json:"timeline"`
Ephemeral EventsResponse `json:"ephemeral"`
AccountData EventsResponse `json:"account_data"`
UnreadNotifications UnreadNotifications `json:"unread_notifications"`
}
// UnreadNotifications is the "unread_notifications" object of a joined room.
// Both counts are pointers, so a field that has not been set (nil) is
// distinguishable from an explicit zero.
type UnreadNotifications struct {
	HighlightCount    *int `json:"highlight_count,omitempty"`
	NotificationCount *int `json:"notification_count,omitempty"`
}
// TimelineResponse is the "timeline" section of a joined room in a sync v2
// response: the raw events, the "limited" flag and an optional "prev_batch"
// token.
type TimelineResponse struct {
Events []json.RawMessage `json:"events"`
Limited bool `json:"limited"`
PrevBatch string `json:"prev_batch,omitempty"`
}
// EventsResponse is a JSON object that carries a list of events under the
// "events" key. Each event is kept as raw, undecoded JSON.
type EventsResponse struct {
Events []json.RawMessage `json:"events"`
}
2021-05-14 16:49:33 +01:00
// InviteResponse represents a /sync response for a room which is under the 'invite' key.
type SyncV2InviteResponse struct {
InviteState EventsResponse `json:"invite_state"`
2021-05-14 16:49:33 +01:00
}
// SyncV2LeaveResponse represents a /sync response for a room which is under the 'leave' key.
// Events in both sections are kept as raw, undecoded JSON.
type SyncV2LeaveResponse struct {
	State struct {
		Events []json.RawMessage `json:"events"`
	} `json:"state"`
	Timeline struct {
		Events    []json.RawMessage `json:"events"`
		Limited   bool              `json:"limited"`
		PrevBatch string            `json:"prev_batch,omitempty"`
	} `json:"timeline"`
}