Skip to content

Commit 86ce450

Browse files
committed
adjust call sites
1 parent 45f007c commit 86ce450

File tree

8 files changed

+76
-16
lines changed

8 files changed

+76
-16
lines changed

controllers/remote/cluster_cache_tracker.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -437,29 +437,30 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
437437
return errors.New("input.Name is required")
438438
}
439439

440-
a, err := t.getClusterAccessor(ctx, input.Cluster, t.indexes...)
440+
accessor, err := t.getClusterAccessor(ctx, input.Cluster, t.indexes...)
441441
if err != nil {
442-
return err
442+
return errors.Wrapf(err, "failed to add %s watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
443443
}
444444

445445
// We have to lock the cluster, so that the watch is not created multiple times in parallel.
446446
ok := t.clusterLock.TryLock(input.Cluster)
447447
if !ok {
448-
return errors.Wrapf(ErrClusterLocked, "failed to add watch: error getting lock for cluster")
448+
return errors.Wrapf(ErrClusterLocked, "failed to add %s watch on cluster %s: failed to get lock for cluster", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
449449
}
450450
defer t.clusterLock.Unlock(input.Cluster)
451451

452-
if a.watches.Has(input.Name) {
453-
t.log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
452+
if accessor.watches.Has(input.Name) {
453+
log := ctrl.LoggerFrom(ctx)
454+
log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
454455
return nil
455456
}
456457

457458
// Need to create the watch
458-
if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, a.cache), input.EventHandler, input.Predicates...); err != nil {
459-
return errors.Wrap(err, "error creating watch")
459+
if err := input.Watcher.Watch(source.NewKindWithCache(input.Kind, accessor.cache), input.EventHandler, input.Predicates...); err != nil {
460+
return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
460461
}
461462

462-
a.watches.Insert(input.Name)
463+
accessor.watches.Insert(input.Name)
463464

464465
return nil
465466
}

controlplane/kubeadm/internal/controllers/controller.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,25 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.
204204

205205
if !kcp.ObjectMeta.DeletionTimestamp.IsZero() {
206206
// Handle deletion reconciliation loop.
207-
return r.reconcileDelete(ctx, cluster, kcp)
207+
res, err = r.reconcileDelete(ctx, cluster, kcp)
208+
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
209+
// the current cluster because of concurrent access.
210+
if errors.Is(err, remote.ErrClusterLocked) {
211+
log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
212+
return ctrl.Result{Requeue: true}, nil
213+
}
214+
return res, err
208215
}
209216

210217
// Handle normal reconciliation loop.
211-
return r.reconcile(ctx, cluster, kcp)
218+
res, err = r.reconcile(ctx, cluster, kcp)
219+
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
220+
// the current cluster because of concurrent access.
221+
if errors.Is(err, remote.ErrClusterLocked) {
222+
log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
223+
return ctrl.Result{Requeue: true}, nil
224+
}
225+
return res, err
212226
}
213227

214228
func patchKubeadmControlPlane(ctx context.Context, patchHelper *patch.Helper, kcp *controlplanev1.KubeadmControlPlane) error {

exp/addons/internal/controllers/clusterresourceset_controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,12 @@ func (r *ClusterResourceSetReconciler) Reconcile(ctx context.Context, req ctrl.R
152152

153153
for _, cluster := range clusters {
154154
if err := r.ApplyClusterResourceSet(ctx, cluster, clusterResourceSet); err != nil {
155+
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
156+
// the current cluster because of concurrent access.
157+
if errors.Is(err, remote.ErrClusterLocked) {
158+
log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
159+
return ctrl.Result{Requeue: true}, nil
160+
}
155161
return ctrl.Result{}, err
156162
}
157163
}

internal/controllers/machine/machine_controller.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,25 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
207207

208208
// Handle deletion reconciliation loop.
209209
if !m.ObjectMeta.DeletionTimestamp.IsZero() {
210-
return r.reconcileDelete(ctx, cluster, m)
210+
res, err := r.reconcileDelete(ctx, cluster, m)
211+
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
212+
// the current cluster because of concurrent access.
213+
if errors.Is(err, remote.ErrClusterLocked) {
214+
log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
215+
return ctrl.Result{Requeue: true}, nil
216+
}
217+
return res, err
211218
}
212219

213220
// Handle normal reconciliation loop.
214-
return r.reconcile(ctx, cluster, m)
221+
res, err := r.reconcile(ctx, cluster, m)
222+
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
223+
// the current cluster because of concurrent access.
224+
if errors.Is(err, remote.ErrClusterLocked) {
225+
log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
226+
return ctrl.Result{Requeue: true}, nil
227+
}
228+
return res, err
215229
}
216230

217231
func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clusterv1.Machine, options ...patch.Option) error {
@@ -254,7 +268,7 @@ func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clust
254268

255269
func (r *Reconciler) reconcile(ctx context.Context, cluster *clusterv1.Cluster, m *clusterv1.Machine) (ctrl.Result, error) {
256270
if err := r.watchClusterNodes(ctx, cluster); err != nil {
257-
return ctrl.Result{}, errors.Wrapf(err, "error watching nodes on target cluster")
271+
return ctrl.Result{}, err
258272
}
259273

260274
// If the Machine belongs to a cluster, add an owner reference.

internal/controllers/machinehealthcheck/machinehealthcheck_controller.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
174174

175175
result, err := r.reconcile(ctx, log, cluster, m)
176176
if err != nil {
177+
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
178+
// the current cluster because of concurrent access.
179+
if errors.Is(err, remote.ErrClusterLocked) {
180+
log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
181+
return ctrl.Result{Requeue: true}, nil
182+
}
177183
log.Error(err, "Failed to reconcile MachineHealthCheck")
178184
r.recorder.Eventf(m, corev1.EventTypeWarning, "ReconcileError", "%v", err)
179185

@@ -201,7 +207,6 @@ func (r *Reconciler) reconcile(ctx context.Context, logger logr.Logger, cluster
201207
}
202208

203209
if err := r.watchClusterNodes(ctx, cluster); err != nil {
204-
logger.Error(err, "error watching nodes on target cluster")
205210
return ctrl.Result{}, err
206211
}
207212

internal/controllers/machineset/machineset_controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
172172

173173
result, err := r.reconcile(ctx, cluster, machineSet)
174174
if err != nil {
175+
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
176+
// the current cluster because of concurrent access.
177+
if errors.Is(err, remote.ErrClusterLocked) {
178+
log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
179+
return ctrl.Result{Requeue: true}, nil
180+
}
175181
log.Error(err, "Failed to reconcile MachineSet")
176182
r.recorder.Eventf(machineSet, corev1.EventTypeWarning, "ReconcileError", "%v", err)
177183
}

test/infrastructure/docker/exp/internal/controllers/dockermachinepool_controller.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,14 @@ func (r *DockerMachinePoolReconciler) Reconcile(ctx context.Context, req ctrl.Re
128128
}
129129

130130
// Handle non-deleted machines
131-
return r.reconcileNormal(ctx, cluster, machinePool, dockerMachinePool)
131+
res, err = r.reconcileNormal(ctx, cluster, machinePool, dockerMachinePool)
132+
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
133+
// the current cluster because of concurrent access.
134+
if errors.Is(err, remote.ErrClusterLocked) {
135+
log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
136+
return ctrl.Result{Requeue: true}, nil
137+
}
138+
return res, err
132139
}
133140

134141
// SetupWithManager will add watches for this controller.

test/infrastructure/docker/internal/controllers/dockermachine_controller.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,14 @@ func (r *DockerMachineReconciler) Reconcile(ctx context.Context, req ctrl.Reques
167167
}
168168

169169
// Handle non-deleted machines
170-
return r.reconcileNormal(ctx, cluster, machine, dockerMachine, externalMachine, externalLoadBalancer)
170+
res, err := r.reconcileNormal(ctx, cluster, machine, dockerMachine, externalMachine, externalLoadBalancer)
171+
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
172+
// the current cluster because of concurrent access.
173+
if errors.Is(err, remote.ErrClusterLocked) {
174+
log.V(5).Info("Requeueing because another worker has the lock on the ClusterCacheTracker")
175+
return ctrl.Result{Requeue: true}, nil
176+
}
177+
return res, err
171178
}
172179

173180
func patchDockerMachine(ctx context.Context, patchHelper *patch.Helper, dockerMachine *infrav1.DockerMachine) error {

0 commit comments

Comments (0)