Skip to content

Commit 59597a3

Browse files
authored
properly synchronize access to unhealthyNodes map (#202)
1 parent 6f6eba6 commit 59597a3

File tree

2 files changed

+44
-32
lines changed

2 files changed

+44
-32
lines changed

internal/controller/appwrapper/appwrapper_controller.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
510510
return nil, err
511511
}
512512
summary := &podStatusSummary{expected: pc}
513+
checkUnhealthyNodes := r.Config.Autopilot != nil && r.Config.Autopilot.EvacuateWorkloads
513514

514515
for _, pod := range pods.Items {
515516
switch pod.Status.Phase {
@@ -518,29 +519,33 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
518519
case v1.PodRunning:
519520
if pod.DeletionTimestamp.IsZero() {
520521
summary.running += 1
521-
if len(unhealthyNodes) > 0 {
522-
if resources, ok := unhealthyNodes[pod.Spec.NodeName]; ok {
523-
for badResource := range resources {
524-
for _, container := range pod.Spec.Containers {
525-
if limit, ok := container.Resources.Limits[v1.ResourceName(badResource)]; ok {
526-
if !limit.IsZero() {
527-
if summary.unhealthyNodes == nil {
528-
summary.unhealthyNodes = make(sets.Set[string])
522+
if checkUnhealthyNodes {
523+
unhealthyNodesMutex.RLock() // BEGIN CRITICAL SECTION
524+
if len(unhealthyNodes) > 0 {
525+
if resources, ok := unhealthyNodes[pod.Spec.NodeName]; ok {
526+
for badResource := range resources {
527+
for _, container := range pod.Spec.Containers {
528+
if limit, ok := container.Resources.Limits[v1.ResourceName(badResource)]; ok {
529+
if !limit.IsZero() {
530+
if summary.unhealthyNodes == nil {
531+
summary.unhealthyNodes = make(sets.Set[string])
532+
}
533+
summary.unhealthyNodes.Insert(pod.Spec.NodeName)
529534
}
530-
summary.unhealthyNodes.Insert(pod.Spec.NodeName)
531535
}
532-
}
533-
if request, ok := container.Resources.Requests[v1.ResourceName(badResource)]; ok {
534-
if !request.IsZero() {
535-
if summary.unhealthyNodes == nil {
536-
summary.unhealthyNodes = make(sets.Set[string])
536+
if request, ok := container.Resources.Requests[v1.ResourceName(badResource)]; ok {
537+
if !request.IsZero() {
538+
if summary.unhealthyNodes == nil {
539+
summary.unhealthyNodes = make(sets.Set[string])
540+
}
541+
summary.unhealthyNodes.Insert(pod.Spec.NodeName)
537542
}
538-
summary.unhealthyNodes.Insert(pod.Spec.NodeName)
539543
}
540544
}
541545
}
542546
}
543547
}
548+
unhealthyNodesMutex.RUnlock() // END CRITICAL SECTION
544549
}
545550
}
546551
case v1.PodSucceeded:

internal/controller/appwrapper/node_health_monitor.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package appwrapper
1818

1919
import (
2020
"context"
21+
"sync"
2122

2223
v1 "k8s.io/api/core/v1"
2324
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -38,8 +39,11 @@ type NodeHealthMonitor struct {
3839
Config *config.AppWrapperConfig
3940
}
4041

41-
// unhealthyNodes is a mapping from Node names to a set of resources that Autopilot has labeled as unhealthy on that Node
42-
var unhealthyNodes = make(map[string]sets.Set[string])
42+
var (
43+
// unhealthyNodes is a mapping from Node names to a set of resources that Autopilot has labeled as unhealthy on that Node
44+
unhealthyNodes = make(map[string]sets.Set[string])
45+
unhealthyNodesMutex sync.RWMutex
46+
)
4347

4448
// permission to watch nodes
4549
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
@@ -55,8 +59,6 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
5559
return ctrl.Result{}, nil
5660
}
5761

58-
log.FromContext(ctx).V(2).Info("Reconcilling", "node", req.NamespacedName)
59-
6062
flaggedResources := make(sets.Set[string])
6163
for key, value := range node.GetLabels() {
6264
for resource, apLabels := range r.Config.Autopilot.ResourceUnhealthyConfig {
@@ -66,22 +68,27 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
6668
}
6769
}
6870

69-
hadEntries := len(unhealthyNodes) > 0
70-
71-
if len(flaggedResources) == 0 {
72-
delete(unhealthyNodes, node.GetName())
73-
} else {
71+
nodeChanged := false
72+
unhealthyNodesMutex.Lock() // BEGIN CRITICAL SECTION
73+
if priorEntry, ok := unhealthyNodes[node.GetName()]; ok {
74+
if len(flaggedResources) == 0 {
75+
delete(unhealthyNodes, node.GetName())
76+
nodeChanged = true
77+
} else if !priorEntry.Equal(flaggedResources) {
78+
unhealthyNodes[node.GetName()] = flaggedResources
79+
nodeChanged = true
80+
}
81+
} else if len(flaggedResources) > 0 {
7482
unhealthyNodes[node.GetName()] = flaggedResources
83+
nodeChanged = true
7584
}
85+
numUnhealthy := len(unhealthyNodes)
86+
unhealthyNodesMutex.Unlock() // END CRITICAL SECTION
7687

77-
if len(unhealthyNodes) == 0 {
78-
if hadEntries {
79-
log.FromContext(ctx).Info("All nodes now healthy")
80-
} else {
81-
log.FromContext(ctx).V(2).Info("All nodes now healthy")
82-
}
83-
} else {
84-
log.FromContext(ctx).Info("Some nodes unhealthy", "number", len(unhealthyNodes), "details", unhealthyNodes)
88+
if nodeChanged {
89+
// This unsynchronized read of unhealthyNodes for logging purposes is safe because this method
90+
// is the only writer to the map and the controller runtime is configured to not allow concurrent execution of this method.
91+
log.FromContext(ctx).Info("Updated node health information", "Number Unhealthy Nodes", numUnhealthy, "Unhealthy Resource Details", unhealthyNodes)
8592
}
8693

8794
return ctrl.Result{}, nil

0 commit comments

Comments
 (0)