Skip to content

Commit 022a166

Browse files
authored
feat(responsemanager): trace full messages via links to responses (#325)
* feat(responsemanager): trace full messages via links to responses
  Fixes: #318
* chore(responsemanager): rename processRequests internals for consistency
* fix(responsemanager): make TestCancellationQueryInProgress less strict
1 parent 4f4414d commit 022a166

File tree

5 files changed: +87 additions, -31 deletions

impl/graphsync_test.go

+21 -7
Original file line number | Diff line number | Diff line change
@@ -206,9 +206,10 @@ func TestRejectRequestsByDefault(t *testing.T) {
206206
"request(0)->executeTask(0)",
207207
"request(0)->terminateRequest(0)",
208208
"processResponses(0)->loaderProcess(0)->cacheProcess(0)",
209-
"response(0)->transaction(0)->execute(0)->buildMessage(0)",
209+
"processRequests(0)->transaction(0)->execute(0)->buildMessage(0)",
210210
"message(0)->sendMessage(0)",
211211
"message(1)->sendMessage(0)",
212+
"response(0)",
212213
}, tracing.TracesToStrings())
213214
// has ContextCancelError exception recorded in the right place
214215
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false)
@@ -561,7 +562,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
561562
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...), // half of the full chain
562563
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
563564
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
564-
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
565+
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
565566
), tracing.TracesToStrings())
566567
}
567568

@@ -635,7 +636,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
635636
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...),
636637
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
637638
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
638-
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
639+
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
639640
), tracing.TracesToStrings())
640641
}
641642

@@ -887,6 +888,7 @@ func TestPauseResumeViaUpdate(t *testing.T) {
887888
"request(0)->newRequest(0)",
888889
"request(0)->executeTask(0)",
889890
"request(0)->terminateRequest(0)",
891+
"processRequests(1)",
890892
},
891893
processResponsesTraces(t, tracing, responseCount)...),
892894
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...),
@@ -895,13 +897,23 @@ func TestPauseResumeViaUpdate(t *testing.T) {
895897
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
896898
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->loadBlock(0)", 50)...),
897899
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
898-
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
900+
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
899901
), tracing.TracesToStrings())
900902
// make sure the attributes are what we expect
901903
processUpdateSpan := tracing.FindSpanByTraceString("response(0)->processUpdate(0)")
902904
require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice())
903905
// pause recorded
904906
tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "github.com/ipfs/go-graphsync/responsemanager/hooks.ErrPaused", hooks.ErrPaused{}.Error(), false)
907+
908+
message0Span := tracing.FindSpanByTraceString("processRequests(0)")
909+
message1Span := tracing.FindSpanByTraceString("processRequests(1)")
910+
responseSpan := tracing.FindSpanByTraceString("response(0)")
911+
// response(0) originates in processRequests(0)
912+
require.Len(t, responseSpan.Links, 1)
913+
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), message0Span.SpanContext.SpanID())
914+
// response(0)->processUpdate(0) occurs thanks to processRequests(1)
915+
require.Len(t, processUpdateSpan.Links, 1)
916+
require.Equal(t, processUpdateSpan.Links[0].SpanContext.SpanID(), message1Span.SpanContext.SpanID())
905917
}
906918

907919
func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
@@ -980,6 +992,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
980992
"request(0)->newRequest(0)",
981993
"request(0)->executeTask(0)",
982994
"request(0)->terminateRequest(0)",
995+
"processRequests(1)",
983996
},
984997
processResponsesTraces(t, tracing, responseCount)...),
985998
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...),
@@ -988,7 +1001,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
9881001
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
9891002
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->loadBlock(0)", 50)...),
9901003
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
991-
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
1004+
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
9921005
), tracing.TracesToStrings())
9931006
// make sure the attributes are what we expect
9941007
processUpdateSpan := tracing.FindSpanByTraceString("response(0)->processUpdate(0)")
@@ -1074,6 +1087,7 @@ func TestNetworkDisconnect(t *testing.T) {
10741087
tracing := collectTracing(t)
10751088

10761089
traceStrings := tracing.TracesToStrings()
1090+
require.Contains(t, traceStrings, "processRequests(0)->transaction(0)->execute(0)->buildMessage(0)")
10771091
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)")
10781092
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)")
10791093
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
@@ -1370,7 +1384,7 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {
13701384
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...),
13711385
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
13721386
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
1373-
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
1387+
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
13741388
), tracing.TracesToStrings())
13751389
}
13761390

@@ -1610,7 +1624,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {
16101624
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...),
16111625
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
16121626
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
1613-
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
1627+
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
16141628
), tracing.TracesToStrings())
16151629
}
16161630

responsemanager/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func New(ctx context.Context,
154154

155155
// ProcessRequests processes incoming requests for the given peer
156156
func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) {
157-
rm.send(&processRequestMessage{p, requests}, ctx.Done())
157+
rm.send(&processRequestsMessage{p, requests}, ctx.Done())
158158
}
159159

160160
// UnpauseResponse unpauses a response that was previously paused

responsemanager/messages.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ import (
1010
"github.com/ipfs/go-graphsync/responsemanager/queryexecutor"
1111
)
1212

13-
type processRequestMessage struct {
13+
type processRequestsMessage struct {
1414
p peer.ID
1515
requests []gsmsg.GraphSyncRequest
1616
}
1717

18-
func (prm *processRequestMessage) handle(rm *ResponseManager) {
18+
func (prm *processRequestsMessage) handle(rm *ResponseManager) {
1919
rm.processRequests(prm.p, prm.requests)
2020
}
2121

@@ -41,7 +41,7 @@ type errorRequestMessage struct {
4141
}
4242

4343
func (erm *errorRequestMessage) handle(rm *ResponseManager) {
44-
err := rm.abortRequest(erm.p, erm.requestID, erm.err)
44+
err := rm.abortRequest(rm.ctx, erm.p, erm.requestID, erm.err)
4545
select {
4646
case <-rm.ctx.Done():
4747
case erm.response <- err:

responsemanager/responsemanager_test.go

+23-2
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,15 @@ func TestIncomingQuery(t *testing.T) {
8787
td.connManager.RefuteProtected(t, td.p)
8888

8989
tracing := td.collectTracing(t)
90-
require.ElementsMatch(t, append(
91-
testutil.RepeatTraceStrings("TestIncomingQuery(0)->response(0)->executeTask(0)->processBlock({})->loadBlock(0)", td.blockChainLength),
90+
require.ElementsMatch(t, append(append(
91+
[]string{"processRequests(0)"},
92+
testutil.RepeatTraceStrings("TestIncomingQuery(0)->response(0)->executeTask(0)->processBlock({})->loadBlock(0)", td.blockChainLength)...),
9293
testutil.RepeatTraceStrings("TestIncomingQuery(0)->response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", td.blockChainLength)..., // half of the full chain
9394
), tracing.TracesToStrings())
95+
messageSpan := tracing.FindSpanByTraceString("processRequests(0)")
96+
responseSpan := tracing.FindSpanByTraceString("TestIncomingQuery(0)->response(0)")
97+
require.Len(t, responseSpan.Links, 1)
98+
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), messageSpan.SpanContext.SpanID())
9499
}
95100

96101
func TestCancellationQueryInProgress(t *testing.T) {
@@ -129,6 +134,22 @@ func TestCancellationQueryInProgress(t *testing.T) {
129134
td.connManager.RefuteProtected(t, td.p)
130135

131136
td.assertRequestCleared()
137+
138+
tracing := td.collectTracing(t)
139+
traceStrings := tracing.TracesToStrings()
140+
require.Contains(t, traceStrings, "processRequests(0)")
141+
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
142+
require.Contains(t, traceStrings, "processRequests(1)")
143+
message0Span := tracing.FindSpanByTraceString("processRequests(0)")
144+
message1Span := tracing.FindSpanByTraceString("processRequests(1)")
145+
responseSpan := tracing.FindSpanByTraceString("response(0)")
146+
abortRequestSpan := tracing.FindSpanByTraceString("response(0)->abortRequest(0)")
147+
// response(0) originates in processRequests(0)
148+
require.Len(t, responseSpan.Links, 1)
149+
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), message0Span.SpanContext.SpanID())
150+
// response(0)->abortRequest(0) occurs thanks to processRequests(1)
151+
require.Len(t, abortRequestSpan.Links, 1)
152+
require.Equal(t, abortRequestSpan.Links[0].SpanContext.SpanID(), message1Span.SpanContext.SpanID())
132153
}
133154

134155
func TestCancellationViaCommand(t *testing.T) {

responsemanager/server.go

+39-18
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,22 @@ func (rm *ResponseManager) terminateRequest(key responseKey) {
5656
ipr.span.End()
5757
}
5858

59-
func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSyncRequest) {
59+
func (rm *ResponseManager) processUpdate(ctx context.Context, key responseKey, update gsmsg.GraphSyncRequest) {
6060
response, ok := rm.inProgressResponses[key]
6161
if !ok || response.state == graphsync.CompletingSend {
6262
log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID)
6363
return
6464
}
6565

66-
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "processUpdate", trace.WithAttributes(
67-
attribute.Int("id", int(update.ID())),
68-
attribute.StringSlice("extensions", update.ExtensionNames()),
69-
))
66+
_, span := otel.Tracer("graphsync").Start(
67+
trace.ContextWithSpan(ctx, response.span),
68+
"processUpdate",
69+
trace.WithLinks(trace.LinkFromContext(ctx)),
70+
trace.WithAttributes(
71+
attribute.Int("id", int(update.ID())),
72+
attribute.StringSlice("extensions", update.ExtensionNames()),
73+
))
74+
7075
defer span.End()
7176

7277
if response.state != graphsync.Paused {
@@ -125,15 +130,18 @@ func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.Request
125130
return nil
126131
}
127132

128-
func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID, err error) error {
133+
func (rm *ResponseManager) abortRequest(ctx context.Context, p peer.ID, requestID graphsync.RequestID, err error) error {
129134
key := responseKey{p, requestID}
130135
rm.responseQueue.Remove(key, key.p)
131136
response, ok := rm.inProgressResponses[key]
132137
if !ok || response.state == graphsync.CompletingSend {
133138
return errors.New("could not find request")
134139
}
135140

136-
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "abortRequest")
141+
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(ctx, response.span),
142+
"abortRequest",
143+
trace.WithLinks(trace.LinkFromContext(ctx)),
144+
)
137145
defer span.End()
138146
span.RecordError(err)
139147
span.SetStatus(codes.Error, err.Error())
@@ -166,25 +174,38 @@ func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID
166174
}
167175

168176
func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSyncRequest) {
177+
ctx, messageSpan := otel.Tracer("graphsync").Start(
178+
rm.ctx,
179+
"processRequests",
180+
trace.WithAttributes(attribute.String("peerID", p.Pretty())),
181+
)
182+
defer messageSpan.End()
183+
169184
for _, request := range requests {
170185
key := responseKey{p: p, requestID: request.ID()}
171186
if request.IsCancel() {
172-
_ = rm.abortRequest(p, request.ID(), ipldutil.ContextCancelError{})
187+
_ = rm.abortRequest(ctx, p, request.ID(), ipldutil.ContextCancelError{})
173188
continue
174189
}
175190
if request.IsUpdate() {
176-
rm.processUpdate(key, request)
191+
rm.processUpdate(ctx, key, request)
177192
continue
178193
}
179194
rm.connManager.Protect(p, request.ID().Tag())
180-
ctx := rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request, rm.ctx)
181-
ctx, responseSpan := otel.Tracer("graphsync").Start(ctx, "response", trace.WithAttributes(
182-
attribute.Int("id", int(request.ID())),
183-
attribute.Int("priority", int(request.Priority())),
184-
attribute.String("root", request.Root().String()),
185-
attribute.StringSlice("extensions", request.ExtensionNames()),
186-
))
187-
ctx, cancelFn := context.WithCancel(ctx)
195+
// don't use `ctx` which has the "message" trace, but rm.ctx for a fresh trace which allows
196+
// for a request hook to join this particular response up to an existing external trace
197+
rctx := rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request, rm.ctx)
198+
rctx, responseSpan := otel.Tracer("graphsync").Start(
199+
rctx,
200+
"response",
201+
trace.WithLinks(trace.LinkFromContext(ctx)),
202+
trace.WithAttributes(
203+
attribute.Int("id", int(request.ID())),
204+
attribute.Int("priority", int(request.Priority())),
205+
attribute.String("root", request.Root().String()),
206+
attribute.StringSlice("extensions", request.ExtensionNames()),
207+
))
208+
rctx, cancelFn := context.WithCancel(rctx)
188209
sub := &subscriber{
189210
p: key.p,
190211
request: request,
@@ -202,7 +223,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
202223

203224
rm.inProgressResponses[key] =
204225
&inProgressResponseStatus{
205-
ctx: ctx,
226+
ctx: rctx,
206227
span: responseSpan,
207228
cancelFn: cancelFn,
208229
request: request,

0 commit comments

Comments
 (0)