@@ -394,56 +394,51 @@ func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit
394
394
var FetchGraphConcurrency = 8
395
395
396
396
func EnumerateChildrenAsync (ctx context.Context , ds DAGService , c * cid.Cid , visit func (* cid.Cid ) bool ) error {
397
- if ! visit (c ) {
398
- return nil
399
- }
400
-
401
- root , err := ds .Get (ctx , c )
402
- if err != nil {
403
- return err
404
- }
405
-
406
- feed := make (chan node.Node )
407
- out := make (chan * NodeOption )
397
+ feed := make (chan * cid.Cid )
398
+ out := make (chan node.Node )
408
399
done := make (chan struct {})
409
400
410
401
var setlk sync.Mutex
411
-
402
+
403
+ errChan := make (chan error )
404
+ fetchersCtx , cancel := context .WithCancel (ctx )
405
+
406
+ defer cancel ()
407
+
412
408
for i := 0 ; i < FetchGraphConcurrency ; i ++ {
413
409
go func () {
414
- for n := range feed {
415
- links := n .Links ()
416
- cids := make ([]* cid.Cid , 0 , len (links ))
417
- for _ , l := range links {
418
- setlk .Lock ()
419
- unseen := visit (l .Cid )
420
- setlk .Unlock ()
421
- if unseen {
422
- cids = append (cids , l .Cid )
423
- }
410
+ for ic := range feed {
411
+ n , err := ds .Get (ctx , ic )
412
+ if err != nil {
413
+ errChan <- err
414
+ return
424
415
}
425
-
426
- for nopt := range ds .GetMany (ctx , cids ) {
416
+
417
+ setlk .Lock ()
418
+ unseen := visit (ic )
419
+ setlk .Unlock ()
420
+
421
+ if unseen {
427
422
select {
428
- case out <- nopt :
429
- case <- ctx .Done ():
423
+ case out <- n :
424
+ case <- fetchersCtx .Done ():
430
425
return
431
426
}
432
427
}
433
428
select {
434
429
case done <- struct {}{}:
435
- case <- ctx .Done ():
430
+ case <- fetchersCtx .Done ():
436
431
}
437
432
}
438
433
}()
439
434
}
440
435
defer close (feed )
441
436
442
437
send := feed
443
- var todobuffer []node. Node
438
+ var todobuffer []* cid. Cid
444
439
var inProgress int
445
440
446
- next := root
441
+ next := c
447
442
for {
448
443
select {
449
444
case send <- next :
@@ -460,18 +455,18 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
460
455
if inProgress == 0 && next == nil {
461
456
return nil
462
457
}
463
- case nc := <- out :
464
- if nc .Err != nil {
465
- return nc .Err
466
- }
467
-
468
- if next == nil {
469
- next = nc .Node
470
- send = feed
471
- } else {
472
- todobuffer = append (todobuffer , nc .Node )
458
+ case nd := <- out :
459
+ for _ , lnk := range nd .Links () {
460
+ if next == nil {
461
+ next = lnk .Cid
462
+ send = feed
463
+ } else {
464
+ todobuffer = append (todobuffer , lnk .Cid )
465
+ }
473
466
}
474
-
467
+ case err := <- errChan :
468
+ return err
469
+
475
470
case <- ctx .Done ():
476
471
return ctx .Err ()
477
472
}
0 commit comments