Skip to content

Commit 29debb1

Browse files
committed
🌱 Handlers: Use low priority when object is unchanged and priority q
This change makes the `TypedFuncs` and `enqueueRequestsFromMapFunc` set a low priority when the object is unchanged by default, as well as extend the test coverage to validate this behavior for `EnqueueRequestForOwner`.
1 parent 5355658 commit 29debb1

File tree

3 files changed

+332
-27
lines changed

3 files changed

+332
-27
lines changed

pkg/handler/enqueue_mapped.go

+33-7
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ package handler
1818

1919
import (
2020
"context"
21+
"reflect"
2122

2223
"k8s.io/client-go/util/workqueue"
2324
"sigs.k8s.io/controller-runtime/pkg/client"
25+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2426
"sigs.k8s.io/controller-runtime/pkg/event"
2527
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2628
)
@@ -81,7 +83,15 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create(
8183
q workqueue.TypedRateLimitingInterface[request],
8284
) {
8385
reqs := map[request]empty{}
84-
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
86+
87+
var lowPriority bool
88+
if reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) && isPriorityQueue(q) && !isNil(evt.Object) {
89+
clientObjectEvent := event.CreateEvent{Object: any(evt.Object).(client.Object)}
90+
if isObjectUnchanged(clientObjectEvent) {
91+
lowPriority = true
92+
}
93+
}
94+
e.mapAndEnqueue(ctx, q, evt.Object, reqs, lowPriority)
8595
}
8696

8797
// Update implements EventHandler.
@@ -90,9 +100,13 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Update(
90100
evt event.TypedUpdateEvent[object],
91101
q workqueue.TypedRateLimitingInterface[request],
92102
) {
103+
var lowPriority bool
104+
if reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) && isPriorityQueue(q) && !isNil(evt.ObjectOld) && !isNil(evt.ObjectNew) {
105+
lowPriority = any(evt.ObjectOld).(client.Object).GetResourceVersion() == any(evt.ObjectNew).(client.Object).GetResourceVersion()
106+
}
93107
reqs := map[request]empty{}
94-
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
95-
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
108+
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs, lowPriority)
109+
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs, lowPriority)
96110
}
97111

98112
// Delete implements EventHandler.
@@ -102,7 +116,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Delete(
102116
q workqueue.TypedRateLimitingInterface[request],
103117
) {
104118
reqs := map[request]empty{}
105-
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
119+
e.mapAndEnqueue(ctx, q, evt.Object, reqs, false)
106120
}
107121

108122
// Generic implements EventHandler.
@@ -112,14 +126,26 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Generic(
112126
q workqueue.TypedRateLimitingInterface[request],
113127
) {
114128
reqs := map[request]empty{}
115-
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
129+
e.mapAndEnqueue(ctx, q, evt.Object, reqs, false)
116130
}
117131

118-
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(ctx context.Context, q workqueue.TypedRateLimitingInterface[request], o object, reqs map[request]empty) {
132+
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(
133+
ctx context.Context,
134+
q workqueue.TypedRateLimitingInterface[request],
135+
o object,
136+
reqs map[request]empty,
137+
unchanged bool,
138+
) {
119139
for _, req := range e.toRequests(ctx, o) {
120140
_, ok := reqs[req]
121141
if !ok {
122-
q.Add(req)
142+
if unchanged {
143+
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{
144+
Priority: LowPriority,
145+
}, req)
146+
} else {
147+
q.Add(req)
148+
}
123149
reqs[req] = empty{}
124150
}
125151
}

pkg/handler/eventhandler.go

+56-20
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package handler
1818

1919
import (
2020
"context"
21+
"reflect"
2122
"time"
2223

2324
"k8s.io/client-go/util/workqueue"
@@ -108,10 +109,40 @@ type TypedFuncs[object any, request comparable] struct {
108109
GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
109110
}
110111

112+
func isPriorityQueue[request comparable](q workqueue.TypedRateLimitingInterface[request]) bool {
113+
_, ok := q.(priorityqueue.PriorityQueue[request])
114+
return ok
115+
}
116+
111117
// Create implements EventHandler.
112118
func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
113119
if h.CreateFunc != nil {
114-
h.CreateFunc(ctx, e, q)
120+
if !reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) || !isPriorityQueue(q) || isNil(e.Object) {
121+
h.CreateFunc(ctx, e, q)
122+
return
123+
}
124+
wq := workqueueWithCustomAddFunc[request]{
125+
TypedRateLimitingInterface: q,
126+
// We already know that we have a priority queue, that event.Object implements
127+
// client.Object and that its not nil
128+
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
129+
// We construct a new event typed to client.Object because isObjectUnchanged
130+
// is a generic and hence has to know at compile time the type of the event
131+
// it gets. We only figure that out at runtime though, but we know for sure
132+
// that it implements client.Object at this point so we can hardcode the event
133+
// type to that.
134+
evt := event.CreateEvent{Object: any(e.Object).(client.Object)}
135+
var priority int
136+
if isObjectUnchanged(evt) {
137+
priority = LowPriority
138+
}
139+
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
140+
priorityqueue.AddOpts{Priority: priority},
141+
item,
142+
)
143+
},
144+
}
145+
h.CreateFunc(ctx, e, wq)
115146
}
116147
}
117148

@@ -125,7 +156,27 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe
125156
// Update implements EventHandler.
126157
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
127158
if h.UpdateFunc != nil {
128-
h.UpdateFunc(ctx, e, q)
159+
if !reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) || !isPriorityQueue(q) || isNil(e.ObjectOld) || isNil(e.ObjectNew) {
160+
h.UpdateFunc(ctx, e, q)
161+
return
162+
}
163+
164+
wq := workqueueWithCustomAddFunc[request]{
165+
TypedRateLimitingInterface: q,
166+
// We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement
167+
// client.Object and that they are not nil
168+
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
169+
var priority int
170+
if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() {
171+
priority = LowPriority
172+
}
173+
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
174+
priorityqueue.AddOpts{Priority: priority},
175+
item,
176+
)
177+
},
178+
}
179+
h.UpdateFunc(ctx, e, wq)
129180
}
130181
}
131182

@@ -142,25 +193,10 @@ const LowPriority = -100
142193
// WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if
143194
// and only if a priorityqueue.PriorityQueue is used. If not, it does nothing.
144195
func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] {
196+
// TypedFuncs already implements this so just wrap
145197
return TypedFuncs[object, request]{
146-
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
147-
// Due to how the handlers are factored, we have to wrap the workqueue to be able
148-
// to inject custom behavior.
149-
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
150-
TypedRateLimitingInterface: trli,
151-
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
152-
addToQueueCreate(q, tce, item)
153-
},
154-
})
155-
},
156-
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
157-
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
158-
TypedRateLimitingInterface: trli,
159-
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
160-
addToQueueUpdate(q, tue, item)
161-
},
162-
})
163-
},
198+
CreateFunc: u.Create,
199+
UpdateFunc: u.Update,
164200
DeleteFunc: u.Delete,
165201
GenericFunc: u.Generic,
166202
}

0 commit comments

Comments
 (0)