Skip to content

Commit 622bab2

Browse files
committed
feat: use traversal link budget instead of limitstore
Ref: ipfs/go-graphsync#420 Ref: filecoin-project/go-data-transfer#374
1 parent c603562 commit 622bab2

File tree

13 files changed

+51
-137
lines changed

13 files changed

+51
-137
lines changed

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ require (
66
contrib.go.opencensus.io/exporter/prometheus v0.4.2
77
github.com/benbjohnson/clock v1.3.0
88
github.com/dustin/go-humanize v1.0.1
9-
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5
9+
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5.0.20230406122351-99ba9d323b96
1010
github.com/filecoin-project/go-state-types v0.10.0
1111
github.com/google/uuid v1.3.0
1212
github.com/hannahhoward/go-pubsub v1.0.0
1313
github.com/ipfs/go-blockservice v0.5.0
1414
github.com/ipfs/go-cid v0.4.0
1515
github.com/ipfs/go-datastore v0.6.0
16-
github.com/ipfs/go-graphsync v0.14.4
16+
github.com/ipfs/go-graphsync v0.14.5-0.20230406121333-d40f493cf2d0
1717
github.com/ipfs/go-ipfs-blockstore v1.3.0
1818
github.com/ipfs/go-ipfs-blocksutil v0.0.1
1919
github.com/ipfs/go-ipfs-delay v0.0.1

go.sum

+4-4
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ github.com/filecoin-project/go-commp-utils v0.1.3/go.mod h1:3ENlD1pZySaUout0p9AN
125125
github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20220905160352-62059082a837/go.mod h1:e2YBjSblNVoBckkbv3PPqsq71q98oFkFqL7s1etViGo=
126126
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
127127
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
128-
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5 h1:lqjkVplfTRt5GV7Pxjo+H+Jhnh7tgUIhKFFLxGvjv1Y=
129-
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5/go.mod h1:YY4onZJ9LoSP19kdJWD7PZ0ZDJSQnbcEXYfjezMLTog=
128+
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5.0.20230406122351-99ba9d323b96 h1:D6zEwN3s1pWLGAGjPHDu8b2Bjl31MXqTFFdJ3avBihw=
129+
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5.0.20230406122351-99ba9d323b96/go.mod h1:j2J58y4HLzDc6sXLJzuM+ONZm59SbJDEw4u1XA5JCfM=
130130
github.com/filecoin-project/go-ds-versioning v0.1.2 h1:to4pTadv3IeV1wvgbCbN6Vqd+fu+7tveXgv/rCEZy6w=
131131
github.com/filecoin-project/go-ds-versioning v0.1.2/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4=
132132
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
@@ -346,8 +346,8 @@ github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0M
346346
github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8=
347347
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
348348
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
349-
github.com/ipfs/go-graphsync v0.14.4 h1:ysazATpwsIjYtYEZH5CdD/HRaonCJd4pASUtnzESewk=
350-
github.com/ipfs/go-graphsync v0.14.4/go.mod h1:yT0AfjFgicOoWdAlUJ96tQ5AkuGI4r1taIQX/aHbBQo=
349+
github.com/ipfs/go-graphsync v0.14.5-0.20230406121333-d40f493cf2d0 h1:2VHJORBuN9wL2TmySBgHvlpZLM2Sq7BEW66Imho0lbA=
350+
github.com/ipfs/go-graphsync v0.14.5-0.20230406121333-d40f493cf2d0/go.mod h1:yT0AfjFgicOoWdAlUJ96tQ5AkuGI4r1taIQX/aHbBQo=
351351
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
352352
github.com/ipfs/go-ipfs-blockstore v1.3.0 h1:m2EXaWgwTzAfsmt5UdJ7Is6l4gJcaM/A12XwJyvYvMM=
353353
github.com/ipfs/go-ipfs-blockstore v1.3.0/go.mod h1:KgtZyc9fq+P2xJUiCAzbRdhhqJHvsw8u2Dlqy2MyRTE=

pkg/internal/itest/client_retrieval_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func runRetrieval(t *testing.T, ctx context.Context, mrn *mocknet.MockRetrievalN
128128
mrn.Remotes[0].Host.ID(),
129129
proposal,
130130
selectorparse.CommonSelector_ExploreAllRecursively,
131+
0,
131132
subscriberLocal,
132133
shutdown,
133134
)

pkg/internal/testutil/mockclient.go

+1
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ func (mc *MockClient) RetrieveFromPeer(
169169
peerID peer.ID,
170170
proposal *retrievaltypes.DealProposal,
171171
selector ipld.Node,
172+
maxBlocks uint64,
172173
eventsCallback datatransfer.Subscriber,
173174
gracefulShutdownRequested <-chan struct{},
174175
) (*types.RetrievalStats, error) {

pkg/net/client/client.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ func (rc *RetrievalClient) RetrieveFromPeer(
182182
peerID peer.ID,
183183
proposal *retrievaltypes.DealProposal,
184184
sel ipld.Node,
185+
maxBlocks uint64,
185186
eventsCallback datatransfer.Subscriber,
186187
gracefulShutdownRequested <-chan struct{},
187188
) (*types.RetrievalStats, error) {
@@ -309,7 +310,10 @@ func (rc *RetrievalClient) RetrieveFromPeer(
309310
proposal.PayloadCID,
310311
sel,
311312
datatransfer.WithSubscriber(eventsCb),
312-
datatransfer.WithTransportOptions(dttransport.UseStore(linkSystem)),
313+
datatransfer.WithTransportOptions(
314+
dttransport.UseStore(linkSystem),
315+
dttransport.MaxLinks(maxBlocks),
316+
),
313317
)
314318
if err != nil {
315319
// We could fail before a successful proposal

pkg/net/client/client_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func TestClient(t *testing.T) {
5959
p,
6060
proposal,
6161
selector,
62+
0,
6263
eventsCb,
6364
gracefulShutdownRequested,
6465
)
@@ -106,6 +107,7 @@ func TestClient_BadSelector(t *testing.T) {
106107
p,
107108
proposal,
108109
selector,
110+
0,
109111
eventsCb,
110112
gracefulShutdownRequested,
111113
)

pkg/retriever/bitswapretriever.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"fmt"
77
"io"
8+
"math"
89
"os"
910
"sync/atomic"
1011
"time"
@@ -229,7 +230,14 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
229230
)
230231

231232
// run the retrieval
232-
err = easyTraverse(ctx, cidlink.Link{Cid: br.request.Cid}, selector, storage.TraversalLinkSystem, storage.Preloader)
233+
err = easyTraverse(
234+
ctx,
235+
cidlink.Link{Cid: br.request.Cid},
236+
selector,
237+
storage.TraversalLinkSystem,
238+
storage.Preloader,
239+
br.request.MaxBlocks,
240+
)
233241
storage.Stop()
234242
cancel()
235243

@@ -301,6 +309,7 @@ func easyTraverse(
301309
traverseSelector datamodel.Node,
302310
lsys *linking.LinkSystem,
303311
preloader preload.Loader,
312+
maxBlocks uint64,
304313
) error {
305314

306315
protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser)
@@ -323,6 +332,12 @@ func easyTraverse(
323332
Preloader: preloader,
324333
},
325334
}
335+
if maxBlocks > 0 {
336+
progress.Budget = &traversal.Budget{
337+
LinkBudget: int64(maxBlocks) - 1, // first block is already loaded
338+
NodeBudget: math.MaxInt64,
339+
}
340+
}
326341
progress.LastBlock.Link = root
327342
compiledSelector, err := selector.ParseSelector(traverseSelector)
328343
if err != nil {

pkg/retriever/graphsyncretriever.go

+3
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ func runRetrievalCandidate(
285285
timeout,
286286
candidate,
287287
req.GetSelector(),
288+
req.MaxBlocks,
288289
eventsCallback,
289290
)
290291

@@ -371,6 +372,7 @@ func retrievalPhase(
371372
timeout time.Duration,
372373
candidate types.RetrievalCandidate,
373374
selector ipld.Node,
375+
maxBlocks uint64,
374376
eventsCallback datatransfer.Subscriber,
375377
) (*types.RetrievalStats, error) {
376378

@@ -445,6 +447,7 @@ func retrievalPhase(
445447
candidate.MinerPeer.ID,
446448
proposal,
447449
selector,
450+
uint64(maxBlocks),
448451
eventsSubscriber,
449452
gracefulShutdownChan,
450453
)

pkg/retriever/retrievalclient.go

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type RetrievalClient interface {
1818
peerID peer.ID,
1919
proposal *retrievaltypes.DealProposal,
2020
selector ipld.Node,
21+
maxLinks uint64,
2122
eventsCallback datatransfer.Subscriber,
2223
gracefulShutdownRequested <-chan struct{},
2324
) (*types.RetrievalStats, error)

pkg/server/http/ipfs.go

+15-16
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
lassie "github.com/filecoin-project/lassie/pkg/lassie"
1313
"github.com/filecoin-project/lassie/pkg/retriever"
1414
"github.com/filecoin-project/lassie/pkg/storage"
15-
"github.com/filecoin-project/lassie/pkg/storage/limitstore"
1615
"github.com/filecoin-project/lassie/pkg/types"
1716
"github.com/ipfs/go-cid"
1817
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
@@ -173,21 +172,6 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response
173172
}()
174173
var store types.ReadableWritableStorage = carStore
175174

176-
// extract block limit from query param as needed
177-
var blockLimit uint64
178-
if req.URL.Query().Has("blockLimit") {
179-
if parsedBlockLimit, err := strconv.ParseUint(req.URL.Query().Get("blockLimit"), 10, 64); err == nil {
180-
blockLimit = parsedBlockLimit
181-
}
182-
}
183-
if cfg.MaxBlocksPerRequest > 0 || blockLimit > 0 {
184-
// use the lowest non-zero value for block limit
185-
if blockLimit == 0 || (cfg.MaxBlocksPerRequest > 0 && blockLimit > cfg.MaxBlocksPerRequest) {
186-
blockLimit = cfg.MaxBlocksPerRequest
187-
}
188-
store = limitstore.NewLimitStore(carStore, blockLimit)
189-
}
190-
191175
carWriter.OnPut(func(int) {
192176
// called once we start writing blocks into the CAR (on the first Put())
193177
res.Header().Set("Content-Disposition", "attachment; filename="+filename)
@@ -221,6 +205,21 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response
221205
request.PreloadLinkSystem.SetWriteStorage(preloadStore)
222206
request.PreloadLinkSystem.TrustedStorage = true
223207

208+
// extract block limit from query param as needed
209+
var blockLimit uint64
210+
if req.URL.Query().Has("blockLimit") {
211+
if parsedBlockLimit, err := strconv.ParseUint(req.URL.Query().Get("blockLimit"), 10, 64); err == nil {
212+
blockLimit = parsedBlockLimit
213+
}
214+
}
215+
if cfg.MaxBlocksPerRequest > 0 || blockLimit > 0 {
216+
// use the lowest non-zero value for block limit
217+
if blockLimit == 0 || (cfg.MaxBlocksPerRequest > 0 && blockLimit > cfg.MaxBlocksPerRequest) {
218+
blockLimit = cfg.MaxBlocksPerRequest
219+
}
220+
request.MaxBlocks = blockLimit
221+
}
222+
224223
log.Debugw("fetching CID", "retrievalId", retrievalId, "CID", rootCid.String(), "path", unixfsPath, "fullFetch", fullFetch)
225224
stats, err := lassie.Fetch(req.Context(), request)
226225
if err != nil {

pkg/storage/limitstore/limitstore.go

-54
This file was deleted.

pkg/storage/limitstore/limitstore_test.go

-59
This file was deleted.

pkg/types/request.go

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type RetrievalRequest struct {
5151
Selector ipld.Node
5252
Protocols []multicodec.Code
5353
PreloadLinkSystem ipld.LinkSystem
54+
MaxBlocks uint64
5455
}
5556

5657
// NewRequestForPath creates a new RetrievalRequest from the provided parameters

0 commit comments

Comments
 (0)