Skip to content

Commit ddaddaf

Browse files
committed
reprovider: apply review suggestions
License: MIT Signed-off-by: Łukasz Magiera <[email protected]>
1 parent 048debe commit ddaddaf

File tree

4 files changed

+18
-21
lines changed

4 files changed

+18
-21
lines changed

core/core.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
263263
return err
264264
}
265265

266-
var keyProvider func(context.Context) (<-chan *cid.Cid, error)
266+
var keyProvider rp.KeyChanFunc
267267

268268
switch cfg.Reprovider.Strategy {
269269
case "all":
@@ -275,7 +275,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
275275
case "pinned":
276276
keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false)
277277
default:
278-
return fmt.Errorf("unknown reprovider strtaegy '%s'", cfg.Reprovider.Strategy)
278+
return fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
279279
}
280280
n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider)
281281

docs/config.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ a running daemon do not read the config file at runtime.
1515
- [`Identity`](#identity)
1616
- [`Ipns`](#ipns)
1717
- [`Mounts`](#mounts)
18-
- [`Reproviderl`](#reprovider)
18+
- [`Reprovider`](#reprovider)
1919
- [`SupernodeRouting`](#supernoderouting)
2020
- [`Swarm`](#swarm)
2121
- [`Tour`](#tour)

exchange/reprovide/providers.go

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ import (
1111
)
1212

1313
// NewBlockstoreProvider returns key provider using bstore.AllKeysChan
14-
func NewBlockstoreProvider(bstore blocks.Blockstore) keyChanFunc {
14+
func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
1515
return func(ctx context.Context) (<-chan *cid.Cid, error) {
1616
return bstore.AllKeysChan(ctx)
1717
}
1818
}
1919

2020
// NewPinnedProvider returns provider supplying pinned keys
21-
func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) keyChanFunc {
21+
func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc {
2222
return func(ctx context.Context) (<-chan *cid.Cid, error) {
2323
set, err := pinSet(ctx, pinning, dag, onlyRoots)
2424
if err != nil {
@@ -46,6 +46,8 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, o
4646
set := newStreamingSet()
4747

4848
go func() {
49+
defer close(set.new)
50+
4951
for _, key := range pinning.DirectKeys() {
5052
set.add(key)
5153
}
@@ -56,41 +58,33 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, o
5658
if !onlyRoots {
5759
err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.add)
5860
if err != nil {
59-
return //TODO: propagate to chan / log?
61+
log.Errorf("reprovide indirect pins: %s", err)
62+
return
6063
}
6164
}
6265
}
63-
64-
close(set.new)
6566
}()
6667

6768
return set, nil
6869
}
6970

7071
type streamingSet struct {
71-
set map[string]struct{}
72+
set *cid.Set
7273
new chan *cid.Cid
7374
}
7475

7576
// NewSet initializes and returns a new Set.
7677
func newStreamingSet() *streamingSet {
7778
return &streamingSet{
78-
set: make(map[string]struct{}),
79+
set: cid.NewSet(),
7980
new: make(chan *cid.Cid),
8081
}
8182
}
8283

83-
// has returns if the Set contains a given Cid.
84-
func (s *streamingSet) has(c *cid.Cid) bool {
85-
_, ok := s.set[string(c.Bytes())]
86-
return ok
87-
}
88-
8984
// add adds a Cid to the set only if it is
9085
// not in it already.
9186
func (s *streamingSet) add(c *cid.Cid) bool {
92-
if !s.has(c) {
93-
s.set[string(c.Bytes())] = struct{}{}
87+
if s.set.Visit(c) {
9488
s.new <- c
9589
return true
9690
}

exchange/reprovide/reprovide.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import (
1313

1414
var log = logging.Logger("reprovider")
1515

16-
type keyChanFunc func(context.Context) (<-chan *cid.Cid, error)
16+
//KeyChanFunc is function streaming CIDs to pass to content routing
17+
type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error)
1718
type doneFunc func(error)
1819

1920
type Reprovider struct {
@@ -23,11 +24,11 @@ type Reprovider struct {
2324
// The routing system to provide values through
2425
rsys routing.ContentRouting
2526

26-
keyProvider keyChanFunc
27+
keyProvider KeyChanFunc
2728
}
2829

2930
// NewReprovider creates new Reprovider instance.
30-
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider keyChanFunc) *Reprovider {
31+
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
3132
return &Reprovider{
3233
ctx: ctx,
3334
trigger: make(chan doneFunc),
@@ -52,6 +53,8 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) {
5253
case <-after:
5354
}
5455

56+
//'mute' the trigger channel so when `ipfs bitswap reprovide` is called
57+
//a 'reprovider is already running' error is returned
5558
unmute := rp.muteTrigger()
5659

5760
err := rp.Reprovide()

0 commit comments

Comments
 (0)