Skip to content

Commit 1c26e63

Browse files
committed
clustercache: add typed watcher
1 parent 0202f9c commit 1c26e63

File tree

7 files changed

+79
-40
lines changed

7 files changed

+79
-40
lines changed

controllers/clustercache/cluster_accessor.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
ctrl "sigs.k8s.io/controller-runtime"
3333
"sigs.k8s.io/controller-runtime/pkg/cache"
3434
"sigs.k8s.io/controller-runtime/pkg/client"
35-
"sigs.k8s.io/controller-runtime/pkg/source"
3635

3736
"sigs.k8s.io/cluster-api/util/certs"
3837
)
@@ -407,16 +406,16 @@ func (ca *clusterAccessor) GetClientCertificatePrivateKey(ctx context.Context) *
407406
}
408407

409408
// Watch watches a workload cluster for events.
410-
// Each unique watch (by input.Name) is only added once after a Connect (otherwise we return early).
409+
// Each unique watch (by watcher.Name()) is only added once after a Connect (otherwise we return early).
411410
// During a disconnect existing watches (i.e. informers) are shutdown when stopping the cache.
412411
// After a re-connect watches will be re-added (assuming the Watch method is called again).
413-
func (ca *clusterAccessor) Watch(ctx context.Context, input WatchInput) error {
414-
if input.Name == "" {
415-
return errors.New("input.Name is required")
412+
func (ca *clusterAccessor) Watch(ctx context.Context, watcher Watcher) error {
413+
if watcher.Name() == "" {
414+
return errors.New("watcher.Name() cannot be empty")
416415
}
417416

418417
if !ca.Connected(ctx) {
419-
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
418+
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
420419
}
421420

422421
log := ctrl.LoggerFrom(ctx)
@@ -429,21 +428,21 @@ func (ca *clusterAccessor) Watch(ctx context.Context, input WatchInput) error {
429428

430429
// Checking connection again while holding the lock, because maybe Disconnect was called since checking above.
431430
if ca.lockedState.connection == nil {
432-
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
431+
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
433432
}
434433

435434
// Return early if the watch was already added.
436-
if ca.lockedState.connection.watches.Has(input.Name) {
437-
log.V(6).Info(fmt.Sprintf("Skip creation of watch %s for %T because it already exists", input.Name, input.Kind))
435+
if ca.lockedState.connection.watches.Has(watcher.Name()) {
436+
log.V(6).Info(fmt.Sprintf("Skip creation of watch %s for %T because it already exists", watcher.Name(), watcher.Object()))
438437
return nil
439438
}
440439

441-
log.Info(fmt.Sprintf("Creating watch %s for %T", input.Name, input.Kind))
442-
if err := input.Watcher.Watch(source.Kind(ca.lockedState.connection.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil {
443-
return errors.Wrapf(err, "error creating watch %s for %T", input.Name, input.Kind)
440+
log.Info(fmt.Sprintf("Creating watch %s for %T", watcher.Name(), watcher.Object()))
441+
if err := watcher.Watch(ca.lockedState.connection.cache); err != nil {
442+
return errors.Wrapf(err, "error creating watch %s for %T", watcher.Name(), watcher.Object())
444443
}
445444

446-
ca.lockedState.connection.watches.Insert(input.Name)
445+
ca.lockedState.connection.watches.Insert(watcher.Name())
447446
return nil
448447
}
449448

controllers/clustercache/cluster_accessor_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -327,15 +327,15 @@ func TestWatch(t *testing.T) {
327327
accessor := newClusterAccessor(clusterKey, config)
328328

329329
tw := &testWatcher{}
330-
wi := WatchInput{
330+
wi := WatcherOptions{
331331
Name: "test-watch",
332332
Watcher: tw,
333333
Kind: &corev1.Node{},
334334
EventHandler: &handler.EnqueueRequestForObject{},
335335
}
336336

337337
// Add watch when not connected (fails)
338-
err := accessor.Watch(ctx, wi)
338+
err := accessor.Watch(ctx, NewWatcher(wi))
339339
g.Expect(err).To(HaveOccurred())
340340
g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue())
341341

@@ -346,12 +346,12 @@ func TestWatch(t *testing.T) {
346346
g.Expect(accessor.lockedState.connection.watches).To(BeEmpty())
347347

348348
// Add watch
349-
g.Expect(accessor.Watch(ctx, wi)).To(Succeed())
349+
g.Expect(accessor.Watch(ctx, NewWatcher(wi))).To(Succeed())
350350
g.Expect(accessor.lockedState.connection.watches.Has("test-watch")).To(BeTrue())
351351
g.Expect(accessor.lockedState.connection.watches.Len()).To(Equal(1))
352352

353353
// Add watch again (no-op as watch already exists)
354-
g.Expect(accessor.Watch(ctx, wi)).To(Succeed())
354+
g.Expect(accessor.Watch(ctx, NewWatcher(wi))).To(Succeed())
355355
g.Expect(accessor.lockedState.connection.watches.Has("test-watch")).To(BeTrue())
356356
g.Expect(accessor.lockedState.connection.watches.Len()).To(Equal(1))
357357

controllers/clustercache/cluster_cache.go

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ type ClusterCache interface {
149149
// During a disconnect existing watches (i.e. informers) are shutdown when stopping the cache.
150150
// After a re-connect watches will be re-added (assuming the Watch method is called again).
151151
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
152-
Watch(ctx context.Context, cluster client.ObjectKey, input WatchInput) error
152+
Watch(ctx context.Context, cluster client.ObjectKey, watcher Watcher) error
153153

154154
// GetLastProbeSuccessTimestamp returns the time when the health probe was successfully executed last.
155155
GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time
@@ -169,34 +169,74 @@ type ClusterCache interface {
169169
// because there is no connection to the workload cluster.
170170
var ErrClusterNotConnected = errors.New("connection to the workload cluster is down")
171171

172-
// Watcher is a scoped-down interface from Controller that only has the Watch func.
172+
// Watcher is an interface that can start a Watch.
173173
type Watcher interface {
174-
// Watch watches the provided Source.
175-
Watch(src source.Source) error
174+
Name() string
175+
Object() client.Object
176+
Watch(cache cache.Cache) error
176177
}
177178

178-
// WatchInput specifies the parameters used to establish a new watch for a workload cluster.
179-
// A source.Kind source (configured with Kind, EventHandler and Predicates) will be added to the Watcher.
180-
// To watch for events, the source.Kind will create an informer on the Cache that we have created and cached
179+
// SourceWatcher is a scoped-down interface from Controller that only has the Watch func.
180+
type SourceWatcher[request comparable] interface {
181+
Watch(src source.TypedSource[request]) error
182+
}
183+
184+
// WatcherOptions specifies the parameters used to establish a new watch for a workload cluster.
185+
// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the Watcher.
186+
// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
187+
// for the given Cluster.
188+
type WatcherOptions = TypedWatcherOptions[client.Object, ctrl.Request]
189+
190+
// TypedWatcherOptions specifies the parameters used to establish a new watch for a workload cluster.
191+
// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the Watcher.
192+
// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
181193
// for the given Cluster.
182-
type WatchInput struct {
194+
type TypedWatcherOptions[object client.Object, request comparable] struct {
183195
// Name represents a unique Watch request for the specified Cluster.
184196
// The name is used to track that a specific watch is only added once to a cache.
185197
// After a connection (and thus also the cache) has been re-created, watches have to be added
186198
// again by calling the Watch method again.
187199
Name string
188200

189201
// Watcher is the watcher (controller) whose Reconcile() function will be called for events.
190-
Watcher Watcher
202+
Watcher SourceWatcher[request]
191203

192204
// Kind is the type of resource to watch.
193-
Kind client.Object
205+
Kind object
194206

195207
// EventHandler contains the event handlers to invoke for resource events.
196-
EventHandler handler.EventHandler
208+
EventHandler handler.TypedEventHandler[object, request]
197209

198210
// Predicates is used to filter resource events.
199-
Predicates []predicate.Predicate
211+
Predicates []predicate.TypedPredicate[object]
212+
}
213+
214+
// NewWatcher creates a Watcher for the workload cluster.
215+
// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the SourceWatcher.
216+
// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
217+
// for the given Cluster.
218+
func NewWatcher[object client.Object, request comparable](options TypedWatcherOptions[object, request]) Watcher {
219+
return &watcher[object, request]{
220+
name: options.Name,
221+
kind: options.Kind,
222+
eventHandler: options.EventHandler,
223+
predicates: options.Predicates,
224+
watcher: options.Watcher,
225+
}
226+
}
227+
228+
type watcher[object client.Object, request comparable] struct {
229+
name string
230+
kind object
231+
eventHandler handler.TypedEventHandler[object, request]
232+
predicates []predicate.TypedPredicate[object]
233+
watcher SourceWatcher[request]
234+
}
235+
236+
func (tw *watcher[object, request]) Name() string { return tw.name }
237+
func (tw *watcher[object, request]) Object() client.Object { return tw.kind }
238+
func (tw *watcher[object, request]) Watch(cache cache.Cache) error {
239+
return tw.watcher.Watch(source.TypedKind[object, request](cache, tw.kind, tw.eventHandler, tw.predicates...))
200240
}
201241

202242
// GetClusterSourceOption is an option that modifies GetClusterSourceOptions for a GetClusterSource call.
@@ -342,12 +382,12 @@ func (cc *clusterCache) GetClientCertificatePrivateKey(ctx context.Context, clus
342382
return accessor.GetClientCertificatePrivateKey(ctx), nil
343383
}
344384

345-
func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, input WatchInput) error {
385+
func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, watcher Watcher) error {
346386
accessor := cc.getClusterAccessor(cluster)
347387
if accessor == nil {
348-
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
388+
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
349389
}
350-
return accessor.Watch(ctx, input)
390+
return accessor.Watch(ctx, watcher)
351391
}
352392

353393
func (cc *clusterCache) GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time {

exp/internal/controllers/machinepool_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -371,12 +371,12 @@ func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster *
371371
return nil
372372
}
373373

374-
return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
374+
return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
375375
Name: "machinepool-watchNodes",
376376
Watcher: r.controller,
377377
Kind: &corev1.Node{},
378378
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool),
379-
})
379+
}))
380380
}
381381

382382
func (r *MachinePoolReconciler) nodeToMachinePool(ctx context.Context, o client.Object) []reconcile.Request {

internal/controllers/machine/machine_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,12 +1045,12 @@ func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.C
10451045
return nil
10461046
}
10471047

1048-
return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
1048+
return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
10491049
Name: "machine-watchNodes",
10501050
Watcher: r.controller,
10511051
Kind: &corev1.Node{},
10521052
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachine),
1053-
})
1053+
}))
10541054
}
10551055

10561056
func (r *Reconciler) nodeToMachine(ctx context.Context, o client.Object) []reconcile.Request {

internal/controllers/machine/machine_controller_noderef_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,14 +351,14 @@ func TestGetNode(t *testing.T) {
351351

352352
// Retry because the ClusterCache might not have immediately created the clusterAccessor.
353353
g.Eventually(func(g Gomega) {
354-
g.Expect(clusterCache.Watch(ctx, util.ObjectKey(testCluster), clustercache.WatchInput{
354+
g.Expect(clusterCache.Watch(ctx, util.ObjectKey(testCluster), clustercache.NewWatcher(clustercache.WatcherOptions{
355355
Name: "TestGetNode",
356356
Watcher: w,
357357
Kind: &corev1.Node{},
358358
EventHandler: handler.EnqueueRequestsFromMapFunc(func(context.Context, client.Object) []reconcile.Request {
359359
return nil
360360
}),
361-
})).To(Succeed())
361+
}))).To(Succeed())
362362
}, 1*time.Minute, 5*time.Second).Should(Succeed())
363363

364364
for _, tc := range testCases {

internal/controllers/machinehealthcheck/machinehealthcheck_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -571,12 +571,12 @@ func (r *Reconciler) nodeToMachineHealthCheck(ctx context.Context, o client.Obje
571571
}
572572

573573
func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
574-
return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
574+
return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
575575
Name: "machinehealthcheck-watchClusterNodes",
576576
Watcher: r.controller,
577577
Kind: &corev1.Node{},
578578
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachineHealthCheck),
579-
})
579+
}))
580580
}
581581

582582
// getMachineFromNode retrieves the machine with a nodeRef to nodeName

0 commit comments

Comments
 (0)