Skip to content

🌱 Add priority label to PQ depth metric #3156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions pkg/controller/priorityqueue/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/internal/metrics"
)

// This file is mostly a copy of unexported code from
Expand All @@ -14,8 +15,9 @@ import (
// The only two differences are the addition of mapLock in defaultQueueMetrics and converging retryMetrics into queueMetrics.

type queueMetrics[T comparable] interface {
add(item T)
get(item T)
add(item T, priority int)
get(item T, priority int)
updateDepthWithPriorityMetric(oldPriority, newPriority int)
done(item T)
updateUnfinishedWork()
retry()
Expand All @@ -25,9 +27,9 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl
if len(name) == 0 {
return noMetrics[T]{}
}
return &defaultQueueMetrics[T]{

dqm := &defaultQueueMetrics[T]{
clock: clock,
depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name),
Expand All @@ -37,14 +39,22 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl
processingStartTimes: map[T]time.Time{},
retries: mp.NewRetriesMetric(name),
}

if mpp, ok := mp.(metrics.MetricsProviderWithPriority); ok {
dqm.depthWithPriority = mpp.NewDepthMetricWithPriority(name)
} else {
dqm.depth = mp.NewDepthMetric(name)
}
return dqm
}

// defaultQueueMetrics expects the caller to lock before setting any metrics.
type defaultQueueMetrics[T comparable] struct {
clock clock.Clock

// current depth of a workqueue
depth workqueue.GaugeMetric
depth workqueue.GaugeMetric
depthWithPriority metrics.DepthMetricWithPriority
// total number of adds handled by a workqueue
adds workqueue.CounterMetric
// how long an item stays in a workqueue
Expand All @@ -64,13 +74,17 @@ type defaultQueueMetrics[T comparable] struct {
}

// add is called for ready items only
func (m *defaultQueueMetrics[T]) add(item T) {
func (m *defaultQueueMetrics[T]) add(item T, priority int) {
if m == nil {
return
}

m.adds.Inc()
m.depth.Inc()
if m.depthWithPriority != nil {
m.depthWithPriority.Inc(priority)
} else {
m.depth.Inc()
}

m.mapLock.Lock()
defer m.mapLock.Unlock()
Expand All @@ -80,12 +94,16 @@ func (m *defaultQueueMetrics[T]) add(item T) {
}
}

func (m *defaultQueueMetrics[T]) get(item T) {
func (m *defaultQueueMetrics[T]) get(item T, priority int) {
if m == nil {
return
}

m.depth.Dec()
if m.depthWithPriority != nil {
m.depthWithPriority.Dec(priority)
} else {
m.depth.Dec()
}

m.mapLock.Lock()
defer m.mapLock.Unlock()
Expand All @@ -97,6 +115,13 @@ func (m *defaultQueueMetrics[T]) get(item T) {
}
}

func (m *defaultQueueMetrics[T]) updateDepthWithPriorityMetric(oldPriority, newPriority int) {
if m.depthWithPriority != nil {
m.depthWithPriority.Dec(oldPriority)
m.depthWithPriority.Inc(newPriority)
}
}

func (m *defaultQueueMetrics[T]) done(item T) {
if m == nil {
return
Expand Down Expand Up @@ -139,8 +164,9 @@ func (m *defaultQueueMetrics[T]) retry() {

type noMetrics[T any] struct{}

func (noMetrics[T]) add(item T) {}
func (noMetrics[T]) get(item T) {}
func (noMetrics[T]) done(item T) {}
func (noMetrics[T]) updateUnfinishedWork() {}
func (noMetrics[T]) retry() {}
func (noMetrics[T]) add(item T, priority int) {}
func (noMetrics[T]) get(item T, priority int) {}
func (noMetrics[T]) updateDepthWithPriorityMetric(oldPriority, newPriority int) {}
func (noMetrics[T]) done(item T) {}
func (noMetrics[T]) updateUnfinishedWork() {}
func (noMetrics[T]) retry() {}
23 changes: 15 additions & 8 deletions pkg/controller/priorityqueue/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"sync"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/internal/metrics"
)

func newFakeMetricsProvider() *fakeMetricsProvider {
return &fakeMetricsProvider{
depth: make(map[string]int),
depth: make(map[string]map[int]int),
adds: make(map[string]int),
latency: make(map[string][]float64),
workDuration: make(map[string][]float64),
Expand All @@ -19,8 +20,10 @@ func newFakeMetricsProvider() *fakeMetricsProvider {
}
}

var _ metrics.MetricsProviderWithPriority = &fakeMetricsProvider{}

type fakeMetricsProvider struct {
depth map[string]int
depth map[string]map[int]int
adds map[string]int
latency map[string][]float64
workDuration map[string][]float64
Expand All @@ -31,9 +34,13 @@ type fakeMetricsProvider struct {
}

func (f *fakeMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
panic("Should never be called. Expected NewDepthMetricWithPriority to be called instead")
}

func (f *fakeMetricsProvider) NewDepthMetricWithPriority(name string) metrics.DepthMetricWithPriority {
f.mu.Lock()
defer f.mu.Unlock()
f.depth[name] = 0
f.depth[name] = map[int]int{}
return &fakeGaugeMetric{m: &f.depth, mu: &f.mu, name: name}
}

Expand Down Expand Up @@ -80,21 +87,21 @@ func (f *fakeMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMet
}

type fakeGaugeMetric struct {
m *map[string]int
m *map[string]map[int]int
mu *sync.Mutex
name string
}

func (fg *fakeGaugeMetric) Inc() {
func (fg *fakeGaugeMetric) Inc(priority int) {
fg.mu.Lock()
defer fg.mu.Unlock()
(*fg.m)[fg.name]++
(*fg.m)[fg.name][priority]++
}

func (fg *fakeGaugeMetric) Dec() {
func (fg *fakeGaugeMetric) Dec(priority int) {
fg.mu.Lock()
defer fg.mu.Unlock()
(*fg.m)[fg.name]--
(*fg.m)[fg.name][priority]--
}

type fakeCounterMetric struct {
Expand Down
12 changes: 8 additions & 4 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
w.items[key] = item
w.queue.ReplaceOrInsert(item)
if item.ReadyAt == nil {
w.metrics.add(key)
w.metrics.add(key, item.Priority)
}
w.addedCounter++
continue
Expand All @@ -166,12 +166,16 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
// will affect the order - Just delete and re-add.
item, _ := w.queue.Delete(w.items[key])
if o.Priority > item.Priority {
// Update depth metric only if the item in the queue was already added to the depth metric.
if item.ReadyAt == nil || w.becameReady.Has(key) {
w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority)
}
item.Priority = o.Priority
}

if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
if readyAt == nil && !w.becameReady.Has(key) {
w.metrics.add(key)
w.metrics.add(key, item.Priority)
}
item.ReadyAt = readyAt
}
Expand Down Expand Up @@ -223,7 +227,7 @@ func (w *priorityqueue[T]) spin() {
return false
}
if !w.becameReady.Has(item.Key) {
w.metrics.add(item.Key)
w.metrics.add(item.Key, item.Priority)
w.becameReady.Insert(item.Key)
}
}
Expand All @@ -239,7 +243,7 @@ func (w *priorityqueue[T]) spin() {
return true
}

w.metrics.get(item.Key)
w.metrics.get(item.Key, item.Priority)
w.locked.Insert(item.Key)
w.waiters.Add(-1)
delete(w.items, item.Key)
Expand Down
Loading
Loading