From c290fe5a72530f6915448d7ad8fa3ff875f5532c Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 25 Nov 2021 13:22:21 +1100 Subject: [PATCH 1/2] feat: add basic OT tracing for incoming requests Closes: #271 --- go.mod | 3 + go.sum | 10 +- impl/graphsync.go | 12 ++ impl/graphsync_test.go | 245 ++++++++++++++++++++++++++++ requestmanager/client.go | 21 ++- requestmanager/executor/executor.go | 10 ++ requestmanager/messages.go | 9 +- requestmanager/server.go | 24 ++- testutil/tracing.go | 173 ++++++++++++++++++++ 9 files changed, 500 insertions(+), 7 deletions(-) create mode 100644 testutil/tracing.go diff --git a/go.mod b/go.mod index 00a3834f..a9588c2e 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,9 @@ require ( github.com/multiformats/go-multihash v0.0.15 github.com/stretchr/testify v1.7.0 github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2 + go.opentelemetry.io/otel v1.2.0 + go.opentelemetry.io/otel/sdk v1.2.0 + go.opentelemetry.io/otel/trace v1.2.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/protobuf v1.27.1 diff --git a/go.sum b/go.sum index a8264a3b..85b9a42f 100644 --- a/go.sum +++ b/go.sum @@ -255,8 +255,9 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -1116,6 +1117,12 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/otel v1.2.0 h1:YOQDvxO1FayUcT9MIhJhgMyNO1WqoduiyvQHzGN0kUQ= +go.opentelemetry.io/otel v1.2.0/go.mod h1:aT17Fk0Z1Nor9e0uisf98LrntPGMnk4frBO9+dkf69I= +go.opentelemetry.io/otel/sdk v1.2.0 h1:wKN260u4DesJYhyjxDa7LRFkuhH7ncEVKU37LWcyNIo= +go.opentelemetry.io/otel/sdk v1.2.0/go.mod h1:jNN8QtpvbsKhgaC6V5lHiejMoKD+V8uadoSafgHPx1U= +go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0= +go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -1353,6 +1360,7 @@ golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys 
v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210426080607-c94f62235c83/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/impl/graphsync.go b/impl/graphsync.go index fe26fed1..bd8434cf 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -8,6 +8,9 @@ import ( "github.com/ipfs/go-peertaskqueue" ipld "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/allocator" @@ -304,6 +307,15 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, // Request initiates a new GraphSync request to the given peer using the given selector spec. func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) { + var extNames []string + for _, ext := range extensions { + extNames = append(extNames, string(ext.Name)) + } + ctx, _ = otel.Tracer("graphsync").Start(ctx, "request", trace.WithAttributes( + attribute.String("peerID", p.Pretty()), + attribute.String("root", root.String()), + attribute.StringSlice("extensions", extNames), + )) return gs.requestManager.NewRequest(ctx, p, root, selector, extensions...) 
} diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 6d0a9ce1..0d32c959 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -41,13 +41,18 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/donotsendfirstblocks" + "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" gsnet "github.com/ipfs/go-graphsync/network" + "github.com/ipfs/go-graphsync/requestmanager/hooks" "github.com/ipfs/go-graphsync/storeutil" + "github.com/ipfs/go-graphsync/taskqueue" "github.com/ipfs/go-graphsync/testutil" ) func TestMakeRequestToNetwork(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) @@ -84,6 +89,26 @@ func TestMakeRequestToNetwork(t *testing.T) { returnedData, found := receivedRequest.Extension(td.extensionName) require.True(t, found) require.Equal(t, td.extensionData, returnedData, "Failed to encode extension") + + graphSync.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) + + // make sure the attributes are what we expect + requestSpans := tracing.FindSpans("request") + peerIdAttr := testutil.AttributeValueInTraceSpan(t, requestSpans[0], "peerID") + require.Equal(t, td.host2.ID().Pretty(), peerIdAttr.AsString()) + rootAttr := testutil.AttributeValueInTraceSpan(t, requestSpans[0], "root") + require.Equal(t, blockChain.TipLink.String(), rootAttr.AsString()) + extensionsAttr := testutil.AttributeValueInTraceSpan(t, requestSpans[0], "extensions") + require.Equal(t, []string{string(td.extensionName)}, extensionsAttr.AsStringSlice()) + requestIdAttr := testutil.AttributeValueInTraceSpan(t, requestSpans[0], "requestID") + require.Equal(t, int64(0), requestIdAttr.AsInt64()) } func TestSendResponseToIncomingRequest(t *testing.T) { @@ -153,6 +178,8 @@ func TestSendResponseToIncomingRequest(t *testing.T) { } func TestRejectRequestsByDefault(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -171,9 +198,22 @@ func TestRejectRequestsByDefault(t *testing.T) { testutil.VerifyEmptyResponse(ctx, t, progressChan) testutil.VerifySingleTerminalError(ctx, t, errChan) + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) + // has ContextCancelError exception recorded in the right place + tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false) } func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -197,9 +237,22 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse)) testutil.VerifySingleTerminalError(ctx, t, errChan) require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks") + + 
requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) + // has ErrBudgetExceeded exception recorded in the right place + tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrBudgetExceeded", "traversal budget exceeded", true) } func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -223,9 +276,23 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse)) testutil.VerifySingleTerminalError(ctx, t, errChan) require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks") + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) + // has ContextCancelError exception recorded in the right place + // the requester gets a cancel, the responder gets a ErrBudgetExceeded + tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false) } func TestGraphsyncRoundTrip(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -284,9 +351,20 @@ func TestGraphsyncRoundTrip(t *testing.T) { var finalResponseStatus graphsync.ResponseStatusCode testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status") require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus) + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) } func TestGraphsyncRoundTripPartial(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -333,9 +411,20 @@ func TestGraphsyncRoundTripPartial(t *testing.T) { var finalResponseStatus graphsync.ResponseStatusCode testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status") require.Equal(t, graphsync.RequestCompletedPartial, finalResponseStatus) + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) } func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -382,9 +471,20 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { require.Equal(t, blockChainLength, totalSent) require.Equal(t, blockChainLength-set.Len(), totalSentOnWire) + + 
requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) } func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -433,9 +533,20 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { require.Equal(t, blockChainLength, totalSent) require.Equal(t, blockChainLength-50, totalSentOnWire) + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) } func TestPauseResume(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -485,8 +596,19 @@ func TestPauseResume(t *testing.T) { testutil.VerifyEmptyErrors(ctx, t, errChan) require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks") + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) } + func TestPauseResumeRequest(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -531,9 +653,23 @@ func TestPauseResumeRequest(t *testing.T) { blockChain.VerifyRemainder(ctx, progressChan, stopPoint) testutil.VerifyEmptyErrors(ctx, t, errChan) require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks") + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->executeTask(1)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) + // has ErrPaused exception recorded in the right place + tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrPaused", hooks.ErrPaused{}.Error(), false) } func TestPauseResumeViaUpdate(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -590,9 +726,20 @@ func TestPauseResumeViaUpdate(t *testing.T) { require.Equal(t, td.extensionResponseData, receivedReponseData, "did not receive correct extension response data") require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data") + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) } func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ 
-651,9 +798,20 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { require.Equal(t, td.extensionResponseData, receivedReponseData, "did not receive correct extension response data") require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data") + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) } func TestNetworkDisconnect(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -722,9 +880,22 @@ func TestNetworkDisconnect(t *testing.T) { testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error") require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error()) testutil.AssertReceive(ctx, t, receiverError, &err, "should receive an error on receiver side") + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) + // has ContextCancelError exception recorded in the right place + tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false) } func TestConnectFail(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -757,9 +928,22 @@ func TestConnectFail(t *testing.T) { testutil.AssertReceive(ctx, t, reqNetworkError, &err, "should receive network error") testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error") require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error()) + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) + // has ContextCancelError exception recorded in the right place + tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false) } func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -822,9 +1006,26 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { testutil.VerifyEmptyErrors(ctx, t, errChan) require.Len(t, td.blockStore1, 0, "should store no blocks in normal store") require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store") + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + // two complete request traces expected + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + "request(1)->newRequest(0)", + "request(1)->executeTask(0)", + "request(1)->terminateRequest(0)", + }, tracing.TracesToStrings()) + // TODO(rvagg): this is randomly 
either a SkipMe or a ipldutil.ContextCancelError; confirm this is sane + // tracing.SingleExceptionEvent(t, "request(0)->newRequest(0)","request(0)->executeTask(0)", "SkipMe", traversal.SkipMe{}.Error(), true) } func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -887,6 +1088,18 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { testutil.VerifyEmptyErrors(ctx, t, errChan2) require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store 2") + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + // two complete request traces expected + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + "request(1)->newRequest(0)", + "request(1)->executeTask(0)", + "request(1)->terminateRequest(0)", + }, tracing.TracesToStrings()) } // TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work @@ -898,6 +1111,8 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { // backlog of blocks and then sending them in one giant network packet that can't // be decoded on the client side func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network if testing.Short() { t.Skip() @@ -924,6 +1139,15 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { blockChain.VerifyWholeChain(ctx, progressChan) testutil.VerifyEmptyErrors(ctx, t, errChan) + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) } // What this test does: @@ -939,6 +1163,7 @@ func TestUnixFSFetch(t *testing.T) { if testing.Short() { t.Skip() } + collectTracing := testutil.SetupTracing() const unixfsChunkSize uint64 = 1 << 10 const unixfsLinksPerLevel = 1024 @@ -1044,9 +1269,20 @@ func TestUnixFSFetch(t *testing.T) { // verify original bytes match final bytes! 
require.Equal(t, origBytes, finalBytes, "should have gotten same bytes written as read but didn't") + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) } func TestGraphsyncBlockListeners(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -1124,6 +1360,15 @@ func TestGraphsyncBlockListeners(t *testing.T) { require.Equal(t, blockChainLength, blocksOutgoing) require.Equal(t, blockChainLength, blocksIncoming) require.Equal(t, blockChainLength, blocksSent) + + requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks() + + tracing := collectTracing(t) + require.ElementsMatch(t, []string{ + "request(0)->newRequest(0)", + "request(0)->executeTask(0)", + "request(0)->terminateRequest(0)", + }, tracing.TracesToStrings()) } type gsTestData struct { diff --git a/requestmanager/client.go b/requestmanager/client.go index 4e3f2cb0..51cd75f2 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -16,6 +16,8 @@ import ( "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/ipldutil" @@ -52,6 +54,7 @@ const ( type inProgressRequestStatus struct { ctx context.Context + span trace.Span startTime time.Time cancelFn func() p peer.ID @@ -174,13 +177,20 @@ func (rm *RequestManager) NewRequest(ctx context.Context, root ipld.Link, selectorNode ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) { + + span := trace.SpanFromContext(ctx) + if _, err := selector.ParseSelector(selectorNode); err != nil { - return rm.singleErrorResponse(fmt.Errorf("invalid selector spec")) + err := fmt.Errorf("invalid selector spec") + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + defer span.End() + return rm.singleErrorResponse(err) } inProgressRequestChan := make(chan inProgressRequest) - rm.send(&newRequestMessage{p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done()) + rm.send(&newRequestMessage{span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done()) var receivedInProgressRequest inProgressRequest select { case <-rm.ctx.Done(): @@ -322,7 +332,12 @@ func (rm *RequestManager) GetRequestTask(p peer.ID, task *peertask.Task, request // ReleaseRequestTask releases a task request the requestQueue func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err error) { - rm.send(&releaseRequestTaskMessage{p, task, err}, nil) + done := make(chan struct{}, 1) + rm.send(&releaseRequestTaskMessage{p, task, err, done}, nil) + select { + case <-rm.ctx.Done(): + case <-done: + } } // SendRequest sends a request to the message queue diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index 95b7f23e..88f19979 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -12,6 +12,9 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/traversal" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel" + 
"go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" @@ -73,11 +76,17 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask. log.Info("Empty task on peer request stack") return false } + + _, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(ctx, requestTask.Span), "executeTask") + defer span.End() + log.Debugw("beginning request execution", "id", requestTask.Request.ID(), "peer", pid.String(), "root_cid", requestTask.Request.Root().String()) err := e.traverse(requestTask) + span.RecordError(err) if err != nil && !ipldutil.IsContextCancelErr(err) { e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID())) if !isPausedErr(err) { + span.SetStatus(codes.Error, err.Error()) select { case <-requestTask.Ctx.Done(): case requestTask.InProgressErr <- err: @@ -92,6 +101,7 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask. // RequestTask are parameters for a single request execution type RequestTask struct { Ctx context.Context + Span trace.Span Request gsmsg.GraphSyncRequest LastResponse *atomic.Value DoNotSendCids *cid.Set diff --git a/requestmanager/messages.go b/requestmanager/messages.go index e5709ea8..b8f4fa87 100644 --- a/requestmanager/messages.go +++ b/requestmanager/messages.go @@ -5,6 +5,7 @@ import ( "github.com/ipfs/go-peertaskqueue/peertask" "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" gsmsg "github.com/ipfs/go-graphsync/message" @@ -76,13 +77,19 @@ type releaseRequestTaskMessage struct { p peer.ID task *peertask.Task err error + done chan struct{} } func (trm *releaseRequestTaskMessage) handle(rm *RequestManager) { rm.releaseRequestTask(trm.p, trm.task, trm.err) + select { + case <-rm.ctx.Done(): + case trm.done <- struct{}{}: + } } type newRequestMessage struct { + span trace.Span p peer.ID root ipld.Link selector ipld.Node @@ -93,7 +100,7 @@ type newRequestMessage struct { func (nrm *newRequestMessage) handle(rm *RequestManager) { var ipr inProgressRequest - ipr.request, ipr.incoming, ipr.incomingError = rm.newRequest(nrm.p, nrm.root, nrm.selector, nrm.extensions) + ipr.request, ipr.incoming, ipr.incomingError = rm.newRequest(nrm.span, nrm.p, nrm.root, nrm.selector, nrm.extensions) ipr.requestID = ipr.request.ID() select { diff --git a/requestmanager/server.go b/requestmanager/server.go index 20a5be0c..e8e2ddf8 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -15,6 +15,10 @@ import ( "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" @@ -49,14 +53,21 @@ func (rm *RequestManager) cleanupInProcessRequests() { } } -func (rm *RequestManager) newRequest(p peer.ID, root ipld.Link, selector ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, chan graphsync.ResponseProgress, chan error) { +func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld.Link, selector ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, chan graphsync.ResponseProgress, chan error) { requestID := rm.nextRequestID rm.nextRequestID++ + 
parentSpan.SetAttributes(attribute.Int("requestID", int(requestID))) + ctx, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, parentSpan), "newRequest") + defer span.End() + log.Infow("graphsync request initiated", "request id", requestID, "peer", p, "root", root) request, hooksResult, err := rm.validateRequest(requestID, p, root, selector, extensions) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + defer parentSpan.End() rp, err := rm.singleErrorResponse(err) return request, rp, err } @@ -65,15 +76,19 @@ func (rm *RequestManager) newRequest(p peer.ID, root ipld.Link, selector ipld.No if has { doNotSendCids, err = cidset.DecodeCidSet(doNotSendCidsData) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + defer parentSpan.End() rp, err := rm.singleErrorResponse(err) return request, rp, err } } else { doNotSendCids = cid.NewSet() } - ctx, cancel := context.WithCancel(rm.ctx) + ctx, cancel := context.WithCancel(ctx) requestStatus := &inProgressRequestStatus{ ctx: ctx, + span: parentSpan, startTime: time.Now(), cancelFn: cancel, p: p, @@ -141,6 +156,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re ipr.state = running return executor.RequestTask{ Ctx: ipr.ctx, + Span: ipr.span, Request: ipr.request, LastResponse: &ipr.lastResponse, DoNotSendCids: ipr.doNotSendCids, @@ -163,6 +179,10 @@ func (rm *RequestManager) getRequestTask(p peer.ID, task *peertask.Task) executo } func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID, ipr *inProgressRequestStatus) { + _, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, ipr.span), "terminateRequest") + defer span.End() + defer ipr.span.End() // parent span for this whole request + if ipr.terminalError != nil { select { case ipr.inProgressErr <- ipr.terminalError: diff --git a/testutil/tracing.go b/testutil/tracing.go new file mode 100644 index 00000000..36b9b9c9 --- /dev/null +++ b/testutil/tracing.go @@ -0,0 +1,173 @@ +package testutil + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +var _ trace.SpanExporter = &Collector{} + +type Collector struct { + Spans tracetest.SpanStubs +} + +func (c *Collector) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error { + c.Spans = tracetest.SpanStubsFromReadOnlySpans(spans) + return nil +} + +func (c *Collector) Shutdown(ctx context.Context) error { + return nil +} + +func (c Collector) FindSpans(name string) tracetest.SpanStubs { + var found = tracetest.SpanStubs{} + for _, s := range c.Spans { + if s.Name == name { + found = append(found, s) + } + } + return found +} + +// TracesToString returns an array of traces represented as strings with each +// span in the trace identified by name separated by a '->' +func (c Collector) TracesToStrings() []string { + return c.tracesToString("", c.FindParentSpans(), "", func(_ tracetest.SpanStub) {}) +} + +func (c Collector) tracesToString(trace string, spans tracetest.SpanStubs, matchString string, matchCb func(tracetest.SpanStub)) []string { + var traces []string + counts := make(map[string]int) // count the span children by name + for _, span := range spans { + nc := counts[span.Name] + counts[span.Name] = nc + 1 + t := fmt.Sprintf("%v(%d)", span.Name, nc) + if trace != "" 
{ + t = fmt.Sprintf("%v->%v", trace, t) + } + if t == matchString { + matchCb(span) + } + children := c.FindSpansWithParent(span) + if len(children) > 0 { + traces = append(traces, c.tracesToString(t, children, matchString, matchCb)...) + } else { + traces = append(traces, t) + } + } + return traces +} + +func (c Collector) FindSpanByTraceString(trace string) tracetest.SpanStubs { + var found = tracetest.SpanStubs{} + c.tracesToString("", c.FindParentSpans(), trace, func(span tracetest.SpanStub) { + found = append(found, span) + }) + return found +} + +func (c Collector) FindParentSpans() tracetest.SpanStubs { + var found = tracetest.SpanStubs{} + for _, s := range c.Spans { + if s.Parent.SpanID() == [8]byte{} { + found = append(found, s) + } + } + return found +} + +func (c Collector) FindSpansWithParent(stub tracetest.SpanStub) tracetest.SpanStubs { + var found = tracetest.SpanStubs{} + for _, s := range c.Spans { + if s.Parent.SpanID() == stub.SpanContext.SpanID() { + found = append(found, s) + } + } + return found +} + +func (c Collector) SingleExceptionEvent(t *testing.T, trace string, typeRe string, messageRe string, errorCode bool) { + t.Helper() + + // has ContextCancelError exception recorded in the right place + et := c.FindSpanByTraceString(trace) + require.Len(t, et, 1, "expected one span with trace %v", trace) + require.Len(t, et[0].Events, 1, "expected one event in span %v", trace) + ex := EventAsException(t, EventInTraceSpan(t, et[0], "exception")) + require.Regexp(t, typeRe, ex.Type) + require.Regexp(t, messageRe, ex.Message) + if errorCode { + require.Equal(t, codes.Error, et[0].Status.Code) + require.Regexp(t, messageRe, et[0].Status.Description) + } +} + +func SetupTracing() func(t *testing.T) *Collector { + collector := &Collector{} + tp := trace.NewTracerProvider(trace.WithBatcher(collector)) + otel.SetTracerProvider(tp) + + collect := func(t *testing.T) *Collector { + t.Helper() + + require.NoError(t, tp.Shutdown(context.Background())) + return collector + } + + return collect +} + +func AttributeValueInTraceSpan(t *testing.T, stub tracetest.SpanStub, attributeName string) attribute.Value { + t.Helper() + + for _, attr := range stub.Attributes { + if attr.Key == attribute.Key(attributeName) { + return attr.Value + } + } + require.Fail(t, "did not find expected attribute %v on trace span %v", attributeName, stub.Name) + return attribute.Value{} +} + +func EventInTraceSpan(t *testing.T, stub tracetest.SpanStub, eventName string) trace.Event { + t.Helper() + + for _, evt := range stub.Events { + if evt.Name == eventName { + return evt + } + } + require.Fail(t, "did not find expected event %v on trace span %v", eventName, stub.Name) + return trace.Event{} +} + +type ExceptionEvent struct { + Type string + Message string +} + +func EventAsException(t *testing.T, evt trace.Event) ExceptionEvent { + t.Helper() + + var typ string + var msg string + for _, attr := range evt.Attributes { + if attr.Key == attribute.Key("exception.type") { + typ = attr.Value.AsString() + } else if attr.Key == attribute.Key("exception.message") { + msg = attr.Value.AsString() + } + } + require.NotEmpty(t, typ, "expected non-empty exception.type attribute for %v", evt.Name) + require.NotEmpty(t, msg, "expected non-empty exception.message attribute for %v", evt.Name) + return ExceptionEvent{Type: typ, Message: msg} +} From f21aee05f524fc203cde16367eceec0520ab0471 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 30 Nov 2021 16:03:31 +1100 Subject: [PATCH 2/2] docs(tests): document tracing test 
helper utilities
---
 testutil/tracing.go | 52 ++++++++++++++++++++++++++++++++++++---------
 1 file changed, 42 insertions(+), 10 deletions(-)

diff --git a/testutil/tracing.go b/testutil/tracing.go
index 36b9b9c9..428c00dc 100644
--- a/testutil/tracing.go
+++ b/testutil/tracing.go
@@ -15,19 +15,24 @@ import (
 
 var _ trace.SpanExporter = &Collector{}
 
+// Collector can be used as a trace batcher to provide traces to; we collect
+// individual spans and then extract useful data out of them for test assertions
 type Collector struct {
 	Spans tracetest.SpanStubs
 }
 
+// ExportSpans receives the ReadOnlySpans from the batch provider
 func (c *Collector) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
 	c.Spans = tracetest.SpanStubsFromReadOnlySpans(spans)
 	return nil
 }
 
+// Shutdown is a noop; we don't need to do anything fancy
 func (c *Collector) Shutdown(ctx context.Context) error {
 	return nil
 }
 
+// FindSpans returns a list of spans by their name
 func (c Collector) FindSpans(name string) tracetest.SpanStubs {
 	var found = tracetest.SpanStubs{}
 	for _, s := range c.Spans {
@@ -38,8 +43,9 @@ func (c Collector) FindSpans(name string) tracetest.SpanStubs {
 	return found
 }
 
-// TracesToString returns an array of traces represented as strings with each
-// span in the trace identified by name separated by a '->'
+// TracesToStrings returns an array of all traces represented as strings, with each
+// span in the trace identified by name and its number (within the parent span)
+// in parens, separated by a '->'. e.g. `"foo(0)->bar(0)","foo(0)->bar(1)"`
 func (c Collector) TracesToStrings() []string {
 	return c.tracesToString("", c.FindParentSpans(), "", func(_ tracetest.SpanStub) {})
 }
@@ -67,14 +73,23 @@ func (c Collector) tracesToString(trace string, spans tracetest.SpanStubs, match
 	return traces
 }
 
-func (c Collector) FindSpanByTraceString(trace string) tracetest.SpanStubs {
-	var found = tracetest.SpanStubs{}
+// FindSpanByTraceString is similar to FindSpans but returns a single span
+// identified by its trace string as described in TracesToStrings. Note that
+// this string can also be a prefix of a complete trace, e.g. just `"foo(0)"`
+// without any children, to fetch the parent span.
+func (c Collector) FindSpanByTraceString(trace string) tracetest.SpanStub {
+	var found tracetest.SpanStub
 	c.tracesToString("", c.FindParentSpans(), trace, func(span tracetest.SpanStub) {
-		found = append(found, span)
+		if found.Name != "" {
+			panic("found more than one span with the same trace string")
+		}
+		found = span
 	})
 	return found
 }
 
+// FindParentSpans finds spans that have no parents; they are at the top of
+// any stack.
 func (c Collector) FindParentSpans() tracetest.SpanStubs {
 	var found = tracetest.SpanStubs{}
 	for _, s := range c.Spans {
@@ -85,6 +100,7 @@ func (c Collector) FindParentSpans() tracetest.SpanStubs {
 	return found
 }
 
+// FindSpansWithParent finds spans that are children of the provided span.
 func (c Collector) FindSpansWithParent(stub tracetest.SpanStub) tracetest.SpanStubs {
 	var found = tracetest.SpanStubs{}
 	for _, s := range c.Spans {
@@ -95,22 +111,29 @@ func (c Collector) FindSpansWithParent(stub tracetest.SpanStub) tracetest.SpanSt
 	return found
 }
 
+// SingleExceptionEvent is a test helper that asserts that a span, identified by a
+// trace string (see TracesToStrings), contains a single exception, identified by
+// the type (regexp) and message (regexp). If errorCode is true, then we also assert
+// that the span has an error status code, with the same message (regexp).
 func (c Collector) SingleExceptionEvent(t *testing.T, trace string, typeRe string, messageRe string, errorCode bool) {
 	t.Helper()
 
 	// has ContextCancelError exception recorded in the right place
 	et := c.FindSpanByTraceString(trace)
-	require.Len(t, et, 1, "expected one span with trace %v", trace)
-	require.Len(t, et[0].Events, 1, "expected one event in span %v", trace)
-	ex := EventAsException(t, EventInTraceSpan(t, et[0], "exception"))
+	require.Len(t, et.Events, 1, "expected one event in span %v", trace)
+	ex := EventAsException(t, EventInTraceSpan(t, et, "exception"))
 	require.Regexp(t, typeRe, ex.Type)
 	require.Regexp(t, messageRe, ex.Message)
 	if errorCode {
-		require.Equal(t, codes.Error, et[0].Status.Code)
-		require.Regexp(t, messageRe, et[0].Status.Description)
+		require.Equal(t, codes.Error, et.Status.Code)
+		require.Regexp(t, messageRe, et.Status.Description)
 	}
 }
 
+// SetupTracing returns a test helper that will collect all spans within
+// a Collector. The returned helper function should be called at the point in
+// a test where the spans are ready to be analyzed. Any spans not properly
+// completed at that point won't be represented in the Collector.
 func SetupTracing() func(t *testing.T) *Collector {
 	collector := &Collector{}
 	tp := trace.NewTracerProvider(trace.WithBatcher(collector))
@@ -126,6 +149,9 @@ func SetupTracing() func(t *testing.T) *Collector {
 	return collect
 }
 
+// AttributeValueInTraceSpan is a test helper that asserts that a span
+// contains an attribute with the name provided, and returns the value of
+// that attribute for further inspection.
 func AttributeValueInTraceSpan(t *testing.T, stub tracetest.SpanStub, attributeName string) attribute.Value {
 	t.Helper()
 
@@ -138,6 +164,9 @@ func AttributeValueInTraceSpan(t *testing.T, stub tracetest.SpanStub, attributeN
 	return attribute.Value{}
 }
 
+// EventInTraceSpan is a test helper that asserts that a span
+// contains an event with the name provided, and returns the value of
+// that event for further inspection.
 func EventInTraceSpan(t *testing.T, stub tracetest.SpanStub, eventName string) trace.Event {
 	t.Helper()
 
@@ -150,11 +179,14 @@ func EventInTraceSpan(t *testing.T, stub tracetest.SpanStub, eventName string) t
 	return trace.Event{}
 }
 
+// ExceptionEvent is a simplistic string-form representation of an exception event
 type ExceptionEvent struct {
 	Type    string
 	Message string
 }
 
+// EventAsException is a test helper that converts a trace event to an ExceptionEvent
+// for easier inspection.
 func EventAsException(t *testing.T, evt trace.Event) ExceptionEvent {
 	t.Helper()