Skip to content

Commit 7d77701

Browse files
committed
cleanup old jobs
Signed-off-by: Ankita Thomas <[email protected]>
1 parent 66ec506 commit 7d77701

File tree

5 files changed

+158
-44
lines changed

5 files changed

+158
-44
lines changed

Diff for: pkg/controller/bundle/bundle_unpacker.go

+49-30
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/sha256"
66
"fmt"
7+
"sort"
78
"strings"
89
"time"
910

@@ -660,47 +661,34 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name
660661

661662
func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration, unpackRetryInterval time.Duration) (job *batchv1.Job, err error) {
662663
fresh := c.job(cmRef, bundlePath, secrets, timeout)
663-
job, err = c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName())
664+
var jobs, toDelete []*batchv1.Job
665+
jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
664666
if err != nil {
665-
if apierrors.IsNotFound(err) {
666-
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
667-
}
668-
669667
return
670668
}
669+
if len(jobs) == 0 {
670+
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
671+
}
672+
673+
maxRetainedJobs := 5 // TODO: make this configurable
674+
job, toDelete = sortUnpackJobs(jobs, maxRetainedJobs) // choose latest or on-failed job attempt
671675

672676
// only check for retries if an unpackRetryInterval is specified
673677
if unpackRetryInterval > 0 {
674-
if failedCond, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
675-
lastFailureTime := failedCond.LastTransitionTime.Time
678+
if _, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
676679
// Look for other unpack jobs for the same bundle
677-
var jobs []*batchv1.Job
678-
jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
679-
if err != nil {
680-
return
681-
}
682-
683-
var failed bool
684-
var cond *batchv1.JobCondition
685-
for _, j := range jobs {
686-
cond, failed = getCondition(j, batchv1.JobFailed)
687-
if !failed {
688-
// found an in-progress unpack attempt
689-
job = j
690-
break
691-
}
692-
if cond != nil && lastFailureTime.Before(cond.LastTransitionTime.Time) {
693-
lastFailureTime = cond.LastTransitionTime.Time
694-
}
695-
}
696-
697-
if failed {
698-
if time.Now().After(lastFailureTime.Add(unpackRetryInterval)) {
680+
if cond, failed := getCondition(job, batchv1.JobFailed); failed {
681+
if time.Now().After(cond.LastTransitionTime.Time.Add(unpackRetryInterval)) {
699682
fresh.SetName(names.SimpleNameGenerator.GenerateName(fresh.GetName()))
700683
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
701684
}
702-
return
703685
}
686+
687+
// cleanup old failed jobs, but don't clean up successful jobs to avoid repeat unpacking
688+
for _, j := range toDelete {
689+
_ = c.client.BatchV1().Jobs(j.GetNamespace()).Delete(context.TODO(), j.GetName(), metav1.DeleteOptions{})
690+
}
691+
return
704692
}
705693
}
706694

@@ -845,6 +833,37 @@ func getCondition(job *batchv1.Job, conditionType batchv1.JobConditionType) (con
845833
return
846834
}
847835

836+
func sortUnpackJobs(jobs []*batchv1.Job, maxRetainedJobs int) (latest *batchv1.Job, toDelete []*batchv1.Job) {
837+
if len(jobs) == 0 {
838+
return
839+
}
840+
// sort jobs so that latest job is first
841+
// with preference for non-failed jobs
842+
sort.Slice(jobs, func(i, j int) bool {
843+
condI, failedI := getCondition(jobs[i], batchv1.JobFailed)
844+
condJ, failedJ := getCondition(jobs[j], batchv1.JobFailed)
845+
if failedI != failedJ {
846+
return !failedI // non-failed job goes first
847+
}
848+
return condI.LastTransitionTime.After(condJ.LastTransitionTime.Time)
849+
})
850+
latest = jobs[0]
851+
if len(jobs) <= maxRetainedJobs {
852+
return
853+
}
854+
if maxRetainedJobs == 0 {
855+
toDelete = jobs[1:]
856+
return
857+
}
858+
859+
// cleanup old failed jobs, n-1 recent jobs and the oldest job
860+
for i := 0; i < maxRetainedJobs && i+maxRetainedJobs < len(jobs); i++ {
861+
toDelete = append(toDelete, jobs[maxRetainedJobs+i])
862+
}
863+
864+
return
865+
}
866+
848867
// OperatorGroupBundleUnpackTimeout returns bundle timeout from annotation if specified.
849868
// If the timeout annotation is not set, return timeout < 0 which is subsequently ignored.
850869
// This is to overrides the --bundle-unpack-timeout flag value on per-OperatorGroup basis.

Diff for: pkg/controller/bundle/bundle_unpacker_test.go

+97
Original file line numberDiff line numberDiff line change
@@ -1951,3 +1951,100 @@ func TestOperatorGroupBundleUnpackRetryInterval(t *testing.T) {
19511951
})
19521952
}
19531953
}
1954+
1955+
func TestSortUnpackJobs(t *testing.T) {
1956+
// if there is a non-failed job, it should be first
1957+
// otherwise, the latest job should be first
1958+
//first n-1 jobs and oldest job are preserved
1959+
testJob := func(name string, failed bool, ts int64) *batchv1.Job {
1960+
conditions := []batchv1.JobCondition{}
1961+
if failed {
1962+
conditions = append(conditions, batchv1.JobCondition{
1963+
Type: batchv1.JobFailed,
1964+
Status: corev1.ConditionTrue,
1965+
LastTransitionTime: metav1.Time{Time: time.Unix(ts, 0)},
1966+
})
1967+
}
1968+
return &batchv1.Job{
1969+
ObjectMeta: metav1.ObjectMeta{
1970+
Name: name,
1971+
},
1972+
Status: batchv1.JobStatus{
1973+
Conditions: conditions,
1974+
},
1975+
}
1976+
}
1977+
failedJobs := []*batchv1.Job{
1978+
testJob("f-1", true, 1),
1979+
testJob("f-2", true, 2),
1980+
testJob("f-3", true, 3),
1981+
testJob("f-4", true, 4),
1982+
testJob("f-5", true, 5),
1983+
}
1984+
nonFailedJob := testJob("s-1", false, 1)
1985+
for _, tc := range []struct {
1986+
name string
1987+
jobs []*batchv1.Job
1988+
maxRetained int
1989+
expectedLatest *batchv1.Job
1990+
expectedToDelete []*batchv1.Job
1991+
}{
1992+
{
1993+
name: "no job history",
1994+
maxRetained: 0,
1995+
jobs: []*batchv1.Job{
1996+
failedJobs[1],
1997+
failedJobs[2],
1998+
failedJobs[0],
1999+
},
2000+
expectedLatest: failedJobs[2],
2001+
expectedToDelete: []*batchv1.Job{
2002+
failedJobs[1],
2003+
failedJobs[0],
2004+
},
2005+
}, {
2006+
name: "empty job list",
2007+
maxRetained: 1,
2008+
}, {
2009+
name: "retain oldest",
2010+
maxRetained: 1,
2011+
jobs: []*batchv1.Job{
2012+
failedJobs[2],
2013+
failedJobs[0],
2014+
failedJobs[1],
2015+
},
2016+
expectedToDelete: []*batchv1.Job{
2017+
failedJobs[1],
2018+
},
2019+
expectedLatest: failedJobs[2],
2020+
}, {
2021+
name: "multiple old jobs",
2022+
maxRetained: 2,
2023+
jobs: []*batchv1.Job{
2024+
failedJobs[1],
2025+
failedJobs[0],
2026+
failedJobs[2],
2027+
failedJobs[3],
2028+
failedJobs[4],
2029+
},
2030+
expectedLatest: failedJobs[4],
2031+
expectedToDelete: []*batchv1.Job{
2032+
failedJobs[1],
2033+
failedJobs[2],
2034+
},
2035+
}, {
2036+
name: "select non-failed as latest",
2037+
maxRetained: 3,
2038+
jobs: []*batchv1.Job{
2039+
failedJobs[0],
2040+
failedJobs[1],
2041+
nonFailedJob,
2042+
},
2043+
expectedLatest: nonFailedJob,
2044+
},
2045+
} {
2046+
latest, toDelete := sortUnpackJobs(tc.jobs, tc.maxRetained)
2047+
assert.Equal(t, tc.expectedLatest, latest)
2048+
assert.ElementsMatch(t, tc.expectedToDelete, toDelete)
2049+
}
2050+
}

Diff for: pkg/controller/operators/catalog/operator.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1669,7 +1669,7 @@ type UnpackedBundleReference struct {
16691669
Properties string `json:"properties"`
16701670
}
16711671

1672-
func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, minUnpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
1672+
func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, unpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
16731673
unpacked := true
16741674

16751675
outBundleLookups := make([]v1alpha1.BundleLookup, len(bundleLookups))
@@ -1684,7 +1684,7 @@ func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.
16841684
var errs []error
16851685
for i := 0; i < len(outBundleLookups); i++ {
16861686
lookup := outBundleLookups[i]
1687-
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, minUnpackRetryInterval)
1687+
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, unpackRetryInterval)
16881688
if err != nil {
16891689
errs = append(errs, err)
16901690
continue

Diff for: test/e2e/registry.go

-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ func createDockerRegistry(client operatorclient.ClientInterface, namespace strin
5454
Port: int32(5000),
5555
},
5656
},
57-
Type: corev1.ServiceTypeNodePort,
5857
},
5958
}
6059

Diff for: test/e2e/subscription_e2e_test.go

+10-11
Original file line numberDiff line numberDiff line change
@@ -2526,6 +2526,11 @@ var _ = Describe("Subscription", func() {
25262526
})
25272527
When("bundle unpack retries are enabled", func() {
25282528
It("should retry failing unpack jobs", func() {
2529+
if ok, err := inKind(c); ok && err == nil {
2530+
Skip("This spec fails when run using KIND cluster. See https://github.com/operator-framework/operator-lifecycle-manager/issues/2420 for more details")
2531+
} else if err != nil {
2532+
Skip("Could not determine whether running in a kind cluster. Skipping.")
2533+
}
25292534
By("Ensuring a registry to host bundle images")
25302535
local, err := Local(c)
25312536
Expect(err).NotTo(HaveOccurred(), "cannot determine if test running locally or on CI: %s", err)
@@ -2581,20 +2586,14 @@ var _ = Describe("Subscription", func() {
25812586
}
25822587
}
25832588

2584-
// testImage is the name of the image used throughout the test - the image overwritten by skopeo
2585-
// the tag is generated randomly and appended to the end of the testImage
2589+
// The remote image to be copied onto the local registry
25862590
srcImage := "quay.io/olmtest/example-operator-bundle:"
25872591
srcTag := "0.1.0"
2588-
bundleImage := fmt.Sprint(registryURL, "/unpack-retry-bundle", ":")
2592+
2593+
// on-cluster image ref
2594+
bundleImage := registryURL + "/unpack-retry-bundle:"
25892595
bundleTag := genName("x")
2590-
//// hash hashes data with sha256 and returns the hex string.
2591-
//func hash(data string) string {
2592-
// // A SHA256 hash is 64 characters, which is within the 253 character limit for kube resource names
2593-
// h := fmt.Sprintf("%x", sha256.Sum256([]byte(data)))
2594-
//
2595-
// // Make the hash 63 characters instead to comply with the 63 character limit for labels
2596-
// return fmt.Sprintf(h[:len(h)-1])
2597-
//}
2596+
25982597
unpackRetryCatalog := fmt.Sprintf(`
25992598
schema: olm.package
26002599
name: unpack-retry-package

0 commit comments

Comments
 (0)