implement forceful deletion #71

Merged: 8 commits, Apr 1, 2024
11 changes: 11 additions & 0 deletions README.md
@@ -15,6 +15,17 @@ AppWrapper operator ensures that when Kueue admits an AppWrapper for
execution, all of the necessary information will be propagated
to cause the child's Kueue-enabled operator to admit it as well.

AppWrappers are also designed to harden workloads by providing an
additional level of automatic fault detection and recovery. The AppWrapper
controller monitors the health of the workload and, if corrective actions
are not taken by the primary resource controllers within specified deadlines,
orchestrates workload-level retries and resource deletion to ensure that
the workload either returns to a healthy state or is cleanly removed from
the cluster and its quota freed for use by other workloads. For details on
customizing and configuring these fault tolerance capabilities, please see
[fault_tolerance.md](docs/fault_tolerance.md).

## Description

Kueue has a well-developed pattern for Kueue-enabling a Custom
11 changes: 7 additions & 4 deletions api/v1beta2/appwrapper_types.go
@@ -95,13 +95,16 @@ const (
ResourcesDeployed AppWrapperCondition = "ResourcesDeployed"
PodsReady AppWrapperCondition = "PodsReady"
Unhealthy AppWrapperCondition = "Unhealthy"
DeletingResources AppWrapperCondition = "DeletingResources"
)

const (
WarmupGracePeriodDurationAnnotation = "workload.codeflare.dev.appwrapper/warmupGracePeriodDuration"
FailureGracePeriodDurationAnnotation = "workload.codeflare.dev.appwrapper/failureGracePeriodDuration"
ResetPauseDurationAnnotation = "workload.codeflare.dev.appwrapper/resetPauseDuration"
RetryLimitAnnotation = "workload.codeflare.dev.appwrapper/retryLimit"
WarmupGracePeriodDurationAnnotation = "workload.codeflare.dev.appwrapper/warmupGracePeriodDuration"
FailureGracePeriodDurationAnnotation = "workload.codeflare.dev.appwrapper/failureGracePeriodDuration"
ResetPauseDurationAnnotation = "workload.codeflare.dev.appwrapper/resetPauseDuration"
RetryLimitAnnotation = "workload.codeflare.dev.appwrapper/retryLimit"
DeletionGracePeriodAnnotation = "workload.codeflare.dev.appwrapper/deletionGracePeriodDuration"
DebuggingFailureDeletionDelayDurationAnnotation = "workload.codeflare.dev.appwrapper/debuggingFailureDeletionDelayDuration"
)

//+kubebuilder:object:root=true
5 changes: 5 additions & 0 deletions cmd/standalone/main.go
@@ -83,6 +83,11 @@ func main() {
setupLog.Info("Build info", "version", BuildVersion, "date", BuildDate)
setupLog.Info("Configuration", "config", awConfig)

if err := config.ValidateConfig(awConfig); err != nil {
setupLog.Error(err, "invalid appwrapper config")
os.Exit(1)
}

// if the enable-http2 flag is false (the default), http/2 should be disabled
// due to its vulnerabilities. More specifically, disabling http/2 will
// prevent from being vulnerable to the HTTP/2 Stream Cancelation and
5 changes: 5 additions & 0 deletions cmd/unified/main.go
@@ -84,6 +84,11 @@ func main() {
setupLog.Info("Build info", "version", BuildVersion, "date", BuildDate)
setupLog.Info("Configuration", "config", awConfig)

if err := config.ValidateConfig(awConfig); err != nil {
setupLog.Error(err, "invalid appwrapper config")
os.Exit(1)
}

// if the enable-http2 flag is false (the default), http/2 should be disabled
// due to its vulnerabilities. More specifically, disabling http/2 will
// prevent from being vulnerable to the HTTP/2 Stream Cancelation and
71 changes: 71 additions & 0 deletions docs/fault_tolerance.md
@@ -0,0 +1,71 @@
## Fault Tolerance

### Overall Design

The `podSets` contained in the AppWrapper specification enable the AppWrapper
controller to inject labels into every `Pod` that is created by
the workload during its execution. Throughout the execution of the
workload, the AppWrapper controller monitors the number and health of
all labeled `Pods` and uses this information to determine if a
workload is unhealthy. A workload is deemed *unhealthy* either
because it contains a non-zero number of `Failed` pods or because,
after the `WarmupGracePeriod` has passed, it has fewer
`Running` and `Completed` pods than expected.
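
The following sketch illustrates the health test described above. It is not the
controller's actual code: the helper names and the way Pod counts are passed in
are assumptions made purely for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// podSummary tallies the workload's labeled Pods by phase.
type podSummary struct {
	running   int32
	completed int32
	failed    int32
}

// isUnhealthy applies the rule described above: any Failed Pod, or too few
// Running/Completed Pods once the WarmupGracePeriod has elapsed.
func isUnhealthy(s podSummary, expectedPods int32, deployed time.Time, warmupGracePeriod time.Duration) bool {
	if s.failed > 0 {
		return true // a non-zero number of Failed Pods is immediately unhealthy
	}
	if time.Since(deployed) < warmupGracePeriod {
		return false // still warming up; missing Pods are tolerated for now
	}
	return s.running+s.completed < expectedPods
}

func main() {
	s := podSummary{running: 3}
	// Deployed 10 minutes ago with a 5 minute warmup and 4 expected Pods: unhealthy.
	fmt.Println(isUnhealthy(s, 4, time.Now().Add(-10*time.Minute), 5*time.Minute))
}
```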

If a workload is determined to be unhealthy, the AppWrapper controller
first waits for a `FailureGracePeriod` to allow the primary resource
controller an opportunity to react and return the workload to a
healthy state. If the `FailureGracePeriod` expires, the AppWrapper
controller will *reset* the workload by deleting its resources, waiting
for a `ResetPause`, and then creating new instances of the resources.
During this reset period, the AppWrapper **does not** release the workload's
quota; this ensures that when the resources are recreated they will still
have sufficient quota to execute. The number of times an AppWrapper is reset
is tracked as part of its status; if the number of resets exceeds the `RetryLimit`,
then the AppWrapper moves into a `Failed` state and its resources are deleted
(thus finally releasing its quota). If at any time during this retry loop
an AppWrapper is suspended (i.e., Kueue decides to preempt the AppWrapper),
the AppWrapper controller will respect that request by proceeding to delete
the resources.
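
The retry policy can be condensed into a single decision, sketched below. The
phase names and inputs are illustrative assumptions, not the controller's exact
code, which spreads this logic across its reconciliation of the AppWrapper status.

```go
package main

import "fmt"

// nextPhase condenses the retry policy described above for an unhealthy workload.
func nextPhase(suspended, failureGraceExpired bool, resets, retryLimit int32) string {
	switch {
	case suspended:
		return "Suspending" // Kueue preempted the AppWrapper: delete resources and release quota
	case !failureGraceExpired:
		return "Running" // give the primary controllers the FailureGracePeriod to recover
	case resets < retryLimit:
		return "Resetting" // delete resources, wait ResetPause, then recreate (quota retained)
	default:
		return "Failed" // retry budget exhausted: delete resources and release quota
	}
}

func main() {
	fmt.Println(nextPhase(false, true, 1, 3)) // "Resetting"
}
```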

When the AppWrapper controller decides to delete the resources for a workload,
it proceeds through several phases. First it does a normal delete of the
resources, allowing the primary resource controllers time to cascade the deletion
through all child resources. During a `DeletionGracePeriod`, the AppWrapper controller
monitors to see if the primary controllers have managed to successfully delete
all of the workload's Pods and resources. If they fail to accomplish this within
the `DeletionGracePeriod`, the AppWrapper controller then initiates a *forceful*
deletion of all remaining Pods and resources by deleting them with a `GracePeriod` of `0`.
An AppWrapper's `ResourcesDeployed` condition will remain `True` until
all of its resources and Pods have been successfully deleted.

This process ensures that when `ResourcesDeployed` becomes `False`, which
indicates to Kueue that the quota has been released, all resources created by
a failed workload will have been totally removed from the cluster.
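
Forceful deletion amounts to re-issuing deletes with a grace period of `0` once
the `DeletionGracePeriod` has expired. The sketch below shows that step in
isolation using the controller-runtime client; the function name, the `labelKey`
parameter, and the simplified error handling are assumptions, and the
authoritative logic is the `deleteComponents` implementation in
`internal/controller/appwrapper/resource_management.go`.

```go
package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// forceDeletePods force-deletes any Pods still labeled as belonging to the
// AppWrapper after the DeletionGracePeriod has expired.
func forceDeletePods(ctx context.Context, c client.Client, namespace, labelKey, awName string) error {
	pods := &corev1.PodList{}
	if err := c.List(ctx, pods,
		client.InNamespace(namespace),
		client.MatchingLabels{labelKey: awName}); err != nil {
		return err
	}
	for i := range pods.Items {
		// GracePeriodSeconds(0) asks the API server to remove the Pod immediately.
		if err := c.Delete(ctx, &pods.Items[i], client.GracePeriodSeconds(0)); err != nil && !apierrors.IsNotFound(err) {
			return err
		}
	}
	return nil
}
```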

### Configuration Details

The parameters of the retry loop described above are configured at the operator level
and can be customized on a per-AppWrapper basis by adding annotations.
The table below lists each parameter, its default value, and the annotation that
can be used to customize it.

| Parameter | Default Value | Annotation |
|---------------------|---------------|---------------------------------------------------------------|
| WarmupGracePeriod | 5 Minutes | workload.codeflare.dev.appwrapper/warmupGracePeriodDuration |
| FailureGracePeriod | 1 Minute | workload.codeflare.dev.appwrapper/failureGracePeriodDuration |
| ResetPause | 90 Seconds | workload.codeflare.dev.appwrapper/resetPauseDuration |
| RetryLimit | 3 | workload.codeflare.dev.appwrapper/retryLimit |
| DeletionGracePeriod | 10 Minutes | workload.codeflare.dev.appwrapper/deletionGracePeriodDuration |
| GracePeriodCeiling | 24 Hours | Not Applicable |

The `GracePeriodCeiling` imposes an upper limit on the other grace periods to
reduce the impact of user-added annotations on overall system utilization.

To support debugging `Failed` workloads, an additional annotation
`workload.codeflare.dev.appwrapper/debuggingFailureDeletionDelayDuration` can
be added to an AppWrapper at creation time to introduce a delay between the time the
AppWrapper enters the `Failed` state and the time deletion of its resources
begins. Since the AppWrapper continues to consume quota during this delay,
the annotation should be used sparingly and only when interactive debugging of
the failed workload is being actively pursued.
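
As an illustration, the sketch below sets several of these annotations when
constructing an AppWrapper programmatically. The annotation keys are the
constants defined in `api/v1beta2/appwrapper_types.go`; the object name,
namespace, and duration values are illustrative, and duration values must be
parseable by Go's `time.ParseDuration` (for example `30s`, `5m`, or `2h`).

```go
package sketch

import (
	workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// debuggableAppWrapper builds an AppWrapper whose fault-tolerance behavior is
// customized via annotations: a single retry, a short deletion grace period,
// and a two hour window to inspect the workload after it fails.
func debuggableAppWrapper() *workloadv1beta2.AppWrapper {
	return &workloadv1beta2.AppWrapper{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example-aw",
			Namespace: "default",
			Annotations: map[string]string{
				workloadv1beta2.RetryLimitAnnotation:                            "1",
				workloadv1beta2.DeletionGracePeriodAnnotation:                   "5m",
				workloadv1beta2.DebuggingFailureDeletionDelayDurationAnnotation: "2h",
			},
		},
		// Spec (components, podSets, etc.) omitted for brevity.
	}
}
```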
2 changes: 1 addition & 1 deletion docs/state-diagram.md
@@ -43,7 +43,7 @@ stateDiagram-v2
%% Failures
rs --> f
rn --> f
rn --> rt : Pod Failures
rn --> rt : Workload Unhealthy
rt --> rs

classDef quota fill:lightblue
72 changes: 65 additions & 7 deletions internal/controller/appwrapper/appwrapper_controller.go
@@ -326,15 +326,41 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
        return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperResuming)

    case workloadv1beta2.AppWrapperFailed:
        // Support for debugging failed jobs.
        // When an appwrapper is annotated with a non-zero debugging delay,
        // we hold quota for the delay period and do not delete the resources of
        // a failed appwrapper unless Kueue preempts it by setting Suspend to true.
        deletionDelay := r.debuggingFailureDeletionDelay(ctx, aw)

        if deletionDelay > 0 && !aw.Spec.Suspend {
            meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
                Type:    string(workloadv1beta2.DeletingResources),
                Status:  metav1.ConditionFalse,
                Reason:  "DeletionPaused",
                Message: fmt.Sprintf("%v has value %v", workloadv1beta2.DebuggingFailureDeletionDelayDurationAnnotation, deletionDelay),
            })
            whenDelayed := meta.FindStatusCondition(aw.Status.Conditions, string(workloadv1beta2.DeletingResources)).LastTransitionTime

            now := time.Now()
            deadline := whenDelayed.Add(deletionDelay)
            if now.Before(deadline) {
                return ctrl.Result{RequeueAfter: deadline.Sub(now)}, r.Status().Update(ctx, aw)
            }
        }

        if meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.ResourcesDeployed)) {
            if !r.deleteComponents(ctx, aw) {
                return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
            }
            msg := "Resources deleted for failed AppWrapper"
            if deletionDelay > 0 && aw.Spec.Suspend {
                msg = "Kueue forced resource deletion by suspending AppWrapper"
            }
            meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
                Type:    string(workloadv1beta2.ResourcesDeployed),
                Status:  metav1.ConditionFalse,
                Reason:  string(workloadv1beta2.AppWrapperFailed),
                Message: "Resources deleted for failed AppWrapper",
                Message: msg,
            })
        }
        meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
@@ -393,26 +419,36 @@ func (r *AppWrapperReconciler) workloadStatus(ctx context.Context, aw *workloadv
    return summary, nil
}

func (r *AppWrapperReconciler) limitDuration(desired time.Duration) time.Duration {
    if desired < 0 {
        return 0 * time.Second
    } else if desired > r.Config.FaultTolerance.GracePeriodCeiling {
        return r.Config.FaultTolerance.GracePeriodCeiling
    } else {
        return desired
    }
}

func (r *AppWrapperReconciler) warmupGraceDuration(ctx context.Context, aw *workloadv1beta2.AppWrapper) time.Duration {
    if userPeriod, ok := aw.Annotations[workloadv1beta2.WarmupGracePeriodDurationAnnotation]; ok {
        if duration, err := time.ParseDuration(userPeriod); err == nil {
            return duration
            return r.limitDuration(duration)
        } else {
            log.FromContext(ctx).Info("Malformed warmup period annotation", "annotation", userPeriod, "error", err)
        }
    }
    return r.Config.FaultTolerance.WarmupGracePeriod
    return r.limitDuration(r.Config.FaultTolerance.WarmupGracePeriod)
}

func (r *AppWrapperReconciler) failureGraceDuration(ctx context.Context, aw *workloadv1beta2.AppWrapper) time.Duration {
    if userPeriod, ok := aw.Annotations[workloadv1beta2.FailureGracePeriodDurationAnnotation]; ok {
        if duration, err := time.ParseDuration(userPeriod); err == nil {
            return duration
            return r.limitDuration(duration)
        } else {
            log.FromContext(ctx).Info("Malformed grace period annotation", "annotation", userPeriod, "error", err)
        }
    }
    return r.Config.FaultTolerance.FailureGracePeriod
    return r.limitDuration(r.Config.FaultTolerance.FailureGracePeriod)
}

func (r *AppWrapperReconciler) retryLimit(ctx context.Context, aw *workloadv1beta2.AppWrapper) int32 {
@@ -429,12 +465,34 @@ func (r *AppWrapperReconciler) retryLimit(ctx context.Context, aw *workloadv1bet
func (r *AppWrapperReconciler) resettingPauseDuration(ctx context.Context, aw *workloadv1beta2.AppWrapper) time.Duration {
    if userPeriod, ok := aw.Annotations[workloadv1beta2.ResetPauseDurationAnnotation]; ok {
        if duration, err := time.ParseDuration(userPeriod); err == nil {
            return duration
            return r.limitDuration(duration)
        } else {
            log.FromContext(ctx).Info("Malformed reset pause annotation", "annotation", userPeriod, "error", err)
        }
    }
    return r.Config.FaultTolerance.ResetPause
    return r.limitDuration(r.Config.FaultTolerance.ResetPause)
}

func (r *AppWrapperReconciler) deletionGraceDuration(ctx context.Context, aw *workloadv1beta2.AppWrapper) time.Duration {
    if userPeriod, ok := aw.Annotations[workloadv1beta2.DeletionGracePeriodAnnotation]; ok {
        if duration, err := time.ParseDuration(userPeriod); err == nil {
            return r.limitDuration(duration)
        } else {
            log.FromContext(ctx).Info("Malformed deletion period annotation", "annotation", userPeriod, "error", err)
        }
    }
    return r.limitDuration(r.Config.FaultTolerance.DeletionGracePeriod)
}

func (r *AppWrapperReconciler) debuggingFailureDeletionDelay(ctx context.Context, aw *workloadv1beta2.AppWrapper) time.Duration {
    if userPeriod, ok := aw.Annotations[workloadv1beta2.DebuggingFailureDeletionDelayDurationAnnotation]; ok {
        if duration, err := time.ParseDuration(userPeriod); err == nil {
            return r.limitDuration(duration)
        } else {
            log.FromContext(ctx).Info("Malformed delay deletion annotation", "annotation", userPeriod, "error", err)
        }
    }
    return 0 * time.Second
}

func clearCondition(aw *workloadv1beta2.AppWrapper, condition workloadv1beta2.AppWrapperCondition, reason string, message string) {
58 changes: 56 additions & 2 deletions internal/controller/appwrapper/resource_management.go
@@ -19,9 +19,11 @@ package appwrapper
import (
    "context"
    "fmt"
    "time"

    workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
    "github.com/project-codeflare/appwrapper/pkg/utils"
    v1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -164,7 +166,11 @@ func (r *AppWrapperReconciler) createComponents(ctx context.Context, aw *workloa
}

func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloadv1beta2.AppWrapper) bool {
    // TODO forceful deletion: See https://github.com/project-codeflare/appwrapper/issues/36
    meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
        Type:   string(workloadv1beta2.DeletingResources),
        Status: metav1.ConditionTrue,
        Reason: "DeletionInitiated",
    })
    log := log.FromContext(ctx)
    remaining := 0
    for _, component := range aw.Spec.Components {
@@ -181,5 +187,53 @@ func (r *AppWrapperReconciler) deleteComponents(ctx context.Context, aw *workloa
        }
        remaining++ // no error deleting resource, resource therefore still exists
    }
    return remaining == 0

    deletionGracePeriod := r.deletionGraceDuration(ctx, aw)
    whenInitiated := meta.FindStatusCondition(aw.Status.Conditions, string(workloadv1beta2.DeletingResources)).LastTransitionTime
    gracePeriodExpired := time.Now().After(whenInitiated.Time.Add(deletionGracePeriod))

    if remaining > 0 && !gracePeriodExpired {
        // Resources left and deadline hasn't expired, just requeue the deletion
        return false
    }

    pods := &v1.PodList{Items: []v1.Pod{}}
    if err := r.List(ctx, pods,
        client.UnsafeDisableDeepCopy,
        client.InNamespace(aw.Namespace),
        client.MatchingLabels{AppWrapperLabel: aw.Name}); err != nil {
        log.Error(err, "Pod list error")
    }

    if remaining == 0 && len(pods.Items) == 0 {
        // no resources or pods left; deletion is complete
        clearCondition(aw, workloadv1beta2.DeletingResources, "DeletionComplete", "")
        return true
    }

    if gracePeriodExpired {
        if len(pods.Items) > 0 {
            // force deletion of pods first
            for _, pod := range pods.Items {
                if err := r.Delete(ctx, &pod, client.GracePeriodSeconds(0)); err != nil {
                    log.Error(err, "Forceful pod deletion error")
                }
            }
        } else {
            // force deletion of wrapped resources once pods are gone
            for _, component := range aw.Spec.Components {
                obj, err := parseComponent(aw, component.Template.Raw)
                if err != nil {
                    log.Error(err, "Parsing error")
                    continue
                }
                if err := r.Delete(ctx, obj, client.GracePeriodSeconds(0)); err != nil && !apierrors.IsNotFound(err) {
                    log.Error(err, "Forceful deletion error")
                }
            }
        }
    }

    // requeue deletion
    return false
}