Skip to content

Refactor async loading for simplicity and correctness #356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,30 @@ func (e RequestNotFoundErr) Error() string {
return "request not found"
}

// RemoteMissingBlockErr indicates that the remote peer was missing a block
// in the selector requested. It is a non-terminal error in the error stream
// MissingBlockErr indicates that the remote peer was missing a block
// in the selector requested, and we also don't have it locally.
// It is a non-terminal error in the error stream
// for a request and does NOT cause a request to fail completely
type RemoteMissingBlockErr struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the rename? it's still for remotes and we have other Remote* errors like the one you introduced below

Copy link
Collaborator Author

@hannahhoward hannahhoward Feb 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically it's now just a "missing block" -- missing both locally & remotely -- maybe a rename that breaks compatibility isn't worth it just to be super accurate.

type MissingBlockErr struct {
Link ipld.Link
}

func (e RemoteMissingBlockErr) Error() string {
func (e MissingBlockErr) Error() string {
return fmt.Sprintf("remote peer is missing block: %s", e.Link.String())
}

// RemoteIncorrectResponseError indicates that the remote peer sent a response
// to a traversal that did not correspond with the expected next link
// in the selector traversal based on verification of data up to this point
type RemoteIncorrectResponseError struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the path where this link mismatched occured might be useful context as well

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

LocalLink ipld.Link
RemoteLink ipld.Link
}

func (e RemoteIncorrectResponseError) Error() string {
return fmt.Sprintf("next link %s in remote traversal does not match next link %s in local traversal, possible malicious responder", e.LocalLink, e.RemoteLink)
}

var (
// ErrExtensionAlreadyRegistered means a user extension can be registered only once
ErrExtensionAlreadyRegistered = errors.New("extension already registered")
Expand Down Expand Up @@ -223,6 +236,8 @@ type LinkMetadataIterator func(cid.Cid, LinkAction)

// LinkMetadata is used to access link metadata through an Iterator
type LinkMetadata interface {
// Length returns the number of metadata entries
Length() int64
// Iterate steps over individual metadata one by one using the provided
// callback
Iterate(LinkMetadataIterator)
Expand Down
18 changes: 3 additions & 15 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/peermanager"
"github.com/ipfs/go-graphsync/peerstate"
"github.com/ipfs/go-graphsync/persistenceoptions"
"github.com/ipfs/go-graphsync/requestmanager"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
"github.com/ipfs/go-graphsync/requestmanager/executor"
requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager"
responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
"github.com/ipfs/go-graphsync/responsemanager/queryexecutor"
"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
"github.com/ipfs/go-graphsync/selectorvalidator"
Expand All @@ -50,7 +49,6 @@ type GraphSync struct {
requestManager *requestmanager.RequestManager
responseManager *responsemanager.ResponseManager
queryExecutor *queryexecutor.QueryExecutor
asyncLoader *asyncloader.AsyncLoader
responseQueue taskqueue.TaskQueue
requestQueue taskqueue.TaskQueue
requestExecutor *executor.Executor
Expand Down Expand Up @@ -230,10 +228,9 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
}
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)

asyncLoader := asyncloader.New(ctx, linkSystem)
requestQueue := taskqueue.NewTaskQueue(ctx)
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, outgoingRequestProcessingListeners, requestQueue, network.ConnectionManager(), gsConfig.maxLinksPerOutgoingRequest)
requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks, asyncLoader.AsyncLoad)
requestManager := requestmanager.New(ctx, persistenceOptions, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, outgoingRequestProcessingListeners, requestQueue, network.ConnectionManager(), gsConfig.maxLinksPerOutgoingRequest)
requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks)
responseAssembler := responseassembler.New(ctx, peerManager)
var ptqopts []peertaskqueue.Option
if gsConfig.maxInProgressIncomingRequestsPerPeer > 0 {
Expand Down Expand Up @@ -266,7 +263,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestManager: requestManager,
responseManager: responseManager,
queryExecutor: queryExecutor,
asyncLoader: asyncLoader,
responseQueue: responseQueue,
requestQueue: requestQueue,
requestExecutor: requestExecutor,
Expand Down Expand Up @@ -339,19 +335,11 @@ func (gs *GraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingReques

// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
func (gs *GraphSync) RegisterPersistenceOption(name string, lsys ipld.LinkSystem) error {
err := gs.asyncLoader.RegisterPersistenceOption(name, lsys)
if err != nil {
return err
}
return gs.persistenceOptions.Register(name, lsys)
}

// UnregisterPersistenceOption unregisters an alternate loader/storer combo
func (gs *GraphSync) UnregisterPersistenceOption(name string) error {
err := gs.asyncLoader.UnregisterPersistenceOption(name)
if err != nil {
return err
}
return gs.persistenceOptions.Unregister(name)
}

Expand Down
74 changes: 37 additions & 37 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestMakeRequestToNetwork(t *testing.T) {
graphSync := td.GraphSyncHost1()

blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence1, 100, blockChainLength)
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

requestCtx, requestCancel := context.WithCancel(ctx)
defer requestCancel()
Expand All @@ -109,6 +109,11 @@ func TestMakeRequestToNetwork(t *testing.T) {
require.True(t, found)
require.Equal(t, td.extensionData, returnedData, "Failed to encode extension")

builder := gsmsg.NewBuilder()
builder.AddResponseCode(receivedRequest.ID(), graphsync.RequestRejected)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this now? is it because this test never needed to actually make a response and the previous version had all the blocks locally; but this new one puts the blocks on the remote but then tells the client to get lost?

So we're simply testing the ability to make a network connection and have the most basic interaction here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea this test is from very far back in the early graphsync development days -- when I was just seeing if the protocol was generating the network traffic I expected. So previously, it yes loaded absolutely everything locally but we just wanted to make sure a request got made.

Maybe the right move now is eliminate, since really, at this point, I'm pretty sure we can send network requests :P

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eliminated

response, err := builder.Build()
require.NoError(t, err)
td.gsnet2.SendMessage(ctx, td.host1.ID(), response)
drain(graphSync)

tracing := collectTracing(t)
Expand All @@ -117,6 +122,7 @@ func TestMakeRequestToNetwork(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"message(0)->sendMessage(0)",
"processResponses(0)",
}, tracing.TracesToStrings())

// make sure the attributes are what we expect
Expand Down Expand Up @@ -224,7 +230,7 @@ func TestRejectRequestsByDefault(t *testing.T) {
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"processResponses(0)->loaderProcess(0)->cacheProcess(0)",
"processResponses(0)",
"processRequests(0)->transaction(0)->execute(0)->buildMessage(0)",
"message(0)->sendMessage(0)",
"message(1)->sendMessage(0)",
Expand Down Expand Up @@ -278,8 +284,8 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ErrBudgetExceeded exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrBudgetExceeded", "traversal budget exceeded", true)
Expand Down Expand Up @@ -327,8 +333,8 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ContextCancelError exception recorded in the right place
// the requester gets a cancel, the responder gets a ErrBudgetExceeded
Expand Down Expand Up @@ -411,27 +417,27 @@ func TestGraphsyncRoundTrip(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

processUpdateSpan := tracing.FindSpanByTraceString("response(0)")
require.Equal(t, int64(0), testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "priority").AsInt64())
require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice())

// each verifyBlock span should link to a cacheProcess span that stored it

cacheProcessSpans := tracing.FindSpans("cacheProcess")
cacheProcessLinks := make(map[string]int64)
processResponsesSpans := tracing.FindSpans("processResponses")
processResponsesLinks := make(map[string]int64)
verifyBlockSpans := tracing.FindSpans("verifyBlock")

for _, verifyBlockSpan := range verifyBlockSpans {
require.Len(t, verifyBlockSpan.Links, 1, "verifyBlock span should have one link")
found := false
for _, cacheProcessSpan := range cacheProcessSpans {
sid := cacheProcessSpan.SpanContext.SpanID().String()
for _, prcessResponseSpan := range processResponsesSpans {
sid := prcessResponseSpan.SpanContext.SpanID().String()
if verifyBlockSpan.Links[0].SpanContext.SpanID().String() == sid {
found = true
cacheProcessLinks[sid] = cacheProcessLinks[sid] + 1
processResponsesLinks[sid] = processResponsesLinks[sid] + 1
break
}
}
Expand All @@ -440,9 +446,9 @@ func TestGraphsyncRoundTrip(t *testing.T) {

// each cacheProcess span should be linked to one verifyBlock span per block it stored

for _, cacheProcessSpan := range cacheProcessSpans {
blockCount := testutil.AttributeValueInTraceSpan(t, cacheProcessSpan, "blockCount").AsInt64()
require.Equal(t, cacheProcessLinks[cacheProcessSpan.SpanContext.SpanID().String()], blockCount, "cacheProcess span should be linked to one verifyBlock span per block it processed")
for _, processResponseSpan := range processResponsesSpans {
blockCount := testutil.AttributeValueInTraceSpan(t, processResponseSpan, "blockCount").AsInt64()
require.Equal(t, processResponsesLinks[processResponseSpan.SpanContext.SpanID().String()], blockCount, "cacheProcess span should be linked to one verifyBlock span per block it processed")
}
})
}
Expand Down Expand Up @@ -510,8 +516,8 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
}

func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
Expand Down Expand Up @@ -744,8 +750,8 @@ func TestPauseResume(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// pause recorded
tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "github.com/ipfs/go-graphsync/responsemanager/hooks.ErrPaused", hooks.ErrPaused{}.Error(), false)
Expand Down Expand Up @@ -827,8 +833,8 @@ func TestPauseResumeRequest(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(1)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ErrPaused exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrPaused", hooks.ErrPaused{}.Error(), false)
Expand Down Expand Up @@ -1115,8 +1121,8 @@ func TestNetworkDisconnect(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ContextCancelError exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false)
Expand Down Expand Up @@ -1256,8 +1262,8 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
require.Contains(t, traceStrings, "request(1)->newRequest(0)")
require.Contains(t, traceStrings, "request(1)->executeTask(0)")
require.Contains(t, traceStrings, "request(1)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(1)->verifyBlock(0)") // should have one of these per block (TODO: why request(1) and not (0)?)
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(1)->verifyBlock(0)") // should have one of these per block (TODO: why request(1) and not (0)?)

// TODO(rvagg): this is randomly either a SkipMe or a ipldutil.ContextCancelError; confirm this is sane
// tracing.SingleExceptionEvent(t, "request(0)->newRequest(0)","request(0)->executeTask(0)", "SkipMe", traversal.SkipMe{}.Error(), true)
Expand Down Expand Up @@ -1345,8 +1351,8 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) {
require.Contains(t, traceStrings, "request(1)->newRequest(0)")
require.Contains(t, traceStrings, "request(1)->executeTask(0)")
require.Contains(t, traceStrings, "request(1)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
}

// TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work
Expand Down Expand Up @@ -1600,8 +1606,8 @@ func TestUnixFSFetch(t *testing.T) {
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)->loaderProcess(0)->cacheProcess(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
}

func TestGraphsyncBlockListeners(t *testing.T) {
Expand Down Expand Up @@ -1873,11 +1879,5 @@ func (r *receiver) Disconnected(p peer.ID) {
}

func processResponsesTraces(t *testing.T, tracing *testutil.Collector, responseCount int) []string {
traces := testutil.RepeatTraceStrings("processResponses({})->loaderProcess(0)->cacheProcess(0)", responseCount-1)
finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
require.NotNil(t, finalStub)
if len(testutil.AttributeValueInTraceSpan(t, *finalStub, "requestIDs").AsStringSlice()) == 0 {
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
}
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1))
return testutil.RepeatTraceStrings("processResponses({})", responseCount)
}
5 changes: 5 additions & 0 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ func (gslm GraphSyncLinkMetadata) Iterate(iter graphsync.LinkMetadataIterator) {
}
}

// Length returns the number of metadata entries
func (gslm GraphSyncLinkMetadata) Length() int64 {
return int64(len(gslm.linkMetadata))
}

// RawMetadata accesses the raw GraphSyncLinkMetadatum contained in this object,
// this is not exposed via the graphsync.LinkMetadata API and in general the
// Iterate() method should be used instead for accessing the individual metadata
Expand Down
Loading