Skip to content

Commit 572844c

Browse files
committed
Close provider on ipfs shutdown
License: MIT Signed-off-by: Michael Avila <[email protected]>
1 parent 495f12c commit 572844c

File tree

4 files changed

+33
-1
lines changed

4 files changed

+33
-1
lines changed

core/core.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,10 @@ func (n *IpfsNode) teardown() error {
675675
// needs to use another during its shutdown/cleanup process, it should be
676676
// closed before that other object
677677

678+
if n.Provider != nil {
679+
closers = append(closers, n.Provider)
680+
}
681+
678682
if n.FilesRoot != nil {
679683
closers = append(closers, n.FilesRoot)
680684
}

provider/offline.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,7 @@ func (op *offlineProvider) Run() {}
1414
func (op *offlineProvider) Provide(cid cid.Cid) error {
1515
return nil
1616
}
17+
18+
func (op *offlineProvider) Close() error {
19+
return nil
20+
}

provider/provider.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ type Provider interface {
2020
Run()
2121
// Provide takes a cid and makes an attempt to announce it to the network
2222
Provide(cid.Cid) error
23+
// Close stops the provider
24+
Close() error
2325
}
2426

2527
type provider struct {
@@ -39,6 +41,12 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte
3941
}
4042
}
4143

44+
// Close stops the provider
45+
func (p *provider) Close() error {
46+
p.queue.Close()
47+
return nil
48+
}
49+
4250
// Start workers to handle provide requests.
4351
func (p *provider) Run() {
4452
p.handleAnnouncements()

provider/queue.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ type Queue struct {
2727
ds datastore.Datastore // Must be threadsafe
2828
dequeue chan cid.Cid
2929
enqueue chan cid.Cid
30+
close context.CancelFunc
31+
closed chan struct{}
3032
}
3133

3234
// NewQueue creates a queue for cids
@@ -36,19 +38,29 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue,
3638
if err != nil {
3739
return nil, err
3840
}
41+
cancelCtx, cancel := context.WithCancel(ctx)
3942
q := &Queue{
4043
name: name,
41-
ctx: ctx,
44+
ctx: cancelCtx,
4245
head: head,
4346
tail: tail,
4447
ds: namespaced,
4548
dequeue: make(chan cid.Cid),
4649
enqueue: make(chan cid.Cid),
50+
close: cancel,
51+
closed: make(chan struct{}, 1),
4752
}
4853
q.work()
4954
return q, nil
5055
}
5156

57+
// Close stops the queue
58+
func (q *Queue) Close() error {
59+
q.close()
60+
<-q.closed
61+
return nil
62+
}
63+
5264
// Enqueue puts a cid in the queue
5365
func (q *Queue) Enqueue(cid cid.Cid) {
5466
select {
@@ -103,6 +115,10 @@ func (q *Queue) work() {
103115
var k datastore.Key = datastore.Key{}
104116
var c cid.Cid = cid.Undef
105117

118+
defer func() {
119+
close(q.closed)
120+
}()
121+
106122
for {
107123
if c == cid.Undef {
108124
k, c = q.nextEntry()

0 commit comments

Comments
 (0)