Type switch payload types; add Prometheus instructions

The type names should make it self-explanatory what kinds of
payloads are being processed.
Kegan Dougal 2022-12-16 10:52:08 +00:00
parent aa28df161c
commit 233d21ad2e
4 changed files with 83 additions and 36 deletions


@ -39,6 +39,42 @@ INF Poller: v2 poll loop started ip=::1 since= user_id=@kegan:matrix.org
Wait for the first initial v2 sync to be processed (this can take minutes!) and then v3 APIs will be responsive.
### Prometheus
To enable metrics, pass `SYNCV3_PROM=:2112` to make the proxy listen on that port and expose a scraping endpoint at `GET /metrics`.
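Wiring a flag like this up takes only a few lines; here is a minimal sketch, assuming the standard `promhttp` handler from `client_golang` (illustrative, not the proxy's actual code):
```go
package main

import (
	"net/http"
	"os"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Sketch: if SYNCV3_PROM is set (e.g. ":2112"), expose GET /metrics there.
	if addr := os.Getenv("SYNCV3_PROM"); addr != "" {
		mux := http.NewServeMux()
		mux.Handle("/metrics", promhttp.Handler())
		go http.ListenAndServe(addr, mux) // error handling omitted in this sketch
	}
	select {} // stand-in for the real proxy's API servers
}
```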
To hook this up to a Prometheus instance, define a `prometheus.yml`:
```yaml
global:
scrape_interval: 30s
scrape_timeout: 10s
scrape_configs:
- job_name: ss
static_configs:
- targets: ["host.docker.internal:2112"]
```
Then run Prometheus in a Docker container:
```bash
docker run -p 9090:9090 -v /path/to/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus
```
To explore the data, run PromLens and point it at `http://localhost:9090`:
```bash
docker run -p 8080:8080 prom/promlens
```
Useful queries include:
- `rate(sliding_sync_poller_num_payloads{job="ss"}[5m])` : Shows the payload rate from pollers to API processes,
  broken down by type. A stacked graph is especially useful here, as the total height then represents the overall
  payload rate. This can highlight abnormal incoming data, such as excessive payload rates, and can also be used
  to gauge how costly certain payload types are. In general, receipts and device data tend to be the most frequent
  background noise. The full list of payload types is defined in the [pubsub directory](https://github.com/matrix-org/sliding-sync/blob/main/pubsub/v2.go).
- `sliding_sync_poller_num_pollers` : Absolute count of /sync v2 pollers. Useful either as a single value or
  displayed over time. The higher this value, the more pressure is put on the upstream homeserver.
- `sliding_sync_api_num_active_conns` : Absolute count of active sliding sync connections. Useful either as a single
  value or displayed over time. The higher this value, the more pressure is put on the proxy's API processes.
- `sum(increase(sliding_sync_poller_process_duration_secs_bucket[1m])) by (le)` : A heatmap of how long /sync v2
  responses take to process (see the histogram sketch after this list). This can highlight database pressure, as
  processing responses involves database writes and notifications over pubsub.
- `sum(increase(sliding_sync_api_process_duration_secs_bucket[1m])) by (le)` : A heatmap of how long sliding sync
  responses take to calculate, excluding long-polling requests. This can highlight slow sorting or database
  performance, as these requests should always be fast.
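The `_bucket`/`le` queries above assume the durations are recorded as Prometheus histograms. A minimal sketch of such a histogram, assuming the `sliding_sync_poller_process_duration_secs` name from the queries, illustrative bucket boundaries, and an imported `github.com/prometheus/client_golang/prometheus` (not the proxy's actual registration code):
```go
// Hypothetical sketch: a histogram backing the heatmap queries above.
var processDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "sliding_sync",
	Subsystem: "poller",
	Name:      "process_duration_secs",
	Help:      "Time taken to process a /sync v2 response, in seconds.",
	Buckets:   []float64{0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10}, // illustrative
})

func init() {
	prometheus.MustRegister(processDuration)
}

// Callers would time the work and record it:
//   start := time.Now()
//   ... process the /sync v2 response ...
//   processDuration.Observe(time.Since(start).Seconds())
```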
### How can I help?
At present, the best way to help would be to run a local v3 server pointed at a busy account and just leave it and a client running in the background. Look at it occasionally and submit any issues you notice. You can save console logs by right-clicking -> Save As.


@ -2,14 +2,21 @@ package pubsub
import (
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/rs/zerolog"
)
var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
	Out:        os.Stderr,
	TimeFormat: "15:04:05",
})

// Every payload needs a type to distinguish what kind of update it is.
type Payload interface {
	// The type of payload; used mostly for logging and prometheus metrics
	Type() string
}
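The `Type()` string is what surfaces as the `type` label on the payload metrics in the README. A hypothetical counter registration in this package, assuming the `sliding_sync_poller_num_payloads` name from the README (sketch only, not this commit's code):
```go
// Hypothetical sketch: count payloads by their Type() label.
var numPayloads = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "sliding_sync",
	Subsystem: "poller",
	Name:      "num_payloads",
	Help:      "Number of payloads received, labelled by payload type.",
}, []string{"type"})

func init() {
	prometheus.MustRegister(numPayloads)
}

// trackPayload is a hypothetical helper a publisher could call per payload.
func trackPayload(p Payload) {
	numPayloads.WithLabelValues(p.Type()).Inc()
}
```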


@ -27,7 +27,7 @@ type V2Initialise struct {
	SnapshotNID int64
}

func (v V2Initialise) Type() string { return "s" }
func (*V2Initialise) Type() string { return "V2Initialise" }

type V2Accumulate struct {
	RoomID string
@ -35,7 +35,7 @@ type V2Accumulate struct {
	EventNIDs []int64
}

func (v V2Accumulate) Type() string { return "a" }
func (*V2Accumulate) Type() string { return "V2Accumulate" }

type V2UnreadCounts struct {
	UserID string
@ -44,7 +44,7 @@ type V2UnreadCounts struct {
	NotificationCount *int
}

func (v V2UnreadCounts) Type() string { return "u" }
func (*V2UnreadCounts) Type() string { return "V2UnreadCounts" }

type V2AccountData struct {
	UserID string
@ -52,48 +52,48 @@ type V2AccountData struct {
	Types []string
}

func (v V2AccountData) Type() string { return "c" }
func (*V2AccountData) Type() string { return "V2AccountData" }

type V2LeaveRoom struct {
	UserID string
	RoomID string
}

func (v V2LeaveRoom) Type() string { return "l" }
func (*V2LeaveRoom) Type() string { return "V2LeaveRoom" }

type V2InviteRoom struct {
	UserID string
	RoomID string
}

func (v V2InviteRoom) Type() string { return "i" }
func (*V2InviteRoom) Type() string { return "V2InviteRoom" }

type V2InitialSyncComplete struct {
	UserID   string
	DeviceID string
}

func (v V2InitialSyncComplete) Type() string { return "x" }
func (*V2InitialSyncComplete) Type() string { return "V2InitialSyncComplete" }

type V2DeviceData struct {
	Pos int64
}

func (v V2DeviceData) Type() string { return "d" }
func (*V2DeviceData) Type() string { return "V2DeviceData" }

type V2Typing struct {
	RoomID         string
	EphemeralEvent json.RawMessage
}

func (v V2Typing) Type() string { return "t" }
func (*V2Typing) Type() string { return "V2Typing" }

type V2Receipt struct {
	RoomID   string
	Receipts []internal.Receipt
}

func (v V2Receipt) Type() string { return "r" }
func (*V2Receipt) Type() string { return "V2Receipt" }
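Note the receiver change on every `Type()` method: with pointer receivers, only the pointer types (`*V2Receipt` and friends) satisfy `Payload`, which guarantees payloads travel through the bus as pointers — exactly what the type switch below asserts on. A compile-time check illustrates the point (not part of the commit):
```go
var _ Payload = (*V2Receipt)(nil) // ok: Type() is in *V2Receipt's method set
// var _ Payload = V2Receipt{}    // would not compile: Type() has a pointer receiver
```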
type V2Sub struct {
	listener Listener
@ -112,27 +112,29 @@ func (v *V2Sub) Teardown() {
}
func (v *V2Sub) onMessage(p Payload) {
	switch p.Type() {
	case V2Receipt{}.Type():
		v.receiver.OnReceipt(p.(*V2Receipt))
	case V2Initialise{}.Type():
		v.receiver.Initialise(p.(*V2Initialise))
	case V2Accumulate{}.Type():
		v.receiver.Accumulate(p.(*V2Accumulate))
	case V2AccountData{}.Type():
		v.receiver.OnAccountData(p.(*V2AccountData))
	case V2InviteRoom{}.Type():
		v.receiver.OnInvite(p.(*V2InviteRoom))
	case V2LeaveRoom{}.Type():
		v.receiver.OnLeftRoom(p.(*V2LeaveRoom))
	case V2UnreadCounts{}.Type():
		v.receiver.OnUnreadCounts(p.(*V2UnreadCounts))
	case V2InitialSyncComplete{}.Type():
		v.receiver.OnInitialSyncComplete(p.(*V2InitialSyncComplete))
	case V2DeviceData{}.Type():
		v.receiver.OnDeviceData(p.(*V2DeviceData))
	case V2Typing{}.Type():
		v.receiver.OnTyping(p.(*V2Typing))
	switch pl := p.(type) {
	case *V2Receipt:
		v.receiver.OnReceipt(pl)
	case *V2Initialise:
		v.receiver.Initialise(pl)
	case *V2Accumulate:
		v.receiver.Accumulate(pl)
	case *V2AccountData:
		v.receiver.OnAccountData(pl)
	case *V2InviteRoom:
		v.receiver.OnInvite(pl)
	case *V2LeaveRoom:
		v.receiver.OnLeftRoom(pl)
	case *V2UnreadCounts:
		v.receiver.OnUnreadCounts(pl)
	case *V2InitialSyncComplete:
		v.receiver.OnInitialSyncComplete(pl)
	case *V2DeviceData:
		v.receiver.OnDeviceData(pl)
	case *V2Typing:
		v.receiver.OnTyping(pl)
	default:
		logger.Warn().Str("type", p.Type()).Msg("V2Sub: unhandled payload type")
	}
}
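Dispatching on the concrete type instead of comparing `Type()` strings means the compiler now checks each case; the old style could silently pair the wrong string with the wrong type assertion and panic at runtime. The same pattern in miniature, with hypothetical payload names (runnable standalone):
```go
package main

import "fmt"

// Payload mirrors the interface above; Ping/Pong are made-up payload types.
type Payload interface{ Type() string }

type Ping struct{}

func (*Ping) Type() string { return "Ping" }

type Pong struct{}

func (*Pong) Type() string { return "Pong" }

func onMessage(p Payload) {
	switch pl := p.(type) { // dispatch on the concrete pointer type
	case *Ping:
		fmt.Println("handled", pl.Type())
	case *Pong:
		fmt.Println("handled", pl.Type())
	default:
		fmt.Println("unhandled payload type:", p.Type())
	}
}

func main() {
	onMessage(&Ping{})
	onMessage(&Pong{})
}
```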


@ -12,7 +12,7 @@ type V3EnsurePolling struct {
	DeviceID string
}

func (v V3EnsurePolling) Type() string { return "p" }
func (*V3EnsurePolling) Type() string { return "V3EnsurePolling" }

type V3Sub struct {
	listener Listener
@ -31,9 +31,11 @@ func (v *V3Sub) Teardown() {
}
func (v *V3Sub) onMessage(p Payload) {
	switch p.Type() {
	case V3EnsurePolling{}.Type():
		v.receiver.EnsurePolling(p.(*V3EnsurePolling))
	switch pl := p.(type) {
	case *V3EnsurePolling:
		v.receiver.EnsurePolling(pl)
	default:
		logger.Warn().Str("type", p.Type()).Msg("V3Sub: unhandled payload type")
	}
}