From 1584d87d329093118b70eb800a134e632576d8e2 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 17 Dec 2021 17:34:54 +1100 Subject: [PATCH 1/6] feat(requestmanager): add tracing for response messages & block processing Trace synchronous responseMessage->loaderProcess->cacheProcess block storage and link those to asynchronous request->verifyBlock traces for the same blocks. Closes: https://github.com/ipfs/go-graphsync/issues/317 --- impl/graphsync_test.go | 198 +++++++++++++----- requestmanager/asyncloader/asyncloader.go | 12 +- .../asyncloader/asyncloader_test.go | 14 +- .../responsecache/responsecache.go | 23 +- .../responsecache/responsecache_test.go | 6 +- .../unverifiedblockstore.go | 43 +++- .../unverifiedblockstore_test.go | 3 +- requestmanager/client.go | 2 +- requestmanager/server.go | 13 +- requestmanager/testloader/asyncloader.go | 2 +- testutil/tracing.go | 9 + 11 files changed, 246 insertions(+), 79 deletions(-) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index dba60504..edadc8cf 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -205,6 +205,7 @@ func TestRejectRequestsByDefault(t *testing.T) { "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", + "responseMessage(0)->loaderProcess(0)->cacheProcess(0)", }, tracing.TracesToStrings()) // has ContextCancelError exception recorded in the right place tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false) @@ -244,6 +245,7 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { wasCancelled := assertCancelOrComplete(ctx, t) tracing := collectTracing(t) + traceStrings := tracing.TracesToStrings() require.Contains(t, traceStrings, "response(0)->executeTask(0)") if wasCancelled { @@ -252,6 +254,9 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") + require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + // has ErrBudgetExceeded exception recorded in the right place tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrBudgetExceeded", "traversal budget exceeded", true) if wasCancelled { @@ -292,12 +297,15 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { assertComplete(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ - "response(0)->executeTask(0)", - "request(0)->newRequest(0)", - "request(0)->executeTask(0)", - "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + + traceStrings := tracing.TracesToStrings() + require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->newRequest(0)") + require.Contains(t, traceStrings, "request(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") + require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + // has ContextCancelError exception recorded in the right place // the requester gets a cancel, the responder 
gets a ErrBudgetExceeded tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false) @@ -325,9 +333,11 @@ func TestGraphsyncRoundTrip(t *testing.T) { var receivedResponseData []byte var receivedRequestData []byte + var responseCount int requestor.RegisterIncomingResponseHook( func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { + responseCount = responseCount + 1 data, has := responseData.Extension(td.extensionName) if has { receivedResponseData = data @@ -371,15 +381,45 @@ func TestGraphsyncRoundTrip(t *testing.T) { assertComplete(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ - "response(0)->executeTask(0)", - "request(0)->newRequest(0)", - "request(0)->executeTask(0)", - "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + + traceStrings := tracing.TracesToStrings() + require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->newRequest(0)") + require.Contains(t, traceStrings, "request(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") + require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + processUpdateSpan := tracing.FindSpanByTraceString("response(0)") require.Equal(t, int64(0), testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "priority").AsInt64()) require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice()) + + // each verifyBlock span should link to a cacheProcess span that stored it + + cacheProcessSpans := tracing.FindSpans("cacheProcess") + cacheProcessLinks := make(map[string]int) + verifyBlockSpans := tracing.FindSpans("verifyBlock") + + for _, verifyBlockSpan := range verifyBlockSpans { + require.Len(t, verifyBlockSpan.Links, 1, "verifyBlock span should have one link") + found := false + for _, cacheProcessSpan := range cacheProcessSpans { + sid := cacheProcessSpan.SpanContext.SpanID().String() + if verifyBlockSpan.Links[0].SpanContext.SpanID().String() == sid { + found = true + cacheProcessLinks[sid] = cacheProcessLinks[sid] + 1 + break + } + } + require.True(t, found, "verifyBlock should link to a known cacheProcess span") + } + + // each cacheProcess span should be linked to one verifyBlock span per block it stored + + for _, cacheProcessSpan := range cacheProcessSpans { + blockCount := len(testutil.AttributeValueInTraceSpan(t, cacheProcessSpan, "blocks").AsStringSlice()) + require.Equal(t, cacheProcessLinks[cacheProcessSpan.SpanContext.SpanID().String()], blockCount, "cacheProcess span should be linked to one verifyBlock span per block it processed") + } } func TestGraphsyncRoundTripPartial(t *testing.T) { @@ -438,12 +478,13 @@ func TestGraphsyncRoundTripPartial(t *testing.T) { assertComplete(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ - "response(0)->executeTask(0)", - "request(0)->newRequest(0)", - "request(0)->executeTask(0)", - "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + traceStrings := tracing.TracesToStrings() + require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->newRequest(0)") + require.Contains(t, traceStrings, 
"request(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") + require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block } func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { @@ -458,6 +499,11 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { // initialize graphsync on first node to make requests requestor := td.GraphSyncHost1() + var responseCount int + requestor.RegisterIncomingResponseHook(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { + responseCount = responseCount + 1 + }) + // setup receiving peer to just record message coming in blockChainLength := 100 blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) @@ -502,12 +548,15 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { assertComplete(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ + require.ElementsMatch(t, append(append([]string{ "response(0)->executeTask(0)", "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + }, + testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)..., // half of the full chain + ), tracing.TracesToStrings()) } func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { @@ -521,6 +570,11 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { // initialize graphsync on first node to make requests requestor := td.GraphSyncHost1() + var responseCount int + requestor.RegisterIncomingResponseHook(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { + responseCount = responseCount + 1 + }) + // setup receiving peer to just record message coming in blockChainLength := 100 blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) @@ -567,12 +621,15 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { assertComplete(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ + require.ElementsMatch(t, append(append([]string{ "response(0)->executeTask(0)", "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + }, + testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)..., // half of the full chain + ), tracing.TracesToStrings()) } func TestPauseResume(t *testing.T) { @@ -651,12 +708,16 @@ func TestPauseResume(t *testing.T) { assertOneRequestCompletes(ctx, t) tracing := collectTracing(t) + traceStrings := tracing.TracesToStrings() require.Contains(t, traceStrings, "response(0)->executeTask(0)") require.Contains(t, traceStrings, "response(0)->executeTask(1)") require.Contains(t, traceStrings, "request(0)->newRequest(0)") require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") + require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + // pause 
recorded tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "github.com/ipfs/go-graphsync/responsemanager/hooks.ErrPaused", hooks.ErrPaused{}.Error(), false) } @@ -724,6 +785,7 @@ func TestPauseResumeRequest(t *testing.T) { } tracing := collectTracing(t) + traceStrings := tracing.TracesToStrings() require.Contains(t, traceStrings, "response(0)->executeTask(0)") if wasCancelled { @@ -734,6 +796,9 @@ func TestPauseResumeRequest(t *testing.T) { require.Contains(t, traceStrings, "request(0)->executeTask(0)") require.Contains(t, traceStrings, "request(0)->executeTask(1)") require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") + require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + // has ErrPaused exception recorded in the right place tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrPaused", hooks.ErrPaused{}.Error(), false) } @@ -751,8 +816,10 @@ func TestPauseResumeViaUpdate(t *testing.T) { var receivedUpdateData []byte // initialize graphsync on first node to make requests requestor := td.GraphSyncHost1() + var responseCount int requestor.RegisterIncomingResponseHook(func(p peer.ID, response graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { + responseCount = responseCount + 1 if response.Status() == graphsync.RequestPaused { var has bool receivedReponseData, has = response.Extension(td.extensionName) @@ -804,14 +871,17 @@ func TestPauseResumeViaUpdate(t *testing.T) { assertComplete(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ + require.ElementsMatch(t, append(append([]string{ "response(0)->executeTask(0)", "response(0)->processUpdate(0)", "response(0)->executeTask(1)", "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + }, + testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)..., + ), tracing.TracesToStrings()) // make sure the attributes are what we expect processUpdateSpan := tracing.FindSpanByTraceString("response(0)->processUpdate(0)") require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice()) @@ -833,6 +903,11 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { // initialize graphsync on first node to make requests requestor := td.GraphSyncHost1() + var responseCount int + requestor.RegisterIncomingResponseHook(func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { + responseCount = responseCount + 1 + }) + // setup receiving peer to just record message coming in blockChainLength := 100 blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength) @@ -887,14 +962,17 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { assertComplete(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ + require.ElementsMatch(t, append(append([]string{ "response(0)->executeTask(0)", "response(0)->processUpdate(0)", "response(0)->executeTask(1)", "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + }, + 
testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)..., + ), tracing.TracesToStrings()) // make sure the attributes are what we expect processUpdateSpan := tracing.FindSpanByTraceString("response(0)->processUpdate(0)") require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice()) @@ -978,14 +1056,17 @@ func TestNetworkDisconnect(t *testing.T) { drain(responder) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ - "response(0)->executeTask(0)", - "response(0)->abortRequest(0)", - "response(0)->executeTask(1)", - "request(0)->newRequest(0)", - "request(0)->executeTask(0)", - "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + + traceStrings := tracing.TracesToStrings() + require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "response(0)->abortRequest(0)") + require.Contains(t, traceStrings, "response(0)->executeTask(1)") + require.Contains(t, traceStrings, "request(0)->newRequest(0)") + require.Contains(t, traceStrings, "request(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") + require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block + // has ContextCancelError exception recorded in the right place tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false) } @@ -1110,6 +1191,7 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { wasCancelled := assertCancelOrComplete(ctx, t) tracing := collectTracing(t) + traceStrings := tracing.TracesToStrings() require.Contains(t, traceStrings, "response(0)->executeTask(0)") // may or may not contain a second response trace: "response(1)->executeTask(0)"" @@ -1122,6 +1204,9 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { require.Contains(t, traceStrings, "request(1)->newRequest(0)") require.Contains(t, traceStrings, "request(1)->executeTask(0)") require.Contains(t, traceStrings, "request(1)->terminateRequest(0)") + require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(1)->verifyBlock(0)") // should have one of these per block (TODO: why request(1) and not (0)?) 
+ // TODO(rvagg): this is randomly either a SkipMe or a ipldutil.ContextCancelError; confirm this is sane // tracing.SingleExceptionEvent(t, "request(0)->newRequest(0)","request(0)->executeTask(0)", "SkipMe", traversal.SkipMe{}.Error(), true) } @@ -1207,6 +1292,8 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { require.Contains(t, traceStrings, "request(1)->newRequest(0)") require.Contains(t, traceStrings, "request(1)->executeTask(0)") require.Contains(t, traceStrings, "request(1)->terminateRequest(0)") + require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block } // TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work @@ -1233,6 +1320,11 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { // initialize graphsync on first node to make requests requestor := td.GraphSyncHost1() + var responseCount int + requestor.RegisterIncomingResponseHook(func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { + responseCount = responseCount + 1 + }) + // setup receiving peer to just record message coming in blockChainLength := 40 blockChainPersistence := td.persistence1 @@ -1252,12 +1344,15 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { assertComplete(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ + require.ElementsMatch(t, append(append([]string{ "response(0)->executeTask(0)", "request(0)->newRequest(0)", "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + }, + testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)..., + ), tracing.TracesToStrings()) } // What this test does: @@ -1386,12 +1481,13 @@ func TestUnixFSFetch(t *testing.T) { assertComplete(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ - "response(0)->executeTask(0)", - "request(0)->newRequest(0)", - "request(0)->executeTask(0)", - "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + traceStrings := tracing.TracesToStrings() + require.Contains(t, traceStrings, "response(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->newRequest(0)") + require.Contains(t, traceStrings, "request(0)->executeTask(0)") + require.Contains(t, traceStrings, "request(0)->terminateRequest(0)") + require.Contains(t, traceStrings, "responseMessage(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response + require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block } func TestGraphsyncBlockListeners(t *testing.T) { @@ -1430,9 +1526,11 @@ func TestGraphsyncBlockListeners(t *testing.T) { var receivedResponseData []byte var receivedRequestData []byte + var responseCount int requestor.RegisterIncomingResponseHook( func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { + responseCount = responseCount + 1 data, has := responseData.Extension(td.extensionName) if has { receivedResponseData = data @@ -1481,12 +1579,16 @@ func TestGraphsyncBlockListeners(t *testing.T) { assertComplete(ctx, t) tracing := collectTracing(t) - require.ElementsMatch(t, []string{ - "response(0)->executeTask(0)", - 
"request(0)->newRequest(0)", - "request(0)->executeTask(0)", - "request(0)->terminateRequest(0)", - }, tracing.TracesToStrings()) + require.ElementsMatch(t, append(append( + []string{ + "response(0)->executeTask(0)", + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, + testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 100)..., + ), tracing.TracesToStrings()) } type gsTestData struct { diff --git a/requestmanager/asyncloader/asyncloader.go b/requestmanager/asyncloader/asyncloader.go index 596e3d5a..013f31aa 100644 --- a/requestmanager/asyncloader/asyncloader.go +++ b/requestmanager/asyncloader/asyncloader.go @@ -10,6 +10,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipld/go-ipld-prime" peer "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/metadata" @@ -103,8 +104,14 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp // ProcessResponse injests new responses and completes asynchronous loads as // neccesary -func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, +func (al *AsyncLoader) ProcessResponse( + ctx context.Context, + responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { + + ctx, span := otel.Tracer("graphsync").Start(ctx, "loaderProcess") + defer span.End() + al.stateLk.Lock() defer al.stateLk.Unlock() byQueue := make(map[string][]graphsync.RequestID) @@ -119,7 +126,7 @@ func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadat for _, requestID := range requestIDs { queueResponses[requestID] = responses[requestID] } - responseCache.ProcessResponse(queueResponses, blks) + responseCache.ProcessResponse(ctx, queueResponses, blks) loadAttemptQueue.RetryLoads() } } @@ -178,7 +185,6 @@ func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCac } func setupAttemptQueue(lsys ipld.LinkSystem) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) { - unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener) responseCache := responsecache.New(unverifiedBlockStore) loadAttemptQueue := loadattemptqueue.New(func(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) types.AsyncLoadResult { diff --git a/requestmanager/asyncloader/asyncloader_test.go b/requestmanager/asyncloader/asyncloader_test.go index c1f7b858..63255d53 100644 --- a/requestmanager/asyncloader/asyncloader_test.go +++ b/requestmanager/asyncloader/asyncloader_test.go @@ -48,7 +48,7 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) { }, } p := testutil.GeneratePeers(1)[0] - asyncLoader.ProcessResponse(responses, blocks) + asyncLoader.ProcessResponse(context.Background(), responses, blocks) resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertSuccessResponse(ctx, t, resultChan) @@ -72,7 +72,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { }, } p := testutil.GeneratePeers(1)[0] - asyncLoader.ProcessResponse(responses, nil) + asyncLoader.ProcessResponse(context.Background(), responses, nil) resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertFailResponse(ctx, t, resultChan) @@ -116,7 +116,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) { }, }, } - 
asyncLoader.ProcessResponse(responses, blocks) + asyncLoader.ProcessResponse(context.Background(), responses, blocks) assertSuccessResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) st.AssertBlockStored(t, block) @@ -144,7 +144,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(responses, nil) + asyncLoader.ProcessResponse(context.Background(), responses, nil) assertFailResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) }) @@ -182,7 +182,7 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) { }, } p := testutil.GeneratePeers(1)[0] - asyncLoader.ProcessResponse(responses, blocks) + asyncLoader.ProcessResponse(context.Background(), responses, blocks) resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertSuccessResponse(ctx, t, resultChan) @@ -282,7 +282,7 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(responses, blocks) + asyncLoader.ProcessResponse(context.Background(), responses, blocks) assertSuccessResponse(ctx, t, resultChan1) assertSuccessResponse(ctx, t, resultChan2) @@ -317,7 +317,7 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(responses, blocks) + asyncLoader.ProcessResponse(context.Background(), responses, blocks) asyncLoader.CompleteResponsesFor(requestID1) assertFailResponse(ctx, t, resultChan1) diff --git a/requestmanager/asyncloader/responsecache/responsecache.go b/requestmanager/asyncloader/responsecache/responsecache.go index 31877405..881b2a15 100644 --- a/requestmanager/asyncloader/responsecache/responsecache.go +++ b/requestmanager/asyncloader/responsecache/responsecache.go @@ -1,12 +1,16 @@ package responsecache import ( + "context" "sync" blocks "github.com/ipfs/go-block-format" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/linktracker" @@ -21,7 +25,7 @@ type UnverifiedBlockStore interface { PruneBlocks(func(ipld.Link, uint64) bool) PruneBlock(ipld.Link) VerifyBlock(ipld.Link, ipld.LinkContext) ([]byte, error) - AddUnverifiedBlock(ipld.Link, []byte) + AddUnverifiedBlock(trace.Link, ipld.Link, []byte) } // ResponseCache maintains a store of unverified blocks and response @@ -67,13 +71,26 @@ func (rc *ResponseCache) AttemptLoad(requestID graphsync.RequestID, link ipld.Li // ProcessResponse processes incoming response data, adding unverified blocks, // and tracking link metadata from a remote peer -func (rc *ResponseCache) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, +func (rc *ResponseCache) ProcessResponse( + ctx context.Context, + responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { + + cids := make([]string, 0, len(blks)) + for _, blk := range blks { + cids = append(cids, blk.Cid().String()) + } + ctx, span := otel.Tracer("graphsync").Start(ctx, "cacheProcess", trace.WithAttributes( + attribute.StringSlice("blocks", cids), + )) + traceLink := trace.LinkFromContext(ctx) + defer span.End() + rc.responseCacheLk.Lock() for _, block := range blks { log.Debugf("Received block from network: %s", block.Cid().String()) - rc.unverifiedBlockStore.AddUnverifiedBlock(cidlink.Link{Cid: block.Cid()}, block.RawData()) + rc.unverifiedBlockStore.AddUnverifiedBlock(traceLink, 
cidlink.Link{Cid: block.Cid()}, block.RawData()) } for requestID, md := range responses { diff --git a/requestmanager/asyncloader/responsecache/responsecache_test.go b/requestmanager/asyncloader/responsecache/responsecache_test.go index 22de3563..438fdc30 100644 --- a/requestmanager/asyncloader/responsecache/responsecache_test.go +++ b/requestmanager/asyncloader/responsecache/responsecache_test.go @@ -1,6 +1,7 @@ package responsecache import ( + "context" "fmt" "math/rand" "testing" @@ -9,6 +10,7 @@ import ( ipld "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/metadata" @@ -19,7 +21,7 @@ type fakeUnverifiedBlockStore struct { inMemoryBlocks map[ipld.Link][]byte } -func (ubs *fakeUnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []byte) { +func (ubs *fakeUnverifiedBlockStore) AddUnverifiedBlock(_ trace.Link, lnk ipld.Link, data []byte) { ubs.inMemoryBlocks[lnk] = data } @@ -100,7 +102,7 @@ func TestResponseCacheManagingLinks(t *testing.T) { } responseCache := New(fubs) - responseCache.ProcessResponse(responses, blks) + responseCache.ProcessResponse(context.Background(), responses, blks) require.Len(t, fubs.blocks(), len(blks)-1, "should prune block with no references") testutil.RefuteContainsBlock(t, fubs.blocks(), blks[2]) diff --git a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go index d1ecd448..2478b0f5 100644 --- a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go +++ b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go @@ -1,10 +1,14 @@ package unverifiedblockstore import ( + "context" "fmt" logging "github.com/ipfs/go-log/v2" ipld "github.com/ipld/go-ipld-prime" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var log = logging.Logger("gs-unverifiedbs") @@ -16,24 +20,29 @@ type settableWriter interface { // UnverifiedBlockStore holds an in memory cache of received blocks from the network // that have not been verified to be part of a traversal type UnverifiedBlockStore struct { - inMemoryBlocks map[ipld.Link][]byte + inMemoryBlocks map[ipld.Link]tracedBlock storer ipld.BlockWriteOpener dataSize uint64 } +type tracedBlock struct { + block []byte + traceLink trace.Link +} + // New initializes a new unverified store with the given storer function for writing // to permanent storage if the block is verified func New(storer ipld.BlockWriteOpener) *UnverifiedBlockStore { return &UnverifiedBlockStore{ - inMemoryBlocks: make(map[ipld.Link][]byte), + inMemoryBlocks: make(map[ipld.Link]tracedBlock), storer: storer, } } // AddUnverifiedBlock adds a new unverified block to the in memory cache as it // comes in as part of a traversal. 
-func (ubs *UnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []byte) { - ubs.inMemoryBlocks[lnk] = data +func (ubs *UnverifiedBlockStore) AddUnverifiedBlock(traceLink trace.Link, lnk ipld.Link, data []byte) { + ubs.inMemoryBlocks[lnk] = tracedBlock{data, traceLink} ubs.dataSize = ubs.dataSize + uint64(len(data)) log.Debugw("added in-memory block", "total_queued_bytes", ubs.dataSize) } @@ -42,9 +51,9 @@ func (ubs *UnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []byte) // if the passed in function returns true for the given link func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link, uint64) bool) { for link, data := range ubs.inMemoryBlocks { - if shouldPrune(link, uint64(len(data))) { + if shouldPrune(link, uint64(len(data.block))) { delete(ubs.inMemoryBlocks, link) - ubs.dataSize = ubs.dataSize - uint64(len(data)) + ubs.dataSize = ubs.dataSize - uint64(len(data.block)) } } log.Debugw("finished pruning in-memory blocks", "total_queued_bytes", ubs.dataSize) @@ -53,7 +62,7 @@ func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link, uint64) // PruneBlock deletes an individual block from the store func (ubs *UnverifiedBlockStore) PruneBlock(link ipld.Link) { delete(ubs.inMemoryBlocks, link) - ubs.dataSize = ubs.dataSize - uint64(len(ubs.inMemoryBlocks[link])) + ubs.dataSize = ubs.dataSize - uint64(len(ubs.inMemoryBlocks[link].block)) log.Debugw("pruned in-memory block", "total_queued_bytes", ubs.dataSize) } @@ -64,8 +73,20 @@ func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link, linkContext ipld.Lin if !ok { return nil, fmt.Errorf("block not found") } + + ctx := linkContext.Ctx + if ctx == nil { + ctx = context.Background() + } + _, span := otel.Tracer("graphsync").Start( + ctx, + "verifyBlock", + trace.WithLinks(data.traceLink), + trace.WithAttributes(attribute.String("cid", lnk.String()))) + defer span.End() + delete(ubs.inMemoryBlocks, lnk) - ubs.dataSize = ubs.dataSize - uint64(len(data)) + ubs.dataSize = ubs.dataSize - uint64(len(data.block)) log.Debugw("verified block", "total_queued_bytes", ubs.dataSize) buffer, committer, err := ubs.storer(linkContext) @@ -73,9 +94,9 @@ func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link, linkContext ipld.Lin return nil, err } if settable, ok := buffer.(settableWriter); ok { - err = settable.SetBytes(data) + err = settable.SetBytes(data.block) } else { - _, err = buffer.Write(data) + _, err = buffer.Write(data.block) } if err != nil { return nil, err @@ -84,5 +105,5 @@ func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link, linkContext ipld.Lin if err != nil { return nil, err } - return data, nil + return data.block, nil } diff --git a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go index 35dc4d4d..92bd66e5 100644 --- a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go +++ b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go @@ -8,6 +8,7 @@ import ( "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync/testutil" ) @@ -25,7 +26,7 @@ func TestVerifyBlockPresent(t *testing.T) { require.Nil(t, data) require.Error(t, err, "block should not be verifiable till it's added as an unverifiable block") - unverifiedBlockStore.AddUnverifiedBlock(cidlink.Link{Cid: block.Cid()}, block.RawData()) + 
unverifiedBlockStore.AddUnverifiedBlock(trace.Link{}, cidlink.Link{Cid: block.Cid()}, block.RawData()) reader, err = lsys.StorageReadOpener(ipld.LinkContext{}, cidlink.Link{Cid: block.Cid()}) require.Nil(t, reader) require.Error(t, err, "block should not be loadable till it's verified") diff --git a/requestmanager/client.go b/requestmanager/client.go index 2f0cdb6a..40fc9be2 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -74,7 +74,7 @@ type PeerHandler interface { // results as new responses are processed type AsyncLoader interface { StartRequest(graphsync.RequestID, string) error - ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, + ProcessResponse(ctx context.Context, responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult CompleteResponsesFor(requestID graphsync.RequestID) diff --git a/requestmanager/server.go b/requestmanager/server.go index 568a52c8..07c624d0 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -130,7 +130,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re // the traverser has its own context because we want to fail on block boundaries, in the executor, // and make sure all blocks included up to the termination message // are processed and passed in the response channel - ctx, cancel := context.WithCancel(rm.ctx) + ctx, cancel := context.WithCancel(trace.ContextWithSpan(rm.ctx, ipr.span)) ipr.traverserCancel = cancel ipr.traverser = ipldutil.TraversalBuilder{ Root: cidlink.Link{Cid: ipr.request.Root()}, @@ -264,11 +264,20 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr func (rm *RequestManager) processResponseMessage(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) { log.Debugf("begin processing message for peer %s", p) + requestIds := make([]int, 0, len(responses)) + for _, r := range responses { + requestIds = append(requestIds, int(r.RequestID())) + } + ctx, span := otel.Tracer("graphsync").Start(rm.ctx, "responseMessage", trace.WithAttributes( + attribute.String("peerID", p.Pretty()), + attribute.IntSlice("requestIDs", requestIds), + )) + defer span.End() filteredResponses := rm.processExtensions(responses, p) filteredResponses = rm.filterResponsesForPeer(filteredResponses, p) rm.updateLastResponses(filteredResponses) responseMetadata := metadataForResponses(filteredResponses) - rm.asyncLoader.ProcessResponse(responseMetadata, blks) + rm.asyncLoader.ProcessResponse(ctx, responseMetadata, blks) rm.processTerminations(filteredResponses) log.Debugf("end processing message for peer %s", p) } diff --git a/requestmanager/testloader/asyncloader.go b/requestmanager/testloader/asyncloader.go index 97f01c77..0c52877c 100644 --- a/requestmanager/testloader/asyncloader.go +++ b/requestmanager/testloader/asyncloader.go @@ -61,7 +61,7 @@ func (fal *FakeAsyncLoader) StartRequest(requestID graphsync.RequestID, name str } // ProcessResponse just records values passed to verify expectations later -func (fal *FakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, +func (fal *FakeAsyncLoader) ProcessResponse(_ context.Context, responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { fal.responses <- responses fal.blks <- blks diff --git a/testutil/tracing.go b/testutil/tracing.go index ed462ba6..a690299e 100644 --- a/testutil/tracing.go +++ 
b/testutil/tracing.go @@ -3,6 +3,7 @@ package testutil import ( "context" "fmt" + "strings" "testing" "github.com/stretchr/testify/require" @@ -203,3 +204,11 @@ func EventAsException(t *testing.T, evt trace.Event) ExceptionEvent { require.NotEmpty(t, msg, "expected non-empty exception.message attribute for %v", evt.Name) return ExceptionEvent{Type: typ, Message: msg} } + +func RepeatTraceStrings(tmpl string, count int) []string { + res := make([]string, 0, count) + for i := 0; i < count; i++ { + res = append(res, strings.Replace(tmpl, "{}", fmt.Sprintf("%d", i), 1)) + } + return res +} From 06d8dfe5e9a233b6d184fa3685b145642bb67ddd Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 17 Dec 2021 16:53:29 -0800 Subject: [PATCH 2/6] fix(responsecache): remove cid list from span remove cid list from span and replace with simple block count --- impl/graphsync_test.go | 4 ++-- requestmanager/asyncloader/responsecache/responsecache.go | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index edadc8cf..3a376041 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -397,7 +397,7 @@ func TestGraphsyncRoundTrip(t *testing.T) { // each verifyBlock span should link to a cacheProcess span that stored it cacheProcessSpans := tracing.FindSpans("cacheProcess") - cacheProcessLinks := make(map[string]int) + cacheProcessLinks := make(map[string]int64) verifyBlockSpans := tracing.FindSpans("verifyBlock") for _, verifyBlockSpan := range verifyBlockSpans { @@ -417,7 +417,7 @@ func TestGraphsyncRoundTrip(t *testing.T) { // each cacheProcess span should be linked to one verifyBlock span per block it stored for _, cacheProcessSpan := range cacheProcessSpans { - blockCount := len(testutil.AttributeValueInTraceSpan(t, cacheProcessSpan, "blocks").AsStringSlice()) + blockCount := testutil.AttributeValueInTraceSpan(t, cacheProcessSpan, "blockCount").AsInt64() require.Equal(t, cacheProcessLinks[cacheProcessSpan.SpanContext.SpanID().String()], blockCount, "cacheProcess span should be linked to one verifyBlock span per block it processed") } } diff --git a/requestmanager/asyncloader/responsecache/responsecache.go b/requestmanager/asyncloader/responsecache/responsecache.go index 881b2a15..69c3e902 100644 --- a/requestmanager/asyncloader/responsecache/responsecache.go +++ b/requestmanager/asyncloader/responsecache/responsecache.go @@ -76,12 +76,8 @@ func (rc *ResponseCache) ProcessResponse( responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { - cids := make([]string, 0, len(blks)) - for _, blk := range blks { - cids = append(cids, blk.Cid().String()) - } ctx, span := otel.Tracer("graphsync").Start(ctx, "cacheProcess", trace.WithAttributes( - attribute.StringSlice("blocks", cids), + attribute.Int("blockCount", len(blks)), )) traceLink := trace.LinkFromContext(ctx) defer span.End() From 9d1d20c9bb02fc118950e5a5eb103eae6b901ddb Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 17 Dec 2021 17:24:05 -0800 Subject: [PATCH 3/6] feat(asyncloader): fix flakiness in test --- impl/graphsync_test.go | 22 ++++++++++++++++------ requestmanager/asyncloader/asyncloader.go | 10 +++++++++- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 3a376041..a7d60195 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -554,7 +554,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - 
testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + responseMessageTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)..., // half of the full chain ), tracing.TracesToStrings()) } @@ -627,7 +627,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + responseMessageTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)..., // half of the full chain ), tracing.TracesToStrings()) } @@ -879,7 +879,7 @@ func TestPauseResumeViaUpdate(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + responseMessageTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)..., ), tracing.TracesToStrings()) // make sure the attributes are what we expect @@ -970,7 +970,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + responseMessageTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)..., ), tracing.TracesToStrings()) // make sure the attributes are what we expect @@ -1350,7 +1350,7 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + responseMessageTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)..., ), tracing.TracesToStrings()) } @@ -1586,7 +1586,7 @@ func TestGraphsyncBlockListeners(t *testing.T) { "request(0)->executeTask(0)", "request(0)->terminateRequest(0)", }, - testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...), + responseMessageTraces(t, tracing, responseCount)...), testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 100)..., ), tracing.TracesToStrings()) } @@ -1724,3 +1724,13 @@ func (r *receiver) Connected(p peer.ID) { func (r *receiver) Disconnected(p peer.ID) { } + +func responseMessageTraces(t *testing.T, tracing *testutil.Collector, responseCount int) []string { + traces := testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount-1) + finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1)) + require.NotNil(t, finalStub) + if len(testutil.AttributeValueInTraceSpan(t, *finalStub, "requestIDs").AsInt64Slice()) == 0 { + return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1)) + } + return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1)) +} diff --git a/requestmanager/asyncloader/asyncloader.go b/requestmanager/asyncloader/asyncloader.go index 013f31aa..5609b4e1 100644 --- a/requestmanager/asyncloader/asyncloader.go +++ b/requestmanager/asyncloader/asyncloader.go @@ -11,6 +11,8 @@ import ( "github.com/ipld/go-ipld-prime" peer 
"github.com/libp2p/go-libp2p-core/peer" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/metadata" @@ -109,7 +111,13 @@ func (al *AsyncLoader) ProcessResponse( responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { - ctx, span := otel.Tracer("graphsync").Start(ctx, "loaderProcess") + requestIds := make([]int, 0, len(responses)) + for requestID := range responses { + requestIds = append(requestIds, int(requestID)) + } + ctx, span := otel.Tracer("graphsync").Start(ctx, "loaderProcess", trace.WithAttributes( + attribute.IntSlice("requestIDs", requestIds), + )) defer span.End() al.stateLk.Lock() From e4c76356b0b7b25031bfc1c0470b218410bb5bc2 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Sat, 18 Dec 2021 16:55:40 +1100 Subject: [PATCH 4/6] fix(tracing,test): sort spans by start time --- testutil/tracing.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/testutil/tracing.go b/testutil/tracing.go index a690299e..38b54e10 100644 --- a/testutil/tracing.go +++ b/testutil/tracing.go @@ -3,6 +3,7 @@ package testutil import ( "context" "fmt" + "sort" "strings" "testing" @@ -25,6 +26,9 @@ type Collector struct { // ExportSpans receives the ReadOnlySpans from the batch provider func (c *Collector) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error { c.Spans = tracetest.SpanStubsFromReadOnlySpans(spans) + sort.SliceStable(c.Spans, func(i, j int) bool { + return c.Spans[i].StartTime.Before(c.Spans[j].StartTime) + }) return nil } From 52136a5c11266061ad9bdf8863a102d791ad1e7c Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 21 Dec 2021 15:09:04 -0800 Subject: [PATCH 5/6] fix(tracing): fix tracing collector --- impl/graphsync_test.go | 41 ++++++++++++------------- responsemanager/responsemanager_test.go | 5 +-- testutil/tracing.go | 8 ++--- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index a7d60195..94a3e9d9 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -51,10 +51,10 @@ import ( ) func TestMakeRequestToNetwork(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -174,10 +174,10 @@ func TestSendResponseToIncomingRequest(t *testing.T) { } func TestRejectRequestsByDefault(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -214,10 +214,10 @@ func TestRejectRequestsByDefault(t *testing.T) { } func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -265,10 +265,10 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { } func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := 
newGsTestData(ctx, t) @@ -312,10 +312,10 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { } func TestGraphsyncRoundTrip(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -423,10 +423,10 @@ func TestGraphsyncRoundTrip(t *testing.T) { } func TestGraphsyncRoundTripPartial(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -488,10 +488,10 @@ func TestGraphsyncRoundTripPartial(t *testing.T) { } func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -560,10 +560,10 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { } func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + ctx, collectTracing := testutil.SetupTracing(context.Background()) + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -633,10 +633,10 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { } func TestPauseResume(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -723,10 +723,10 @@ func TestPauseResume(t *testing.T) { } func TestPauseResumeRequest(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -804,10 +804,10 @@ func TestPauseResumeRequest(t *testing.T) { } func TestPauseResumeViaUpdate(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -890,10 +890,10 @@ func TestPauseResumeViaUpdate(t *testing.T) { } func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -981,10 +981,9 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { } func TestNetworkDisconnect(t *testing.T) { - collectTracing := testutil.SetupTracing() - // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -1072,10 +1071,10 @@ func TestNetworkDisconnect(t *testing.T) { } func TestConnectFail(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + 
ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -1120,10 +1119,10 @@ func TestConnectFail(t *testing.T) { } func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -1212,10 +1211,10 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { } func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -1305,13 +1304,13 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { // backlog of blocks and then sending them in one giant network packet that can't // be decoded on the client side func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network if testing.Short() { t.Skip() } ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -1368,12 +1367,12 @@ func TestUnixFSFetch(t *testing.T) { if testing.Short() { t.Skip() } - collectTracing := testutil.SetupTracing() const unixfsChunkSize uint64 = 1 << 10 const unixfsLinksPerLevel = 1024 ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 20*time.Second) defer cancel() @@ -1491,10 +1490,10 @@ func TestUnixFSFetch(t *testing.T) { } func TestGraphsyncBlockListeners(t *testing.T) { - collectTracing := testutil.SetupTracing() // create network ctx := context.Background() + ctx, collectTracing := testutil.SetupTracing(ctx) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() td := newGsTestData(ctx, t) diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index d9d5fc46..eadbfadd 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -38,7 +38,6 @@ import ( ) func TestIncomingQuery(t *testing.T) { - collectTracing := testutil.SetupTracing() td := newTestData(t) defer td.cancel() @@ -87,7 +86,7 @@ func TestIncomingQuery(t *testing.T) { require.Equal(t, out.request.ID(), td.requestID) td.connManager.RefuteProtected(t, td.p) - tracing := collectTracing(t) + tracing := td.collectTracing(t) require.ElementsMatch(t, []string{ "TestIncomingQuery(0)->response(0)->executeTask(0)", }, tracing.TracesToStrings()) @@ -1055,6 +1054,7 @@ type testData struct { connManager *testutil.TestConnManager transactionLk *sync.Mutex taskqueue *taskqueue.WorkerTaskQueue + collectTracing func(t *testing.T) *testutil.Collector } func newTestData(t *testing.T) testData { @@ -1062,6 +1062,7 @@ func newTestData(t *testing.T) testData { ctx := context.Background() td := testData{} td.t = t + ctx, td.collectTracing = testutil.SetupTracing(ctx) td.ctx, td.cancel = context.WithTimeout(ctx, 10*time.Second) td.blockStore = make(map[ipld.Link][]byte) diff --git a/testutil/tracing.go b/testutil/tracing.go index 38b54e10..fd5e8a42 100644 --- a/testutil/tracing.go +++ b/testutil/tracing.go @@ 
-139,19 +139,19 @@ func (c Collector) SingleExceptionEvent(t *testing.T, trace string, typeRe strin // a Collector. The returned helper function should be called at the point in // a test where the spans are ready to be analyzed. Any spans not properly // completed at that point won't be represented in the Collector. -func SetupTracing() func(t *testing.T) *Collector { +func SetupTracing(ctx context.Context) (context.Context, func(t *testing.T) *Collector) { collector := &Collector{} tp := trace.NewTracerProvider(trace.WithBatcher(collector)) otel.SetTracerProvider(tp) - + ctx, cancel := context.WithCancel(ctx) collect := func(t *testing.T) *Collector { t.Helper() - + cancel() require.NoError(t, tp.Shutdown(context.Background())) return collector } - return collect + return ctx, collect } // AttributeValueInTraceSpan is a test helper that asserts that at a span From 2e4bd46dc859a1727c2a81ee4a6e1dcf3dc507bc Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 21 Dec 2021 19:27:21 -0800 Subject: [PATCH 6/6] fix(impl): update test timings --- impl/graphsync_test.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 94a3e9d9..f493cac8 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -110,7 +110,7 @@ func TestMakeRequestToNetwork(t *testing.T) { func TestSendResponseToIncomingRequest(t *testing.T) { // create network ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) r := &receiver{ @@ -178,7 +178,7 @@ func TestRejectRequestsByDefault(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -218,7 +218,7 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -269,7 +269,7 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -316,7 +316,7 @@ func TestGraphsyncRoundTrip(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -427,7 +427,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -492,7 +492,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := 
context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -637,7 +637,7 @@ func TestPauseResume(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -727,7 +727,7 @@ func TestPauseResumeRequest(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -808,7 +808,7 @@ func TestPauseResumeViaUpdate(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -894,7 +894,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -984,7 +984,7 @@ func TestNetworkDisconnect(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -1075,7 +1075,7 @@ func TestConnectFail(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -1123,7 +1123,7 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -1215,7 +1215,7 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t) @@ -1494,7 +1494,7 @@ func TestGraphsyncBlockListeners(t *testing.T) { // create network ctx := context.Background() ctx, collectTracing := testutil.SetupTracing(ctx) - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() td := newGsTestData(ctx, t)
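
Note on the span-linking pattern these patches introduce: because block storage (responseMessage->loaderProcess->cacheProcess) and block verification (request->verifyBlock) happen on different call paths, the two spans cannot share a parent/child relationship; instead, cacheProcess captures a trace.Link to its own span with trace.LinkFromContext and stores it alongside each block, and VerifyBlock later attaches that link with trace.WithLinks. A minimal standalone sketch of the pattern follows; the package layout, function names, and placeholder CID string are illustrative only and not part of the diffs above (the real implementations are ResponseCache.ProcessResponse and UnverifiedBlockStore.VerifyBlock).

	package main

	import (
		"context"
		"fmt"

		"go.opentelemetry.io/otel"
		"go.opentelemetry.io/otel/attribute"
		"go.opentelemetry.io/otel/trace"
	)

	// tracedBlock mirrors the struct the patches add to UnverifiedBlockStore:
	// raw bytes plus a link back to the span under which the block arrived.
	type tracedBlock struct {
		data      []byte
		traceLink trace.Link
	}

	var store = map[string]tracedBlock{}

	// cacheProcess stores incoming blocks and records a link to the current
	// span so that later verification can point back at this storage step.
	func cacheProcess(ctx context.Context, blocks map[string][]byte) {
		ctx, span := otel.Tracer("example").Start(ctx, "cacheProcess",
			trace.WithAttributes(attribute.Int("blockCount", len(blocks))))
		defer span.End()
		link := trace.LinkFromContext(ctx)
		for cid, data := range blocks {
			store[cid] = tracedBlock{data: data, traceLink: link}
		}
	}

	// verifyBlock starts its own span, linked (not parented) to the span
	// that stored the block, since it runs on a different call path.
	func verifyBlock(ctx context.Context, cid string) []byte {
		blk := store[cid]
		_, span := otel.Tracer("example").Start(ctx, "verifyBlock",
			trace.WithLinks(blk.traceLink),
			trace.WithAttributes(attribute.String("cid", cid)))
		defer span.End()
		delete(store, cid)
		return blk.data
	}

	func main() {
		ctx := context.Background()
		cacheProcess(ctx, map[string][]byte{"cid-placeholder": []byte("data")})
		fmt.Printf("%s\n", verifyBlock(ctx, "cid-placeholder"))
	}

With a real tracer provider installed (the tests use testutil.SetupTracing), each verifyBlock span then carries exactly one link whose SpanID matches the cacheProcess span that stored its block, which is what TestGraphsyncRoundTrip asserts when it cross-checks verifyBlock links against the cacheProcess blockCount attribute.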