Skip to content

Commit 4fc64d2

Browse files
ankitathomastmshort
authored andcommitted
cleanup old jobs
Signed-off-by: Ankita Thomas <[email protected]>
1 parent 218a6bb commit 4fc64d2

File tree

5 files changed

+165
-49
lines changed

5 files changed

+165
-49
lines changed

pkg/controller/bundle/bundle_unpacker.go

+50-30
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/sha256"
66
"fmt"
7+
"sort"
78
"strings"
89
"time"
910

@@ -660,47 +661,35 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name
660661

661662
func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration, unpackRetryInterval time.Duration) (job *batchv1.Job, err error) {
662663
fresh := c.job(cmRef, bundlePath, secrets, timeout)
663-
job, err = c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName())
664+
var jobs, toDelete []*batchv1.Job
665+
jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
664666
if err != nil {
665-
if apierrors.IsNotFound(err) {
666-
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
667-
}
668-
669667
return
670668
}
669+
if len(jobs) == 0 {
670+
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
671+
return
672+
}
673+
674+
maxRetainedJobs := 5 // TODO: make this configurable
675+
job, toDelete = sortUnpackJobs(jobs, maxRetainedJobs) // choose latest or on-failed job attempt
671676

672677
// only check for retries if an unpackRetryInterval is specified
673678
if unpackRetryInterval > 0 {
674-
if failedCond, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
675-
lastFailureTime := failedCond.LastTransitionTime.Time
679+
if _, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
676680
// Look for other unpack jobs for the same bundle
677-
var jobs []*batchv1.Job
678-
jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
679-
if err != nil {
680-
return
681-
}
682-
683-
var failed bool
684-
var cond *batchv1.JobCondition
685-
for _, j := range jobs {
686-
cond, failed = getCondition(j, batchv1.JobFailed)
687-
if !failed {
688-
// found an in-progress unpack attempt
689-
job = j
690-
break
691-
}
692-
if cond != nil && lastFailureTime.Before(cond.LastTransitionTime.Time) {
693-
lastFailureTime = cond.LastTransitionTime.Time
694-
}
695-
}
696-
697-
if failed {
698-
if time.Now().After(lastFailureTime.Add(unpackRetryInterval)) {
681+
if cond, failed := getCondition(job, batchv1.JobFailed); failed {
682+
if time.Now().After(cond.LastTransitionTime.Time.Add(unpackRetryInterval)) {
699683
fresh.SetName(names.SimpleNameGenerator.GenerateName(fresh.GetName()))
700684
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
701685
}
702-
return
703686
}
687+
688+
// cleanup old failed jobs, but don't clean up successful jobs to avoid repeat unpacking
689+
for _, j := range toDelete {
690+
_ = c.client.BatchV1().Jobs(j.GetNamespace()).Delete(context.TODO(), j.GetName(), metav1.DeleteOptions{})
691+
}
692+
return
704693
}
705694
}
706695

@@ -845,6 +834,37 @@ func getCondition(job *batchv1.Job, conditionType batchv1.JobConditionType) (con
845834
return
846835
}
847836

837+
func sortUnpackJobs(jobs []*batchv1.Job, maxRetainedJobs int) (latest *batchv1.Job, toDelete []*batchv1.Job) {
838+
if len(jobs) == 0 {
839+
return
840+
}
841+
// sort jobs so that latest job is first
842+
// with preference for non-failed jobs
843+
sort.Slice(jobs, func(i, j int) bool {
844+
condI, failedI := getCondition(jobs[i], batchv1.JobFailed)
845+
condJ, failedJ := getCondition(jobs[j], batchv1.JobFailed)
846+
if failedI != failedJ {
847+
return !failedI // non-failed job goes first
848+
}
849+
return condI.LastTransitionTime.After(condJ.LastTransitionTime.Time)
850+
})
851+
latest = jobs[0]
852+
if len(jobs) <= maxRetainedJobs {
853+
return
854+
}
855+
if maxRetainedJobs == 0 {
856+
toDelete = jobs[1:]
857+
return
858+
}
859+
860+
// cleanup old failed jobs, n-1 recent jobs and the oldest job
861+
for i := 0; i < maxRetainedJobs && i+maxRetainedJobs < len(jobs); i++ {
862+
toDelete = append(toDelete, jobs[maxRetainedJobs+i])
863+
}
864+
865+
return
866+
}
867+
848868
// OperatorGroupBundleUnpackTimeout returns bundle timeout from annotation if specified.
849869
// If the timeout annotation is not set, return timeout < 0 which is subsequently ignored.
850870
// This is to overrides the --bundle-unpack-timeout flag value on per-OperatorGroup basis.

pkg/controller/bundle/bundle_unpacker_test.go

+103-5
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ func TestConfigMapUnpacker(t *testing.T) {
441441
ObjectMeta: metav1.ObjectMeta{
442442
Name: digestHash,
443443
Namespace: "ns-a",
444-
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue},
444+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: digestHash},
445445
OwnerReferences: []metav1.OwnerReference{
446446
{
447447
APIVersion: "v1",
@@ -712,7 +712,7 @@ func TestConfigMapUnpacker(t *testing.T) {
712712
ObjectMeta: metav1.ObjectMeta{
713713
Name: digestHash,
714714
Namespace: "ns-a",
715-
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue},
715+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: digestHash},
716716
OwnerReferences: []metav1.OwnerReference{
717717
{
718718
APIVersion: "v1",
@@ -978,7 +978,7 @@ func TestConfigMapUnpacker(t *testing.T) {
978978
ObjectMeta: metav1.ObjectMeta{
979979
Name: pathHash,
980980
Namespace: "ns-a",
981-
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue},
981+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash},
982982
OwnerReferences: []metav1.OwnerReference{
983983
{
984984
APIVersion: "v1",
@@ -1213,7 +1213,7 @@ func TestConfigMapUnpacker(t *testing.T) {
12131213
ObjectMeta: metav1.ObjectMeta{
12141214
Name: pathHash,
12151215
Namespace: "ns-a",
1216-
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue},
1216+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash},
12171217
OwnerReferences: []metav1.OwnerReference{
12181218
{
12191219
APIVersion: "v1",
@@ -1459,7 +1459,7 @@ func TestConfigMapUnpacker(t *testing.T) {
14591459
ObjectMeta: metav1.ObjectMeta{
14601460
Name: pathHash,
14611461
Namespace: "ns-a",
1462-
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue},
1462+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash},
14631463
OwnerReferences: []metav1.OwnerReference{
14641464
{
14651465
APIVersion: "v1",
@@ -1951,3 +1951,101 @@ func TestOperatorGroupBundleUnpackRetryInterval(t *testing.T) {
19511951
})
19521952
}
19531953
}
1954+
1955+
func TestSortUnpackJobs(t *testing.T) {
1956+
// if there is a non-failed job, it should be first
1957+
// otherwise, the latest job should be first
1958+
//first n-1 jobs and oldest job are preserved
1959+
testJob := func(name string, failed bool, ts int64) *batchv1.Job {
1960+
conditions := []batchv1.JobCondition{}
1961+
if failed {
1962+
conditions = append(conditions, batchv1.JobCondition{
1963+
Type: batchv1.JobFailed,
1964+
Status: corev1.ConditionTrue,
1965+
LastTransitionTime: metav1.Time{Time: time.Unix(ts, 0)},
1966+
})
1967+
}
1968+
return &batchv1.Job{
1969+
ObjectMeta: metav1.ObjectMeta{
1970+
Name: name,
1971+
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: "test"},
1972+
},
1973+
Status: batchv1.JobStatus{
1974+
Conditions: conditions,
1975+
},
1976+
}
1977+
}
1978+
failedJobs := []*batchv1.Job{
1979+
testJob("f-1", true, 1),
1980+
testJob("f-2", true, 2),
1981+
testJob("f-3", true, 3),
1982+
testJob("f-4", true, 4),
1983+
testJob("f-5", true, 5),
1984+
}
1985+
nonFailedJob := testJob("s-1", false, 1)
1986+
for _, tc := range []struct {
1987+
name string
1988+
jobs []*batchv1.Job
1989+
maxRetained int
1990+
expectedLatest *batchv1.Job
1991+
expectedToDelete []*batchv1.Job
1992+
}{
1993+
{
1994+
name: "no job history",
1995+
maxRetained: 0,
1996+
jobs: []*batchv1.Job{
1997+
failedJobs[1],
1998+
failedJobs[2],
1999+
failedJobs[0],
2000+
},
2001+
expectedLatest: failedJobs[2],
2002+
expectedToDelete: []*batchv1.Job{
2003+
failedJobs[1],
2004+
failedJobs[0],
2005+
},
2006+
}, {
2007+
name: "empty job list",
2008+
maxRetained: 1,
2009+
}, {
2010+
name: "retain oldest",
2011+
maxRetained: 1,
2012+
jobs: []*batchv1.Job{
2013+
failedJobs[2],
2014+
failedJobs[0],
2015+
failedJobs[1],
2016+
},
2017+
expectedToDelete: []*batchv1.Job{
2018+
failedJobs[1],
2019+
},
2020+
expectedLatest: failedJobs[2],
2021+
}, {
2022+
name: "multiple old jobs",
2023+
maxRetained: 2,
2024+
jobs: []*batchv1.Job{
2025+
failedJobs[1],
2026+
failedJobs[0],
2027+
failedJobs[2],
2028+
failedJobs[3],
2029+
failedJobs[4],
2030+
},
2031+
expectedLatest: failedJobs[4],
2032+
expectedToDelete: []*batchv1.Job{
2033+
failedJobs[1],
2034+
failedJobs[2],
2035+
},
2036+
}, {
2037+
name: "select non-failed as latest",
2038+
maxRetained: 3,
2039+
jobs: []*batchv1.Job{
2040+
failedJobs[0],
2041+
failedJobs[1],
2042+
nonFailedJob,
2043+
},
2044+
expectedLatest: nonFailedJob,
2045+
},
2046+
} {
2047+
latest, toDelete := sortUnpackJobs(tc.jobs, tc.maxRetained)
2048+
assert.Equal(t, tc.expectedLatest, latest)
2049+
assert.ElementsMatch(t, tc.expectedToDelete, toDelete)
2050+
}
2051+
}

pkg/controller/operators/catalog/operator.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1669,7 +1669,7 @@ type UnpackedBundleReference struct {
16691669
Properties string `json:"properties"`
16701670
}
16711671

1672-
func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, minUnpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
1672+
func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, unpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
16731673
unpacked := true
16741674

16751675
outBundleLookups := make([]v1alpha1.BundleLookup, len(bundleLookups))
@@ -1684,7 +1684,7 @@ func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.
16841684
var errs []error
16851685
for i := 0; i < len(outBundleLookups); i++ {
16861686
lookup := outBundleLookups[i]
1687-
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, minUnpackRetryInterval)
1687+
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, unpackRetryInterval)
16881688
if err != nil {
16891689
errs = append(errs, err)
16901690
continue

test/e2e/registry.go

-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ func createDockerRegistry(client operatorclient.ClientInterface, namespace strin
5454
Port: int32(5000),
5555
},
5656
},
57-
Type: corev1.ServiceTypeNodePort,
5857
},
5958
}
6059

test/e2e/subscription_e2e_test.go

+10-11
Original file line numberDiff line numberDiff line change
@@ -2526,6 +2526,11 @@ var _ = Describe("Subscription", func() {
25262526
})
25272527
When("bundle unpack retries are enabled", func() {
25282528
It("should retry failing unpack jobs", func() {
2529+
if ok, err := inKind(c); ok && err == nil {
2530+
Skip("This spec fails when run using KIND cluster. See https://github.com/operator-framework/operator-lifecycle-manager/issues/2420 for more details")
2531+
} else if err != nil {
2532+
Skip("Could not determine whether running in a kind cluster. Skipping.")
2533+
}
25292534
By("Ensuring a registry to host bundle images")
25302535
local, err := Local(c)
25312536
Expect(err).NotTo(HaveOccurred(), "cannot determine if test running locally or on CI: %s", err)
@@ -2581,20 +2586,14 @@ var _ = Describe("Subscription", func() {
25812586
}
25822587
}
25832588

2584-
// testImage is the name of the image used throughout the test - the image overwritten by skopeo
2585-
// the tag is generated randomly and appended to the end of the testImage
2589+
// The remote image to be copied onto the local registry
25862590
srcImage := "quay.io/olmtest/example-operator-bundle:"
25872591
srcTag := "0.1.0"
2588-
bundleImage := fmt.Sprint(registryURL, "/unpack-retry-bundle", ":")
2592+
2593+
// on-cluster image ref
2594+
bundleImage := registryURL + "/unpack-retry-bundle:"
25892595
bundleTag := genName("x")
2590-
//// hash hashes data with sha256 and returns the hex string.
2591-
//func hash(data string) string {
2592-
// // A SHA256 hash is 64 characters, which is within the 253 character limit for kube resource names
2593-
// h := fmt.Sprintf("%x", sha256.Sum256([]byte(data)))
2594-
//
2595-
// // Make the hash 63 characters instead to comply with the 63 character limit for labels
2596-
// return fmt.Sprintf(h[:len(h)-1])
2597-
//}
2596+
25982597
unpackRetryCatalog := fmt.Sprintf(`
25992598
schema: olm.package
26002599
name: unpack-retry-package

0 commit comments

Comments
 (0)