
Commit a610c89

Properly flush unique queues on startup
There have been a number of reports of blocked PRs being checked which have been difficult to debug. In investigating go-gitea#23050 I have realised that, whilst the Warn there is somewhat of a miscall, there was a real bug in the way that the LevelUniqueQueue was being restored on start-up of the PersistableChannelUniqueQueue. This PR fixes that bug and adds a testcase.

Fix go-gitea#23050 and others

Signed-off-by: Andrew Thornton <[email protected]>
1 parent 8540fc4 · commit a610c89
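In essence: on start-up, if the persisted level queue still contained data, the old code ran it, issued a single Flush(0), and then shut it down even if items remained. The fix keeps flushing until the queue reports empty, pausing briefly between attempts and bailing out if shutdown begins. A simplified sketch of the new pattern (not the exact Gitea code; luq stands for the restored *LevelUniqueQueue, as in the diff below):

go func() {
	// Keep flushing until the persisted backlog is actually drained,
	// rather than assuming a single Flush(0) was enough.
	for !luq.IsEmpty() {
		_ = luq.Flush(0)
		select {
		case <-time.After(100 * time.Millisecond): // brief pause before re-checking
		case <-luq.shutdownCtx.Done(): // give up early if we are shutting down
			return
		}
	}
	luq.Shutdown() // the level queue is only needed while there is a backlog
}()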

2 files changed: +240 −7 lines changed

modules/queue/unique_queue_disk_channel.go

+17 −7
@@ -209,17 +209,27 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
 	atTerminate(q.Terminate)
 	_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
 
-	if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
+	if luq, ok := q.internal.(*LevelUniqueQueue); ok && !luq.IsEmpty() {
 		// Just run the level queue - we shut it down once it's flushed
-		go q.internal.Run(func(_ func()) {}, func(_ func()) {})
+		go luq.Run(func(_ func()) {}, func(_ func()) {})
 		go func() {
-			_ = q.internal.Flush(0)
-			log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelUniqueQueue).Name())
-			q.internal.(*LevelUniqueQueue).Shutdown()
-			GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
+			_ = luq.Flush(0)
+			for !luq.IsEmpty() {
+				_ = luq.Flush(0)
+				select {
+				case <-time.After(100 * time.Millisecond):
+				case <-luq.shutdownCtx.Done():
+					log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name())
+					return
+				}
+			}
+			log.Debug("LevelUniqueQueue: %s flushed so shutting down", luq.Name())
+			luq.Shutdown()
+			GetManager().Remove(luq.qid)
 		}()
 	} else {
 		log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
+		_ = q.internal.Flush(0)
 		q.internal.(*LevelUniqueQueue).Shutdown()
 		GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
 	}

@@ -286,7 +296,7 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
 		close(q.channelQueue.dataChan)
 		log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
 		for data := range q.channelQueue.dataChan {
-			_ = q.internal.Push(data)
+			_ = q.internal.(*LevelUniqueQueue).Push(data)
 		}
 		log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
 
modules/queue/unique_queue_disk_channel_test.go

+223 −0
@@ -0,0 +1,223 @@
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package queue

import (
	"fmt"
	"strconv"
	"sync"
	"testing"
	"time"

	"code.gitea.io/gitea/modules/log"
	"github.com/stretchr/testify/assert"
)

func TestPersistableChannelUniqueQueue(t *testing.T) {
	tmpDir := t.TempDir()
	fmt.Printf("TempDir %s\n", tmpDir)
	_ = log.NewLogger(1000, "console", "console", `{"level":"trace","stacktracelevel":"NONE","stderr":true}`)

	// Common function to create the Queue
	newQueue := func(handle func(data ...Data) []Data) Queue {
		q, err := NewPersistableChannelUniqueQueue(handle,
			PersistableChannelUniqueQueueConfiguration{
				Name:         "TestPersistableChannelUniqueQueue",
				DataDir:      tmpDir,
				QueueLength:  200,
				MaxWorkers:   1,
				BlockTimeout: 1 * time.Second,
				BoostTimeout: 5 * time.Minute,
				BoostWorkers: 1,
				Workers:      0,
			}, "task-0")
		assert.NoError(t, err)
		return q
	}

	// runs the provided queue and provides some timer function
	type channels struct {
		readyForShutdown  chan struct{} // closed when shutdown functions have been assigned
		readyForTerminate chan struct{} // closed when terminate functions have been assigned
		signalShutdown    chan struct{} // Should close to signal shutdown
		doneShutdown      chan struct{} // closed when shutdown function is done
		queueTerminate    []func()      // list of atTerminate functions to call atTerminate - need to be accessed with lock
	}
	runQueue := func(q Queue, lock *sync.Mutex) *channels {
		returnable := &channels{
			readyForShutdown:  make(chan struct{}),
			readyForTerminate: make(chan struct{}),
			signalShutdown:    make(chan struct{}),
			doneShutdown:      make(chan struct{}),
		}
		go q.Run(func(atShutdown func()) {
			go func() {
				lock.Lock()
				select {
				case <-returnable.readyForShutdown:
				default:
					close(returnable.readyForShutdown)
				}
				lock.Unlock()
				<-returnable.signalShutdown
				atShutdown()
				close(returnable.doneShutdown)
			}()
		}, func(atTerminate func()) {
			lock.Lock()
			defer lock.Unlock()
			select {
			case <-returnable.readyForTerminate:
			default:
				close(returnable.readyForTerminate)
			}
			returnable.queueTerminate = append(returnable.queueTerminate, atTerminate)
		})

		return returnable
	}

	// call to shutdown and terminate the queue associated with the channels
	shutdownAndTerminate := func(chans *channels, lock *sync.Mutex) {
		close(chans.signalShutdown)
		<-chans.doneShutdown
		<-chans.readyForTerminate

		lock.Lock()
		callbacks := []func(){}
		callbacks = append(callbacks, chans.queueTerminate...)
		lock.Unlock()

		for _, callback := range callbacks {
			callback()
		}
	}

	executedTasks1 := []string{}
	hasTasks1 := []string{}

	t.Run("Initial Filling", func(t *testing.T) {
		lock := sync.Mutex{}

		startAt100Queued := make(chan struct{})
		stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item

		handle := func(data ...Data) []Data {
			<-startAt100Queued
			for _, datum := range data {
				s := datum.(string)
				lock.Lock()
				executedTasks1 = append(executedTasks1, s)
				lock.Unlock()
				if s == "task-20" {
					close(stopAt20Shutdown)
				}
			}
			return nil
		}

		q := newQueue(handle)

		// add 100 tasks to the queue
		for i := 0; i < 100; i++ {
			_ = q.Push("task-" + strconv.Itoa(i))
		}
		close(startAt100Queued)

		chans := runQueue(q, &lock)

		<-chans.readyForShutdown
		<-stopAt20Shutdown
		shutdownAndTerminate(chans, &lock)

		// check which tasks are still in the queue
		for i := 0; i < 100; i++ {
			if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
				hasTasks1 = append(hasTasks1, "task-"+strconv.Itoa(i))
			}
		}
		assert.Equal(t, 100, len(executedTasks1)+len(hasTasks1))
	})

	executedTasks2 := []string{}
	hasTasks2 := []string{}
	t.Run("Ensure that things will empty on restart", func(t *testing.T) {
		lock := sync.Mutex{}
		stop := make(chan struct{})

		// collect the tasks that have been executed
		handle := func(data ...Data) []Data {
			lock.Lock()
			for _, datum := range data {
				t.Logf("executed %s", datum.(string))
				executedTasks2 = append(executedTasks2, datum.(string))
				if datum.(string) == "task-99" {
					close(stop)
				}
			}
			lock.Unlock()
			return nil
		}

		q := newQueue(handle)
		chans := runQueue(q, &lock)

		<-chans.readyForShutdown
		<-stop
		shutdownAndTerminate(chans, &lock)

		// check which tasks are still in the queue
		for i := 0; i < 100; i++ {
			if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
				hasTasks2 = append(hasTasks2, "task-"+strconv.Itoa(i))
			}
		}

		assert.Equal(t, 100, len(executedTasks1)+len(executedTasks2))
		assert.Equal(t, 0, len(hasTasks2))
	})

	executedTasks3 := []string{}
	hasTasks3 := []string{}

	t.Run("refill", func(t *testing.T) {
		lock := sync.Mutex{}
		stop := make(chan struct{})

		handle := func(data ...Data) []Data {
			lock.Lock()
			for _, datum := range data {
				executedTasks3 = append(executedTasks3, datum.(string))
			}
			lock.Unlock()
			return nil
		}

		q := newQueue(handle)
		chans := runQueue(q, &lock)

		// re-run all tasks
		for i := 0; i < 100; i++ {
			_ = q.Push("task-" + strconv.Itoa(i))
		}

		// wait for a while
		time.Sleep(1 * time.Second)

		close(stop)
		<-chans.readyForShutdown
		shutdownAndTerminate(chans, &lock)

		// check whether the tasks are still in the queue
		for i := 0; i < 100; i++ {
			if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
				hasTasks3 = append(hasTasks3, "task-"+strconv.Itoa(i))
			}
		}
		assert.Equal(t, 100, len(executedTasks3)+len(hasTasks3))
	})

	t.Logf("TestPersistableChannelUniqueQueue completed1=%v, executed2=%v, has2=%v, executed3=%v, has3=%v",
		len(executedTasks1), len(executedTasks2), len(hasTasks2), len(executedTasks3), len(hasTasks3))
}
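To run just this new test locally, a standard go test invocation should work (the path assumes the usual Gitea repository layout):

	go test -v -run TestPersistableChannelUniqueQueue ./modules/queue/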
