retry unpacking jobs on failure #3016
@@ -4,6 +4,7 @@ import (
 	"context"
 	"crypto/sha256"
 	"fmt"
+	"sort"
 	"strings"
 	"time"
@@ -18,6 +19,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	k8slabels "k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apiserver/pkg/storage/names"
 	"k8s.io/client-go/kubernetes"
 	listersbatchv1 "k8s.io/client-go/listers/batch/v1"
 	listerscorev1 "k8s.io/client-go/listers/core/v1"
@@ -41,6 +43,13 @@ const (
 	// e.g. 1m30s
 	BundleUnpackTimeoutAnnotationKey = "operatorframework.io/bundle-unpack-timeout"
 	BundleUnpackPodLabel             = "job-name"
+
+	// BundleUnpackRetryMinimumIntervalAnnotationKey sets a minimum interval to wait before
+	// attempting to recreate a failed unpack job for a bundle.
+	BundleUnpackRetryMinimumIntervalAnnotationKey = "operatorframework.io/bundle-unpack-min-retry-interval"
+
+	// bundleUnpackRefLabel is used to filter for all unpack jobs for a specific bundle.
+	bundleUnpackRefLabel = "operatorframework.io/bundle-unpack-ref"
 )
 
 type BundleUnpackResult struct {
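Both unpack annotations are read from the namespace's OperatorGroup. A hedged sketch of how they might be set together; the OperatorGroup type import from operator-framework/api and the object names are assumptions, not part of this diff:

```go
package main

import (
	"fmt"

	operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	og := operatorsv1.OperatorGroup{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example-og", // hypothetical name
			Namespace: "operators",  // hypothetical namespace
			Annotations: map[string]string{
				// fail an unpack job that runs longer than 10 minutes
				"operatorframework.io/bundle-unpack-timeout": "10m",
				// wait at least 5 minutes before recreating a failed unpack job
				"operatorframework.io/bundle-unpack-min-retry-interval": "5m",
			},
		},
	}
	fmt.Println(og.GetAnnotations())
}
```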
@@ -89,6 +98,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
 		ObjectMeta: metav1.ObjectMeta{
 			Labels: map[string]string{
 				install.OLMManagedLabelKey: install.OLMManagedLabelValue,
+				bundleUnpackRefLabel:       cmRef.Name,
 			},
 		},
 		Spec: batchv1.JobSpec{
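The label added above is what makes every unpack attempt for a bundle discoverable later. As an aside, an equivalent selector can be built with labels.SelectorFromValidatedSet; a standalone sketch (the ConfigMap name is hypothetical):

```go
package main

import (
	"fmt"

	k8slabels "k8s.io/apimachinery/pkg/labels"
)

func main() {
	sel := k8slabels.SelectorFromValidatedSet(k8slabels.Set{
		"operatorframework.io/bundle-unpack-ref": "bundle-cm-abc123", // hypothetical ConfigMap name
	})
	// prints: operatorframework.io/bundle-unpack-ref=bundle-cm-abc123
	fmt.Println(sel.String())
}
```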
@@ -287,7 +297,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
 //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Unpacker
 
 type Unpacker interface {
-	UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error)
+	UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error)
 }
 
 type ConfigMapUnpacker struct {
@@ -448,7 +458,7 @@ const (
 	NotUnpackedMessage = "bundle contents have not yet been persisted to installplan status"
 )
 
-func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error) {
+func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error) {
 	result = newBundleUnpackResult(lookup)
 
 	// if bundle lookup failed condition already present, then there is nothing more to do
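To show how the widened signature is used, here is a hedged sketch of the caller side. The helper name and import paths are illustrative assumptions (the snippet is written as if it lived in the same package as this file), while the two annotation-reading helpers are the ones appearing later in this diff:

```go
package bundle // assumed package of bundle_unpacker.go

import (
	operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
	v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"
)

// unpackWithOverrides is a hypothetical helper illustrating the call flow:
// resolve the per-OperatorGroup overrides first, then thread both durations
// into UnpackBundle.
func unpackWithOverrides(
	unpacker Unpacker,
	ogLister v1listers.OperatorGroupNamespaceLister,
	lookup *operatorsv1alpha1.BundleLookup,
) (*BundleUnpackResult, error) {
	timeout, err := OperatorGroupBundleUnpackTimeout(ogLister)
	if err != nil {
		return nil, err
	}
	retryInterval, err := OperatorGroupBundleUnpackRetryInterval(ogLister)
	if err != nil {
		return nil, err
	}
	// timeout < 0 and retryInterval == 0 both mean "not configured" and are
	// ignored downstream, so they can be passed through unmodified.
	return unpacker.UnpackBundle(lookup, timeout, retryInterval)
}
```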
@@ -510,7 +520,7 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup,
 		secrets = append(secrets, corev1.LocalObjectReference{Name: secretName})
 	}
 	var job *batchv1.Job
-	job, err = c.ensureJob(cmRef, result.Path, secrets, timeout)
+	job, err = c.ensureJob(cmRef, result.Path, secrets, timeout, retryInterval)
 	if err != nil || job == nil {
 		// ensureJob can return nil if the job present does not match the expected job (spec and ownerefs)
 		// The current job is deleted in that case so UnpackBundle needs to be retried
@@ -649,16 +659,39 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name
 	return
 }
 
-func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration) (job *batchv1.Job, err error) {
+func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration, unpackRetryInterval time.Duration) (job *batchv1.Job, err error) {
 	fresh := c.job(cmRef, bundlePath, secrets, timeout)
-	job, err = c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName())
+	var jobs, toDelete []*batchv1.Job
+	jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
 	if err != nil {
-		if apierrors.IsNotFound(err) {
-			job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
-		}
-
 		return
 	}
+	if len(jobs) == 0 {
+		job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
+		return
+	}
+
+	maxRetainedJobs := 5 // TODO: make this configurable
+	job, toDelete = sortUnpackJobs(jobs, maxRetainedJobs) // choose the latest or a non-failed job attempt
+
+	// only check for retries if an unpackRetryInterval is specified
+	if unpackRetryInterval > 0 {
+		if cond, failed := getCondition(job, batchv1.JobFailed); failed {
+			// recreate the unpack job for this bundle once the retry interval has elapsed
+			if time.Now().After(cond.LastTransitionTime.Time.Add(unpackRetryInterval)) {
+				fresh.SetName(names.SimpleNameGenerator.GenerateName(fresh.GetName()))
+				job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
+			}
+
+			// cleanup old failed jobs, but don't clean up successful jobs to avoid repeat unpacking
+			for _, j := range toDelete {
+				_ = c.client.BatchV1().Jobs(j.GetNamespace()).Delete(context.TODO(), j.GetName(), metav1.DeleteOptions{})
+			}
+			return
+		}
+	}
Review comment: This appears to retry without limit?

Review comment: What's the retry cadence? Is it exponential backoff?

Author reply: This runs whenever OLM syncs a resolving namespace - we use the default client-go workqueue, so we get exponential backoff up to ~15 min. We do, however, reset this backoff each time a new unpack job begins, so this can become as short as a 5-second retry loop if the unpack timeout is short enough.
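To make that cadence concrete, here is a small self-contained sketch (not OLM code) of the client-go per-item exponential backoff the reply describes; the base delay and cap are illustrative values, not the controller's actual configuration:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Per-item exponential failure backoff: each requeue of the same key
	// doubles the delay, up to the cap.
	rl := workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 15*time.Minute)

	for i := 0; i < 5; i++ {
		fmt.Println(rl.When("my-namespace")) // 1s, 2s, 4s, 8s, 16s
	}

	// Forget resets the failure count for the key; the next delay starts at
	// the base again - the reset-on-new-unpack-job effect described above.
	rl.Forget("my-namespace")
	fmt.Println(rl.When("my-namespace")) // 1s
}
```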
 
 	if equality.Semantic.DeepDerivative(fresh.GetOwnerReferences(), job.GetOwnerReferences()) && equality.Semantic.DeepDerivative(fresh.Spec, job.Spec) {
 		return
@@ -801,6 +834,37 @@ func getCondition(job *batchv1.Job, conditionType batchv1.JobConditionType) (con
 	return
 }
 
+func sortUnpackJobs(jobs []*batchv1.Job, maxRetainedJobs int) (latest *batchv1.Job, toDelete []*batchv1.Job) {
+	if len(jobs) == 0 {
+		return
+	}
+	// sort jobs so that the latest job is first,
+	// with preference for non-failed jobs
+	sort.Slice(jobs, func(i, j int) bool {
+		condI, failedI := getCondition(jobs[i], batchv1.JobFailed)
+		condJ, failedJ := getCondition(jobs[j], batchv1.JobFailed)
+		if failedI != failedJ {
+			return !failedI // non-failed job goes first
+		}
+		if !failedI {
+			// neither job has a JobFailed condition, so condI and condJ are nil;
+			// fall back to creation time instead of dereferencing them
+			return jobs[i].CreationTimestamp.After(jobs[j].CreationTimestamp.Time)
+		}
+		return condI.LastTransitionTime.After(condJ.LastTransitionTime.Time)
+	})
+	latest = jobs[0]
+	if len(jobs) <= maxRetainedJobs {
+		return
+	}
+	if maxRetainedJobs == 0 {
+		toDelete = jobs[1:]
+		return
+	}
+
+	// queue jobs past the retention limit for deletion, removing at most
+	// maxRetainedJobs of the older attempts per call
+	for i := 0; i < maxRetainedJobs && i+maxRetainedJobs < len(jobs); i++ {
+		toDelete = append(toDelete, jobs[maxRetainedJobs+i])
+	}
+
+	return
+}
 
 // OperatorGroupBundleUnpackTimeout returns the bundle timeout from the annotation if specified.
 // If the timeout annotation is not set, a timeout < 0 is returned, which is subsequently ignored.
 // This overrides the --bundle-unpack-timeout flag value on a per-OperatorGroup basis.
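A test-style sketch of the sortUnpackJobs contract added above (assumed to live in the same package; the helper and test names are illustrative): the non-failed attempt wins even when a failed attempt is newer, and nothing is queued for deletion while the job count is within the retention limit.

```go
package bundle // assumed package of bundle_unpacker.go

import (
	"testing"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestSortUnpackJobsPrefersNonFailed(t *testing.T) {
	// failedJob builds a Job carrying a JobFailed condition with the given time.
	failedJob := func(name string, failedAt time.Time) *batchv1.Job {
		return &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{Name: name},
			Status: batchv1.JobStatus{Conditions: []batchv1.JobCondition{{
				Type:               batchv1.JobFailed,
				Status:             corev1.ConditionTrue,
				LastTransitionTime: metav1.NewTime(failedAt),
			}}},
		}
	}

	now := time.Now()
	running := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: "running"}}
	jobs := []*batchv1.Job{
		failedJob("failed-new", now),
		running,
		failedJob("failed-old", now.Add(-time.Hour)),
	}

	latest, toDelete := sortUnpackJobs(jobs, 5)
	if latest.GetName() != "running" {
		t.Fatalf("expected the non-failed job, got %s", latest.GetName())
	}
	if len(toDelete) != 0 {
		t.Fatalf("expected no deletions below the retention limit, got %d", len(toDelete))
	}
}
```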
@@ -827,3 +891,28 @@ func OperatorGroupBundleUnpackTimeout(ogLister v1listers.OperatorGroupNamespaceL
 
 	return d, nil
 }
+
+// OperatorGroupBundleUnpackRetryInterval returns the bundle unpack retry interval from the annotation if specified.
+// If the retry annotation is not set, it returns a zero interval, which is subsequently ignored. This interval,
+// if > 0, is the minimum time to wait before recreating a failed unpack job.
+func OperatorGroupBundleUnpackRetryInterval(ogLister v1listers.OperatorGroupNamespaceLister) (time.Duration, error) {
+	ogs, err := ogLister.List(k8slabels.Everything())
+	if err != nil {
+		return 0, err
+	}
+	if len(ogs) != 1 {
+		return 0, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs))
+	}
+
+	intervalStr, ok := ogs[0].GetAnnotations()[BundleUnpackRetryMinimumIntervalAnnotationKey]
+	if !ok {
+		return 0, nil
+	}
+
+	d, err := time.ParseDuration(intervalStr)
+	if err != nil {
+		return 0, fmt.Errorf("failed to parse unpack retry annotation(%s: %s): %w", BundleUnpackRetryMinimumIntervalAnnotationKey, intervalStr, err)
+	}
+
+	return d, nil
+}
Review comment: Will you have a follow-up PR to document how to use this field?

Author reply: I'll follow up with operator-framework/olm-docs#313 once this PR is merged.