diff --git a/pkg/controller/bundle/bundle_unpacker.go b/pkg/controller/bundle/bundle_unpacker.go index 79f67ab177..cfe46187bb 100644 --- a/pkg/controller/bundle/bundle_unpacker.go +++ b/pkg/controller/bundle/bundle_unpacker.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "fmt" + "sort" "strings" "time" @@ -18,6 +19,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8slabels "k8s.io/apimachinery/pkg/labels" + "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/kubernetes" listersbatchv1 "k8s.io/client-go/listers/batch/v1" listerscorev1 "k8s.io/client-go/listers/core/v1" @@ -41,6 +43,13 @@ const ( // e.g 1m30s BundleUnpackTimeoutAnnotationKey = "operatorframework.io/bundle-unpack-timeout" BundleUnpackPodLabel = "job-name" + + // BundleUnpackRetryMinimumIntervalAnnotationKey sets a minimum interval to wait before + // attempting to recreate a failed unpack job for a bundle. + BundleUnpackRetryMinimumIntervalAnnotationKey = "operatorframework.io/bundle-unpack-min-retry-interval" + + // bundleUnpackRefLabel is used to filter for all unpack jobs for a specific bundle. + bundleUnpackRefLabel = "operatorframework.io/bundle-unpack-ref" ) type BundleUnpackResult struct { @@ -89,6 +98,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ install.OLMManagedLabelKey: install.OLMManagedLabelValue, + bundleUnpackRefLabel: cmRef.Name, }, }, Spec: batchv1.JobSpec{ @@ -287,7 +297,7 @@ func (c *ConfigMapUnpacker) job(cmRef *corev1.ObjectReference, bundlePath string //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . 
Unpacker type Unpacker interface { - UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error) + UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error) } type ConfigMapUnpacker struct { @@ -448,7 +458,7 @@ const ( NotUnpackedMessage = "bundle contents have not yet been persisted to installplan status" ) -func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout time.Duration) (result *BundleUnpackResult, err error) { +func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, timeout, retryInterval time.Duration) (result *BundleUnpackResult, err error) { result = newBundleUnpackResult(lookup) // if bundle lookup failed condition already present, then there is nothing more to do @@ -510,7 +520,7 @@ func (c *ConfigMapUnpacker) UnpackBundle(lookup *operatorsv1alpha1.BundleLookup, secrets = append(secrets, corev1.LocalObjectReference{Name: secretName}) } var job *batchv1.Job - job, err = c.ensureJob(cmRef, result.Path, secrets, timeout) + job, err = c.ensureJob(cmRef, result.Path, secrets, timeout, retryInterval) if err != nil || job == nil { // ensureJob can return nil if the job present does not match the expected job (spec and ownerefs) // The current job is deleted in that case so UnpackBundle needs to be retried @@ -649,16 +659,39 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name return } -func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration) (job *batchv1.Job, err error) { +func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration, unpackRetryInterval time.Duration) (job *batchv1.Job, err error) { fresh := c.job(cmRef, bundlePath, secrets, timeout) - job, err = 
// choose the latest non-failed job attempt, if any
// retain the n-1 most recent jobs and the oldest job; mark the rest for deletion
+func OperatorGroupBundleUnpackRetryInterval(ogLister v1listers.OperatorGroupNamespaceLister) (time.Duration, error) { + ogs, err := ogLister.List(k8slabels.Everything()) + if err != nil { + return 0, err + } + if len(ogs) != 1 { + return 0, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs)) + } + + timeoutStr, ok := ogs[0].GetAnnotations()[BundleUnpackRetryMinimumIntervalAnnotationKey] + if !ok { + return 0, nil + } + + d, err := time.ParseDuration(timeoutStr) + if err != nil { + return 0, fmt.Errorf("failed to parse unpack retry annotation(%s: %s): %w", BundleUnpackRetryMinimumIntervalAnnotationKey, timeoutStr, err) + } + + return d, nil +} diff --git a/pkg/controller/bundle/bundle_unpacker_test.go b/pkg/controller/bundle/bundle_unpacker_test.go index 2bb0d47025..a583eb5412 100644 --- a/pkg/controller/bundle/bundle_unpacker_test.go +++ b/pkg/controller/bundle/bundle_unpacker_test.go @@ -208,7 +208,7 @@ func TestConfigMapUnpacker(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: pathHash, Namespace: "ns-a", - Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}, + Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash}, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", @@ -441,7 +441,7 @@ func TestConfigMapUnpacker(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: digestHash, Namespace: "ns-a", - Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}, + Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: digestHash}, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", @@ -712,7 +712,7 @@ func TestConfigMapUnpacker(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: digestHash, Namespace: "ns-a", - Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}, + Labels: map[string]string{install.OLMManagedLabelKey: 
install.OLMManagedLabelValue, bundleUnpackRefLabel: digestHash}, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", @@ -978,7 +978,7 @@ func TestConfigMapUnpacker(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: pathHash, Namespace: "ns-a", - Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}, + Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash}, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", @@ -1213,7 +1213,7 @@ func TestConfigMapUnpacker(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: pathHash, Namespace: "ns-a", - Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}, + Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash}, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", @@ -1459,7 +1459,7 @@ func TestConfigMapUnpacker(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: pathHash, Namespace: "ns-a", - Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}, + Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash}, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", @@ -1673,7 +1673,7 @@ func TestConfigMapUnpacker(t *testing.T) { ) require.NoError(t, err) - res, err := unpacker.UnpackBundle(tt.args.lookup, tt.args.annotationTimeout) + res, err := unpacker.UnpackBundle(tt.args.lookup, tt.args.annotationTimeout, 0) require.Equal(t, tt.expected.err, err) if tt.expected.res == nil { @@ -1839,3 +1839,213 @@ func TestOperatorGroupBundleUnpackTimeout(t *testing.T) { }) } } + +func TestOperatorGroupBundleUnpackRetryInterval(t *testing.T) { + nsName := "fake-ns" + + for _, tc := range []struct { + name string + operatorGroups []*operatorsv1.OperatorGroup + expectedTimeout time.Duration + expectedError error + }{ + { + name: "No operator 
groups exist", + expectedTimeout: 0, + expectedError: errors.New("found 0 operatorGroups, expected 1"), + }, + { + name: "Multiple operator groups exist", + operatorGroups: []*operatorsv1.OperatorGroup{ + { + TypeMeta: metav1.TypeMeta{ + Kind: operatorsv1.OperatorGroupKind, + APIVersion: operatorsv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "og1", + Namespace: nsName, + }, + }, + { + TypeMeta: metav1.TypeMeta{ + Kind: operatorsv1.OperatorGroupKind, + APIVersion: operatorsv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "og2", + Namespace: nsName, + }, + }, + }, + expectedTimeout: 0, + expectedError: errors.New("found 2 operatorGroups, expected 1"), + }, + { + name: "One operator group exists with valid unpack retry annotation", + operatorGroups: []*operatorsv1.OperatorGroup{ + { + TypeMeta: metav1.TypeMeta{ + Kind: operatorsv1.OperatorGroupKind, + APIVersion: operatorsv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "og", + Namespace: nsName, + Annotations: map[string]string{BundleUnpackRetryMinimumIntervalAnnotationKey: "1m"}, + }, + }, + }, + expectedTimeout: 1 * time.Minute, + expectedError: nil, + }, + { + name: "One operator group exists with no unpack retry annotation", + operatorGroups: []*operatorsv1.OperatorGroup{ + { + TypeMeta: metav1.TypeMeta{ + Kind: operatorsv1.OperatorGroupKind, + APIVersion: operatorsv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "og", + Namespace: nsName, + }, + }, + }, + expectedTimeout: 0, + expectedError: nil, + }, + { + name: "One operator group exists with invalid unpack retry annotation", + operatorGroups: []*operatorsv1.OperatorGroup{ + { + TypeMeta: metav1.TypeMeta{ + Kind: operatorsv1.OperatorGroupKind, + APIVersion: operatorsv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "og", + Namespace: nsName, + Annotations: map[string]string{BundleUnpackRetryMinimumIntervalAnnotationKey: "invalid"}, + }, + }, + 
// first n-1 jobs and the oldest job are preserved
expectedToDelete: []*batchv1.Job{ + failedJobs[1], + failedJobs[0], + }, + }, { + name: "empty job list", + maxRetained: 1, + }, { + name: "retain oldest", + maxRetained: 1, + jobs: []*batchv1.Job{ + failedJobs[2], + failedJobs[0], + failedJobs[1], + }, + expectedToDelete: []*batchv1.Job{ + failedJobs[1], + }, + expectedLatest: failedJobs[2], + }, { + name: "multiple old jobs", + maxRetained: 2, + jobs: []*batchv1.Job{ + failedJobs[1], + failedJobs[0], + failedJobs[2], + failedJobs[3], + failedJobs[4], + }, + expectedLatest: failedJobs[4], + expectedToDelete: []*batchv1.Job{ + failedJobs[1], + failedJobs[2], + }, + }, { + name: "select non-failed as latest", + maxRetained: 3, + jobs: []*batchv1.Job{ + failedJobs[0], + failedJobs[1], + nonFailedJob, + }, + expectedLatest: nonFailedJob, + }, + } { + latest, toDelete := sortUnpackJobs(tc.jobs, tc.maxRetained) + assert.Equal(t, tc.expectedLatest, latest) + assert.ElementsMatch(t, tc.expectedToDelete, toDelete) + } +} diff --git a/pkg/controller/bundle/bundlefakes/fake_unpacker.go b/pkg/controller/bundle/bundlefakes/fake_unpacker.go index 56b666c542..791df3ea61 100644 --- a/pkg/controller/bundle/bundlefakes/fake_unpacker.go +++ b/pkg/controller/bundle/bundlefakes/fake_unpacker.go @@ -10,11 +10,12 @@ import ( ) type FakeUnpacker struct { - UnpackBundleStub func(*v1alpha1.BundleLookup, time.Duration) (*bundle.BundleUnpackResult, error) + UnpackBundleStub func(*v1alpha1.BundleLookup, time.Duration, time.Duration) (*bundle.BundleUnpackResult, error) unpackBundleMutex sync.RWMutex unpackBundleArgsForCall []struct { arg1 *v1alpha1.BundleLookup arg2 time.Duration + arg3 time.Duration } unpackBundleReturns struct { result1 *bundle.BundleUnpackResult @@ -28,17 +29,18 @@ type FakeUnpacker struct { invocationsMutex sync.RWMutex } -func (fake *FakeUnpacker) UnpackBundle(arg1 *v1alpha1.BundleLookup, arg2 time.Duration) (*bundle.BundleUnpackResult, error) { +func (fake *FakeUnpacker) UnpackBundle(arg1 *v1alpha1.BundleLookup, arg2 
time.Duration, arg3 time.Duration) (*bundle.BundleUnpackResult, error) { fake.unpackBundleMutex.Lock() ret, specificReturn := fake.unpackBundleReturnsOnCall[len(fake.unpackBundleArgsForCall)] fake.unpackBundleArgsForCall = append(fake.unpackBundleArgsForCall, struct { arg1 *v1alpha1.BundleLookup arg2 time.Duration - }{arg1, arg2}) - fake.recordInvocation("UnpackBundle", []interface{}{arg1, arg2}) + arg3 time.Duration + }{arg1, arg2, arg3}) + fake.recordInvocation("UnpackBundle", []interface{}{arg1, arg2, arg3}) fake.unpackBundleMutex.Unlock() if fake.UnpackBundleStub != nil { - return fake.UnpackBundleStub(arg1, arg2) + return fake.UnpackBundleStub(arg1, arg2, arg3) } if specificReturn { return ret.result1, ret.result2 @@ -53,17 +55,17 @@ func (fake *FakeUnpacker) UnpackBundleCallCount() int { return len(fake.unpackBundleArgsForCall) } -func (fake *FakeUnpacker) UnpackBundleCalls(stub func(*v1alpha1.BundleLookup, time.Duration) (*bundle.BundleUnpackResult, error)) { +func (fake *FakeUnpacker) UnpackBundleCalls(stub func(*v1alpha1.BundleLookup, time.Duration, time.Duration) (*bundle.BundleUnpackResult, error)) { fake.unpackBundleMutex.Lock() defer fake.unpackBundleMutex.Unlock() fake.UnpackBundleStub = stub } -func (fake *FakeUnpacker) UnpackBundleArgsForCall(i int) (*v1alpha1.BundleLookup, time.Duration) { +func (fake *FakeUnpacker) UnpackBundleArgsForCall(i int) (*v1alpha1.BundleLookup, time.Duration, time.Duration) { fake.unpackBundleMutex.RLock() defer fake.unpackBundleMutex.RUnlock() argsForCall := fake.unpackBundleArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeUnpacker) UnpackBundleReturns(result1 *bundle.BundleUnpackResult, result2 error) { diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 615f2267e3..34fd983886 100644 --- a/pkg/controller/operators/catalog/operator.go +++ 
b/pkg/controller/operators/catalog/operator.go @@ -1112,6 +1112,11 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { return err } + minUnpackRetryInterval, err := bundle.OperatorGroupBundleUnpackRetryInterval(ogLister) + if err != nil { + return err + } + // TODO: parallel maxGeneration := 0 subscriptionUpdated := false @@ -1207,7 +1212,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { logger.Debug("unpacking bundles") var unpacked bool - unpacked, steps, bundleLookups, err = o.unpackBundles(namespace, steps, bundleLookups, unpackTimeout) + unpacked, steps, bundleLookups, err = o.unpackBundles(namespace, steps, bundleLookups, unpackTimeout, minUnpackRetryInterval) if err != nil { // If the error was fatal capture and fail if olmerrors.IsFatal(err) { @@ -1664,7 +1669,7 @@ type UnpackedBundleReference struct { Properties string `json:"properties"` } -func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) { +func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, unpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) { unpacked := true outBundleLookups := make([]v1alpha1.BundleLookup, len(bundleLookups)) @@ -1679,7 +1684,7 @@ func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1. 
var errs []error for i := 0; i < len(outBundleLookups); i++ { lookup := outBundleLookups[i] - res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout) + res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, unpackRetryInterval) if err != nil { errs = append(errs, err) continue diff --git a/pkg/controller/operators/catalog/subscriptions_test.go b/pkg/controller/operators/catalog/subscriptions_test.go index b84d24c9e1..94fefe61ed 100644 --- a/pkg/controller/operators/catalog/subscriptions_test.go +++ b/pkg/controller/operators/catalog/subscriptions_test.go @@ -1172,7 +1172,7 @@ func TestSyncSubscriptions(t *testing.T) { defer cancel() fakeBundleUnpacker := &bundlefakes.FakeUnpacker{ - UnpackBundleStub: func(lookup *v1alpha1.BundleLookup, timeout time.Duration) (*bundle.BundleUnpackResult, error) { + UnpackBundleStub: func(lookup *v1alpha1.BundleLookup, timeout, retryInterval time.Duration) (*bundle.BundleUnpackResult, error) { return &bundle.BundleUnpackResult{BundleLookup: lookup.DeepCopy()}, tt.fields.unpackBundleErr }, } diff --git a/test/e2e/fbc_provider.go b/test/e2e/fbc_provider.go index 1787187445..362ef19c36 100644 --- a/test/e2e/fbc_provider.go +++ b/test/e2e/fbc_provider.go @@ -31,3 +31,9 @@ func NewFileBasedFiledBasedCatalogProvider(path string) (FileBasedCatalogProvide func (f *fileBasedFileBasedCatalogProvider) GetCatalog() string { return f.fbc } + +func NewRawFileBasedCatalogProvider(data string) (FileBasedCatalogProvider, error) { + return &fileBasedFileBasedCatalogProvider{ + fbc: string(data), + }, nil +} diff --git a/test/e2e/subscription_e2e_test.go b/test/e2e/subscription_e2e_test.go index a3421b12dd..fdad3895b7 100644 --- a/test/e2e/subscription_e2e_test.go +++ b/test/e2e/subscription_e2e_test.go @@ -2524,6 +2524,202 @@ var _ = Describe("Subscription", func() { }) }) }) + When("bundle unpack retries are enabled", func() { + It("should retry failing unpack jobs", func() { + if ok, err := inKind(c); ok && err == nil { + 
Skip("This spec fails when run using KIND cluster. See https://github.com/operator-framework/operator-lifecycle-manager/issues/2420 for more details") + } else if err != nil { + Skip("Could not determine whether running in a kind cluster. Skipping.") + } + By("Ensuring a registry to host bundle images") + local, err := Local(c) + Expect(err).NotTo(HaveOccurred(), "cannot determine if test running locally or on CI: %s", err) + + var registryURL string + var copyImage func(dst, dstTag, src, srcTag string) error + if local { + registryURL, err = createDockerRegistry(c, generatedNamespace.GetName()) + Expect(err).NotTo(HaveOccurred(), "error creating container registry: %s", err) + defer deleteDockerRegistry(c, generatedNamespace.GetName()) + + // ensure registry pod is ready before attempting port-forwarding + _ = awaitPod(GinkgoT(), c, generatedNamespace.GetName(), registryName, podReady) + + err = registryPortForward(generatedNamespace.GetName()) + Expect(err).NotTo(HaveOccurred(), "port-forwarding local registry: %s", err) + copyImage = func(dst, dstTag, src, srcTag string) error { + if !strings.HasPrefix(src, "docker://") { + src = fmt.Sprintf("docker://%s", src) + } + if !strings.HasPrefix(dst, "docker://") { + dst = fmt.Sprintf("docker://%s", dst) + } + _, err := skopeoLocalCopy(dst, dstTag, src, srcTag) + return err + } + } else { + registryURL = fmt.Sprintf("%s/%s", openshiftregistryFQDN, generatedNamespace.GetName()) + registryAuth, err := openshiftRegistryAuth(c, generatedNamespace.GetName()) + Expect(err).NotTo(HaveOccurred(), "error getting openshift registry authentication: %s", err) + copyImage = func(dst, dstTag, src, srcTag string) error { + if !strings.HasPrefix(src, "docker://") { + src = fmt.Sprintf("docker://%s", src) + } + if !strings.HasPrefix(dst, "docker://") { + dst = fmt.Sprintf("docker://%s", dst) + } + skopeoArgs := skopeoCopyCmd(dst, dstTag, src, srcTag, registryAuth) + err = createSkopeoPod(c, skopeoArgs, generatedNamespace.GetName()) + 
if err != nil { + return fmt.Errorf("error creating skopeo pod: %v", err) + } + + // wait for skopeo pod to exit successfully + awaitPod(GinkgoT(), c, generatedNamespace.GetName(), skopeo, func(pod *corev1.Pod) bool { + return pod.Status.Phase == corev1.PodSucceeded + }) + + if err := deleteSkopeoPod(c, generatedNamespace.GetName()); err != nil { + return fmt.Errorf("error deleting skopeo pod: %s", err) + } + return nil + } + } + + // The remote image to be copied onto the local registry + srcImage := "quay.io/olmtest/example-operator-bundle:" + srcTag := "0.1.0" + + // on-cluster image ref + bundleImage := registryURL + "/unpack-retry-bundle:" + bundleTag := genName("x") + + unpackRetryCatalog := fmt.Sprintf(` +schema: olm.package +name: unpack-retry-package +defaultChannel: stable +--- +schema: olm.channel +package: unpack-retry-package +name: stable +entries: + - name: example-operator.v0.1.0 +--- +schema: olm.bundle +name: example-operator.v0.1.0 +package: unpack-retry-package +image: %s%s +properties: + - type: olm.package + value: + packageName: unpack-retry-package + version: 1.0.0 +`, bundleImage, bundleTag) + + By("creating a catalog referencing a non-existent bundle image") + unpackRetryProvider, err := NewRawFileBasedCatalogProvider(unpackRetryCatalog) + Expect(err).ToNot(HaveOccurred()) + catalogSourceName := fmt.Sprintf("%s-catsrc", generatedNamespace.GetName()) + magicCatalog := NewMagicCatalog(ctx.Ctx().Client(), generatedNamespace.GetName(), catalogSourceName, unpackRetryProvider) + Expect(magicCatalog.DeployCatalog(context.Background())).To(BeNil()) + + By("patching the OperatorGroup to reduce the bundle unpacking timeout") + ogNN := types.NamespacedName{Name: operatorGroup.GetName(), Namespace: generatedNamespace.GetName()} + addBundleUnpackTimeoutOGAnnotation(context.Background(), ctx.Ctx().Client(), ogNN, "1s") + + By("creating a subscription for the missing bundle") + unpackRetrySubName := fmt.Sprintf("%s-unpack-retry-package-sub", 
generatedNamespace.GetName()) + createSubscriptionForCatalog(crc, generatedNamespace.GetName(), unpackRetrySubName, catalogSourceName, "unpack-retry-package", stableChannel, "", operatorsv1alpha1.ApprovalAutomatic) + + By("waiting for bundle unpack to fail") + Eventually( + func() error { + fetched, err := crc.OperatorsV1alpha1().Subscriptions(generatedNamespace.GetName()).Get(context.Background(), unpackRetrySubName, metav1.GetOptions{}) + if err != nil { + return err + } + if cond := fetched.Status.GetCondition(v1alpha1.SubscriptionBundleUnpackFailed); cond.Status != corev1.ConditionTrue || cond.Reason != "BundleUnpackFailed" { + return fmt.Errorf("%s condition not found", v1alpha1.SubscriptionBundleUnpackFailed) + } + return nil + }, + 5*time.Minute, + interval, + ).Should(Succeed()) + + By("pushing missing bundle image") + Expect(copyImage(bundleImage, bundleTag, srcImage, srcTag)).To(Succeed()) + + By("patching the OperatorGroup to increase the bundle unpacking timeout") + addBundleUnpackTimeoutOGAnnotation(context.Background(), ctx.Ctx().Client(), ogNN, "") // revert to default unpack timeout + + By("patching operator group to enable unpack retries") + setBundleUnpackRetryMinimumIntervalAnnotation(context.Background(), ctx.Ctx().Client(), ogNN, "1s") + + By("waiting until the subscription has an IP reference") + subscription, err := fetchSubscription(crc, generatedNamespace.GetName(), unpackRetrySubName, subscriptionHasInstallPlanChecker()) + Expect(err).Should(BeNil()) + + By("waiting for the v0.1.0 CSV to report a succeeded phase") + _, err = fetchCSV(crc, subscription.Status.CurrentCSV, generatedNamespace.GetName(), buildCSVConditionChecker(operatorsv1alpha1.CSVPhaseSucceeded)) + Expect(err).ShouldNot(HaveOccurred()) + + By("checking if old unpack conditions on subscription are removed") + Eventually(func() error { + fetched, err := crc.OperatorsV1alpha1().Subscriptions(generatedNamespace.GetName()).Get(context.Background(), unpackRetrySubName, 
metav1.GetOptions{}) + if err != nil { + return err + } + if cond := fetched.Status.GetCondition(v1alpha1.SubscriptionBundleUnpacking); cond.Status != corev1.ConditionFalse { + return fmt.Errorf("subscription condition %s has unexpected value %s, expected %s", v1alpha1.SubscriptionBundleUnpacking, cond.Status, corev1.ConditionFalse) + } + if cond := fetched.Status.GetCondition(v1alpha1.SubscriptionBundleUnpackFailed); cond.Status != corev1.ConditionUnknown { + return fmt.Errorf("unexpected condition %s on subscription", v1alpha1.SubscriptionBundleUnpackFailed) + } + return nil + }).Should(Succeed()) + }) + + It("should not retry successful unpack jobs", func() { + By("deploying the testing catalog") + provider, err := NewFileBasedFiledBasedCatalogProvider(filepath.Join(testdataDir, failForwardTestDataBaseDir, "example-operator.v0.1.0.yaml")) + Expect(err).To(BeNil()) + catalogSourceName := fmt.Sprintf("%s-catsrc", generatedNamespace.GetName()) + magicCatalog := NewMagicCatalog(ctx.Ctx().Client(), generatedNamespace.GetName(), catalogSourceName, provider) + Expect(magicCatalog.DeployCatalog(context.Background())).To(BeNil()) + + By("creating the testing subscription") + subName := fmt.Sprintf("%s-packagea-sub", generatedNamespace.GetName()) + createSubscriptionForCatalog(crc, generatedNamespace.GetName(), subName, catalogSourceName, "packageA", stableChannel, "", operatorsv1alpha1.ApprovalAutomatic) + + By("waiting until the subscription has an IP reference") + subscription, err := fetchSubscription(crc, generatedNamespace.GetName(), subName, subscriptionHasInstallPlanChecker()) + Expect(err).Should(BeNil()) + + By("waiting for the v0.1.0 CSV to report a succeeded phase") + _, err = fetchCSV(crc, subscription.Status.CurrentCSV, generatedNamespace.GetName(), buildCSVConditionChecker(operatorsv1alpha1.CSVPhaseSucceeded)) + Expect(err).ShouldNot(HaveOccurred()) + + By("patching operator group to enable unpack retries") + ogNN := types.NamespacedName{Name: 
operatorGroup.GetName(), Namespace: generatedNamespace.GetName()} + setBundleUnpackRetryMinimumIntervalAnnotation(context.Background(), ctx.Ctx().Client(), ogNN, "1s") + + By("Ensuring successful bundle unpack jobs are not retried") + Consistently(func() error { + fetched, err := crc.OperatorsV1alpha1().Subscriptions(generatedNamespace.GetName()).Get(context.Background(), subName, metav1.GetOptions{}) + if err != nil { + return err + } + if cond := fetched.Status.GetCondition(v1alpha1.SubscriptionBundleUnpacking); cond.Status == corev1.ConditionTrue { + return fmt.Errorf("unexpected condition status for %s on subscription %s", v1alpha1.SubscriptionBundleUnpacking, subName) + } + if cond := fetched.Status.GetCondition(v1alpha1.SubscriptionBundleUnpackFailed); cond.Status == corev1.ConditionTrue { + return fmt.Errorf("unexpected condition status for %s on subscription %s", v1alpha1.SubscriptionBundleUnpackFailed, subName) + } + return nil + }).Should(Succeed()) + }) + }) }) const ( diff --git a/test/e2e/util.go b/test/e2e/util.go index 04878b321b..7615e11d39 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -94,13 +94,25 @@ func objectRefToNamespacedName(ip *corev1.ObjectReference) types.NamespacedName // adding the "operatorframework.io/bundle-unpack-timeout" annotation to an OperatorGroup // resource. 
func addBundleUnpackTimeoutOGAnnotation(ctx context.Context, c k8scontrollerclient.Client, ogNN types.NamespacedName, timeout string) { + setOGAnnotation(ctx, c, ogNN, bundle.BundleUnpackTimeoutAnnotationKey, timeout) +} + +func setBundleUnpackRetryMinimumIntervalAnnotation(ctx context.Context, c k8scontrollerclient.Client, ogNN types.NamespacedName, interval string) { + setOGAnnotation(ctx, c, ogNN, bundle.BundleUnpackRetryMinimumIntervalAnnotationKey, interval) +} + +func setOGAnnotation(ctx context.Context, c k8scontrollerclient.Client, ogNN types.NamespacedName, key, value string) { Eventually(func() error { og := &operatorsv1.OperatorGroup{} if err := c.Get(ctx, ogNN, og); err != nil { return err } annotations := og.GetAnnotations() - annotations[bundle.BundleUnpackTimeoutAnnotationKey] = timeout + if len(value) == 0 { + delete(annotations, key) + } else { + annotations[key] = value + } og.SetAnnotations(annotations) return c.Update(ctx, og)