Skip to content

Commit d6f3c78

Browse files
committed
add TypedWatch to ClusterCache
1 parent 76328ed commit d6f3c78

File tree

2 files changed

+69
-0
lines changed

2 files changed

+69
-0
lines changed

controllers/clustercache/cluster_accessor.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,43 @@ func (ca *clusterAccessor) Watch(ctx context.Context, input WatchInput) error {
447447
return nil
448448
}
449449

450+
func typedWatch[object client.Object, request comparable](ctx context.Context, ca *clusterAccessor, input TypedWatchInput[object, request]) error {
451+
if input.Name == "" {
452+
return errors.New("input.Name is required")
453+
}
454+
455+
if !ca.Connected(ctx) {
456+
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
457+
}
458+
459+
log := ctrl.LoggerFrom(ctx)
460+
461+
// Calling Watch on a controller is non-blocking because it only calls Start on the Kind source.
462+
// Start on the Kind source starts an additional go routine to create an informer.
463+
// Because it is non-blocking we can use a full lock here without blocking other reconcilers too long.
464+
ca.lock(ctx)
465+
defer ca.unlock(ctx)
466+
467+
// Checking connection again while holding the lock, because maybe Disconnect was called since checking above.
468+
if ca.lockedState.connection == nil {
469+
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
470+
}
471+
472+
// Return early if the watch was already added.
473+
if ca.lockedState.connection.watches.Has(input.Name) {
474+
log.V(6).Info(fmt.Sprintf("Skip creation of watch %s for %T because it already exists", input.Name, input.Kind))
475+
return nil
476+
}
477+
478+
log.Info(fmt.Sprintf("Creating watch %s for %T", input.Name, input.Kind))
479+
if err := input.Watcher.Watch(source.TypedKind(ca.lockedState.connection.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil {
480+
return errors.Wrapf(err, "error creating watch %s for %T", input.Name, input.Kind)
481+
}
482+
483+
ca.lockedState.connection.watches.Insert(input.Name)
484+
return nil
485+
}
486+
450487
func (ca *clusterAccessor) GetLastProbeSuccessTimestamp(ctx context.Context) time.Time {
451488
ca.rLock(ctx)
452489
defer ca.rUnlock(ctx)

controllers/clustercache/cluster_cache.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,29 @@ type WatchInput struct {
199199
Predicates []predicate.Predicate
200200
}
201201

202+
// TypedWatcher is a scoped-down interface from Controller that only knows how to watch.
203+
type TypedWatcher[request comparable] interface {
204+
Watch(src source.TypedSource[request]) error
205+
}
206+
207+
// TypedWatchInput specifies the parameters used to establish a new watch for a remote cluster.
208+
type TypedWatchInput[object client.Object, request comparable] struct {
209+
// Name represents a unique watch request for the specified Cluster.
210+
Name string
211+
212+
// Watcher is the watcher (controller) whose Reconcile() function will be called for events.
213+
Watcher TypedWatcher[request]
214+
215+
// Kind is the type of resource to watch.
216+
Kind object
217+
218+
// EventHandler contains the event handlers to invoke for resource events.
219+
EventHandler handler.TypedEventHandler[object, request]
220+
221+
// Predicates is used to filter resource events.
222+
Predicates []predicate.TypedPredicate[object]
223+
}
224+
202225
// GetClusterSourceOption is an option that modifies GetClusterSourceOptions for a GetClusterSource call.
203226
type GetClusterSourceOption interface {
204227
// ApplyToGetClusterSourceOptions applies this option to the given GetClusterSourceOptions.
@@ -350,6 +373,15 @@ func (cc *clusterCache) Watch(ctx context.Context, cluster client.ObjectKey, inp
350373
return accessor.Watch(ctx, input)
351374
}
352375

376+
// TypedWatch starts a typed watch.
377+
func TypedWatch[object client.Object, request comparable](ctx context.Context, cc *clusterCache, cluster client.ObjectKey, input TypedWatchInput[object, request]) error {
378+
accessor := cc.getClusterAccessor(cluster)
379+
if accessor == nil {
380+
return errors.Wrapf(ErrClusterNotConnected, "error creating watch %s for %T", input.Name, input.Kind)
381+
}
382+
return typedWatch(ctx, accessor, input)
383+
}
384+
353385
func (cc *clusterCache) GetLastProbeSuccessTimestamp(ctx context.Context, cluster client.ObjectKey) time.Time {
354386
accessor := cc.getClusterAccessor(cluster)
355387
if accessor == nil {

0 commit comments

Comments
 (0)