Commit 051b006

Use ClusterCacheTracker consistently (instead of NewClusterClient)
Signed-off-by: Stefan Büringer [email protected]
1 parent 9be885c
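
At a glance, the change swaps per-call construction of uncached workload-cluster clients for clients served by a shared remote.ClusterCacheTracker. A minimal before/after sketch of the call pattern, using only names that appear in the diffs below:

    // Before: every call built a fresh, uncached client to the workload cluster.
    remoteClient, err := r.remoteClientGetter(ctx, KubeadmConfigControllerName, r.Client, util.ObjectKey(cluster))

    // After: the shared tracker returns a cached per-cluster client, created once
    // and reused across reconciles; it can fail with remote.ErrClusterLocked when
    // another worker holds the tracker's lock for the same cluster.
    remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))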

File tree

12 files changed (+179, -83 lines)

bootstrap/kubeadm/controllers/alias.go

Lines changed: 4 additions & 0 deletions
@@ -25,6 +25,7 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/controller"

     kubeadmbootstrapcontrollers "sigs.k8s.io/cluster-api/bootstrap/kubeadm/internal/controllers"
+    "sigs.k8s.io/cluster-api/controllers/remote"
 )

 // Following types provides access to reconcilers implemented in internal/controllers, thus
@@ -39,6 +40,8 @@ const (
 type KubeadmConfigReconciler struct {
     Client client.Client

+    Tracker *remote.ClusterCacheTracker
+
     // WatchFilterValue is the label value used to filter events prior to reconciliation.
     WatchFilterValue string

@@ -50,6 +53,7 @@ type KubeadmConfigReconciler struct {
 func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
     return (&kubeadmbootstrapcontrollers.KubeadmConfigReconciler{
         Client:           r.Client,
+        Tracker:          r.Tracker,
         WatchFilterValue: r.WatchFilterValue,
         TokenTTL:         r.TokenTTL,
     }).SetupWithManager(ctx, mgr, options)

bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go

Lines changed: 23 additions & 14 deletions
@@ -59,11 +59,6 @@ import (
     "sigs.k8s.io/cluster-api/util/secret"
 )

-const (
-    // KubeadmConfigControllerName defines the controller used when creating clients.
-    KubeadmConfigControllerName = "kubeadmconfig-controller"
-)
-
 const (
     // DefaultTokenTTL is the default TTL used for tokens.
     DefaultTokenTTL = 15 * time.Minute
@@ -82,15 +77,14 @@ type InitLocker interface {
 // KubeadmConfigReconciler reconciles a KubeadmConfig object.
 type KubeadmConfigReconciler struct {
     Client          client.Client
+    Tracker         *remote.ClusterCacheTracker
     KubeadmInitLock InitLocker

     // WatchFilterValue is the label value used to filter events prior to reconciliation.
     WatchFilterValue string

     // TokenTTL is the amount of time a bootstrap token (and therefore a KubeadmConfig) will be valid.
     TokenTTL time.Duration
-
-    remoteClientGetter remote.ClusterClientGetter
 }

 // Scope is a scoped struct used during reconciliation.
@@ -106,9 +100,6 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl
     if r.KubeadmInitLock == nil {
         r.KubeadmInitLock = locking.NewControlPlaneInitMutex(mgr.GetClient())
     }
-    if r.remoteClientGetter == nil {
-        r.remoteClientGetter = remote.NewClusterClient
-    }
     if r.TokenTTL == 0 {
         r.TokenTTL = DefaultTokenTTL
     }
@@ -239,6 +230,25 @@ func (r *KubeadmConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
             }
         }
     }()
+
+    // Ignore deleted KubeadmConfigs.
+    if !config.DeletionTimestamp.IsZero() {
+        return ctrl.Result{}, nil
+    }
+
+    res, err := r.reconcile(ctx, scope, cluster, config, configOwner)
+    if err != nil && errors.Is(err, remote.ErrClusterLocked) {
+        // Requeue if the reconcile failed because the ClusterCacheTracker was locked for
+        // the current cluster because of concurrent access.
+        log.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker")
+        return ctrl.Result{Requeue: true}, nil
+    }
+    return res, err
+}
+
+func (r *KubeadmConfigReconciler) reconcile(ctx context.Context, scope *Scope, cluster *clusterv1.Cluster, config *bootstrapv1.KubeadmConfig, configOwner *bsutil.ConfigOwner) (ctrl.Result, error) {
+    log := ctrl.LoggerFrom(ctx)
+
     // Ensure the bootstrap secret associated with this KubeadmConfig has the correct ownerReference.
     if err := r.ensureBootstrapSecretOwnersRef(ctx, scope); err != nil {
         return ctrl.Result{}, err
@@ -305,9 +315,8 @@ func (r *KubeadmConfigReconciler) refreshBootstrapToken(ctx context.Context, con
     log := ctrl.LoggerFrom(ctx)
     token := config.Spec.JoinConfiguration.Discovery.BootstrapToken.Token

-    remoteClient, err := r.remoteClientGetter(ctx, KubeadmConfigControllerName, r.Client, util.ObjectKey(cluster))
+    remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
     if err != nil {
-        log.Error(err, "Error creating remote cluster client")
         return ctrl.Result{}, err
     }

@@ -323,7 +332,7 @@
 func (r *KubeadmConfigReconciler) rotateMachinePoolBootstrapToken(ctx context.Context, config *bootstrapv1.KubeadmConfig, cluster *clusterv1.Cluster, scope *Scope) (ctrl.Result, error) {
     log := ctrl.LoggerFrom(ctx)
     log.V(2).Info("Config is owned by a MachinePool, checking if token should be rotated")
-    remoteClient, err := r.remoteClientGetter(ctx, KubeadmConfigControllerName, r.Client, util.ObjectKey(cluster))
+    remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
     if err != nil {
         return ctrl.Result{}, err
     }
@@ -928,7 +937,7 @@ func (r *KubeadmConfigReconciler) reconcileDiscovery(ctx context.Context, cluste

     // if BootstrapToken already contains a token, respect it; otherwise create a new bootstrap token for the node to join
     if config.Spec.JoinConfiguration.Discovery.BootstrapToken.Token == "" {
-        remoteClient, err := r.remoteClientGetter(ctx, KubeadmConfigControllerName, r.Client, util.ObjectKey(cluster))
+        remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
         if err != nil {
             return ctrl.Result{}, err
         }
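
Two conventions fall out of this refactor for any controller that adopts the tracker: obtain remote clients only through Tracker.GetClient, and treat remote.ErrClusterLocked as expected, transient contention (requeue) rather than a failure. A hedged sketch of the pattern in a hypothetical controller method (MyReconciler and doRemoteWork are illustrative names, not part of this commit):

    func (r *MyReconciler) doRemoteWork(ctx context.Context, cluster *clusterv1.Cluster) (ctrl.Result, error) {
        // GetClient returns a cached client for the workload cluster,
        // creating the underlying accessor on first use.
        remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
        if err != nil {
            // The outer Reconcile converts remote.ErrClusterLocked into
            // ctrl.Result{Requeue: true} instead of surfacing an error.
            return ctrl.Result{}, err
        }
        // ... read/write objects in the workload cluster via remoteClient ...
        _ = remoteClient
        return ctrl.Result{}, nil
    }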

bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller_test.go

Lines changed: 39 additions & 36 deletions
@@ -25,6 +25,7 @@ import (
     "time"

     ignition "github.com/flatcar/ignition/config/v2_3"
+    "github.com/go-logr/logr"
     . "github.com/onsi/gomega"
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -34,13 +35,14 @@ import (
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/client/fake"
+    "sigs.k8s.io/controller-runtime/pkg/log"
     "sigs.k8s.io/controller-runtime/pkg/reconcile"
     "sigs.k8s.io/yaml"

     clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
     bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
     bootstrapbuilder "sigs.k8s.io/cluster-api/bootstrap/kubeadm/internal/builder"
-    fakeremote "sigs.k8s.io/cluster-api/controllers/remote/fake"
+    "sigs.k8s.io/cluster-api/controllers/remote"
     expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
     "sigs.k8s.io/cluster-api/feature"
     "sigs.k8s.io/cluster-api/internal/test/builder"
@@ -495,9 +497,9 @@ func TestKubeadmConfigReconciler_Reconcile_GenerateCloudConfigData(t *testing.T)
     myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()

     k := &KubeadmConfigReconciler{
-        Client:             myclient,
-        KubeadmInitLock:    &myInitLocker{},
-        remoteClientGetter: fakeremote.NewClusterClient,
+        Client:          myclient,
+        Tracker:         remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
+        KubeadmInitLock: &myInitLocker{},
     }

     request := ctrl.Request{
@@ -556,9 +558,9 @@ func TestKubeadmConfigReconciler_Reconcile_ErrorIfJoiningControlPlaneHasInvalidC
     myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()

     k := &KubeadmConfigReconciler{
-        Client:             myclient,
-        KubeadmInitLock:    &myInitLocker{},
-        remoteClientGetter: fakeremote.NewClusterClient,
+        Client:          myclient,
+        Tracker:         remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
+        KubeadmInitLock: &myInitLocker{},
     }

     request := ctrl.Request{
@@ -677,9 +679,9 @@ func TestReconcileIfJoinCertificatesAvailableConditioninNodesAndControlPlaneIsRe
     objects = append(objects, createSecrets(t, cluster, config)...)
     myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()
     k := &KubeadmConfigReconciler{
-        Client:             myclient,
-        KubeadmInitLock:    &myInitLocker{},
-        remoteClientGetter: fakeremote.NewClusterClient,
+        Client:          myclient,
+        Tracker:         remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
+        KubeadmInitLock: &myInitLocker{},
     }

     request := ctrl.Request{
@@ -754,9 +756,9 @@ func TestReconcileIfJoinNodePoolsAndControlPlaneIsReady(t *testing.T) {
     objects = append(objects, createSecrets(t, cluster, config)...)
     myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()
     k := &KubeadmConfigReconciler{
-        Client:             myclient,
-        KubeadmInitLock:    &myInitLocker{},
-        remoteClientGetter: fakeremote.NewClusterClient,
+        Client:          myclient,
+        Tracker:         remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
+        KubeadmInitLock: &myInitLocker{},
     }

     request := ctrl.Request{
@@ -854,9 +856,9 @@ func TestBootstrapDataFormat(t *testing.T) {
     myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()

     k := &KubeadmConfigReconciler{
-        Client:             myclient,
-        KubeadmInitLock:    &myInitLocker{},
-        remoteClientGetter: fakeremote.NewClusterClient,
+        Client:          myclient,
+        Tracker:         remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
+        KubeadmInitLock: &myInitLocker{},
     }
     request := ctrl.Request{
         NamespacedName: client.ObjectKey{
@@ -934,9 +936,9 @@ func TestKubeadmConfigSecretCreatedStatusNotPatched(t *testing.T) {
     objects = append(objects, createSecrets(t, cluster, initConfig)...)
     myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()
     k := &KubeadmConfigReconciler{
-        Client:             myclient,
-        KubeadmInitLock:    &myInitLocker{},
-        remoteClientGetter: fakeremote.NewClusterClient,
+        Client:          myclient,
+        Tracker:         remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
+        KubeadmInitLock: &myInitLocker{},
     }
     request := ctrl.Request{
         NamespacedName: client.ObjectKey{
@@ -1011,10 +1013,10 @@ func TestBootstrapTokenTTLExtension(t *testing.T) {
     objects = append(objects, createSecrets(t, cluster, initConfig)...)
     myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}, &clusterv1.Machine{}).Build()
     k := &KubeadmConfigReconciler{
-        Client:             myclient,
-        KubeadmInitLock:    &myInitLocker{},
-        TokenTTL:           DefaultTokenTTL,
-        remoteClientGetter: fakeremote.NewClusterClient,
+        Client:          myclient,
+        Tracker:         remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
+        KubeadmInitLock: &myInitLocker{},
+        TokenTTL:        DefaultTokenTTL,
     }
     request := ctrl.Request{
         NamespacedName: client.ObjectKey{
@@ -1212,10 +1214,10 @@ func TestBootstrapTokenRotationMachinePool(t *testing.T) {
     objects = append(objects, createSecrets(t, cluster, initConfig)...)
     myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}, &expv1.MachinePool{}).Build()
     k := &KubeadmConfigReconciler{
-        Client:             myclient,
-        KubeadmInitLock:    &myInitLocker{},
-        TokenTTL:           DefaultTokenTTL,
-        remoteClientGetter: fakeremote.NewClusterClient,
+        Client:          myclient,
+        Tracker:         remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
+        KubeadmInitLock: &myInitLocker{},
+        TokenTTL:        DefaultTokenTTL,
     }
     request := ctrl.Request{
         NamespacedName: client.ObjectKey{
@@ -1368,12 +1370,6 @@ func TestBootstrapTokenRotationMachinePool(t *testing.T) {

 // Ensure the discovery portion of the JoinConfiguration gets generated correctly.
 func TestKubeadmConfigReconciler_Reconcile_DiscoveryReconcileBehaviors(t *testing.T) {
-    k := &KubeadmConfigReconciler{
-        Client:             fake.NewClientBuilder().Build(),
-        KubeadmInitLock:    &myInitLocker{},
-        remoteClientGetter: fakeremote.NewClusterClient,
-    }
-
     caHash := []string{"...."}
     bootstrapToken := bootstrapv1.Discovery{
         BootstrapToken: &bootstrapv1.BootstrapTokenDiscovery{
@@ -1499,6 +1495,13 @@ func TestKubeadmConfigReconciler_Reconcile_DiscoveryReconcileBehaviors(t *testin
         t.Run(tc.name, func(t *testing.T) {
             g := NewWithT(t)

+            fakeClient := fake.NewClientBuilder().Build()
+            k := &KubeadmConfigReconciler{
+                Client:          fakeClient,
+                Tracker:         remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), fakeClient, fakeClient.Scheme(), client.ObjectKey{Name: tc.cluster.Name, Namespace: tc.cluster.Namespace}),
+                KubeadmInitLock: &myInitLocker{},
+            }
+
             res, err := k.reconcileDiscovery(ctx, tc.cluster, tc.config, secret.Certificates{})
             g.Expect(res.IsZero()).To(BeTrue())
             g.Expect(err).NotTo(HaveOccurred())
@@ -1710,9 +1713,9 @@ func TestKubeadmConfigReconciler_Reconcile_AlwaysCheckCAVerificationUnlessReques

     myclient := fake.NewClientBuilder().WithObjects(objects...).Build()
     reconciler := KubeadmConfigReconciler{
-        Client:             myclient,
-        KubeadmInitLock:    &myInitLocker{},
-        remoteClientGetter: fakeremote.NewClusterClient,
+        Client:          myclient,
+        Tracker:         remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
+        KubeadmInitLock: &myInitLocker{},
     }

     wc := newWorkerJoinKubeadmConfig(metav1.NamespaceDefault, "worker-join-cfg")
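
All of the test changes are one substitution: the deleted fakeremote.NewClusterClient getter is replaced by remote.NewTestClusterCacheTracker, which returns a tracker pre-wired so that GetClient for the given cluster key hands back the supplied fake client. A condensed sketch of the setup each test now repeats, with names taken from the diff:

    myclient := fake.NewClientBuilder().WithObjects(objects...).Build()
    k := &KubeadmConfigReconciler{
        Client: myclient,
        // Test tracker: GetClient(ctx, key) returns myclient for this key;
        // tracker logging is discarded via the null sink.
        Tracker: remote.NewTestClusterCacheTracker(
            logr.New(log.NullLogSink{}),
            myclient,
            myclient.Scheme(),
            client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace},
        ),
        KubeadmInitLock: &myInitLocker{},
    }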

bootstrap/kubeadm/main.go

Lines changed: 33 additions & 3 deletions
@@ -55,8 +55,9 @@ import (
 )

 var (
-    scheme   = runtime.NewScheme()
-    setupLog = ctrl.Log.WithName("setup")
+    scheme         = runtime.NewScheme()
+    setupLog       = ctrl.Log.WithName("setup")
+    controllerName = "cluster-api-kubeadm-bootstrap-manager"
 )

 func init() {
@@ -80,6 +81,7 @@ var (
     watchFilterValue         string
     watchNamespace           string
     profilerAddress          string
+    clusterConcurrency       int
     kubeadmConfigConcurrency int
     syncPeriod               time.Duration
     restConfigQPS            float32
@@ -117,6 +119,9 @@ func InitFlags(fs *pflag.FlagSet) {
     fs.StringVar(&profilerAddress, "profiler-address", "",
         "Bind address to expose the pprof profiler (e.g. localhost:6060)")

+    fs.IntVar(&clusterConcurrency, "cluster-concurrency", 10,
+        "Number of clusters to process simultaneously")
+
     fs.IntVar(&kubeadmConfigConcurrency, "kubeadmconfig-concurrency", 10,
         "Number of kubeadm configs to process simultaneously")

@@ -166,7 +171,7 @@ func main() {
     restConfig := ctrl.GetConfigOrDie()
     restConfig.QPS = restConfigQPS
     restConfig.Burst = restConfigBurst
-    restConfig.UserAgent = remote.DefaultClusterAPIUserAgent("cluster-api-kubeadm-bootstrap-manager")
+    restConfig.UserAgent = remote.DefaultClusterAPIUserAgent(controllerName)

     tlsOptionOverrides, err := flags.GetTLSOptionOverrideFuncs(tlsOptions)
     if err != nil {
@@ -245,8 +250,33 @@ func setupChecks(mgr ctrl.Manager) {
 }

 func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
+    // Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers
+    // requiring a connection to a remote cluster
+    log := ctrl.Log.WithName("remote").WithName("ClusterCacheTracker")
+    tracker, err := remote.NewClusterCacheTracker(
+        mgr,
+        remote.ClusterCacheTrackerOptions{
+            ControllerName: controllerName,
+            Log:            &log,
+            Indexes:        remote.DefaultIndexes,
+        },
+    )
+    if err != nil {
+        setupLog.Error(err, "unable to create cluster cache tracker")
+        os.Exit(1)
+    }
+    if err := (&remote.ClusterCacheReconciler{
+        Client:           mgr.GetClient(),
+        Tracker:          tracker,
+        WatchFilterValue: watchFilterValue,
+    }).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil {
+        setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
+        os.Exit(1)
+    }
+
     if err := (&kubeadmbootstrapcontrollers.KubeadmConfigReconciler{
         Client:           mgr.GetClient(),
+        Tracker:          tracker,
         WatchFilterValue: watchFilterValue,
         TokenTTL:         tokenTTL,
     }).SetupWithManager(ctx, mgr, concurrency(kubeadmConfigConcurrency)); err != nil {
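
Note the wiring: the tracker is created once per manager and shared by every reconciler that needs workload-cluster access, while the accompanying remote.ClusterCacheReconciler watches Cluster objects and tears down cached accessors once a cluster is deleted; the new --cluster-concurrency flag sizes that controller's worker pool.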
