✨ add typed watcher to ClusterCache #11331

Merged: 1 commit, Nov 11, 2024
25 changes: 12 additions & 13 deletions controllers/clustercache/cluster_accessor.go
@@ -32,7 +32,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"

"sigs.k8s.io/cluster-api/util/certs"
)
@@ -407,16 +406,16 @@ func (ca *clusterAccessor) GetClientCertificatePrivateKey(ctx context.Context) *
}

// Watch watches a workload cluster for events.
-// Each unique watch (by input.Name) is only added once after a Connect (otherwise we return early).
+// Each unique watch (by watcher.Name()) is only added once after a Connect (otherwise we return early).
// During a disconnect existing watches (i.e. informers) are shutdown when stopping the cache.
// After a re-connect watches will be re-added (assuming the Watch method is called again).
-func (ca *clusterAccessor) Watch(ctx context.Context, input WatchInput) error {
-if input.Name == "" {
-return errors.New("input.Name is required")
+func (ca *clusterAccessor) Watch(ctx context.Context, watcher Watcher) error {
+if watcher.Name() == "" {
+return errors.New("watcher.Name() cannot be empty")
}

if !ca.Connected(ctx) {
-return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
+return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
}

log := ctrl.LoggerFrom(ctx)
@@ -429,21 +428,21 @@ func (ca *clusterAccessor) Watch(ctx context.Context, input WatchInput) error {

// Checking connection again while holding the lock, because maybe Disconnect was called since checking above.
if ca.lockedState.connection == nil {
-return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
+return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
}

// Return early if the watch was already added.
-if ca.lockedState.connection.watches.Has(input.Name) {
-log.V(6).Info(fmt.Sprintf("Skip creation of watch %s for %T because it already exists", input.Name, input.Kind))
+if ca.lockedState.connection.watches.Has(watcher.Name()) {
+log.V(6).Info(fmt.Sprintf("Skip creation of watch %s for %T because it already exists", watcher.Name(), watcher.Object()))
return nil
}

-log.Info(fmt.Sprintf("Creating watch %s for %T", input.Name, input.Kind))
-if err := input.Watcher.Watch(source.Kind(ca.lockedState.connection.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil {
-return errors.Wrapf(err, "error creating watch %s for %T", input.Name, input.Kind)
+log.Info(fmt.Sprintf("Creating watch %s for %T", watcher.Name(), watcher.Object()))
+if err := watcher.Watch(ca.lockedState.connection.cache); err != nil {
+return errors.Wrapf(err, "error creating watch %s for %T", watcher.Name(), watcher.Object())
}

-ca.lockedState.connection.watches.Insert(input.Name)
+ca.lockedState.connection.watches.Insert(watcher.Name())
return nil
}
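Since Watch wraps ErrClusterNotConnected whenever the connection is down, callers are expected to treat the error as transient and retry. A minimal caller-side sketch (the watch name "example-watchNodes" and the r.nodeToExample mapper are hypothetical, not part of this PR):

err := r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
	Name:         "example-watchNodes", // hypothetical watch name
	Watcher:      r.controller,
	Kind:         &corev1.Node{},
	EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToExample), // hypothetical mapper
}))
if errors.Is(err, clustercache.ErrClusterNotConnected) {
	// Connection to the workload cluster is down: requeue instead of failing hard.
	// The watch is re-added by calling Watch again after the re-connect.
	return ctrl.Result{RequeueAfter: time.Minute}, nil
}
if err != nil {
	return ctrl.Result{}, err
}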

8 changes: 4 additions & 4 deletions controllers/clustercache/cluster_accessor_test.go
@@ -327,15 +327,15 @@ func TestWatch(t *testing.T) {
accessor := newClusterAccessor(clusterKey, config)

tw := &testWatcher{}
-wi := WatchInput{
+wi := WatcherOptions{
Name: "test-watch",
Watcher: tw,
Kind: &corev1.Node{},
EventHandler: &handler.EnqueueRequestForObject{},
}

// Add watch when not connected (fails)
-err := accessor.Watch(ctx, wi)
+err := accessor.Watch(ctx, NewWatcher(wi))
g.Expect(err).To(HaveOccurred())
g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue())

@@ -346,12 +346,12 @@
g.Expect(accessor.lockedState.connection.watches).To(BeEmpty())

// Add watch
-g.Expect(accessor.Watch(ctx, wi)).To(Succeed())
+g.Expect(accessor.Watch(ctx, NewWatcher(wi))).To(Succeed())
g.Expect(accessor.lockedState.connection.watches.Has("test-watch")).To(BeTrue())
g.Expect(accessor.lockedState.connection.watches.Len()).To(Equal(1))

// Add watch again (no-op as watch already exists)
-g.Expect(accessor.Watch(ctx, wi)).To(Succeed())
+g.Expect(accessor.Watch(ctx, NewWatcher(wi))).To(Succeed())
g.Expect(accessor.lockedState.connection.watches.Has("test-watch")).To(BeTrue())
g.Expect(accessor.lockedState.connection.watches.Len()).To(Equal(1))
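The testWatcher used above is defined elsewhere in the test file and is not shown in this diff. A minimal stub satisfying the new SourceWatcher interface could look like this sketch (assuming the default reconcile.Request request type, for which source.Source is an alias of source.TypedSource[reconcile.Request]):

// fakeWatcher records the sources it is asked to watch instead of starting informers.
type fakeWatcher struct {
	sources []source.Source
}

// Watch implements SourceWatcher[reconcile.Request].
func (w *fakeWatcher) Watch(src source.Source) error {
	w.sources = append(w.sources, src)
	return nil
}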

70 changes: 55 additions & 15 deletions controllers/clustercache/cluster_cache.go
@@ -149,7 +149,7 @@ type ClusterCache interface {
// During a disconnect existing watches (i.e. informers) are shutdown when stopping the cache.
// After a re-connect watches will be re-added (assuming the Watch method is called again).
// If there is no connection to the workload cluster ErrClusterNotConnected will be returned.
-Watch(ctx context.Context, cluster client.ObjectKey, input WatchInput) error
+Watch(ctx context.Context, cluster client.ObjectKey, watcher Watcher) error

// GetLastProbeSuccessTimestamp returns the time when the health probe was successfully executed last.
GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time
@@ -169,34 +169,74 @@ type ClusterCache interface {
// because there is no connection to the workload cluster.
var ErrClusterNotConnected = errors.New("connection to the workload cluster is down")

-// Watcher is a scoped-down interface from Controller that only has the Watch func.
+// Watcher is an interface that can start a Watch.
type Watcher interface {
-// Watch watches the provided Source.
-Watch(src source.Source) error
+Name() string
+Object() client.Object
+Watch(cache cache.Cache) error
}

-// WatchInput specifies the parameters used to establish a new watch for a workload cluster.
-// A source.Kind source (configured with Kind, EventHandler and Predicates) will be added to the Watcher.
-// To watch for events, the source.Kind will create an informer on the Cache that we have created and cached
+// SourceWatcher is a scoped-down interface from Controller that only has the Watch func.
+type SourceWatcher[request comparable] interface {
+Watch(src source.TypedSource[request]) error
+}
+
+// WatcherOptions specifies the parameters used to establish a new watch for a workload cluster.
+// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the Watcher.
+// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
// for the given Cluster.
+type WatcherOptions = TypedWatcherOptions[client.Object, ctrl.Request]
+
+// TypedWatcherOptions specifies the parameters used to establish a new watch for a workload cluster.
+// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the Watcher.
+// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
+// for the given Cluster.
-type WatchInput struct {
+type TypedWatcherOptions[object client.Object, request comparable] struct {
// Name represents a unique Watch request for the specified Cluster.
// The name is used to track that a specific watch is only added once to a cache.
// After a connection (and thus also the cache) has been re-created, watches have to be added
// again by calling the Watch method again.
Name string

// Watcher is the watcher (controller) whose Reconcile() function will be called for events.
-Watcher Watcher
+Watcher SourceWatcher[request]

// Kind is the type of resource to watch.
-Kind client.Object
+Kind object

// EventHandler contains the event handlers to invoke for resource events.
-EventHandler handler.EventHandler
+EventHandler handler.TypedEventHandler[object, request]

// Predicates is used to filter resource events.
-Predicates []predicate.Predicate
+Predicates []predicate.TypedPredicate[object]
}

+// NewWatcher creates a Watcher for the workload cluster.
+// A source.TypedKind source (configured with Kind, TypedEventHandler and Predicates) will be added to the SourceWatcher.
+// To watch for events, the source.TypedKind will create an informer on the Cache that we have created and cached
+// for the given Cluster.
+func NewWatcher[object client.Object, request comparable](options TypedWatcherOptions[object, request]) Watcher {
+return &watcher[object, request]{
+name: options.Name,
+kind: options.Kind,
+eventHandler: options.EventHandler,
+predicates: options.Predicates,
+watcher: options.Watcher,
+}
+}
+
+type watcher[object client.Object, request comparable] struct {
+name string
+kind object
+eventHandler handler.TypedEventHandler[object, request]
+predicates []predicate.TypedPredicate[object]
+watcher SourceWatcher[request]
+}
+
+func (tw *watcher[object, request]) Name() string { return tw.name }
+func (tw *watcher[object, request]) Object() client.Object { return tw.kind }
+func (tw *watcher[object, request]) Watch(cache cache.Cache) error {
+return tw.watcher.Watch(source.TypedKind[object, request](cache, tw.kind, tw.eventHandler, tw.predicates...))
+}
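This typed machinery is what the PR title refers to: because TypedWatcherOptions is generic over the request type, controllers that reconcile a custom request type (anything comparable) can now register cluster watches too, not only controllers using ctrl.Request. A sketch under assumptions (NodeRequest and typedController are hypothetical; typedController must implement SourceWatcher[NodeRequest]):

// NodeRequest is a hypothetical custom request type; any comparable type works.
type NodeRequest struct {
	NodeName string
}

w := clustercache.NewWatcher(clustercache.TypedWatcherOptions[*corev1.Node, NodeRequest]{
	Name:    "example-typed-watch", // hypothetical watch name
	Watcher: typedController,       // assumed to implement SourceWatcher[NodeRequest]
	Kind:    &corev1.Node{},
	// Map every Node event to a custom request.
	EventHandler: handler.TypedEnqueueRequestsFromMapFunc(
		func(ctx context.Context, node *corev1.Node) []NodeRequest {
			return []NodeRequest{{NodeName: node.GetName()}}
		}),
})
// w can then be passed to ClusterCache.Watch like any other Watcher.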

// GetClusterSourceOption is an option that modifies GetClusterSourceOptions for a GetClusterSource call.
@@ -342,12 +382,12 @@ func (cc *clusterCache) GetClientCertificatePrivateKey(ctx context.Context, clus
return accessor.GetClientCertificatePrivateKey(ctx), nil
}

-func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, input WatchInput) error {
+func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, watcher Watcher) error {
accessor := cc.getClusterAccessor(cluster)
if accessor == nil {
-return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
+return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", watcher.Name(), watcher.Object())
}
-return accessor.Watch(ctx, input)
+return accessor.Watch(ctx, watcher)
}

func (cc *clusterCache) GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time {
4 changes: 2 additions & 2 deletions exp/internal/controllers/machinepool_controller.go
@@ -371,12 +371,12 @@ func (r *MachinePoolReconciler) watchClusterNodes(ctx context.Context, cluster *
return nil
}

-return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
+return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
Name: "machinepool-watchNodes",
Watcher: r.controller,
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachinePool),
-})
+}))
}

func (r *MachinePoolReconciler) nodeToMachinePool(ctx context.Context, o client.Object) []reconcile.Request {
4 changes: 2 additions & 2 deletions internal/controllers/machine/machine_controller.go
@@ -1045,12 +1045,12 @@ func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.C
return nil
}

-return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
+return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
Name: "machine-watchNodes",
Watcher: r.controller,
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachine),
-})
+}))
}

func (r *Reconciler) nodeToMachine(ctx context.Context, o client.Object) []reconcile.Request {
@@ -351,14 +351,14 @@ func TestGetNode(t *testing.T) {

// Retry because the ClusterCache might not have immediately created the clusterAccessor.
g.Eventually(func(g Gomega) {
-g.Expect(clusterCache.Watch(ctx, util.ObjectKey(testCluster), clustercache.WatchInput{
+g.Expect(clusterCache.Watch(ctx, util.ObjectKey(testCluster), clustercache.NewWatcher(clustercache.WatcherOptions{
Name: "TestGetNode",
Watcher: w,
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(func(context.Context, client.Object) []reconcile.Request {
return nil
}),
-})).To(Succeed())
+}))).To(Succeed())
}, 1*time.Minute, 5*time.Second).Should(Succeed())

for _, tc := range testCases {
@@ -571,12 +571,12 @@ func (r *Reconciler) nodeToMachineHealthCheck(ctx context.Context, o client.Obje
}

func (r *Reconciler) watchClusterNodes(ctx context.Context, cluster *clusterv1.Cluster) error {
-return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
+return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
Name: "machinehealthcheck-watchClusterNodes",
Watcher: r.controller,
Kind: &corev1.Node{},
EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMachineHealthCheck),
-})
+}))
}

// getMachineFromNode retrieves the machine with a nodeRef to nodeName
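Taken together, the call-site change for out-of-tree providers is mechanical: the former WatchInput literal becomes WatcherOptions, wrapped in NewWatcher. A before/after sketch (the provider-specific names are hypothetical):

// Before (WatchInput):
//	return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.WatchInput{
//		Name:         "myprovider-watchNodes",
//		Watcher:      r.controller,
//		Kind:         &corev1.Node{},
//		EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMyResource),
//	})

// After (NewWatcher + WatcherOptions):
return r.ClusterCache.Watch(ctx, util.ObjectKey(cluster), clustercache.NewWatcher(clustercache.WatcherOptions{
	Name:         "myprovider-watchNodes", // hypothetical watch name
	Watcher:      r.controller,
	Kind:         &corev1.Node{},
	EventHandler: handler.EnqueueRequestsFromMapFunc(r.nodeToMyResource), // hypothetical mapper
}))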