Skip to content

Commit 07d8b91

Browse files
ankitathomastmshort
authored andcommittedOct 3, 2023
preserve failed unpack jobs, enforce minimum interval between failing unpack jobs
Signed-off-by: Ankita Thomas <[email protected]>
1 parent a06d7d7 commit 07d8b91

File tree

4 files changed

+80
-15
lines changed

4 files changed

+80
-15
lines changed
 

‎pkg/controller/bundle/bundle_unpacker.go

+70-10
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ const (
4141
// e.g 1m30s
4242
BundleUnpackTimeoutAnnotationKey = "operatorframework.io/bundle-unpack-timeout"
4343
BundleUnpackPodLabel = "job-name"
44+
45+
// BundleUnpackRetryMinimumIntervalAnnotationKey sets a minimum interval to wait before
46+
// attempting to recreate a failed unpack job for a bundle.
47+
BundleUnpackRetryMinimumIntervalAnnotationKey = "operatorframework.io/bundle-unpack-min-retry-interval"
48+
49+
// bundleUnpackRefLabel is used to filter for all unpack jobs for a specific bundle.
50+
bundleUnpackRefLabel = "operatorframework.io/bundle-unpack-ref"
4451
)
4552

4653
type BundleUnpackResult struct {
@@ -247,6 +254,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
247254
}
248255
job.SetNamespace(cmRef.Namespace)
249256
job.SetName(cmRef.Name)
257+
job.SetLabels(map[string]string{bundleUnpackRefLabel: cmRef.Name})
250258
job.SetOwnerReferences([]metav1.OwnerReference{ownerRef(cmRef)})
251259
if c.runAsUser > 0 {
252260
job.Spec.Template.Spec.SecurityContext.RunAsUser = &c.runAsUser
@@ -287,7 +295,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
287295
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Unpacker
288296

289297
type Unpacker interface {
290-
UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error)
298+
UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error)
291299
}
292300

293301
type ConfigMapUnpacker struct {
@@ -448,7 +456,7 @@ const (
448456
NotUnpackedMessage = "bundle contents have not yet been persisted to installplan status"
449457
)
450458

451-
func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error) {
459+
func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error) {
452460
result = newBundleUnpackResult(lookup)
453461

454462
// if bundle lookup failed condition already present, then there is nothing more to do
@@ -510,7 +518,7 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup,
510518
secrets = append(secrets, corev1.LocalObjectReference{Name: secretName})
511519
}
512520
var job *batchv1.Job
513-
job, err = c.ensureJob(cmRef, result.Path, secrets, timeout)
521+
job, err = c.ensureJob(cmRef, result.Path, secrets, timeout, retryInterval)
514522
if err != nil || job == nil {
515523
// ensureJob can return nil if the job present does not match the expected job (spec and ownerefs)
516524
// The current job is deleted in that case so UnpackBundle needs to be retried
@@ -649,7 +657,7 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name
649657
return
650658
}
651659

652-
func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration) (job *batchv1.Job, err error) {
660+
func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration, unpackRetryInterval time.Duration) (job *batchv1.Job, err error) {
653661
fresh := c.job(cmRef, bundlePath, secrets, timeout)
654662
job, err = c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName())
655663
if err != nil {
@@ -659,13 +667,40 @@ func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath
659667

660668
return
661669
}
662-
// Cleanup old unpacking job and retry
663-
if _, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
664-
err = c.client.BatchV1().Jobs(job.GetNamespace()).Delete(context.TODO(), job.GetName(), metav1.DeleteOptions{})
665-
if err == nil {
666-
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
670+
671+
// only check for retries if an unpackRetryInterval is specified
672+
if unpackRetryInterval > 0 {
673+
if failedCond, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
674+
lastFailureTime := failedCond.LastTransitionTime.Time
675+
// Look for other unpack jobs for the same bundle
676+
var jobs []*batchv1.Job
677+
jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
678+
if err != nil {
679+
return
680+
}
681+
682+
var failed bool
683+
var cond *batchv1.JobCondition
684+
for _, j := range jobs {
685+
cond, failed = getCondition(j, batchv1.JobFailed)
686+
if !failed {
687+
// found an in-progress unpack attempt
688+
job = j
689+
break
690+
}
691+
if cond != nil && lastFailureTime.Before(cond.LastTransitionTime.Time) {
692+
lastFailureTime = cond.LastTransitionTime.Time
693+
}
694+
}
695+
696+
if failed {
697+
if time.Now().After(lastFailureTime.Add(unpackRetryInterval)) {
698+
fresh.SetName(fmt.Sprintf("%s-%d", fresh.GetName(), len(jobs)))
699+
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
700+
return
701+
}
702+
}
667703
}
668-
return
669704
}
670705

671706
if equality.Semantic.DeepDerivative(fresh.GetOwnerReferences(), job.GetOwnerReferences()) && equality.Semantic.DeepDerivative(fresh.Spec, job.Spec) {
@@ -835,3 +870,28 @@ func OperatorGroupBundleUnpackTimeout(ogLister v1listers.OperatorGroupNamespaceL
835870

836871
return d, nil
837872
}
873+
874+
// OperatorGroupBundleUnpackRetryInterval returns bundle unpack retry interval from annotation if specified.
875+
// If the retry annotation is not set, return retry = 0 which is subsequently ignored. This interval, if > 0,
876+
// determines the minimum interval between recreating a failed unpack job.
877+
func OperatorGroupBundleUnpackRetryInterval(ogLister v1listers.OperatorGroupNamespaceLister) (time.Duration, error) {
878+
ogs, err := ogLister.List(k8slabels.Everything())
879+
if err != nil {
880+
return 0, err
881+
}
882+
if len(ogs) != 1 {
883+
return 0, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs))
884+
}
885+
886+
timeoutStr, ok := ogs[0].GetAnnotations()[BundleUnpackRetryMinimumIntervalAnnotationKey]
887+
if !ok {
888+
return 0, nil
889+
}
890+
891+
d, err := time.ParseDuration(timeoutStr)
892+
if err != nil {
893+
return 0, fmt.Errorf("failed to parse unpack timeout annotation(%s: %s): %w", BundleUnpackRetryMinimumIntervalAnnotationKey, timeoutStr, err)
894+
}
895+
896+
return d, nil
897+
}

‎pkg/controller/bundle/bundle_unpacker_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1673,7 +1673,7 @@ func TestConfigMapUnpacker(t *testing.T) {
16731673
)
16741674
require.NoError(t, err)
16751675

1676-
res, err := unpacker.UnpackBundle(tt.args.lookup, tt.args.annotationTimeout)
1676+
res, err := unpacker.UnpackBundle(tt.args.lookup, tt.args.annotationTimeout, 0)
16771677
require.Equal(t, tt.expected.err, err)
16781678

16791679
if tt.expected.res == nil {

‎pkg/controller/bundle/bundlefakes/fake_unpacker.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pkg/controller/operators/catalog/operator.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -1112,6 +1112,11 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
11121112
return err
11131113
}
11141114

1115+
minUnpackRetryInterval, err := bundle.OperatorGroupBundleUnpackRetryInterval(ogLister)
1116+
if err != nil {
1117+
return err
1118+
}
1119+
11151120
// TODO: parallel
11161121
maxGeneration := 0
11171122
subscriptionUpdated := false
@@ -1207,7 +1212,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
12071212
logger.Debug("unpacking bundles")
12081213

12091214
var unpacked bool
1210-
unpacked, steps, bundleLookups, err = o.unpackBundles(namespace, steps, bundleLookups, unpackTimeout)
1215+
unpacked, steps, bundleLookups, err = o.unpackBundles(namespace, steps, bundleLookups, unpackTimeout, minUnpackRetryInterval)
12111216
if err != nil {
12121217
// If the error was fatal capture and fail
12131218
if olmerrors.IsFatal(err) {
@@ -1664,7 +1669,7 @@ type UnpackedBundleReference struct {
16641669
Properties string `json:"properties"`
16651670
}
16661671

1667-
func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
1672+
func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, minUnpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
16681673
unpacked := true
16691674

16701675
outBundleLookups := make([]v1alpha1.BundleLookup, len(bundleLookups))
@@ -1679,7 +1684,7 @@ func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.
16791684
var errs []error
16801685
for i := 0; i < len(outBundleLookups); i++ {
16811686
lookup := outBundleLookups[i]
1682-
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout)
1687+
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, minUnpackRetryInterval)
16831688
if err != nil {
16841689
errs = append(errs, err)
16851690
continue

0 commit comments

Comments
 (0)
Please sign in to comment.