Skip to content

Commit ce23de5

Browse files
committed
Make Add non-blocking
1 parent feb8790 commit ce23de5

File tree

1 file changed

+61
-38
lines changed

1 file changed

+61
-38
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 61 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
6060
items: map[T]*item[T]{},
6161
queue: btree.NewG(32, less[T]),
6262
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
63+
// Use a buffered channel to try and not block callers
64+
adds: make(chan *itemsWithOpts[T], 1000),
6365
// itemOrWaiterAdded indicates that an item or
6466
// waiter was added. It must be buffered, because
6567
// if we currently process items we can't tell
@@ -88,6 +90,7 @@ type priorityqueue[T comparable] struct {
8890
items map[T]*item[T]
8991
queue bTree[*item[T]]
9092
metrics queueMetrics[T]
93+
adds chan *itemsWithOpts[T]
9194

9295
// addedCounter is a counter of elements added, we need it
9396
// because unixNano is not guaranteed to be unique.
@@ -117,54 +120,67 @@ type priorityqueue[T comparable] struct {
117120
}
118121

119122
func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
120-
w.lock.Lock()
121-
defer w.lock.Unlock()
123+
w.adds <- &itemsWithOpts[T]{opts: o, items: items}
124+
}
122125

123-
for _, key := range items {
124-
if o.RateLimited {
125-
after := w.rateLimiter.When(key)
126-
if o.After == 0 || after < o.After {
127-
o.After = after
126+
func (w *priorityqueue[T]) drainAddsLocked(added *itemsWithOpts[T]) {
127+
for {
128+
for _, key := range added.items {
129+
if added.opts.RateLimited {
130+
after := w.rateLimiter.When(key)
131+
if added.opts.After == 0 || after < added.opts.After {
132+
added.opts.After = after
133+
}
128134
}
129-
}
130135

131-
var readyAt *time.Time
132-
if o.After > 0 {
133-
readyAt = ptr.To(w.now().Add(o.After))
134-
w.metrics.retry()
135-
}
136-
if _, ok := w.items[key]; !ok {
137-
item := &item[T]{
138-
key: key,
139-
addedAtUnixNano: w.now().UnixNano(),
140-
addedCounter: w.addedCounter,
141-
priority: o.Priority,
142-
readyAt: readyAt,
136+
var readyAt *time.Time
137+
if added.opts.After > 0 {
138+
readyAt = ptr.To(w.now().Add(added.opts.After))
139+
w.metrics.retry()
140+
}
141+
if _, ok := w.items[key]; !ok {
142+
item := &item[T]{
143+
key: key,
144+
addedAtUnixNano: w.now().UnixNano(),
145+
addedCounter: w.addedCounter,
146+
priority: added.opts.Priority,
147+
readyAt: readyAt,
148+
}
149+
w.items[key] = item
150+
w.queue.ReplaceOrInsert(item)
151+
w.metrics.add(key)
152+
w.addedCounter++
153+
continue
143154
}
144-
w.items[key] = item
145-
w.queue.ReplaceOrInsert(item)
146-
w.metrics.add(key)
147-
w.addedCounter++
148-
continue
149-
}
150155

151-
// The b-tree de-duplicates based on ordering and any change here
152-
// will affect the order - Just delete and re-add.
153-
item, _ := w.queue.Delete(w.items[key])
154-
if o.Priority > item.priority {
155-
item.priority = o.Priority
156-
}
156+
// The b-tree de-duplicates based on ordering and any change here
157+
// will affect the order - Just delete and re-add.
158+
item, _ := w.queue.Delete(w.items[key])
159+
if added.opts.Priority > item.priority {
160+
item.priority = added.opts.Priority
161+
}
162+
163+
if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) {
164+
item.readyAt = readyAt
165+
}
157166

158-
if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) {
159-
item.readyAt = readyAt
167+
w.queue.ReplaceOrInsert(item)
160168
}
161169

162-
w.queue.ReplaceOrInsert(item)
170+
// Drain the remainder of the channel. Has to be at the end,
171+
// because the first item is read in spin() and passed on
172+
// to us.
173+
select {
174+
case added = <-w.adds:
175+
default:
176+
return
177+
}
163178
}
179+
}
164180

165-
if len(items) > 0 {
166-
w.notifyItemOrWaiterAdded()
167-
}
181+
type itemsWithOpts[T comparable] struct {
182+
opts AddOpts
183+
items []T
168184
}
169185

170186
func (w *priorityqueue[T]) notifyItemOrWaiterAdded() {
@@ -179,11 +195,13 @@ func (w *priorityqueue[T]) spin() {
179195
var nextReady <-chan time.Time
180196
nextReady = blockForever
181197

198+
var addedItem *itemsWithOpts[T]
182199
for {
183200
select {
184201
case <-w.done:
185202
return
186203
case <-w.itemOrWaiterAdded:
204+
case addedItem = <-w.adds:
187205
case <-nextReady:
188206
}
189207

@@ -193,6 +211,11 @@ func (w *priorityqueue[T]) spin() {
193211
w.lock.Lock()
194212
defer w.lock.Unlock()
195213

214+
if addedItem != nil {
215+
w.drainAddsLocked(addedItem)
216+
}
217+
addedItem = nil
218+
196219
w.lockedLock.Lock()
197220
defer w.lockedLock.Unlock()
198221

0 commit comments

Comments
 (0)