From abb8f5f55aaf12dc0a9572fc4b4d3570fc8bc528 Mon Sep 17 00:00:00 2001 From: Yuvaraj Kakaraparthi Date: Thu, 2 Jun 2022 10:53:56 -0700 Subject: [PATCH] Implement BeforeClusterCreate and BeforeClusterUpgrade lifecycle hooks --- api/v1beta1/condition_consts.go | 4 + controllers/alias.go | 4 + .../topology/cluster/cluster_controller.go | 44 +++++ .../cluster/cluster_controller_test.go | 95 +++++++++++ .../topology/cluster/conditions.go | 15 ++ .../topology/cluster/conditions_test.go | 29 ++++ .../topology/cluster/desired_state.go | 36 ++++- .../topology/cluster/desired_state_test.go | 138 ++++++++++++++-- .../cluster/scope/hookresponsetracker.go | 84 ++++++++++ .../cluster/scope/hookresponsetracker_test.go | 147 +++++++++++++++++ .../topology/cluster/scope/scope.go | 7 +- internal/runtime/catalog/catalog.go | 10 +- internal/runtime/catalog/test/catalog_test.go | 7 + internal/runtime/client/fake/fake_client.go | 151 ++++++++++++++++++ main.go | 23 +-- 15 files changed, 768 insertions(+), 26 deletions(-) create mode 100644 internal/controllers/topology/cluster/scope/hookresponsetracker.go create mode 100644 internal/controllers/topology/cluster/scope/hookresponsetracker_test.go create mode 100644 internal/runtime/client/fake/fake_client.go diff --git a/api/v1beta1/condition_consts.go b/api/v1beta1/condition_consts.go index 57fd2b379e8b..d404f0e52daa 100644 --- a/api/v1beta1/condition_consts.go +++ b/api/v1beta1/condition_consts.go @@ -277,4 +277,8 @@ const ( // TopologyReconciledMachineDeploymentsUpgradePendingReason (Severity=Info) documents reconciliation of a Cluster topology // not yet completed because at least one of the MachineDeployments is not yet updated to match the desired topology spec. TopologyReconciledMachineDeploymentsUpgradePendingReason = "MachineDeploymentsUpgradePending" + + // TopologyReconciledHookBlockingReason (Severity=Info) documents reconciliation of a Cluster topology + // not yet completed because at least one of the lifecycle hooks is blocking. + TopologyReconciledHookBlockingReason = "LifecycleHookBlocking" ) diff --git a/controllers/alias.go b/controllers/alias.go index 337440ae8507..c56d8af45f40 100644 --- a/controllers/alias.go +++ b/controllers/alias.go @@ -32,6 +32,7 @@ import ( clustertopologycontroller "sigs.k8s.io/cluster-api/internal/controllers/topology/cluster" machinedeploymenttopologycontroller "sigs.k8s.io/cluster-api/internal/controllers/topology/machinedeployment" machinesettopologycontroller "sigs.k8s.io/cluster-api/internal/controllers/topology/machineset" + runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client" ) // Following types provides access to reconcilers implemented in internal/controllers, thus @@ -133,6 +134,8 @@ type ClusterTopologyReconciler struct { // race conditions caused by an outdated cache. APIReader client.Reader + RuntimeClient runtimeclient.Client + // WatchFilterValue is the label value used to filter events prior to reconciliation. 
WatchFilterValue string @@ -145,6 +148,7 @@ func (r *ClusterTopologyReconciler) SetupWithManager(ctx context.Context, mgr ct return (&clustertopologycontroller.Reconciler{ Client: r.Client, APIReader: r.APIReader, + RuntimeClient: r.RuntimeClient, UnstructuredCachingClient: r.UnstructuredCachingClient, WatchFilterValue: r.WatchFilterValue, }).SetupWithManager(ctx, mgr, options) diff --git a/internal/controllers/topology/cluster/cluster_controller.go b/internal/controllers/topology/cluster/cluster_controller.go index 2d0857d26b43..b5d14e6fd575 100644 --- a/internal/controllers/topology/cluster/cluster_controller.go +++ b/internal/controllers/topology/cluster/cluster_controller.go @@ -19,6 +19,7 @@ package cluster import ( "context" "fmt" + "time" "github.com/pkg/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -30,14 +31,19 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/api/v1beta1/index" "sigs.k8s.io/cluster-api/controllers/external" + runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" + "sigs.k8s.io/cluster-api/feature" "sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/patches" "sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/scope" "sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/structuredmerge" + runtimecatalog "sigs.k8s.io/cluster-api/internal/runtime/catalog" + runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/patch" @@ -59,6 +65,8 @@ type Reconciler struct { // race conditions caused by an outdated cache. APIReader client.Reader + RuntimeClient runtimeclient.Client + // WatchFilterValue is the label value used to filter events prior to reconciliation. WatchFilterValue string @@ -207,6 +215,17 @@ func (r *Reconciler) reconcile(ctx context.Context, s *scope.Scope) (ctrl.Result return ctrl.Result{}, errors.Wrap(err, "error reading current state of the Cluster topology") } + // The cluster topology is yet to be created. Call the BeforeClusterCreate hook before proceeding. + if feature.Gates.Enabled(feature.RuntimeSDK) { + res, err := r.callBeforeClusterCreateHook(ctx, s) + if err != nil { + return reconcile.Result{}, err + } + if !res.IsZero() { + return res, nil + } + } + // Setup watches for InfrastructureCluster and ControlPlane CRs when they exist. if err := r.setupDynamicWatches(ctx, s); err != nil { return ctrl.Result{}, errors.Wrap(err, "error creating dynamic watch") @@ -223,6 +242,12 @@ func (r *Reconciler) reconcile(ctx context.Context, s *scope.Scope) (ctrl.Result return ctrl.Result{}, errors.Wrap(err, "error reconciling the Cluster topology") } + // requeueAfter will not be 0 if any of the runtime hooks returns a blocking response. 
+ requeueAfter := s.HookResponseTracker.AggregateRetryAfter() + if requeueAfter != 0 { + return ctrl.Result{RequeueAfter: requeueAfter}, nil + } + return ctrl.Result{}, nil } @@ -247,6 +272,25 @@ func (r *Reconciler) setupDynamicWatches(ctx context.Context, s *scope.Scope) er return nil } +func (r *Reconciler) callBeforeClusterCreateHook(ctx context.Context, s *scope.Scope) (reconcile.Result, error) { + // If the cluster objects (InfraCluster, ControlPlane, etc) are not yet created we are in the creation phase. + // Call the BeforeClusterCreate hook before proceeding. + if s.Current.Cluster.Spec.InfrastructureRef == nil && s.Current.Cluster.Spec.ControlPlaneRef == nil { + hookRequest := &runtimehooksv1.BeforeClusterCreateRequest{ + Cluster: *s.Current.Cluster, + } + hookResponse := &runtimehooksv1.BeforeClusterCreateResponse{} + if err := r.RuntimeClient.CallAllExtensions(ctx, runtimehooksv1.BeforeClusterCreate, s.Current.Cluster, hookRequest, hookResponse); err != nil { + return ctrl.Result{}, errors.Wrapf(err, "error calling the %s hook", runtimecatalog.HookName(runtimehooksv1.BeforeClusterCreate)) + } + s.HookResponseTracker.Add(runtimehooksv1.BeforeClusterCreate, hookResponse) + if hookResponse.RetryAfterSeconds != 0 { + return ctrl.Result{RequeueAfter: time.Duration(hookResponse.RetryAfterSeconds) * time.Second}, nil + } + } + return ctrl.Result{}, nil +} + // clusterClassToCluster is a handler.ToRequestsFunc to be used to enqueue requests for reconciliation // for Cluster to update when its own ClusterClass gets updated. func (r *Reconciler) clusterClassToCluster(o client.Object) []ctrl.Request { diff --git a/internal/controllers/topology/cluster/cluster_controller_test.go b/internal/controllers/topology/cluster/cluster_controller_test.go index 77d74056659f..d0b534e44b17 100644 --- a/internal/controllers/topology/cluster/cluster_controller_test.go +++ b/internal/controllers/topology/cluster/cluster_controller_test.go @@ -26,11 +26,17 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" utilfeature "k8s.io/component-base/featuregate/testing" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" "sigs.k8s.io/cluster-api/feature" "sigs.k8s.io/cluster-api/internal/contract" + "sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/scope" + runtimecatalog "sigs.k8s.io/cluster-api/internal/runtime/catalog" + fakeruntimeclient "sigs.k8s.io/cluster-api/internal/runtime/client/fake" "sigs.k8s.io/cluster-api/internal/test/builder" "sigs.k8s.io/cluster-api/util/conditions" "sigs.k8s.io/cluster-api/util/patch" @@ -436,6 +442,95 @@ func TestClusterReconciler_deleteClusterClass(t *testing.T) { g.Expect(env.Delete(ctx, clusterClass)).NotTo(Succeed()) } +func TestReconciler_callBeforeClusterCreateHook(t *testing.T) { + catalog := runtimecatalog.New() + _ = runtimehooksv1.AddToCatalog(catalog) + gvh, err := catalog.GroupVersionHook(runtimehooksv1.BeforeClusterCreate) + if err != nil { + panic(err) + } + + blockingResponse := &runtimehooksv1.BeforeClusterCreateResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + RetryAfterSeconds: int32(10), + }, + } + nonBlockingResponse := &runtimehooksv1.BeforeClusterCreateResponse{ + 
CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + RetryAfterSeconds: int32(0), + }, + } + failingResponse := &runtimehooksv1.BeforeClusterCreateResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusFailure, + }, + }, + } + + tests := []struct { + name string + hookResponse *runtimehooksv1.BeforeClusterCreateResponse + wantResult reconcile.Result + wantErr bool + }{ + { + name: "should return a requeue response when the BeforeClusterCreate hook is blocking", + hookResponse: blockingResponse, + wantResult: ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, + wantErr: false, + }, + { + name: "should return an empty response when the BeforeClusterCreate hook is not blocking", + hookResponse: nonBlockingResponse, + wantResult: ctrl.Result{}, + wantErr: false, + }, + { + name: "should error when the BeforeClusterCreate hook returns a failure response", + hookResponse: failingResponse, + wantResult: ctrl.Result{}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + runtimeClient := fakeruntimeclient.NewRuntimeClientBuilder(). + WithCatalog(catalog). + WithCallAllExtensionResponses(map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject{ + gvh: tt.hookResponse, + }). + Build() + + r := &Reconciler{ + RuntimeClient: runtimeClient, + } + s := &scope.Scope{ + Current: &scope.ClusterState{ + Cluster: &clusterv1.Cluster{}, + }, + HookResponseTracker: scope.NewHookResponseTracker(), + } + res, err := r.callBeforeClusterCreateHook(ctx, s) + if tt.wantErr { + g.Expect(err).NotTo(BeNil()) + } else { + g.Expect(err).To(BeNil()) + g.Expect(res).To(Equal(tt.wantResult)) + } + }) + } +} + // setupTestEnvForIntegrationTests builds and then creates in the envtest API server all objects required at init time for each of the // integration tests in this file. This includes: // - a first clusterClass with all the related templates diff --git a/internal/controllers/topology/cluster/conditions.go b/internal/controllers/topology/cluster/conditions.go index 8b0e932c2be9..2d25dfa0846e 100644 --- a/internal/controllers/topology/cluster/conditions.go +++ b/internal/controllers/topology/cluster/conditions.go @@ -57,6 +57,21 @@ func (r *Reconciler) reconcileTopologyReconciledCondition(s *scope.Scope, cluste return nil } + // If any of the lifecycle hooks are blocking any part of the reconciliation then topology + // is not considered as fully reconciled. + if s.HookResponseTracker.AggregateRetryAfter() != 0 { + conditions.Set( + cluster, + conditions.FalseCondition( + clusterv1.TopologyReconciledCondition, + clusterv1.TopologyReconciledHookBlockingReason, + clusterv1.ConditionSeverityInfo, + s.HookResponseTracker.AggregateMessage(), + ), + ) + return nil + } + // If either the Control Plane or any of the MachineDeployments are still pending to pick up the new version (generally // happens when upgrading the cluster) then the topology is not considered as fully reconciled. 
if s.UpgradeTracker.ControlPlane.PendingUpgrade || s.UpgradeTracker.MachineDeployments.PendingUpgrade() { diff --git a/internal/controllers/topology/cluster/conditions_test.go b/internal/controllers/topology/cluster/conditions_test.go index 1c8593622cc0..dbd6d9cce998 100644 --- a/internal/controllers/topology/cluster/conditions_test.go +++ b/internal/controllers/topology/cluster/conditions_test.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" "sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/scope" "sigs.k8s.io/cluster-api/internal/test/builder" "sigs.k8s.io/cluster-api/util/conditions" @@ -47,6 +48,24 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) { wantConditionReason: clusterv1.TopologyReconcileFailedReason, wantErr: false, }, + { + name: "should set the condition to false if the there is a blocking hook", + reconcileErr: nil, + cluster: &clusterv1.Cluster{}, + s: &scope.Scope{ + HookResponseTracker: func() *scope.HookResponseTracker { + hrt := scope.NewHookResponseTracker() + hrt.Add(runtimehooksv1.BeforeClusterUpgrade, &runtimehooksv1.BeforeClusterUpgradeResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + RetryAfterSeconds: int32(10), + }, + }) + return hrt + }(), + }, + wantConditionStatus: corev1.ConditionFalse, + wantConditionReason: clusterv1.TopologyReconciledHookBlockingReason, + }, { name: "should set the condition to false if new version is not picked up because control plane is provisioning", reconcileErr: nil, @@ -71,6 +90,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) { ut.ControlPlane.IsProvisioning = true return ut }(), + HookResponseTracker: scope.NewHookResponseTracker(), }, wantConditionStatus: corev1.ConditionFalse, wantConditionReason: clusterv1.TopologyReconciledControlPlaneUpgradePendingReason, @@ -100,6 +120,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) { ut.ControlPlane.IsUpgrading = true return ut }(), + HookResponseTracker: scope.NewHookResponseTracker(), }, wantConditionStatus: corev1.ConditionFalse, wantConditionReason: clusterv1.TopologyReconciledControlPlaneUpgradePendingReason, @@ -129,6 +150,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) { ut.ControlPlane.IsScaling = true return ut }(), + HookResponseTracker: scope.NewHookResponseTracker(), }, wantConditionStatus: corev1.ConditionFalse, wantConditionReason: clusterv1.TopologyReconciledControlPlaneUpgradePendingReason, @@ -170,6 +192,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) { ut.ControlPlane.PendingUpgrade = true return ut }(), + HookResponseTracker: scope.NewHookResponseTracker(), }, wantConditionStatus: corev1.ConditionFalse, wantConditionReason: clusterv1.TopologyReconciledControlPlaneUpgradePendingReason, @@ -213,6 +236,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) { ut.MachineDeployments.MarkPendingUpgrade("md0-abc123") return ut }(), + HookResponseTracker: scope.NewHookResponseTracker(), }, wantConditionStatus: corev1.ConditionFalse, wantConditionReason: clusterv1.TopologyReconciledMachineDeploymentsUpgradePendingReason, @@ -256,6 +280,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) { ut.MachineDeployments.MarkPendingUpgrade("md0-abc123") return ut }(), + HookResponseTracker: scope.NewHookResponseTracker(), }, wantConditionStatus: corev1.ConditionFalse, wantConditionReason: 
clusterv1.TopologyReconciledMachineDeploymentsUpgradePendingReason, @@ -285,6 +310,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) { ut.ControlPlane.IsUpgrading = true return ut }(), + HookResponseTracker: scope.NewHookResponseTracker(), }, wantConditionStatus: corev1.ConditionTrue, }, @@ -313,6 +339,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) { ut.ControlPlane.IsScaling = true return ut }(), + HookResponseTracker: scope.NewHookResponseTracker(), }, wantConditionStatus: corev1.ConditionTrue, }, @@ -367,6 +394,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) { ut.MachineDeployments.MarkPendingUpgrade("md1-abc123") return ut }(), + HookResponseTracker: scope.NewHookResponseTracker(), }, wantConditionStatus: corev1.ConditionFalse, wantConditionReason: clusterv1.TopologyReconciledMachineDeploymentsUpgradePendingReason, @@ -421,6 +449,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) { ut.ControlPlane.PendingUpgrade = false return ut }(), + HookResponseTracker: scope.NewHookResponseTracker(), }, wantConditionStatus: corev1.ConditionTrue, }, diff --git a/internal/controllers/topology/cluster/desired_state.go b/internal/controllers/topology/cluster/desired_state.go index 56bc213a0f42..609ff2745a5e 100644 --- a/internal/controllers/topology/cluster/desired_state.go +++ b/internal/controllers/topology/cluster/desired_state.go @@ -31,9 +31,12 @@ import ( clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/controllers/external" + runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" + "sigs.k8s.io/cluster-api/feature" "sigs.k8s.io/cluster-api/internal/contract" "sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/scope" tlog "sigs.k8s.io/cluster-api/internal/log" + runtimecatalog "sigs.k8s.io/cluster-api/internal/runtime/catalog" ) // computeDesiredState computes the desired state of the cluster topology. @@ -60,7 +63,7 @@ func (r *Reconciler) computeDesiredState(ctx context.Context, s *scope.Scope) (* // Compute the desired state of the ControlPlane object, eventually adding a reference to the // InfrastructureMachineTemplate generated by the previous step. - if desiredState.ControlPlane.Object, err = computeControlPlane(ctx, s, desiredState.ControlPlane.InfrastructureMachineTemplate); err != nil { + if desiredState.ControlPlane.Object, err = r.computeControlPlane(ctx, s, desiredState.ControlPlane.InfrastructureMachineTemplate); err != nil { return nil, errors.Wrapf(err, "failed to compute ControlPlane") } @@ -167,7 +170,7 @@ func computeControlPlaneInfrastructureMachineTemplate(_ context.Context, s *scop // computeControlPlane computes the desired state for the ControlPlane object starting from the // corresponding template defined in the blueprint. -func computeControlPlane(_ context.Context, s *scope.Scope, infrastructureMachineTemplate *unstructured.Unstructured) (*unstructured.Unstructured, error) { +func (r *Reconciler) computeControlPlane(ctx context.Context, s *scope.Scope, infrastructureMachineTemplate *unstructured.Unstructured) (*unstructured.Unstructured, error) { template := s.Blueprint.ControlPlane.Template templateClonedFromRef := s.Blueprint.ClusterClass.Spec.ControlPlane.Ref cluster := s.Current.Cluster @@ -240,7 +243,7 @@ func computeControlPlane(_ context.Context, s *scope.Scope, infrastructureMachin } // Sets the desired Kubernetes version for the control plane. 
- version, err := computeControlPlaneVersion(s) + version, err := r.computeControlPlaneVersion(ctx, s) if err != nil { return nil, errors.Wrap(err, "failed to compute version of control plane") } @@ -254,7 +257,7 @@ func computeControlPlane(_ context.Context, s *scope.Scope, infrastructureMachin // computeControlPlaneVersion calculates the version of the desired control plane. // The version is calculated using the state of the current machine deployments, the current control plane // and the version defined in the topology. -func computeControlPlaneVersion(s *scope.Scope) (string, error) { +func (r *Reconciler) computeControlPlaneVersion(ctx context.Context, s *scope.Scope) (string, error) { desiredVersion := s.Blueprint.Topology.Version // If we are creating the control plane object (current control plane is nil), use version from topology. if s.Current.ControlPlane == nil || s.Current.ControlPlane.Object == nil { @@ -334,6 +337,25 @@ func computeControlPlaneVersion(s *scope.Scope) (string, error) { return *currentVersion, nil } + if feature.Gates.Enabled(feature.RuntimeSDK) { + // At this point the control plane and the machine deployments are stable and we are almost ready to pick + // up the desiredVersion. Call the BeforeClusterUpgrade hook before picking up the desired version. + hookRequest := &runtimehooksv1.BeforeClusterUpgradeRequest{ + Cluster: *s.Current.Cluster, + FromKubernetesVersion: *currentVersion, + ToKubernetesVersion: desiredVersion, + } + hookResponse := &runtimehooksv1.BeforeClusterUpgradeResponse{} + if err := r.RuntimeClient.CallAllExtensions(ctx, runtimehooksv1.BeforeClusterUpgrade, s.Current.Cluster, hookRequest, hookResponse); err != nil { + return "", errors.Wrapf(err, "failed to call %s hook", runtimecatalog.HookName(runtimehooksv1.BeforeClusterUpgrade)) + } + s.HookResponseTracker.Add(runtimehooksv1.BeforeClusterUpgrade, hookResponse) + if hookResponse.RetryAfterSeconds != 0 { + // Cannot pickup the new version right now. Need to try again later. + return *currentVersion, nil + } + } + // Control plane and machine deployments are stable. // Ready to pick up the topology version. return desiredVersion, nil @@ -626,6 +648,12 @@ func computeMachineDeploymentVersion(s *scope.Scope, desiredControlPlaneState *s return currentVersion, nil } + // If the ControlPlane is pending picking up an upgrade then do not pick up the new version yet. + if s.UpgradeTracker.ControlPlane.PendingUpgrade { + s.UpgradeTracker.MachineDeployments.MarkPendingUpgrade(currentMDState.Object.Name) + return currentVersion, nil + } + // At this point the control plane is stable (not scaling, not upgrading, not being upgraded). // Checking to see if the machine deployments are also stable. // If any of the MachineDeployments is rolling out, do not upgrade the machine deployment yet. 
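
For reference, the extension side of the BeforeClusterUpgrade exchange consumed by computeControlPlaneVersion above could look roughly like the sketch below. It is illustrative only: the handler name, how it is registered and served, and maintenanceWindowOpen are assumptions and not part of this patch; the request/response types and the RetryAfterSeconds semantics are the ones used above.

package example

import (
	"context"

	runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1"
)

// beforeClusterUpgradeHandler is a hypothetical Runtime Extension handler; the name and
// the serving/registration machinery are assumptions, not part of this patch.
func beforeClusterUpgradeHandler(_ context.Context, req *runtimehooksv1.BeforeClusterUpgradeRequest, resp *runtimehooksv1.BeforeClusterUpgradeResponse) {
	if !maintenanceWindowOpen() {
		// A non-zero RetryAfterSeconds keeps computeControlPlaneVersion on the current
		// version; the topology controller requeues the Cluster after that many seconds.
		resp.CommonRetryResponse = runtimehooksv1.CommonRetryResponse{
			CommonResponse: runtimehooksv1.CommonResponse{
				Status:  runtimehooksv1.ResponseStatusSuccess,
				Message: "waiting for the maintenance window before upgrading to " + req.ToKubernetesVersion,
			},
			RetryAfterSeconds: 30,
		}
		return
	}

	// RetryAfterSeconds == 0 means "not blocking": the desired version is picked up.
	resp.CommonRetryResponse = runtimehooksv1.CommonRetryResponse{
		CommonResponse: runtimehooksv1.CommonResponse{Status: runtimehooksv1.ResponseStatusSuccess},
	}
}

// maintenanceWindowOpen stands in for whatever external condition the extension checks (assumed helper).
func maintenanceWindowOpen() bool { return true }

The same pattern applies to BeforeClusterCreate, which the controller gates on in callBeforeClusterCreateHook above.
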
diff --git a/internal/controllers/topology/cluster/desired_state_test.go b/internal/controllers/topology/cluster/desired_state_test.go index 409192d98ff8..925793da6a37 100644 --- a/internal/controllers/topology/cluster/desired_state_test.go +++ b/internal/controllers/topology/cluster/desired_state_test.go @@ -28,12 +28,17 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/intstr" + utilfeature "k8s.io/component-base/featuregate/testing" "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" + "sigs.k8s.io/cluster-api/feature" "sigs.k8s.io/cluster-api/internal/contract" "sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/scope" + runtimecatalog "sigs.k8s.io/cluster-api/internal/runtime/catalog" + fakeruntimeclient "sigs.k8s.io/cluster-api/internal/runtime/client/fake" "sigs.k8s.io/cluster-api/internal/test/builder" ) @@ -291,7 +296,9 @@ func TestComputeControlPlane(t *testing.T) { scope := scope.New(cluster) scope.Blueprint = blueprint - obj, err := computeControlPlane(ctx, scope, nil) + r := &Reconciler{} + + obj, err := r.computeControlPlane(ctx, scope, nil) g.Expect(err).ToNot(HaveOccurred()) g.Expect(obj).ToNot(BeNil()) @@ -330,7 +337,9 @@ func TestComputeControlPlane(t *testing.T) { scope := scope.New(clusterWithoutReplicas) scope.Blueprint = blueprint - obj, err := computeControlPlane(ctx, scope, nil) + r := &Reconciler{} + + obj, err := r.computeControlPlane(ctx, scope, nil) g.Expect(err).ToNot(HaveOccurred()) g.Expect(obj).ToNot(BeNil()) @@ -370,7 +379,9 @@ func TestComputeControlPlane(t *testing.T) { scope := scope.New(cluster) scope.Blueprint = blueprint - obj, err := computeControlPlane(ctx, scope, infrastructureMachineTemplate) + r := &Reconciler{} + + obj, err := r.computeControlPlane(ctx, scope, infrastructureMachineTemplate) g.Expect(err).ToNot(HaveOccurred()) g.Expect(obj).ToNot(BeNil()) @@ -420,7 +431,9 @@ func TestComputeControlPlane(t *testing.T) { scope := scope.New(clusterWithControlPlaneRef) scope.Blueprint = blueprint - obj, err := computeControlPlane(ctx, scope, nil) + r := &Reconciler{} + + obj, err := r.computeControlPlane(ctx, scope, nil) g.Expect(err).ToNot(HaveOccurred()) g.Expect(obj).ToNot(BeNil()) @@ -486,7 +499,9 @@ func TestComputeControlPlane(t *testing.T) { Object: tt.currentControlPlane, } - obj, err := computeControlPlane(ctx, s, nil) + r := &Reconciler{} + + obj, err := r.computeControlPlane(ctx, s, nil) g.Expect(err).NotTo(HaveOccurred()) g.Expect(obj).NotTo(BeNil()) assertNestedField(g, obj, tt.expectedVersion, contract.ControlPlane().Version().Path()...) @@ -524,7 +539,9 @@ func TestComputeControlPlane(t *testing.T) { s.Current.ControlPlane.Object.SetOwnerReferences([]metav1.OwnerReference{*ownerReferenceTo(shim)}) s.Blueprint = blueprint - obj, err := computeControlPlane(ctx, s, nil) + r := &Reconciler{} + + obj, err := r.computeControlPlane(ctx, s, nil) g.Expect(err).ToNot(HaveOccurred()) g.Expect(obj).ToNot(BeNil()) g.Expect(hasOwnerReferenceFrom(obj, shim)).To(BeTrue()) @@ -532,6 +549,8 @@ func TestComputeControlPlane(t *testing.T) { } func TestComputeControlPlaneVersion(t *testing.T) { + defer utilfeature.SetFeatureGateDuringTest(t, feature.Gates, feature.RuntimeSDK, true)() + // Note: the version used by the machine deployments does // not affect how we determining the control plane version. 
// We only want to know if the machine deployments are stable. @@ -567,12 +586,47 @@ func TestComputeControlPlaneVersion(t *testing.T) { }). Build() + nonBlockingBeforeClusterUpgradeResponse := &runtimehooksv1.BeforeClusterUpgradeResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + }, + } + + blockingBeforeClusterUpgradeResponse := &runtimehooksv1.BeforeClusterUpgradeResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + RetryAfterSeconds: int32(10), + }, + } + + failureBeforeClusterUpgradeResponse := &runtimehooksv1.BeforeClusterUpgradeResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusFailure, + }, + }, + } + + catalog := runtimecatalog.New() + _ = runtimehooksv1.AddToCatalog(catalog) + + beforeClusterUpgradeGVH, err := catalog.GroupVersionHook(runtimehooksv1.BeforeClusterUpgrade) + if err != nil { + panic("unable to compute GVH") + } + tests := []struct { name string + hookResponse *runtimehooksv1.BeforeClusterUpgradeResponse topologyVersion string controlPlaneObj *unstructured.Unstructured machineDeploymentsState scope.MachineDeploymentsStateMap expectedVersion string + wantErr bool }{ { name: "should return cluster.spec.topology.version if creating a new control plane", @@ -585,6 +639,7 @@ func TestComputeControlPlaneVersion(t *testing.T) { // Control plane is not scaling implies that controlplane.spec.replicas is equal to controlplane.status.replicas, // Controlplane.status.updatedReplicas and controlplane.status.readyReplicas. name: "should return cluster.spec.topology.version if the control plane is not upgrading and not scaling", + hookResponse: nonBlockingBeforeClusterUpgradeResponse, topologyVersion: "v1.2.3", controlPlaneObj: builder.ControlPlane("test1", "cp1"). WithSpecFields(map[string]interface{}{ @@ -656,6 +711,7 @@ func TestComputeControlPlaneVersion(t *testing.T) { }, { name: "should return cluster.spec.topology.version if control plane is not upgrading and not scaling and none of the machine deployments are rolling out", + hookResponse: nonBlockingBeforeClusterUpgradeResponse, topologyVersion: "v1.2.3", controlPlaneObj: builder.ControlPlane("test1", "cp1"). WithSpecFields(map[string]interface{}{ @@ -675,6 +731,51 @@ func TestComputeControlPlaneVersion(t *testing.T) { }, expectedVersion: "v1.2.3", }, + { + name: "should return the controlplane.spec.version if the BeforeClusterUpgrade hooks returns a blocking response", + hookResponse: blockingBeforeClusterUpgradeResponse, + topologyVersion: "v1.2.3", + controlPlaneObj: builder.ControlPlane("test1", "cp1"). + WithSpecFields(map[string]interface{}{ + "spec.version": "v1.2.2", + "spec.replicas": int64(2), + }). + WithStatusFields(map[string]interface{}{ + "status.version": "v1.2.2", + "status.replicas": int64(2), + "status.updatedReplicas": int64(2), + "status.readyReplicas": int64(2), + }). 
+ Build(), + machineDeploymentsState: scope.MachineDeploymentsStateMap{ + "md1": &scope.MachineDeploymentState{Object: machineDeploymentStable}, + "md2": &scope.MachineDeploymentState{Object: machineDeploymentStable}, + }, + expectedVersion: "v1.2.2", + }, + { + name: "should fail if the BeforeClusterUpgrade hooks returns a failure response", + hookResponse: failureBeforeClusterUpgradeResponse, + topologyVersion: "v1.2.3", + controlPlaneObj: builder.ControlPlane("test1", "cp1"). + WithSpecFields(map[string]interface{}{ + "spec.version": "v1.2.2", + "spec.replicas": int64(2), + }). + WithStatusFields(map[string]interface{}{ + "status.version": "v1.2.2", + "status.replicas": int64(2), + "status.updatedReplicas": int64(2), + "status.readyReplicas": int64(2), + }). + Build(), + machineDeploymentsState: scope.MachineDeploymentsStateMap{ + "md1": &scope.MachineDeploymentState{Object: machineDeploymentStable}, + "md2": &scope.MachineDeploymentState{Object: machineDeploymentStable}, + }, + expectedVersion: "v1.2.2", + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -688,14 +789,31 @@ func TestComputeControlPlaneVersion(t *testing.T) { }, }}, Current: &scope.ClusterState{ + Cluster: &clusterv1.Cluster{}, ControlPlane: &scope.ControlPlaneState{Object: tt.controlPlaneObj}, MachineDeployments: tt.machineDeploymentsState, }, - UpgradeTracker: scope.NewUpgradeTracker(), + UpgradeTracker: scope.NewUpgradeTracker(), + HookResponseTracker: scope.NewHookResponseTracker(), + } + + runtimeClient := fakeruntimeclient.NewRuntimeClientBuilder(). + WithCatalog(catalog). + WithCallAllExtensionResponses(map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject{ + beforeClusterUpgradeGVH: tt.hookResponse, + }). + Build() + + r := &Reconciler{ + RuntimeClient: runtimeClient, + } + version, err := r.computeControlPlaneVersion(ctx, s) + if tt.wantErr { + g.Expect(err).NotTo(BeNil()) + } else { + g.Expect(err).To(BeNil()) + g.Expect(version).To(Equal(tt.expectedVersion)) } - version, err := computeControlPlaneVersion(s) - g.Expect(err).NotTo(HaveOccurred()) - g.Expect(version).To(Equal(tt.expectedVersion)) }) } } diff --git a/internal/controllers/topology/cluster/scope/hookresponsetracker.go b/internal/controllers/topology/cluster/scope/hookresponsetracker.go new file mode 100644 index 000000000000..2ba4316daac3 --- /dev/null +++ b/internal/controllers/topology/cluster/scope/hookresponsetracker.go @@ -0,0 +1,84 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scope + +import ( + "fmt" + "strings" + "time" + + runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" + runtimecatalog "sigs.k8s.io/cluster-api/internal/runtime/catalog" +) + +// HookResponseTracker is a helper to capture the responses of the various lifecycle hooks. +type HookResponseTracker struct { + responses map[string]runtimehooksv1.ResponseObject +} + +// NewHookResponseTracker returns a new HookResponseTracker. 
+func NewHookResponseTracker() *HookResponseTracker { + return &HookResponseTracker{ + responses: map[string]runtimehooksv1.ResponseObject{}, + } +} + +// Add add the response of a hook to the tracker. +func (h *HookResponseTracker) Add(hook runtimecatalog.Hook, response runtimehooksv1.ResponseObject) { + hookName := runtimecatalog.HookName(hook) + h.responses[hookName] = response +} + +// AggregateRetryAfter calculates the lowest non-zero retryAfterSeconds time from all the tracked responses. +func (h *HookResponseTracker) AggregateRetryAfter() time.Duration { + res := int32(0) + for _, resp := range h.responses { + if retryResponse, ok := resp.(runtimehooksv1.RetryResponseObject); ok { + res = lowestNonZeroRetryAfterSeconds(res, retryResponse.GetRetryAfterSeconds()) + } + } + return time.Duration(res) * time.Second +} + +// AggregateMessage returns a human friendly message about the blocking status of hooks. +func (h *HookResponseTracker) AggregateMessage() string { + blockingHooks := []string{} + for hook, resp := range h.responses { + if retryResponse, ok := resp.(runtimehooksv1.RetryResponseObject); ok { + if retryResponse.GetRetryAfterSeconds() != 0 { + blockingHooks = append(blockingHooks, hook) + } + } + } + if len(blockingHooks) == 0 { + return "" + } + return fmt.Sprintf("hooks %q are blocking", strings.Join(blockingHooks, ",")) +} + +func lowestNonZeroRetryAfterSeconds(i, j int32) int32 { + if i == 0 { + return j + } + if j == 0 { + return i + } + if i < j { + return i + } + return j +} diff --git a/internal/controllers/topology/cluster/scope/hookresponsetracker_test.go b/internal/controllers/topology/cluster/scope/hookresponsetracker_test.go new file mode 100644 index 000000000000..5ff76e2794af --- /dev/null +++ b/internal/controllers/topology/cluster/scope/hookresponsetracker_test.go @@ -0,0 +1,147 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scope + +import ( + "testing" + "time" + + . 
"github.com/onsi/gomega" + + runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" + runtimecatalog "sigs.k8s.io/cluster-api/internal/runtime/catalog" +) + +func TestHookResponseTracker_AggregateRetryAfter(t *testing.T) { + nonBlockingBeforeClusterCreateResponse := &runtimehooksv1.BeforeClusterCreateResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + RetryAfterSeconds: int32(0), + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + }, + } + blockingBeforeClusterCreateResponse := &runtimehooksv1.BeforeClusterCreateResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + RetryAfterSeconds: int32(10), + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + }, + } + + nonBlockingBeforeClusterUpgradeResponse := &runtimehooksv1.BeforeClusterUpgradeResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + RetryAfterSeconds: int32(0), + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + }, + } + blockingBeforeClusterUpgradeResponse := &runtimehooksv1.BeforeClusterUpgradeResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + RetryAfterSeconds: int32(5), + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + }, + } + + t.Run("AggregateRetryAfter should return non zero value if there are any blocking hook responses", func(t *testing.T) { + g := NewWithT(t) + + hrt := NewHookResponseTracker() + hrt.Add(runtimehooksv1.BeforeClusterCreate, blockingBeforeClusterCreateResponse) + hrt.Add(runtimehooksv1.BeforeClusterUpgrade, nonBlockingBeforeClusterUpgradeResponse) + + g.Expect(hrt.AggregateRetryAfter()).To(Equal(time.Duration(10) * time.Second)) + }) + t.Run("AggregateRetryAfter should return zero value if there are no blocking hook responses", func(t *testing.T) { + g := NewWithT(t) + + hrt := NewHookResponseTracker() + hrt.Add(runtimehooksv1.BeforeClusterCreate, nonBlockingBeforeClusterCreateResponse) + hrt.Add(runtimehooksv1.BeforeClusterUpgrade, nonBlockingBeforeClusterUpgradeResponse) + + g.Expect(hrt.AggregateRetryAfter()).To(Equal(time.Duration(0))) + }) + t.Run("AggregateRetryAfter should return the lowest non-zero value if there are multiple blocking hook responses", func(t *testing.T) { + g := NewWithT(t) + + hrt := NewHookResponseTracker() + hrt.Add(runtimehooksv1.BeforeClusterCreate, blockingBeforeClusterCreateResponse) + hrt.Add(runtimehooksv1.BeforeClusterUpgrade, blockingBeforeClusterUpgradeResponse) + + g.Expect(hrt.AggregateRetryAfter()).To(Equal(time.Duration(5) * time.Second)) + }) +} + +func TestHookResponseTracker_AggregateMessage(t *testing.T) { + nonBlockingBeforeClusterCreateResponse := &runtimehooksv1.BeforeClusterCreateResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + RetryAfterSeconds: int32(0), + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + }, + } + blockingBeforeClusterCreateResponse := &runtimehooksv1.BeforeClusterCreateResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + RetryAfterSeconds: int32(10), + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + }, + } + + nonBlockingBeforeClusterUpgradeResponse := &runtimehooksv1.BeforeClusterUpgradeResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + RetryAfterSeconds: int32(0), + CommonResponse: 
runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + }, + } + blockingBeforeClusterUpgradeResponse := &runtimehooksv1.BeforeClusterUpgradeResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + RetryAfterSeconds: int32(5), + CommonResponse: runtimehooksv1.CommonResponse{ + Status: runtimehooksv1.ResponseStatusSuccess, + }, + }, + } + + t.Run("AggregateMessage should return a message with the names of all the blocking hooks", func(t *testing.T) { + g := NewWithT(t) + + hrt := NewHookResponseTracker() + hrt.Add(runtimehooksv1.BeforeClusterCreate, blockingBeforeClusterCreateResponse) + hrt.Add(runtimehooksv1.BeforeClusterUpgrade, blockingBeforeClusterUpgradeResponse) + + g.Expect(hrt.AggregateMessage()).To(ContainSubstring(runtimecatalog.HookName(runtimehooksv1.BeforeClusterCreate))) + g.Expect(hrt.AggregateMessage()).To(ContainSubstring(runtimecatalog.HookName(runtimehooksv1.BeforeClusterUpgrade))) + }) + t.Run("AggregateMessage should return empty string if there are no blocking hook responses", func(t *testing.T) { + g := NewWithT(t) + + hrt := NewHookResponseTracker() + hrt.Add(runtimehooksv1.BeforeClusterCreate, nonBlockingBeforeClusterCreateResponse) + hrt.Add(runtimehooksv1.BeforeClusterUpgrade, nonBlockingBeforeClusterUpgradeResponse) + + g.Expect(hrt.AggregateMessage()).To(Equal("")) + }) +} diff --git a/internal/controllers/topology/cluster/scope/scope.go b/internal/controllers/topology/cluster/scope/scope.go index 6407b7ab8717..413470dc8173 100644 --- a/internal/controllers/topology/cluster/scope/scope.go +++ b/internal/controllers/topology/cluster/scope/scope.go @@ -33,6 +33,10 @@ type Scope struct { // UpgradeTracker holds information about ongoing upgrades in the managed topology. UpgradeTracker *UpgradeTracker + + // HookResponseTracker holds the hook responses that will be used to + // calculate a combined reconcile result. + HookResponseTracker *HookResponseTracker } // New returns a new Scope with only the cluster; while processing a request in the topology/ClusterReconciler controller @@ -46,6 +50,7 @@ func New(cluster *clusterv1.Cluster) *Scope { Current: &ClusterState{ Cluster: cluster, }, - UpgradeTracker: NewUpgradeTracker(), + UpgradeTracker: NewUpgradeTracker(), + HookResponseTracker: NewHookResponseTracker(), } } diff --git a/internal/runtime/catalog/catalog.go b/internal/runtime/catalog/catalog.go index 1112d18802ad..431aa5ffbfcd 100644 --- a/internal/runtime/catalog/catalog.go +++ b/internal/runtime/catalog/catalog.go @@ -145,8 +145,7 @@ func (c *Catalog) AddHook(gv schema.GroupVersion, hookFunc Hook, hookMeta *HookM } // Calculate the hook name based on the func name. - hookFuncName := goruntime.FuncForPC(reflect.ValueOf(hookFunc).Pointer()).Name() - hookName := hookFuncName[strings.LastIndex(hookFuncName, ".")+1:] + hookName := HookName(hookFunc) gvh := GroupVersionHook{ Group: gv.Group, @@ -347,6 +346,13 @@ func (gvh GroupVersionHook) String() string { return strings.Join([]string{gvh.Group, "/", gvh.Version, ", Hook=", gvh.Hook}, "") } +// HookName returns the name of the runtime hook. 
+func HookName(hook Hook) string { + hookFuncName := goruntime.FuncForPC(reflect.ValueOf(hook).Pointer()).Name() + hookName := hookFuncName[strings.LastIndex(hookFuncName, ".")+1:] + return hookName +} + var emptyGroupVersionHook = GroupVersionHook{} var emptyGroupVersionKind = schema.GroupVersionKind{} diff --git a/internal/runtime/catalog/test/catalog_test.go b/internal/runtime/catalog/test/catalog_test.go index 1a997fce8daa..20d7b9f9f23b 100644 --- a/internal/runtime/catalog/test/catalog_test.go +++ b/internal/runtime/catalog/test/catalog_test.go @@ -175,6 +175,13 @@ func TestValidateResponse(t *testing.T) { } } +func TestHookName(t *testing.T) { + g := NewWithT(t) + expected := "FakeHook" + actual := runtimecatalog.HookName(v1alpha1.FakeHook) + g.Expect(actual).To(Equal(expected)) +} + type GoodRequest struct { metav1.TypeMeta `json:",inline"` diff --git a/internal/runtime/client/fake/fake_client.go b/internal/runtime/client/fake/fake_client.go new file mode 100644 index 000000000000..509f76978d5d --- /dev/null +++ b/internal/runtime/client/fake/fake_client.go @@ -0,0 +1,151 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package fake is used to help with testing functions that need a fake RuntimeClient. +package fake + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + runtimev1 "sigs.k8s.io/cluster-api/exp/runtime/api/v1alpha1" + runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1" + runtimecatalog "sigs.k8s.io/cluster-api/internal/runtime/catalog" + runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client" +) + +// RuntimeClientBuilder is used to build a fake runtime client. +type RuntimeClientBuilder struct { + ready bool + catalog *runtimecatalog.Catalog + callAllResponses map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject + callResponses map[string]runtimehooksv1.ResponseObject +} + +// NewRuntimeClientBuilder returns a new builder for the fake runtime client. +func NewRuntimeClientBuilder() *RuntimeClientBuilder { + return &RuntimeClientBuilder{} +} + +// WithCatalog can be use the provided catalog in the fake runtime client. +func (f *RuntimeClientBuilder) WithCatalog(catalog *runtimecatalog.Catalog) *RuntimeClientBuilder { + f.catalog = catalog + return f +} + +// WithCallAllExtensionResponses can be used to dictate the responses for the runtime hooks. +func (f *RuntimeClientBuilder) WithCallAllExtensionResponses(responses map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject) *RuntimeClientBuilder { + f.callAllResponses = responses + return f +} + +// WithCallExtensionResponses can be used to dictate the responses for the runtime extension handlers. 
+func (f *RuntimeClientBuilder) WithCallExtensionResponses(responses map[string]runtimehooksv1.ResponseObject) *RuntimeClientBuilder { + f.callResponses = responses + return f +} + +// MarkReady can be used to mark the fake runtime client as either ready or not ready. +func (f *RuntimeClientBuilder) MarkReady(ready bool) *RuntimeClientBuilder { + f.ready = ready + return f +} + +// Build returns the fake runtime client. +func (f *RuntimeClientBuilder) Build() runtimeclient.Client { + return &runtimeClient{ + isReady: f.ready, + callAllResponses: f.callAllResponses, + callResponses: f.callResponses, + catalog: f.catalog, + } +} + +var _ runtimeclient.Client = &runtimeClient{} + +type runtimeClient struct { + isReady bool + catalog *runtimecatalog.Catalog + callAllResponses map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject + callResponses map[string]runtimehooksv1.ResponseObject +} + +// CallAllExtensions implements Client. +func (fc *runtimeClient) CallAllExtensions(ctx context.Context, hook runtimecatalog.Hook, forObject metav1.Object, request runtime.Object, response runtimehooksv1.ResponseObject) error { + gvh, err := fc.catalog.GroupVersionHook(hook) + if err != nil { + return errors.Wrap(err, "failed to compute GVH") + } + expectedResponse, ok := fc.callAllResponses[gvh] + if !ok { + // This should actually panic because an error here would mean a mistake in the test setup. + panic(fmt.Sprintf("test response not available hook for %q", gvh)) + } + if err := fc.catalog.Convert(expectedResponse, response, ctx); err != nil { + // This should actually panic because an error here would mean a mistake in the test setup. + panic("cannot update response") + } + if response.GetStatus() == runtimehooksv1.ResponseStatusFailure { + return errors.Errorf("runtime hook %q failed", gvh) + } + return nil +} + +// CallExtension implements Client. +func (fc *runtimeClient) CallExtension(ctx context.Context, _ runtimecatalog.Hook, _ metav1.Object, name string, request runtime.Object, response runtimehooksv1.ResponseObject) error { + expectedResponse, ok := fc.callResponses[name] + if !ok { + // This should actually panic because an error here would mean a mistake in the test setup. + panic(fmt.Sprintf("test response not available for extension %q", name)) + } + if err := fc.catalog.Convert(expectedResponse, response, ctx); err != nil { + // This should actually panic because an error here would mean a mistake in the test setup. + panic("cannot update response") + } + // If the received response is a failure then return an error. + if response.GetStatus() == runtimehooksv1.ResponseStatusFailure { + return errors.Errorf("ExtensionHandler %s failed with message %s", name, response.GetMessage()) + } + return nil +} + +// Discover implements Client. +func (fc *runtimeClient) Discover(context.Context, *runtimev1.ExtensionConfig) (*runtimev1.ExtensionConfig, error) { + panic("unimplemented") +} + +// IsReady implements Client. +func (fc *runtimeClient) IsReady() bool { + return fc.isReady +} + +// Register implements Client. +func (fc *runtimeClient) Register(extensionConfig *runtimev1.ExtensionConfig) error { + panic("unimplemented") +} + +// Unregister implements Client. 
+func (fc *runtimeClient) Unregister(extensionConfig *runtimev1.ExtensionConfig) error { + panic("unimplemented") +} + +func (fc *runtimeClient) WarmUp(extensionConfigList *runtimev1.ExtensionConfigList) error { + panic("unimplemented") +} diff --git a/main.go b/main.go index 6f5bfb391fea..21bff37b63cf 100644 --- a/main.go +++ b/main.go @@ -313,6 +313,16 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { os.Exit(1) } + var runtimeClient runtimeclient.Client + if feature.Gates.Enabled(feature.RuntimeSDK) { + // This is the creation of the runtimeClient for the controllers, embedding a shared catalog and registry instance. + runtimeClient = runtimeclient.New(runtimeclient.Options{ + Catalog: catalog, + Registry: runtimeregistry.New(), + Client: mgr.GetClient(), + }) + } + if feature.Gates.Enabled(feature.ClusterTopology) { unstructuredCachingClient, err := client.NewDelegatingClient( client.NewDelegatingClientInput{ @@ -342,6 +352,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { if err := (&controllers.ClusterTopologyReconciler{ Client: mgr.GetClient(), APIReader: mgr.GetAPIReader(), + RuntimeClient: runtimeClient, UnstructuredCachingClient: unstructuredCachingClient, WatchFilterValue: watchFilterValue, }).SetupWithManager(ctx, mgr, concurrency(clusterTopologyConcurrency)); err != nil { @@ -369,16 +380,10 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) { } if feature.Gates.Enabled(feature.RuntimeSDK) { - // This is the creation of the single RuntimeExtension registry for the controller. - registry := runtimeregistry.New() if err = (&runtimecontrollers.ExtensionConfigReconciler{ - Client: mgr.GetClient(), - APIReader: mgr.GetAPIReader(), - RuntimeClient: runtimeclient.New(runtimeclient.Options{ - Catalog: catalog, - Registry: registry, - Client: mgr.GetClient(), - }), + Client: mgr.GetClient(), + APIReader: mgr.GetAPIReader(), + RuntimeClient: runtimeClient, WatchFilterValue: watchFilterValue, }).SetupWithManager(ctx, mgr, concurrency(extensionConfigConcurrency)); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ExtensionConfig")
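
To illustrate how the new pieces are meant to compose in tests, the following sketch (not part of this patch, and assuming it lives inside the cluster-api module since it imports internal packages) stubs a blocking BeforeClusterCreate response through the fake runtime client and checks that the HookResponseTracker reports it as a requeue.

package example

import (
	"context"
	"testing"
	"time"

	. "github.com/onsi/gomega"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1"
	"sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/scope"
	runtimecatalog "sigs.k8s.io/cluster-api/internal/runtime/catalog"
	fakeruntimeclient "sigs.k8s.io/cluster-api/internal/runtime/client/fake"
)

// TestBlockingHookIsTracked is a hypothetical example showing how the fake runtime client
// and the HookResponseTracker combine: a blocking response handed to the fake client
// surfaces as a non-zero AggregateRetryAfter.
func TestBlockingHookIsTracked(t *testing.T) {
	g := NewWithT(t)

	catalog := runtimecatalog.New()
	_ = runtimehooksv1.AddToCatalog(catalog)
	gvh, err := catalog.GroupVersionHook(runtimehooksv1.BeforeClusterCreate)
	g.Expect(err).To(BeNil())

	// A response with RetryAfterSeconds != 0 is treated as blocking.
	blocking := &runtimehooksv1.BeforeClusterCreateResponse{
		CommonRetryResponse: runtimehooksv1.CommonRetryResponse{
			CommonResponse:    runtimehooksv1.CommonResponse{Status: runtimehooksv1.ResponseStatusSuccess},
			RetryAfterSeconds: 10,
		},
	}

	client := fakeruntimeclient.NewRuntimeClientBuilder().
		WithCatalog(catalog).
		WithCallAllExtensionResponses(map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject{gvh: blocking}).
		Build()

	tracker := scope.NewHookResponseTracker()
	req := &runtimehooksv1.BeforeClusterCreateRequest{Cluster: clusterv1.Cluster{}}
	resp := &runtimehooksv1.BeforeClusterCreateResponse{}
	g.Expect(client.CallAllExtensions(context.Background(), runtimehooksv1.BeforeClusterCreate, &clusterv1.Cluster{}, req, resp)).To(Succeed())

	// The tracked blocking response drives the controller's RequeueAfter.
	tracker.Add(runtimehooksv1.BeforeClusterCreate, resp)
	g.Expect(tracker.AggregateRetryAfter()).To(Equal(10 * time.Second))
}
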