@@ -34,6 +34,8 @@ import (
34
34
"k8s.io/client-go/tools/record"
35
35
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
36
36
"sigs.k8s.io/cluster-api/controllers/external"
37
+ "sigs.k8s.io/cluster-api/controllers/noderefutil"
38
+ "sigs.k8s.io/cluster-api/controllers/remote"
37
39
"sigs.k8s.io/cluster-api/util"
38
40
"sigs.k8s.io/cluster-api/util/patch"
39
41
ctrl "sigs.k8s.io/controller-runtime"
@@ -86,7 +88,6 @@ func (r *MachineSetReconciler) SetupWithManager(mgr ctrl.Manager, options contro
86
88
}
87
89
88
90
r .recorder = mgr .GetEventRecorderFor ("machineset-controller" )
89
-
90
91
r .scheme = mgr .GetScheme ()
91
92
return nil
92
93
}
@@ -173,14 +174,6 @@ func (r *MachineSetReconciler) reconcile(ctx context.Context, machineSet *cluste
173
174
machineSet .Spec .Selector .MatchLabels [clusterv1 .MachineSetLabelName ] = machineSet .Name
174
175
machineSet .Spec .Template .Labels [clusterv1 .MachineSetLabelName ] = machineSet .Name
175
176
176
- selector , err := metav1 .LabelSelectorAsSelector (& machineSet .Spec .Selector )
177
- if err != nil {
178
- return ctrl.Result {}, errors .Wrapf (err , "failed to parse MachineSet %q label selector" , machineSet .Name )
179
- }
180
- // Copy label selector to its status counterpart in string format.
181
- // This is necessary for CRDs including scale subresources.
182
- machineSet .Status .Selector = selector .String ()
183
-
184
177
selectorMap , err := metav1 .LabelSelectorAsMap (& machineSet .Spec .Selector )
185
178
if err != nil {
186
179
return ctrl.Result {}, errors .Wrapf (err , "failed to convert MachineSet %q label selector to a map" , machineSet .Name )
@@ -222,19 +215,19 @@ func (r *MachineSetReconciler) reconcile(ctx context.Context, machineSet *cluste
222
215
syncErr := r .syncReplicas (ctx , machineSet , filteredMachines )
223
216
224
217
ms := machineSet .DeepCopy ()
225
- newStatus := r .calculateStatus (ms , filteredMachines )
218
+ newStatus := r .calculateStatus (cluster , ms , filteredMachines )
226
219
227
220
// Always updates status as machines come up or die.
228
- updatedMS , err := updateMachineSetStatus ( r . Client , machineSet , newStatus , logger )
221
+ updatedMS , err := r . patchMachineSetStatus ( ctx , machineSet , newStatus )
229
222
if err != nil {
230
223
if syncErr != nil {
231
- return ctrl.Result {}, errors .Wrapf (err , "failed to sync machines: %v. failed to update machine set status " , syncErr )
224
+ return ctrl.Result {}, errors .Wrapf (err , "failed to sync machines: %v. failed to patch MachineSet's Status " , syncErr )
232
225
}
233
- return ctrl.Result {}, errors .Wrap (err , "failed to update machine set status " )
226
+ return ctrl.Result {}, errors .Wrap (err , "failed to patch MachineSet's Status " )
234
227
}
235
228
236
229
if syncErr != nil {
237
- return ctrl.Result {}, errors .Wrapf (syncErr , "failed to sync Machineset replicas" )
230
+ return ctrl.Result {}, errors .Wrapf (syncErr , "failed to sync MachineSet replicas" )
238
231
}
239
232
240
233
var replicas int32
@@ -589,3 +582,115 @@ func (r *MachineSetReconciler) hasMatchingLabels(machineSet *clusterv1.MachineSe
589
582
func (r * MachineSetReconciler ) shouldAdopt (ms * clusterv1.MachineSet ) bool {
590
583
return ! util .HasOwner (ms .OwnerReferences , clusterv1 .GroupVersion .String (), []string {"MachineDeployment" , "Cluster" })
591
584
}
585
+
586
+ func (r * MachineSetReconciler ) calculateStatus (cluster * clusterv1.Cluster , ms * clusterv1.MachineSet , filteredMachines []* clusterv1.Machine ) clusterv1.MachineSetStatus {
587
+ logger := r .Log .WithValues ("machineset" , ms .Name , "namespace" , ms .Namespace )
588
+ newStatus := ms .Status
589
+
590
+ // Copy label selector to its status counterpart in string format.
591
+ // This is necessary for CRDs including scale subresources.
592
+ selector , _ := metav1 .LabelSelectorAsSelector (& ms .Spec .Selector )
593
+ newStatus .Selector = selector .String ()
594
+
595
+ // Count the number of machines that have labels matching the labels of the machine
596
+ // template of the replica set, the matching machines may have more
597
+ // labels than are in the template. Because the label of machineTemplateSpec is
598
+ // a superset of the selector of the replica set, so the possible
599
+ // matching machines must be part of the filteredMachines.
600
+ fullyLabeledReplicasCount := 0
601
+ readyReplicasCount := 0
602
+ availableReplicasCount := 0
603
+ templateLabel := labels .Set (ms .Spec .Template .Labels ).AsSelectorPreValidated ()
604
+
605
+ for _ , machine := range filteredMachines {
606
+ if templateLabel .Matches (labels .Set (machine .Labels )) {
607
+ fullyLabeledReplicasCount ++
608
+ }
609
+
610
+ if machine .Status .NodeRef == nil {
611
+ logger .V (2 ).Info ("Unable to retrieve Node status, missing NodeRef" , "machine" , machine .Name )
612
+ continue
613
+ }
614
+
615
+ node , err := r .getMachineNode (cluster , machine )
616
+ if err != nil {
617
+ logger .Error (err , "Unable to retrieve Node status" )
618
+ continue
619
+ }
620
+
621
+ if noderefutil .IsNodeReady (node ) {
622
+ readyReplicasCount ++
623
+ if noderefutil .IsNodeAvailable (node , ms .Spec .MinReadySeconds , metav1 .Now ()) {
624
+ availableReplicasCount ++
625
+ }
626
+ }
627
+ }
628
+
629
+ newStatus .Replicas = int32 (len (filteredMachines ))
630
+ newStatus .FullyLabeledReplicas = int32 (fullyLabeledReplicasCount )
631
+ newStatus .ReadyReplicas = int32 (readyReplicasCount )
632
+ newStatus .AvailableReplicas = int32 (availableReplicasCount )
633
+ return newStatus
634
+ }
635
+
636
+ // patchMachineSetStatus attempts to update the Status.Replicas of the given MachineSet.
637
+ func (r * MachineSetReconciler ) patchMachineSetStatus (ctx context.Context , ms * clusterv1.MachineSet , newStatus clusterv1.MachineSetStatus ) (* clusterv1.MachineSet , error ) {
638
+ logger := r .Log .WithValues ("machineset" , ms .Name , "namespace" , ms .Namespace )
639
+
640
+ // This is the steady state. It happens when the MachineSet doesn't have any expectations, since
641
+ // we do a periodic relist every 10 minutes. If the generations differ but the replicas are
642
+ // the same, a caller might've resized to the same replica count.
643
+ if ms .Status .Replicas == newStatus .Replicas &&
644
+ ms .Status .FullyLabeledReplicas == newStatus .FullyLabeledReplicas &&
645
+ ms .Status .ReadyReplicas == newStatus .ReadyReplicas &&
646
+ ms .Status .AvailableReplicas == newStatus .AvailableReplicas &&
647
+ ms .Generation == ms .Status .ObservedGeneration {
648
+ return ms , nil
649
+ }
650
+
651
+ patch := client .MergeFrom (ms .DeepCopyObject ())
652
+
653
+ // Save the generation number we acted on, otherwise we might wrongfully indicate
654
+ // that we've seen a spec update when we retry.
655
+ newStatus .ObservedGeneration = ms .Generation
656
+
657
+ // Calculate the replicas for logging.
658
+ var replicas int32
659
+ if ms .Spec .Replicas != nil {
660
+ replicas = * ms .Spec .Replicas
661
+ }
662
+ logger .V (4 ).Info (fmt .Sprintf ("Updating status for %v: %s/%s, " , ms .Kind , ms .Namespace , ms .Name ) +
663
+ fmt .Sprintf ("replicas %d->%d (need %d), " , ms .Status .Replicas , newStatus .Replicas , replicas ) +
664
+ fmt .Sprintf ("fullyLabeledReplicas %d->%d, " , ms .Status .FullyLabeledReplicas , newStatus .FullyLabeledReplicas ) +
665
+ fmt .Sprintf ("readyReplicas %d->%d, " , ms .Status .ReadyReplicas , newStatus .ReadyReplicas ) +
666
+ fmt .Sprintf ("availableReplicas %d->%d, " , ms .Status .AvailableReplicas , newStatus .AvailableReplicas ) +
667
+ fmt .Sprintf ("sequence No: %v->%v" , ms .Status .ObservedGeneration , newStatus .ObservedGeneration ))
668
+
669
+ ms .Status = newStatus
670
+ if err := r .Client .Status ().Patch (ctx , ms , patch ); err != nil {
671
+ // TODO(vincepri): Try to fix this once we upgrade to CRDv1.
672
+ // Our Status.Replicas field is a required non-pointer integer, Go defaults this field to "0" value when decoding
673
+ // the data from the API server. For this reason, when we try to write the value "0", the patch is going to think
674
+ // the value is already there and shouldn't be patched, making it fail validation.
675
+ // Fallback to Update.
676
+ if ! apierrors .IsInvalid (err ) {
677
+ return nil , err
678
+ }
679
+ if err := r .Client .Status ().Update (ctx , ms ); err != nil {
680
+ return nil , err
681
+ }
682
+ }
683
+ return ms , nil
684
+ }
685
+
686
+ func (r * MachineSetReconciler ) getMachineNode (cluster * clusterv1.Cluster , machine * clusterv1.Machine ) (* corev1.Node , error ) {
687
+ c , err := remote .NewClusterClient (r .Client , cluster , r .scheme )
688
+ if err != nil {
689
+ return nil , err
690
+ }
691
+ node := & corev1.Node {}
692
+ if err := c .Get (context .TODO (), client.ObjectKey {Name : machine .Status .NodeRef .Name }, node ); err != nil {
693
+ return nil , errors .Wrapf (err , "error retrieving node %s for machine %s/%s" , machine .Status .NodeRef .Name , machine .Namespace , machine .Name )
694
+ }
695
+ return node , nil
696
+ }
0 commit comments