diff --git a/go.mod b/go.mod index 87783c9..a6ad9fb 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-badger v0.3.0 - github.com/ipfs/go-graphsync v0.14.5 + github.com/ipfs/go-graphsync v0.14.6 github.com/ipfs/go-ipfs-blockstore v1.2.0 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index 2fbc95f..c7839ac 100644 --- a/go.sum +++ b/go.sum @@ -399,8 +399,8 @@ github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupV github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= -github.com/ipfs/go-graphsync v0.14.5 h1:SKQog4ZABe+yy7OtTsrMoSQfFmEPPi0qP5sl6bFN8xM= -github.com/ipfs/go-graphsync v0.14.5/go.mod h1:yT0AfjFgicOoWdAlUJ96tQ5AkuGI4r1taIQX/aHbBQo= +github.com/ipfs/go-graphsync v0.14.6 h1:NPxvuUy4Z08Mg8dwpBzwgbv/PGLIufSJ1sle6iAX8yo= +github.com/ipfs/go-graphsync v0.14.6/go.mod h1:yT0AfjFgicOoWdAlUJ96tQ5AkuGI4r1taIQX/aHbBQo= github.com/ipfs/go-ipfs-blockstore v1.2.0 h1:n3WTeJ4LdICWs/0VSfjHrlqpPpl6MZ+ySd3j8qz0ykw= github.com/ipfs/go-ipfs-blockstore v1.2.0/go.mod h1:eh8eTFLiINYNSNawfZOC7HOxNTxpB1PFuA5E1m/7exE= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 5ca59e9..d4a4db7 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -350,12 +350,30 @@ func UseStore(store ipld.LinkSystem) datatransfer.TransportOption { } } +// MaxLinks sets the maximum number of links for this channelID +func MaxLinks(maxLinks uint64) datatransfer.TransportOption { + return func(channelID datatransfer.ChannelID, transport datatransfer.Transport) error { + gsTransport, ok := transport.(*Transport) + if !ok { + return datatransfer.ErrUnsupported + } + gsTransport.MaxLinks(channelID, maxLinks) + return nil + } +} + // UseStore tells the graphsync transport to use the given loader and storer for this channelID func (t *Transport) UseStore(channelID datatransfer.ChannelID, lsys ipld.LinkSystem) error { ch := t.trackDTChannel(channelID) return ch.useStore(lsys) } +// MaxLinks sets the maximum number of links for this channelID +func (t *Transport) MaxLinks(channelID datatransfer.ChannelID, maxLinks uint64) { + ch := t.trackDTChannel(channelID) + ch.setMaxLinks(maxLinks) +} + // ChannelGraphsyncRequests describes any graphsync request IDs associated with a given channel type ChannelGraphsyncRequests struct { // Current is the current request ID for the transfer @@ -905,8 +923,9 @@ type dtChannel struct { opened chan graphsync.RequestID - storeLk sync.RWMutex + optionsLk sync.RWMutex storeRegistered bool + maxLinksOption uint64 } // Info needed to monitor an ongoing graphsync request @@ -1012,6 +1031,7 @@ func (c *dtChannel) gsReqOpened(requestID graphsync.RequestID, hookActions graph if c.hasStore() { hookActions.UsePersistenceOption("data-transfer-" + c.channelID.String()) } + hookActions.MaxLinks(c.maxLinks()) log.Infow("outgoing graphsync request", "peer", c.channelID.OtherParty(c.t.peerID), "graphsync request id", requestID, "data transfer channel id", c.channelID) // Save a mapping from the graphsync key to the channel ID so that // subsequent graphsync callbacks are associated with this channel @@ -1043,6 +1063,8 @@ func (c *dtChannel) gsDataRequestRcvd(requestID graphsync.RequestID, hookActions hookActions.UsePersistenceOption("data-transfer-" + c.channelID.String()) } + hookActions.MaxLinks(c.maxLinks()) + // Save a mapping from the graphsync key to the channel ID so that // subsequent graphsync callbacks are associated with this channel c.requestID = &requestID @@ -1139,17 +1161,31 @@ func (c *dtChannel) onRequesterCancelled() { } func (c *dtChannel) hasStore() bool { - c.storeLk.RLock() - defer c.storeLk.RUnlock() + c.optionsLk.RLock() + defer c.optionsLk.RUnlock() return c.storeRegistered } +func (c *dtChannel) maxLinks() uint64 { + c.optionsLk.Lock() + defer c.optionsLk.Unlock() + + return c.maxLinksOption +} + +func (c *dtChannel) setMaxLinks(maxLinks uint64) { + c.optionsLk.Lock() + defer c.optionsLk.Unlock() + + c.maxLinksOption = maxLinks +} + // Use the given loader and storer to get / put blocks for the data-transfer. // Note that each data-transfer channel uses a separate blockstore. func (c *dtChannel) useStore(lsys ipld.LinkSystem) error { - c.storeLk.Lock() - defer c.storeLk.Unlock() + c.optionsLk.Lock() + defer c.optionsLk.Unlock() // Register the channel's store with graphsync err := c.t.gs.RegisterPersistenceOption("data-transfer-"+c.channelID.String(), lsys) diff --git a/transport/graphsync/graphsync_test.go b/transport/graphsync/graphsync_test.go index 2c0cb8b..6a728c9 100644 --- a/transport/graphsync/graphsync_test.go +++ b/transport/graphsync/graphsync_test.go @@ -1033,6 +1033,24 @@ func TestManager(t *testing.T) { gsData.fgs.AssertDoesNotHavePersistenceOption(t, expectedChannel) }, }, + "MaxLinks can set maxLinks used for outgoing requests": { + action: func(gsData *harness) { + gsData.transport.MaxLinks(datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}, 101) + gsData.outgoingRequestHook() + }, + check: func(t *testing.T, events *fakeEvents, gsData *harness) { + require.Equal(t, uint64(101), gsData.outgoingRequestHookActions.MaxLinksOption) + }, + }, + "MaxLinks can set maxLinks used for incoming requests": { + action: func(gsData *harness) { + gsData.transport.MaxLinks(datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.self, Initiator: gsData.other}, 101) + gsData.incomingRequestHook() + }, + check: func(t *testing.T, events *fakeEvents, gsData *harness) { + require.Equal(t, uint64(101), gsData.incomingRequestHookActions.MaxLinksOption) + }, + }, } ctx := context.Background() diff --git a/transport/graphsync/testharness/fakegraphsync.go b/transport/graphsync/testharness/fakegraphsync.go index 61697fb..763ce08 100644 --- a/transport/graphsync/testharness/fakegraphsync.go +++ b/transport/graphsync/testharness/fakegraphsync.go @@ -519,6 +519,7 @@ func NewFakeResponse(id graphsync.RequestID, extensions map[graphsync.ExtensionN type FakeOutgoingRequestHookActions struct { PersistenceOption string + MaxLinksOption uint64 } func (fa *FakeOutgoingRequestHookActions) UsePersistenceOption(name string) { @@ -527,6 +528,10 @@ func (fa *FakeOutgoingRequestHookActions) UsePersistenceOption(name string) { func (fa *FakeOutgoingRequestHookActions) UseLinkTargetNodePrototypeChooser(_ traversal.LinkTargetNodePrototypeChooser) { } +func (fa *FakeOutgoingRequestHookActions) MaxLinks(maxLinks uint64) { + fa.MaxLinksOption = maxLinks +} + var _ graphsync.OutgoingRequestHookActions = &FakeOutgoingRequestHookActions{} type FakeIncomingBlockHookActions struct { @@ -576,6 +581,7 @@ type FakeIncomingRequestHookActions struct { SentExtensions []graphsync.ExtensionData Paused bool CtxAugFuncs []func(context.Context) context.Context + MaxLinksOption uint64 } func (fa *FakeIncomingRequestHookActions) SendExtensionData(ext graphsync.ExtensionData) { @@ -625,6 +631,10 @@ func (fa *FakeIncomingRequestHookActions) DTMessage(t *testing.T) datatransfer.M return matchDtMessage(t, fa.SentExtensions, extension.ExtensionIncomingRequest1_1) } +func (fa *FakeIncomingRequestHookActions) MaxLinks(maxLinks uint64) { + fa.MaxLinksOption = maxLinks +} + var _ graphsync.IncomingRequestHookActions = &FakeIncomingRequestHookActions{} type FakeRequestUpdatedActions struct {