@@ -2,7 +2,7 @@ package taskqueue
2
2
3
3
import (
4
4
"context"
5
- "sync/atomic "
5
+ "sync"
6
6
"time"
7
7
8
8
"github.com/ipfs/go-peertaskqueue"
@@ -33,7 +33,7 @@ type WorkerTaskQueue struct {
33
33
cancelFn func ()
34
34
peerTaskQueue * peertaskqueue.PeerTaskQueue
35
35
workSignal chan struct {}
36
- noTaskSignal chan struct {}
36
+ noTaskCond * sync. Cond
37
37
ticker * time.Ticker
38
38
activeTasks int32
39
39
}
@@ -46,7 +46,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue {
46
46
cancelFn : cancelFn ,
47
47
peerTaskQueue : peertaskqueue .New (),
48
48
workSignal : make (chan struct {}, 1 ),
49
- noTaskSignal : make ( chan struct {}, 1 ),
49
+ noTaskCond : sync . NewCond ( & sync. Mutex {} ),
50
50
ticker : time .NewTicker (thawSpeed ),
51
51
}
52
52
}
@@ -93,13 +93,11 @@ func (tq *WorkerTaskQueue) Shutdown() {
93
93
}
94
94
95
95
func (tq * WorkerTaskQueue ) WaitForNoActiveTasks () {
96
- for atomic .LoadInt32 (& tq .activeTasks ) > 0 {
97
- select {
98
- case <- tq .ctx .Done ():
99
- return
100
- case <- tq .noTaskSignal :
101
- }
96
+ tq .noTaskCond .L .Lock ()
97
+ for tq .activeTasks > 0 {
98
+ tq .noTaskCond .Wait ()
102
99
}
100
+ tq .noTaskCond .L .Unlock ()
103
101
}
104
102
105
103
func (tq * WorkerTaskQueue ) worker (executor Executor ) {
@@ -118,14 +116,16 @@ func (tq *WorkerTaskQueue) worker(executor Executor) {
118
116
}
119
117
}
120
118
for _ , task := range tasks {
121
- atomic .AddInt32 (& tq .activeTasks , 1 )
119
+ tq .noTaskCond .L .Lock ()
120
+ tq .activeTasks = tq .activeTasks + 1
121
+ tq .noTaskCond .L .Unlock ()
122
122
terminate := executor .ExecuteTask (tq .ctx , pid , task )
123
- if atomic .AddInt32 (& tq .activeTasks , - 1 ) == 0 {
124
- select {
125
- case tq .noTaskSignal <- struct {}{}:
126
- default :
127
- }
123
+ tq .noTaskCond .L .Lock ()
124
+ tq .activeTasks = tq .activeTasks - 1
125
+ if tq .activeTasks == 0 {
126
+ tq .noTaskCond .Broadcast ()
128
127
}
128
+ tq .noTaskCond .L .Unlock ()
129
129
if terminate {
130
130
return
131
131
}
0 commit comments