Skip to content

Commit 5c27818

Browse files
committed
feat(responsemanager): trace full messages via links to responses
Fixes: #318
1 parent 8e9f6cf commit 5c27818

File tree

4 files changed

+83
-19
lines changed

4 files changed

+83
-19
lines changed

impl/graphsync_test.go

+20
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ func TestRejectRequestsByDefault(t *testing.T) {
201201

202202
tracing := collectTracing(t)
203203
require.ElementsMatch(t, []string{
204+
"requestMessage(0)",
204205
"response(0)",
205206
"request(0)->newRequest(0)",
206207
"request(0)->executeTask(0)",
@@ -549,6 +550,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
549550

550551
tracing := collectTracing(t)
551552
require.ElementsMatch(t, append(append([]string{
553+
"requestMessage(0)",
552554
"response(0)->executeTask(0)",
553555
"request(0)->newRequest(0)",
554556
"request(0)->executeTask(0)",
@@ -622,6 +624,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
622624

623625
tracing := collectTracing(t)
624626
require.ElementsMatch(t, append(append([]string{
627+
"requestMessage(0)",
625628
"response(0)->executeTask(0)",
626629
"request(0)->newRequest(0)",
627630
"request(0)->executeTask(0)",
@@ -872,6 +875,8 @@ func TestPauseResumeViaUpdate(t *testing.T) {
872875

873876
tracing := collectTracing(t)
874877
require.ElementsMatch(t, append(append([]string{
878+
"requestMessage(0)",
879+
"requestMessage(1)",
875880
"response(0)->executeTask(0)",
876881
"response(0)->processUpdate(0)",
877882
"response(0)->executeTask(1)",
@@ -887,6 +892,16 @@ func TestPauseResumeViaUpdate(t *testing.T) {
887892
require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice())
888893
// pause recorded
889894
tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "github.com/ipfs/go-graphsync/responsemanager/hooks.ErrPaused", hooks.ErrPaused{}.Error(), false)
895+
896+
message0Span := tracing.FindSpanByTraceString("requestMessage(0)")
897+
message1Span := tracing.FindSpanByTraceString("requestMessage(1)")
898+
responseSpan := tracing.FindSpanByTraceString("response(0)")
899+
// response(0) originates in requestMessage(0)
900+
require.Len(t, responseSpan.Links, 1)
901+
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), message0Span.SpanContext.SpanID())
902+
// response(0)->processUpdate(0) is linked to requestMessage(1), the message that triggered it
903+
require.Len(t, processUpdateSpan.Links, 1)
904+
require.Equal(t, processUpdateSpan.Links[0].SpanContext.SpanID(), message1Span.SpanContext.SpanID())
890905
}
891906

892907
func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
@@ -963,6 +978,8 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
963978

964979
tracing := collectTracing(t)
965980
require.ElementsMatch(t, append(append([]string{
981+
"requestMessage(0)",
982+
"requestMessage(1)",
966983
"response(0)->executeTask(0)",
967984
"response(0)->processUpdate(0)",
968985
"response(0)->executeTask(1)",
@@ -1057,6 +1074,7 @@ func TestNetworkDisconnect(t *testing.T) {
10571074
tracing := collectTracing(t)
10581075

10591076
traceStrings := tracing.TracesToStrings()
1077+
require.Contains(t, traceStrings, "requestMessage(0)")
10601078
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
10611079
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
10621080
require.Contains(t, traceStrings, "response(0)->executeTask(1)")
@@ -1344,6 +1362,7 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {
13441362

13451363
tracing := collectTracing(t)
13461364
require.ElementsMatch(t, append(append([]string{
1365+
"requestMessage(0)",
13471366
"response(0)->executeTask(0)",
13481367
"request(0)->newRequest(0)",
13491368
"request(0)->executeTask(0)",
@@ -1580,6 +1599,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {
15801599
tracing := collectTracing(t)
15811600
require.ElementsMatch(t, append(append(
15821601
[]string{
1602+
"requestMessage(0)",
15831603
"response(0)->executeTask(0)",
15841604
"request(0)->newRequest(0)",
15851605
"request(0)->executeTask(0)",

responsemanager/messages.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type errorRequestMessage struct {
4141
}
4242

4343
func (erm *errorRequestMessage) handle(rm *ResponseManager) {
44-
err := rm.abortRequest(erm.p, erm.requestID, erm.err)
44+
err := rm.abortRequest(rm.ctx, erm.p, erm.requestID, erm.err)
4545
select {
4646
case <-rm.ctx.Done():
4747
case erm.response <- err:

responsemanager/responsemanager_test.go

+23
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,13 @@ func TestIncomingQuery(t *testing.T) {
8888

8989
tracing := td.collectTracing(t)
9090
require.ElementsMatch(t, []string{
91+
"requestMessage(0)",
9192
"TestIncomingQuery(0)->response(0)->executeTask(0)",
9293
}, tracing.TracesToStrings())
94+
messageSpan := tracing.FindSpanByTraceString("requestMessage(0)")
95+
responseSpan := tracing.FindSpanByTraceString("TestIncomingQuery(0)->response(0)")
96+
require.Len(t, responseSpan.Links, 1)
97+
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), messageSpan.SpanContext.SpanID())
9398
}
9499

95100
func TestCancellationQueryInProgress(t *testing.T) {
@@ -128,6 +133,24 @@ func TestCancellationQueryInProgress(t *testing.T) {
128133
td.connManager.RefuteProtected(t, td.p)
129134

130135
td.assertRequestCleared()
136+
137+
tracing := td.collectTracing(t)
138+
require.ElementsMatch(t, []string{
139+
"requestMessage(0)",
140+
"response(0)->executeTask(0)",
141+
"response(0)->abortRequest(0)",
142+
"requestMessage(1)",
143+
}, tracing.TracesToStrings())
144+
message0Span := tracing.FindSpanByTraceString("requestMessage(0)")
145+
message1Span := tracing.FindSpanByTraceString("requestMessage(1)")
146+
responseSpan := tracing.FindSpanByTraceString("response(0)")
147+
abortRequestSpan := tracing.FindSpanByTraceString("response(0)->abortRequest(0)")
148+
// response(0) originates in requestMessage(0)
149+
require.Len(t, responseSpan.Links, 1)
150+
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), message0Span.SpanContext.SpanID())
151+
// response(0)->abortRequest(0) is linked to requestMessage(1), the message that triggered it
152+
require.Len(t, abortRequestSpan.Links, 1)
153+
require.Equal(t, abortRequestSpan.Links[0].SpanContext.SpanID(), message1Span.SpanContext.SpanID())
131154
}
132155

133156
func TestCancellationViaCommand(t *testing.T) {

responsemanager/server.go

+39-18
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,22 @@ func (rm *ResponseManager) terminateRequest(key responseKey) {
5656
ipr.span.End()
5757
}
5858

59-
func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSyncRequest) {
59+
func (rm *ResponseManager) processUpdate(ctx context.Context, key responseKey, update gsmsg.GraphSyncRequest) {
6060
response, ok := rm.inProgressResponses[key]
6161
if !ok || response.state == graphsync.CompletingSend {
6262
log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID)
6363
return
6464
}
6565

66-
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "processUpdate", trace.WithAttributes(
67-
attribute.Int("id", int(update.ID())),
68-
attribute.StringSlice("extensions", update.ExtensionNames()),
69-
))
66+
_, span := otel.Tracer("graphsync").Start(
67+
trace.ContextWithSpan(ctx, response.span),
68+
"processUpdate",
69+
trace.WithLinks(trace.LinkFromContext(ctx)),
70+
trace.WithAttributes(
71+
attribute.Int("id", int(update.ID())),
72+
attribute.StringSlice("extensions", update.ExtensionNames()),
73+
))
74+
7075
defer span.End()
7176

7277
if response.state != graphsync.Paused {
@@ -125,15 +130,18 @@ func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.Request
125130
return nil
126131
}
127132

128-
func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID, err error) error {
133+
func (rm *ResponseManager) abortRequest(ctx context.Context, p peer.ID, requestID graphsync.RequestID, err error) error {
129134
key := responseKey{p, requestID}
130135
rm.responseQueue.Remove(key, key.p)
131136
response, ok := rm.inProgressResponses[key]
132137
if !ok || response.state == graphsync.CompletingSend {
133138
return errors.New("could not find request")
134139
}
135140

136-
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "abortRequest")
141+
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(ctx, response.span),
142+
"abortRequest",
143+
trace.WithLinks(trace.LinkFromContext(ctx)),
144+
)
137145
defer span.End()
138146
span.RecordError(err)
139147
span.SetStatus(codes.Error, err.Error())
@@ -166,25 +174,38 @@ func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID
166174
}
167175

168176
func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSyncRequest) {
177+
ctx, messageSpan := otel.Tracer("graphsync").Start(
178+
rm.ctx,
179+
"requestMessage",
180+
trace.WithAttributes(attribute.String("peerID", p.Pretty())),
181+
)
182+
defer messageSpan.End()
183+
169184
for _, request := range requests {
170185
key := responseKey{p: p, requestID: request.ID()}
171186
if request.IsCancel() {
172-
_ = rm.abortRequest(p, request.ID(), ipldutil.ContextCancelError{})
187+
_ = rm.abortRequest(ctx, p, request.ID(), ipldutil.ContextCancelError{})
173188
continue
174189
}
175190
if request.IsUpdate() {
176-
rm.processUpdate(key, request)
191+
rm.processUpdate(ctx, key, request)
177192
continue
178193
}
179194
rm.connManager.Protect(p, request.ID().Tag())
180-
ctx := rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request, rm.ctx)
181-
ctx, responseSpan := otel.Tracer("graphsync").Start(ctx, "response", trace.WithAttributes(
182-
attribute.Int("id", int(request.ID())),
183-
attribute.Int("priority", int(request.Priority())),
184-
attribute.String("root", request.Root().String()),
185-
attribute.StringSlice("extensions", request.ExtensionNames()),
186-
))
187-
ctx, cancelFn := context.WithCancel(ctx)
195+
// don't use `ctx`, which carries the "requestMessage" span; start from rm.ctx for a fresh trace, which allows
196+
// for a request hook to join this particular response up to an existing external trace
197+
rctx := rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request, rm.ctx)
198+
rctx, responseSpan := otel.Tracer("graphsync").Start(
199+
rctx,
200+
"response",
201+
trace.WithLinks(trace.LinkFromContext(ctx)),
202+
trace.WithAttributes(
203+
attribute.Int("id", int(request.ID())),
204+
attribute.Int("priority", int(request.Priority())),
205+
attribute.String("root", request.Root().String()),
206+
attribute.StringSlice("extensions", request.ExtensionNames()),
207+
))
208+
rctx, cancelFn := context.WithCancel(rctx)
188209
sub := &subscriber{
189210
p: key.p,
190211
request: request,
@@ -202,7 +223,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
202223

203224
rm.inProgressResponses[key] =
204225
&inProgressResponseStatus{
205-
ctx: ctx,
226+
ctx: rctx,
206227
span: responseSpan,
207228
cancelFn: cancelFn,
208229
request: request,

0 commit comments

Comments
 (0)