tracing: do runtime/trace and OTLP at the same time

Kegan Dougal 2023-02-20 14:57:49 +00:00
parent 613cbd9a67
commit c2a3c53542
9 changed files with 52 additions and 39 deletions
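The change swaps every direct runtime/trace call for wrappers in the internal package (internal.StartTask, internal.StartSpan, internal.Logf) that record into both Go's runtime/trace and OpenTelemetry at once. The wrappers' implementation is not visible in the hunks below; the following is a minimal sketch of what such helpers could look like, assuming a globally registered OTel tracer provider and a hypothetical internal/trace.go file (names are taken from the call sites in this diff, everything else is illustrative):

// internal/trace.go -- illustrative sketch only; the real helpers in the
// internal package are not part of this diff.
package internal

import (
	"context"
	"fmt"
	"runtime/trace"

	"go.opentelemetry.io/otel"
	otrace "go.opentelemetry.io/otel/trace"
)

// Span ends a runtime/trace region/task and an OTel span together.
type Span struct {
	region *trace.Region
	task   *trace.Task
	otel   otrace.Span
}

func (s *Span) End() {
	if s.region != nil {
		s.region.End()
	}
	if s.task != nil {
		s.task.End()
	}
	if s.otel != nil {
		s.otel.End()
	}
}

// StartSpan opens a runtime/trace region and an OTel span with the same name.
// The returned context carries the OTel span so nested spans parent correctly.
func StartSpan(ctx context.Context, name string) (context.Context, *Span) {
	ctx, otelSpan := otel.Tracer("sliding-sync").Start(ctx, name)
	region := trace.StartRegion(ctx, name)
	return ctx, &Span{region: region, otel: otelSpan}
}

// StartTask opens a runtime/trace task and an OTel span with the same name.
func StartTask(ctx context.Context, name string) (context.Context, *Span) {
	ctx, otelSpan := otel.Tracer("sliding-sync").Start(ctx, name)
	ctx, task := trace.NewTask(ctx, name)
	return ctx, &Span{task: task, otel: otelSpan}
}

// Logf writes to the runtime/trace log and attaches the same message as an
// event on whichever OTel span is in the context (a no-op span if none).
func Logf(ctx context.Context, category, format string, args ...interface{}) {
	msg := fmt.Sprintf(format, args...)
	trace.Logf(ctx, category, "%s", msg)
	otrace.SpanFromContext(ctx).AddEvent(msg)
}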

go.mod

@@ -19,6 +19,8 @@ require (
 require (
     github.com/beorn7/perks v1.0.1 // indirect
     github.com/cespare/xxhash/v2 v2.1.2 // indirect
+    github.com/go-logr/logr v1.2.3 // indirect
+    github.com/go-logr/stdr v1.2.2 // indirect
     github.com/golang/protobuf v1.5.2 // indirect
     github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 // indirect
     github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
@@ -29,6 +31,8 @@ require (
     github.com/sirupsen/logrus v1.9.0 // indirect
     github.com/tidwall/match v1.1.1 // indirect
     github.com/tidwall/pretty v1.2.0 // indirect
+    go.opentelemetry.io/otel v1.13.0 // indirect
+    go.opentelemetry.io/otel/trace v1.13.0 // indirect
     golang.org/x/crypto v0.5.0 // indirect
     golang.org/x/sync v0.0.0-20220907140024-f12130a52804 // indirect
     golang.org/x/sys v0.4.0 // indirect

go.sum

@@ -72,6 +72,11 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
 github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
+github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
 github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
 github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@@ -241,6 +246,10 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
 go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
+go.opentelemetry.io/otel v1.13.0 h1:1ZAKnNQKwBBxFtww/GwxNUyTf0AxkZzrukO8MeXqe4Y=
+go.opentelemetry.io/otel v1.13.0/go.mod h1:FH3RtdZCzRkJYFTCsAKDy9l/XYjMdNv6QrkFFB8DvVg=
+go.opentelemetry.io/otel/trace v1.13.0 h1:CBgRZ6ntv+Amuj1jDsMhZtlAPT6gbyIRdaIzFhfBSdY=
+go.opentelemetry.io/otel/trace v1.13.0/go.mod h1:muCvmmO9KKpvuXSf3KKAXXB2ygNYHQ+ZfI5X08d3tds=
 go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
 go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
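Note that the two new modules above are only the OTel tracing API. Actually shipping spans over OTLP, as the commit title promises, also needs an exporter and a tracer provider registered at startup, neither of which appears in these dependency hunks. A hedged sketch of that wiring, using the standard otlptracehttp exporter (an assumption; the exporter this project actually uses is not shown here):

// Illustrative OTLP wiring, not part of this diff.
package main

import (
	"context"
	"log"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func setupOTLP(ctx context.Context) *sdktrace.TracerProvider {
	// Exports to the endpoint in OTEL_EXPORTER_OTLP_ENDPOINT by default.
	exp, err := otlptracehttp.New(ctx)
	if err != nil {
		log.Fatalf("failed to create OTLP exporter: %v", err)
	}
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
	otel.SetTracerProvider(tp) // otel.Tracer(...) now emits real spans
	return tp                  // call tp.Shutdown(ctx) on exit to flush
}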

state/storage.go

@@ -5,7 +5,6 @@ import (
     "encoding/json"
     "fmt"
     "os"
-    "runtime/trace"
     "strings"

     "github.com/jmoiron/sqlx"
@@ -346,7 +345,8 @@ func (s *Storage) StateSnapshot(snapID int64) (state []json.RawMessage, err erro
 // If the list of state keys is empty then all events matching that event type will be returned. If the map is empty entirely, then all room state
 // will be returned.
 func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []string, pos int64, eventTypesToStateKeys map[string][]string) (roomToEvents map[string][]Event, err error) {
-    defer trace.StartRegion(ctx, "RoomStateAfterEventPosition").End()
+    _, span := internal.StartSpan(ctx, "RoomStateAfterEventPosition")
+    defer span.End()
     roomToEvents = make(map[string][]Event, len(roomIDs))
     roomIndex := make(map[string]int, len(roomIDs))
     err = sqlutil.WithTransaction(s.accumulator.db, func(txn *sqlx.Tx) error {
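One detail worth noting in the hunk above: RoomStateAfterEventPosition discards the child context returned by internal.StartSpan (the `_, span :=` form), so spans started further down its call chain will not be parented to this one. By contrast, later hunks such as the "extensions" and "liveUpdate" regions in connstate.go reassign ctx so that nested spans nest correctly.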

sync3/conn.go

@@ -4,7 +4,6 @@ import (
     "context"
     "fmt"
     "runtime/debug"
-    "runtime/trace"
     "sync"

     "github.com/matrix-org/sliding-sync/internal"
@@ -90,9 +89,9 @@ func (c *Conn) tryRequest(ctx context.Context, req *Request) (res *Response, err
     if req.pos == 0 {
         taskType = "OnIncomingRequestInitial"
     }
-    ctx, task := trace.NewTask(ctx, taskType)
+    ctx, task := internal.StartTask(ctx, taskType)
     defer task.End()
-    trace.Logf(ctx, "connstate", "starting user=%v device=%v pos=%v", c.handler.UserID(), c.ConnID.DeviceID, req.pos)
+    internal.Logf(ctx, "connstate", "starting user=%v device=%v pos=%v", c.handler.UserID(), c.ConnID.DeviceID, req.pos)
     return c.handler.OnIncomingRequest(ctx, c.ConnID, req, req.pos == 0)
 }

sync3/dispatcher.go

@@ -3,9 +3,7 @@ package sync3
 import (
     "context"
     "encoding/json"
-    "fmt"
     "os"
-    "runtime/trace"
     "sync"

     "github.com/matrix-org/sliding-sync/internal"
@@ -254,7 +252,7 @@ func (d *Dispatcher) OnReceipt(ctx context.Context, receipt internal.Receipt) {
 }

 func (d *Dispatcher) notifyListeners(ctx context.Context, ed *caches.EventData, userIDs []string, targetUser string, shouldForceInitial bool, membership string) {
-    trace.Log(ctx, "dispatcher", fmt.Sprintf("%s: notify %d users (nid=%d,join_count=%d)", ed.RoomID, len(userIDs), ed.LatestPos, ed.JoinCount))
+    internal.Logf(ctx, "dispatcher", "%s: notify %d users (nid=%d,join_count=%d)", ed.RoomID, len(userIDs), ed.LatestPos, ed.JoinCount)
     // invoke listeners
     d.userToReceiverMu.RLock()
     defer d.userToReceiverMu.RUnlock()

sync3/extensions/extensions.go

@@ -5,8 +5,8 @@ import (
     "context"
     "os"
     "reflect"
-    "runtime/trace"

+    "github.com/matrix-org/sliding-sync/internal"
     "github.com/matrix-org/sliding-sync/state"
     "github.com/matrix-org/sliding-sync/sync3/caches"
     "github.com/rs/zerolog"
@@ -197,8 +197,8 @@ func (h *Handler) Handle(ctx context.Context, req Request, extCtx Context) (res
     extCtx.Handler = h
     exts := req.EnabledExtensions()
     for _, ext := range exts {
-        region := trace.StartRegion(ctx, "extension_"+ext.Name())
-        ext.ProcessInitial(ctx, &res, extCtx)
+        childCtx, region := internal.StartSpan(ctx, "extension_"+ext.Name())
+        ext.ProcessInitial(childCtx, &res, extCtx)
         region.End()
     }
     return

sync3/handler/connstate.go

@@ -3,7 +3,6 @@ package handler
 import (
     "context"
     "encoding/json"
-    "runtime/trace"
     "time"

     "github.com/matrix-org/sliding-sync/internal"
@@ -119,7 +118,8 @@ func (s *ConnState) load() error {
 // OnIncomingRequest is guaranteed to be called sequentially (it's protected by a mutex in conn.go)
 func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req *sync3.Request, isInitial bool) (*sync3.Response, error) {
     if s.loadPosition == -1 {
-        region := trace.StartRegion(ctx, "load")
+        // load() needs no ctx so drop it
+        _, region := internal.StartSpan(ctx, "load")
         s.load()
         region.End()
     }
@@ -134,14 +134,14 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *sync3.Request, i
     // ApplyDelta works fine if s.muxedReq is nil
     var delta *sync3.RequestDelta
     s.muxedReq, delta = s.muxedReq.ApplyDelta(req)
-    trace.Logf(ctx, "connstate", "new subs=%v unsubs=%v num_lists=%v", len(delta.Subs), len(delta.Unsubs), len(delta.Lists))
+    internal.Logf(ctx, "connstate", "new subs=%v unsubs=%v num_lists=%v", len(delta.Subs), len(delta.Unsubs), len(delta.Lists))
     for key, l := range delta.Lists {
         listData := ""
         if l.Curr != nil {
             listDataBytes, _ := json.Marshal(l.Curr)
             listData = string(listDataBytes)
         }
-        trace.Logf(ctx, "connstate", "list[%v] prev_empty=%v curr=%v", key, l.Prev == nil, listData)
+        internal.Logf(ctx, "connstate", "list[%v] prev_empty=%v curr=%v", key, l.Prev == nil, listData)
     }

     // work out which rooms we'll return data for and add their relevant subscriptions to the builder
@@ -160,7 +160,7 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *sync3.Request, i
     // Handle extensions AFTER processing lists as extensions may need to know which rooms the client
     // is being notified about (e.g. for room account data)
-    region := trace.StartRegion(ctx, "extensions")
+    ctx, region := internal.StartSpan(ctx, "extensions")
     response.Extensions = s.extensionsHandler.Handle(ctx, s.muxedReq.Extensions, extensions.Context{
         UserID:   s.userID,
         DeviceID: s.deviceID,
@@ -179,7 +179,7 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *sync3.Request, i
     }

     // do live tracking if we have nothing to tell the client yet
-    region = trace.StartRegion(ctx, "liveUpdate")
+    ctx, region = internal.StartSpan(ctx, "liveUpdate")
     s.live.liveUpdate(ctx, req, s.muxedReq.Extensions, isInitial, response)
     region.End()
@@ -193,7 +193,8 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *sync3.Request, i
 }

 func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBuilder, listKey string, prevReqList, nextReqList *sync3.RequestList) sync3.ResponseList {
-    defer trace.StartRegion(ctx, "onIncomingListRequest").End()
+    ctx, span := internal.StartSpan(ctx, "onIncomingListRequest")
+    defer span.End()
     roomList, overwritten := s.lists.AssignList(listKey, nextReqList.Filters, nextReqList.Sort, sync3.DoNotOverwrite)

     if nextReqList.ShouldGetAllRooms() {
@@ -337,7 +338,8 @@ func (s *ConnState) onIncomingListRequest(ctx context.Context, builder *RoomsBui
 }

 func (s *ConnState) buildListSubscriptions(ctx context.Context, builder *RoomsBuilder, listDeltas map[string]sync3.RequestListDelta) map[string]sync3.ResponseList {
-    defer trace.StartRegion(ctx, "buildListSubscriptions").End()
+    ctx, span := internal.StartSpan(ctx, "buildListSubscriptions")
+    defer span.End()
     result := make(map[string]sync3.ResponseList, len(s.muxedReq.Lists))
     // loop each list and handle each independently
     for listKey, list := range listDeltas {
@@ -353,7 +355,8 @@ func (s *ConnState) buildListSubscriptions(ctx context.Context, builder *RoomsBu
 }

 func (s *ConnState) buildRoomSubscriptions(ctx context.Context, builder *RoomsBuilder, subs, unsubs []string) {
-    defer trace.StartRegion(ctx, "buildRoomSubscriptions").End()
+    ctx, span := internal.StartSpan(ctx, "buildRoomSubscriptions")
+    defer span.End()
     for _, roomID := range subs {
         // check that the user is allowed to see these rooms as they can set arbitrary room IDs
         if !s.joinChecker.IsUserJoined(s.userID, roomID) {
@@ -377,7 +380,8 @@ func (s *ConnState) buildRoomSubscriptions(ctx context.Context, builder *RoomsBu
 }

 func (s *ConnState) buildRooms(ctx context.Context, builtSubs []BuiltSubscription) map[string]sync3.Room {
-    defer trace.StartRegion(ctx, "buildRooms").End()
+    ctx, span := internal.StartSpan(ctx, "buildRooms")
+    defer span.End()
     result := make(map[string]sync3.Room)
     for _, bs := range builtSubs {
         roomIDs := bs.RoomIDs
@@ -419,7 +423,8 @@ func (s *ConnState) buildRooms(ctx context.Context, builtSubs []BuiltSubscriptio
 }

 func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSubscription, roomIDs ...string) map[string]sync3.Room {
-    defer trace.StartRegion(ctx, "getInitialRoomData").End()
+    ctx, span := internal.StartSpan(ctx, "getInitialRoomData")
+    defer span.End()
     rooms := make(map[string]sync3.Room, len(roomIDs))
     // We want to grab the user room data and the room metadata for each room ID.
     roomIDToUserRoomData := s.userCache.LazyLoadTimelines(s.loadPosition, roomIDs, int(roomSub.TimelineLimit))
@@ -529,7 +534,7 @@ func (s *ConnState) OnRoomUpdate(ctx context.Context, up caches.RoomUpdate) {
             return
         }
         internal.Assert("missing global room metadata", update.GlobalRoomMetadata() != nil)
-        trace.Logf(ctx, "connstate", "queued update %d", update.EventData.LatestPos)
+        internal.Logf(ctx, "connstate", "queued update %d", update.EventData.LatestPos)
         s.live.onUpdate(update)
     case caches.RoomUpdate:
         internal.Assert("missing global room metadata", update.GlobalRoomMetadata() != nil)

sync3/handler/connstate_live.go

@@ -3,7 +3,6 @@ package handler
 import (
     "context"
     "encoding/json"
-    "runtime/trace"
     "time"

     "github.com/matrix-org/sliding-sync/internal"
@@ -75,14 +74,14 @@ func (s *connStateLive) liveUpdate(
     select {
     case <-ctx.Done(): // client has given up
         logger.Trace().Str("user", s.userID).Msg("liveUpdate: client gave up")
-        trace.Logf(ctx, "liveUpdate", "context cancelled")
+        internal.Logf(ctx, "liveUpdate", "context cancelled")
         return
     case <-time.After(timeLeftToWait): // we've timed out
         logger.Trace().Str("user", s.userID).Msg("liveUpdate: timed out")
-        trace.Logf(ctx, "liveUpdate", "timed out after %v", timeLeftToWait)
+        internal.Logf(ctx, "liveUpdate", "timed out after %v", timeLeftToWait)
         return
     case update := <-s.updates:
-        trace.Logf(ctx, "liveUpdate", "process live update")
+        internal.Logf(ctx, "liveUpdate", "process live update")
         s.processLiveUpdate(ctx, update, response)
         // pass event to extensions AFTER processing

sync3/handler/handler.go

@@ -8,7 +8,6 @@ import (
     "net/url"
     "os"
     "reflect"
-    "runtime/trace"
     "strconv"
     "sync"
     "time"
@@ -495,7 +494,7 @@ func (h *SyncLiveHandler) OnInitialSyncComplete(p *pubsub.V2InitialSyncComplete)

 // Called from the v2 poller, implements V2DataReceiver
 func (h *SyncLiveHandler) Accumulate(p *pubsub.V2Accumulate) {
-    ctx, task := trace.NewTask(context.Background(), "Accumulate")
+    ctx, task := internal.StartTask(context.Background(), "Accumulate")
     defer task.End()
     events, err := h.Storage.EventNIDs(p.EventNIDs)
     if err != nil {
@@ -505,14 +504,14 @@ func (h *SyncLiveHandler) Accumulate(p *pubsub.V2Accumulate) {
     if len(events) == 0 {
         return
     }
-    trace.Log(ctx, "room", fmt.Sprintf("%s: %d events", p.RoomID, len(events)))
+    internal.Logf(ctx, "room", fmt.Sprintf("%s: %d events", p.RoomID, len(events)))
     // we have new events, notify active connections
     h.Dispatcher.OnNewEvents(ctx, p.RoomID, events, p.EventNIDs[len(p.EventNIDs)-1])
 }

 // Called from the v2 poller, implements V2DataReceiver
 func (h *SyncLiveHandler) Initialise(p *pubsub.V2Initialise) {
-    ctx, task := trace.NewTask(context.Background(), "Initialise")
+    ctx, task := internal.StartTask(context.Background(), "Initialise")
     defer task.End()
     state, err := h.Storage.StateSnapshot(p.SnapshotNID)
     if err != nil {
@@ -524,7 +523,7 @@ func (h *SyncLiveHandler) Initialise(p *pubsub.V2Initialise) {
 }

 func (h *SyncLiveHandler) OnUnreadCounts(p *pubsub.V2UnreadCounts) {
-    ctx, task := trace.NewTask(context.Background(), "OnUnreadCounts")
+    ctx, task := internal.StartTask(context.Background(), "OnUnreadCounts")
     defer task.End()
     userCache, ok := h.userCaches.Load(p.UserID)
     if !ok {
@@ -535,7 +534,7 @@ func (h *SyncLiveHandler) OnUnreadCounts(p *pubsub.V2UnreadCounts) {
 }

 // push device data updates on waiting conns (otk counts, device list changes)
 func (h *SyncLiveHandler) OnDeviceData(p *pubsub.V2DeviceData) {
-    ctx, task := trace.NewTask(context.Background(), "OnDeviceData")
+    ctx, task := internal.StartTask(context.Background(), "OnDeviceData")
     defer task.End()
     conn := h.ConnMap.Conn(sync3.ConnID{
         DeviceID: p.DeviceID,
@@ -547,7 +546,7 @@ func (h *SyncLiveHandler) OnDeviceData(p *pubsub.V2DeviceData) {
 }

 func (h *SyncLiveHandler) OnDeviceMessages(p *pubsub.V2DeviceMessages) {
-    ctx, task := trace.NewTask(context.Background(), "OnDeviceMessages")
+    ctx, task := internal.StartTask(context.Background(), "OnDeviceMessages")
     defer task.End()
     conn := h.ConnMap.Conn(sync3.ConnID{
         DeviceID: p.DeviceID,
@@ -559,7 +558,7 @@ func (h *SyncLiveHandler) OnDeviceMessages(p *pubsub.V2DeviceMessages) {
 }

 func (h *SyncLiveHandler) OnInvite(p *pubsub.V2InviteRoom) {
-    ctx, task := trace.NewTask(context.Background(), "OnInvite")
+    ctx, task := internal.StartTask(context.Background(), "OnInvite")
     defer task.End()
     userCache, ok := h.userCaches.Load(p.UserID)
     if !ok {
@@ -574,7 +573,7 @@ func (h *SyncLiveHandler) OnInvite(p *pubsub.V2InviteRoom) {
 }

 func (h *SyncLiveHandler) OnLeftRoom(p *pubsub.V2LeaveRoom) {
-    ctx, task := trace.NewTask(context.Background(), "OnLeftRoom")
+    ctx, task := internal.StartTask(context.Background(), "OnLeftRoom")
     defer task.End()
     userCache, ok := h.userCaches.Load(p.UserID)
     if !ok {
@@ -584,7 +583,7 @@ func (h *SyncLiveHandler) OnLeftRoom(p *pubsub.V2LeaveRoom) {
 }

 func (h *SyncLiveHandler) OnReceipt(p *pubsub.V2Receipt) {
-    ctx, task := trace.NewTask(context.Background(), "OnReceipt")
+    ctx, task := internal.StartTask(context.Background(), "OnReceipt")
     defer task.End()
     // split receipts into public / private
     userToPrivateReceipts := make(map[string][]internal.Receipt)
@@ -616,7 +615,7 @@ func (h *SyncLiveHandler) OnReceipt(p *pubsub.V2Receipt) {
 }

 func (h *SyncLiveHandler) OnTyping(p *pubsub.V2Typing) {
-    ctx, task := trace.NewTask(context.Background(), "OnTyping")
+    ctx, task := internal.StartTask(context.Background(), "OnTyping")
     defer task.End()
     rooms := h.GlobalCache.LoadRooms(p.RoomID)
     if rooms[p.RoomID] != nil {
@@ -628,7 +627,7 @@ func (h *SyncLiveHandler) OnTyping(p *pubsub.V2Typing) {
 }

 func (h *SyncLiveHandler) OnAccountData(p *pubsub.V2AccountData) {
-    ctx, task := trace.NewTask(context.Background(), "OnAccountData")
+    ctx, task := internal.StartTask(context.Background(), "OnAccountData")
     defer task.End()
     userCache, ok := h.userCaches.Load(p.UserID)
     if !ok {