Skip to content

Commit 014a9cd

Browse files
authored
Merge pull request ipfs/go-ipfs-provider#25 from ipfs/fix/two-bugs
fix several bugs This commit was moved from ipfs/go-ipfs-provider@6d2a4b9
2 parents cb9da11 + abe7522 commit 014a9cd

File tree

2 files changed

+146
-55
lines changed

2 files changed

+146
-55
lines changed

provider/simple/reprovide.go

Lines changed: 64 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package simple
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"time"
78

@@ -18,14 +19,21 @@ import (
1819

1920
var logR = logging.Logger("reprovider.simple")
2021

22+
// ErrClosed is returned by Trigger when operating on a closed reprovider.
23+
var ErrClosed = errors.New("reprovider service stopped")
24+
2125
// KeyChanFunc is function streaming CIDs to pass to content routing
2226
type KeyChanFunc func(context.Context) (<-chan cid.Cid, error)
23-
type doneFunc func(error)
2427

2528
// Reprovider reannounces blocks to the network
2629
type Reprovider struct {
27-
ctx context.Context
28-
trigger chan doneFunc
30+
// Reprovider context. Cancel to stop, then wait on closedCh.
31+
ctx context.Context
32+
cancel context.CancelFunc
33+
closedCh chan struct{}
34+
35+
// Trigger triggers a reprovide.
36+
trigger chan chan<- error
2937

3038
// The routing system to provide values through
3139
rsys routing.ContentRouting
@@ -37,9 +45,12 @@ type Reprovider struct {
3745

3846
// NewReprovider creates new Reprovider instance.
3947
func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
48+
ctx, cancel := context.WithCancel(ctx)
4049
return &Reprovider{
41-
ctx: ctx,
42-
trigger: make(chan doneFunc),
50+
ctx: ctx,
51+
cancel: cancel,
52+
closedCh: make(chan struct{}),
53+
trigger: make(chan chan<- error),
4354

4455
rsys: rsys,
4556
keyProvider: keyProvider,
@@ -49,44 +60,60 @@ func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys r
4960

5061
// Close the reprovider
5162
func (rp *Reprovider) Close() error {
63+
rp.cancel()
64+
<-rp.closedCh
5265
return nil
5366
}
5467

5568
// Run re-provides keys with 'tick' interval or when triggered
5669
func (rp *Reprovider) Run() {
57-
// dont reprovide immediately.
58-
// may have just started the daemon and shutting it down immediately.
59-
// probability( up another minute | uptime ) increases with uptime.
60-
after := time.After(time.Minute)
61-
var done doneFunc
62-
for {
63-
if rp.tick == 0 {
64-
after = make(chan time.Time)
70+
defer close(rp.closedCh)
71+
72+
var initialReprovideCh, reprovideCh <-chan time.Time
73+
74+
// If reproviding is enabled (non-zero)
75+
if rp.tick > 0 {
76+
reprovideTicker := time.NewTicker(rp.tick)
77+
defer reprovideTicker.Stop()
78+
reprovideCh = reprovideTicker.C
79+
80+
// If the reprovide ticker is larger than a minute (likely),
81+
// provide once after we've been up a minute.
82+
//
83+
// Don't provide _immediately_ as we might be just about to stop.
84+
if rp.tick > time.Minute {
85+
initialReprovideTimer := time.NewTimer(time.Minute)
86+
defer initialReprovideTimer.Stop()
87+
88+
initialReprovideCh = initialReprovideTimer.C
6589
}
90+
}
6691

92+
var done chan<- error
93+
for rp.ctx.Err() == nil {
6794
select {
95+
case <-initialReprovideCh:
96+
case <-reprovideCh:
97+
case done = <-rp.trigger:
6898
case <-rp.ctx.Done():
6999
return
70-
case done = <-rp.trigger:
71-
case <-after:
72100
}
73101

74-
//'mute' the trigger channel so when `ipfs bitswap reprovide` is called
75-
//a 'reprovider is already running' error is returned
76-
unmute := rp.muteTrigger()
77-
78102
err := rp.Reprovide()
79-
if err != nil {
103+
104+
// only log if we've hit an actual error, otherwise just tell the client we're shutting down
105+
if rp.ctx.Err() != nil {
106+
err = ErrClosed
107+
} else if err != nil {
80108
logR.Errorf("failed to reprovide: %s", err)
81109
}
82110

83111
if done != nil {
84-
done(err)
112+
if err != nil {
113+
done <- err
114+
}
115+
close(done)
85116
}
86-
87-
unmute()
88-
89-
after = time.After(rp.tick)
90117
}
91118
}
92119

@@ -119,44 +146,27 @@ func (rp *Reprovider) Reprovide() error {
119146
return nil
120147
}
121148

122-
// Trigger starts reprovision process in rp.Run and waits for it
149+
// Trigger starts the reprovision process in rp.Run and waits for it to finish.
150+
//
151+
// Returns an error if a reprovide is already in progress.
123152
func (rp *Reprovider) Trigger(ctx context.Context) error {
124-
progressCtx, done := context.WithCancel(ctx)
125-
126-
var err error
127-
df := func(e error) {
128-
err = e
129-
done()
153+
resultCh := make(chan error, 1)
154+
select {
155+
case rp.trigger <- resultCh:
156+
default:
157+
return fmt.Errorf("reprovider is already running")
130158
}
131159

132160
select {
161+
case err := <-resultCh:
162+
return err
133163
case <-rp.ctx.Done():
134-
return context.Canceled
164+
return ErrClosed
135165
case <-ctx.Done():
136-
return context.Canceled
137-
case rp.trigger <- df:
138-
<-progressCtx.Done()
139-
return err
166+
return ctx.Err()
140167
}
141168
}
142169

143-
func (rp *Reprovider) muteTrigger() context.CancelFunc {
144-
ctx, cf := context.WithCancel(rp.ctx)
145-
go func() {
146-
defer cf()
147-
for {
148-
select {
149-
case <-ctx.Done():
150-
return
151-
case done := <-rp.trigger:
152-
done(fmt.Errorf("reprovider is already running"))
153-
}
154-
}
155-
}()
156-
157-
return cf
158-
}
159-
160170
// Strategies
161171

162172
// NewBlockstoreProvider returns key provider using bstore.AllKeysChan

provider/simple/reprovide_test.go

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,22 @@ func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) {
6363
}
6464

6565
func TestReprovide(t *testing.T) {
66+
testReprovide(t, func(r *Reprovider, ctx context.Context) error {
67+
return r.Reprovide()
68+
})
69+
}
70+
71+
func TestTrigger(t *testing.T) {
72+
testReprovide(t, func(r *Reprovider, ctx context.Context) error {
73+
go r.Run()
74+
time.Sleep(1 * time.Second)
75+
defer r.Close()
76+
err := r.Trigger(ctx)
77+
return err
78+
})
79+
}
80+
81+
func testReprovide(t *testing.T, trigger func(r *Reprovider, ctx context.Context) error) {
6682
ctx, cancel := context.WithCancel(context.Background())
6783
defer cancel()
6884

@@ -71,7 +87,7 @@ func TestReprovide(t *testing.T) {
7187

7288
keyProvider := NewBlockstoreProvider(bstore)
7389
reprov := NewReprovider(ctx, time.Hour, clA, keyProvider)
74-
err := reprov.Reprovide()
90+
err := trigger(reprov, ctx)
7591
if err != nil {
7692
t.Fatal(err)
7793
}
@@ -95,6 +111,71 @@ func TestReprovide(t *testing.T) {
95111
}
96112
}
97113

114+
func TestTriggerTwice(t *testing.T) {
115+
// Ensure we can only trigger once at a time.
116+
ctx, cancel := context.WithCancel(context.Background())
117+
defer cancel()
118+
119+
clA, _, _, _ := setupRouting(t)
120+
121+
keyCh := make(chan cid.Cid)
122+
startCh := make(chan struct{})
123+
keyFunc := func(ctx context.Context) (<-chan cid.Cid, error) {
124+
<-startCh
125+
return keyCh, nil
126+
}
127+
128+
reprov := NewReprovider(ctx, time.Hour, clA, keyFunc)
129+
go reprov.Run()
130+
defer reprov.Close()
131+
132+
// Wait for the reprovider to start, otherwise, the reprovider will
133+
// think a concurrent reprovide is running.
134+
//
135+
// We _could_ fix this race... but that would be complexity for nothing.
136+
// 1. We start a reprovide 1 minute after startup anyways.
137+
// 2. The window is really narrow.
138+
time.Sleep(1 * time.Second)
139+
140+
errCh := make(chan error, 2)
141+
142+
// Trigger in the background
143+
go func() {
144+
errCh <- reprov.Trigger(ctx)
145+
}()
146+
147+
// Wait for the trigger to really start.
148+
startCh <- struct{}{}
149+
150+
start := time.Now()
151+
// Try to trigger again, this should fail immediately.
152+
if err := reprov.Trigger(ctx); err == nil {
153+
t.Fatal("expected an error")
154+
}
155+
if time.Since(start) > 10*time.Millisecond {
156+
t.Fatal("expected reprovide to fail instantly")
157+
}
158+
159+
// Let the trigger progress.
160+
close(keyCh)
161+
162+
// Check the result.
163+
err := <-errCh
164+
if err != nil {
165+
t.Fatal(err)
166+
}
167+
168+
// Try to trigger again, this should work.
169+
go func() {
170+
errCh <- reprov.Trigger(ctx)
171+
}()
172+
startCh <- struct{}{}
173+
err = <-errCh
174+
if err != nil {
175+
t.Fatal(err)
176+
}
177+
}
178+
98179
type mockPinner struct {
99180
recursive []cid.Cid
100181
direct []cid.Cid

0 commit comments

Comments
 (0)