mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
Add more OTLP tags to spans
This allows easier discovery when we see rageshakes from EX.
This commit is contained in:
parent
2730e8f3b0
commit
93dc9f9484
@ -23,9 +23,35 @@ const tracerName = "sliding-sync"
|
||||
// TraceKey is the type of the context keys under which trace tags are stored.
// Using a dedicated named type keeps these keys from colliding with context
// values set under plain string keys by other packages.
type TraceKey string

// Context keys used for OTLP tracing. They are typed constants rather than
// variables so that no package can reassign a key at runtime.
const (
	// OTLPSpan is a trace context key.
	// NOTE(review): its use is not visible in this part of the file — confirm
	// what is stored under it.
	OTLPSpan TraceKey = "otlp"
	// OTLPTagDeviceID is the key, and the span attribute name, for the device ID tag.
	OTLPTagDeviceID TraceKey = "device_id"
	// OTLPTagUserID is the key, and the span attribute name, for the user ID tag.
	OTLPTagUserID TraceKey = "user_id"
	// OTLPTagConnID is the key, and the span attribute name, for the connection ID tag.
	OTLPTagConnID TraceKey = "conn_id"
	// OTLPTagTxnID is the key, and the span attribute name, for the transaction ID tag.
	OTLPTagTxnID TraceKey = "txn_id"
)
|
||||
|
||||
// SetAttributeOnContext sets one of the trace tag keys on the given context, so derived spans will use said tags.
|
||||
func SetAttributeOnContext(ctx context.Context, key TraceKey, val string) context.Context {
|
||||
return context.WithValue(ctx, key, val)
|
||||
}
|
||||
|
||||
// attributesFromContext sets span tags based on data in the provided ctx
|
||||
func attributesFromContext(ctx context.Context) []otrace.SpanStartOption {
|
||||
var attrs []attribute.KeyValue
|
||||
for _, tag := range []TraceKey{OTLPTagConnID, OTLPTagDeviceID, OTLPTagUserID, OTLPTagTxnID} {
|
||||
val := ctx.Value(tag)
|
||||
if val == nil {
|
||||
continue
|
||||
}
|
||||
attrs = append(attrs, attribute.String(string(tag), val.(string)))
|
||||
}
|
||||
if len(attrs) == 0 {
|
||||
return nil
|
||||
}
|
||||
return []otrace.SpanStartOption{
|
||||
otrace.WithAttributes(attrs...),
|
||||
}
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
t *trace.Task
|
||||
o otrace.Span
|
||||
@ -57,7 +83,7 @@ func Logf(ctx context.Context, category, format string, args ...interface{}) {
|
||||
|
||||
func StartSpan(ctx context.Context, name string) (newCtx context.Context, span *RuntimeTraceOTLPSpan) {
|
||||
region := trace.StartRegion(ctx, name)
|
||||
newCtx, ospan := otel.Tracer(tracerName).Start(ctx, name)
|
||||
newCtx, ospan := otel.Tracer(tracerName).Start(ctx, name, attributesFromContext(ctx)...)
|
||||
// use the same api as NewTask to allow context nesting
|
||||
return newCtx, &RuntimeTraceOTLPSpan{
|
||||
region: region,
|
||||
@ -67,7 +93,7 @@ func StartSpan(ctx context.Context, name string) (newCtx context.Context, span *
|
||||
|
||||
func StartTask(ctx context.Context, name string) (context.Context, *Task) {
|
||||
ctx, task := trace.NewTask(ctx, name)
|
||||
newCtx, ospan := otel.Tracer(tracerName).Start(ctx, name)
|
||||
newCtx, ospan := otel.Tracer(tracerName).Start(ctx, name, attributesFromContext(ctx)...)
|
||||
return newCtx, &Task{
|
||||
t: task,
|
||||
o: ospan,
|
||||
|
@ -222,6 +222,12 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
|
||||
}
|
||||
}
|
||||
}
|
||||
if requestBody.ConnID != "" {
|
||||
req = req.WithContext(internal.SetAttributeOnContext(req.Context(), internal.OTLPTagConnID, requestBody.ConnID))
|
||||
}
|
||||
if requestBody.TxnID != "" {
|
||||
req = req.WithContext(internal.SetAttributeOnContext(req.Context(), internal.OTLPTagTxnID, requestBody.TxnID))
|
||||
}
|
||||
hlog.FromRequest(req).UpdateContext(func(c zerolog.Context) zerolog.Context {
|
||||
c.Str("txn_id", requestBody.TxnID)
|
||||
return c
|
||||
@ -243,7 +249,7 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
|
||||
}
|
||||
}
|
||||
|
||||
conn, herr := h.setupConnection(req, &requestBody, req.URL.Query().Get("pos") != "")
|
||||
req, conn, herr := h.setupConnection(req, &requestBody, req.URL.Query().Get("pos") != "")
|
||||
if herr != nil {
|
||||
logErrorOrWarning("failed to get or create Conn", herr)
|
||||
return herr
|
||||
@ -320,7 +326,7 @@ func (h *SyncLiveHandler) serve(w http.ResponseWriter, req *http.Request) error
|
||||
// setupConnection associates this request with an existing connection or makes a new connection.
|
||||
// It also sets a v2 sync poll loop going if one didn't exist already for this user.
|
||||
// When this function returns, the connection is alive and active.
|
||||
func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Request, containsPos bool) (*sync3.Conn, *internal.HandlerError) {
|
||||
func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Request, containsPos bool) (*http.Request, *sync3.Conn, *internal.HandlerError) {
|
||||
taskCtx, task := internal.StartTask(req.Context(), "setupConnection")
|
||||
defer task.End()
|
||||
var conn *sync3.Conn
|
||||
@ -328,7 +334,7 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ
|
||||
accessToken, err := internal.ExtractAccessToken(req)
|
||||
if err != nil || accessToken == "" {
|
||||
hlog.FromRequest(req).Warn().Err(err).Msg("failed to get access token from request")
|
||||
return nil, &internal.HandlerError{
|
||||
return req, nil, &internal.HandlerError{
|
||||
StatusCode: http.StatusUnauthorized,
|
||||
Err: err,
|
||||
}
|
||||
@ -342,17 +348,19 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ
|
||||
hlog.FromRequest(req).Info().Msg("Received connection from unknown access token, querying with homeserver")
|
||||
newToken, herr := h.identifyUnknownAccessToken(accessToken, hlog.FromRequest(req))
|
||||
if herr != nil {
|
||||
return nil, herr
|
||||
return req, nil, herr
|
||||
}
|
||||
token = newToken
|
||||
} else {
|
||||
hlog.FromRequest(req).Err(err).Msg("Failed to lookup access token")
|
||||
return nil, &internal.HandlerError{
|
||||
return req, nil, &internal.HandlerError{
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
req = req.WithContext(internal.SetAttributeOnContext(req.Context(), internal.OTLPTagUserID, token.UserID))
|
||||
req = req.WithContext(internal.SetAttributeOnContext(req.Context(), internal.OTLPTagDeviceID, token.DeviceID))
|
||||
log := hlog.FromRequest(req).With().
|
||||
Str("user", token.UserID).
|
||||
Str("device", token.DeviceID).
|
||||
@ -379,10 +387,10 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ
|
||||
conn = h.ConnMap.Conn(connID)
|
||||
if conn != nil {
|
||||
log.Trace().Str("conn", conn.ConnID.String()).Msg("reusing conn")
|
||||
return conn, nil
|
||||
return req, conn, nil
|
||||
}
|
||||
// conn doesn't exist, we probably nuked it.
|
||||
return nil, internal.ExpiredSessionError()
|
||||
return req, nil, internal.ExpiredSessionError()
|
||||
}
|
||||
|
||||
pid := sync2.PollerID{UserID: token.UserID, DeviceID: token.DeviceID}
|
||||
@ -393,7 +401,7 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ
|
||||
// We'll be quicker next time as the poller will already exist.
|
||||
if req.Context().Err() != nil {
|
||||
log.Warn().Msg("client gave up, not creating connection")
|
||||
return nil, &internal.HandlerError{
|
||||
return req, nil, &internal.HandlerError{
|
||||
StatusCode: 400,
|
||||
Err: req.Context().Err(),
|
||||
}
|
||||
@ -402,7 +410,7 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ
|
||||
userCache, err := h.userCache(token.UserID)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("failed to load user cache")
|
||||
return nil, &internal.HandlerError{
|
||||
return req, nil, &internal.HandlerError{
|
||||
StatusCode: 500,
|
||||
Err: err,
|
||||
}
|
||||
@ -424,7 +432,7 @@ func (h *SyncLiveHandler) setupConnection(req *http.Request, syncReq *sync3.Requ
|
||||
} else {
|
||||
log.Info().Msg("using existing connection")
|
||||
}
|
||||
return conn, nil
|
||||
return req, conn, nil
|
||||
}
|
||||
|
||||
func (h *SyncLiveHandler) identifyUnknownAccessToken(accessToken string, logger *zerolog.Logger) (*sync2.Token, *internal.HandlerError) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user