@@ -24,6 +24,9 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"sync"
+
+	"golang.org/x/sync/errgroup"
 
 	format "github.com/ipfs/go-unixfs"
 	"github.com/ipfs/go-unixfs/internal"
@@ -372,59 +375,190 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
 	go func() {
 		defer close(linkResults)
 		defer cancel()
-		getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults)
-		cset := cid.NewSet()
-		rootNode, err := ds.Node()
-		if err != nil {
-			emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
-			return
-		}
-		err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent())
+
+		err := parallelShardWalk(ctx, ds, ds.dserv, func(formattedLink *ipld.Link) error {
+			emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil})
+			return nil
+		})
 		if err != nil {
 			emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
 		}
 	}()
 	return linkResults
 }
 
-// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync
-// to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called
-// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation
-func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks {
-
-	return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) {
-		node, err := dagService.Get(ctx, currentCid)
-		if err != nil {
-			return nil, err
-		}
-		directoryShard, err := NewHamtFromDag(dagService, node)
-		if err != nil {
-			return nil, err
-		}
+type listCidsAndShards struct {
+	cids   []cid.Cid
+	shards []*Shard
+}
 
-		childShards := make([]*ipld.Link, 0, directoryShard.childer.length())
-		links := directoryShard.childer.links
-		for idx := range directoryShard.childer.children {
-			lnk := links[idx]
-			lnkLinkType, err := directoryShard.childLinkType(lnk)
+func (ds *Shard) walkChildren(processLinkValues func(formattedLink *ipld.Link) error) (*listCidsAndShards, error) {
+	res := &listCidsAndShards{}
 
+	for idx, lnk := range ds.childer.links {
+		if nextShard := ds.childer.children[idx]; nextShard == nil {
+			lnkLinkType, err := ds.childLinkType(lnk)
 			if err != nil {
 				return nil, err
 			}
-			if lnkLinkType == shardLink {
-				childShards = append(childShards, lnk)
-			} else {
-				sv, err := directoryShard.makeShardValue(lnk)
+
+			switch lnkLinkType {
+			case shardValueLink:
+				sv, err := ds.makeShardValue(lnk)
 				if err != nil {
 					return nil, err
 				}
 				formattedLink := sv.val
 				formattedLink.Name = sv.key
-				emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil})
+
+				if err := processLinkValues(formattedLink); err != nil {
+					return nil, err
+				}
+			case shardLink:
+				res.cids = append(res.cids, lnk.Cid)
+			default:
+				return nil, fmt.Errorf("unsupported shard link type")
+			}
+
+		} else {
+			if nextShard.val != nil {
+				formattedLink := &ipld.Link{
+					Name: nextShard.key,
+					Size: nextShard.val.Size,
+					Cid:  nextShard.val.Cid,
+				}
+				if err := processLinkValues(formattedLink); err != nil {
+					return nil, err
+				}
+			} else {
+				res.shards = append(res.shards, nextShard)
+			}
+		}
+	}
+	return res, nil
+}
+
+// parallelShardWalk is quite similar to the DAG walking algorithm from https://github.com/ipfs/go-merkledag/blob/594e515f162e764183243b72c2ba84f743424c8c/merkledag.go#L464
+// However, there are a few notable differences:
+// 1. Some children are actualized Shard structs and some are in the blockstore, this will leverage walking over the in memory Shards as well as the stored blocks
+// 2. Instead of just passing each child into the worker pool by itself we group them so that we can leverage optimizations from GetMany.
+//    This optimization also makes the walk a little more biased towards depth (as opposed to BFS) in the earlier part of the DAG.
+//    This is particularly helpful for operations like estimating the directory size which should complete quickly when possible.
+// 3. None of the extra options from that package are needed
+func parallelShardWalk(ctx context.Context, root *Shard, dserv ipld.DAGService, processShardValues func(formattedLink *ipld.Link) error) error {
+	const concurrency = 32
+
+	var visitlk sync.Mutex
+	visitSet := cid.NewSet()
+	visit := visitSet.Visit
+
+	// Setup synchronization
+	grp, errGrpCtx := errgroup.WithContext(ctx)
+
+	// Input and output queues for workers.
+	feed := make(chan *listCidsAndShards)
+	out := make(chan *listCidsAndShards)
+	done := make(chan struct{})
+
+	for i := 0; i < concurrency; i++ {
+		grp.Go(func() error {
+			for feedChildren := range feed {
+				for _, nextShard := range feedChildren.shards {
+					nextChildren, err := nextShard.walkChildren(processShardValues)
+					if err != nil {
+						return err
+					}
+
+					select {
+					case out <- nextChildren:
+					case <-errGrpCtx.Done():
+						return nil
+					}
+				}
+
+				var linksToVisit []cid.Cid
+				for _, nextCid := range feedChildren.cids {
+					var shouldVisit bool
+
+					visitlk.Lock()
+					shouldVisit = visit(nextCid)
+					visitlk.Unlock()
+
+					if shouldVisit {
+						linksToVisit = append(linksToVisit, nextCid)
+					}
+				}
+
+				chNodes := dserv.GetMany(errGrpCtx, linksToVisit)
+				for optNode := range chNodes {
+					if optNode.Err != nil {
+						return optNode.Err
+					}
+
+					nextShard, err := NewHamtFromDag(dserv, optNode.Node)
+					if err != nil {
+						return err
+					}
+
+					nextChildren, err := nextShard.walkChildren(processShardValues)
+					if err != nil {
+						return err
+					}
+
+					select {
+					case out <- nextChildren:
+					case <-errGrpCtx.Done():
+						return nil
+					}
+				}
+
+				select {
+				case done <- struct{}{}:
+				case <-errGrpCtx.Done():
+				}
+			}
+			return nil
+		})
+	}
+
+	send := feed
+	var todoQueue []*listCidsAndShards
+	var inProgress int
+
+	next := &listCidsAndShards{
+		shards: []*Shard{root},
+	}
+
+dispatcherLoop:
+	for {
+		select {
+		case send <- next:
+			inProgress++
+			if len(todoQueue) > 0 {
+				next = todoQueue[0]
+				todoQueue = todoQueue[1:]
+			} else {
+				next = nil
+				send = nil
+			}
+		case <-done:
+			inProgress--
+			if inProgress == 0 && next == nil {
+				break dispatcherLoop
+			}
+		case nextNodes := <-out:
+			if next == nil {
+				next = nextNodes
+				send = feed
+			} else {
+				todoQueue = append(todoQueue, nextNodes)
 			}
+		case <-errGrpCtx.Done():
+			break dispatcherLoop
 		}
-		return childShards, nil
 	}
+	close(feed)
+	return grp.Wait()
 }
 
 func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) {
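
For reviewers, a minimal caller-side sketch (not part of this diff) of how the channel returned by `EnumLinksAsync` is typically drained. `listShard` is a hypothetical helper placed in this package for illustration; it only assumes an already-loaded `*Shard` and a caller-supplied context, and relies on the channel being closed when the parallel walk finishes or fails.

```go
// listShard is a hypothetical helper: it collects the names of all entries in
// the shard by draining EnumLinksAsync, stopping at the first emitted error.
func listShard(ctx context.Context, ds *Shard) ([]string, error) {
	var names []string
	for res := range ds.EnumLinksAsync(ctx) {
		if res.Err != nil {
			// parallelShardWalk aborted; the channel is closed after this result.
			return nil, res.Err
		}
		names = append(names, res.Link.Name)
	}
	return names, nil
}
```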