Skip to content

Commit 749e517

Browse files
hannahhowardrvagg
andauthored
Limit simultaneous incoming requests on a per peer basis (#245)
* feat(graphsync): limit simultaneous requests per peer * Update impl/graphsync.go Co-authored-by: Rod Vagg <[email protected]> Co-authored-by: Rod Vagg <[email protected]>
1 parent c5420cf commit 749e517

File tree

3 files changed

+36
-15
lines changed

3 files changed

+36
-15
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ require (
2424
github.com/ipfs/go-ipld-format v0.2.0
2525
github.com/ipfs/go-log/v2 v2.1.1
2626
github.com/ipfs/go-merkledag v0.3.2
27-
github.com/ipfs/go-peertaskqueue v0.2.0
27+
github.com/ipfs/go-peertaskqueue v0.6.0
2828
github.com/ipfs/go-unixfs v0.2.4
2929
github.com/ipld/go-codec-dagpb v1.3.0
3030
github.com/ipld/go-ipld-prime v0.12.3

go.sum

+4-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cB
1212
github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s=
1313
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
1414
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
15+
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
16+
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
1517
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
1618
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
1719
github.com/btcsuite/btcd v0.0.0-20190605094302-a0d1e3e36d50/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
@@ -227,8 +229,8 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG
227229
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
228230
github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
229231
github.com/ipfs/go-peertaskqueue v0.1.1/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
230-
github.com/ipfs/go-peertaskqueue v0.2.0 h1:2cSr7exUGKYyDeUyQ7P/nHPs9P7Ht/B+ROrpN1EJOjc=
231-
github.com/ipfs/go-peertaskqueue v0.2.0/go.mod h1:5/eNrBEbtSKWCG+kQK8K8fGNixoYUnr+P7jivavs9lY=
232+
github.com/ipfs/go-peertaskqueue v0.6.0 h1:BT1/PuNViVomiz1PnnP5+WmKsTNHrxIDvkZrkj4JhOg=
233+
github.com/ipfs/go-peertaskqueue v0.6.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
232234
github.com/ipfs/go-unixfs v0.2.4 h1:6NwppOXefWIyysZ4LR/qUBPvXd5//8J3jiMdvpbw6Lo=
233235
github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw=
234236
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=

impl/graphsync.go

+31-12
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,16 @@ type GraphSync struct {
6767
}
6868

6969
type graphsyncConfigOptions struct {
70-
totalMaxMemoryResponder uint64
71-
maxMemoryPerPeerResponder uint64
72-
totalMaxMemoryRequestor uint64
73-
maxMemoryPerPeerRequestor uint64
74-
maxInProgressIncomingRequests uint64
75-
maxInProgressOutgoingRequests uint64
76-
registerDefaultValidator bool
77-
maxLinksPerOutgoingRequest uint64
78-
maxLinksPerIncomingRequest uint64
70+
totalMaxMemoryResponder uint64
71+
maxMemoryPerPeerResponder uint64
72+
totalMaxMemoryRequestor uint64
73+
maxMemoryPerPeerRequestor uint64
74+
maxInProgressIncomingRequests uint64
75+
maxInProgressIncomingRequestsPerPeer uint64
76+
maxInProgressOutgoingRequests uint64
77+
registerDefaultValidator bool
78+
maxLinksPerOutgoingRequest uint64
79+
maxLinksPerIncomingRequest uint64
7980
}
8081

8182
// Option defines the functional option type that can be used to configure
@@ -123,15 +124,29 @@ func MaxMemoryPerPeerRequestor(maxMemoryPerPeer uint64) Option {
123124
}
124125

125126
// MaxInProgressIncomingRequests changes the maximum number of
126-
// graphsync requests that are processed in parallel (default 6)
127+
// incoming graphsync requests that are processed in parallel (default 6)
127128
func MaxInProgressIncomingRequests(maxInProgressIncomingRequests uint64) Option {
128129
return func(gs *graphsyncConfigOptions) {
129130
gs.maxInProgressIncomingRequests = maxInProgressIncomingRequests
130131
}
131132
}
132133

134+
// MaxInProgressIncomingRequestsPerPeer changes the maximum number of
135+
// incoming graphsync requests that are processed in parallel on a per-peer basis.
136+
// The value is not set by default.
137+
// Useful in an environment for very high bandwidth graphsync responders serving
138+
// many peers
139+
// Note: if for some reason this is set higher than MaxInProgressIncomingRequests
140+
// it will simply have no effect.
141+
// Note: setting a value of zero will have no effect
142+
func MaxInProgressIncomingRequestsPerPeer(maxInProgressIncomingRequestsPerPeer uint64) Option {
143+
return func(gs *graphsyncConfigOptions) {
144+
gs.maxInProgressIncomingRequestsPerPeer = maxInProgressIncomingRequestsPerPeer
145+
}
146+
}
147+
133148
// MaxInProgressOutgoingRequests changes the maximum number of
134-
// graphsync requests that are processed in parallel (default 6)
149+
// outgoing graphsync requests that are processed in parallel (default 6)
135150
func MaxInProgressOutgoingRequests(maxInProgressOutgoingRequests uint64) Option {
136151
return func(gs *graphsyncConfigOptions) {
137152
gs.maxInProgressOutgoingRequests = maxInProgressOutgoingRequests
@@ -202,7 +217,11 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
202217
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, requestQueue, network.ConnectionManager(), gsConfig.maxLinksPerOutgoingRequest)
203218
requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks, asyncLoader.AsyncLoad)
204219
responseAssembler := responseassembler.New(ctx, peerManager)
205-
peerTaskQueue := peertaskqueue.New()
220+
var ptqopts []peertaskqueue.Option
221+
if gsConfig.maxInProgressIncomingRequestsPerPeer > 0 {
222+
ptqopts = append(ptqopts, peertaskqueue.MaxOutstandingWorkPerPeer(int(gsConfig.maxInProgressIncomingRequestsPerPeer)))
223+
}
224+
peerTaskQueue := peertaskqueue.New(ptqopts...)
206225
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressIncomingRequests, network.ConnectionManager(), gsConfig.maxLinksPerIncomingRequest)
207226
graphSync := &GraphSync{
208227
network: network,

0 commit comments

Comments
 (0)