Skip to content

Commit 86fc465

Browse files
committed
feat: add basic tracing for responses
1 parent 06cb155 commit 86fc465

File tree

10 files changed

+216
-64
lines changed

10 files changed

+216
-64
lines changed

impl/graphsync.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,12 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
305305
}
306306

307307
// Request initiates a new GraphSync request to the given peer using the given selector spec.
308-
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
308+
func (gs *GraphSync) Request(
309+
ctx context.Context,
310+
p peer.ID, root ipld.Link,
311+
selector ipld.Node,
312+
extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
313+
309314
var extNames []string
310315
for _, ext := range extensions {
311316
extNames = append(extNames, string(ext.Name))
@@ -477,7 +482,7 @@ func (gsr *graphSyncReceiver) graphSync() *GraphSync {
477482
return (*GraphSync)(gsr)
478483
}
479484

480-
// ReceiveMessage is part of the networks Receiver interface and receives
485+
// ReceiveMessage is part of the network's Receiver interface and receives
481486
// incoming messages from the network
482487
func (gsr *graphSyncReceiver) ReceiveMessage(
483488
ctx context.Context,

impl/graphsync_test.go

+109-37
Large diffs are not rendered by default.

message/message.go

+17
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,15 @@ func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) ([]byte, boo
375375
return val, true
376376
}
377377

378+
// ExtensionNames returns the names of the extensions included in this request
379+
func (gsr GraphSyncRequest) ExtensionNames() []string {
380+
var extNames []string
381+
for ext := range gsr.extensions {
382+
extNames = append(extNames, ext)
383+
}
384+
return extNames
385+
}
386+
378387
// IsCancel returns true if this particular request is being cancelled
379388
func (gsr GraphSyncRequest) IsCancel() bool { return gsr.isCancel }
380389

@@ -398,7 +407,15 @@ func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) ([]byte, bo
398407
return nil, false
399408
}
400409
return val, true
410+
}
401411

412+
// ExtensionNames returns the names of the extensions included in this response
413+
func (gsr GraphSyncResponse) ExtensionNames() []string {
414+
var extNames []string
415+
for ext := range gsr.extensions {
416+
extNames = append(extNames, ext)
417+
}
418+
return extNames
402419
}
403420

404421
// ReplaceExtensions merges the given extensions into the request to create a new request,

requestmanager/executor/executor.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,16 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.
8282

8383
log.Debugw("beginning request execution", "id", requestTask.Request.ID(), "peer", pid.String(), "root_cid", requestTask.Request.Root().String())
8484
err := e.traverse(requestTask)
85-
span.RecordError(err)
86-
if err != nil && !ipldutil.IsContextCancelErr(err) {
87-
e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID()))
88-
if !isPausedErr(err) {
89-
span.SetStatus(codes.Error, err.Error())
90-
select {
91-
case <-requestTask.Ctx.Done():
92-
case requestTask.InProgressErr <- err:
85+
if err != nil {
86+
span.RecordError(err)
87+
if !ipldutil.IsContextCancelErr(err) {
88+
e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID()))
89+
if !isPausedErr(err) {
90+
span.SetStatus(codes.Error, err.Error())
91+
select {
92+
case <-requestTask.Ctx.Done():
93+
case requestTask.InProgressErr <- err:
94+
}
9395
}
9496
}
9597
}

responsemanager/client.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/ipfs/go-peertaskqueue/peertask"
1010
ipld "github.com/ipld/go-ipld-prime"
1111
"github.com/libp2p/go-libp2p-core/peer"
12+
"go.opentelemetry.io/otel/trace"
1213

1314
"github.com/ipfs/go-graphsync"
1415
"github.com/ipfs/go-graphsync/ipldutil"
@@ -30,6 +31,7 @@ var log = logging.Logger("graphsync")
3031

3132
type inProgressResponseStatus struct {
3233
ctx context.Context
34+
span trace.Span
3335
cancelFn func()
3436
request gsmsg.GraphSyncRequest
3537
loader ipld.BlockReadOpener
@@ -197,8 +199,8 @@ func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.Request
197199
}
198200
}
199201

200-
// this is a test utility method to force all messages to get processed
201-
func (rm *ResponseManager) synchronize() {
202+
// Synchronize is a utility method that blocks until all current messages are processed
203+
func (rm *ResponseManager) Synchronize() {
202204
sync := make(chan error)
203205
rm.send(&synchronizeMessage{sync}, nil)
204206
select {

responsemanager/messages.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ type processRequestMessage struct {
1515
requests []gsmsg.GraphSyncRequest
1616
}
1717

18+
func (prm *processRequestMessage) handle(rm *ResponseManager) {
19+
rm.processRequests(prm.p, prm.requests)
20+
}
21+
1822
type pauseRequestMessage struct {
1923
p peer.ID
2024
requestID graphsync.RequestID
@@ -111,10 +115,6 @@ func (str *startTaskRequest) handle(rm *ResponseManager) {
111115
}
112116
}
113117

114-
func (prm *processRequestMessage) handle(rm *ResponseManager) {
115-
rm.processRequests(prm.p, prm.requests)
116-
}
117-
118118
type peerStateMessage struct {
119119
p peer.ID
120120
peerStatsChan chan<- peerstate.PeerState

responsemanager/queryexecutor/queryexecutor.go

+13
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import (
1010
ipld "github.com/ipld/go-ipld-prime"
1111
"github.com/ipld/go-ipld-prime/traversal"
1212
"github.com/libp2p/go-libp2p-core/peer"
13+
"go.opentelemetry.io/otel"
14+
"go.opentelemetry.io/otel/codes"
15+
"go.opentelemetry.io/otel/trace"
1316

1417
"github.com/ipfs/go-graphsync"
1518
"github.com/ipfs/go-graphsync/ipldutil"
@@ -38,6 +41,7 @@ type ResponseTask struct {
3841
Empty bool
3942
Subscriber *notifications.TopicDataSubscriber
4043
Ctx context.Context
44+
Span trace.Span
4145
Request gsmsg.GraphSyncRequest
4246
Loader ipld.BlockReadOpener
4347
Traverser ipldutil.Traverser
@@ -97,8 +101,17 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee
97101
return false
98102
}
99103

104+
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(qe.ctx, rt.Span), "executeTask")
105+
defer span.End()
106+
100107
log.Debugw("beginning response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
101108
err := qe.executeQuery(pid, rt)
109+
if err != nil {
110+
span.RecordError(err)
111+
if _, isPaused := err.(hooks.ErrPaused); !isPaused {
112+
span.SetStatus(codes.Error, err.Error())
113+
}
114+
}
102115
qe.manager.FinishTask(task, err)
103116
log.Debugw("finishing response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
104117
return false

responsemanager/responsemanager_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func TestCancellationQueryInProgress(t *testing.T) {
100100
gsmsg.CancelRequest(td.requestID),
101101
}
102102
responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
103-
responseManager.synchronize()
103+
responseManager.Synchronize()
104104
close(waitForCancel)
105105

106106
testutil.AssertDoesReceive(td.ctx, t, cancelledListenerCalled, "should call cancelled listener")
@@ -149,7 +149,7 @@ func TestEarlyCancellation(t *testing.T) {
149149
td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
150150
responseManager.Startup()
151151
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
152-
responseManager.synchronize()
152+
responseManager.Synchronize()
153153
td.connManager.AssertProtectedWithTags(t, td.p, td.requests[0].ID().Tag())
154154

155155
// send a cancellation
@@ -158,7 +158,7 @@ func TestEarlyCancellation(t *testing.T) {
158158
}
159159
responseManager.ProcessRequests(td.ctx, td.p, cancelRequests)
160160

161-
responseManager.synchronize()
161+
responseManager.Synchronize()
162162

163163
td.assertNoResponses()
164164
td.connManager.RefuteProtected(t, td.p)
@@ -634,7 +634,7 @@ func TestValidationAndExtensions(t *testing.T) {
634634
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
635635
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
636636
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
637-
responseManager.synchronize()
637+
responseManager.Synchronize()
638638
close(wait)
639639
td.assertCompleteRequestWith(graphsync.RequestCompletedFull)
640640
td.assertReceiveExtensionResponse()
@@ -704,7 +704,7 @@ func TestValidationAndExtensions(t *testing.T) {
704704
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
705705
testutil.AssertDoesReceive(td.ctx, t, sent, "sends blocks")
706706
responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests)
707-
responseManager.synchronize()
707+
responseManager.Synchronize()
708708
close(wait)
709709
td.assertCompleteRequestWith(graphsync.RequestFailedUnknown)
710710
})

responsemanager/server.go

+42-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ import (
99
"github.com/ipfs/go-peertaskqueue/peertask"
1010
"github.com/ipfs/go-peertaskqueue/peertracker"
1111
"github.com/libp2p/go-libp2p-core/peer"
12+
"go.opentelemetry.io/otel"
13+
"go.opentelemetry.io/otel/attribute"
14+
"go.opentelemetry.io/otel/codes"
15+
"go.opentelemetry.io/otel/trace"
1216

1317
"github.com/ipfs/go-graphsync"
1418
"github.com/ipfs/go-graphsync/ipldutil"
@@ -50,6 +54,7 @@ func (rm *ResponseManager) terminateRequest(key responseKey) {
5054
rm.connManager.Unprotect(key.p, key.requestID.Tag())
5155
delete(rm.inProgressResponses, key)
5256
ipr.cancelFn()
57+
ipr.span.End()
5358
}
5459

5560
func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSyncRequest) {
@@ -58,6 +63,17 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync
5863
log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID)
5964
return
6065
}
66+
67+
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "processUpdate", trace.WithAttributes(
68+
attribute.Int("id", int(update.ID())),
69+
attribute.Int("priority", int(update.Priority())),
70+
attribute.String("root", update.Root().String()),
71+
attribute.Bool("isCancel", update.IsCancel()),
72+
attribute.Bool("isUpdate", update.IsUpdate()),
73+
attribute.StringSlice("extensions", update.ExtensionNames()),
74+
))
75+
defer span.End()
76+
6177
if response.state != graphsync.Paused {
6278
response.updates = append(response.updates, update)
6379
select {
@@ -79,11 +95,15 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync
7995
})
8096
if result.Err != nil {
8197
response.state = graphsync.CompletingSend
98+
response.span.RecordError(result.Err)
99+
response.span.SetStatus(codes.Error, result.Err.Error())
82100
return
83101
}
84102
if result.Unpause {
85103
err := rm.unpauseRequest(key.p, key.requestID)
86104
if err != nil {
105+
span.RecordError(err)
106+
span.SetStatus(codes.Error, result.Err.Error())
87107
log.Warnf("error unpausing request: %s", err.Error())
88108
}
89109
}
@@ -119,6 +139,13 @@ func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID
119139
return errors.New("could not find request")
120140
}
121141

142+
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "abortRequest")
143+
defer span.End()
144+
span.RecordError(err)
145+
span.SetStatus(codes.Error, err.Error())
146+
response.span.RecordError(err)
147+
response.span.SetStatus(codes.Error, err.Error())
148+
122149
if response.state != graphsync.Running {
123150
_ = rm.responseAssembler.Transaction(p, requestID, func(rb responseassembler.ResponseBuilder) error {
124151
if ipldutil.IsContextCancelErr(err) {
@@ -155,9 +182,17 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
155182
rm.processUpdate(key, request)
156183
continue
157184
}
185+
ctx, responseSpan := otel.Tracer("graphsync").Start(rm.ctx, "response", trace.WithAttributes(
186+
attribute.Int("id", int(request.ID())),
187+
attribute.Int("priority", int(request.Priority())),
188+
attribute.String("root", request.Root().String()),
189+
attribute.Bool("isCancel", request.IsCancel()),
190+
attribute.Bool("isUpdate", request.IsUpdate()),
191+
attribute.StringSlice("extensions", request.ExtensionNames()),
192+
))
158193
rm.connManager.Protect(p, request.ID().Tag())
159194
rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request)
160-
ctx, cancelFn := context.WithCancel(rm.ctx)
195+
ctx, cancelFn := context.WithCancel(ctx)
161196
sub := notifications.NewTopicDataSubscriber(&subscriber{
162197
p: key.p,
163198
request: request,
@@ -176,6 +211,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
176211
rm.inProgressResponses[key] =
177212
&inProgressResponseStatus{
178213
ctx: ctx,
214+
span: responseSpan,
179215
cancelFn: cancelFn,
180216
subscriber: sub,
181217
request: request,
@@ -204,6 +240,8 @@ func (rm *ResponseManager) taskDataForKey(key responseKey) queryexecutor.Respons
204240
loader, traverser, isPaused, err := (&queryPreparer{rm.requestHooks, rm.responseAssembler, rm.linkSystem, rm.maxLinksPerRequest}).prepareQuery(response.ctx, key.p, response.request, response.signals, response.subscriber)
205241
if err != nil {
206242
response.state = graphsync.CompletingSend
243+
response.span.RecordError(err)
244+
response.span.SetStatus(codes.Error, err.Error())
207245
return queryexecutor.ResponseTask{Empty: true}
208246
}
209247
response.loader = loader
@@ -216,6 +254,7 @@ func (rm *ResponseManager) taskDataForKey(key responseKey) queryexecutor.Respons
216254
response.state = graphsync.Running
217255
return queryexecutor.ResponseTask{
218256
Ctx: response.ctx,
257+
Span: response.span,
219258
Empty: false,
220259
Subscriber: response.subscriber,
221260
Request: response.request,
@@ -249,6 +288,8 @@ func (rm *ResponseManager) finishTask(task *peertask.Task, err error) {
249288
log.Infow("graphsync response processing complete (messages stil sending)", "request id", key.requestID, "peer", key.p, "total time", time.Since(response.startTime))
250289

251290
if err != nil {
291+
response.span.RecordError(err)
292+
response.span.SetStatus(codes.Error, err.Error())
252293
log.Infof("response failed: %w", err)
253294
}
254295

testutil/tracing.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,13 @@ func (c Collector) tracesToString(trace string, spans tracetest.SpanStubs, match
7777
// identified by its trace string as described in TracesToStrings. Note that
7878
// this string can also be a partial of a complete trace, e.g. just `"foo(0)"`
7979
// without any children to fetch the parent span.
80-
func (c Collector) FindSpanByTraceString(trace string) tracetest.SpanStub {
81-
var found tracetest.SpanStub
80+
func (c Collector) FindSpanByTraceString(trace string) *tracetest.SpanStub {
81+
var found *tracetest.SpanStub
8282
c.tracesToString("", c.FindParentSpans(), trace, func(span tracetest.SpanStub) {
83-
if found.Name != "" {
83+
if found != nil && found.Name != "" {
8484
panic("found more than one span with the same trace string")
8585
}
86-
found = span
86+
found = &span
8787
})
8888
return found
8989
}
@@ -121,7 +121,7 @@ func (c Collector) SingleExceptionEvent(t *testing.T, trace string, typeRe strin
121121
// has ContextCancelError exception recorded in the right place
122122
et := c.FindSpanByTraceString(trace)
123123
require.Len(t, et.Events, 1, "expected one event in span %v", trace)
124-
ex := EventAsException(t, EventInTraceSpan(t, et, "exception"))
124+
ex := EventAsException(t, EventInTraceSpan(t, *et, "exception"))
125125
require.Regexp(t, typeRe, ex.Type)
126126
require.Regexp(t, messageRe, ex.Message)
127127
if errorCode {

0 commit comments

Comments
 (0)