Distinguish NoSchedule and NoExecute Autopilot labels #218

Merged · 3 commits · Jul 26, 2024
2 changes: 1 addition & 1 deletion internal/controller/appwrapper/appwrapper_controller.go
@@ -521,7 +521,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
return nil, err
}
summary := &podStatusSummary{expected: pc}
- checkUnhealthyNodes := r.Config.Autopilot != nil && r.Config.Autopilot.MigrateImpactedWorkloads && !r.isAutopilotExempt(ctx, aw)
+ checkUnhealthyNodes := r.Config.Autopilot != nil && r.Config.Autopilot.MonitorNodes && !r.isAutopilotExempt(ctx, aw)

for _, pod := range pods.Items {
switch pod.Status.Phase {
69 changes: 52 additions & 17 deletions internal/controller/appwrapper/node_health_monitor.go
@@ -18,13 +18,13 @@ package appwrapper

import (
"context"
"reflect"
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"

ctrl "sigs.k8s.io/controller-runtime"
@@ -43,26 +43,32 @@ type NodeHealthMonitor struct {
}

var (
- // unhealthyNodes is a mapping from Node names to a set of resource quantities that Autopilot has labeled as unhealthy on that Node
- unhealthyNodes = make(map[string]map[string]*resource.Quantity)
+ // unhealthyNodes is a mapping from Node names to a set of resources that Autopilot has labeled as unhealthy on that Node
+ unhealthyNodes = make(map[string]sets.Set[string])
unhealthyNodesMutex sync.RWMutex

+ // unschedulableNodes is a mapping from Node names to the resource quantities that Autopilot has labeled as unschedulable on that Node
+ unschedulableNodes = make(map[string]map[string]*resource.Quantity)
)
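Taken together, the two maps drive the two reactions this PR distinguishes (consumers inferred from the rest of the diff): `unhealthyNodes` records NoExecute matches only and feeds the pod-status check in `appwrapper_controller.go` that resets impacted workloads, while `unschedulableNodes` records any match, or a cordoned node, and feeds the lending-limit computation later in this file.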

// permission to watch nodes
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch

//gocyclo:ignore
func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
node := &v1.Node{}
if err := r.Get(ctx, req.NamespacedName, node); err != nil {
return ctrl.Result{}, nil
}

- flaggedResources := make(map[string]*resource.Quantity)
+ flaggedResources := make(sets.Set[string])
for key, value := range node.GetLabels() {
- for resourceName, apLabels := range r.Config.Autopilot.ResourceUnhealthyConfig {
- if apValue, ok := apLabels[key]; ok && apValue == value {
- flaggedResources[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+ for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
+ for _, taint := range taints {
+ if key == taint.Key && value == taint.Value && taint.Effect == v1.TaintEffectNoExecute {
+ flaggedResources.Insert(resourceName)
+ }
+ }
}
}
@@ -73,7 +79,7 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
if len(flaggedResources) == 0 {
delete(unhealthyNodes, node.GetName())
nodeChanged = true
- } else if !reflect.DeepEqual(priorEntry, flaggedResources) {
+ } else if !priorEntry.Equal(flaggedResources) {
unhealthyNodes[node.GetName()] = flaggedResources
nodeChanged = true
}
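With `unhealthyNodes` now tracking plain membership instead of quantities, set equality replaces `reflect.DeepEqual`. A minimal, runnable sketch of the `sets` API used above (the generic `sets.Set[T]` from `k8s.io/apimachinery`; the resource name is just an example):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	prior := sets.New("nvidia.com/gpu") // resources flagged on the previous reconcile
	current := make(sets.Set[string])   // resources flagged on this reconcile
	current.Insert("nvidia.com/gpu")

	// Only membership matters now, so Equal replaces reflect.DeepEqual
	// over the old map[string]*resource.Quantity values.
	fmt.Println(prior.Equal(current)) // true
}
```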
@@ -106,15 +112,40 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, err
}

- // compute unhealthy resource totals
- missingQuantities := map[string]*resource.Quantity{}
- for _, quantities := range unhealthyNodes {
+ // update unschedulable resource quantities for this node
+ flaggedQuantities := make(map[string]*resource.Quantity)
+ if node.Spec.Unschedulable {
+ // flag all configured resources if the node is cordoned
+ for resourceName := range r.Config.Autopilot.ResourceTaints {
+ flaggedQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+ }
+ } else {
+ for key, value := range node.GetLabels() {
+ for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
+ for _, taint := range taints {
+ if key == taint.Key && value == taint.Value {
+ flaggedQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+ }
+ }
+ }
+ }
+ }
+
+ if len(flaggedQuantities) > 0 {
+ unschedulableNodes[node.GetName()] = flaggedQuantities
+ } else {
+ delete(unschedulableNodes, node.GetName())
+ }
+
+ // compute unschedulable resource totals
+ unschedulableQuantities := map[string]*resource.Quantity{}
+ for _, quantities := range unschedulableNodes {
for resourceName, quantity := range quantities {
if !quantity.IsZero() {
- if missingQuantities[resourceName] == nil {
- missingQuantities[resourceName] = ptr.To(*quantity)
+ if unschedulableQuantities[resourceName] == nil {
+ unschedulableQuantities[resourceName] = ptr.To(*quantity)
} else {
- missingQuantities[resourceName].Add(*quantity)
+ unschedulableQuantities[resourceName].Add(*quantity)
}
}
}
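Note that the totals loop copies a node's quantity (`ptr.To(*quantity)`) before calling `Add`, so the per-node entries in `unschedulableNodes` are never mutated. A runnable sketch of that pattern, with two hypothetical flagged nodes:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/utils/ptr"
)

func main() {
	perNode := []resource.Quantity{resource.MustParse("4"), resource.MustParse("2")}

	totals := map[string]*resource.Quantity{}
	for _, q := range perNode {
		if totals["nvidia.com/gpu"] == nil {
			totals["nvidia.com/gpu"] = ptr.To(q) // store a copy, not an alias
		} else {
			totals["nvidia.com/gpu"].Add(q) // mutates only the accumulated copy
		}
	}
	fmt.Println(totals["nvidia.com/gpu"].String()) // "6"
}
```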
@@ -125,10 +156,10 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
limitsChanged := false
for i, quota := range resources {
var lendingLimit *resource.Quantity
- if missingQuantity := missingQuantities[quota.Name.String()]; missingQuantity != nil {
- if quota.NominalQuota.Cmp(*missingQuantity) > 0 {
+ if unschedulableQuantity := unschedulableQuantities[quota.Name.String()]; unschedulableQuantity != nil {
+ if quota.NominalQuota.Cmp(*unschedulableQuantity) > 0 {
lendingLimit = ptr.To(quota.NominalQuota)
- lendingLimit.Sub(*missingQuantity)
+ lendingLimit.Sub(*unschedulableQuantity)
} else {
lendingLimit = resource.NewQuantity(0, resource.DecimalSI)
}
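A worked instance of the clamp above. The 6-GPU nominal quota is an assumption chosen to match the test below, which expects a lending limit of 2 after a 4-GPU node is flagged:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	nominal := resource.MustParse("6")       // slack ClusterQueue nominal quota (assumed)
	unschedulable := resource.MustParse("4") // capacity flagged on unschedulable nodes

	lendingLimit := nominal.DeepCopy()
	if nominal.Cmp(unschedulable) > 0 {
		lendingLimit.Sub(unschedulable) // 6 - 4 = 2: lend out only healthy capacity
	} else {
		lendingLimit = *resource.NewQuantity(0, resource.DecimalSI) // clamp at zero
	}
	fmt.Println(lendingLimit.String()) // "2"
}
```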
@@ -145,7 +176,11 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
var err error
if limitsChanged {
err = r.Update(ctx, cq)
+ if err == nil {
+ log.FromContext(ctx).Info("Updated lending limits", "Resources", resources)
+ }
}

return ctrl.Result{}, err
}

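In short, the reconciler now reacts to the two taint effects differently. A self-contained restatement of the classification (the `classify` helper is mine, not the PR's; it mirrors the two loops above):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
)

// classify mirrors Reconcile: a NoExecute label match marks a resource
// unhealthy (impacted workloads are reset), while any match, or a cordoned
// node, marks its capacity unschedulable (lending limits shrink).
func classify(labels map[string]string, cordoned bool, taints map[string][]v1.Taint) (unhealthy, unschedulable sets.Set[string]) {
	unhealthy, unschedulable = make(sets.Set[string]), make(sets.Set[string])
	for resourceName, ts := range taints {
		if cordoned {
			unschedulable.Insert(resourceName)
		}
		for _, t := range ts {
			if labels[t.Key] == t.Value {
				unschedulable.Insert(resourceName)
				if t.Effect == v1.TaintEffectNoExecute {
					unhealthy.Insert(resourceName)
				}
			}
		}
	}
	return
}

func main() {
	taints := map[string][]v1.Taint{"nvidia.com/gpu": {
		{Key: "autopilot.ibm.com/gpuhealth", Value: "ERR", Effect: v1.TaintEffectNoSchedule},
		{Key: "autopilot.ibm.com/gpuhealth", Value: "EVICT", Effect: v1.TaintEffectNoExecute},
	}}

	u, s := classify(map[string]string{"autopilot.ibm.com/gpuhealth": "EVICT"}, false, taints)
	fmt.Println(u.Has("nvidia.com/gpu"), s.Has("nvidia.com/gpu")) // true true

	u, s = classify(map[string]string{"autopilot.ibm.com/gpuhealth": "ERR"}, false, taints)
	fmt.Println(u.Has("nvidia.com/gpu"), s.Has("nvidia.com/gpu")) // false true
}
```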
22 changes: 15 additions & 7 deletions internal/controller/appwrapper/node_health_monitor_test.go
@@ -76,9 +76,9 @@ var _ = Describe("NodeMonitor Controller", func() {
By("Healthy cluster has no unhealthy nodes")
Expect(len(unhealthyNodes)).Should(Equal(0))

By("A newly labeled node is detected as unhealthy")
By("A node labeled EVICT is detected as unhealthy")
node := getNode(node1Name.Name)
node.Labels["autopilot.ibm.com/gpuhealth"] = "ERR"
node.Labels["autopilot.ibm.com/gpuhealth"] = "EVICT"
Expect(k8sClient.Update(ctx, node)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
@@ -87,7 +87,6 @@
Expect(len(unhealthyNodes)).Should(Equal(1))
Expect(unhealthyNodes).Should(HaveKey(node1Name.Name))
Expect(unhealthyNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
- Expect(unhealthyNodes[node1Name.Name]["nvidia.com/gpu"].String()).Should(Equal("4"))

By("Repeated reconcile does not change map")
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
@@ -97,10 +96,9 @@
Expect(len(unhealthyNodes)).Should(Equal(1))
Expect(unhealthyNodes).Should(HaveKey(node1Name.Name))
Expect(unhealthyNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
- Expect(unhealthyNodes[node1Name.Name]["nvidia.com/gpu"].String()).Should(Equal("4"))

By("Removing the autopilot label updates unhealthyNodes")
node.Labels["autopilot.ibm.com/gpuhealth"] = "OK"
By("Removing the EVICT label updates unhealthyNodes")
node.Labels["autopilot.ibm.com/gpuhealth"] = "ERR"
Expect(k8sClient.Update(ctx, node)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
@@ -122,7 +120,7 @@

// remove 4 gpus, lending limit should be 2
node1 := getNode(node1Name.Name)
node1.Labels["autopilot.ibm.com/gpuhealth"] = "ERR"
node1.Labels["autopilot.ibm.com/gpuhealth"] = "EVICT"
Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
@@ -160,6 +158,16 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).Should(BeNil())

+ // cordon node1, lending limit should be 2
+ node1 = getNode(node1Name.Name)
+ node1.Spec.Unschedulable = true
+ Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
+ _, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
+ Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(2)))
+
Expect(k8sClient.Delete(ctx, queue)).To(Succeed())
})
})
10 changes: 5 additions & 5 deletions internal/controller/appwrapper/resource_management.go
@@ -253,19 +253,19 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
}

if r.Config.Autopilot != nil && r.Config.Autopilot.InjectAntiAffinities && !r.isAutopilotExempt(ctx, aw) {
- toAdd := map[string]string{}
- for resource, labels := range r.Config.Autopilot.ResourceUnhealthyConfig {
+ toAdd := map[string][]string{}
+ for resource, taints := range r.Config.Autopilot.ResourceTaints {
if hasResourceRequest(spec, resource) {
- for k, v := range labels {
- toAdd[k] = v
+ for _, taint := range taints {
+ toAdd[taint.Key] = append(toAdd[taint.Key], taint.Value)
}
}
}
if len(toAdd) > 0 {
nodeSelectors := []v1.NodeSelectorTerm{}
for k, v := range toAdd {
nodeSelectors = append(nodeSelectors, v1.NodeSelectorTerm{
- MatchExpressions: []v1.NodeSelectorRequirement{{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: []string{v}}},
+ MatchExpressions: []v1.NodeSelectorRequirement{{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: v}},
})
}
if err := addNodeSelectorsToAffinity(spec, nodeSelectors); err != nil {
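Since both default taints share the key `autopilot.ibm.com/gpuhealth`, their values accumulate under one key and the loop above emits a single merged requirement. A sketch of the resulting term for the default configuration (equivalent to the YAML on the docs page below):

```go
term := v1.NodeSelectorTerm{
	MatchExpressions: []v1.NodeSelectorRequirement{{
		Key:      "autopilot.ibm.com/gpuhealth",
		Operator: v1.NodeSelectorOpNotIn,
		Values:   []string{"ERR", "EVICT"}, // both taint values, merged per key
	}},
}
```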
17 changes: 10 additions & 7 deletions pkg/config/config.go
@@ -20,6 +20,7 @@ import (
"fmt"
"time"

v1 "k8s.io/api/core/v1"
"sigs.k8s.io/kueue/apis/config/v1beta1"
)

@@ -48,9 +49,9 @@ type KueueJobReconcillerConfig struct {
}

type AutopilotConfig struct {
- InjectAntiAffinities bool `json:"injectAntiAffinities,omitempty"`
- MigrateImpactedWorkloads bool `json:"migrateImpactedWorkloads,omitempty"`
- ResourceUnhealthyConfig map[string]map[string]string `json:"resourceUnhealthyConfig,omitempty"`
+ InjectAntiAffinities bool `json:"injectAntiAffinities,omitempty"`
+ MonitorNodes bool `json:"monitorNodes,omitempty"`
+ ResourceTaints map[string][]v1.Taint `json:"resourceTaints,omitempty"`
}

type FaultToleranceConfig struct {
@@ -101,10 +102,12 @@ func NewAppWrapperConfig() *AppWrapperConfig {
LabelKeysToCopy: []string{},
},
Autopilot: &AutopilotConfig{
- InjectAntiAffinities: true,
- MigrateImpactedWorkloads: true,
- ResourceUnhealthyConfig: map[string]map[string]string{
- "nvidia.com/gpu": {"autopilot.ibm.com/gpuhealth": "ERR"},
+ InjectAntiAffinities: true,
+ MonitorNodes: true,
+ ResourceTaints: map[string][]v1.Taint{
+ "nvidia.com/gpu": {
+ {Key: "autopilot.ibm.com/gpuhealth", Value: "ERR", Effect: v1.TaintEffectNoSchedule},
+ {Key: "autopilot.ibm.com/gpuhealth", Value: "EVICT", Effect: v1.TaintEffectNoExecute}},
},
},
UserRBACAdmissionCheck: true,
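One subtlety, inferred from the monitor code above: `v1.Taint` is borrowed here purely as a configuration schema. The controller matches node labels written by Autopilot against `Key`/`Value` and uses `Effect` only to choose a reaction; it never reads or writes actual taints on the Node. A sketch:

```go
// Assumed illustration: a label match, not a real taint on the Node object.
labels := map[string]string{"autopilot.ibm.com/gpuhealth": "EVICT"} // set by Autopilot
taint := v1.Taint{Key: "autopilot.ibm.com/gpuhealth", Value: "EVICT", Effect: v1.TaintEffectNoExecute}
matched := labels[taint.Key] == taint.Value // true; Effect then selects the reaction
_ = matched
```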
2 changes: 1 addition & 1 deletion pkg/controller/setup.go
@@ -50,7 +50,7 @@ func SetupControllers(mgr ctrl.Manager, awConfig *config.AppWrapperConfig) error
}
}

- if awConfig.Autopilot != nil && awConfig.Autopilot.MigrateImpactedWorkloads {
+ if awConfig.Autopilot != nil && awConfig.Autopilot.MonitorNodes {
if err := (&appwrapper.NodeHealthMonitor{
Client: mgr.GetClient(),
Config: awConfig,
16 changes: 11 additions & 5 deletions site/_pages/arch-fault-tolerance.md
@@ -166,15 +166,20 @@ Autopilot configuration used by the controller is:
```yaml
autopilot:
injectAntiAffinities: true
- migrateImpactedWorkloads: true
- resourceUnhealthyConfig:
+ monitorNodes: true
+ resourceTaints:
nvidia.com/gpu:
- autopilot.ibm.com/gpuhealth: ERR
+ - key: autopilot.ibm.com/gpuhealth
+   value: ERR
+   effect: NoSchedule
+ - key: autopilot.ibm.com/gpuhealth
+   value: EVICT
+   effect: NoExecute
```

- The `resourceUnhealthyConfig` is a map from resource names to labels. For this example
+ The `resourceTaints` field is a map from resource names to taints. For this example
configuration, for exactly those Pods that have a non-zero resource request for
- `nvidia.com/gpu`, the AppWrapper controller will automatically inject the stanze below
+ `nvidia.com/gpu`, the AppWrapper controller will automatically inject the stanza below
into the `affinity` portion of their Spec.
```yaml
nodeAffinity:
@@ -185,4 +190,5 @@ into the `affinity` portion of their Spec.
operator: NotIn
values:
- ERR
+ - EVICT
```
2 changes: 1 addition & 1 deletion test/e2e/appwrapper_test.go
@@ -321,7 +321,7 @@ var _ = Describe("AppWrapper E2E Test", func() {
err := updateNode(ctx, nodeName, func(n *v1.Node) { delete(n.Labels, "autopilot.ibm.com/gpuhealth") })
Expect(err).ShouldNot(HaveOccurred())
})
- err = updateNode(ctx, nodeName, func(n *v1.Node) { n.Labels["autopilot.ibm.com/gpuhealth"] = "ERR" })
+ err = updateNode(ctx, nodeName, func(n *v1.Node) { n.Labels["autopilot.ibm.com/gpuhealth"] = "EVICT" })
Expect(err).ShouldNot(HaveOccurred())
By("workload is reset")
Eventually(AppWrapperPhase(ctx, aw), 120*time.Second).Should(Equal(workloadv1beta2.AppWrapperResetting))