Skip to content

Commit 5a31334

Browse files
committed
feat: MaxLinks option for channels
1 parent 69709e2 commit 5a31334

File tree

5 files changed

+73
-13
lines changed

5 files changed

+73
-13
lines changed

go.mod

+1-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/ipfs/go-cid v0.3.2
1414
github.com/ipfs/go-datastore v0.6.0
1515
github.com/ipfs/go-ds-badger v0.3.0
16-
github.com/ipfs/go-graphsync v0.14.0
16+
github.com/ipfs/go-graphsync v0.14.5-0.20230406112652-a21ee96a5267
1717
github.com/ipfs/go-ipfs-blockstore v1.2.0
1818
github.com/ipfs/go-ipfs-blocksutil v0.0.1
1919
github.com/ipfs/go-ipfs-chunker v0.0.5
@@ -45,7 +45,6 @@ require (
4545
github.com/benbjohnson/clock v1.3.0 // indirect
4646
github.com/beorn7/perks v1.0.1 // indirect
4747
github.com/btcsuite/btcd v0.22.1 // indirect
48-
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
4948
github.com/cespare/xxhash v1.1.0 // indirect
5049
github.com/cespare/xxhash/v2 v2.2.0 // indirect
5150
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect

go.sum

+3-6
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,6 @@ github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13P
8989
github.com/btcsuite/btcd v0.21.0-beta/go.mod h1:ZSWyehm27aAuS9bvkATT+Xte3hjHZ+MRgMY/8NJ7K94=
9090
github.com/btcsuite/btcd v0.22.1 h1:CnwP9LM/M9xuRrGSCGeMVs9iv09uMqwsVX7EeIpgV2c=
9191
github.com/btcsuite/btcd v0.22.1/go.mod h1:wqgTSL29+50LRkmOVknEdmt8ZojIzhuWvgu/iptuN7Y=
92-
github.com/btcsuite/btcd/btcec/v2 v2.1.3 h1:xM/n3yIhHAhHy04z4i43C8p4ehixJZMsnrVJkgl+MTE=
93-
github.com/btcsuite/btcd/btcec/v2 v2.1.3/go.mod h1:ctjw4H1kknNJmRN4iP1R7bTQ+v3GJkZBd6mui8ZsAZE=
94-
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
9592
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
9693
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
9794
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
@@ -402,8 +399,8 @@ github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupV
402399
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
403400
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
404401
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
405-
github.com/ipfs/go-graphsync v0.14.0 h1:f5KYkc8GpwwE1BrjBOWxIkRivXIw7fVqGZlnILpvbSc=
406-
github.com/ipfs/go-graphsync v0.14.0/go.mod h1:1LDVVnNHjit8ddJOtw3Jq9epP792xWFXXL3dJWIBIkM=
402+
github.com/ipfs/go-graphsync v0.14.5-0.20230406112652-a21ee96a5267 h1:w1WtXTbXbM5CuZ8gDzAcTwa1Nloz1WzOl7/jIN726a0=
403+
github.com/ipfs/go-graphsync v0.14.5-0.20230406112652-a21ee96a5267/go.mod h1:yT0AfjFgicOoWdAlUJ96tQ5AkuGI4r1taIQX/aHbBQo=
407404
github.com/ipfs/go-ipfs-blockstore v1.2.0 h1:n3WTeJ4LdICWs/0VSfjHrlqpPpl6MZ+ySd3j8qz0ykw=
408405
github.com/ipfs/go-ipfs-blockstore v1.2.0/go.mod h1:eh8eTFLiINYNSNawfZOC7HOxNTxpB1PFuA5E1m/7exE=
409406
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=
@@ -596,8 +593,8 @@ github.com/libp2p/go-libp2p-core v0.7.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB
596593
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
597594
github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
598595
github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
596+
github.com/libp2p/go-libp2p-core v0.8.5 h1:aEgbIcPGsKy6zYcC+5AJivYFedhYa4sW7mIpWpUaLKw=
599597
github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
600-
github.com/libp2p/go-libp2p-core v0.19.0 h1:KDw7hanmh0EuVdZqsHCAzmkdiYMk5uR5h0UGSCVTxSU=
601598
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
602599
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
603600
github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw=

transport/graphsync/graphsync.go

+41-5
Original file line numberDiff line numberDiff line change
@@ -350,12 +350,30 @@ func UseStore(store ipld.LinkSystem) datatransfer.TransportOption {
350350
}
351351
}
352352

353+
// MaxLinks sets the maximum number of links for this channelID
354+
func MaxLinks(maxLinks uint64) datatransfer.TransportOption {
355+
return func(channelID datatransfer.ChannelID, transport datatransfer.Transport) error {
356+
gsTransport, ok := transport.(*Transport)
357+
if !ok {
358+
return datatransfer.ErrUnsupported
359+
}
360+
gsTransport.MaxLinks(channelID, maxLinks)
361+
return nil
362+
}
363+
}
364+
353365
// UseStore tells the graphsync transport to use the given loader and storer for this channelID
354366
func (t *Transport) UseStore(channelID datatransfer.ChannelID, lsys ipld.LinkSystem) error {
355367
ch := t.trackDTChannel(channelID)
356368
return ch.useStore(lsys)
357369
}
358370

371+
// MaxLinks sets the maximum number of links for this channelID
372+
func (t *Transport) MaxLinks(channelID datatransfer.ChannelID, maxLinks uint64) {
373+
ch := t.trackDTChannel(channelID)
374+
ch.setMaxLinks(maxLinks)
375+
}
376+
359377
// ChannelGraphsyncRequests describes any graphsync request IDs associated with a given channel
360378
type ChannelGraphsyncRequests struct {
361379
// Current is the current request ID for the transfer
@@ -905,8 +923,9 @@ type dtChannel struct {
905923

906924
opened chan graphsync.RequestID
907925

908-
storeLk sync.RWMutex
926+
optionsLk sync.RWMutex
909927
storeRegistered bool
928+
maxLinksOption uint64
910929
}
911930

912931
// Info needed to monitor an ongoing graphsync request
@@ -1012,6 +1031,7 @@ func (c *dtChannel) gsReqOpened(requestID graphsync.RequestID, hookActions graph
10121031
if c.hasStore() {
10131032
hookActions.UsePersistenceOption("data-transfer-" + c.channelID.String())
10141033
}
1034+
hookActions.MaxLinks(c.maxLinks())
10151035
log.Infow("outgoing graphsync request", "peer", c.channelID.OtherParty(c.t.peerID), "graphsync request id", requestID, "data transfer channel id", c.channelID)
10161036
// Save a mapping from the graphsync key to the channel ID so that
10171037
// subsequent graphsync callbacks are associated with this channel
@@ -1043,6 +1063,8 @@ func (c *dtChannel) gsDataRequestRcvd(requestID graphsync.RequestID, hookActions
10431063
hookActions.UsePersistenceOption("data-transfer-" + c.channelID.String())
10441064
}
10451065

1066+
hookActions.MaxLinks(c.maxLinks())
1067+
10461068
// Save a mapping from the graphsync key to the channel ID so that
10471069
// subsequent graphsync callbacks are associated with this channel
10481070
c.requestID = &requestID
@@ -1139,17 +1161,31 @@ func (c *dtChannel) onRequesterCancelled() {
11391161
}
11401162

11411163
func (c *dtChannel) hasStore() bool {
1142-
c.storeLk.RLock()
1143-
defer c.storeLk.RUnlock()
1164+
c.optionsLk.RLock()
1165+
defer c.optionsLk.RUnlock()
11441166

11451167
return c.storeRegistered
11461168
}
11471169

1170+
func (c *dtChannel) maxLinks() uint64 {
1171+
c.optionsLk.Lock()
1172+
defer c.optionsLk.Unlock()
1173+
1174+
return c.maxLinksOption
1175+
}
1176+
1177+
func (c *dtChannel) setMaxLinks(maxLinks uint64) {
1178+
c.optionsLk.Lock()
1179+
defer c.optionsLk.Unlock()
1180+
1181+
c.maxLinksOption = maxLinks
1182+
}
1183+
11481184
// Use the given loader and storer to get / put blocks for the data-transfer.
11491185
// Note that each data-transfer channel uses a separate blockstore.
11501186
func (c *dtChannel) useStore(lsys ipld.LinkSystem) error {
1151-
c.storeLk.Lock()
1152-
defer c.storeLk.Unlock()
1187+
c.optionsLk.Lock()
1188+
defer c.optionsLk.Unlock()
11531189

11541190
// Register the channel's store with graphsync
11551191
err := c.t.gs.RegisterPersistenceOption("data-transfer-"+c.channelID.String(), lsys)

transport/graphsync/graphsync_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -1033,6 +1033,24 @@ func TestManager(t *testing.T) {
10331033
gsData.fgs.AssertDoesNotHavePersistenceOption(t, expectedChannel)
10341034
},
10351035
},
1036+
"MaxLinks can set maxLinks used for outgoing requests": {
1037+
action: func(gsData *harness) {
1038+
gsData.transport.MaxLinks(datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}, 101)
1039+
gsData.outgoingRequestHook()
1040+
},
1041+
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
1042+
require.Equal(t, uint64(101), gsData.outgoingRequestHookActions.MaxLinksOption)
1043+
},
1044+
},
1045+
"MaxLinks can set maxLinks used for incoming requests": {
1046+
action: func(gsData *harness) {
1047+
gsData.transport.MaxLinks(datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.self, Initiator: gsData.other}, 101)
1048+
gsData.incomingRequestHook()
1049+
},
1050+
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
1051+
require.Equal(t, uint64(101), gsData.incomingRequestHookActions.MaxLinksOption)
1052+
},
1053+
},
10361054
}
10371055

10381056
ctx := context.Background()

transport/graphsync/testharness/fakegraphsync.go

+10
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,7 @@ func NewFakeResponse(id graphsync.RequestID, extensions map[graphsync.ExtensionN
519519

520520
type FakeOutgoingRequestHookActions struct {
521521
PersistenceOption string
522+
MaxLinksOption uint64
522523
}
523524

524525
func (fa *FakeOutgoingRequestHookActions) UsePersistenceOption(name string) {
@@ -527,6 +528,10 @@ func (fa *FakeOutgoingRequestHookActions) UsePersistenceOption(name string) {
527528
func (fa *FakeOutgoingRequestHookActions) UseLinkTargetNodePrototypeChooser(_ traversal.LinkTargetNodePrototypeChooser) {
528529
}
529530

531+
func (fa *FakeOutgoingRequestHookActions) MaxLinks(maxLinks uint64) {
532+
fa.MaxLinksOption = maxLinks
533+
}
534+
530535
var _ graphsync.OutgoingRequestHookActions = &FakeOutgoingRequestHookActions{}
531536

532537
type FakeIncomingBlockHookActions struct {
@@ -576,6 +581,7 @@ type FakeIncomingRequestHookActions struct {
576581
SentExtensions []graphsync.ExtensionData
577582
Paused bool
578583
CtxAugFuncs []func(context.Context) context.Context
584+
MaxLinksOption uint64
579585
}
580586

581587
func (fa *FakeIncomingRequestHookActions) SendExtensionData(ext graphsync.ExtensionData) {
@@ -625,6 +631,10 @@ func (fa *FakeIncomingRequestHookActions) DTMessage(t *testing.T) datatransfer.M
625631
return matchDtMessage(t, fa.SentExtensions, extension.ExtensionIncomingRequest1_1)
626632
}
627633

634+
func (fa *FakeIncomingRequestHookActions) MaxLinks(maxLinks uint64) {
635+
fa.MaxLinksOption = maxLinks
636+
}
637+
628638
var _ graphsync.IncomingRequestHookActions = &FakeIncomingRequestHookActions{}
629639

630640
type FakeRequestUpdatedActions struct {

0 commit comments

Comments
 (0)