Skip to content

Commit 6921fe7

Browse files
committed
Revert "Make Add non-blocking"
This reverts commit ce23de5. Speedup is tiny and at the expense of increased mem usage (which due to increasing GC pressure is likely the explanation why its so small), so doesn't seem worth it overall: ``` goos: darwin goarch: arm64 pkg: sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue cpu: Apple M2 Pro │ blocking.txt │ non-blocking.txt │ │ sec/op │ sec/op vs base │ AddGetDone-10 1.320m ± 1% 1.410m ± 0% +6.81% (p=0.000 n=10) AddOnly-10 373.9µ ± 1% 343.2µ ± 1% -8.22% (p=0.000 n=10) AddLockContended-10 375.8µ ± 1% 342.8µ ± 1% -8.78% (p=0.000 n=10) geomean 570.3µ 549.4µ -3.66% │ blocking.txt │ non-blocking.txt │ │ B/op │ B/op vs base │ AddGetDone-10 109.9Ki ± 0% 164.2Ki ± 0% +49.42% (p=0.000 n=10) AddOnly-10 553.0 ± 2% 56045.0 ± 0% +10034.72% (p=0.000 n=10) AddLockContended-10 569.0 ± 6% 56045.0 ± 0% +9749.74% (p=0.000 n=10) geomean 3.207Ki 78.94Ki +2361.60% │ blocking.txt │ non-blocking.txt │ │ allocs/op │ allocs/op vs base │ AddGetDone-10 3.013k ± 0% 5.001k ± 0% +65.98% (p=0.000 n=10) AddOnly-10 16.00 ± 6% 2000.00 ± 0% +12400.00% (p=0.000 n=10) AddLockContended-10 16.00 ± 6% 2000.00 ± 0% +12400.00% (p=0.000 n=10) geomean 91.71 2.715k +2860.01% ```
1 parent cb5131f commit 6921fe7

File tree

1 file changed

+38
-61
lines changed

1 file changed

+38
-61
lines changed

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 38 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
6060
items: map[T]*item[T]{},
6161
queue: btree.NewG(32, less[T]),
6262
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
63-
// Use a buffered channel to try and not block callers
64-
adds: make(chan *itemsWithOpts[T], 1000),
6563
// itemOrWaiterAdded indicates that an item or
6664
// waiter was added. It must be buffered, because
6765
// if we currently process items we can't tell
@@ -90,7 +88,6 @@ type priorityqueue[T comparable] struct {
9088
items map[T]*item[T]
9189
queue bTree[*item[T]]
9290
metrics queueMetrics[T]
93-
adds chan *itemsWithOpts[T]
9491

9592
// addedCounter is a counter of elements added, we need it
9693
// because unixNano is not guaranteed to be unique.
@@ -120,67 +117,54 @@ type priorityqueue[T comparable] struct {
120117
}
121118

122119
func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
123-
w.adds <- &itemsWithOpts[T]{opts: o, items: items}
124-
}
125-
126-
func (w *priorityqueue[T]) drainAddsLocked(added *itemsWithOpts[T]) {
127-
for {
128-
for _, key := range added.items {
129-
if added.opts.RateLimited {
130-
after := w.rateLimiter.When(key)
131-
if added.opts.After == 0 || after < added.opts.After {
132-
added.opts.After = after
133-
}
134-
}
135-
136-
var readyAt *time.Time
137-
if added.opts.After > 0 {
138-
readyAt = ptr.To(w.now().Add(added.opts.After))
139-
w.metrics.retry()
140-
}
141-
if _, ok := w.items[key]; !ok {
142-
item := &item[T]{
143-
key: key,
144-
addedAtUnixNano: w.now().UnixNano(),
145-
addedCounter: w.addedCounter,
146-
priority: added.opts.Priority,
147-
readyAt: readyAt,
148-
}
149-
w.items[key] = item
150-
w.queue.ReplaceOrInsert(item)
151-
w.metrics.add(key)
152-
w.addedCounter++
153-
continue
154-
}
120+
w.lock.Lock()
121+
defer w.lock.Unlock()
155122

156-
// The b-tree de-duplicates based on ordering and any change here
157-
// will affect the order - Just delete and re-add.
158-
item, _ := w.queue.Delete(w.items[key])
159-
if added.opts.Priority > item.priority {
160-
item.priority = added.opts.Priority
123+
for _, key := range items {
124+
if o.RateLimited {
125+
after := w.rateLimiter.When(key)
126+
if o.After == 0 || after < o.After {
127+
o.After = after
161128
}
129+
}
162130

163-
if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) {
164-
item.readyAt = readyAt
131+
var readyAt *time.Time
132+
if o.After > 0 {
133+
readyAt = ptr.To(w.now().Add(o.After))
134+
w.metrics.retry()
135+
}
136+
if _, ok := w.items[key]; !ok {
137+
item := &item[T]{
138+
key: key,
139+
addedAtUnixNano: w.now().UnixNano(),
140+
addedCounter: w.addedCounter,
141+
priority: o.Priority,
142+
readyAt: readyAt,
165143
}
166-
144+
w.items[key] = item
167145
w.queue.ReplaceOrInsert(item)
146+
w.metrics.add(key)
147+
w.addedCounter++
148+
continue
168149
}
169150

170-
// Drain the remainder of the channel. Has to be at the end,
171-
// because the first item is read in spin() and passed on
172-
// to us.
173-
select {
174-
case added = <-w.adds:
175-
default:
176-
return
151+
// The b-tree de-duplicates based on ordering and any change here
152+
// will affect the order - Just delete and re-add.
153+
item, _ := w.queue.Delete(w.items[key])
154+
if o.Priority > item.priority {
155+
item.priority = o.Priority
156+
}
157+
158+
if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) {
159+
item.readyAt = readyAt
177160
}
161+
162+
w.queue.ReplaceOrInsert(item)
178163
}
179-
}
180164

181-
type itemsWithOpts[T comparable] struct {
182-
opts AddOpts
183-
items []T
165+
if len(items) > 0 {
166+
w.notifyItemOrWaiterAdded()
167+
}
184168
}
185169

186170
func (w *priorityqueue[T]) notifyItemOrWaiterAdded() {
@@ -195,13 +179,11 @@ func (w *priorityqueue[T]) spin() {
195179
var nextReady <-chan time.Time
196180
nextReady = blockForever
197181

198-
var addedItem *itemsWithOpts[T]
199182
for {
200183
select {
201184
case <-w.done:
202185
return
203186
case <-w.itemOrWaiterAdded:
204-
case addedItem = <-w.adds:
205187
case <-nextReady:
206188
}
207189

@@ -211,11 +193,6 @@ func (w *priorityqueue[T]) spin() {
211193
w.lock.Lock()
212194
defer w.lock.Unlock()
213195

214-
if addedItem != nil {
215-
w.drainAddsLocked(addedItem)
216-
}
217-
addedItem = nil
218-
219196
w.lockedLock.Lock()
220197
defer w.lockedLock.Unlock()
221198

0 commit comments

Comments
 (0)