Skip to content

Commit a609cae

Browse files
authored
Correctly handle select on multiple channels in Queues (#22146)
There are a few places in FlushQueueWithContext which make an incorrect assumption about how `select` on multiple channels works. The problem is best expressed by looking at the following example:

```go
package main

import "fmt"

func main() {
	closedChan := make(chan struct{})
	close(closedChan)

	toClose := make(chan struct{})
	count := 0

	for {
		select {
		case <-closedChan:
			count++
			fmt.Println(count)
			if count == 2 {
				close(toClose)
			}
		case <-toClose:
			return
		}
	}
}
```

This PR double-checks that the contexts are closed outside of checking if there is data in the dataChan. It also rationalises the WorkerPool FlushWithContext because the previous implementation failed to handle pausing correctly.

This will probably fix the underlying problem in #22145

Fix #22145

Signed-off-by: Andrew Thornton <[email protected]>
Signed-off-by: Andrew Thornton <[email protected]>
1 parent 47efba7 commit a609cae

File tree

3 files changed

+43
-57
lines changed

3 files changed

+43
-57
lines changed

Diff for: modules/queue/queue_channel.go

-26
Original file line numberDiff line numberDiff line change
@@ -109,32 +109,6 @@ func (q *ChannelQueue) Flush(timeout time.Duration) error {
109109
return q.FlushWithContext(ctx)
110110
}
111111

112-
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
113-
func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
114-
log.Trace("ChannelQueue: %d Flush", q.qid)
115-
paused, _ := q.IsPausedIsResumed()
116-
for {
117-
select {
118-
case <-paused:
119-
return nil
120-
case data, ok := <-q.dataChan:
121-
if !ok {
122-
return nil
123-
}
124-
if unhandled := q.handle(data); unhandled != nil {
125-
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
126-
}
127-
atomic.AddInt64(&q.numInQueue, -1)
128-
case <-q.baseCtx.Done():
129-
return q.baseCtx.Err()
130-
case <-ctx.Done():
131-
return ctx.Err()
132-
default:
133-
return nil
134-
}
135-
}
136-
}
137-
138112
// Shutdown processing from this queue
139113
func (q *ChannelQueue) Shutdown() {
140114
q.lock.Lock()

Diff for: modules/queue/unique_queue_channel.go

-30
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"fmt"
99
"runtime/pprof"
1010
"sync"
11-
"sync/atomic"
1211
"time"
1312

1413
"code.gitea.io/gitea/modules/container"
@@ -167,35 +166,6 @@ func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error {
167166
return q.FlushWithContext(ctx)
168167
}
169168

170-
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
171-
func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
172-
log.Trace("ChannelUniqueQueue: %d Flush", q.qid)
173-
paused, _ := q.IsPausedIsResumed()
174-
for {
175-
select {
176-
case <-paused:
177-
return nil
178-
default:
179-
}
180-
select {
181-
case data, ok := <-q.dataChan:
182-
if !ok {
183-
return nil
184-
}
185-
if unhandled := q.handle(data); unhandled != nil {
186-
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
187-
}
188-
atomic.AddInt64(&q.numInQueue, -1)
189-
case <-q.baseCtx.Done():
190-
return q.baseCtx.Err()
191-
case <-ctx.Done():
192-
return ctx.Err()
193-
default:
194-
return nil
195-
}
196-
}
197-
}
198-
199169
// Shutdown processing from this queue
200170
func (q *ChannelUniqueQueue) Shutdown() {
201171
log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)

Diff for: modules/queue/workerpool.go

+43-1
Original file line numberDiff line numberDiff line change
@@ -463,13 +463,43 @@ func (p *WorkerPool) IsEmpty() bool {
463463
return atomic.LoadInt64(&p.numInQueue) == 0
464464
}
465465

466+
// contextError returns either ctx.Done(), the base context's error or nil
467+
func (p *WorkerPool) contextError(ctx context.Context) error {
468+
select {
469+
case <-p.baseCtx.Done():
470+
return p.baseCtx.Err()
471+
case <-ctx.Done():
472+
return ctx.Err()
473+
default:
474+
return nil
475+
}
476+
}
477+
466478
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
467479
// NB: The worker will not be registered with the manager.
468480
func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
469481
log.Trace("WorkerPool: %d Flush", p.qid)
482+
paused, _ := p.IsPausedIsResumed()
470483
for {
484+
// Because select will return any case that is satisfied at random we precheck here before looking at dataChan.
485+
select {
486+
case <-paused:
487+
// Ensure that even if paused that the cancelled error is still sent
488+
return p.contextError(ctx)
489+
case <-p.baseCtx.Done():
490+
return p.baseCtx.Err()
491+
case <-ctx.Done():
492+
return ctx.Err()
493+
default:
494+
}
495+
471496
select {
472-
case data := <-p.dataChan:
497+
case <-paused:
498+
return p.contextError(ctx)
499+
case data, ok := <-p.dataChan:
500+
if !ok {
501+
return nil
502+
}
473503
if unhandled := p.handle(data); unhandled != nil {
474504
log.Error("Unhandled Data whilst flushing queue %d", p.qid)
475505
}
@@ -495,6 +525,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
495525
paused, _ := p.IsPausedIsResumed()
496526
data := make([]Data, 0, p.batchLength)
497527
for {
528+
// Because select will return any case that is satisfied at random we precheck here before looking at dataChan.
498529
select {
499530
case <-paused:
500531
log.Trace("Worker for Queue %d Pausing", p.qid)
@@ -515,8 +546,19 @@ func (p *WorkerPool) doWork(ctx context.Context) {
515546
log.Trace("Worker shutting down")
516547
return
517548
}
549+
case <-ctx.Done():
550+
if len(data) > 0 {
551+
log.Trace("Handling: %d data, %v", len(data), data)
552+
if unhandled := p.handle(data...); unhandled != nil {
553+
log.Error("Unhandled Data in queue %d", p.qid)
554+
}
555+
atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
556+
}
557+
log.Trace("Worker shutting down")
558+
return
518559
default:
519560
}
561+
520562
select {
521563
case <-paused:
522564
// go back around

0 commit comments

Comments
 (0)