 	"context"
 	"crypto/sha256"
 	"fmt"
+	"sort"
 	"strings"
 	"time"
 
@@ -671,36 +672,29 @@ func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath
 
 	// only check for retries if an unpackRetryInterval is specified
 	if unpackRetryInterval > 0 {
-		if failedCond, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
-			lastFailureTime := failedCond.LastTransitionTime.Time
+		maxRetainedJobs := 5 // TODO: make this configurable
+		if _, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
 			// Look for other unpack jobs for the same bundle
-			var jobs []*batchv1.Job
+			var jobs, toDelete []*batchv1.Job
 			jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
-			if err != nil {
+			if err != nil || len(jobs) == 0 {
 				return
 			}
 
-			var failed bool
-			var cond *batchv1.JobCondition
-			for _, j := range jobs {
-				cond, failed = getCondition(j, batchv1.JobFailed)
-				if !failed {
-					// found an in-progress unpack attempt
-					job = j
-					break
-				}
-				if cond != nil && lastFailureTime.Before(cond.LastTransitionTime.Time) {
-					lastFailureTime = cond.LastTransitionTime.Time
-				}
-			}
+			job, toDelete = sortUnpackJobs(jobs, maxRetainedJobs)
 
-			if failed {
-				if time.Now().After(lastFailureTime.Add(unpackRetryInterval)) {
+			if cond, failed := getCondition(job, batchv1.JobFailed); failed {
+				if time.Now().After(cond.LastTransitionTime.Time.Add(unpackRetryInterval)) {
 					fresh.SetName(names.SimpleNameGenerator.GenerateName(fresh.GetName()))
 					job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
 				}
-				return
 			}
+
+			// cleanup old failed jobs, but don't clean up successful jobs to avoid repeat unpacking
+			for _, j := range toDelete {
+				_ = c.client.BatchV1().Jobs(j.GetNamespace()).Delete(context.TODO(), j.GetName(), metav1.DeleteOptions{})
+			}
+			return
 		}
 	}
 
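The retry gate in the hunk above only creates a replacement job once unpackRetryInterval has elapsed since the newest job's JobFailed condition last transitioned. Below is a minimal standalone sketch of just that time check; the shouldRetry helper is illustrative and not part of this PR.

package main

import (
	"fmt"
	"time"
)

// shouldRetry mirrors the gate above: a failed unpack is retried only once
// unpackRetryInterval has elapsed since the failure's last transition time.
func shouldRetry(lastFailure time.Time, unpackRetryInterval time.Duration) bool {
	return time.Now().After(lastFailure.Add(unpackRetryInterval))
}

func main() {
	fmt.Println(shouldRetry(time.Now().Add(-10*time.Minute), 5*time.Minute)) // true: failure is older than the interval
	fmt.Println(shouldRetry(time.Now().Add(-1*time.Minute), 5*time.Minute))  // false: still inside the interval
}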
@@ -845,6 +839,37 @@ func getCondition(job *batchv1.Job, conditionType batchv1.JobConditionType) (con
 	return
 }
 
+func sortUnpackJobs(jobs []*batchv1.Job, maxRetainedJobs int) (latest *batchv1.Job, toDelete []*batchv1.Job) {
+	if len(jobs) == 0 {
+		return
+	}
+	// sort jobs so that latest job is first
+	// with preference for non-failed jobs
+	sort.Slice(jobs, func(i, j int) bool {
+		condI, failedI := getCondition(jobs[i], batchv1.JobFailed)
+		condJ, failedJ := getCondition(jobs[j], batchv1.JobFailed)
+		if failedI != failedJ {
+			return !failedI // non-failed job goes first
+		}
+		return condI.LastTransitionTime.After(condJ.LastTransitionTime.Time)
+	})
+	latest = jobs[0]
+	if len(jobs) <= maxRetainedJobs {
+		return
+	}
+	if maxRetainedJobs == 0 {
+		toDelete = jobs[1:]
+		return
+	}
+
+	// cleanup old failed jobs, n-1 recent jobs and the oldest job
+	for i := 0; i < maxRetainedJobs && i+maxRetainedJobs < len(jobs); i++ {
+		toDelete = append(toDelete, jobs[maxRetainedJobs+i])
+	}
+
+	return
+}
+
 // OperatorGroupBundleUnpackTimeout returns bundle timeout from annotation if specified.
 // If the timeout annotation is not set, return timeout < 0 which is subsequently ignored.
 // This is to overrides the --bundle-unpack-timeout flag value on per-OperatorGroup basis.
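sortUnpackJobs orders the jobs newest-first with non-failed jobs preferred, returns the first entry as the latest attempt, and marks a middle slice of older jobs for deletion: with the jobs sorted newest-first, indices maxRetainedJobs through min(2*maxRetainedJobs, len(jobs))-1 are selected, so the newest maxRetainedJobs entries and anything beyond the first 2*maxRetainedJobs survive. A rough standalone sketch of just that retention arithmetic, using integer indices in place of *batchv1.Job; the retentionIndices helper is illustrative and not part of this PR.

package main

import "fmt"

// retentionIndices mimics the index arithmetic of sortUnpackJobs: given a
// total number of jobs sorted newest-first, it returns the positions that
// would be marked for deletion.
func retentionIndices(total, maxRetainedJobs int) (toDelete []int) {
	if total <= maxRetainedJobs {
		return // nothing to prune
	}
	if maxRetainedJobs == 0 {
		for i := 1; i < total; i++ { // keep only the latest job
			toDelete = append(toDelete, i)
		}
		return
	}
	// delete indices maxRetainedJobs .. min(2*maxRetainedJobs, total)-1
	for i := 0; i < maxRetainedJobs && i+maxRetainedJobs < total; i++ {
		toDelete = append(toDelete, maxRetainedJobs+i)
	}
	return
}

func main() {
	// With 12 jobs and maxRetainedJobs = 5: indices 5..9 are deleted,
	// while the 5 newest (0..4) and the 2 oldest (10, 11) are kept.
	fmt.Println(retentionIndices(12, 5)) // [5 6 7 8 9]
}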