Skip to content

Commit 836a3a4

Browse files
zeripath and techknowlogick
authored and committed
Properly flush unique queues on startup (go-gitea#23154)
There have been a number of reports of PRs being blocked whilst being checked, which have been difficult to debug. In investigating go-gitea#23050 I have realised that, whilst the Warn there is somewhat of a miscall, there was a real bug in the way that the LevelUniqueQueue was being restored on start-up of the PersistableChannelUniqueQueue. Secondly, there was a conflict in the setting of the internal leveldb queue name: it wasn't being set, so it was being overridden by other unique queues. This PR fixes these bugs and adds a test case. Thanks to @brechtvl for noticing the second issue. Fix go-gitea#23050 and others --------- Signed-off-by: Andrew Thornton <[email protected]> Co-authored-by: techknowlogick <[email protected]>
1 parent ceedb49 commit 836a3a4

7 files changed

+332
-21
lines changed

Diff for: modules/queue/queue_channel.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,10 @@ func (q *ChannelQueue) Shutdown() {
125125
log.Trace("ChannelQueue: %s Flushing", q.name)
126126
// We can't use Cleanup here because that will close the channel
127127
if err := q.FlushWithContext(q.terminateCtx); err != nil {
128-
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
128+
count := atomic.LoadInt64(&q.numInQueue)
129+
if count > 0 {
130+
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
131+
}
129132
return
130133
}
131134
log.Debug("ChannelQueue: %s Flushed", q.name)

Diff for: modules/queue/queue_disk_channel.go

+22-7
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
9595
},
9696
Workers: 0,
9797
},
98-
DataDir: config.DataDir,
98+
DataDir: config.DataDir,
99+
QueueName: config.Name + "-level",
99100
}
100101

101102
levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
@@ -173,16 +174,18 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
173174
atShutdown(q.Shutdown)
174175
atTerminate(q.Terminate)
175176

176-
if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 {
177+
if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.terminateCtx) != 0 {
177178
// Just run the level queue - we shut it down once it's flushed
178179
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
179180
go func() {
180-
for !q.IsEmpty() {
181-
_ = q.internal.Flush(0)
181+
for !lq.IsEmpty() {
182+
_ = lq.Flush(0)
182183
select {
183184
case <-time.After(100 * time.Millisecond):
184-
case <-q.internal.(*LevelQueue).shutdownCtx.Done():
185-
log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
185+
case <-lq.shutdownCtx.Done():
186+
if lq.byteFIFO.Len(lq.terminateCtx) > 0 {
187+
log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
188+
}
186189
return
187190
}
188191
}
@@ -317,10 +320,22 @@ func (q *PersistableChannelQueue) Shutdown() {
317320
// Redirect all remaining data in the chan to the internal channel
318321
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
319322
close(q.channelQueue.dataChan)
323+
countOK, countLost := 0, 0
320324
for data := range q.channelQueue.dataChan {
321-
_ = q.internal.Push(data)
325+
err := q.internal.Push(data)
326+
if err != nil {
327+
log.Error("PersistableChannelQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
328+
countLost++
329+
} else {
330+
countOK++
331+
}
322332
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
323333
}
334+
if countLost > 0 {
335+
log.Warn("PersistableChannelQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
336+
} else if countOK > 0 {
337+
log.Warn("PersistableChannelQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
338+
}
324339
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
325340

326341
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)

Diff for: modules/queue/queue_disk_channel_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestPersistableChannelQueue(t *testing.T) {
4040
Workers: 1,
4141
BoostWorkers: 0,
4242
MaxWorkers: 10,
43-
Name: "first",
43+
Name: "test-queue",
4444
}, &testData{})
4545
assert.NoError(t, err)
4646

@@ -136,7 +136,7 @@ func TestPersistableChannelQueue(t *testing.T) {
136136
Workers: 1,
137137
BoostWorkers: 0,
138138
MaxWorkers: 10,
139-
Name: "second",
139+
Name: "test-queue",
140140
}, &testData{})
141141
assert.NoError(t, err)
142142

@@ -228,7 +228,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
228228
Workers: 1,
229229
BoostWorkers: 0,
230230
MaxWorkers: 10,
231-
Name: "first",
231+
Name: "test-queue",
232232
}, &testData{})
233233
assert.NoError(t, err)
234234

@@ -434,7 +434,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
434434
Workers: 1,
435435
BoostWorkers: 0,
436436
MaxWorkers: 10,
437-
Name: "second",
437+
Name: "test-queue",
438438
}, &testData{})
439439
assert.NoError(t, err)
440440
pausable, ok = queue.(Pausable)

Diff for: modules/queue/unique_queue_channel.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,9 @@ func (q *ChannelUniqueQueue) Shutdown() {
178178
go func() {
179179
log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
180180
if err := q.FlushWithContext(q.terminateCtx); err != nil {
181-
log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
181+
if !q.IsEmpty() {
182+
log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
183+
}
182184
return
183185
}
184186
log.Debug("ChannelUniqueQueue: %s Flushed", q.name)

Diff for: modules/queue/unique_queue_channel_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@ import (
99
"testing"
1010
"time"
1111

12+
"code.gitea.io/gitea/modules/log"
13+
1214
"github.com/stretchr/testify/assert"
1315
)
1416

1517
func TestChannelUniqueQueue(t *testing.T) {
18+
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
1619
handleChan := make(chan *testData)
1720
handle := func(data ...Data) []Data {
1821
for _, datum := range data {
@@ -53,6 +56,8 @@ func TestChannelUniqueQueue(t *testing.T) {
5356
}
5457

5558
func TestChannelUniqueQueue_Batch(t *testing.T) {
59+
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
60+
5661
handleChan := make(chan *testData)
5762
handle := func(data ...Data) []Data {
5863
for _, datum := range data {
@@ -99,6 +104,8 @@ func TestChannelUniqueQueue_Batch(t *testing.T) {
99104
}
100105

101106
func TestChannelUniqueQueue_Pause(t *testing.T) {
107+
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
108+
102109
lock := sync.Mutex{}
103110
var queue Queue
104111
var err error

Diff for: modules/queue/unique_queue_disk_channel.go

+33-8
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
9595
},
9696
Workers: 0,
9797
},
98-
DataDir: config.DataDir,
98+
DataDir: config.DataDir,
99+
QueueName: config.Name + "-level",
99100
}
100101

101102
queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue)
@@ -210,17 +211,29 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
210211
atTerminate(q.Terminate)
211212
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
212213

213-
if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
214+
if luq, ok := q.internal.(*LevelUniqueQueue); ok && !luq.IsEmpty() {
214215
// Just run the level queue - we shut it down once it's flushed
215-
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
216+
go luq.Run(func(_ func()) {}, func(_ func()) {})
216217
go func() {
217-
_ = q.internal.Flush(0)
218-
log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelUniqueQueue).Name())
219-
q.internal.(*LevelUniqueQueue).Shutdown()
220-
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
218+
_ = luq.Flush(0)
219+
for !luq.IsEmpty() {
220+
_ = luq.Flush(0)
221+
select {
222+
case <-time.After(100 * time.Millisecond):
223+
case <-luq.shutdownCtx.Done():
224+
if luq.byteFIFO.Len(luq.terminateCtx) > 0 {
225+
log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name())
226+
}
227+
return
228+
}
229+
}
230+
log.Debug("LevelUniqueQueue: %s flushed so shutting down", luq.Name())
231+
luq.Shutdown()
232+
GetManager().Remove(luq.qid)
221233
}()
222234
} else {
223235
log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
236+
_ = q.internal.Flush(0)
224237
q.internal.(*LevelUniqueQueue).Shutdown()
225238
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
226239
}
@@ -286,8 +299,20 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
286299
// Redirect all remaining data in the chan to the internal channel
287300
close(q.channelQueue.dataChan)
288301
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
302+
countOK, countLost := 0, 0
289303
for data := range q.channelQueue.dataChan {
290-
_ = q.internal.Push(data)
304+
err := q.internal.(*LevelUniqueQueue).Push(data)
305+
if err != nil {
306+
log.Error("PersistableChannelUniqueQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
307+
countLost++
308+
} else {
309+
countOK++
310+
}
311+
}
312+
if countLost > 0 {
313+
log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
314+
} else if countOK > 0 {
315+
log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
291316
}
292317
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
293318

0 commit comments

Comments
 (0)