
Commit 6db51a2

clustercache: add typed watcher

1 parent 0202f9c
7 files changed: +109 −54 lines changed

controllers/clustercache/cluster_accessor.go

Lines changed: 90 additions & 11 deletions
@@ -32,6 +32,8 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/source"

 	"sigs.k8s.io/cluster-api/util/certs"
@@ -410,13 +412,13 @@ func (ca *clusterAccessor) GetClientCertificatePrivateKey(ctx context.Context) *
 // Each unique watch (by input.Name) is only added once after a Connect (otherwise we return early).
 // During a disconnect existing watches (i.e. informers) are shutdown when stopping the cache.
 // After a re-connect watches will be re-added (assuming the Watch method is called again).
-func (ca *clusterAccessor) Watch(ctx context.Context, input WatchInput) error {
-	if input.Name == "" {
-		return errors.New("input.Name is required")
+func (ca *clusterAccessor) Watch(ctx context.Context, watcher Watcher) error {
+	if watcher.Name() == "" {
+		return errors.New("watcher.Name() cannot be empty")
 	}

 	if !ca.Connected(ctx) {
-		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
+		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %s", watcher.Name(), watcher.KindType())
 	}

 	log := ctrl.LoggerFrom(ctx)
@@ -429,21 +431,21 @@ func (ca *clusterAccessor) Watch(ctx context.Context, input WatchInput) error {

 	// Checking connection again while holding the lock, because maybe Disconnect was called since checking above.
 	if ca.lockedState.connection == nil {
-		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
+		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %s", watcher.Name(), watcher.KindType())
 	}

 	// Return early if the watch was already added.
-	if ca.lockedState.connection.watches.Has(input.Name) {
-		log.V(6).Info(fmt.Sprintf("Skip creation of watch %s for %T because it already exists", input.Name, input.Kind))
+	if ca.lockedState.connection.watches.Has(watcher.Name()) {
+		log.V(6).Info(fmt.Sprintf("Skip creation of watch %s for %s because it already exists", watcher.Name(), watcher.KindType()))
 		return nil
 	}

-	log.Info(fmt.Sprintf("Creating watch %s for %T", input.Name, input.Kind))
-	if err := input.Watcher.Watch(source.Kind(ca.lockedState.connection.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil {
-		return errors.Wrapf(err, "error creating watch %s for %T", input.Name, input.Kind)
+	log.Info(fmt.Sprintf("Creating watch %s for %s", watcher.Name(), watcher.KindType()))
+	if err := watcher.Watch(ca.lockedState.connection.cache); err != nil {
+		return errors.Wrapf(err, "error creating watch %s for %s", watcher.Name(), watcher.KindType())
 	}

-	ca.lockedState.connection.watches.Insert(input.Name)
+	ca.lockedState.connection.watches.Insert(watcher.Name())
 	return nil
 }

@@ -495,3 +497,80 @@ func (ca *clusterAccessor) unlock(ctx context.Context) {
 	ca.lockedStateLock.Unlock()
 	log.V(10).Info("Removed lock for ClusterAccessor")
 }
+
+// ScopedWatcher is a scoped-down interface from Controller that only has the Watch func.
+type ScopedWatcher[request comparable] interface {
+	Watch(src source.TypedSource[request]) error
+}
+
+// WatchInput specifies the parameters used to establish a new watch for a workload cluster.
+// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the Watcher.
+// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
+// for the given Cluster.
+type WatchInput = TypedWatchInput[client.Object, ctrl.Request]
+
+// TypedWatchInput specifies the parameters used to establish a new watch for a workload cluster.
+// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the Watcher.
+// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
+// for the given Cluster.
+type TypedWatchInput[object client.Object, request comparable] struct {
+	// Name represents a unique Watch request for the specified Cluster.
+	// The name is used to track that a specific watch is only added once to a cache.
+	// After a connection (and thus also the cache) has been re-created, watches have to be added
+	// again by calling the Watch method again.
+	Name string
+
+	// Watcher is the watcher (controller) whose Reconcile() function will be called for events.
+	Watcher ScopedWatcher[request]
+
+	// Kind is the type of resource to watch.
+	Kind object
+
+	// EventHandler contains the event handlers to invoke for resource events.
+	EventHandler handler.TypedEventHandler[object, request]
+
+	// Predicates is used to filter resource events.
+	Predicates []predicate.TypedPredicate[object]
+}
+
+// NewWatcher creates a Watcher on the workload cluster.
+// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the ScopedWatcher.
+// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
+// for the given Cluster.
+func NewWatcher(input WatchInput) Watcher {
+	return &typedWatcher[client.Object, ctrl.Request]{
+		name:         input.Name,
+		kind:         input.Kind,
+		eventHandler: input.EventHandler,
+		predicates:   input.Predicates,
+		watcher:      input.Watcher,
+	}
+}
+
+// NewTypedWatcher creates a Watcher on the workload cluster.
+// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the ScopedWatcher.
+// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
+// for the given Cluster.
+func NewTypedWatcher[object client.Object, request comparable](input TypedWatchInput[object, request]) Watcher {
+	return &typedWatcher[object, request]{
+		name:         input.Name,
+		kind:         input.Kind,
+		eventHandler: input.EventHandler,
+		predicates:   input.Predicates,
+		watcher:      input.Watcher,
+	}
+}
+
+type typedWatcher[object client.Object, request comparable] struct {
+	name         string
+	kind         object
+	eventHandler handler.TypedEventHandler[object, request]
+	predicates   []predicate.TypedPredicate[object]
+	watcher      ScopedWatcher[request]
+}
+
+func (tw *typedWatcher[object, request]) Name() string     { return tw.name }
+func (tw *typedWatcher[object, request]) KindType() string { return fmt.Sprintf("%T", tw.kind) }
+func (tw *typedWatcher[object, request]) Watch(cache cache.Cache) error {
+	return tw.watcher.Watch(source.TypedKind[object, request](cache, tw.kind, tw.eventHandler, tw.predicates...))
+}
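
Note: the generics above also allow watches with a non-default request type. The following is a minimal sketch of such a caller; it is not part of this commit. nodeRequest, watchNodesTyped, and the wired-in controller/handler are hypothetical, while ClusterCache.Watch, NewTypedWatcher, TypedWatchInput, and ScopedWatcher come from the diff above.

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"

	"sigs.k8s.io/cluster-api/controllers/clustercache"
)

// nodeRequest is a hypothetical comparable request type used instead of ctrl.Request.
type nodeRequest struct {
	NodeName string
}

// watchNodesTyped registers a typed watch for Nodes on the given workload cluster.
// Name must stay stable: it is the key under which the watch is deduplicated per
// connection, and the watch has to be re-registered after a re-connect.
func watchNodesTyped(
	ctx context.Context,
	cc clustercache.ClusterCache,
	cluster client.ObjectKey,
	c clustercache.ScopedWatcher[nodeRequest],
	h handler.TypedEventHandler[*corev1.Node, nodeRequest],
) error {
	return cc.Watch(ctx, cluster, clustercache.NewTypedWatcher(clustercache.TypedWatchInput[*corev1.Node, nodeRequest]{
		Name:         "example-watchNodes",
		Watcher:      c, // e.g. a controller built with builder.TypedControllerManagedBy[nodeRequest]
		Kind:         &corev1.Node{},
		EventHandler: h,
	}))
}

Existing call sites only need their WatchInput literal wrapped in NewWatcher, since WatchInput is now an alias for TypedWatchInput[client.Object, ctrl.Request]; the call-site diffs below show exactly that.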

controllers/clustercache/cluster_accessor_test.go

Lines changed: 3 additions & 3 deletions
@@ -335,7 +335,7 @@ func TestWatch(t *testing.T) {
 	}

 	// Add watch when not connected (fails)
-	err := accessor.Watch(ctx, wi)
+	err := accessor.Watch(ctx, NewWatcher(wi))
 	g.Expect(err).To(HaveOccurred())
 	g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue())

@@ -346,12 +346,12 @@ func TestWatch(t *testing.T) {
 	g.Expect(accessor.lockedState.connection.watches).To(BeEmpty())

 	// Add watch
-	g.Expect(accessor.Watch(ctx, wi)).To(Succeed())
+	g.Expect(accessor.Watch(ctx, NewWatcher(wi))).To(Succeed())
 	g.Expect(accessor.lockedState.connection.watches.Has("test-watch")).To(BeTrue())
 	g.Expect(accessor.lockedState.connection.watches.Len()).To(Equal(1))

 	// Add watch again (no-op as watch already exists)
-	g.Expect(accessor.Watch(ctx, wi)).To(Succeed())
+	g.Expect(accessor.Watch(ctx, NewWatcher(wi))).To(Succeed())
 	g.Expect(accessor.lockedState.connection.watches.Has("test-watch")).To(BeTrue())
 	g.Expect(accessor.lockedState.connection.watches.Len()).To(Equal(1))
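
For context when reading these hunks: wi is a WatchInput constructed in test setup above the shown lines. A purely illustrative reconstruction follows; only the "test-watch" name is confirmed by the assertions, everything else is an assumption.

// Illustrative only: the test's real wi is defined above the shown hunks.
// tw is assumed to be a test double satisfying ScopedWatcher[ctrl.Request].
wi := WatchInput{
	Name:    "test-watch", // confirmed by the Has("test-watch") assertions
	Watcher: tw,
	Kind:    &corev1.Node{},
	EventHandler: handler.EnqueueRequestsFromMapFunc(
		func(context.Context, client.Object) []reconcile.Request { return nil },
	),
}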

controllers/clustercache/cluster_cache.go

Lines changed: 8 additions & 32 deletions
@@ -38,7 +38,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
-	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	"sigs.k8s.io/controller-runtime/pkg/source"

@@ -149,7 +148,7 @@ type ClusterCache interface {
 	// During a disconnect existing watches (i.e. informers) are shutdown when stopping the cache.
 	// After a re-connect watches will be re-added (assuming the Watch method is called again).
 	// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
-	Watch(ctx context.Context, cluster client.ObjectKey, input WatchInput) error
+	Watch(ctx context.Context, cluster client.ObjectKey, watcher Watcher) error

 	// GetLastProbeSuccessTimestamp returns the time when the health probe was successfully executed last.
 	GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time
@@ -169,34 +168,11 @@ type ClusterCache interface {
 // because there is no connection to the workload cluster.
 var ErrClusterNotConnected = errors.New("connection to the workload cluster is down")

-// Watcher is a scoped-down interface from Controller that only has the Watch func.
+// Watcher is an interface that can start a Watch.
 type Watcher interface {
-	// Watch watches the provided Source.
-	Watch(src source.Source) error
-}
-
-// WatchInput specifies the parameters used to establish a new watch for a workload cluster.
-// A source.Kind source (configured with Kind, EventHandler and Predicates) will be added to the Watcher.
-// To watch for events, the source.Kind will create an informer on the Cache that we have created and cached
-// for the given Cluster.
-type WatchInput struct {
-	// Name represents a unique Watch request for the specified Cluster.
-	// The name is used to track that a specific watch is only added once to a cache.
-	// After a connection (and thus also the cache) has been re-created, watches have to be added
-	// again by calling the Watch method again.
-	Name string
-
-	// Watcher is the watcher (controller) whose Reconcile() function will be called for events.
-	Watcher Watcher
-
-	// Kind is the type of resource to watch.
-	Kind client.Object
-
-	// EventHandler contains the event handlers to invoke for resource events.
-	EventHandler handler.EventHandler
-
-	// Predicates is used to filter resource events.
-	Predicates []predicate.Predicate
+	Name() string
+	KindType() string
+	Watch(cache cache.Cache) error
 }

 // GetClusterSourceOption is an option that modifies GetClusterSourceOptions for a GetClusterSource call.
@@ -342,12 +318,12 @@ func (cc *clusterCache) GetClientCertificatePrivateKey(ctx, clus
 	return accessor.GetClientCertificatePrivateKey(ctx), nil
 }

-func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, input WatchInput) error {
+func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, watcher Watcher) error {
 	accessor := cc.getClusterAccessor(cluster)
 	if accessor == nil {
-		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
+		return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %s", watcher.Name(), watcher.KindType())
 	}
-	return accessor.Watch(ctx, input)
+	return accessor.Watch(ctx, watcher)
 }

 func (cc *clusterCache) GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time {
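
Since Watcher is now a three-method interface, ClusterCache.Watch accepts any implementation, not only the values built by NewWatcher/NewTypedWatcher. A minimal hypothetical implementation, e.g. usable as a test double (fakeWatcher is not part of this commit):

package example

import "sigs.k8s.io/controller-runtime/pkg/cache"

// fakeWatcher is a hypothetical hand-rolled implementation of the
// clustercache.Watcher interface defined above.
type fakeWatcher struct {
	name    string
	watched bool
}

func (f *fakeWatcher) Name() string     { return f.name }
func (f *fakeWatcher) KindType() string { return "fake" }

// Watch receives the workload cluster's cache once the accessor is connected.
// A real implementation would build a source from the cache and register it
// with a controller; the double just records that it was called.
func (f *fakeWatcher) Watch(_ cache.Cache) error {
	f.watched = true
	return nil
}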

exp/internal/controllers/machinepool_controller.go

Lines changed: 2 additions & 2 deletions
@@ -371,12 +371,12 @@ func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster *
 		return nil
 	}

-	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
+	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatchInput{
 		Name:         "machinepool-watchNodes",
 		Watcher:      r.controller,
 		Kind:         &corev1.Node{},
 		EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool),
-	})
+	}))
 }

 func (r *MachinePoolReconciler) nodeToMachinePool(ctx context.Context, o client.Object) []reconcile.Request {

internal/controllers/machine/machine_controller.go

Lines changed: 2 additions & 2 deletions
@@ -1045,12 +1045,12 @@ func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.C
 		return nil
 	}

-	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
+	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatchInput{
 		Name:         "machine-watchNodes",
 		Watcher:      r.controller,
 		Kind:         &corev1.Node{},
 		EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachine),
-	})
+	}))
 }

 func (r *Reconciler) nodeToMachine(ctx context.Context, o client.Object) []reconcile.Request {

internal/controllers/machine/machine_controller_noderef_test.go

Lines changed: 2 additions & 2 deletions
@@ -351,14 +351,14 @@ func TestGetNode(t *testing.T) {

 	// Retry because the ClusterCache might not have immediately created the clusterAccessor.
 	g.Eventually(func(g Gomega) {
-		g.Expect(clusterCache.Watch(ctx, util.ObjectKey(testCluster), clustercache.WatchInput{
+		g.Expect(clusterCache.Watch(ctx, util.ObjectKey(testCluster), clustercache.NewWatcher(clustercache.WatchInput{
 			Name:    "TestGetNode",
 			Watcher: w,
 			Kind:    &corev1.Node{},
 			EventHandler: handler.EnqueueRequestsFromMapFunc(func(context.Context, client.Object) []reconcile.Request {
 				return nil
 			}),
-		})).To(Succeed())
+		}))).To(Succeed())
 	}, 1*time.Minute, 5*time.Second).Should(Succeed())

 	for _, tc := range testCases {

internal/controllers/machinehealthcheck/machinehealthcheck_controller.go

Lines changed: 2 additions & 2 deletions
@@ -571,12 +571,12 @@ func (r *Reconciler) nodeToMachineHealthCheck(ctx context.Context, o client.Obje
 }

 func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
-	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
+	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatchInput{
 		Name:         "machinehealthcheck-watchClusterNodes",
 		Watcher:      r.controller,
 		Kind:         &corev1.Node{},
 		EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachineHealthCheck),
-	})
+	}))
 }

 // getMachineFromNode retrieves the machine with a nodeRef to nodeName
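
A behavioral note on these call sites: watches are dropped when the connection to a workload cluster goes down and are only re-created once Watch is called again, which is why these watchClusterNodes helpers run as part of Reconcile rather than controller setup. A condensed sketch of the pattern follows; exampleReconciler and nodeToExample are hypothetical, while the clustercache calls are from this commit.

package example

import (
	"context"
	"errors"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	"sigs.k8s.io/cluster-api/controllers/clustercache"
)

type exampleReconciler struct {
	ClusterCache clustercache.ClusterCache
	controller   clustercache.ScopedWatcher[reconcile.Request]
}

// watchWorkloadNodes re-asserts the node watch on every reconcile. Watch is a
// cheap no-op once the named watch exists for the current connection, and it
// re-creates the informer after the connection (and cache) was re-created.
func (r *exampleReconciler) watchWorkloadNodes(ctx context.Context, cluster client.ObjectKey) error {
	err := r.ClusterCache.Watch(ctx, cluster, clustercache.NewWatcher(clustercache.WatchInput{
		Name:         "example-watchNodes",
		Watcher:      r.controller,
		Kind:         &corev1.Node{},
		EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToExample),
	}))
	if errors.Is(err, clustercache.ErrClusterNotConnected) {
		// Connection is currently down; treat as transient. Callers often rely on
		// being reconciled again once the connection is back (e.g. via the source
		// returned from GetClusterSource).
		return nil
	}
	return err
}

func (r *exampleReconciler) nodeToExample(ctx context.Context, o client.Object) []reconcile.Request {
	// Mapping logic elided; a real mapper translates Node events into requests.
	return nil
}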
