From 6f43d11b2fac87c5dd5771dd114570968ebd1b37 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 16 Nov 2024 00:36:26 -0500 Subject: [PATCH 01/19] :sparkles: POC of a priority queue This change contains the POC of a priority workqueue that allows prioritizing events over one another. It is opt-in and will by default de-prioritize events originating from the initial listwatch and from periodic resyncs. --- .golangci.yml | 8 + examples/priorityqueue/main.go | 73 ++++ pkg/builder/controller.go | 8 +- pkg/config/controller.go | 4 + pkg/controller/controller.go | 15 +- pkg/controllerworkqueue/metrics.go | 140 +++++++ pkg/controllerworkqueue/metrics_test.go | 134 +++++++ pkg/controllerworkqueue/workqueue.go | 358 ++++++++++++++++++ .../workqueue_suite_test.go | 13 + pkg/controllerworkqueue/workqueue_test.go | 263 +++++++++++++ pkg/handler/eventhandler.go | 45 +++ pkg/internal/metrics/workqueue.go | 131 +++++++ pkg/metrics/workqueue.go | 101 ----- 13 files changed, 1184 insertions(+), 109 deletions(-) create mode 100644 examples/priorityqueue/main.go create mode 100644 pkg/controllerworkqueue/metrics.go create mode 100644 pkg/controllerworkqueue/metrics_test.go create mode 100644 pkg/controllerworkqueue/workqueue.go create mode 100644 pkg/controllerworkqueue/workqueue_suite_test.go create mode 100644 pkg/controllerworkqueue/workqueue_test.go create mode 100644 pkg/internal/metrics/workqueue.go diff --git a/.golangci.yml b/.golangci.yml index e147e82d69..2b8ad145d1 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -165,6 +165,14 @@ issues: - linters: - dupl path: _test\.go + - linters: + - revive + path: .*/internal/.* + - linters: + - unused + # Seems to incorrectly trigger on the two implementations that are only + # used through an interface and not directly? 
+ path: pkg/controllerworkqueue/metrics\.go run: go: "1.23" diff --git a/examples/priorityqueue/main.go b/examples/priorityqueue/main.go new file mode 100644 index 0000000000..d6b25f6419 --- /dev/null +++ b/examples/priorityqueue/main.go @@ -0,0 +1,73 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "os" + "time" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/builder" + kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/config" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +func init() { +} + +func main() { + if err := run(); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} + +func run() error { + log.SetLogger(zap.New()) + + // Setup a Manager + mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{ + Controller: config.Controller{UsePriorityQueue: true}, + }) + if err != nil { + return fmt.Errorf("failed to set up controller-manager: %w", err) + } + + if err := builder.ControllerManagedBy(mgr). + For(&corev1.ConfigMap{}). 
+ Complete(reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) { + log.FromContext(ctx).Info("Reconciling") + time.Sleep(10 * time.Second) + + return reconcile.Result{}, nil + })); err != nil { + return fmt.Errorf("failed to set up controller: %w", err) + } + + if err := mgr.Start(signals.SetupSignalHandler()); err != nil { + return fmt.Errorf("failed to start manager: %w", err) + } + + return nil +} diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 6d906f6e52..0760953e02 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -163,7 +163,7 @@ func (blder *TypedBuilder[request]) Watches( ) *TypedBuilder[request] { input := WatchesInput[request]{ obj: object, - handler: eventHandler, + handler: handler.WithLowPriorityWhenUnchanged(eventHandler), } for _, opt := range opts { opt.ApplyToWatches(&input) @@ -317,7 +317,7 @@ func (blder *TypedBuilder[request]) doWatch() error { } var hdler handler.TypedEventHandler[client.Object, request] - reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{})) + reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}))) allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, blder.forInput.predicates...) src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...) @@ -341,11 +341,11 @@ func (blder *TypedBuilder[request]) doWatch() error { } var hdler handler.TypedEventHandler[client.Object, request] - reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner( + reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueRequestForOwner( blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(), blder.forInput.object, opts..., - ))) + )))) allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) 
allPredicates = append(allPredicates, own.predicates...) src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...) diff --git a/pkg/config/controller.go b/pkg/config/controller.go index 999ef07e21..6ae5c385e3 100644 --- a/pkg/config/controller.go +++ b/pkg/config/controller.go @@ -53,4 +53,8 @@ type Controller struct { // NeedLeaderElection indicates whether the controller needs to use leader election. // Defaults to true, which means the controller will use leader election. NeedLeaderElection *bool + + // UsePriorityQueue is experimental and configures if controllers that do not have a + // NewQueue() configured should default to the priority queue. + UsePriorityQueue bool } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index f2496236db..05b50ee8dc 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -25,6 +25,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/controllerworkqueue" "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -193,10 +194,16 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt } if options.NewQueue == nil { - options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { - return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{ - Name: controllerName, - }) + if mgr.GetControllerOptions().UsePriorityQueue { + options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { + return controllerworkqueue.New[request](controllerName) + } + } else { + options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) 
workqueue.TypedRateLimitingInterface[request] { + return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{ + Name: controllerName, + }) + } } } diff --git a/pkg/controllerworkqueue/metrics.go b/pkg/controllerworkqueue/metrics.go new file mode 100644 index 0000000000..0e773d3c61 --- /dev/null +++ b/pkg/controllerworkqueue/metrics.go @@ -0,0 +1,140 @@ +package controllerworkqueue + +import ( + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" +) + +// This file is mostly a copy of unexported code from +// https://github.com/kubernetes/kubernetes/blob/1d8828ce707ed9dd7a6a9756385419cce1d202ac/staging/src/k8s.io/client-go/util/workqueue/metrics.go + +type queueMetrics[T comparable] interface { + add(item T) + get(item T) + done(item T) + updateUnfinishedWork() +} + +func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, clock clock.Clock) queueMetrics[T] { + if len(name) == 0 { + return noMetrics[T]{} + } + return &defaultQueueMetrics[T]{ + clock: clock, + depth: mp.NewDepthMetric(name), + adds: mp.NewAddsMetric(name), + latency: mp.NewLatencyMetric(name), + workDuration: mp.NewWorkDurationMetric(name), + unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), + longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name), + added: sets.Set[T]{}, + addTimes: map[T]time.Time{}, + processingStartTimes: map[T]time.Time{}, + } +} + +// defaultQueueMetrics expects the caller to lock before setting any metrics. 
+type defaultQueueMetrics[T comparable] struct { + clock clock.Clock + + // current depth of a workqueue + depth workqueue.GaugeMetric + // total number of adds handled by a workqueue + adds workqueue.CounterMetric + // how long an item stays in a workqueue + latency workqueue.HistogramMetric + // how long processing an item from a workqueue takes + workDuration workqueue.HistogramMetric + + mapLock sync.RWMutex + added sets.Set[T] + addTimes map[T]time.Time + processingStartTimes map[T]time.Time + + // how long have current threads been working? + unfinishedWorkSeconds workqueue.SettableGaugeMetric + longestRunningProcessor workqueue.SettableGaugeMetric +} + +func (m *defaultQueueMetrics[T]) add(item T) { + if m == nil { + return + } + + m.adds.Inc() + + m.mapLock.Lock() + defer m.mapLock.Unlock() + if !m.added.Has(item) { + m.added.Insert(item) + m.depth.Inc() + } + if _, exists := m.addTimes[item]; !exists { + m.addTimes[item] = m.clock.Now() + } +} + +func (m *defaultQueueMetrics[T]) get(item T) { + if m == nil { + return + } + + m.mapLock.Lock() + defer m.mapLock.Unlock() + + m.depth.Dec() + m.added.Delete(item) + + m.processingStartTimes[item] = m.clock.Now() + if startTime, exists := m.addTimes[item]; exists { + m.latency.Observe(m.sinceInSeconds(startTime)) + delete(m.addTimes, item) + } +} + +func (m *defaultQueueMetrics[T]) done(item T) { + if m == nil { + return + } + + m.mapLock.Lock() + defer m.mapLock.Unlock() + if startTime, exists := m.processingStartTimes[item]; exists { + m.workDuration.Observe(m.sinceInSeconds(startTime)) + delete(m.processingStartTimes, item) + } +} + +func (m *defaultQueueMetrics[T]) updateUnfinishedWork() { + m.mapLock.RLock() + defer m.mapLock.RUnlock() + // Note that a summary metric would be better for this, but prometheus + // doesn't seem to have non-hacky ways to reset the summary metrics. 
+ var total float64 + var oldest float64 + for _, t := range m.processingStartTimes { + age := m.sinceInSeconds(t) + total += age + if age > oldest { + oldest = age + } + } + m.unfinishedWorkSeconds.Set(total) + m.longestRunningProcessor.Set(oldest) +} + +// Gets the time since the specified start in seconds. +func (m *defaultQueueMetrics[T]) sinceInSeconds(start time.Time) float64 { + return m.clock.Since(start).Seconds() +} + +type noMetrics[T any] struct{} + +func (noMetrics[T]) add(item T) {} +func (noMetrics[T]) get(item T) {} +func (noMetrics[T]) done(item T) {} +func (noMetrics[T]) updateUnfinishedWork() {} diff --git a/pkg/controllerworkqueue/metrics_test.go b/pkg/controllerworkqueue/metrics_test.go new file mode 100644 index 0000000000..cde9155346 --- /dev/null +++ b/pkg/controllerworkqueue/metrics_test.go @@ -0,0 +1,134 @@ +package controllerworkqueue + +import ( + "sync" + + "k8s.io/client-go/util/workqueue" +) + +func newFakeMetricsProvider() *fakeMetricsProvider { + return &fakeMetricsProvider{ + depth: make(map[string]int), + adds: make(map[string]int), + latency: make(map[string][]float64), + workDuration: make(map[string][]float64), + unfinishedWorkSeconds: make(map[string]float64), + longestRunningProcessor: make(map[string]float64), + retries: make(map[string]int), + mu: sync.Mutex{}, + } +} + +type fakeMetricsProvider struct { + depth map[string]int + adds map[string]int + latency map[string][]float64 + workDuration map[string][]float64 + unfinishedWorkSeconds map[string]float64 + longestRunningProcessor map[string]float64 + retries map[string]int + mu sync.Mutex +} + +func (f *fakeMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { + f.mu.Lock() + defer f.mu.Unlock() + f.depth[name] = 0 + return &fakeGaugeMetric{m: &f.depth, mu: &f.mu, name: name} +} + +func (f *fakeMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { + f.mu.Lock() + defer f.mu.Unlock() + f.adds[name] = 0 + return &fakeCounterMetric{m: &f.adds, 
mu: &f.mu, name: name} +} + +func (f *fakeMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric { + f.mu.Lock() + defer f.mu.Unlock() + f.latency[name] = []float64{} + return &fakeHistogramMetric{m: &f.latency, mu: &f.mu, name: name} +} + +func (f *fakeMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric { + f.mu.Lock() + defer f.mu.Unlock() + f.workDuration[name] = []float64{} + return &fakeHistogramMetric{m: &f.workDuration, mu: &f.mu, name: name} +} + +func (f *fakeMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { + f.mu.Lock() + defer f.mu.Unlock() + f.unfinishedWorkSeconds[name] = 0 + return &fakeSettableGaugeMetric{m: &f.unfinishedWorkSeconds, mu: &f.mu, name: name} +} + +func (f *fakeMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric { + f.mu.Lock() + defer f.mu.Unlock() + f.longestRunningProcessor[name] = 0 + return &fakeSettableGaugeMetric{m: &f.longestRunningProcessor, mu: &f.mu, name: name} +} + +func (f *fakeMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { + f.mu.Lock() + defer f.mu.Unlock() + f.retries[name] = 0 + return &fakeCounterMetric{m: &f.retries, mu: &f.mu, name: name} +} + +type fakeGaugeMetric struct { + m *map[string]int + mu *sync.Mutex + name string +} + +func (fg *fakeGaugeMetric) Inc() { + fg.mu.Lock() + defer fg.mu.Unlock() + (*fg.m)[fg.name]++ +} + +func (fg *fakeGaugeMetric) Dec() { + fg.mu.Lock() + defer fg.mu.Unlock() + (*fg.m)[fg.name]-- +} + +type fakeCounterMetric struct { + m *map[string]int + mu *sync.Mutex + name string +} + +func (fc *fakeCounterMetric) Inc() { + fc.mu.Lock() + defer fc.mu.Unlock() + (*fc.m)[fc.name]++ +} + +type fakeHistogramMetric struct { + m *map[string][]float64 + mu *sync.Mutex + name string +} + +func (fh *fakeHistogramMetric) Observe(v float64) { + fh.mu.Lock() + defer fh.mu.Unlock() + (*fh.m)[fh.name] = append((*fh.m)[fh.name], v) +} + +type 
fakeSettableGaugeMetric struct { + m *map[string]float64 + mu *sync.Mutex + name string +} + +func (fs *fakeSettableGaugeMetric) Set(v float64) { + fs.mu.Lock() + defer fs.mu.Unlock() + (*fs.m)[fs.name] = v +} diff --git a/pkg/controllerworkqueue/workqueue.go b/pkg/controllerworkqueue/workqueue.go new file mode 100644 index 0000000000..c7399378ee --- /dev/null +++ b/pkg/controllerworkqueue/workqueue.go @@ -0,0 +1,358 @@ +package controllerworkqueue + +import ( + "sort" + "sync" + "sync/atomic" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/internal/metrics" +) + +// AddOpts describes the options for adding items to the queue. +type AddOpts struct { + After time.Duration + RateLimited bool + Priority int +} + +// PriorityQueue is a priority queue for a controller. It +// internally de-duplicates all items that are added to +// it. It will use the max of the passed priorities and the +// min of possible durations. +type PriorityQueue[T comparable] interface { + workqueue.TypedRateLimitingInterface[T] + AddWithOpts(o AddOpts, Items ...T) + GetWithPriority() (item T, priority int, shutdown bool) +} + +// Opts contains the options for a PriorityQueue. +type Opts[T comparable] struct { + // Ratelimiter is being used when AddRateLimited is called. Defaults to a per-item exponential backoff + // limiter with an initial delay of five milliseconds and a max delay of 1000 seconds. + RateLimiter workqueue.TypedRateLimiter[T] + MetricProvider workqueue.MetricsProvider +} + +// Opt allows to configure a PriorityQueue. +type Opt[T comparable] func(*Opts[T]) + +// New constructs a new PriorityQueue. 
+func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { + opts := &Opts[T]{} + for _, f := range o { + f(opts) + } + + if opts.RateLimiter == nil { + opts.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[T](5*time.Millisecond, 1000*time.Second) + } + + if opts.MetricProvider == nil { + opts.MetricProvider = metrics.WorkqueueMetricsProvider{} + } + + cwq := &controllerworkqueue[T]{ + items: map[T]*item[T]{}, + queue: queue[T]{}, + tryPush: make(chan struct{}, 1), + rateLimiter: opts.RateLimiter, + locked: sets.Set[T]{}, + done: make(chan struct{}), + get: make(chan item[T]), + now: time.Now, + tick: time.Tick, + } + + go cwq.spin() + + return wrapWithMetrics(cwq, name, opts.MetricProvider) +} + +type controllerworkqueue[T comparable] struct { + // lock has to be acquired for any access to either items or queue + lock sync.Mutex + items map[T]*item[T] + queue queue[T] + + tryPush chan struct{} + + rateLimiter workqueue.TypedRateLimiter[T] + + // locked contains the keys we handed out through Get() and that haven't + // yet been returned through Done(). + locked sets.Set[T] + lockedLock sync.RWMutex + + shutdown atomic.Bool + done chan struct{} + + get chan item[T] + + // waiters is the number of routines blocked in Get, we use it to determine + // if we can push items. 
+ waiters atomic.Int64 + + // Configurable for testing + now func() time.Time + tick func(time.Duration) <-chan time.Time +} + +func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) { + w.lock.Lock() + defer w.lock.Unlock() + + var hadChanges bool + for _, key := range items { + if o.RateLimited { + after := w.rateLimiter.When(key) + if o.After == 0 || after < o.After { + o.After = after + } + } + + var readyAt *time.Time + if o.After != 0 { + readyAt = ptr.To(w.now().Add(o.After)) + } + if _, ok := w.items[key]; !ok { + item := &item[T]{ + key: key, + priority: o.Priority, + readyAt: readyAt, + } + w.items[key] = item + w.queue = append(w.queue, item) + hadChanges = true + continue + } + + if o.Priority > w.items[key].priority { + w.items[key].priority = o.Priority + hadChanges = true + } + + if w.items[key].readyAt != nil && (readyAt == nil || readyAt.Before(*w.items[key].readyAt)) { + w.items[key].readyAt = readyAt + hadChanges = true + } + } + + if hadChanges { + sort.Stable(w.queue) + w.doTryPush() + } +} + +func (w *controllerworkqueue[T]) doTryPush() { + select { + case w.tryPush <- struct{}{}: + default: + } +} + +func (w *controllerworkqueue[T]) spin() { + blockForever := make(chan time.Time) + var nextReady <-chan time.Time + nextReady = blockForever + for { + select { + case <-w.done: + return + case <-w.tryPush: + case <-nextReady: + } + + nextReady = blockForever + + func() { + w.lock.Lock() + defer w.lock.Unlock() + + w.lockedLock.Lock() + defer w.lockedLock.Unlock() + + // toRemove is a list of indexes to remove from the queue. + // We can not do it in-place as we would be manipulating the + // slice we are iterating over. We have to do it backwards, as + // otherwise the indexes become invalid. 
+ var toRemove []int + defer func() { + for i := len(toRemove) - 1; i >= 0; i-- { + idxToRemove := toRemove[i] + if idxToRemove == len(w.queue)-1 { + w.queue = w.queue[:idxToRemove] + } else { + w.queue = append(w.queue[:idxToRemove], w.queue[idxToRemove+1:]...) + } + } + }() + for idx, item := range w.queue { + if w.waiters.Load() == 0 { // no waiters, return as we can not hand anything out anyways + return + } + // No next element we can process + if w.queue[0].readyAt != nil && w.queue[0].readyAt.After(w.now()) { + nextReady = w.tick(w.queue[0].readyAt.Sub(w.now())) + return + } + + // Item is locked, we can not hand it out + if w.locked.Has(item.key) { + continue + } + + w.get <- *item + w.locked.Insert(item.key) + delete(w.items, item.key) + w.waiters.Add(-1) + toRemove = append(toRemove, idx) + } + }() + } +} + +func (w *controllerworkqueue[T]) Add(item T) { + w.AddWithOpts(AddOpts{}, item) +} + +func (w *controllerworkqueue[T]) AddAfter(item T, after time.Duration) { + w.AddWithOpts(AddOpts{After: after}, item) +} + +func (w *controllerworkqueue[T]) AddRateLimited(item T) { + w.AddWithOpts(AddOpts{RateLimited: true}, item) +} + +func (w *controllerworkqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) { + w.waiters.Add(1) + + w.doTryPush() + item := <-w.get + + return item.key, item.priority, w.shutdown.Load() +} + +func (w *controllerworkqueue[T]) Get() (item T, shutdown bool) { + key, _, shutdown := w.GetWithPriority() + return key, shutdown +} + +func (w *controllerworkqueue[T]) Forget(item T) { + w.rateLimiter.Forget(item) +} + +func (w *controllerworkqueue[T]) NumRequeues(item T) int { + return w.rateLimiter.NumRequeues(item) +} + +func (w *controllerworkqueue[T]) ShuttingDown() bool { + return w.shutdown.Load() +} + +func (w *controllerworkqueue[T]) Done(item T) { + w.lockedLock.Lock() + defer w.lockedLock.Unlock() + w.locked.Delete(item) + w.doTryPush() +} + +func (w *controllerworkqueue[T]) ShutDown() { + w.shutdown.Store(true) + 
close(w.done) +} + +func (w *controllerworkqueue[T]) ShutDownWithDrain() { + w.ShutDown() +} + +func (w *controllerworkqueue[T]) Len() int { + w.lock.Lock() + defer w.lock.Unlock() + + return len(w.queue) +} + +// queue is the actual queue. It implements heap.Interface. +type queue[T comparable] []*item[T] + +func (q queue[T]) Len() int { + return len(q) +} + +func (q queue[T]) Less(i, j int) bool { + switch { + case q[i].readyAt == nil && q[j].readyAt != nil: + return true + case q[i].readyAt != nil && q[j].readyAt == nil: + return false + case q[i].readyAt != nil && q[j].readyAt != nil: + return q[i].readyAt.Before(*q[j].readyAt) + } + + return q[i].priority > q[j].priority +} + +func (q queue[T]) Swap(i, j int) { + q[i], q[j] = q[j], q[i] +} + +type item[T comparable] struct { + key T + priority int + readyAt *time.Time +} + +func wrapWithMetrics[T comparable](q *controllerworkqueue[T], name string, provider workqueue.MetricsProvider) PriorityQueue[T] { + mwq := &metricWrappedQueue[T]{ + controllerworkqueue: q, + metrics: newQueueMetrics[T](provider, name, clock.RealClock{}), + } + + go mwq.updateUnfinishedWorkLoop() + + return mwq +} + +type metricWrappedQueue[T comparable] struct { + *controllerworkqueue[T] + metrics queueMetrics[T] +} + +func (m *metricWrappedQueue[T]) AddWithOpts(o AddOpts, items ...T) { + for _, item := range items { + m.metrics.add(item) + } + m.controllerworkqueue.AddWithOpts(o, items...) 
+} + +func (m *metricWrappedQueue[T]) GetWithPriority() (T, int, bool) { + item, priority, shutdown := m.controllerworkqueue.GetWithPriority() + m.metrics.get(item) + return item, priority, shutdown +} + +func (m *metricWrappedQueue[T]) Get() (T, bool) { + item, _, shutdown := m.GetWithPriority() + return item, shutdown +} + +func (m *metricWrappedQueue[T]) Done(item T) { + m.metrics.done(item) + m.controllerworkqueue.Done(item) +} + +func (m *metricWrappedQueue[T]) updateUnfinishedWorkLoop() { + t := time.NewTicker(time.Millisecond) + defer t.Stop() + for range t.C { + if m.controllerworkqueue.ShuttingDown() { + return + } + m.metrics.updateUnfinishedWork() + } +} diff --git a/pkg/controllerworkqueue/workqueue_suite_test.go b/pkg/controllerworkqueue/workqueue_suite_test.go new file mode 100644 index 0000000000..3c6ba4fed1 --- /dev/null +++ b/pkg/controllerworkqueue/workqueue_suite_test.go @@ -0,0 +1,13 @@ +package controllerworkqueue + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestControllerWorkqueue(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "ControllerWorkqueue Suite") +} diff --git a/pkg/controllerworkqueue/workqueue_test.go b/pkg/controllerworkqueue/workqueue_test.go new file mode 100644 index 0000000000..a3763739cf --- /dev/null +++ b/pkg/controllerworkqueue/workqueue_test.go @@ -0,0 +1,263 @@ +package controllerworkqueue + +import ( + "sync" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +var _ = Describe("Controllerworkqueue", func() { + It("returns an item", func() { + q, metrics := newQueue() + defer q.ShutDown() + q.AddWithOpts(AddOpts{}, "foo") + + item, _, _ := q.GetWithPriority() + Expect(item).To(Equal("foo")) + + Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.adds["test"]).To(Equal(1)) + }) + + It("returns items in order", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{}, "foo") + q.AddWithOpts(AddOpts{}, "bar") + + item, _, _ := q.GetWithPriority() + Expect(item).To(Equal("foo")) + item, _, _ = q.GetWithPriority() + Expect(item).To(Equal("bar")) + + Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.adds["test"]).To(Equal(2)) + }) + + It("doesn't return an item that is currently locked", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{}, "foo") + + item, _, _ := q.GetWithPriority() + Expect(item).To(Equal("foo")) + + q.AddWithOpts(AddOpts{}, "foo") + q.AddWithOpts(AddOpts{}, "bar") + item, _, _ = q.GetWithPriority() + Expect(item).To(Equal("bar")) + + Expect(metrics.depth["test"]).To(Equal(1)) + Expect(metrics.adds["test"]).To(Equal(3)) + }) + + It("returns an item as soon as its unlocked", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{}, "foo") + + item, _, _ := q.GetWithPriority() + Expect(item).To(Equal("foo")) + + q.AddWithOpts(AddOpts{}, "foo") + q.AddWithOpts(AddOpts{}, "bar") + item, _, _ = q.GetWithPriority() + Expect(item).To(Equal("bar")) + + q.AddWithOpts(AddOpts{}, "baz") + q.Done("foo") + item, _, _ = q.GetWithPriority() + Expect(item).To(Equal("foo")) + + Expect(metrics.depth["test"]).To(Equal(1)) + Expect(metrics.adds["test"]).To(Equal(4)) + }) + + It("de-duplicates items", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{}, "foo") + q.AddWithOpts(AddOpts{}, "foo") + + Consistently(q.Len).Should(Equal(1)) + + cwq := 
q.(*metricWrappedQueue[string]) + cwq.lockedLock.Lock() + Expect(cwq.locked.Len()).To(Equal(0)) + + Expect(metrics.depth["test"]).To(Equal(1)) + Expect(metrics.adds["test"]).To(Equal(2)) + }) + + It("retains the highest priority", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{Priority: 1}, "foo") + q.AddWithOpts(AddOpts{Priority: 2}, "foo") + + item, priority, _ := q.GetWithPriority() + Expect(item).To(Equal("foo")) + Expect(priority).To(Equal(2)) + + Expect(q.Len()).To(Equal(0)) + + Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.adds["test"]).To(Equal(2)) + }) + + It("gets pushed to the front if the priority increases", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{}, "foo") + q.AddWithOpts(AddOpts{}, "bar") + q.AddWithOpts(AddOpts{}, "baz") + q.AddWithOpts(AddOpts{Priority: 1}, "baz") + + item, priority, _ := q.GetWithPriority() + Expect(item).To(Equal("baz")) + Expect(priority).To(Equal(1)) + + Expect(q.Len()).To(Equal(2)) + + Expect(metrics.depth["test"]).To(Equal(2)) + Expect(metrics.adds["test"]).To(Equal(4)) + }) + + It("retains the lowest after duration", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{After: 0}, "foo") + q.AddWithOpts(AddOpts{After: time.Hour}, "foo") + + item, priority, _ := q.GetWithPriority() + Expect(item).To(Equal("foo")) + Expect(priority).To(Equal(0)) + + Expect(q.Len()).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.adds["test"]).To(Equal(2)) + }) + + It("returns an item only after after has passed", func() { + q, metrics := newQueue() + defer q.ShutDown() + + now := time.Now().Round(time.Second) + nowLock := sync.Mutex{} + tick := make(chan time.Time) + + cwq := q.(*metricWrappedQueue[string]) + cwq.now = func() time.Time { + nowLock.Lock() + defer nowLock.Unlock() + return now + } + cwq.tick = func(d time.Duration) <-chan time.Time { + Expect(d).To(Equal(time.Second)) + return tick 
+ } + + retrievedItem := make(chan struct{}) + + go func() { + defer GinkgoRecover() + q.GetWithPriority() + close(retrievedItem) + }() + + q.AddWithOpts(AddOpts{After: time.Second}, "foo") + + Consistently(retrievedItem).ShouldNot(BeClosed()) + + nowLock.Lock() + now = now.Add(time.Second) + nowLock.Unlock() + tick <- now + Eventually(retrievedItem).Should(BeClosed()) + + Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.adds["test"]).To(Equal(1)) + }) + + It("returns multiple items with after in correct order", func() { + q, metrics := newQueue() + defer q.ShutDown() + + now := time.Now().Round(time.Second) + nowLock := sync.Mutex{} + tick := make(chan time.Time) + + cwq := q.(*metricWrappedQueue[string]) + cwq.now = func() time.Time { + nowLock.Lock() + defer nowLock.Unlock() + return now + } + cwq.tick = func(d time.Duration) <-chan time.Time { + Expect(d).To(Equal(200 * time.Millisecond)) + return tick + } + + retrievedItem := make(chan struct{}) + retrievedSecondItem := make(chan struct{}) + + go func() { + defer GinkgoRecover() + first, _, _ := q.GetWithPriority() + Expect(first).To(Equal("bar")) + close(retrievedItem) + + second, _, _ := q.GetWithPriority() + Expect(second).To(Equal("foo")) + close(retrievedSecondItem) + }() + + q.AddWithOpts(AddOpts{After: time.Second}, "foo") + q.AddWithOpts(AddOpts{After: 200 * time.Millisecond}, "bar") + + Consistently(retrievedItem).ShouldNot(BeClosed()) + + nowLock.Lock() + now = now.Add(time.Second) + nowLock.Unlock() + tick <- now + Eventually(retrievedItem).Should(BeClosed()) + Eventually(retrievedSecondItem).Should(BeClosed()) + + Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.adds["test"]).To(Equal(2)) + }) +}) + +func BenchmarkAddGetDone(b *testing.B) { + q := New[int]("") + defer q.ShutDown() + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < 1000; i++ { + q.Add(i) + } + for range 1000 { + item, _ := q.Get() + q.Done(item) + } + } +} + +func newQueue() (PriorityQueue[string], 
*fakeMetricsProvider) { + metrics := newFakeMetricsProvider() + q := New("test", func(o *Opts[string]) { + o.MetricProvider = metrics + }) + return q, metrics +} diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index ea4bcee31e..48711fd11f 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -18,9 +18,11 @@ package handler import ( "context" + "time" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controllerworkqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -133,3 +135,46 @@ func (h TypedFuncs[object, request]) Generic(ctx context.Context, e event.TypedG h.GenericFunc(ctx, e, q) } } + +// WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if +// and only if a controllerworkqueue.PriorityQueue is used. If not, it does nothing. +func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] { + return TypedFuncs[object, request]{ + CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) { + // Due to how the handlers are factored, we have to wrap the workqueue to be able + // to inject custom behavior. 
+ u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{ + TypedRateLimitingInterface: trli, + addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { + priorityQueue, isPriorityQueue := q.(controllerworkqueue.PriorityQueue[request]) + if !isPriorityQueue { + q.Add(item) + return + } + if isObjectUnchanged(tce) { + priorityQueue.AddWithOpts(controllerworkqueue.AddOpts{Priority: -1}, item) + } + }, + }) + }, + UpdateFunc: u.Update, + DeleteFunc: u.Delete, + GenericFunc: u.Generic, + } +} + +type workqueueWithCustomAddFunc[request comparable] struct { + workqueue.TypedRateLimitingInterface[request] + addFunc func(item request, q workqueue.TypedRateLimitingInterface[request]) +} + +func (w workqueueWithCustomAddFunc[request]) Add(item request) { + w.addFunc(item, w.TypedRateLimitingInterface) +} + +// isObjectUnchanged checks if the object in a create event is unchanged, for example because +// we got it in our initial listwatch or because of a resync. The heuristic it uses is to check +// if the object is older than one minute. +func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool { + return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute)) +} diff --git a/pkg/internal/metrics/workqueue.go b/pkg/internal/metrics/workqueue.go new file mode 100644 index 0000000000..86da340af8 --- /dev/null +++ b/pkg/internal/metrics/workqueue.go @@ -0,0 +1,131 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +// This file is copied and adapted from k8s.io/component-base/metrics/prometheus/workqueue +// which registers metrics to the k8s legacy Registry. We require very +// similar functionality, but must register metrics to a different Registry. + +// Metrics subsystem and all keys used by the workqueue. +const ( + WorkQueueSubsystem = metrics.WorkQueueSubsystem + DepthKey = metrics.DepthKey + AddsKey = metrics.AddsKey + QueueLatencyKey = metrics.QueueLatencyKey + WorkDurationKey = metrics.WorkDurationKey + UnfinishedWorkKey = metrics.UnfinishedWorkKey + LongestRunningProcessorKey = metrics.LongestRunningProcessorKey + RetriesKey = metrics.RetriesKey +) + +var ( + depth = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: WorkQueueSubsystem, + Name: DepthKey, + Help: "Current depth of workqueue", + }, []string{"name", "controller"}) + + adds = prometheus.NewCounterVec(prometheus.CounterOpts{ + Subsystem: WorkQueueSubsystem, + Name: AddsKey, + Help: "Total number of adds handled by workqueue", + }, []string{"name", "controller"}) + + latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: WorkQueueSubsystem, + Name: QueueLatencyKey, + Help: "How long in seconds an item stays in workqueue before being requested", + Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), + }, []string{"name", "controller"}) + + workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: WorkQueueSubsystem, + Name: WorkDurationKey, + Help: "How long in seconds processing an item from workqueue takes.", + Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), + }, []string{"name", "controller"}) + + unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: 
WorkQueueSubsystem, + Name: UnfinishedWorkKey, + Help: "How many seconds of work has been done that " + + "is in progress and hasn't been observed by work_duration. Large " + + "values indicate stuck threads. One can deduce the number of stuck " + + "threads by observing the rate at which this increases.", + }, []string{"name", "controller"}) + + longestRunningProcessor = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: WorkQueueSubsystem, + Name: LongestRunningProcessorKey, + Help: "How many seconds has the longest running " + + "processor for workqueue been running.", + }, []string{"name", "controller"}) + + retries = prometheus.NewCounterVec(prometheus.CounterOpts{ + Subsystem: WorkQueueSubsystem, + Name: RetriesKey, + Help: "Total number of retries handled by workqueue", + }, []string{"name", "controller"}) +) + +func init() { + metrics.Registry.MustRegister(depth) + metrics.Registry.MustRegister(adds) + metrics.Registry.MustRegister(latency) + metrics.Registry.MustRegister(workDuration) + metrics.Registry.MustRegister(unfinished) + metrics.Registry.MustRegister(longestRunningProcessor) + metrics.Registry.MustRegister(retries) + + workqueue.SetProvider(WorkqueueMetricsProvider{}) +} + +type WorkqueueMetricsProvider struct{} + +func (WorkqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { + return depth.WithLabelValues(name, name) +} + +func (WorkqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { + return adds.WithLabelValues(name, name) +} + +func (WorkqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric { + return latency.WithLabelValues(name, name) +} + +func (WorkqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric { + return workDuration.WithLabelValues(name, name) +} + +func (WorkqueueMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { + return unfinished.WithLabelValues(name, name) +} + +func 
(WorkqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric { + return longestRunningProcessor.WithLabelValues(name, name) +} + +func (WorkqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { + return retries.WithLabelValues(name, name) +} diff --git a/pkg/metrics/workqueue.go b/pkg/metrics/workqueue.go index 590653e70f..cd7ccc773e 100644 --- a/pkg/metrics/workqueue.go +++ b/pkg/metrics/workqueue.go @@ -16,15 +16,6 @@ limitations under the License. package metrics -import ( - "github.com/prometheus/client_golang/prometheus" - "k8s.io/client-go/util/workqueue" -) - -// This file is copied and adapted from k8s.io/component-base/metrics/prometheus/workqueue -// which registers metrics to the k8s legacy Registry. We require very -// similar functionality, but must register metrics to a different Registry. - // Metrics subsystem and all keys used by the workqueue. const ( WorkQueueSubsystem = "workqueue" @@ -36,95 +27,3 @@ const ( LongestRunningProcessorKey = "longest_running_processor_seconds" RetriesKey = "retries_total" ) - -var ( - depth = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Subsystem: WorkQueueSubsystem, - Name: DepthKey, - Help: "Current depth of workqueue", - }, []string{"name", "controller"}) - - adds = prometheus.NewCounterVec(prometheus.CounterOpts{ - Subsystem: WorkQueueSubsystem, - Name: AddsKey, - Help: "Total number of adds handled by workqueue", - }, []string{"name", "controller"}) - - latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Subsystem: WorkQueueSubsystem, - Name: QueueLatencyKey, - Help: "How long in seconds an item stays in workqueue before being requested", - Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), - }, []string{"name", "controller"}) - - workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Subsystem: WorkQueueSubsystem, - Name: WorkDurationKey, - Help: "How long in seconds processing an item from workqueue 
takes.", - Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), - }, []string{"name", "controller"}) - - unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Subsystem: WorkQueueSubsystem, - Name: UnfinishedWorkKey, - Help: "How many seconds of work has been done that " + - "is in progress and hasn't been observed by work_duration. Large " + - "values indicate stuck threads. One can deduce the number of stuck " + - "threads by observing the rate at which this increases.", - }, []string{"name", "controller"}) - - longestRunningProcessor = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Subsystem: WorkQueueSubsystem, - Name: LongestRunningProcessorKey, - Help: "How many seconds has the longest running " + - "processor for workqueue been running.", - }, []string{"name", "controller"}) - - retries = prometheus.NewCounterVec(prometheus.CounterOpts{ - Subsystem: WorkQueueSubsystem, - Name: RetriesKey, - Help: "Total number of retries handled by workqueue", - }, []string{"name", "controller"}) -) - -func init() { - Registry.MustRegister(depth) - Registry.MustRegister(adds) - Registry.MustRegister(latency) - Registry.MustRegister(workDuration) - Registry.MustRegister(unfinished) - Registry.MustRegister(longestRunningProcessor) - Registry.MustRegister(retries) - - workqueue.SetProvider(workqueueMetricsProvider{}) -} - -type workqueueMetricsProvider struct{} - -func (workqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { - return depth.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { - return adds.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric { - return latency.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric { - return workDuration.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) 
NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { - return unfinished.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric { - return longestRunningProcessor.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { - return retries.WithLabelValues(name, name) -} From 105de4f669b74b63c331ad02cfb002851e05850d Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Mon, 18 Nov 2024 20:31:21 -0500 Subject: [PATCH 02/19] Use a btree, it is faster MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` $ benchstat slice.txt btree.txt goos: darwin goarch: arm64 pkg: sigs.k8s.io/controller-runtime/pkg/controllerworkqueue cpu: Apple M2 Pro │ slice.txt │ btree.txt │ │ sec/op │ sec/op vs base │ AddGetDone-10 5.078m ± 0% 1.163m ± 0% -77.09% (p=0.000 n=10) │ slice.txt │ btree.txt │ │ B/op │ B/op vs base │ AddGetDone-10 55.11Ki ± 0% 46.98Ki ± 0% -14.75% (p=0.000 n=10) │ slice.txt │ btree.txt │ │ allocs/op │ allocs/op vs base │ AddGetDone-10 3.000k ± 0% 1.000k ± 0% -66.67% (p=0.000 n=10) ``` --- .gomodcheck.yaml | 3 + examples/scratch-env/go.mod | 1 + examples/scratch-env/go.sum | 2 + go.mod | 2 + go.sum | 2 + pkg/controllerworkqueue/workqueue.go | 118 ++++++++++++--------------- 6 files changed, 64 insertions(+), 64 deletions(-) diff --git a/.gomodcheck.yaml b/.gomodcheck.yaml index 75c5261fde..3608de331d 100644 --- a/.gomodcheck.yaml +++ b/.gomodcheck.yaml @@ -12,3 +12,6 @@ excludedModules: # --- test dependencies: - github.com/onsi/ginkgo/v2 - github.com/onsi/gomega + + # --- We want a newer version with generics support for this + - github.com/google/btree diff --git a/examples/scratch-env/go.mod b/examples/scratch-env/go.mod index 118cb055aa..f8600130e6 100644 --- a/examples/scratch-env/go.mod +++ b/examples/scratch-env/go.mod @@ -22,6 +22,7 @@ require ( 
github.com/go-openapi/swag v0.23.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/btree v1.1.3 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect diff --git a/examples/scratch-env/go.sum b/examples/scratch-env/go.sum index 2350452cd3..42d40227c3 100644 --- a/examples/scratch-env/go.sum +++ b/examples/scratch-env/go.sum @@ -33,6 +33,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= diff --git a/go.mod b/go.mod index 2006ab27e6..267f21ea70 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,8 @@ require ( sigs.k8s.io/yaml v1.4.0 ) +require github.com/google/btree v1.1.3 + require ( cel.dev/expr v0.18.0 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect diff --git a/go.sum b/go.sum index 81caa20e5d..685a9ea774 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod 
h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g= github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= diff --git a/pkg/controllerworkqueue/workqueue.go b/pkg/controllerworkqueue/workqueue.go index c7399378ee..615c72578f 100644 --- a/pkg/controllerworkqueue/workqueue.go +++ b/pkg/controllerworkqueue/workqueue.go @@ -1,11 +1,11 @@ package controllerworkqueue import ( - "sort" "sync" "sync/atomic" "time" + "github.com/google/btree" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/utils/clock" @@ -58,7 +58,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { cwq := &controllerworkqueue[T]{ items: map[T]*item[T]{}, - queue: queue[T]{}, + queue: btree.NewG(32, less[T]), tryPush: make(chan struct{}, 1), rateLimiter: opts.RateLimiter, locked: sets.Set[T]{}, @@ -77,12 +77,16 @@ type controllerworkqueue[T comparable] struct { // lock has to be acquired for any access to either items or queue lock sync.Mutex items map[T]*item[T] - queue queue[T] + queue *btree.BTreeG[*item[T]] tryPush chan struct{} rateLimiter workqueue.TypedRateLimiter[T] + // addedCounter is a counter of elements added, we need it + // because unixNano is not guaranteed to be unique. + addedCounter uint64 + // locked contains the keys we handed out through Get() and that haven't // yet been returned through Done(). 
locked sets.Set[T] @@ -106,7 +110,6 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) { w.lock.Lock() defer w.lock.Unlock() - var hadChanges bool for _, key := range items { if o.RateLimited { after := w.rateLimiter.When(key) @@ -121,30 +124,30 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) { } if _, ok := w.items[key]; !ok { item := &item[T]{ - key: key, - priority: o.Priority, - readyAt: readyAt, + key: key, + addedAtUnixNano: w.now().UnixNano(), + addedCounter: w.addedCounter, + priority: o.Priority, + readyAt: readyAt, } w.items[key] = item - w.queue = append(w.queue, item) - hadChanges = true + w.queue.ReplaceOrInsert(item) + w.addedCounter++ continue } - if o.Priority > w.items[key].priority { - w.items[key].priority = o.Priority - hadChanges = true + // The b-tree de-duplicates based on ordering and any change here + // will affect the order - Just delete and re-add. + item, _ := w.queue.Delete(w.items[key]) + if o.Priority > item.priority { + item.priority = o.Priority } - if w.items[key].readyAt != nil && (readyAt == nil || readyAt.Before(*w.items[key].readyAt)) { - w.items[key].readyAt = readyAt - hadChanges = true + if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) { + item.readyAt = readyAt } - } - if hadChanges { - sort.Stable(w.queue) - w.doTryPush() + w.queue.ReplaceOrInsert(item) } } @@ -176,42 +179,30 @@ func (w *controllerworkqueue[T]) spin() { w.lockedLock.Lock() defer w.lockedLock.Unlock() - // toRemove is a list of indexes to remove from the queue. - // We can not do it in-place as we would be manipulating the - // slice we are iterating over. We have to do it backwards, as - // otherwise the indexes become invalid. - var toRemove []int - defer func() { - for i := len(toRemove) - 1; i >= 0; i-- { - idxToRemove := toRemove[i] - if idxToRemove == len(w.queue)-1 { - w.queue = w.queue[:idxToRemove] - } else { - w.queue = append(w.queue[:idxToRemove], w.queue[idxToRemove+1:]...) 
- } - } - }() - for idx, item := range w.queue { + w.queue.Ascend(func(item *item[T]) bool { if w.waiters.Load() == 0 { // no waiters, return as we can not hand anything out anyways - return + return false } + // No next element we can process - if w.queue[0].readyAt != nil && w.queue[0].readyAt.After(w.now()) { - nextReady = w.tick(w.queue[0].readyAt.Sub(w.now())) - return + if item.readyAt != nil && item.readyAt.After(w.now()) { + nextReady = w.tick(item.readyAt.Sub(w.now())) + return false } // Item is locked, we can not hand it out if w.locked.Has(item.key) { - continue + return true } w.get <- *item w.locked.Insert(item.key) - delete(w.items, item.key) w.waiters.Add(-1) - toRemove = append(toRemove, idx) - } + delete(w.items, item.key) + w.queue.Delete(item) + + return true + }) }() } } @@ -274,37 +265,36 @@ func (w *controllerworkqueue[T]) Len() int { w.lock.Lock() defer w.lock.Unlock() - return len(w.queue) + return w.queue.Len() } -// queue is the actual queue. It implements heap.Interface. 
-type queue[T comparable] []*item[T] - -func (q queue[T]) Len() int { - return len(q) -} - -func (q queue[T]) Less(i, j int) bool { - switch { - case q[i].readyAt == nil && q[j].readyAt != nil: +func less[T comparable](a, b *item[T]) bool { + if a.readyAt == nil && b.readyAt != nil { return true - case q[i].readyAt != nil && q[j].readyAt == nil: + } + if a.readyAt != nil && b.readyAt == nil { return false - case q[i].readyAt != nil && q[j].readyAt != nil: - return q[i].readyAt.Before(*q[j].readyAt) + } + if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) { + return a.readyAt.Before(*b.readyAt) + } + if a.priority != b.priority { + return a.priority > b.priority } - return q[i].priority > q[j].priority -} + if a.addedAtUnixNano != b.addedAtUnixNano { + return a.addedAtUnixNano < b.addedAtUnixNano + } -func (q queue[T]) Swap(i, j int) { - q[i], q[j] = q[j], q[i] + return a.addedCounter < b.addedCounter } type item[T comparable] struct { - key T - priority int - readyAt *time.Time + key T + addedAtUnixNano int64 + addedCounter uint64 + priority int + readyAt *time.Time } func wrapWithMetrics[T comparable](q *controllerworkqueue[T], name string, provider workqueue.MetricsProvider) PriorityQueue[T] { From 0a24be1a00234bf9a7ae0967a149c4f34c57245a Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Mon, 18 Nov 2024 23:26:30 -0500 Subject: [PATCH 03/19] Add fuzztest and fix bug it found --- pkg/controllerworkqueue/workqueue.go | 43 ++++---- pkg/controllerworkqueue/workqueue_test.go | 118 ++++++++++++++++++++++ 2 files changed, 144 insertions(+), 17 deletions(-) diff --git a/pkg/controllerworkqueue/workqueue.go b/pkg/controllerworkqueue/workqueue.go index 615c72578f..23e489ec95 100644 --- a/pkg/controllerworkqueue/workqueue.go +++ b/pkg/controllerworkqueue/workqueue.go @@ -57,15 +57,19 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { } cwq := &controllerworkqueue[T]{ - items: map[T]*item[T]{}, - queue: btree.NewG(32, less[T]), - 
tryPush: make(chan struct{}, 1), - rateLimiter: opts.RateLimiter, - locked: sets.Set[T]{}, - done: make(chan struct{}), - get: make(chan item[T]), - now: time.Now, - tick: time.Tick, + items: map[T]*item[T]{}, + queue: btree.NewG(32, less[T]), + // itemOrWaiterAdded indicates that an item or + // waiter was added. It must be buffered, because + // if we currently process items we can't tell + // if that included the new item/waiter. + itemOrWaiterAdded: make(chan struct{}, 1), + rateLimiter: opts.RateLimiter, + locked: sets.Set[T]{}, + done: make(chan struct{}), + get: make(chan item[T]), + now: time.Now, + tick: time.Tick, } go cwq.spin() @@ -79,7 +83,7 @@ type controllerworkqueue[T comparable] struct { items map[T]*item[T] queue *btree.BTreeG[*item[T]] - tryPush chan struct{} + itemOrWaiterAdded chan struct{} rateLimiter workqueue.TypedRateLimiter[T] @@ -119,7 +123,7 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) { } var readyAt *time.Time - if o.After != 0 { + if o.After > 0 { readyAt = ptr.To(w.now().Add(o.After)) } if _, ok := w.items[key]; !ok { @@ -151,9 +155,9 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) { } } -func (w *controllerworkqueue[T]) doTryPush() { +func (w *controllerworkqueue[T]) notifyItemOrWaiterAdded() { select { - case w.tryPush <- struct{}{}: + case w.itemOrWaiterAdded <- struct{}{}: default: } } @@ -162,11 +166,12 @@ func (w *controllerworkqueue[T]) spin() { blockForever := make(chan time.Time) var nextReady <-chan time.Time nextReady = blockForever + for { select { case <-w.done: return - case <-w.tryPush: + case <-w.itemOrWaiterAdded: case <-nextReady: } @@ -186,7 +191,11 @@ func (w *controllerworkqueue[T]) spin() { // No next element we can process if item.readyAt != nil && item.readyAt.After(w.now()) { - nextReady = w.tick(item.readyAt.Sub(w.now())) + readyAt := item.readyAt.Sub(w.now()) + if readyAt <= 0 { // Toctou race with the above check + readyAt = 1 + } + nextReady = 
w.tick(readyAt) return false } @@ -222,7 +231,7 @@ func (w *controllerworkqueue[T]) AddRateLimited(item T) { func (w *controllerworkqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) { w.waiters.Add(1) - w.doTryPush() + w.notifyItemOrWaiterAdded() item := <-w.get return item.key, item.priority, w.shutdown.Load() @@ -249,7 +258,7 @@ func (w *controllerworkqueue[T]) Done(item T) { w.lockedLock.Lock() defer w.lockedLock.Unlock() w.locked.Delete(item) - w.doTryPush() + w.notifyItemOrWaiterAdded() } func (w *controllerworkqueue[T]) ShutDown() { diff --git a/pkg/controllerworkqueue/workqueue_test.go b/pkg/controllerworkqueue/workqueue_test.go index a3763739cf..1be035f28a 100644 --- a/pkg/controllerworkqueue/workqueue_test.go +++ b/pkg/controllerworkqueue/workqueue_test.go @@ -5,8 +5,10 @@ import ( "testing" "time" + fuzz "github.com/google/gofuzz" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/util/sets" ) var _ = Describe("Controllerworkqueue", func() { @@ -254,6 +256,122 @@ func BenchmarkAddGetDone(b *testing.B) { } } +// TestFuzzPrioriorityQueue validates a set of basic +// invariants that should always be true: +// +// - The queue is threadsafe when multiple producers and consumers +// are involved +// - There are no deadlocks +// - An item is never handed out again before it is returned +// - Items in the queue are de-duplicated +// - max(existing priority, new priority) is used +func TestFuzzPrioriorityQueue(t *testing.T) { + t.Parallel() + + seed := time.Now().UnixNano() + t.Logf("seed: %d", seed) + f := fuzz.NewWithSeed(seed) + fuzzLock := sync.Mutex{} + fuzz := func(in any) { + fuzzLock.Lock() + defer fuzzLock.Unlock() + + f.Fuzz(in) + } + + inQueue := map[string]int{} + inQueueLock := sync.Mutex{} + + handedOut := sets.Set[string]{} + handedOutLock := sync.Mutex{} + + wg := sync.WaitGroup{} + q, _ := newQueue() + + for range 10 { + wg.Add(1) + go func() { + defer wg.Done() + + for range 1000 { + opts, item := 
AddOpts{}, "" + + fuzz(&opts) + fuzz(&item) + + if opts.After > 100*time.Millisecond { + opts.After = 10 * time.Millisecond + } + opts.RateLimited = false + + func() { + inQueueLock.Lock() + defer inQueueLock.Unlock() + + q.AddWithOpts(opts, item) + if existingPriority, exists := inQueue[item]; !exists || existingPriority < opts.Priority { + inQueue[item] = opts.Priority + } + }() + } + }() + } + + for range 100 { + wg.Add(1) + + go func() { + defer wg.Done() + + for { + item, cont := func() (string, bool) { + inQueueLock.Lock() + defer inQueueLock.Unlock() + + if len(inQueue) == 0 { + return "", false + } + + item, priority, _ := q.GetWithPriority() + if expected := inQueue[item]; expected != priority { + t.Errorf("got priority %d, expected %d", priority, expected) + } + delete(inQueue, item) + return item, true + }() + + if !cont { + return + } + + func() { + handedOutLock.Lock() + defer handedOutLock.Unlock() + + if handedOut.Has(item) { + t.Errorf("item %s got handed out more than once", item) + } + handedOut.Insert(item) + }() + + func() { + handedOutLock.Lock() + defer handedOutLock.Unlock() + + handedOut.Delete(item) + q.Done(item) + }() + } + }() + } + + wg.Wait() + + if expected := len(inQueue); expected != q.Len() { + t.Errorf("Expected queue length to be %d, was %d", expected, q.Len()) + } +} + func newQueue() (PriorityQueue[string], *fakeMetricsProvider) { metrics := newFakeMetricsProvider() q := New("test", func(o *Opts[string]) { From 2dcf0c10f86269820f5ceb61ed2ee0c18c29ade9 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Tue, 19 Nov 2024 21:07:04 -0500 Subject: [PATCH 04/19] Fix handler --- pkg/handler/eventhandler.go | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index 48711fd11f..286ca1a9c2 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -136,6 +136,9 @@ func (h TypedFuncs[object, request]) Generic(ctx 
context.Context, e event.TypedG } } +// LowPriority is the priority set by WithLowPriorityWhenUnchanged +const LowPriority = -100 + // WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if // and only if a controllerworkqueue.PriorityQueue is used. If not, it does nothing. func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] { @@ -151,13 +154,31 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty q.Add(item) return } + var priority int if isObjectUnchanged(tce) { - priorityQueue.AddWithOpts(controllerworkqueue.AddOpts{Priority: -1}, item) + priority = LowPriority + } + priorityQueue.AddWithOpts(controllerworkqueue.AddOpts{Priority: priority}, item) + }, + }) + }, + UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) { + u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{ + TypedRateLimitingInterface: trli, + addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { + priorityQueue, isPriorityQueue := q.(controllerworkqueue.PriorityQueue[request]) + if !isPriorityQueue { + q.Add(item) + return + } + var priority int + if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() { + priority = LowPriority } + priorityQueue.AddWithOpts(controllerworkqueue.AddOpts{Priority: priority}, item) }, }) }, - UpdateFunc: u.Update, DeleteFunc: u.Delete, GenericFunc: u.Generic, } From c861416e40f0d81a098ef2ea6d5f679245d1e146 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Tue, 19 Nov 2024 21:18:45 -0500 Subject: [PATCH 05/19] Move into package priorityqueue --- pkg/controller/controller.go | 4 +- .../priorityqueue}/metrics.go | 2 +- .../priorityqueue}/metrics_test.go | 2 +- .../priorityqueue/priorityqueue.go} | 52 +++++++++---------- .../priorityqueue_suite_test.go} | 2 +- 
.../priorityqueue/priorityqueue_test.go} | 2 +- pkg/handler/eventhandler.go | 12 ++--- 7 files changed, 38 insertions(+), 38 deletions(-) rename pkg/{controllerworkqueue => controller/priorityqueue}/metrics.go (99%) rename pkg/{controllerworkqueue => controller/priorityqueue}/metrics_test.go (99%) rename pkg/{controllerworkqueue/workqueue.go => controller/priorityqueue/priorityqueue.go} (83%) rename pkg/{controllerworkqueue/workqueue_suite_test.go => controller/priorityqueue/priorityqueue_suite_test.go} (87%) rename pkg/{controllerworkqueue/workqueue_test.go => controller/priorityqueue/priorityqueue_test.go} (99%) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 05b50ee8dc..56417c0a61 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -25,7 +25,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/controllerworkqueue" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -196,7 +196,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt if options.NewQueue == nil { if mgr.GetControllerOptions().UsePriorityQueue { options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { - return controllerworkqueue.New[request](controllerName) + return priorityqueue.New[request](controllerName) } } else { options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { diff --git a/pkg/controllerworkqueue/metrics.go b/pkg/controller/priorityqueue/metrics.go similarity index 99% rename from pkg/controllerworkqueue/metrics.go rename to pkg/controller/priorityqueue/metrics.go index 0e773d3c61..a4589fab80 100644 --- 
a/pkg/controllerworkqueue/metrics.go +++ b/pkg/controller/priorityqueue/metrics.go @@ -1,4 +1,4 @@ -package controllerworkqueue +package priorityqueue import ( "sync" diff --git a/pkg/controllerworkqueue/metrics_test.go b/pkg/controller/priorityqueue/metrics_test.go similarity index 99% rename from pkg/controllerworkqueue/metrics_test.go rename to pkg/controller/priorityqueue/metrics_test.go index cde9155346..634250b93e 100644 --- a/pkg/controllerworkqueue/metrics_test.go +++ b/pkg/controller/priorityqueue/metrics_test.go @@ -1,4 +1,4 @@ -package controllerworkqueue +package priorityqueue import ( "sync" diff --git a/pkg/controllerworkqueue/workqueue.go b/pkg/controller/priorityqueue/priorityqueue.go similarity index 83% rename from pkg/controllerworkqueue/workqueue.go rename to pkg/controller/priorityqueue/priorityqueue.go index 23e489ec95..61fcccecdc 100644 --- a/pkg/controllerworkqueue/workqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -1,4 +1,4 @@ -package controllerworkqueue +package priorityqueue import ( "sync" @@ -56,7 +56,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { opts.MetricProvider = metrics.WorkqueueMetricsProvider{} } - cwq := &controllerworkqueue[T]{ + cwq := &priorityqueue[T]{ items: map[T]*item[T]{}, queue: btree.NewG(32, less[T]), // itemOrWaiterAdded indicates that an item or @@ -77,7 +77,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { return wrapWithMetrics(cwq, name, opts.MetricProvider) } -type controllerworkqueue[T comparable] struct { +type priorityqueue[T comparable] struct { // lock has to be acquired for any access to either items or queue lock sync.Mutex items map[T]*item[T] @@ -110,7 +110,7 @@ type controllerworkqueue[T comparable] struct { tick func(time.Duration) <-chan time.Time } -func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) { +func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { w.lock.Lock() defer w.lock.Unlock() @@ -155,14 
+155,14 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) { } } -func (w *controllerworkqueue[T]) notifyItemOrWaiterAdded() { +func (w *priorityqueue[T]) notifyItemOrWaiterAdded() { select { case w.itemOrWaiterAdded <- struct{}{}: default: } } -func (w *controllerworkqueue[T]) spin() { +func (w *priorityqueue[T]) spin() { blockForever := make(chan time.Time) var nextReady <-chan time.Time nextReady = blockForever @@ -216,19 +216,19 @@ func (w *controllerworkqueue[T]) spin() { } } -func (w *controllerworkqueue[T]) Add(item T) { +func (w *priorityqueue[T]) Add(item T) { w.AddWithOpts(AddOpts{}, item) } -func (w *controllerworkqueue[T]) AddAfter(item T, after time.Duration) { +func (w *priorityqueue[T]) AddAfter(item T, after time.Duration) { w.AddWithOpts(AddOpts{After: after}, item) } -func (w *controllerworkqueue[T]) AddRateLimited(item T) { +func (w *priorityqueue[T]) AddRateLimited(item T) { w.AddWithOpts(AddOpts{RateLimited: true}, item) } -func (w *controllerworkqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) { +func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) { w.waiters.Add(1) w.notifyItemOrWaiterAdded() @@ -237,40 +237,40 @@ func (w *controllerworkqueue[T]) GetWithPriority() (_ T, priority int, shutdown return item.key, item.priority, w.shutdown.Load() } -func (w *controllerworkqueue[T]) Get() (item T, shutdown bool) { +func (w *priorityqueue[T]) Get() (item T, shutdown bool) { key, _, shutdown := w.GetWithPriority() return key, shutdown } -func (w *controllerworkqueue[T]) Forget(item T) { +func (w *priorityqueue[T]) Forget(item T) { w.rateLimiter.Forget(item) } -func (w *controllerworkqueue[T]) NumRequeues(item T) int { +func (w *priorityqueue[T]) NumRequeues(item T) int { return w.rateLimiter.NumRequeues(item) } -func (w *controllerworkqueue[T]) ShuttingDown() bool { +func (w *priorityqueue[T]) ShuttingDown() bool { return w.shutdown.Load() } -func (w *controllerworkqueue[T]) Done(item T) { 
+func (w *priorityqueue[T]) Done(item T) { w.lockedLock.Lock() defer w.lockedLock.Unlock() w.locked.Delete(item) w.notifyItemOrWaiterAdded() } -func (w *controllerworkqueue[T]) ShutDown() { +func (w *priorityqueue[T]) ShutDown() { w.shutdown.Store(true) close(w.done) } -func (w *controllerworkqueue[T]) ShutDownWithDrain() { +func (w *priorityqueue[T]) ShutDownWithDrain() { w.ShutDown() } -func (w *controllerworkqueue[T]) Len() int { +func (w *priorityqueue[T]) Len() int { w.lock.Lock() defer w.lock.Unlock() @@ -306,10 +306,10 @@ type item[T comparable] struct { readyAt *time.Time } -func wrapWithMetrics[T comparable](q *controllerworkqueue[T], name string, provider workqueue.MetricsProvider) PriorityQueue[T] { +func wrapWithMetrics[T comparable](q *priorityqueue[T], name string, provider workqueue.MetricsProvider) PriorityQueue[T] { mwq := &metricWrappedQueue[T]{ - controllerworkqueue: q, - metrics: newQueueMetrics[T](provider, name, clock.RealClock{}), + priorityqueue: q, + metrics: newQueueMetrics[T](provider, name, clock.RealClock{}), } go mwq.updateUnfinishedWorkLoop() @@ -318,7 +318,7 @@ func wrapWithMetrics[T comparable](q *controllerworkqueue[T], name string, provi } type metricWrappedQueue[T comparable] struct { - *controllerworkqueue[T] + *priorityqueue[T] metrics queueMetrics[T] } @@ -326,11 +326,11 @@ func (m *metricWrappedQueue[T]) AddWithOpts(o AddOpts, items ...T) { for _, item := range items { m.metrics.add(item) } - m.controllerworkqueue.AddWithOpts(o, items...) + m.priorityqueue.AddWithOpts(o, items...) 
} func (m *metricWrappedQueue[T]) GetWithPriority() (T, int, bool) { - item, priority, shutdown := m.controllerworkqueue.GetWithPriority() + item, priority, shutdown := m.priorityqueue.GetWithPriority() m.metrics.get(item) return item, priority, shutdown } @@ -342,14 +342,14 @@ func (m *metricWrappedQueue[T]) Get() (T, bool) { func (m *metricWrappedQueue[T]) Done(item T) { m.metrics.done(item) - m.controllerworkqueue.Done(item) + m.priorityqueue.Done(item) } func (m *metricWrappedQueue[T]) updateUnfinishedWorkLoop() { t := time.NewTicker(time.Millisecond) defer t.Stop() for range t.C { - if m.controllerworkqueue.ShuttingDown() { + if m.priorityqueue.ShuttingDown() { return } m.metrics.updateUnfinishedWork() diff --git a/pkg/controllerworkqueue/workqueue_suite_test.go b/pkg/controller/priorityqueue/priorityqueue_suite_test.go similarity index 87% rename from pkg/controllerworkqueue/workqueue_suite_test.go rename to pkg/controller/priorityqueue/priorityqueue_suite_test.go index 3c6ba4fed1..71bc5ba049 100644 --- a/pkg/controllerworkqueue/workqueue_suite_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_suite_test.go @@ -1,4 +1,4 @@ -package controllerworkqueue +package priorityqueue import ( "testing" diff --git a/pkg/controllerworkqueue/workqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go similarity index 99% rename from pkg/controllerworkqueue/workqueue_test.go rename to pkg/controller/priorityqueue/priorityqueue_test.go index 1be035f28a..b3d7411d68 100644 --- a/pkg/controllerworkqueue/workqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -1,4 +1,4 @@ -package controllerworkqueue +package priorityqueue import ( "sync" diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index 286ca1a9c2..9fd912f882 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -22,7 +22,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" - 
"sigs.k8s.io/controller-runtime/pkg/controllerworkqueue" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -140,7 +140,7 @@ func (h TypedFuncs[object, request]) Generic(ctx context.Context, e event.TypedG const LowPriority = -100 // WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if -// and only if a controllerworkqueue.PriorityQueue is used. If not, it does nothing. +// and only if a priorityqueue.PriorityQueue is used. If not, it does nothing. func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] { return TypedFuncs[object, request]{ CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) { @@ -149,7 +149,7 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{ TypedRateLimitingInterface: trli, addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { - priorityQueue, isPriorityQueue := q.(controllerworkqueue.PriorityQueue[request]) + priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) if !isPriorityQueue { q.Add(item) return @@ -158,7 +158,7 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty if isObjectUnchanged(tce) { priority = LowPriority } - priorityQueue.AddWithOpts(controllerworkqueue.AddOpts{Priority: priority}, item) + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) }, }) }, @@ -166,7 +166,7 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{ TypedRateLimitingInterface: trli, addFunc: func(item request, q 
workqueue.TypedRateLimitingInterface[request]) { - priorityQueue, isPriorityQueue := q.(controllerworkqueue.PriorityQueue[request]) + priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) if !isPriorityQueue { q.Add(item) return @@ -175,7 +175,7 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() { priority = LowPriority } - priorityQueue.AddWithOpts(controllerworkqueue.AddOpts{Priority: priority}, item) + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) }, }) }, From c567047cfbca24b946675d7e8e619f4af44d5b4b Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Tue, 19 Nov 2024 23:19:20 -0500 Subject: [PATCH 06/19] Metric: Adds are only counted if the object didn't exist yet --- .golangci.yml | 2 +- pkg/controller/priorityqueue/metrics.go | 3 + pkg/controller/priorityqueue/priorityqueue.go | 84 ++++++------------- .../priorityqueue/priorityqueue_test.go | 14 ++-- 4 files changed, 38 insertions(+), 65 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 2b8ad145d1..33285d112c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -172,7 +172,7 @@ issues: - unused # Seems to incorrectly trigger on the two implementations that are only # used through an interface and not directly..? 
- path: pkg/controllerworkqueue/metrics\.go + path: pkg/controller/priorityqueue/metrics\.go run: go: "1.23" diff --git a/pkg/controller/priorityqueue/metrics.go b/pkg/controller/priorityqueue/metrics.go index a4589fab80..8038941482 100644 --- a/pkg/controller/priorityqueue/metrics.go +++ b/pkg/controller/priorityqueue/metrics.go @@ -11,6 +11,9 @@ import ( // This file is mostly a copy of unexported code from // https://github.com/kubernetes/kubernetes/blob/1d8828ce707ed9dd7a6a9756385419cce1d202ac/staging/src/k8s.io/client-go/util/workqueue/metrics.go +// +// The only difference is the addition of mapLock in defaultQueueMetrics, we want to avoid the need of synchronizing updateUnfinishedWork() +// with the queue. type queueMetrics[T comparable] interface { add(item T) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 61fcccecdc..bca4067542 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -56,9 +56,10 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { opts.MetricProvider = metrics.WorkqueueMetricsProvider{} } - cwq := &priorityqueue[T]{ - items: map[T]*item[T]{}, - queue: btree.NewG(32, less[T]), + pq := &priorityqueue[T]{ + items: map[T]*item[T]{}, + queue: btree.NewG(32, less[T]), + metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}), // itemOrWaiterAdded indicates that an item or // waiter was added. 
It must be buffered, because // if we currently process items we can't tell @@ -72,25 +73,30 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { tick: time.Tick, } - go cwq.spin() + go pq.spin() + if _, ok := pq.metrics.(noMetrics[T]); !ok { + go pq.updateUnfinishedWorkLoop() + } - return wrapWithMetrics(cwq, name, opts.MetricProvider) + return pq } type priorityqueue[T comparable] struct { - // lock has to be acquired for any access to either items or queue - lock sync.Mutex - items map[T]*item[T] - queue *btree.BTreeG[*item[T]] - - itemOrWaiterAdded chan struct{} - - rateLimiter workqueue.TypedRateLimiter[T] + // lock has to be acquired for any access any of items, queue, addedCounter + // or metrics. + lock sync.Mutex + items map[T]*item[T] + queue *btree.BTreeG[*item[T]] + metrics queueMetrics[T] // addedCounter is a counter of elements added, we need it // because unixNano is not guaranteed to be unique. addedCounter uint64 + itemOrWaiterAdded chan struct{} + + rateLimiter workqueue.TypedRateLimiter[T] + // locked contains the keys we handed out through Get() and that haven't // yet been returned through Done(). 
locked sets.Set[T] @@ -136,6 +142,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { } w.items[key] = item w.queue.ReplaceOrInsert(item) + w.metrics.add(key) w.addedCounter++ continue } @@ -204,11 +211,12 @@ func (w *priorityqueue[T]) spin() { return true } - w.get <- *item + w.metrics.get(item.key) w.locked.Insert(item.key) w.waiters.Add(-1) delete(w.items, item.key) w.queue.Delete(item) + w.get <- *item return true }) @@ -258,6 +266,7 @@ func (w *priorityqueue[T]) Done(item T) { w.lockedLock.Lock() defer w.lockedLock.Unlock() w.locked.Delete(item) + w.metrics.done(item) w.notifyItemOrWaiterAdded() } @@ -306,52 +315,13 @@ type item[T comparable] struct { readyAt *time.Time } -func wrapWithMetrics[T comparable](q *priorityqueue[T], name string, provider workqueue.MetricsProvider) PriorityQueue[T] { - mwq := &metricWrappedQueue[T]{ - priorityqueue: q, - metrics: newQueueMetrics[T](provider, name, clock.RealClock{}), - } - - go mwq.updateUnfinishedWorkLoop() - - return mwq -} - -type metricWrappedQueue[T comparable] struct { - *priorityqueue[T] - metrics queueMetrics[T] -} - -func (m *metricWrappedQueue[T]) AddWithOpts(o AddOpts, items ...T) { - for _, item := range items { - m.metrics.add(item) - } - m.priorityqueue.AddWithOpts(o, items...) 
-} - -func (m *metricWrappedQueue[T]) GetWithPriority() (T, int, bool) { - item, priority, shutdown := m.priorityqueue.GetWithPriority() - m.metrics.get(item) - return item, priority, shutdown -} - -func (m *metricWrappedQueue[T]) Get() (T, bool) { - item, _, shutdown := m.GetWithPriority() - return item, shutdown -} - -func (m *metricWrappedQueue[T]) Done(item T) { - m.metrics.done(item) - m.priorityqueue.Done(item) -} - -func (m *metricWrappedQueue[T]) updateUnfinishedWorkLoop() { - t := time.NewTicker(time.Millisecond) +func (w *priorityqueue[T]) updateUnfinishedWorkLoop() { + t := time.NewTicker(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182 defer t.Stop() for range t.C { - if m.priorityqueue.ShuttingDown() { + if w.shutdown.Load() { return } - m.metrics.updateUnfinishedWork() + w.metrics.updateUnfinishedWork() } } diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index b3d7411d68..584edbc398 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -90,12 +90,12 @@ var _ = Describe("Controllerworkqueue", func() { Consistently(q.Len).Should(Equal(1)) - cwq := q.(*metricWrappedQueue[string]) + cwq := q.(*priorityqueue[string]) cwq.lockedLock.Lock() Expect(cwq.locked.Len()).To(Equal(0)) Expect(metrics.depth["test"]).To(Equal(1)) - Expect(metrics.adds["test"]).To(Equal(2)) + Expect(metrics.adds["test"]).To(Equal(1)) }) It("retains the highest priority", func() { @@ -112,7 +112,7 @@ var _ = Describe("Controllerworkqueue", func() { Expect(q.Len()).To(Equal(0)) Expect(metrics.depth["test"]).To(Equal(0)) - Expect(metrics.adds["test"]).To(Equal(2)) + Expect(metrics.adds["test"]).To(Equal(1)) }) It("gets pushed to the front if the priority increases", func() { @@ -131,7 +131,7 @@ var _ = 
Describe("Controllerworkqueue", func() { Expect(q.Len()).To(Equal(2)) Expect(metrics.depth["test"]).To(Equal(2)) - Expect(metrics.adds["test"]).To(Equal(4)) + Expect(metrics.adds["test"]).To(Equal(3)) }) It("retains the lowest after duration", func() { @@ -147,7 +147,7 @@ var _ = Describe("Controllerworkqueue", func() { Expect(q.Len()).To(Equal(0)) Expect(metrics.depth["test"]).To(Equal(0)) - Expect(metrics.adds["test"]).To(Equal(2)) + Expect(metrics.adds["test"]).To(Equal(1)) }) It("returns an item only after after has passed", func() { @@ -158,7 +158,7 @@ var _ = Describe("Controllerworkqueue", func() { nowLock := sync.Mutex{} tick := make(chan time.Time) - cwq := q.(*metricWrappedQueue[string]) + cwq := q.(*priorityqueue[string]) cwq.now = func() time.Time { nowLock.Lock() defer nowLock.Unlock() @@ -199,7 +199,7 @@ var _ = Describe("Controllerworkqueue", func() { nowLock := sync.Mutex{} tick := make(chan time.Time) - cwq := q.(*metricWrappedQueue[string]) + cwq := q.(*priorityqueue[string]) cwq.now = func() time.Time { nowLock.Lock() defer nowLock.Unlock() From c1bc89e270d8ff0862c8732727291e34048548cb Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Wed, 20 Nov 2024 18:20:22 -0500 Subject: [PATCH 07/19] Validate correct usage of btree and tick --- pkg/controller/priorityqueue/priorityqueue.go | 9 ++++- .../priorityqueue/priorityqueue_test.go | 34 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index bca4067542..89f2f7f646 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -86,7 +86,7 @@ type priorityqueue[T comparable] struct { // or metrics. 
lock sync.Mutex items map[T]*item[T] - queue *btree.BTreeG[*item[T]] + queue bTree[*item[T]] metrics queueMetrics[T] // addedCounter is a counter of elements added, we need it @@ -325,3 +325,10 @@ func (w *priorityqueue[T]) updateUnfinishedWorkLoop() { w.metrics.updateUnfinishedWork() } } + +type bTree[T any] interface { + ReplaceOrInsert(item T) (_ T, _ bool) + Delete(item T) (T, bool) + Ascend(iterator btree.ItemIteratorG[T]) + Len() int +} diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 584edbc398..ecc086686a 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -1,6 +1,7 @@ package priorityqueue import ( + "fmt" "sync" "testing" "time" @@ -377,5 +378,38 @@ func newQueue() (PriorityQueue[string], *fakeMetricsProvider) { q := New("test", func(o *Opts[string]) { o.MetricProvider = metrics }) + q.(*priorityqueue[string]).queue = &btreeInteractionValidator{ + bTree: q.(*priorityqueue[string]).queue, + } + + upstreamTick := q.(*priorityqueue[string]).tick + q.(*priorityqueue[string]).tick = func(d time.Duration) <-chan time.Time { + if d <= 0 { + panic(fmt.Sprintf("got non-positive tick: %v", d)) + } + return upstreamTick(d) + } return q, metrics } + +type btreeInteractionValidator struct { + bTree[*item[string]] +} + +func (b *btreeInteractionValidator) ReplaceOrInsert(item *item[string]) (*item[string], bool) { + // There is no codepath that updates an item + item, alreadyExist := b.bTree.ReplaceOrInsert(item) + if alreadyExist { + panic(fmt.Sprintf("ReplaceOrInsert: item %v already existed", item)) + } + return item, alreadyExist +} + +func (b *btreeInteractionValidator) Delete(item *item[string]) (*item[string], bool) { + // There is node codepath that deletes an item that doesn't exist + old, existed := b.bTree.Delete(item) + if !existed { + panic(fmt.Sprintf("Delete: item %v not found", item)) + } + return old, existed 
+} From 6e5beb55006ef4557b25f18ae48c6fcd52759e90 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Wed, 20 Nov 2024 19:08:48 -0500 Subject: [PATCH 08/19] Add retry metrics --- pkg/controller/priorityqueue/metrics.go | 12 ++++++++++-- pkg/controller/priorityqueue/priorityqueue.go | 1 + pkg/controller/priorityqueue/priorityqueue_test.go | 2 ++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/controller/priorityqueue/metrics.go b/pkg/controller/priorityqueue/metrics.go index 8038941482..bfb31ffc1e 100644 --- a/pkg/controller/priorityqueue/metrics.go +++ b/pkg/controller/priorityqueue/metrics.go @@ -12,14 +12,14 @@ import ( // This file is mostly a copy of unexported code from // https://github.com/kubernetes/kubernetes/blob/1d8828ce707ed9dd7a6a9756385419cce1d202ac/staging/src/k8s.io/client-go/util/workqueue/metrics.go // -// The only difference is the addition of mapLock in defaultQueueMetrics, we want to avoid the need of synchronizing updateUnfinishedWork() -// with the queue. +// The only two differences are the addition of mapLock in defaultQueueMetrics and converging retryMetrics into queueMetrics. type queueMetrics[T comparable] interface { add(item T) get(item T) done(item T) updateUnfinishedWork() + retry() } func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, clock clock.Clock) queueMetrics[T] { @@ -37,6 +37,7 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl added: sets.Set[T]{}, addTimes: map[T]time.Time{}, processingStartTimes: map[T]time.Time{}, + retries: mp.NewRetriesMetric(name), } } @@ -61,6 +62,8 @@ type defaultQueueMetrics[T comparable] struct { // how long have current threads been working? 
unfinishedWorkSeconds workqueue.SettableGaugeMetric longestRunningProcessor workqueue.SettableGaugeMetric + + retries workqueue.CounterMetric } func (m *defaultQueueMetrics[T]) add(item T) { @@ -135,9 +138,14 @@ func (m *defaultQueueMetrics[T]) sinceInSeconds(start time.Time) float64 { return m.clock.Since(start).Seconds() } +func (m *defaultQueueMetrics[T]) retry() { + m.retries.Inc() +} + type noMetrics[T any] struct{} func (noMetrics[T]) add(item T) {} func (noMetrics[T]) get(item T) {} func (noMetrics[T]) done(item T) {} func (noMetrics[T]) updateUnfinishedWork() {} +func (noMetrics[T]) retry() {} diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 89f2f7f646..15b935ede2 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -131,6 +131,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { var readyAt *time.Time if o.After > 0 { readyAt = ptr.To(w.now().Add(o.After)) + w.metrics.retry() } if _, ok := w.items[key]; !ok { item := &item[T]{ diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index ecc086686a..d4f04930b0 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -23,6 +23,7 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.depth["test"]).To(Equal(0)) Expect(metrics.adds["test"]).To(Equal(1)) + Expect(metrics.retries["test"]).To(Equal(0)) }) It("returns items in order", func() { @@ -190,6 +191,7 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.depth["test"]).To(Equal(0)) Expect(metrics.adds["test"]).To(Equal(1)) + Expect(metrics.retries["test"]).To(Equal(1)) }) It("returns multiple items with after in correct order", func() { From 0edbea21fcd0cf6f2787d1b2a50b089be0fb41a0 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 23 Nov 2024 19:38:25 -0500 
Subject: [PATCH 09/19] Fix missing notification when item is added --- pkg/controller/priorityqueue/priorityqueue.go | 4 ++ .../priorityqueue/priorityqueue_test.go | 43 ++++++++++++++++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 15b935ede2..e462a48dc2 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -161,6 +161,10 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { w.queue.ReplaceOrInsert(item) } + + if len(items) > 0 { + w.notifyItemOrWaiterAdded() + } } func (w *priorityqueue[T]) notifyItemOrWaiterAdded() { diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index d4f04930b0..405649f4de 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -194,6 +194,31 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.retries["test"]).To(Equal(1)) }) + It("returns an item to a waiter as soon as it has one", func() { + q, metrics := newQueue() + defer q.ShutDown() + + retrieved := make(chan struct{}) + go func() { + defer GinkgoRecover() + item, _, _ := q.GetWithPriority() + Expect(item).To(Equal("foo")) + close(retrieved) + }() + + // We are waiting for the GetWithPriority() call to be blocked + // on retrieving an item. As golang doesn't provide a way to + // check if something is listening on a channel without + // sending them a message, I can't think of a way to do this + // without sleeping. 
+ time.Sleep(time.Second) + q.AddWithOpts(AddOpts{}, "foo") + Eventually(retrieved).Should(BeClosed()) + + Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.adds["test"]).To(Equal(1)) + }) + It("returns multiple items with after in correct order", func() { q, metrics := newQueue() defer q.ShutDown() @@ -209,7 +234,23 @@ var _ = Describe("Controllerworkqueue", func() { return now } cwq.tick = func(d time.Duration) <-chan time.Time { - Expect(d).To(Equal(200 * time.Millisecond)) + // What a bunch of bs. Deferring in here causes + // ginkgo to deadlock, presumably because it + // never returns after the defer. Not deferring + // hides the actual assertion result and makes + // it complain that there should be a defer. + // Move the assertion into a goroutine just to + // get around that mess. + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(done) + + // This is not deterministic and depends on which of + // Add() or Spin() gets the lock first. + Expect(d).To(Or(Equal(200*time.Millisecond), Equal(time.Second))) + }() + <-done return tick } From 382fb9cf634dbf4f1c105d63aec4e6f3c6714f93 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 23 Nov 2024 19:59:41 -0500 Subject: [PATCH 10/19] Add tests for handler --- pkg/handler/eventhandler_test.go | 137 +++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) diff --git a/pkg/handler/eventhandler_test.go b/pkg/handler/eventhandler_test.go index 38b5040971..5679d9dffe 100644 --- a/pkg/handler/eventhandler_test.go +++ b/pkg/handler/eventhandler_test.go @@ -34,6 +34,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -773,4 +774,140 @@ var _ = 
Describe("Eventhandler", func() { instance.Generic(ctx, evt, q) }) }) + + Describe("WithLowPriorityWhenUnchanged", func() { + It("should lower the priority of a create request for an object that was crated more than one minute in the past", func() { + actualOpts := priorityqueue.AddOpts{} + var actualRequests []reconcile.Request + wq := &fakePriorityQueue{ + addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { + actualOpts = o + actualRequests = items + }, + } + + h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) + h.Create(ctx, event.CreateEvent{ + Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + }}, + }, wq) + + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority})) + Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) + }) + + It("should not lower the priority of a create request for an object that was crated less than one minute in the past", func() { + actualOpts := priorityqueue.AddOpts{} + var actualRequests []reconcile.Request + wq := &fakePriorityQueue{ + addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { + actualOpts = o + actualRequests = items + }, + } + + h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) + h.Create(ctx, event.CreateEvent{ + Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + CreationTimestamp: metav1.Now(), + }}, + }, wq) + + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{})) + Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) + }) + + It("should lower the priority of an update request with unchanged RV", func() { + actualOpts := priorityqueue.AddOpts{} + var actualRequests []reconcile.Request + wq := &fakePriorityQueue{ + addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { + actualOpts = o + actualRequests = items + }, + } + + h := 
handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) + h.Update(ctx, event.UpdateEvent{ + ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + }}, + ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + }}, + }, wq) + + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority})) + Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) + }) + + It("should not lower the priority of an update request with changed RV", func() { + actualOpts := priorityqueue.AddOpts{} + var actualRequests []reconcile.Request + wq := &fakePriorityQueue{ + addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { + actualOpts = o + actualRequests = items + }, + } + + h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) + h.Update(ctx, event.UpdateEvent{ + ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + }}, + ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + ResourceVersion: "1", + }}, + }, wq) + + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{})) + Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) + }) + + It("should have no effect on create if the workqueue is not a priorityqueue", func() { + h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) + h.Create(ctx, event.CreateEvent{ + Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + }}, + }, q) + + Expect(q.Len()).To(Equal(1)) + item, _ := q.Get() + Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}})) + }) + + It("should have no effect on Update if the workqueue is not a priorityqueue", func() { + h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) + h.Update(ctx, event.UpdateEvent{ + ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + 
Name: "my-pod", + }}, + ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + }}, + }, q) + + Expect(q.Len()).To(Equal(1)) + item, _ := q.Get() + Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}})) + }) + }) + }) + +type fakePriorityQueue struct { + workqueue.TypedRateLimitingInterface[reconcile.Request] + addWithOpts func(o priorityqueue.AddOpts, items ...reconcile.Request) +} + +func (f *fakePriorityQueue) AddWithOpts(o priorityqueue.AddOpts, items ...reconcile.Request) { + f.addWithOpts(o, items...) +} +func (f *fakePriorityQueue) GetWithPriority() (item reconcile.Request, priority int, shutdown bool) { + panic("GetWithPriority is not expected to be called") +} From 4ad2ae6edcb6c18ed40bdd037b765250399e3c03 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 23 Nov 2024 20:09:03 -0500 Subject: [PATCH 11/19] Controller tests --- pkg/controller/controller_test.go | 37 +++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index b69840af84..02fbf27dc2 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller" @@ -437,5 +438,41 @@ var _ = Describe("controller.Controller", func() { _, ok := c.(manager.LeaderElectionRunnable) Expect(ok).To(BeTrue()) }) + + It("should configure a priority queue if UsePriorityQueue is set", func() { + m, err := manager.New(cfg, manager.Options{ + Controller: config.Controller{UsePriorityQueue: true}, + }) + Expect(err).NotTo(HaveOccurred()) + + c, err := controller.New("new-controller-16", m, controller.Options{ 
+ Reconciler: rec, + }) + Expect(err).NotTo(HaveOccurred()) + + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + + q := ctrl.NewQueue("foo", nil) + _, ok = q.(priorityqueue.PriorityQueue[reconcile.Request]) + Expect(ok).To(BeTrue()) + }) + + It("should not configure a priority queue if UsePriorityQueue is not set", func() { + m, err := manager.New(cfg, manager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + c, err := controller.New("new-controller-17", m, controller.Options{ + Reconciler: rec, + }) + Expect(err).NotTo(HaveOccurred()) + + ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request]) + Expect(ok).To(BeTrue()) + + q := ctrl.NewQueue("foo", nil) + _, ok = q.(priorityqueue.PriorityQueue[reconcile.Request]) + Expect(ok).To(BeFalse()) + }) }) }) From e9cdce82d73c3a83891a046671f291ac6fbc2fef Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 23 Nov 2024 21:09:37 -0500 Subject: [PATCH 12/19] Add some benchmarks --- .../priorityqueue/priorityqueue_test.go | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 405649f4de..13bd5fc8d3 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -300,6 +300,34 @@ func BenchmarkAddGetDone(b *testing.B) { } } +func BenchmarkAddOnly(b *testing.B) { + q := New[int]("") + defer q.ShutDown() + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < 1000; i++ { + q.Add(i) + } + } +} + +func BenchmarkAddLockContended(b *testing.B) { + q := New[int]("") + defer q.ShutDown() + go func() { + for range 1000 { + item, _ := q.Get() + q.Done(item) + } + }() + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < 1000; i++ { + q.Add(i) + } + } +} + // TestFuzzPrioriorityQueue validates a set of basic // invariants that should always be true: // From 
cb5131fb5524f46a22a92f132db8071bb9925b2c Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 23 Nov 2024 20:39:21 -0500 Subject: [PATCH 13/19] Make Add non-blocking --- pkg/controller/priorityqueue/priorityqueue.go | 99 ++++++++++++------- 1 file changed, 61 insertions(+), 38 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index e462a48dc2..96702e3468 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -60,6 +60,8 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { items: map[T]*item[T]{}, queue: btree.NewG(32, less[T]), metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}), + // Use a buffered channel to try and not block callers + adds: make(chan *itemsWithOpts[T], 1000), // itemOrWaiterAdded indicates that an item or // waiter was added. It must be buffered, because // if we currently process items we can't tell @@ -88,6 +90,7 @@ type priorityqueue[T comparable] struct { items map[T]*item[T] queue bTree[*item[T]] metrics queueMetrics[T] + adds chan *itemsWithOpts[T] // addedCounter is a counter of elements added, we need it // because unixNano is not guaranteed to be unique. 
@@ -117,54 +120,67 @@ type priorityqueue[T comparable] struct { } func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { - w.lock.Lock() - defer w.lock.Unlock() + w.adds <- &itemsWithOpts[T]{opts: o, items: items} +} - for _, key := range items { - if o.RateLimited { - after := w.rateLimiter.When(key) - if o.After == 0 || after < o.After { - o.After = after +func (w *priorityqueue[T]) drainAddsLocked(added *itemsWithOpts[T]) { + for { + for _, key := range added.items { + if added.opts.RateLimited { + after := w.rateLimiter.When(key) + if added.opts.After == 0 || after < added.opts.After { + added.opts.After = after + } } - } - var readyAt *time.Time - if o.After > 0 { - readyAt = ptr.To(w.now().Add(o.After)) - w.metrics.retry() - } - if _, ok := w.items[key]; !ok { - item := &item[T]{ - key: key, - addedAtUnixNano: w.now().UnixNano(), - addedCounter: w.addedCounter, - priority: o.Priority, - readyAt: readyAt, + var readyAt *time.Time + if added.opts.After > 0 { + readyAt = ptr.To(w.now().Add(added.opts.After)) + w.metrics.retry() + } + if _, ok := w.items[key]; !ok { + item := &item[T]{ + key: key, + addedAtUnixNano: w.now().UnixNano(), + addedCounter: w.addedCounter, + priority: added.opts.Priority, + readyAt: readyAt, + } + w.items[key] = item + w.queue.ReplaceOrInsert(item) + w.metrics.add(key) + w.addedCounter++ + continue } - w.items[key] = item - w.queue.ReplaceOrInsert(item) - w.metrics.add(key) - w.addedCounter++ - continue - } - // The b-tree de-duplicates based on ordering and any change here - // will affect the order - Just delete and re-add. - item, _ := w.queue.Delete(w.items[key]) - if o.Priority > item.priority { - item.priority = o.Priority - } + // The b-tree de-duplicates based on ordering and any change here + // will affect the order - Just delete and re-add. 
+ item, _ := w.queue.Delete(w.items[key]) + if added.opts.Priority > item.priority { + item.priority = added.opts.Priority + } + + if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) { + item.readyAt = readyAt + } - if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) { - item.readyAt = readyAt + w.queue.ReplaceOrInsert(item) } - w.queue.ReplaceOrInsert(item) + // Drain the remainder of the channel. Has to be at the end, + // because the first item is read in spin() and passed on + // to us. + select { + case added = <-w.adds: + default: + return + } } +} - if len(items) > 0 { - w.notifyItemOrWaiterAdded() - } +type itemsWithOpts[T comparable] struct { + opts AddOpts + items []T } func (w *priorityqueue[T]) notifyItemOrWaiterAdded() { @@ -179,11 +195,13 @@ func (w *priorityqueue[T]) spin() { var nextReady <-chan time.Time nextReady = blockForever + var addedItem *itemsWithOpts[T] for { select { case <-w.done: return case <-w.itemOrWaiterAdded: + case addedItem = <-w.adds: case <-nextReady: } @@ -193,6 +211,11 @@ func (w *priorityqueue[T]) spin() { w.lock.Lock() defer w.lock.Unlock() + if addedItem != nil { + w.drainAddsLocked(addedItem) + } + addedItem = nil + w.lockedLock.Lock() defer w.lockedLock.Unlock() From 6921fe730c96fa42ee73797449455ebd572a5d2e Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 23 Nov 2024 21:15:03 -0500 Subject: [PATCH 14/19] Revert "Make Add non-blocking" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit ce23de525d361d2edaa4b988a867af0af4aebae9. 
Speedup is tiny and at the expense of increased mem usage (which due to increasing GC pressure is likely the explanation why it's so small), so doesn't seem worth it overall: ``` goos: darwin goarch: arm64 pkg: sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue cpu: Apple M2 Pro │ blocking.txt │ non-blocking.txt │ │ sec/op │ sec/op vs base │ AddGetDone-10 1.320m ± 1% 1.410m ± 0% +6.81% (p=0.000 n=10) AddOnly-10 373.9µ ± 1% 343.2µ ± 1% -8.22% (p=0.000 n=10) AddLockContended-10 375.8µ ± 1% 342.8µ ± 1% -8.78% (p=0.000 n=10) geomean 570.3µ 549.4µ -3.66% │ blocking.txt │ non-blocking.txt │ │ B/op │ B/op vs base │ AddGetDone-10 109.9Ki ± 0% 164.2Ki ± 0% +49.42% (p=0.000 n=10) AddOnly-10 553.0 ± 2% 56045.0 ± 0% +10034.72% (p=0.000 n=10) AddLockContended-10 569.0 ± 6% 56045.0 ± 0% +9749.74% (p=0.000 n=10) geomean 3.207Ki 78.94Ki +2361.60% │ blocking.txt │ non-blocking.txt │ │ allocs/op │ allocs/op vs base │ AddGetDone-10 3.013k ± 0% 5.001k ± 0% +65.98% (p=0.000 n=10) AddOnly-10 16.00 ± 6% 2000.00 ± 0% +12400.00% (p=0.000 n=10) AddLockContended-10 16.00 ± 6% 2000.00 ± 0% +12400.00% (p=0.000 n=10) geomean 91.71 2.715k +2860.01% ``` --- pkg/controller/priorityqueue/priorityqueue.go | 99 +++++++------------ 1 file changed, 38 insertions(+), 61 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 96702e3468..e462a48dc2 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -60,8 +60,6 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { items: map[T]*item[T]{}, queue: btree.NewG(32, less[T]), metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}), - // Use a buffered channel to try and not block callers - adds: make(chan *itemsWithOpts[T], 1000), // itemOrWaiterAdded indicates that an item or // waiter was added.
It must be buffered, because // if we currently process items we can't tell @@ -90,7 +88,6 @@ type priorityqueue[T comparable] struct { items map[T]*item[T] queue bTree[*item[T]] metrics queueMetrics[T] - adds chan *itemsWithOpts[T] // addedCounter is a counter of elements added, we need it // because unixNano is not guaranteed to be unique. @@ -120,67 +117,54 @@ type priorityqueue[T comparable] struct { } func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { - w.adds <- &itemsWithOpts[T]{opts: o, items: items} -} - -func (w *priorityqueue[T]) drainAddsLocked(added *itemsWithOpts[T]) { - for { - for _, key := range added.items { - if added.opts.RateLimited { - after := w.rateLimiter.When(key) - if added.opts.After == 0 || after < added.opts.After { - added.opts.After = after - } - } - - var readyAt *time.Time - if added.opts.After > 0 { - readyAt = ptr.To(w.now().Add(added.opts.After)) - w.metrics.retry() - } - if _, ok := w.items[key]; !ok { - item := &item[T]{ - key: key, - addedAtUnixNano: w.now().UnixNano(), - addedCounter: w.addedCounter, - priority: added.opts.Priority, - readyAt: readyAt, - } - w.items[key] = item - w.queue.ReplaceOrInsert(item) - w.metrics.add(key) - w.addedCounter++ - continue - } + w.lock.Lock() + defer w.lock.Unlock() - // The b-tree de-duplicates based on ordering and any change here - // will affect the order - Just delete and re-add. 
- item, _ := w.queue.Delete(w.items[key]) - if added.opts.Priority > item.priority { - item.priority = added.opts.Priority + for _, key := range items { + if o.RateLimited { + after := w.rateLimiter.When(key) + if o.After == 0 || after < o.After { + o.After = after } + } - if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) { - item.readyAt = readyAt + var readyAt *time.Time + if o.After > 0 { + readyAt = ptr.To(w.now().Add(o.After)) + w.metrics.retry() + } + if _, ok := w.items[key]; !ok { + item := &item[T]{ + key: key, + addedAtUnixNano: w.now().UnixNano(), + addedCounter: w.addedCounter, + priority: o.Priority, + readyAt: readyAt, } - + w.items[key] = item w.queue.ReplaceOrInsert(item) + w.metrics.add(key) + w.addedCounter++ + continue } - // Drain the remainder of the channel. Has to be at the end, - // because the first item is read in spin() and passed on - // to us. - select { - case added = <-w.adds: - default: - return + // The b-tree de-duplicates based on ordering and any change here + // will affect the order - Just delete and re-add. 
+ item, _ := w.queue.Delete(w.items[key]) + if o.Priority > item.priority { + item.priority = o.Priority + } + + if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) { + item.readyAt = readyAt } + + w.queue.ReplaceOrInsert(item) } -} -type itemsWithOpts[T comparable] struct { - opts AddOpts - items []T + if len(items) > 0 { + w.notifyItemOrWaiterAdded() + } } func (w *priorityqueue[T]) notifyItemOrWaiterAdded() { @@ -195,13 +179,11 @@ func (w *priorityqueue[T]) spin() { var nextReady <-chan time.Time nextReady = blockForever - var addedItem *itemsWithOpts[T] for { select { case <-w.done: return case <-w.itemOrWaiterAdded: - case addedItem = <-w.adds: case <-nextReady: } @@ -211,11 +193,6 @@ w.lock.Lock() defer w.lock.Unlock() - if addedItem != nil { - w.drainAddsLocked(addedItem) - } - addedItem = nil - w.lockedLock.Lock() defer w.lockedLock.Unlock() From b4fc147ca60a45926122a3df283b1b269c100cc8 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 23 Nov 2024 21:18:44 -0500 Subject: [PATCH 15/19] Remove unnecessary timestamp --- pkg/controller/priorityqueue/priorityqueue.go | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index e462a48dc2..8f9adf2629 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -135,11 +135,10 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { } if _, ok := w.items[key]; !ok { item := &item[T]{ - key: key, - addedAtUnixNano: w.now().UnixNano(), - addedCounter: w.addedCounter, - priority: o.Priority, - readyAt: readyAt, + key: key, + addedCounter: w.addedCounter, + priority: o.Priority, + readyAt: readyAt, } w.items[key] = item w.queue.ReplaceOrInsert(item) @@ -305,19 +304,14 @@ func less[T comparable](a, b *item[T]) bool { return a.priority > b.priority } - if
a.addedAtUnixNano != b.addedAtUnixNano { - return a.addedAtUnixNano < b.addedAtUnixNano - } - return a.addedCounter < b.addedCounter } type item[T comparable] struct { - key T - addedAtUnixNano int64 - addedCounter uint64 - priority int - readyAt *time.Time + key T + addedCounter uint64 + priority int + readyAt *time.Time } func (w *priorityqueue[T]) updateUnfinishedWorkLoop() { From c0bd1bb4da80706c8c8493ef90e8dac7b6288fee Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Fri, 6 Dec 2024 12:48:31 -0500 Subject: [PATCH 16/19] Consolidate require directive --- go.mod | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 267f21ea70..056ca6aae3 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/evanphx/json-patch/v5 v5.9.0 github.com/go-logr/logr v1.4.2 github.com/go-logr/zapr v1.3.0 + github.com/google/btree v1.1.3 github.com/google/go-cmp v0.6.0 github.com/google/gofuzz v1.2.0 github.com/onsi/ginkgo/v2 v2.21.0 @@ -30,8 +31,6 @@ require ( sigs.k8s.io/yaml v1.4.0 ) -require github.com/google/btree v1.1.3 require ( cel.dev/expr v0.18.0 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect From 00d3aaf7748a6a4d2c198c966bcad5284e818340 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Fri, 6 Dec 2024 12:51:57 -0500 Subject: [PATCH 17/19] Godocs and simplification --- pkg/config/controller.go | 6 ++++-- pkg/controller/controller.go | 12 ++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/config/controller.go b/pkg/config/controller.go index 6ae5c385e3..b702f2838d 100644 --- a/pkg/config/controller.go +++ b/pkg/config/controller.go @@ -54,7 +54,9 @@ type Controller struct { // Defaults to true, which means the controller will use leader election. NeedLeaderElection *bool - // UsePriorityQueue is experimental and configures if controllers that do not have a - // NewQueue() configured should default to the priority queue.
+ // UsePriorityQueue configures the controller's queue to use the controller-runtime provided + // priority queue. + // + // Note: This flag is disabled by default until a future version. It's currently in beta. UsePriorityQueue bool } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 56417c0a61..590bd53891 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -194,12 +194,12 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt } if options.NewQueue == nil { - if mgr.GetControllerOptions().UsePriorityQueue { - options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { - return priorityqueue.New[request](controllerName) - } - } else { - options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { + options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { + if mgr.GetControllerOptions().UsePriorityQueue { + return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) { + o.RateLimiter = rateLimiter + }) + } else { return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{ Name: controllerName, }) From 5905190d706cce4405dc19a01d92572dafb7e2d5 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Fri, 6 Dec 2024 12:55:01 -0500 Subject: [PATCH 18/19] Fix priorityqueue defaulting --- pkg/controller/controller.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 590bd53891..9dd0308375 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -190,7 +190,11 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt } if
options.RateLimiter == nil { - options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]() + if mgr.GetControllerOptions().UsePriorityQueue { + options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second) + } else { + options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]() + } } if options.NewQueue == nil { From 27d63643e172b1e8ab0e85eeec820d7adc6ffded Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Fri, 6 Dec 2024 13:05:23 -0500 Subject: [PATCH 19/19] Avoid unnecessary else when returning --- pkg/controller/controller.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 9dd0308375..47f8aecd1c 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -203,11 +203,10 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) { o.RateLimiter = rateLimiter }) - } else { - return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{ - Name: controllerName, - }) } + return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{ + Name: controllerName, + }) } }