@@ -2,6 +2,7 @@ package taskqueue
2
2
3
3
import (
4
4
"context"
5
+ "sync/atomic"
5
6
"time"
6
7
7
8
"github.com/ipfs/go-peertaskqueue"
@@ -32,7 +33,9 @@ type WorkerTaskQueue struct {
32
33
cancelFn func ()
33
34
peerTaskQueue * peertaskqueue.PeerTaskQueue
34
35
workSignal chan struct {}
36
+ noTaskSignal chan struct {}
35
37
ticker * time.Ticker
38
+ activeTasks int32
36
39
}
37
40
38
41
// NewTaskQueue initializes a new queue
@@ -43,6 +46,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue {
43
46
cancelFn : cancelFn ,
44
47
peerTaskQueue : peertaskqueue .New (),
45
48
workSignal : make (chan struct {}, 1 ),
49
+ noTaskSignal : make (chan struct {}, 1 ),
46
50
ticker : time .NewTicker (thawSpeed ),
47
51
}
48
52
}
@@ -88,6 +92,16 @@ func (tq *WorkerTaskQueue) Shutdown() {
88
92
tq .cancelFn ()
89
93
}
90
94
95
+ func (tq * WorkerTaskQueue ) WaitForNoActiveTasks () {
96
+ for atomic .LoadInt32 (& tq .activeTasks ) > 0 {
97
+ select {
98
+ case <- tq .ctx .Done ():
99
+ return
100
+ case <- tq .noTaskSignal :
101
+ }
102
+ }
103
+ }
104
+
91
105
func (tq * WorkerTaskQueue ) worker (executor Executor ) {
92
106
targetWork := 1
93
107
for {
@@ -104,7 +118,14 @@ func (tq *WorkerTaskQueue) worker(executor Executor) {
104
118
}
105
119
}
106
120
for _ , task := range tasks {
121
+ atomic .AddInt32 (& tq .activeTasks , 1 )
107
122
terminate := executor .ExecuteTask (tq .ctx , pid , task )
123
+ if atomic .AddInt32 (& tq .activeTasks , - 1 ) == 0 {
124
+ select {
125
+ case tq .noTaskSignal <- struct {}{}:
126
+ default :
127
+ }
128
+ }
108
129
if terminate {
109
130
return
110
131
}
0 commit comments