Skip to content

Commit 3ad6212

Browse files
authored
Correctly handle select on multiple channels in Queues (#22146) (#22428)
Backport #22146 There are a few places in FlushQueueWithContext which make an incorrect assumption about how `select` on multiple channels works. The problem is best expressed by looking at the following example: ```go package main import "fmt" func main() { closedChan := make(chan struct{}) close(closedChan) toClose := make(chan struct{}) count := 0 for { select { case <-closedChan: count++ fmt.Println(count) if count == 2 { close(toClose) } case <-toClose: return } } } ``` This PR double-checks that the contexts are closed outside of checking if there is data in the dataChan. It also rationalises the WorkerPool FlushWithContext because the previous implementation failed to handle pausing correctly. This will probably fix the underlying problem in #22145 Fix #22145 Signed-off-by: Andrew Thornton <[email protected]>
1 parent 37e23c9 commit 3ad6212

File tree

3 files changed

+43
-57
lines changed

3 files changed

+43
-57
lines changed

Diff for: modules/queue/queue_channel.go

-26
Original file line numberDiff line numberDiff line change
@@ -110,32 +110,6 @@ func (q *ChannelQueue) Flush(timeout time.Duration) error {
110110
return q.FlushWithContext(ctx)
111111
}
112112

113-
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
114-
func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
115-
log.Trace("ChannelQueue: %d Flush", q.qid)
116-
paused, _ := q.IsPausedIsResumed()
117-
for {
118-
select {
119-
case <-paused:
120-
return nil
121-
case data, ok := <-q.dataChan:
122-
if !ok {
123-
return nil
124-
}
125-
if unhandled := q.handle(data); unhandled != nil {
126-
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
127-
}
128-
atomic.AddInt64(&q.numInQueue, -1)
129-
case <-q.baseCtx.Done():
130-
return q.baseCtx.Err()
131-
case <-ctx.Done():
132-
return ctx.Err()
133-
default:
134-
return nil
135-
}
136-
}
137-
}
138-
139113
// Shutdown processing from this queue
140114
func (q *ChannelQueue) Shutdown() {
141115
q.lock.Lock()

Diff for: modules/queue/unique_queue_channel.go

-30
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"fmt"
1010
"runtime/pprof"
1111
"sync"
12-
"sync/atomic"
1312
"time"
1413

1514
"code.gitea.io/gitea/modules/container"
@@ -168,35 +167,6 @@ func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error {
168167
return q.FlushWithContext(ctx)
169168
}
170169

171-
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
172-
func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
173-
log.Trace("ChannelUniqueQueue: %d Flush", q.qid)
174-
paused, _ := q.IsPausedIsResumed()
175-
for {
176-
select {
177-
case <-paused:
178-
return nil
179-
default:
180-
}
181-
select {
182-
case data, ok := <-q.dataChan:
183-
if !ok {
184-
return nil
185-
}
186-
if unhandled := q.handle(data); unhandled != nil {
187-
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
188-
}
189-
atomic.AddInt64(&q.numInQueue, -1)
190-
case <-q.baseCtx.Done():
191-
return q.baseCtx.Err()
192-
case <-ctx.Done():
193-
return ctx.Err()
194-
default:
195-
return nil
196-
}
197-
}
198-
}
199-
200170
// Shutdown processing from this queue
201171
func (q *ChannelUniqueQueue) Shutdown() {
202172
log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)

Diff for: modules/queue/workerpool.go

+43-1
Original file line numberDiff line numberDiff line change
@@ -464,13 +464,43 @@ func (p *WorkerPool) IsEmpty() bool {
464464
return atomic.LoadInt64(&p.numInQueue) == 0
465465
}
466466

467+
// contextError returns either ctx.Done(), the base context's error or nil
468+
func (p *WorkerPool) contextError(ctx context.Context) error {
469+
select {
470+
case <-p.baseCtx.Done():
471+
return p.baseCtx.Err()
472+
case <-ctx.Done():
473+
return ctx.Err()
474+
default:
475+
return nil
476+
}
477+
}
478+
467479
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
468480
// NB: The worker will not be registered with the manager.
469481
func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
470482
log.Trace("WorkerPool: %d Flush", p.qid)
483+
paused, _ := p.IsPausedIsResumed()
471484
for {
485+
// Because select will return any case that is satisfied at random we precheck here before looking at dataChan.
486+
select {
487+
case <-paused:
488+
// Ensure that even if paused that the cancelled error is still sent
489+
return p.contextError(ctx)
490+
case <-p.baseCtx.Done():
491+
return p.baseCtx.Err()
492+
case <-ctx.Done():
493+
return ctx.Err()
494+
default:
495+
}
496+
472497
select {
473-
case data := <-p.dataChan:
498+
case <-paused:
499+
return p.contextError(ctx)
500+
case data, ok := <-p.dataChan:
501+
if !ok {
502+
return nil
503+
}
474504
if unhandled := p.handle(data); unhandled != nil {
475505
log.Error("Unhandled Data whilst flushing queue %d", p.qid)
476506
}
@@ -496,6 +526,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
496526
paused, _ := p.IsPausedIsResumed()
497527
data := make([]Data, 0, p.batchLength)
498528
for {
529+
// Because select will return any case that is satisfied at random we precheck here before looking at dataChan.
499530
select {
500531
case <-paused:
501532
log.Trace("Worker for Queue %d Pausing", p.qid)
@@ -516,8 +547,19 @@ func (p *WorkerPool) doWork(ctx context.Context) {
516547
log.Trace("Worker shutting down")
517548
return
518549
}
550+
case <-ctx.Done():
551+
if len(data) > 0 {
552+
log.Trace("Handling: %d data, %v", len(data), data)
553+
if unhandled := p.handle(data...); unhandled != nil {
554+
log.Error("Unhandled Data in queue %d", p.qid)
555+
}
556+
atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
557+
}
558+
log.Trace("Worker shutting down")
559+
return
519560
default:
520561
}
562+
521563
select {
522564
case <-paused:
523565
// go back around

0 commit comments

Comments
 (0)