Skip to content

Commit dc8c863

Browse files
authored
Add support for IPLD prime's budgets feature in selectors (#235)
* feat(graphsync): add link limits add options to configure the maximum number of allowed links to traverse on the requestor and the responder * fix(requestmanager): make termination predictable termination from remote peers was not properly handling blocks included up to termination, and could include unpredictable amounts in the response channel * docs(comments): add comments about maxLinksPerRequest
1 parent bc8e6b1 commit dc8c863

14 files changed

+189
-17
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ require (
2727
github.com/ipfs/go-peertaskqueue v0.2.0
2828
github.com/ipfs/go-unixfs v0.2.4
2929
github.com/ipld/go-codec-dagpb v1.3.0
30-
github.com/ipld/go-ipld-prime v0.12.0
30+
github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e
3131
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
3232
github.com/libp2p/go-buffer-pool v0.0.2
3333
github.com/libp2p/go-libp2p v0.13.0

go.sum

+4-2
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,8 @@ github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZ
238238
github.com/ipld/go-codec-dagpb v1.3.0 h1:czTcaoAuNNyIYWs6Qe01DJ+sEX7B+1Z0LcXjSatMGe8=
239239
github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA=
240240
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
241-
github.com/ipld/go-ipld-prime v0.12.0 h1:JapyKWTsJgmhrPI7hfx4V798c/RClr85sXfBZnH1VIw=
242-
github.com/ipld/go-ipld-prime v0.12.0/go.mod h1:hy8b93WleDMRKumOJnTIrr0MbbFbx9GD6Kzxa53Xppc=
241+
github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e h1:HPLQ9V/OFHKjfbFio8vQV+EW7lpQPj+iPl93VcwSTYM=
242+
github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e/go.mod h1:PaeLYq8k6dJLmDUSLrzkEpoGV4PEfe/1OtFN/eALOc8=
243243
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
244244
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
245245
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
@@ -539,6 +539,8 @@ github.com/multiformats/go-multiaddr-net v0.2.0/go.mod h1:gGdH3UXny6U3cKKYCvpXI5
539539
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
540540
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
541541
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
542+
github.com/multiformats/go-multicodec v0.3.0 h1:tstDwfIjiHbnIjeM5Lp+pMrSeN+LCMsEwOrkPmWm03A=
543+
github.com/multiformats/go-multicodec v0.3.0/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
542544
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
543545
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
544546
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=

impl/graphsync.go

+22-2
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ type graphsyncConfigOptions struct {
7474
maxInProgressIncomingRequests uint64
7575
maxInProgressOutgoingRequests uint64
7676
registerDefaultValidator bool
77+
maxLinksPerOutgoingRequest uint64
78+
maxLinksPerIncomingRequest uint64
7779
}
7880

7981
// Option defines the functional option type that can be used to configure
@@ -136,6 +138,24 @@ func MaxInProgressOutgoingRequests(maxInProgressOutgoingRequests uint64) Option
136138
}
137139
}
138140

141+
// MaxLinksPerOutgoingRequests changes the allowed number of links an outgoing
142+
// request can traverse before failing
143+
// A value of 0 = infinity, or no limit
144+
func MaxLinksPerOutgoingRequests(maxLinksPerOutgoingRequest uint64) Option {
145+
return func(gs *graphsyncConfigOptions) {
146+
gs.maxLinksPerOutgoingRequest = maxLinksPerOutgoingRequest
147+
}
148+
}
149+
150+
// MaxLinksPerIncomingRequests changes the allowed number of links an incoming
151+
// request can traverse before failing
152+
// A value of 0 = infinity, or no limit
153+
func MaxLinksPerIncomingRequests(maxLinksPerIncomingRequest uint64) Option {
154+
return func(gs *graphsyncConfigOptions) {
155+
gs.maxLinksPerIncomingRequest = maxLinksPerIncomingRequest
156+
}
157+
}
158+
139159
// New creates a new GraphSync Exchange on the given network,
140160
// and the given link loader+storer.
141161
func New(parent context.Context, network gsnet.GraphSyncNetwork,
@@ -179,11 +199,11 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
179199

180200
asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
181201
requestQueue := taskqueue.NewTaskQueue(ctx)
182-
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, requestQueue, network.ConnectionManager())
202+
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, requestQueue, network.ConnectionManager(), gsConfig.maxLinksPerOutgoingRequest)
183203
requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks, asyncLoader.AsyncLoad)
184204
responseAssembler := responseassembler.New(ctx, peerManager)
185205
peerTaskQueue := peertaskqueue.New()
186-
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressIncomingRequests, network.ConnectionManager())
206+
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressIncomingRequests, network.ConnectionManager(), gsConfig.maxLinksPerIncomingRequest)
187207
graphSync := &GraphSync{
188208
network: network,
189209
linkSystem: linkSystem,

impl/graphsync_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,58 @@ func TestRejectRequestsByDefault(t *testing.T) {
173173
testutil.VerifySingleTerminalError(ctx, t, errChan)
174174
}
175175

176+
func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
177+
// create network
178+
ctx := context.Background()
179+
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
180+
defer cancel()
181+
td := newGsTestData(ctx, t)
182+
183+
var linksToTraverse uint64 = 5
184+
// initialize graphsync on first node to make requests
185+
requestor := td.GraphSyncHost1(MaxLinksPerOutgoingRequests(linksToTraverse))
186+
187+
// setup receiving peer to just record message coming in
188+
blockChainLength := 100
189+
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)
190+
191+
// initialize graphsync on second node to response to requests
192+
td.GraphSyncHost2()
193+
194+
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)
195+
196+
// response budgets don't include the root block, so total links traverse with be one more than expected
197+
blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse))
198+
testutil.VerifySingleTerminalError(ctx, t, errChan)
199+
require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks")
200+
}
201+
202+
func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
203+
// create network
204+
ctx := context.Background()
205+
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
206+
defer cancel()
207+
td := newGsTestData(ctx, t)
208+
209+
var linksToTraverse uint64 = 5
210+
// initialize graphsync on first node to make requests
211+
requestor := td.GraphSyncHost1()
212+
213+
// setup receiving peer to just record message coming in
214+
blockChainLength := 100
215+
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)
216+
217+
// initialize graphsync on second node to response to requests
218+
td.GraphSyncHost2(MaxLinksPerIncomingRequests(linksToTraverse))
219+
220+
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)
221+
222+
// response budgets don't include the root block, so total links traverse with be one more than expected
223+
blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse))
224+
testutil.VerifySingleTerminalError(ctx, t, errChan)
225+
require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks")
226+
}
227+
176228
func TestGraphsyncRoundTrip(t *testing.T) {
177229
// create network
178230
ctx := context.Background()

ipldutil/traverser.go

+10
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type TraversalBuilder struct {
3636
Visitor traversal.AdvVisitFn
3737
LinkSystem ipld.LinkSystem
3838
Chooser traversal.LinkTargetNodePrototypeChooser
39+
Budget *traversal.Budget
3940
}
4041

4142
// Traverser is an interface for performing a selector traversal that operates iteratively --
@@ -81,6 +82,7 @@ func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser {
8182
visitor: defaultVisitor,
8283
chooser: defaultChooser,
8384
linkSystem: tb.LinkSystem,
85+
budget: tb.Budget,
8486
awaitRequest: make(chan struct{}, 1),
8587
stateChan: make(chan state, 1),
8688
responses: make(chan nextResponse),
@@ -120,6 +122,7 @@ type traverser struct {
120122
chooser traversal.LinkTargetNodePrototypeChooser
121123
currentLink ipld.Link
122124
currentContext ipld.LinkContext
125+
budget *traversal.Budget
123126
isDone bool
124127
completionErr error
125128
awaitRequest chan struct{}
@@ -184,6 +187,12 @@ func (t *traverser) start() {
184187
t.writeDone(err)
185188
return
186189
}
190+
if t.budget != nil {
191+
t.budget.LinkBudget--
192+
if t.budget.LinkBudget <= 0 {
193+
t.writeDone(&traversal.ErrBudgetExceeded{BudgetKind: "link", Link: t.root})
194+
}
195+
}
187196
nd, err := t.linkSystem.Load(ipld.LinkContext{Ctx: t.ctx}, t.root, ns)
188197
if err != nil {
189198
t.writeDone(err)
@@ -201,6 +210,7 @@ func (t *traverser) start() {
201210
LinkSystem: t.linkSystem,
202211
LinkTargetNodePrototypeChooser: t.chooser,
203212
},
213+
Budget: t.budget,
204214
}.WalkAdv(nd, sel, t.visitor)
205215
t.writeDone(err)
206216
}()

ipldutil/traverser_test.go

+54-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package ipldutil
33
import (
44
"bytes"
55
"context"
6+
"math"
67
"testing"
78
"time"
89

@@ -56,7 +57,7 @@ func TestTraverser(t *testing.T) {
5657
testdata.MiddleMapBlock,
5758
testdata.LeafAlphaBlock,
5859
testdata.LeafAlphaBlock,
59-
})
60+
}, nil)
6061
})
6162

6263
t.Run("traverses correctly, blockchain", func(t *testing.T) {
@@ -86,13 +87,58 @@ func TestTraverser(t *testing.T) {
8687
blockChain.VerifyWholeChainWithTypes(ctx, inProgressChan)
8788
close(done)
8889
}()
89-
checkTraverseSequence(ctx, t, traverser, blockChain.AllBlocks())
90+
checkTraverseSequence(ctx, t, traverser, blockChain.AllBlocks(), nil)
9091
close(inProgressChan)
9192
testutil.AssertDoesReceive(ctx, t, done, "should have completed verification but did not")
9293
})
94+
95+
t.Run("errors correctly, with budget", func(t *testing.T) {
96+
store := make(map[ipld.Link][]byte)
97+
persistence := testutil.NewTestStore(store)
98+
blockChain := testutil.SetupBlockChain(ctx, t, persistence, 100, 10)
99+
traverser := TraversalBuilder{
100+
Root: blockChain.TipLink,
101+
Selector: blockChain.Selector(),
102+
Chooser: blockChain.Chooser,
103+
LinkSystem: persistence,
104+
Visitor: func(tp traversal.Progress, node ipld.Node, r traversal.VisitReason) error {
105+
return nil
106+
},
107+
Budget: &traversal.Budget{
108+
NodeBudget: math.MaxInt64,
109+
LinkBudget: 6,
110+
},
111+
}.Start(ctx)
112+
var path ipld.Path
113+
for i := 0; i < 6; i++ {
114+
path = path.AppendSegment(ipld.PathSegmentOfString("Parents"))
115+
path = path.AppendSegment(ipld.PathSegmentOfInt(0))
116+
}
117+
checkTraverseSequence(ctx, t, traverser, blockChain.Blocks(0, 6), &traversal.ErrBudgetExceeded{BudgetKind: "link", Path: path, Link: blockChain.LinkTipIndex(6)})
118+
})
119+
120+
t.Run("errors correctly, no budget", func(t *testing.T) {
121+
store := make(map[ipld.Link][]byte)
122+
persistence := testutil.NewTestStore(store)
123+
blockChain := testutil.SetupBlockChain(ctx, t, persistence, 100, 10)
124+
traverser := TraversalBuilder{
125+
Root: blockChain.TipLink,
126+
Selector: blockChain.Selector(),
127+
Chooser: blockChain.Chooser,
128+
LinkSystem: persistence,
129+
Visitor: func(tp traversal.Progress, node ipld.Node, r traversal.VisitReason) error {
130+
return nil
131+
},
132+
Budget: &traversal.Budget{
133+
NodeBudget: math.MaxInt64,
134+
LinkBudget: 0,
135+
},
136+
}.Start(ctx)
137+
checkTraverseSequence(ctx, t, traverser, []blocks.Block{}, &traversal.ErrBudgetExceeded{BudgetKind: "link", Link: blockChain.TipLink})
138+
})
93139
}
94140

95-
func checkTraverseSequence(ctx context.Context, t *testing.T, traverser Traverser, expectedBlks []blocks.Block) {
141+
func checkTraverseSequence(ctx context.Context, t *testing.T, traverser Traverser, expectedBlks []blocks.Block, finalErr error) {
96142
for _, blk := range expectedBlks {
97143
isComplete, err := traverser.IsComplete()
98144
require.False(t, isComplete)
@@ -104,5 +150,9 @@ func checkTraverseSequence(ctx context.Context, t *testing.T, traverser Traverse
104150
}
105151
isComplete, err := traverser.IsComplete()
106152
require.True(t, isComplete)
107-
require.NoError(t, err)
153+
if finalErr == nil {
154+
require.NoError(t, err)
155+
} else {
156+
require.EqualError(t, err, finalErr.Error())
157+
}
108158
}

requestmanager/client.go

+5
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type inProgressRequestStatus struct {
6666
inProgressChan chan graphsync.ResponseProgress
6767
inProgressErr chan error
6868
traverser ipldutil.Traverser
69+
traverserCancel context.CancelFunc
6970
}
7071

7172
// PeerHandler is an interface that can send requests to peers
@@ -96,6 +97,8 @@ type RequestManager struct {
9697
disconnectNotif *pubsub.PubSub
9798
linkSystem ipld.LinkSystem
9899
connManager network.ConnManager
100+
// maximum number of links to traverse per request. A value of zero = infinity, or no limit
101+
maxLinksPerRequest uint64
99102

100103
// dont touch out side of run loop
101104
nextRequestID graphsync.RequestID
@@ -129,6 +132,7 @@ func New(ctx context.Context,
129132
networkErrorListeners *listeners.NetworkErrorListeners,
130133
requestQueue taskqueue.TaskQueue,
131134
connManager network.ConnManager,
135+
maxLinksPerRequest uint64,
132136
) *RequestManager {
133137
ctx, cancel := context.WithCancel(ctx)
134138
return &RequestManager{
@@ -145,6 +149,7 @@ func New(ctx context.Context,
145149
networkErrorListeners: networkErrorListeners,
146150
requestQueue: requestQueue,
147151
connManager: connManager,
152+
maxLinksPerRequest: maxLinksPerRequest,
148153
}
149154
}
150155

requestmanager/requestmanager_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1019,7 +1019,7 @@ func newTestData(ctx context.Context, t *testing.T) *testData {
10191019
td.networkErrorListeners = listeners.NewNetworkErrorListeners()
10201020
td.taskqueue = taskqueue.NewTaskQueue(ctx)
10211021
lsys := cidlink.DefaultLinkSystem()
1022-
td.requestManager = New(ctx, td.fal, lsys, td.requestHooks, td.responseHooks, td.networkErrorListeners, td.taskqueue, td.tcm)
1022+
td.requestManager = New(ctx, td.fal, lsys, td.requestHooks, td.responseHooks, td.networkErrorListeners, td.taskqueue, td.tcm, 0)
10231023
td.executor = executor.NewExecutor(td.requestManager, td.blockHooks, td.fal.AsyncLoad)
10241024
td.requestManager.SetDelegate(td.fph)
10251025
td.requestManager.Startup()

requestmanager/server.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,24 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
102102
var initialRequest bool
103103
if ipr.traverser == nil {
104104
initialRequest = true
105+
var budget *traversal.Budget
106+
if rm.maxLinksPerRequest > 0 {
107+
budget = &traversal.Budget{
108+
NodeBudget: math.MaxInt64,
109+
LinkBudget: int64(rm.maxLinksPerRequest),
110+
}
111+
}
112+
// the traverser has its own context because we want to fail on block boundaries, in the executor,
113+
// and make sure all blocks included up to the termination message
114+
// are processed and passed in the response channel
115+
ctx, cancel := context.WithCancel(rm.ctx)
116+
ipr.traverserCancel = cancel
105117
ipr.traverser = ipldutil.TraversalBuilder{
106118
Root: cidlink.Link{Cid: ipr.request.Root()},
107119
Selector: ipr.request.Selector(),
108120
Visitor: func(tp traversal.Progress, node ipld.Node, tr traversal.VisitReason) error {
109121
select {
110-
case <-ipr.ctx.Done():
122+
case <-ctx.Done():
111123
case ipr.inProgressChan <- graphsync.ResponseProgress{
112124
Node: node,
113125
Path: tp.Path,
@@ -118,7 +130,8 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
118130
},
119131
Chooser: ipr.nodeStyleChooser,
120132
LinkSystem: rm.linkSystem,
121-
}.Start(ipr.ctx)
133+
Budget: budget,
134+
}.Start(ctx)
122135
}
123136

124137
ipr.state = running
@@ -157,6 +170,7 @@ func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID, ipr *i
157170
ipr.cancelFn()
158171
rm.asyncLoader.CleanupRequest(requestID)
159172
if ipr.traverser != nil {
173+
ipr.traverserCancel()
160174
ipr.traverser.Shutdown(rm.ctx)
161175
}
162176
// make sure context is not closed before closing channels (could cause send

responsemanager/client.go

+4
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ type ResponseManager struct {
155155
inProgressResponses map[responseKey]*inProgressResponseStatus
156156
maxInProcessRequests uint64
157157
connManager network.ConnManager
158+
// maximum number of links to traverse per request. A value of zero = infinity, or no limit
159+
maxLinksPerRequest uint64
158160
}
159161

160162
// New creates a new response manager for responding to requests
@@ -172,6 +174,7 @@ func New(ctx context.Context,
172174
networkErrorListeners NetworkErrorListeners,
173175
maxInProcessRequests uint64,
174176
connManager network.ConnManager,
177+
maxLinksPerRequest uint64,
175178
) *ResponseManager {
176179
ctx, cancelFn := context.WithCancel(ctx)
177180
messages := make(chan responseManagerMessage, 16)
@@ -194,6 +197,7 @@ func New(ctx context.Context,
194197
inProgressResponses: make(map[responseKey]*inProgressResponseStatus),
195198
maxInProcessRequests: maxInProcessRequests,
196199
connManager: connManager,
200+
maxLinksPerRequest: maxLinksPerRequest,
197201
}
198202
rm.qe = &queryExecutor{
199203
blockHooks: blockHooks,

0 commit comments

Comments
 (0)