Skip to content

Commit c7919f5

Browse files
committedMar 20, 2025
Pop from the backoffQ when the activeQ is empty
1 parent 6b8e5a9 commit c7919f5

File tree

11 files changed

+298
-64
lines changed

11 files changed

+298
-64
lines changed
 

‎pkg/scheduler/backend/queue/active_queue.go

+33-4
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,22 @@ func (uaq *unlockedActiveQueue) has(pInfo *framework.QueuedPodInfo) bool {
113113
return uaq.queue.Has(pInfo)
114114
}
115115

116+
// backoffQPopper defines methods that are used to pop from the backoffQ when the activeQ is empty.
117+
type backoffQPopper interface {
118+
// popBackoff pops the pInfo from the podBackoffQ.
119+
popBackoff() (*framework.QueuedPodInfo, error)
120+
// lenBackoff returns length of the podBackoffQ queue.
121+
lenBackoff() int
122+
}
123+
116124
// activeQueue implements activeQueuer. All of the fields have to be protected using the lock.
117125
type activeQueue struct {
118126
// lock synchronizes all operations related to activeQ.
119127
// It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields.
120128
// Caution: DO NOT take "SchedulingQueue.lock" after taking "lock".
121129
// You should always take "SchedulingQueue.lock" first, otherwise the queue could end up in deadlock.
122-
// "lock" should not be taken after taking "nLock".
123-
// Correct locking order is: SchedulingQueue.lock > lock > nominator.nLock.
130+
// "lock" should not be taken after taking "backoffQueue.lock" or "nominator.nLock".
131+
// Correct locking order is: SchedulingQueue.lock > lock > backoffQueue.lock > nominator.nLock.
124132
lock sync.RWMutex
125133

126134
// activeQ is heap structure that scheduler actively looks at to find pods to
@@ -132,6 +140,8 @@ type activeQueue struct {
132140
unlockedQueue *unlockedActiveQueue
133141

134142
// cond is a condition that is notified when the pod is added to activeQ.
143+
// When SchedulerPopFromBackoffQ feature is enabled,
144+
// condition is also notified when the pod is added to backoffQ.
135145
// It is used with lock.
136146
cond sync.Cond
137147

@@ -171,16 +181,21 @@ type activeQueue struct {
171181
isSchedulingQueueHintEnabled bool
172182

173183
metricsRecorder metrics.MetricAsyncRecorder
184+
185+
// backoffQPopper is used to pop from backoffQ when activeQ is empty.
186+
// It is non-nil only when SchedulerPopFromBackoffQ feature is enabled.
187+
backoffQPopper backoffQPopper
174188
}
175189

176-
func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder) *activeQueue {
190+
func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder, backoffQPopper backoffQPopper) *activeQueue {
177191
aq := &activeQueue{
178192
queue: queue,
179193
inFlightPods: make(map[types.UID]*list.Element),
180194
inFlightEvents: list.New(),
181195
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
182196
metricsRecorder: metricRecorder,
183197
unlockedQueue: newUnlockedActiveQueue(queue),
198+
backoffQPopper: backoffQPopper,
184199
}
185200
aq.cond.L = &aq.lock
186201

@@ -238,7 +253,13 @@ func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
238253
}
239254

240255
func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
256+
var pInfo *framework.QueuedPodInfo
241257
for aq.queue.Len() == 0 {
258+
// backoffQPopper is non-nil only if SchedulerPopFromBackoffQ feature is enabled.
259+
// In case of non-empty backoffQ, try popping from there.
260+
if aq.backoffQPopper != nil && aq.backoffQPopper.lenBackoff() != 0 {
261+
break
262+
}
242263
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
243264
// When Close() is called, aq.closed is set and the condition is broadcast,
244265
// which causes this loop to continue and return from the Pop().
@@ -250,7 +271,15 @@ func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo
250271
}
251272
pInfo, err := aq.queue.Pop()
252273
if err != nil {
253-
return nil, err
274+
if aq.backoffQPopper == nil {
275+
return nil, err
276+
}
277+
// Try to pop from backoffQ when activeQ is empty.
278+
pInfo, err = aq.backoffQPopper.popBackoff()
279+
if err != nil {
280+
return nil, err
281+
}
282+
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", framework.PopFromBackoffQ).Inc()
254283
}
255284
pInfo.Attempts++
256285
pInfo.BackoffExpiration = time.Time{}

‎pkg/scheduler/backend/queue/active_queue_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
func TestClose(t *testing.T) {
3131
logger, ctx := ktesting.NewTestContext(t)
3232
rr := metrics.NewMetricsAsyncRecorder(10, time.Second, ctx.Done())
33-
aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](newDefaultQueueSort()), metrics.NewActivePodsRecorder()), true, *rr)
33+
aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](newDefaultQueueSort()), metrics.NewActivePodsRecorder()), true, *rr, nil)
3434

3535
aq.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
3636
unlockedActiveQ.add(&framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: st.MakePod().Namespace("foo").Name("p1").UID("p1").Obj()}}, framework.EventUnscheduledPodAdd.Label())

‎pkg/scheduler/backend/queue/backoff_queue.go

+69-14
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package queue
1818

1919
import (
20+
"sync"
2021
"time"
2122

2223
v1 "k8s.io/api/core/v1"
@@ -35,13 +36,14 @@ import (
3536
const backoffQOrderingWindowDuration = time.Second
3637

3738
// backoffQueuer is a wrapper for backoffQ related operations.
39+
// Its methods that rely on the queues take the lock inside.
3840
type backoffQueuer interface {
3941
// isPodBackingoff returns true if a pod is still waiting for its backoff timer.
4042
// If this returns true, the pod should not be re-tried.
4143
// If the pod backoff time is in the actual ordering window, it should still be backing off.
4244
isPodBackingoff(podInfo *framework.QueuedPodInfo) bool
43-
// popEachBackoffCompleted run fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
44-
popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo))
45+
// popAllBackoffCompleted pops all pods from podBackoffQ and podErrorBackoffQ that completed backoff.
46+
popAllBackoffCompleted(logger klog.Logger) []*framework.QueuedPodInfo
4547

4648
// podInitialBackoffDuration returns initial backoff duration that pod can get.
4749
podInitialBackoffDuration() time.Duration
@@ -61,7 +63,8 @@ type backoffQueuer interface {
6163
// It returns new pod info if updated, nil otherwise.
6264
update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
6365
// delete deletes the pInfo from backoffQueue.
64-
delete(pInfo *framework.QueuedPodInfo)
66+
// It returns true if the pod was deleted.
67+
delete(pInfo *framework.QueuedPodInfo) bool
6568
// get returns the pInfo matching given pInfoLookup, if exists.
6669
get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
6770
// has inform if pInfo exists in the queue.
@@ -75,6 +78,14 @@ type backoffQueuer interface {
7578
// backoffQueue implements backoffQueuer and wraps two queues inside,
7679
// providing seamless access as if it were one queue.
7780
type backoffQueue struct {
81+
// lock synchronizes all operations related to backoffQ.
82+
// It protects both podBackoffQ and podErrorBackoffQ.
83+
// Caution: DO NOT take "SchedulingQueue.lock" or "activeQueue.lock" after taking "lock".
84+
// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first, otherwise the queue could end up in deadlock.
85+
// "lock" should not be taken after taking "nominator.nLock".
86+
// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > lock > nominator.nLock.
87+
lock sync.RWMutex
88+
7889
clock clock.WithTicker
7990

8091
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
@@ -239,7 +250,8 @@ func (bq *backoffQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInf
239250
return duration
240251
}
241252

242-
func (bq *backoffQueue) popEachBackoffCompletedWithQueue(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo), queue *heap.Heap[*framework.QueuedPodInfo]) {
253+
func (bq *backoffQueue) popAllBackoffCompletedWithQueue(logger klog.Logger, queue *heap.Heap[*framework.QueuedPodInfo]) []*framework.QueuedPodInfo {
254+
var poppedPods []*framework.QueuedPodInfo
243255
for {
244256
pInfo, ok := queue.Peek()
245257
if !ok || pInfo == nil {
@@ -254,23 +266,27 @@ func (bq *backoffQueue) popEachBackoffCompletedWithQueue(logger klog.Logger, fn
254266
logger.Error(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
255267
break
256268
}
257-
if fn != nil {
258-
fn(pInfo)
259-
}
269+
poppedPods = append(poppedPods, pInfo)
260270
}
271+
return poppedPods
261272
}
262273

263-
// popEachBackoffCompleted run fn for all pods from podBackoffQ and podErrorBackoffQ that completed backoff while popping them.
264-
func (bq *backoffQueue) popEachBackoffCompleted(logger klog.Logger, fn func(pInfo *framework.QueuedPodInfo)) {
274+
// popAllBackoffCompleted pops all pods from podBackoffQ and podErrorBackoffQ that completed backoff.
275+
func (bq *backoffQueue) popAllBackoffCompleted(logger klog.Logger) []*framework.QueuedPodInfo {
276+
bq.lock.Lock()
277+
defer bq.lock.Unlock()
278+
265279
// Ensure both queues are called
266-
bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podBackoffQ)
267-
bq.popEachBackoffCompletedWithQueue(logger, fn, bq.podErrorBackoffQ)
280+
return append(bq.popAllBackoffCompletedWithQueue(logger, bq.podBackoffQ), bq.popAllBackoffCompletedWithQueue(logger, bq.podErrorBackoffQ)...)
268281
}
269282

270283
// add adds the pInfo to backoffQueue.
271284
// The event should show which event triggered this addition and is used for the metric recording.
272285
// It also ensures that pInfo is not in both queues.
273286
func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) {
287+
bq.lock.Lock()
288+
defer bq.lock.Unlock()
289+
274290
// If the pod has both unschedulable plugins and pending plugins empty,
275291
// it means that it failed because of error and should be moved to podErrorBackoffQ.
276292
if pInfo.UnschedulablePlugins.Len() == 0 && pInfo.PendingPlugins.Len() == 0 {
@@ -297,6 +313,9 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo,
297313
// update updates the pod in backoffQueue if oldPodInfo is already in the queue.
298314
// It returns new pod info if updated, nil otherwise.
299315
func (bq *backoffQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
316+
bq.lock.Lock()
317+
defer bq.lock.Unlock()
318+
300319
// If the pod is in the backoff queue, update it there.
301320
if pInfo, exists := bq.podBackoffQ.Get(oldPodInfo); exists {
302321
_ = pInfo.Update(newPod)
@@ -313,13 +332,32 @@ func (bq *backoffQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodIn
313332
}
314333

315334
// delete deletes the pInfo from backoffQueue.
316-
func (bq *backoffQueue) delete(pInfo *framework.QueuedPodInfo) {
317-
_ = bq.podBackoffQ.Delete(pInfo)
318-
_ = bq.podErrorBackoffQ.Delete(pInfo)
335+
// It returns true if the pod was deleted.
336+
func (bq *backoffQueue) delete(pInfo *framework.QueuedPodInfo) bool {
337+
bq.lock.Lock()
338+
defer bq.lock.Unlock()
339+
340+
if bq.podBackoffQ.Delete(pInfo) == nil {
341+
return true
342+
}
343+
return bq.podErrorBackoffQ.Delete(pInfo) == nil
344+
}
345+
346+
// popBackoff pops the pInfo from the podBackoffQ.
347+
// It returns error if the queue is empty.
348+
// This doesn't pop the pods from the podErrorBackoffQ.
349+
func (bq *backoffQueue) popBackoff() (*framework.QueuedPodInfo, error) {
350+
bq.lock.Lock()
351+
defer bq.lock.Unlock()
352+
353+
return bq.podBackoffQ.Pop()
319354
}
320355

321356
// get returns the pInfo matching given pInfoLookup, if exists.
322357
func (bq *backoffQueue) get(pInfoLookup *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool) {
358+
bq.lock.RLock()
359+
defer bq.lock.RUnlock()
360+
323361
pInfo, exists := bq.podBackoffQ.Get(pInfoLookup)
324362
if exists {
325363
return pInfo, true
@@ -329,11 +367,17 @@ func (bq *backoffQueue) get(pInfoLookup *framework.QueuedPodInfo) (*framework.Qu
329367

330368
// has informs if pInfo exists in the queue.
331369
func (bq *backoffQueue) has(pInfo *framework.QueuedPodInfo) bool {
370+
bq.lock.RLock()
371+
defer bq.lock.RUnlock()
372+
332373
return bq.podBackoffQ.Has(pInfo) || bq.podErrorBackoffQ.Has(pInfo)
333374
}
334375

335376
// list returns all pods that are in the queue.
336377
func (bq *backoffQueue) list() []*v1.Pod {
378+
bq.lock.RLock()
379+
defer bq.lock.RUnlock()
380+
337381
var result []*v1.Pod
338382
for _, pInfo := range bq.podBackoffQ.List() {
339383
result = append(result, pInfo.Pod)
@@ -346,5 +390,16 @@ func (bq *backoffQueue) list() []*v1.Pod {
346390

347391
// len returns length of the queue.
348392
func (bq *backoffQueue) len() int {
393+
bq.lock.RLock()
394+
defer bq.lock.RUnlock()
395+
349396
return bq.podBackoffQ.Len() + bq.podErrorBackoffQ.Len()
350397
}
398+
399+
// lenBackoff returns length of the podBackoffQ.
400+
func (bq *backoffQueue) lenBackoff() int {
401+
bq.lock.RLock()
402+
defer bq.lock.RUnlock()
403+
404+
return bq.podBackoffQ.Len()
405+
}

‎pkg/scheduler/backend/queue/backoff_queue_test.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func TestBackoffQueue_calculateBackoffDuration(t *testing.T) {
7878
}
7979
}
8080

81-
func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
81+
func TestBackoffQueue_popAllBackoffCompleted(t *testing.T) {
8282
fakeClock := testingclock.NewFakeClock(time.Now())
8383
podInfos := map[string]*framework.QueuedPodInfo{
8484
"pod0": {
@@ -156,10 +156,11 @@ func TestBackoffQueue_popEachBackoffCompleted(t *testing.T) {
156156
for _, podName := range tt.podsInBackoff {
157157
bq.add(logger, podInfos[podName], framework.EventUnscheduledPodAdd.Label())
158158
}
159+
gotPodInfos := bq.popAllBackoffCompleted(logger)
159160
var gotPods []string
160-
bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
161+
for _, pInfo := range gotPodInfos {
161162
gotPods = append(gotPods, pInfo.Pod.Name)
162-
})
163+
}
163164
if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
164165
t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff)
165166
}
@@ -248,10 +249,11 @@ func TestBackoffQueueOrdering(t *testing.T) {
248249
for _, podInfo := range podInfos {
249250
bq.add(logger, podInfo, framework.EventUnscheduledPodAdd.Label())
250251
}
252+
gotPodInfos := bq.popAllBackoffCompleted(logger)
251253
var gotPods []string
252-
bq.popEachBackoffCompleted(logger, func(pInfo *framework.QueuedPodInfo) {
254+
for _, pInfo := range gotPodInfos {
253255
gotPods = append(gotPods, pInfo.Pod.Name)
254-
})
256+
}
255257
if diff := cmp.Diff(tt.wantPods, gotPods); diff != "" {
256258
t.Errorf("Unexpected pods moved (-want, +got):\n%s", diff)
257259
}

‎pkg/scheduler/backend/queue/nominator.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ import (
3535
type nominator struct {
3636
// nLock synchronizes all operations related to nominator.
3737
// It should not be used anywhere else.
38-
// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
39-
// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
38+
// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock" or "backoffQueue.lock") after taking "nLock".
39+
// You should always take "SchedulingQueue.lock" and "activeQueue.lock" and "backoffQueue.lock" first,
4040
// otherwise the nominator could end up in deadlock.
41-
// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
41+
// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > backoffQueue.lock > nLock.
4242
nLock sync.RWMutex
4343

4444
// podLister is used to verify if the given pod is alive.

0 commit comments

Comments
 (0)
Please sign in to comment.