Skip to content

Commit 2beebbd

Browse files
authored
Merge pull request #11247 from sbueringer/pr-cct-refactoring
✨ Introduce new ClusterCache
2 parents b8f2f80 + 19c80d4 commit 2beebbd

File tree

65 files changed

+3900
-712
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+3900
-712
lines changed

api/v1beta1/v1beta2_condition_consts.go

+6
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,12 @@ const (
203203
// is detected (or whatever period is defined in the --remote-connection-grace-period flag).
204204
ClusterRemoteConnectionProbeV1Beta2Condition = "RemoteConnectionProbe"
205205

206+
// ClusterRemoteConnectionProbeFailedV1Beta2Reason surfaces issues with the connection to the workload cluster.
207+
ClusterRemoteConnectionProbeFailedV1Beta2Reason = "RemoteConnectionProbeFailed"
208+
209+
// ClusterRemoteConnectionProbeSucceededV1Beta2Reason is used to report a working connection with the workload cluster.
210+
ClusterRemoteConnectionProbeSucceededV1Beta2Reason = "RemoteConnectionProbeSucceeded"
211+
206212
// ClusterScalingUpV1Beta2Condition is true if available replicas < desired replicas.
207213
ClusterScalingUpV1Beta2Condition = ScalingUpV1Beta2Condition
208214

bootstrap/kubeadm/controllers/alias.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
"sigs.k8s.io/controller-runtime/pkg/controller"
2626

2727
kubeadmbootstrapcontrollers "sigs.k8s.io/cluster-api/bootstrap/kubeadm/internal/controllers"
28-
"sigs.k8s.io/cluster-api/controllers/remote"
28+
"sigs.k8s.io/cluster-api/controllers/clustercache"
2929
)
3030

3131
// Following types provides access to reconcilers implemented in internal/controllers, thus
@@ -41,7 +41,7 @@ type KubeadmConfigReconciler struct {
4141
Client client.Client
4242
SecretCachingClient client.Client
4343

44-
Tracker *remote.ClusterCacheTracker
44+
ClusterCache clustercache.ClusterCache
4545

4646
// WatchFilterValue is the label value used to filter events prior to reconciliation.
4747
WatchFilterValue string
@@ -55,7 +55,7 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl
5555
return (&kubeadmbootstrapcontrollers.KubeadmConfigReconciler{
5656
Client: r.Client,
5757
SecretCachingClient: r.SecretCachingClient,
58-
Tracker: r.Tracker,
58+
ClusterCache: r.ClusterCache,
5959
WatchFilterValue: r.WatchFilterValue,
6060
TokenTTL: r.TokenTTL,
6161
}).SetupWithManager(ctx, mgr, options)

bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go

+8-10
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ import (
5050
"sigs.k8s.io/cluster-api/bootstrap/kubeadm/internal/locking"
5151
kubeadmtypes "sigs.k8s.io/cluster-api/bootstrap/kubeadm/types"
5252
bsutil "sigs.k8s.io/cluster-api/bootstrap/util"
53-
"sigs.k8s.io/cluster-api/controllers/remote"
53+
"sigs.k8s.io/cluster-api/controllers/clustercache"
5454
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
5555
"sigs.k8s.io/cluster-api/feature"
5656
"sigs.k8s.io/cluster-api/internal/util/taints"
@@ -83,7 +83,7 @@ type InitLocker interface {
8383
type KubeadmConfigReconciler struct {
8484
Client client.Client
8585
SecretCachingClient client.Client
86-
Tracker *remote.ClusterCacheTracker
86+
ClusterCache clustercache.ClusterCache
8787
KubeadmInitLock InitLocker
8888

8989
// WatchFilterValue is the label value used to filter events prior to reconciliation.
@@ -135,7 +135,7 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl
135135
predicates.ResourceHasFilterLabel(mgr.GetScheme(), predicateLog, r.WatchFilterValue),
136136
),
137137
),
138-
)
138+
).WatchesRawSource(r.ClusterCache.GetClusterSource("kubeadmconfig", r.ClusterToKubeadmConfigs))
139139

140140
if err := b.Complete(r); err != nil {
141141
return errors.Wrap(err, "failed setting up with a controller manager")
@@ -242,10 +242,8 @@ func (r *KubeadmConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
242242
}
243243

244244
res, err := r.reconcile(ctx, scope, cluster, config, configOwner)
245-
if err != nil && errors.Is(err, remote.ErrClusterLocked) {
246-
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
247-
// the current cluster because of concurrent access.
248-
log.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker")
245+
if err != nil && errors.Is(err, clustercache.ErrClusterNotConnected) {
246+
log.V(5).Info("Requeuing because connection to the workload cluster is down")
249247
return ctrl.Result{RequeueAfter: time.Minute}, nil
250248
}
251249
return res, err
@@ -320,7 +318,7 @@ func (r *KubeadmConfigReconciler) refreshBootstrapTokenIfNeeded(ctx context.Cont
320318
log := ctrl.LoggerFrom(ctx)
321319
token := config.Spec.JoinConfiguration.Discovery.BootstrapToken.Token
322320

323-
remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
321+
remoteClient, err := r.ClusterCache.GetClient(ctx, util.ObjectKey(cluster))
324322
if err != nil {
325323
return ctrl.Result{}, err
326324
}
@@ -367,7 +365,7 @@ func (r *KubeadmConfigReconciler) refreshBootstrapTokenIfNeeded(ctx context.Cont
367365
func (r *KubeadmConfigReconciler) rotateMachinePoolBootstrapToken(ctx context.Context, config *bootstrapv1.KubeadmConfig, cluster *clusterv1.Cluster, scope *Scope) (ctrl.Result, error) {
368366
log := ctrl.LoggerFrom(ctx)
369367
log.V(2).Info("Config is owned by a MachinePool, checking if token should be rotated")
370-
remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
368+
remoteClient, err := r.ClusterCache.GetClient(ctx, util.ObjectKey(cluster))
371369
if err != nil {
372370
return ctrl.Result{}, err
373371
}
@@ -1087,7 +1085,7 @@ func (r *KubeadmConfigReconciler) reconcileDiscovery(ctx context.Context, cluste
10871085

10881086
// if BootstrapToken already contains a token, respect it; otherwise create a new bootstrap token for the node to join
10891087
if config.Spec.JoinConfiguration.Discovery.BootstrapToken.Token == "" {
1090-
remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
1088+
remoteClient, err := r.ClusterCache.GetClient(ctx, util.ObjectKey(cluster))
10911089
if err != nil {
10921090
return ctrl.Result{}, err
10931091
}

bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller_test.go

+11-13
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"time"
2525

2626
ignition "github.com/flatcar/ignition/config/v2_3"
27-
"github.com/go-logr/logr"
2827
. "github.com/onsi/gomega"
2928
corev1 "k8s.io/api/core/v1"
3029
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -34,14 +33,13 @@ import (
3433
ctrl "sigs.k8s.io/controller-runtime"
3534
"sigs.k8s.io/controller-runtime/pkg/client"
3635
"sigs.k8s.io/controller-runtime/pkg/client/fake"
37-
"sigs.k8s.io/controller-runtime/pkg/log"
3836
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3937
"sigs.k8s.io/yaml"
4038

4139
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
4240
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
4341
bootstrapbuilder "sigs.k8s.io/cluster-api/bootstrap/kubeadm/internal/builder"
44-
"sigs.k8s.io/cluster-api/controllers/remote"
42+
"sigs.k8s.io/cluster-api/controllers/clustercache"
4543
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
4644
"sigs.k8s.io/cluster-api/feature"
4745
"sigs.k8s.io/cluster-api/internal/test/builder"
@@ -509,7 +507,7 @@ func TestKubeadmConfigReconciler_Reconcile_GenerateCloudConfigData(t *testing.T)
509507
k := &KubeadmConfigReconciler{
510508
Client: myclient,
511509
SecretCachingClient: myclient,
512-
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
510+
ClusterCache: clustercache.NewFakeClusterCache(myclient, client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
513511
KubeadmInitLock: &myInitLocker{},
514512
}
515513

@@ -571,7 +569,7 @@ func TestKubeadmConfigReconciler_Reconcile_ErrorIfJoiningControlPlaneHasInvalidC
571569
k := &KubeadmConfigReconciler{
572570
Client: myclient,
573571
SecretCachingClient: myclient,
574-
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
572+
ClusterCache: clustercache.NewFakeClusterCache(myclient, client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
575573
KubeadmInitLock: &myInitLocker{},
576574
}
577575

@@ -693,7 +691,7 @@ func TestReconcileIfJoinCertificatesAvailableConditioninNodesAndControlPlaneIsRe
693691
k := &KubeadmConfigReconciler{
694692
Client: myclient,
695693
SecretCachingClient: myclient,
696-
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
694+
ClusterCache: clustercache.NewFakeClusterCache(myclient, client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
697695
KubeadmInitLock: &myInitLocker{},
698696
}
699697

@@ -770,7 +768,7 @@ func TestReconcileIfJoinNodePoolsAndControlPlaneIsReady(t *testing.T) {
770768
k := &KubeadmConfigReconciler{
771769
Client: myclient,
772770
SecretCachingClient: myclient,
773-
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
771+
ClusterCache: clustercache.NewFakeClusterCache(myclient, client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
774772
KubeadmInitLock: &myInitLocker{},
775773
}
776774

@@ -871,7 +869,7 @@ func TestBootstrapDataFormat(t *testing.T) {
871869
k := &KubeadmConfigReconciler{
872870
Client: myclient,
873871
SecretCachingClient: myclient,
874-
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
872+
ClusterCache: clustercache.NewFakeClusterCache(myclient, client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
875873
KubeadmInitLock: &myInitLocker{},
876874
}
877875
request := ctrl.Request{
@@ -952,7 +950,7 @@ func TestKubeadmConfigSecretCreatedStatusNotPatched(t *testing.T) {
952950
k := &KubeadmConfigReconciler{
953951
Client: myclient,
954952
SecretCachingClient: myclient,
955-
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
953+
ClusterCache: clustercache.NewFakeClusterCache(myclient, client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
956954
KubeadmInitLock: &myInitLocker{},
957955
}
958956
request := ctrl.Request{
@@ -1033,7 +1031,7 @@ func TestBootstrapTokenTTLExtension(t *testing.T) {
10331031
SecretCachingClient: myclient,
10341032
KubeadmInitLock: &myInitLocker{},
10351033
TokenTTL: DefaultTokenTTL,
1036-
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, remoteClient, remoteClient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
1034+
ClusterCache: clustercache.NewFakeClusterCache(remoteClient, client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
10371035
}
10381036
request := ctrl.Request{
10391037
NamespacedName: client.ObjectKey{
@@ -1279,7 +1277,7 @@ func TestBootstrapTokenRotationMachinePool(t *testing.T) {
12791277
SecretCachingClient: myclient,
12801278
KubeadmInitLock: &myInitLocker{},
12811279
TokenTTL: DefaultTokenTTL,
1282-
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, remoteClient, remoteClient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
1280+
ClusterCache: clustercache.NewFakeClusterCache(remoteClient, client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
12831281
}
12841282
request := ctrl.Request{
12851283
NamespacedName: client.ObjectKey{
@@ -1602,7 +1600,7 @@ func TestKubeadmConfigReconciler_Reconcile_DiscoveryReconcileBehaviors(t *testin
16021600
k := &KubeadmConfigReconciler{
16031601
Client: fakeClient,
16041602
SecretCachingClient: fakeClient,
1605-
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), fakeClient, fakeClient, fakeClient.Scheme(), client.ObjectKey{Name: tc.cluster.Name, Namespace: tc.cluster.Namespace}),
1603+
ClusterCache: clustercache.NewFakeClusterCache(fakeClient, client.ObjectKey{Name: tc.cluster.Name, Namespace: tc.cluster.Namespace}),
16061604
KubeadmInitLock: &myInitLocker{},
16071605
}
16081606

@@ -1827,7 +1825,7 @@ func TestKubeadmConfigReconciler_Reconcile_AlwaysCheckCAVerificationUnlessReques
18271825
reconciler := KubeadmConfigReconciler{
18281826
Client: myclient,
18291827
SecretCachingClient: myclient,
1830-
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
1828+
ClusterCache: clustercache.NewFakeClusterCache(myclient, client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
18311829
KubeadmInitLock: &myInitLocker{},
18321830
}
18331831

bootstrap/kubeadm/main.go

+48-50
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
4848
kubeadmbootstrapcontrollers "sigs.k8s.io/cluster-api/bootstrap/kubeadm/controllers"
4949
"sigs.k8s.io/cluster-api/bootstrap/kubeadm/internal/webhooks"
50+
"sigs.k8s.io/cluster-api/controllers/clustercache"
5051
"sigs.k8s.io/cluster-api/controllers/remote"
5152
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
5253
"sigs.k8s.io/cluster-api/feature"
@@ -63,31 +64,31 @@ var (
6364
controllerName = "cluster-api-kubeadm-bootstrap-manager"
6465

6566
// flags.
66-
enableLeaderElection bool
67-
leaderElectionLeaseDuration time.Duration
68-
leaderElectionRenewDeadline time.Duration
69-
leaderElectionRetryPeriod time.Duration
70-
watchFilterValue string
71-
watchNamespace string
72-
profilerAddress string
73-
enableContentionProfiling bool
74-
syncPeriod time.Duration
75-
restConfigQPS float32
76-
restConfigBurst int
77-
clusterCacheTrackerClientQPS float32
78-
clusterCacheTrackerClientBurst int
79-
webhookPort int
80-
webhookCertDir string
81-
webhookCertName string
82-
webhookKeyName string
83-
healthAddr string
84-
managerOptions = flags.ManagerOptions{}
85-
logOptions = logs.NewOptions()
67+
enableLeaderElection bool
68+
leaderElectionLeaseDuration time.Duration
69+
leaderElectionRenewDeadline time.Duration
70+
leaderElectionRetryPeriod time.Duration
71+
watchFilterValue string
72+
watchNamespace string
73+
profilerAddress string
74+
enableContentionProfiling bool
75+
syncPeriod time.Duration
76+
restConfigQPS float32
77+
restConfigBurst int
78+
clusterCacheClientQPS float32
79+
clusterCacheClientBurst int
80+
webhookPort int
81+
webhookCertDir string
82+
webhookCertName string
83+
webhookKeyName string
84+
healthAddr string
85+
managerOptions = flags.ManagerOptions{}
86+
logOptions = logs.NewOptions()
8687
// CABPK specific flags.
87-
clusterConcurrency int
88-
clusterCacheTrackerConcurrency int
89-
kubeadmConfigConcurrency int
90-
tokenTTL time.Duration
88+
clusterConcurrency int
89+
clusterCacheConcurrency int
90+
kubeadmConfigConcurrency int
91+
tokenTTL time.Duration
9192
)
9293

9394
func init() {
@@ -131,7 +132,7 @@ func InitFlags(fs *pflag.FlagSet) {
131132
"Number of clusters to process simultaneously")
132133
_ = fs.MarkDeprecated("cluster-concurrency", "This flag has no function anymore and is going to be removed in a next release. Use \"--clustercachetracker-concurrency\" instead.")
133134

134-
fs.IntVar(&clusterCacheTrackerConcurrency, "clustercachetracker-concurrency", 10,
135+
fs.IntVar(&clusterCacheConcurrency, "clustercache-concurrency", 100,
135136
"Number of clusters to process simultaneously")
136137

137138
fs.IntVar(&kubeadmConfigConcurrency, "kubeadmconfig-concurrency", 10,
@@ -146,11 +147,11 @@ func InitFlags(fs *pflag.FlagSet) {
146147
fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
147148
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")
148149

149-
fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
150-
"Maximum queries per second from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")
150+
fs.Float32Var(&clusterCacheClientQPS, "clustercache-client-qps", 20,
151+
"Maximum queries per second from the cluster cache clients to the Kubernetes API server of workload clusters.")
151152

152-
fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
153-
"Maximum number of queries that should be allowed in one burst from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")
153+
fs.IntVar(&clusterCacheClientBurst, "clustercache-client-burst", 30,
154+
"Maximum number of queries that should be allowed in one burst from the cluster cache clients to the Kubernetes API server of workload clusters.")
154155

155156
fs.DurationVar(&tokenTTL, "bootstrap-token-ttl", kubeadmbootstrapcontrollers.DefaultTokenTTL,
156157
"The amount of time the bootstrap token will be valid")
@@ -312,35 +313,32 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
312313
os.Exit(1)
313314
}
314315

315-
// Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers
316-
// requiring a connection to a remote cluster
317-
tracker, err := remote.NewClusterCacheTracker(
318-
mgr,
319-
remote.ClusterCacheTrackerOptions{
320-
SecretCachingClient: secretCachingClient,
321-
ControllerName: controllerName,
322-
Log: &ctrl.Log,
323-
ClientQPS: clusterCacheTrackerClientQPS,
324-
ClientBurst: clusterCacheTrackerClientBurst,
316+
clusterCache, err := clustercache.SetupWithManager(ctx, mgr, clustercache.Options{
317+
SecretClient: secretCachingClient,
318+
Cache: clustercache.CacheOptions{},
319+
Client: clustercache.ClientOptions{
320+
QPS: clusterCacheClientQPS,
321+
Burst: clusterCacheClientBurst,
322+
UserAgent: remote.DefaultClusterAPIUserAgent(controllerName),
323+
Cache: clustercache.ClientCacheOptions{
324+
DisableFor: []client.Object{
325+
// Don't cache ConfigMaps & Secrets.
326+
&corev1.ConfigMap{},
327+
&corev1.Secret{},
328+
},
329+
},
325330
},
326-
)
327-
if err != nil {
328-
setupLog.Error(err, "unable to create cluster cache tracker")
329-
os.Exit(1)
330-
}
331-
if err := (&remote.ClusterCacheReconciler{
332-
Client: mgr.GetClient(),
333-
Tracker: tracker,
334331
WatchFilterValue: watchFilterValue,
335-
}).SetupWithManager(ctx, mgr, concurrency(clusterCacheTrackerConcurrency)); err != nil {
336-
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
332+
}, concurrency(clusterCacheConcurrency))
333+
if err != nil {
334+
setupLog.Error(err, "Unable to create ClusterCache")
337335
os.Exit(1)
338336
}
339337

340338
if err := (&kubeadmbootstrapcontrollers.KubeadmConfigReconciler{
341339
Client: mgr.GetClient(),
342340
SecretCachingClient: secretCachingClient,
343-
Tracker: tracker,
341+
ClusterCache: clusterCache,
344342
WatchFilterValue: watchFilterValue,
345343
TokenTTL: tokenTTL,
346344
}).SetupWithManager(ctx, mgr, concurrency(kubeadmConfigConcurrency)); err != nil {

0 commit comments

Comments
 (0)