Skip to content

Commit a7917e0

Browse files
committed
Event, source, handler, predicates: Use generics
This change add generic versions of event, handler and predicates along the existing ones, prefixed with `Typed`. The existing ones are left in place under the same signature. The `source` constructors are changed to be generic.
1 parent 9931919 commit a7917e0

20 files changed

+503
-299
lines changed

Diff for: examples/builtins/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ func main() {
5959
}
6060

6161
// Watch ReplicaSets and enqueue ReplicaSet object key
62-
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.EnqueueRequestForObject{})); err != nil {
62+
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.TypedEnqueueRequestForObject[*appsv1.ReplicaSet]{})); err != nil {
6363
entryLog.Error(err, "unable to watch ReplicaSets")
6464
os.Exit(1)
6565
}
6666

6767
// Watch Pods and enqueue owning ReplicaSet key
6868
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{},
69-
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
69+
handler.TypedEnqueueRequestForOwner[*corev1.Pod](mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
7070
entryLog.Error(err, "unable to watch Pods")
7171
os.Exit(1)
7272
}

Diff for: pkg/builder/controller.go

+23-22
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3131
"sigs.k8s.io/controller-runtime/pkg/controller"
3232
"sigs.k8s.io/controller-runtime/pkg/handler"
33-
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
3433
"sigs.k8s.io/controller-runtime/pkg/manager"
3534
"sigs.k8s.io/controller-runtime/pkg/predicate"
3635
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -56,6 +55,7 @@ const (
5655
type Builder struct {
5756
forInput ForInput
5857
ownsInput []OwnsInput
58+
rawSources []source.Source
5959
watchesInput []WatchesInput
6060
mgr manager.Manager
6161
globalPredicates []predicate.Predicate
@@ -123,7 +123,8 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
123123

124124
// WatchesInput represents the information set by Watches method.
125125
type WatchesInput struct {
126-
src source.Source
126+
obj client.Object
127+
handler handler.EventHandler
127128
predicates []predicate.Predicate
128129
objectProjection objectProjection
129130
}
@@ -134,13 +135,17 @@ type WatchesInput struct {
134135
// This is the equivalent of calling
135136
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
136137
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
137-
input := WatchesInput{}
138+
input := WatchesInput{
139+
obj: object,
140+
handler: eventHandler,
141+
}
138142
for _, opt := range opts {
139143
opt.ApplyToWatches(&input)
140144
}
141-
src := source.Kind(blder.mgr.GetCache(), object, eventHandler, input.predicates...)
142145

143-
return blder.WatchesRawSource(src, opts...)
146+
blder.watchesInput = append(blder.watchesInput, input)
147+
148+
return blder
144149
}
145150

146151
// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
@@ -180,13 +185,9 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
180185
//
181186
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
182187
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
183-
func (blder *Builder) WatchesRawSource(src source.Source, opts ...WatchesOption) *Builder {
184-
input := WatchesInput{src: src}
185-
for _, opt := range opts {
186-
opt.ApplyToWatches(&input)
187-
}
188+
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
189+
blder.rawSources = append(blder.rawSources, src)
188190

189-
blder.watchesInput = append(blder.watchesInput, input)
190191
return blder
191192
}
192193

@@ -312,22 +313,22 @@ func (blder *Builder) doWatch() error {
312313
}
313314

314315
// Do the watch requests
315-
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
316+
if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
316317
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
317318
}
318319
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
319320
for _, w := range blder.watchesInput {
320-
// If the source of this watch is of type Kind, project it.
321-
if srcKind, ok := w.src.(*internalsource.Kind); ok {
322-
allPredicates := append(allPredicates, w.predicates...)
323-
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
324-
if err != nil {
325-
return err
326-
}
327-
srcKind.Type = typeForSrc
328-
srcKind.Predicates = append(srcKind.Predicates, allPredicates...)
321+
projected, err := blder.project(w.obj, w.objectProjection)
322+
if err != nil {
323+
return fmt.Errorf("failed to project for %T: %w", w.obj, err)
329324
}
330-
if err := blder.ctrl.Watch(w.src); err != nil {
325+
allPredicates := append(allPredicates, w.predicates...)
326+
if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil {
327+
return err
328+
}
329+
}
330+
for _, src := range blder.rawSources {
331+
if err := blder.ctrl.Watch(src); err != nil {
331332
return err
332333
}
333334
}

Diff for: pkg/controller/controller_integration_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,12 @@ var _ = Describe("controller", func() {
6666
By("Watching Resources")
6767
err = instance.Watch(
6868
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{},
69-
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
69+
handler.TypedEnqueueRequestForOwner[*appsv1.ReplicaSet](cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
7070
),
7171
)
7272
Expect(err).NotTo(HaveOccurred())
7373

74-
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}))
74+
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}))
7575
Expect(err).NotTo(HaveOccurred())
7676

7777
err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})

Diff for: pkg/controller/controller_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() {
7979

8080
ctx, cancel := context.WithCancel(context.Background())
8181
watchChan := make(chan event.GenericEvent, 1)
82-
watch := &source.Channel{Source: watchChan, Handler: &handler.EnqueueRequestForObject{}}
82+
watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{})
8383
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
8484

8585
reconcileStarted := make(chan struct{})

Diff for: pkg/controller/example_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func ExampleController() {
7171
}
7272

7373
// Watch for Pod create / update / delete events and call Reconcile
74-
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}))
74+
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}))
7575
if err != nil {
7676
log.Error(err, "unable to watch pods")
7777
os.Exit(1)
@@ -108,7 +108,7 @@ func ExampleController_unstructured() {
108108
Version: "v1",
109109
})
110110
// Watch for Pod create / update / delete events and call Reconcile
111-
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.EnqueueRequestForObject{}))
111+
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.TypedEnqueueRequestForObject[*unstructured.Unstructured]{}))
112112
if err != nil {
113113
log.Error(err, "unable to watch pods")
114114
os.Exit(1)
@@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
139139
os.Exit(1)
140140
}
141141

142-
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})); err != nil {
142+
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{})); err != nil {
143143
log.Error(err, "unable to watch pods")
144144
os.Exit(1)
145145
}

Diff for: pkg/event/event.go

+34-17
Original file line numberDiff line numberDiff line change
@@ -18,38 +18,55 @@ package event
1818

1919
import "sigs.k8s.io/controller-runtime/pkg/client"
2020

21-
// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
21+
// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
22+
// by a source.Source and transformed into a reconcile.Request by a handler.EventHandler.
23+
type CreateEvent = TypedCreateEvent[client.Object]
24+
25+
// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
26+
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
27+
type UpdateEvent = TypedUpdateEvent[client.Object]
28+
29+
// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
2230
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
23-
type CreateEvent struct {
31+
type DeleteEvent = TypedDeleteEvent[client.Object]
32+
33+
// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
34+
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
35+
// handler.EventHandler.
36+
type GenericEvent = TypedGenericEvent[client.Object]
37+
38+
// TypedCreateEvent is an event where a Kubernetes object was created. TypedCreateEvent should be generated
39+
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
40+
type TypedCreateEvent[T any] struct {
2441
// Object is the object from the event
25-
Object client.Object
42+
Object T
2643
}
2744

28-
// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
29-
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
30-
type UpdateEvent struct {
45+
// TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated
46+
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
47+
type TypedUpdateEvent[T any] struct {
3148
// ObjectOld is the object from the event
32-
ObjectOld client.Object
49+
ObjectOld T
3350

3451
// ObjectNew is the object from the event
35-
ObjectNew client.Object
52+
ObjectNew T
3653
}
3754

38-
// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
39-
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
40-
type DeleteEvent struct {
55+
// TypedDeleteEvent is an event where a Kubernetes object was deleted. TypedDeleteEvent should be generated
56+
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
57+
type TypedDeleteEvent[T any] struct {
4158
// Object is the object from the event
42-
Object client.Object
59+
Object T
4360

4461
// DeleteStateUnknown is true if the Delete event was missed but we identified the object
4562
// as having been deleted.
4663
DeleteStateUnknown bool
4764
}
4865

49-
// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
50-
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
51-
// handler.EventHandler.
52-
type GenericEvent struct {
66+
// TypedGenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
67+
// TypedGenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
68+
// handler.TypedEventHandler.
69+
type TypedGenericEvent[T any] struct {
5370
// Object is the object from the event
54-
Object client.Object
71+
Object T
5572
}

Diff for: pkg/handler/enqueue.go

+30-11
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ package handler
1818

1919
import (
2020
"context"
21+
"reflect"
2122

2223
"k8s.io/apimachinery/pkg/types"
2324
"k8s.io/client-go/util/workqueue"
25+
"sigs.k8s.io/controller-runtime/pkg/client"
2426
"sigs.k8s.io/controller-runtime/pkg/event"
2527
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
2628
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -33,13 +35,18 @@ type empty struct{}
3335
var _ EventHandler = &EnqueueRequestForObject{}
3436

3537
// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
36-
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
38+
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
3739
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
38-
type EnqueueRequestForObject struct{}
40+
type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object]
41+
42+
// TypedEnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
43+
// (e.g. the created / deleted / updated objects Name and Namespace). handler.TypedEnqueueRequestForObject is used by almost all
44+
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
45+
type TypedEnqueueRequestForObject[T client.Object] struct{}
3946

4047
// Create implements EventHandler.
41-
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
42-
if evt.Object == nil {
48+
func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) {
49+
if isNil(evt.Object) {
4350
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
4451
return
4552
}
@@ -50,14 +57,14 @@ func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEv
5057
}
5158

5259
// Update implements EventHandler.
53-
func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
60+
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) {
5461
switch {
55-
case evt.ObjectNew != nil:
62+
case !isNil(evt.ObjectNew):
5663
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
5764
Name: evt.ObjectNew.GetName(),
5865
Namespace: evt.ObjectNew.GetNamespace(),
5966
}})
60-
case evt.ObjectOld != nil:
67+
case !isNil(evt.ObjectOld):
6168
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
6269
Name: evt.ObjectOld.GetName(),
6370
Namespace: evt.ObjectOld.GetNamespace(),
@@ -68,8 +75,8 @@ func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEv
6875
}
6976

7077
// Delete implements EventHandler.
71-
func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
72-
if evt.Object == nil {
78+
func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) {
79+
if isNil(evt.Object) {
7380
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
7481
return
7582
}
@@ -80,8 +87,8 @@ func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEv
8087
}
8188

8289
// Generic implements EventHandler.
83-
func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
84-
if evt.Object == nil {
90+
func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) {
91+
if isNil(evt.Object) {
8592
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
8693
return
8794
}
@@ -90,3 +97,15 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic
9097
Namespace: evt.Object.GetNamespace(),
9198
}})
9299
}
100+
101+
func isNil(arg any) bool {
102+
if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr ||
103+
v.Kind() == reflect.Interface ||
104+
v.Kind() == reflect.Slice ||
105+
v.Kind() == reflect.Map ||
106+
v.Kind() == reflect.Chan ||
107+
v.Kind() == reflect.Func) && v.IsNil()) {
108+
return true
109+
}
110+
return false
111+
}

0 commit comments

Comments
 (0)