Skip to content

Commit c6e9911

Browse files
Merge pull request openshift#689 from ankitathomas/installplan-retry-4.14
[release-4.14] OCPBUGS-29194: Retry failing unpack jobs
2 parents 84c3b96 + 17f58eb commit c6e9911

File tree

10 files changed

+665
-35
lines changed

10 files changed

+665
-35
lines changed

staging/operator-lifecycle-manager/pkg/controller/bundle/bundle_unpacker.go

+103-9
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/sha256"
66
"fmt"
7+
"sort"
78
"strings"
89
"time"
910

@@ -18,6 +19,7 @@ import (
1819
"k8s.io/apimachinery/pkg/api/resource"
1920
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2021
k8slabels "k8s.io/apimachinery/pkg/labels"
22+
"k8s.io/apiserver/pkg/storage/names"
2123
"k8s.io/client-go/kubernetes"
2224
listersbatchv1 "k8s.io/client-go/listers/batch/v1"
2325
listerscorev1 "k8s.io/client-go/listers/core/v1"
@@ -41,6 +43,13 @@ const (
4143
// e.g 1m30s
4244
BundleUnpackTimeoutAnnotationKey = "operatorframework.io/bundle-unpack-timeout"
4345
BundleUnpackPodLabel = "job-name"
46+
47+
// BundleUnpackRetryMinimumIntervalAnnotationKey sets a minimum interval to wait before
48+
// attempting to recreate a failed unpack job for a bundle.
49+
BundleUnpackRetryMinimumIntervalAnnotationKey = "operatorframework.io/bundle-unpack-min-retry-interval"
50+
51+
// bundleUnpackRefLabel is used to filter for all unpack jobs for a specific bundle.
52+
bundleUnpackRefLabel = "operatorframework.io/bundle-unpack-ref"
4453
)
4554

4655
type BundleUnpackResult struct {
@@ -86,6 +95,12 @@ func newBundleUnpackResult(lookup *operatorsv1alpha1.BundleLookup) *BundleUnpack
8695

8796
func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, annotationUnpackTimeout time.Duration) *batchv1.Job {
8897
job := &batchv1.Job{
98+
ObjectMeta: metav1.ObjectMeta{
99+
Labels: map[string]string{
100+
install.OLMManagedLabelKey: install.OLMManagedLabelValue,
101+
bundleUnpackRefLabel: cmRef.Name,
102+
},
103+
},
89104
Spec: batchv1.JobSpec{
90105
//ttlSecondsAfterFinished: 0 // can use in the future to not have to clean up job
91106
Template: corev1.PodTemplateSpec{
@@ -279,7 +294,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
279294
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Unpacker
280295

281296
type Unpacker interface {
282-
UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error)
297+
UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error)
283298
}
284299

285300
type ConfigMapUnpacker struct {
@@ -440,7 +455,7 @@ const (
440455
NotUnpackedMessage = "bundle contents have not yet been persisted to installplan status"
441456
)
442457

443-
func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error) {
458+
func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error) {
444459
result = newBundleUnpackResult(lookup)
445460

446461
// if bundle lookup failed condition already present, then there is nothing more to do
@@ -502,7 +517,7 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup,
502517
secrets = append(secrets, corev1.LocalObjectReference{Name: secretName})
503518
}
504519
var job *batchv1.Job
505-
job, err = c.ensureJob(cmRef, result.Path, secrets, timeout)
520+
job, err = c.ensureJob(cmRef, result.Path, secrets, timeout, retryInterval)
506521
if err != nil || job == nil {
507522
// ensureJob can return nil if the job present does not match the expected job (spec and ownerefs)
508523
// The current job is deleted in that case so UnpackBundle needs to be retried
@@ -641,16 +656,39 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name
641656
return
642657
}
643658

644-
func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration) (job *batchv1.Job, err error) {
659+
func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration, unpackRetryInterval time.Duration) (job *batchv1.Job, err error) {
645660
fresh := c.job(cmRef, bundlePath, secrets, timeout)
646-
job, err = c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName())
661+
var jobs, toDelete []*batchv1.Job
662+
jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
647663
if err != nil {
648-
if apierrors.IsNotFound(err) {
649-
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
650-
}
651-
652664
return
653665
}
666+
if len(jobs) == 0 {
667+
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
668+
return
669+
}
670+
671+
maxRetainedJobs := 5 // TODO: make this configurable
672+
job, toDelete = sortUnpackJobs(jobs, maxRetainedJobs) // choose latest or on-failed job attempt
673+
674+
// only check for retries if an unpackRetryInterval is specified
675+
if unpackRetryInterval > 0 {
676+
if _, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
677+
// Look for other unpack jobs for the same bundle
678+
if cond, failed := getCondition(job, batchv1.JobFailed); failed {
679+
if time.Now().After(cond.LastTransitionTime.Time.Add(unpackRetryInterval)) {
680+
fresh.SetName(names.SimpleNameGenerator.GenerateName(fresh.GetName()))
681+
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
682+
}
683+
}
684+
685+
// cleanup old failed jobs, but don't clean up successful jobs to avoid repeat unpacking
686+
for _, j := range toDelete {
687+
_ = c.client.BatchV1().Jobs(j.GetNamespace()).Delete(context.TODO(), j.GetName(), metav1.DeleteOptions{})
688+
}
689+
return
690+
}
691+
}
654692

655693
if equality.Semantic.DeepDerivative(fresh.GetOwnerReferences(), job.GetOwnerReferences()) && equality.Semantic.DeepDerivative(fresh.Spec, job.Spec) {
656694
return
@@ -791,6 +829,37 @@ func getCondition(job *batchv1.Job, conditionType batchv1.JobConditionType) (con
791829
return
792830
}
793831

832+
func sortUnpackJobs(jobs []*batchv1.Job, maxRetainedJobs int) (latest *batchv1.Job, toDelete []*batchv1.Job) {
833+
if len(jobs) == 0 {
834+
return
835+
}
836+
// sort jobs so that latest job is first
837+
// with preference for non-failed jobs
838+
sort.Slice(jobs, func(i, j int) bool {
839+
condI, failedI := getCondition(jobs[i], batchv1.JobFailed)
840+
condJ, failedJ := getCondition(jobs[j], batchv1.JobFailed)
841+
if failedI != failedJ {
842+
return !failedI // non-failed job goes first
843+
}
844+
return condI.LastTransitionTime.After(condJ.LastTransitionTime.Time)
845+
})
846+
latest = jobs[0]
847+
if len(jobs) <= maxRetainedJobs {
848+
return
849+
}
850+
if maxRetainedJobs == 0 {
851+
toDelete = jobs[1:]
852+
return
853+
}
854+
855+
// cleanup old failed jobs, n-1 recent jobs and the oldest job
856+
for i := 0; i < maxRetainedJobs && i+maxRetainedJobs < len(jobs); i++ {
857+
toDelete = append(toDelete, jobs[maxRetainedJobs+i])
858+
}
859+
860+
return
861+
}
862+
794863
// OperatorGroupBundleUnpackTimeout returns bundle timeout from annotation if specified.
795864
// If the timeout annotation is not set, return timeout < 0 which is subsequently ignored.
796865
// This is to overrides the --bundle-unpack-timeout flag value on per-OperatorGroup basis.
@@ -817,3 +886,28 @@ func OperatorGroupBundleUnpackTimeout(ogLister v1listers.OperatorGroupNamespaceL
817886

818887
return d, nil
819888
}
889+
890+
// OperatorGroupBundleUnpackRetryInterval returns bundle unpack retry interval from annotation if specified.
891+
// If the retry annotation is not set, return retry = 0 which is subsequently ignored. This interval, if > 0,
892+
// determines the minimum interval between recreating a failed unpack job.
893+
func OperatorGroupBundleUnpackRetryInterval(ogLister v1listers.OperatorGroupNamespaceLister) (time.Duration, error) {
894+
ogs, err := ogLister.List(k8slabels.Everything())
895+
if err != nil {
896+
return 0, err
897+
}
898+
if len(ogs) != 1 {
899+
return 0, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs))
900+
}
901+
902+
timeoutStr, ok := ogs[0].GetAnnotations()[BundleUnpackRetryMinimumIntervalAnnotationKey]
903+
if !ok {
904+
return 0, nil
905+
}
906+
907+
d, err := time.ParseDuration(timeoutStr)
908+
if err != nil {
909+
return 0, fmt.Errorf("failed to parse unpack retry annotation(%s: %s): %w", BundleUnpackRetryMinimumIntervalAnnotationKey, timeoutStr, err)
910+
}
911+
912+
return d, nil
913+
}

0 commit comments

Comments
 (0)