Skip to content

Response message tracing #327

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 7, 2022
114 changes: 75 additions & 39 deletions impl/graphsync_test.go

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion messagequeue/builder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package messagequeue

import (
"context"
"io"

"github.com/ipfs/go-graphsync"
Expand All @@ -11,6 +12,7 @@ import (
// Builder wraps a message builder with additional functions related to metadata
// and notifications in the message queue
type Builder struct {
ctx context.Context
*gsmsg.Builder
topic Topic
responseStreams map[graphsync.RequestID]io.Closer
Expand All @@ -19,8 +21,9 @@ type Builder struct {
}

// NewBuilder sets up a new builder for the given topic
func NewBuilder(topic Topic) *Builder {
func NewBuilder(ctx context.Context, topic Topic) *Builder {
return &Builder{
ctx: ctx,
Builder: gsmsg.NewBuilder(),
topic: topic,
responseStreams: make(map[graphsync.RequestID]io.Closer),
Expand All @@ -29,6 +32,10 @@ func NewBuilder(topic Topic) *Builder {
}
}

func (b *Builder) Context() context.Context {
return b.ctx
}

// SetResponseStream sets the given response stream to close should the message fail to send
func (b *Builder) SetResponseStream(requestID graphsync.RequestID, stream io.Closer) {
b.responseStreams[requestID] = stream
Expand Down Expand Up @@ -82,6 +89,7 @@ func (b *Builder) build(publisher notifications.Publisher) (gsmsg.GraphSyncMessa
BlockData: b.blockData,
ResponseCodes: message.ResponseCodes(),
},
ctx: b.ctx,
topic: b.topic,
msgSize: b.BlockSize(),
responseStreams: b.responseStreams,
Expand Down
21 changes: 20 additions & 1 deletion messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/ipfs/go-graphsync"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
Expand Down Expand Up @@ -112,7 +116,10 @@ func (mq *MessageQueue) buildMessage(size uint64, buildMessageFn func(*Builder))
if shouldBeginNewResponse(mq.builders, size) {
topic := mq.nextBuilderTopic
mq.nextBuilderTopic++
mq.builders = append(mq.builders, NewBuilder(topic))
ctx, _ := otel.Tracer("graphsync").Start(mq.ctx, "message", trace.WithAttributes(
attribute.Int64("topic", int64(topic)),
))
mq.builders = append(mq.builders, NewBuilder(ctx, topic))
}
builder := mq.builders[len(mq.builders)-1]
buildMessageFn(builder)
Expand Down Expand Up @@ -156,6 +163,9 @@ func (mq *MessageQueue) runQueue() {
for {
_, metadata, err := mq.extractOutgoingMessage()
if err == nil {
span := trace.SpanFromContext(metadata.ctx)
span.SetStatus(codes.Error, "message queue shutdown")
span.End()
mq.publishError(metadata, fmt.Errorf("message queue shutdown"))
mq.eventPublisher.Close(metadata.topic)
} else {
Expand Down Expand Up @@ -211,12 +221,20 @@ func (mq *MessageQueue) extractOutgoingMessage() (gsmsg.GraphSyncMessage, intern

func (mq *MessageQueue) sendMessage() {
message, metadata, err := mq.extractOutgoingMessage()

if err != nil {
if err != errEmptyMessage {
log.Errorf("Unable to assemble GraphSync message: %s", err.Error())
}
return
}
span := trace.SpanFromContext(metadata.ctx)
defer span.End()
_, sendSpan := otel.Tracer("graphsync").Start(metadata.ctx, "sendMessage", trace.WithAttributes(
attribute.Int64("topic", int64(metadata.topic)),
attribute.Int64("size", int64(metadata.msgSize)),
))
defer sendSpan.End()
mq.publishQueued(metadata)
defer mq.eventPublisher.Close(metadata.topic)

Expand Down Expand Up @@ -337,6 +355,7 @@ func openSender(ctx context.Context, network MessageNetwork, p peer.ID, sendTime
}

type internalMetadata struct {
ctx context.Context
public Metadata
topic Topic
msgSize uint64
Expand Down
7 changes: 7 additions & 0 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func TestProcessingNotification(t *testing.T) {

func TestDedupingMessages(t *testing.T) {
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

Expand Down Expand Up @@ -251,6 +252,12 @@ func TestDedupingMessages(t *testing.T) {
t.Fatal("incorrect request added to message")
}
}

tracing := collectTracing(t)
require.ElementsMatch(t, []string{
"message(0)->sendMessage(0)",
"message(1)->sendMessage(0)",
}, tracing.TracesToStrings())
}

func TestSendsVeryLargeBlocksResponses(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion peermanager/peermessagemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type fakePeer struct {
}

func (fp *fakePeer) AllocateAndBuildMessage(blkSize uint64, buildMessage func(b *messagequeue.Builder)) {
builder := messagequeue.NewBuilder(messagequeue.Topic(0))
builder := messagequeue.NewBuilder(context.TODO(), messagequeue.Topic(0))
buildMessage(builder)
message, err := builder.Build()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ type fakePeerHandler struct {

func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64,
requestBuilder func(b *messagequeue.Builder)) {
builder := messagequeue.NewBuilder(messagequeue.Topic(0))
builder := messagequeue.NewBuilder(context.TODO(), messagequeue.Topic(0))
requestBuilder(builder)
message, err := builder.Build()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type NetworkErrorListeners interface {

// ResponseAssembler is an interface that returns sender interfaces for peer responses.
type ResponseAssembler interface {
NewStream(p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) responseassembler.ResponseStream
NewStream(ctx context.Context, p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) responseassembler.ResponseStream
}

type responseManagerMessage interface {
Expand Down
35 changes: 25 additions & 10 deletions responsemanager/queryexecutor/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -82,7 +83,7 @@ func New(ctx context.Context,
// and uses the ResponseAssembler to build and send a response, while also triggering any of
// the QueryExecutor's BlockHooks. Traversal continues until complete, or a signal or hook
// suggests we should stop or pause.
func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.Task) bool {
func (qe *QueryExecutor) ExecuteTask(_ context.Context, pid peer.ID, task *peertask.Task) bool {
// StartTask lets us block until this task is at the top of the execution stack
responseTaskChan := make(chan ResponseTask)
var rt ResponseTask
Expand All @@ -97,11 +98,11 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee
return false
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(qe.ctx, rt.Span), "executeTask")
ctx, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(qe.ctx, rt.Span), "executeTask")
defer span.End()

log.Debugw("beginning response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
err := qe.executeQuery(pid, rt)
err := qe.executeQuery(ctx, pid, rt)
if err != nil {
span.RecordError(err)
if _, isPaused := err.(hooks.ErrPaused); !isPaused {
Expand All @@ -114,10 +115,10 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee
}

func (qe *QueryExecutor) executeQuery(
p peer.ID, rt ResponseTask) error {
ctx context.Context, p peer.ID, rt ResponseTask) error {

// Execute the traversal operation, continue until we have reason to stop (error, pause, complete)
err := qe.runTraversal(p, rt)
err := qe.runTraversal(ctx, p, rt)

_, isPaused := err.(hooks.ErrPaused)
if isPaused {
Expand Down Expand Up @@ -180,7 +181,7 @@ func (qe *QueryExecutor) checkForUpdates(
}
}

func (qe *QueryExecutor) runTraversal(p peer.ID, taskData ResponseTask) error {
func (qe *QueryExecutor) runTraversal(ctx context.Context, p peer.ID, taskData ResponseTask) error {
for {
traverser := taskData.Traverser
isComplete, err := traverser.IsComplete()
Expand All @@ -195,18 +196,28 @@ func (qe *QueryExecutor) runTraversal(p peer.ID, taskData ResponseTask) error {
}
return err
}
lnk, data, err := qe.nextBlock(taskData)
lnk, _ := taskData.Traverser.CurrentRequest()
ctx, span := otel.Tracer("graphsync").Start(ctx, "processBlock", trace.WithAttributes(
attribute.String("cid", lnk.String()),
))
lnk, data, err := qe.nextBlock(ctx, taskData)
if err != nil {
span.End()
return err
}
err = qe.sendResponse(p, taskData, lnk, data)
err = qe.sendResponse(ctx, p, taskData, lnk, data)
if err != nil {
span.End()
return err
}
span.End()
}
}

func (qe *QueryExecutor) nextBlock(taskData ResponseTask) (ipld.Link, []byte, error) {
func (qe *QueryExecutor) nextBlock(ctx context.Context, taskData ResponseTask) (ipld.Link, []byte, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this should be renamed now to loadBlock, or call it loadCurrentBlock and don't return the link, since the link is now being pulled up into the caller for span metadata and the returned value is a pointless variable overwrite.

_, span := otel.Tracer("graphsync").Start(ctx, "loadBlock")
defer span.End()

lnk, lnkCtx := taskData.Traverser.CurrentRequest()
log.Debugf("will load link=%s", lnk)
result, err := taskData.Loader(lnkCtx, lnk)
Expand Down Expand Up @@ -237,9 +248,11 @@ func (qe *QueryExecutor) nextBlock(taskData ResponseTask) (ipld.Link, []byte, er
return lnk, data, nil
}

func (qe *QueryExecutor) sendResponse(p peer.ID, taskData ResponseTask, link ipld.Link, data []byte) error {
func (qe *QueryExecutor) sendResponse(ctx context.Context, p peer.ID, taskData ResponseTask, link ipld.Link, data []byte) error {
// Execute a transaction for this block, including any other queued operations
return taskData.ResponseStream.Transaction(func(rb responseassembler.ResponseBuilder) error {
ctx, span := otel.Tracer("graphsync").Start(ctx, "sendBlock", trace.WithLinks(trace.LinkFromContext(rb.Context())))
defer span.End()
// Ensure that any updates that have occurred till now are integrated into the response
err := qe.checkForUpdates(p, taskData, rb)
// On any error other than a pause, we bail, if it's a pause then we continue processing _this_ block
Expand All @@ -248,7 +261,9 @@ func (qe *QueryExecutor) sendResponse(p peer.ID, taskData ResponseTask, link ipl
}
blockData := rb.SendResponse(link, data)
if blockData.BlockSize() > 0 {
_, span := otel.Tracer("graphsync").Start(ctx, "processBlockHooks")
result := qe.blockHooks.ProcessBlockHooks(p, taskData.Request, blockData)
span.End()
for _, extension := range result.Extensions {
rb.SendExtensionData(extension)
}
Expand Down
4 changes: 4 additions & 0 deletions responsemanager/queryexecutor/queryexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ func (rb fauxResponseBuilder) PauseRequest() {
}
}

func (rb fauxResponseBuilder) Context() context.Context {
return context.TODO()
}

var _ responseassembler.ResponseBuilder = &fauxResponseBuilder{}

type blockData struct {
Expand Down
7 changes: 7 additions & 0 deletions responsemanager/responseassembler/responseBuilder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package responseassembler

import (
"context"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
Expand All @@ -18,6 +20,7 @@ type responseOperation interface {
}

type responseBuilder struct {
ctx context.Context
requestID graphsync.RequestID
operations []responseOperation
linkTracker *peerLinkTracker
Expand Down Expand Up @@ -47,6 +50,10 @@ func (rb *responseBuilder) PauseRequest() {
rb.operations = append(rb.operations, statusOperation{rb.requestID, graphsync.RequestPaused})
}

func (rb *responseBuilder) Context() context.Context {
return rb.ctx
}

func (rb *responseBuilder) setupBlockOperation(
link ipld.Link, data []byte) blockOperation {
hasBlock := data != nil
Expand Down
22 changes: 19 additions & 3 deletions responsemanager/responseassembler/responseassembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/messagequeue"
Expand Down Expand Up @@ -43,6 +45,9 @@ type ResponseBuilder interface {

// PauseRequest temporarily halts responding to the request
PauseRequest()

// Context returns the execution context for this transaction
Context() context.Context
}

// PeerMessageHandler is an interface that can queue a response for a given peer to go out over the network
Expand All @@ -68,8 +73,9 @@ func New(ctx context.Context, peerHandler PeerMessageHandler) *ResponseAssembler
}
}

func (ra *ResponseAssembler) NewStream(p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) ResponseStream {
func (ra *ResponseAssembler) NewStream(ctx context.Context, p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) ResponseStream {
return &responseStream{
ctx: ctx,
requestID: requestID,
p: p,
messageSenders: ra.peerHandler,
Expand All @@ -79,6 +85,7 @@ func (ra *ResponseAssembler) NewStream(p peer.ID, requestID graphsync.RequestID,
}

type responseStream struct {
ctx context.Context
requestID graphsync.RequestID
p peer.ID
closed bool
Expand Down Expand Up @@ -132,16 +139,22 @@ func (rs *responseStream) ClearRequest() {
}

func (rs *responseStream) Transaction(transaction Transaction) error {
ctx, span := otel.Tracer("graphsync").Start(rs.ctx, "transaction")
defer span.End()
rb := &responseBuilder{
ctx: ctx,
requestID: rs.requestID,
linkTracker: rs.linkTrackers.GetProcess(rs.p).(*peerLinkTracker),
}
err := transaction(rb)
rs.execute(rb.operations)
rs.execute(ctx, rb.operations)
return err
}

func (rs *responseStream) execute(operations []responseOperation) {
func (rs *responseStream) execute(ctx context.Context, operations []responseOperation) {
ctx, span := otel.Tracer("graphsync").Start(ctx, "execute")
defer span.End()

if rs.isClosed() {
return
}
Expand All @@ -150,6 +163,9 @@ func (rs *responseStream) execute(operations []responseOperation) {
size += op.size()
}
rs.messageSenders.AllocateAndBuildMessage(rs.p, size, func(builder *messagequeue.Builder) {
_, span = otel.Tracer("graphsync").Start(ctx, "message-build", trace.WithLinks(trace.LinkFromContext(builder.Context())))
defer span.End()

if rs.isClosed() {
return
}
Expand Down
Loading