4
4
"context"
5
5
"crypto/sha256"
6
6
"fmt"
7
+ "sort"
7
8
"strings"
8
9
"time"
9
10
@@ -18,6 +19,7 @@ import (
18
19
"k8s.io/apimachinery/pkg/api/resource"
19
20
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
21
k8slabels "k8s.io/apimachinery/pkg/labels"
22
+ "k8s.io/apiserver/pkg/storage/names"
21
23
"k8s.io/client-go/kubernetes"
22
24
listersbatchv1 "k8s.io/client-go/listers/batch/v1"
23
25
listerscorev1 "k8s.io/client-go/listers/core/v1"
@@ -41,6 +43,13 @@ const (
41
43
// e.g 1m30s
42
44
BundleUnpackTimeoutAnnotationKey = "operatorframework.io/bundle-unpack-timeout"
43
45
BundleUnpackPodLabel = "job-name"
46
+
47
+ // BundleUnpackRetryMinimumIntervalAnnotationKey sets a minimum interval to wait before
48
+ // attempting to recreate a failed unpack job for a bundle.
49
+ BundleUnpackRetryMinimumIntervalAnnotationKey = "operatorframework.io/bundle-unpack-min-retry-interval"
50
+
51
+ // bundleUnpackRefLabel is used to filter for all unpack jobs for a specific bundle.
52
+ bundleUnpackRefLabel = "operatorframework.io/bundle-unpack-ref"
44
53
)
45
54
46
55
type BundleUnpackResult struct {
@@ -86,6 +95,12 @@ func newBundleUnpackResult(lookup *operatorsv1alpha1.BundleLookup) *BundleUnpack
86
95
87
96
func (c * ConfigMapUnpacker ) job (cmRef * corev1.ObjectReference , bundlePath string , secrets []corev1.LocalObjectReference , annotationUnpackTimeout time.Duration ) * batchv1.Job {
88
97
job := & batchv1.Job {
98
+ ObjectMeta : metav1.ObjectMeta {
99
+ Labels : map [string ]string {
100
+ install .OLMManagedLabelKey : install .OLMManagedLabelValue ,
101
+ bundleUnpackRefLabel : cmRef .Name ,
102
+ },
103
+ },
89
104
Spec : batchv1.JobSpec {
90
105
//ttlSecondsAfterFinished: 0 // can use in the future to not have to clean up job
91
106
Template : corev1.PodTemplateSpec {
@@ -279,7 +294,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string
279
294
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Unpacker
280
295
281
296
type Unpacker interface {
282
- UnpackBundle (lookup * operatorsv1alpha1.BundleLookup , timeout time.Duration ) (result * BundleUnpackResult , err error )
297
+ UnpackBundle (lookup * operatorsv1alpha1.BundleLookup , timeout , retryInterval time.Duration ) (result * BundleUnpackResult , err error )
283
298
}
284
299
285
300
type ConfigMapUnpacker struct {
@@ -440,7 +455,7 @@ const (
440
455
NotUnpackedMessage = "bundle contents have not yet been persisted to installplan status"
441
456
)
442
457
443
- func (c * ConfigMapUnpacker ) UnpackBundle (lookup * operatorsv1alpha1.BundleLookup , timeout time.Duration ) (result * BundleUnpackResult , err error ) {
458
+ func (c * ConfigMapUnpacker ) UnpackBundle (lookup * operatorsv1alpha1.BundleLookup , timeout , retryInterval time.Duration ) (result * BundleUnpackResult , err error ) {
444
459
result = newBundleUnpackResult (lookup )
445
460
446
461
// if bundle lookup failed condition already present, then there is nothing more to do
@@ -502,7 +517,7 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup,
502
517
secrets = append (secrets , corev1.LocalObjectReference {Name : secretName })
503
518
}
504
519
var job * batchv1.Job
505
- job , err = c .ensureJob (cmRef , result .Path , secrets , timeout )
520
+ job , err = c .ensureJob (cmRef , result .Path , secrets , timeout , retryInterval )
506
521
if err != nil || job == nil {
507
522
// ensureJob can return nil if the job present does not match the expected job (spec and ownerefs)
508
523
// The current job is deleted in that case so UnpackBundle needs to be retried
@@ -641,16 +656,39 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name
641
656
return
642
657
}
643
658
644
- func (c * ConfigMapUnpacker ) ensureJob (cmRef * corev1.ObjectReference , bundlePath string , secrets []corev1.LocalObjectReference , timeout time.Duration ) (job * batchv1.Job , err error ) {
659
+ func (c * ConfigMapUnpacker ) ensureJob (cmRef * corev1.ObjectReference , bundlePath string , secrets []corev1.LocalObjectReference , timeout time.Duration , unpackRetryInterval time. Duration ) (job * batchv1.Job , err error ) {
645
660
fresh := c .job (cmRef , bundlePath , secrets , timeout )
646
- job , err = c .jobLister .Jobs (fresh .GetNamespace ()).Get (fresh .GetName ())
661
+ var jobs , toDelete []* batchv1.Job
662
+ jobs , err = c .jobLister .Jobs (fresh .GetNamespace ()).List (k8slabels.ValidatedSetSelector {bundleUnpackRefLabel : cmRef .Name })
647
663
if err != nil {
648
- if apierrors .IsNotFound (err ) {
649
- job , err = c .client .BatchV1 ().Jobs (fresh .GetNamespace ()).Create (context .TODO (), fresh , metav1.CreateOptions {})
650
- }
651
-
652
664
return
653
665
}
666
+ if len (jobs ) == 0 {
667
+ job , err = c .client .BatchV1 ().Jobs (fresh .GetNamespace ()).Create (context .TODO (), fresh , metav1.CreateOptions {})
668
+ return
669
+ }
670
+
671
+ maxRetainedJobs := 5 // TODO: make this configurable
672
+ job , toDelete = sortUnpackJobs (jobs , maxRetainedJobs ) // choose latest or on-failed job attempt
673
+
674
+ // only check for retries if an unpackRetryInterval is specified
675
+ if unpackRetryInterval > 0 {
676
+ if _ , isFailed := getCondition (job , batchv1 .JobFailed ); isFailed {
677
+ // Look for other unpack jobs for the same bundle
678
+ if cond , failed := getCondition (job , batchv1 .JobFailed ); failed {
679
+ if time .Now ().After (cond .LastTransitionTime .Time .Add (unpackRetryInterval )) {
680
+ fresh .SetName (names .SimpleNameGenerator .GenerateName (fresh .GetName ()))
681
+ job , err = c .client .BatchV1 ().Jobs (fresh .GetNamespace ()).Create (context .TODO (), fresh , metav1.CreateOptions {})
682
+ }
683
+ }
684
+
685
+ // cleanup old failed jobs, but don't clean up successful jobs to avoid repeat unpacking
686
+ for _ , j := range toDelete {
687
+ _ = c .client .BatchV1 ().Jobs (j .GetNamespace ()).Delete (context .TODO (), j .GetName (), metav1.DeleteOptions {})
688
+ }
689
+ return
690
+ }
691
+ }
654
692
655
693
if equality .Semantic .DeepDerivative (fresh .GetOwnerReferences (), job .GetOwnerReferences ()) && equality .Semantic .DeepDerivative (fresh .Spec , job .Spec ) {
656
694
return
@@ -791,6 +829,37 @@ func getCondition(job *batchv1.Job, conditionType batchv1.JobConditionType) (con
791
829
return
792
830
}
793
831
832
+ func sortUnpackJobs (jobs []* batchv1.Job , maxRetainedJobs int ) (latest * batchv1.Job , toDelete []* batchv1.Job ) {
833
+ if len (jobs ) == 0 {
834
+ return
835
+ }
836
+ // sort jobs so that latest job is first
837
+ // with preference for non-failed jobs
838
+ sort .Slice (jobs , func (i , j int ) bool {
839
+ condI , failedI := getCondition (jobs [i ], batchv1 .JobFailed )
840
+ condJ , failedJ := getCondition (jobs [j ], batchv1 .JobFailed )
841
+ if failedI != failedJ {
842
+ return ! failedI // non-failed job goes first
843
+ }
844
+ return condI .LastTransitionTime .After (condJ .LastTransitionTime .Time )
845
+ })
846
+ latest = jobs [0 ]
847
+ if len (jobs ) <= maxRetainedJobs {
848
+ return
849
+ }
850
+ if maxRetainedJobs == 0 {
851
+ toDelete = jobs [1 :]
852
+ return
853
+ }
854
+
855
+ // cleanup old failed jobs, n-1 recent jobs and the oldest job
856
+ for i := 0 ; i < maxRetainedJobs && i + maxRetainedJobs < len (jobs ); i ++ {
857
+ toDelete = append (toDelete , jobs [maxRetainedJobs + i ])
858
+ }
859
+
860
+ return
861
+ }
862
+
794
863
// OperatorGroupBundleUnpackTimeout returns bundle timeout from annotation if specified.
795
864
// If the timeout annotation is not set, return timeout < 0 which is subsequently ignored.
796
865
// This is to overrides the --bundle-unpack-timeout flag value on per-OperatorGroup basis.
@@ -817,3 +886,28 @@ func OperatorGroupBundleUnpackTimeout(ogLister v1listers.OperatorGroupNamespaceL
817
886
818
887
return d , nil
819
888
}
889
+
890
+ // OperatorGroupBundleUnpackRetryInterval returns bundle unpack retry interval from annotation if specified.
891
+ // If the retry annotation is not set, return retry = 0 which is subsequently ignored. This interval, if > 0,
892
+ // determines the minimum interval between recreating a failed unpack job.
893
+ func OperatorGroupBundleUnpackRetryInterval (ogLister v1listers.OperatorGroupNamespaceLister ) (time.Duration , error ) {
894
+ ogs , err := ogLister .List (k8slabels .Everything ())
895
+ if err != nil {
896
+ return 0 , err
897
+ }
898
+ if len (ogs ) != 1 {
899
+ return 0 , fmt .Errorf ("found %d operatorGroups, expected 1" , len (ogs ))
900
+ }
901
+
902
+ timeoutStr , ok := ogs [0 ].GetAnnotations ()[BundleUnpackRetryMinimumIntervalAnnotationKey ]
903
+ if ! ok {
904
+ return 0 , nil
905
+ }
906
+
907
+ d , err := time .ParseDuration (timeoutStr )
908
+ if err != nil {
909
+ return 0 , fmt .Errorf ("failed to parse unpack retry annotation(%s: %s): %w" , BundleUnpackRetryMinimumIntervalAnnotationKey , timeoutStr , err )
910
+ }
911
+
912
+ return d , nil
913
+ }
0 commit comments