Skip to content

Commit 462ce61

Browse files
authored
Identify fail forward in csvSources (#2743)
This commit undos changes to the Resolve and ResolveSteps methods and updats the csvSourceProvider to infer whether or not fail forward is enabled in a namespace. Signed-off-by: Alexander Greene <[email protected]>
1 parent 6048250 commit 462ce61

13 files changed

+204
-250
lines changed

pkg/controller/operators/catalog/operator.go

+2-17
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import (
4242
"k8s.io/client-go/util/workqueue"
4343

4444
"github.com/operator-framework/api/pkg/operators/reference"
45-
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
4645
"github.com/operator-framework/api/pkg/operators/v1alpha1"
4746
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
4847
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
@@ -903,20 +902,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
903902
return
904903
}
905904

906-
func (o *Operator) isFailForwardEnabled(namespace string) (bool, error) {
907-
ogs, err := o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace).List(labels.Everything())
908-
if err != nil {
909-
o.logger.Debugf("failed to list operatorgroups in the %s namespace: %v", namespace, err)
910-
// Couldn't list operatorGroups, assuming default upgradeStrategy
911-
// so existing behavior is observed for failed CSVs.
912-
return false, nil
913-
}
914-
if len(ogs) != 1 {
915-
return false, fmt.Errorf("found %d operatorGroups in namespace %s, expected 1", len(ogs), namespace)
916-
}
917-
return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil
918-
}
919-
920905
func (o *Operator) syncResolvingNamespace(obj interface{}) error {
921906
ns, ok := obj.(*corev1.Namespace)
922907
if !ok {
@@ -943,7 +928,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
943928
return err
944929
}
945930

946-
failForwardEnabled, err := o.isFailForwardEnabled(namespace)
931+
failForwardEnabled, err := resolver.IsFailForwardEnabled(o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace))
947932
if err != nil {
948933
return err
949934
}
@@ -998,7 +983,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
998983
logger.Debug("resolving subscriptions in namespace")
999984

1000985
// resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
1001-
steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace, failForwardEnabled)
986+
steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace)
1002987
if err != nil {
1003988
go o.recorder.Event(ns, corev1.EventTypeWarning, "ResolutionFailed", err.Error())
1004989
// If the error is constraints not satisfiable, then simply project the

pkg/controller/operators/catalog/operator_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1254,7 +1254,7 @@ func TestSyncResolvingNamespace(t *testing.T) {
12541254

12551255
o.sourcesLastUpdate.Set(tt.fields.sourcesLastUpdate.Time)
12561256
o.resolver = &fakes.FakeStepResolver{
1257-
ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
1257+
ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
12581258
return nil, nil, nil, tt.fields.resolveErr
12591259
},
12601260
}

pkg/controller/operators/catalog/subscriptions_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1031,7 +1031,7 @@ func TestSyncSubscriptions(t *testing.T) {
10311031

10321032
o.sourcesLastUpdate.Set(tt.fields.sourcesLastUpdate.Time)
10331033
o.resolver = &fakes.FakeStepResolver{
1034-
ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
1034+
ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
10351035
return tt.fields.resolveSteps, tt.fields.bundleLookups, tt.fields.resolveSubs, tt.fields.resolveErr
10361036
},
10371037
}
@@ -1168,7 +1168,7 @@ func BenchmarkSyncResolvingNamespace(b *testing.B) {
11681168
},
11691169
},
11701170
resolver: &fakes.FakeStepResolver{
1171-
ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
1171+
ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
11721172
steps := []*v1alpha1.Step{
11731173
{
11741174
Resolving: "csv.v.2",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package resolver
2+
3+
import (
4+
"fmt"
5+
6+
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
7+
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
8+
v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"
9+
"k8s.io/apimachinery/pkg/labels"
10+
)
11+
12+
// IsFailForwardEnabled takes a namespaced operatorGroup lister and returns
13+
// True if an operatorGroup exists in the namespace and its upgradeStrategy
14+
// is set to UnsafeFailForward and false otherwise. An error is returned if
15+
// an more than one operatorGroup exists in the namespace.
16+
// No error is returned if no OperatorGroups are found to keep the resolver
17+
// backwards compatible.
18+
func IsFailForwardEnabled(ogLister v1listers.OperatorGroupNamespaceLister) (bool, error) {
19+
ogs, err := ogLister.List(labels.Everything())
20+
if err != nil || len(ogs) == 0 {
21+
return false, nil
22+
}
23+
if len(ogs) != 1 {
24+
return false, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs))
25+
}
26+
return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil
27+
}
28+
29+
type walkOption func(csv *operatorsv1alpha1.ClusterServiceVersion) error
30+
31+
// WithCSVPhase returns an error if the CSV is not in the given phase.
32+
func WithCSVPhase(phase operatorsv1alpha1.ClusterServiceVersionPhase) walkOption {
33+
return func(csv *operatorsv1alpha1.ClusterServiceVersion) error {
34+
if csv == nil || csv.Status.Phase != phase {
35+
return fmt.Errorf("csv %s/%s in phase %s instead of %s", csv.GetNamespace(), csv.GetName(), csv.Status.Phase, phase)
36+
}
37+
return nil
38+
}
39+
}
40+
41+
// WithUniqueCSVs returns an error if the CSV has been seen before.
42+
func WithUniqueCSVs() walkOption {
43+
visited := map[string]struct{}{}
44+
return func(csv *operatorsv1alpha1.ClusterServiceVersion) error {
45+
// Check if we have visited the CSV before
46+
if _, ok := visited[csv.GetName()]; ok {
47+
return fmt.Errorf("csv %s/%s has already been seen", csv.GetNamespace(), csv.GetName())
48+
}
49+
50+
visited[csv.GetName()] = struct{}{}
51+
return nil
52+
}
53+
}
54+
55+
// WalkReplacementChain walks along the chain of clusterServiceVersions being replaced and returns
56+
// the last clusterServiceVersions in the replacement chain. An error is returned if any of the
57+
// clusterServiceVersions before the last is not in the replaces phase or if an infinite replacement
58+
// chain is detected.
59+
func WalkReplacementChain(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion, options ...walkOption) (*operatorsv1alpha1.ClusterServiceVersion, error) {
60+
if csv == nil {
61+
return nil, fmt.Errorf("csv cannot be nil")
62+
}
63+
64+
for {
65+
// Check if there is a CSV that replaces this CSVs
66+
next, ok := csvToReplacement[csv.GetName()]
67+
if !ok {
68+
break
69+
}
70+
71+
// Check walk options
72+
for _, o := range options {
73+
if err := o(csv); err != nil {
74+
return nil, err
75+
}
76+
}
77+
78+
// Move along replacement chain.
79+
csv = next
80+
}
81+
return csv, nil
82+
}
83+
84+
// isReplacementChainThatEndsInFailure returns true if the last CSV in the chain is in the failed phase and all other
85+
// CSVs are in the replacing phase.
86+
func isReplacementChainThatEndsInFailure(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion) (bool, error) {
87+
lastCSV, err := WalkReplacementChain(csv, csvToReplacement, WithCSVPhase(operatorsv1alpha1.CSVPhaseReplacing), WithUniqueCSVs())
88+
if err != nil {
89+
return false, err
90+
}
91+
return (lastCSV != nil && lastCSV.Status.Phase == operatorsv1alpha1.CSVPhaseFailed), nil
92+
}
93+
94+
// ReplacementMapping takes a list of CSVs and returns a map that maps a CSV's name to the CSV that replaces it.
95+
func ReplacementMapping(csvs []*operatorsv1alpha1.ClusterServiceVersion) map[string]*operatorsv1alpha1.ClusterServiceVersion {
96+
replacementMapping := map[string]*operatorsv1alpha1.ClusterServiceVersion{}
97+
for _, csv := range csvs {
98+
if csv.Spec.Replaces != "" {
99+
replacementMapping[csv.Spec.Replaces] = csv
100+
}
101+
}
102+
return replacementMapping
103+
}

pkg/controller/registry/resolver/instrumented_resolver.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ func NewInstrumentedResolver(resolver StepResolver, successMetricsEmitter, failu
2222
}
2323
}
2424

25-
func (ir *InstrumentedResolver) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
25+
func (ir *InstrumentedResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
2626
start := time.Now()
27-
steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace, failForwardEnabled)
27+
steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace)
2828
if err != nil {
2929
ir.failureMetricsEmitter(time.Since(start))
3030
} else {

pkg/controller/registry/resolver/instrumented_resolver_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ const (
1717
type fakeResolverWithError struct{}
1818
type fakeResolverWithoutError struct{}
1919

20-
func (r *fakeResolverWithError) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
20+
func (r *fakeResolverWithError) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
2121
return nil, nil, nil, errors.New("Fake error")
2222
}
2323

24-
func (r *fakeResolverWithoutError) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
24+
func (r *fakeResolverWithoutError) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
2525
return nil, nil, nil, nil
2626
}
2727

@@ -45,7 +45,7 @@ func TestInstrumentedResolverFailure(t *testing.T) {
4545
}
4646

4747
instrumentedResolver := NewInstrumentedResolver(newFakeResolverWithError(), changeToSuccess, changeToFailure)
48-
instrumentedResolver.ResolveSteps("", false)
48+
instrumentedResolver.ResolveSteps("")
4949
require.Equal(t, len(result), 1) // check that only one call was made to a change function
5050
require.Equal(t, result[0], failure) // check that the call was made to changeToFailure function
5151
}
@@ -62,7 +62,7 @@ func TestInstrumentedResolverSuccess(t *testing.T) {
6262
}
6363

6464
instrumentedResolver := NewInstrumentedResolver(newFakeResolverWithoutError(), changeToSuccess, changeToFailure)
65-
instrumentedResolver.ResolveSteps("", false)
65+
instrumentedResolver.ResolveSteps("")
6666
require.Equal(t, len(result), 1) // check that only one call was made to a change function
6767
require.Equal(t, result[0], success) // check that the call was made to changeToSuccess function
6868
}

pkg/controller/registry/resolver/resolver.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (w *debugWriter) Write(b []byte) (int, error) {
5555
return n, nil
5656
}
5757

58-
func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, existingEntryPredicates ...cache.Predicate) ([]*cache.Entry, error) {
58+
func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription) ([]*cache.Entry, error) {
5959
var errs []error
6060

6161
variables := make(map[solver.Identifier]solver.Variable)
@@ -72,8 +72,7 @@ func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, e
7272
}
7373

7474
preferredNamespace := namespaces[0]
75-
existingEntryPredicates = append(existingEntryPredicates, cache.True())
76-
_, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(existingEntryPredicates...), namespacedCache, visited)
75+
_, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(cache.True()), namespacedCache, visited)
7776
if err != nil {
7877
return nil, err
7978
}

pkg/controller/registry/resolver/resolver_test.go

+1-79
Original file line numberDiff line numberDiff line change
@@ -191,85 +191,6 @@ func TestSolveOperators_WithSystemConstraints(t *testing.T) {
191191
}
192192
}
193193

194-
func WithInstalledCSV(sub *v1alpha1.Subscription, csvName string) *v1alpha1.Subscription {
195-
sub.Status.InstalledCSV = csvName
196-
return sub
197-
}
198-
199-
func TestSolveOperators_WithFailForward(t *testing.T) {
200-
const namespace = "test-namespace"
201-
catalog := cache.SourceKey{Name: "test-catalog", Namespace: namespace}
202-
203-
packageASubV2 := newSub(namespace, "packageA", "alpha", catalog)
204-
APISet := cache.APISet{opregistry.APIKey{Group: "g", Version: "v", Kind: "k", Plural: "ks"}: struct{}{}}
205-
206-
// packageA provides an API
207-
packageAV1 := genEntry("packageA.v1", "0.0.1", "", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false)
208-
packageAV2 := genEntry("packageA.v2", "0.0.2", "packageA.v1", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false)
209-
packageAV3 := genEntry("packageA.v3", "0.0.3", "packageA.v2", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false)
210-
211-
existingPackageAV1 := existingOperator(namespace, "packageA.v1", "packageA", "alpha", "", APISet, nil, nil, nil)
212-
existingPackageAV2 := existingOperator(namespace, "packageA.v2", "packageA", "alpha", "packageA.v1", APISet, nil, nil, nil)
213-
214-
testCases := []struct {
215-
title string
216-
expectedOperators []*cache.Entry
217-
csvs []*v1alpha1.ClusterServiceVersion
218-
subs []*v1alpha1.Subscription
219-
snapshotEntries []*cache.Entry
220-
failForwardPredicates []cache.Predicate
221-
err string
222-
}{
223-
{
224-
title: "Resolver fails if v1 and v2 provide the same APIs and v1 is not omitted from the resolver",
225-
snapshotEntries: []*cache.Entry{packageAV1, packageAV2},
226-
expectedOperators: nil,
227-
csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2},
228-
subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)},
229-
err: "provide k (g/v)",
230-
},
231-
{
232-
title: "Resolver succeeds if v1 and v2 provide the same APIs and v1 is omitted from the resolver",
233-
snapshotEntries: []*cache.Entry{packageAV1, packageAV2},
234-
expectedOperators: nil,
235-
csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2},
236-
subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)},
237-
failForwardPredicates: []cache.Predicate{cache.Not(cache.CSVNamePredicate("packageA.v1"))},
238-
err: "",
239-
},
240-
{
241-
title: "Resolver succeeds if v1 and v2 provide the same APIs, v1 is omitted from the resolver, and an upgrade for v2 exists",
242-
snapshotEntries: []*cache.Entry{packageAV1, packageAV2, packageAV3},
243-
expectedOperators: []*cache.Entry{packageAV3},
244-
csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2},
245-
subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)},
246-
failForwardPredicates: []cache.Predicate{cache.Not(cache.CSVNamePredicate("packageA.v1"))},
247-
err: "",
248-
},
249-
}
250-
251-
for _, testCase := range testCases {
252-
resolver := Resolver{
253-
cache: cache.New(cache.StaticSourceProvider{
254-
catalog: &cache.Snapshot{
255-
Entries: testCase.snapshotEntries,
256-
},
257-
cache.NewVirtualSourceKey(namespace): csvSnapshotOrPanic(namespace, testCase.subs, testCase.csvs...),
258-
}),
259-
log: logrus.New(),
260-
}
261-
operators, err := resolver.Resolve([]string{namespace}, testCase.subs, testCase.failForwardPredicates...)
262-
263-
if testCase.err != "" {
264-
require.Error(t, err)
265-
require.Containsf(t, err.Error(), testCase.err, "Test %s failed", testCase.title)
266-
} else {
267-
require.NoErrorf(t, err, "Test %s failed", testCase.title)
268-
}
269-
require.ElementsMatch(t, testCase.expectedOperators, operators, "Test %s failed", testCase.title)
270-
}
271-
}
272-
273194
func TestDisjointChannelGraph(t *testing.T) {
274195
const namespace = "test-namespace"
275196
catalog := cache.SourceKey{Name: "test-catalog", Namespace: namespace}
@@ -1521,6 +1442,7 @@ func TestSolveOperators_TransferApiOwnership(t *testing.T) {
15211442
key: cache.NewVirtualSourceKey(namespace),
15221443
csvLister: &csvs,
15231444
subLister: fakeSubscriptionLister(p.subs),
1445+
ogLister: fakeOperatorGroupLister{},
15241446
logger: logger,
15251447
},
15261448
}),

pkg/controller/registry/resolver/source_csvs.go

+20
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/blang/semver/v4"
1010
"github.com/operator-framework/api/pkg/operators/v1alpha1"
11+
v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"
1112
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
1213
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
1314
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/projection"
@@ -20,6 +21,7 @@ import (
2021
type csvSourceProvider struct {
2122
csvLister v1alpha1listers.ClusterServiceVersionLister
2223
subLister v1alpha1listers.SubscriptionLister
24+
ogLister v1listers.OperatorGroupLister
2325
logger logrus.StdLogger
2426
}
2527

@@ -30,6 +32,7 @@ func (csp *csvSourceProvider) Sources(namespaces ...string) map[cache.SourceKey]
3032
key: cache.NewVirtualSourceKey(namespace),
3133
csvLister: csp.csvLister.ClusterServiceVersions(namespace),
3234
subLister: csp.subLister.Subscriptions(namespace),
35+
ogLister: csp.ogLister.OperatorGroups(namespace),
3336
logger: csp.logger,
3437
}
3538
break // first ns is assumed to be the target ns, todo: make explicit
@@ -41,6 +44,7 @@ type csvSource struct {
4144
key cache.SourceKey
4245
csvLister v1alpha1listers.ClusterServiceVersionNamespaceLister
4346
subLister v1alpha1listers.SubscriptionNamespaceLister
47+
ogLister v1listers.OperatorGroupNamespaceLister
4448
logger logrus.StdLogger
4549
}
4650

@@ -55,6 +59,11 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
5559
return nil, err
5660
}
5761

62+
failForwardEnabled, err := IsFailForwardEnabled(s.ogLister)
63+
if err != nil {
64+
return nil, err
65+
}
66+
5867
// build a catalog snapshot of CSVs without subscriptions
5968
csvSubscriptions := make(map[*v1alpha1.ClusterServiceVersion]*v1alpha1.Subscription)
6069
for _, sub := range subs {
@@ -75,6 +84,17 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
7584
if csv.IsCopied() {
7685
continue
7786
}
87+
88+
if failForwardEnabled {
89+
replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csv, ReplacementMapping(csvs))
90+
if err != nil {
91+
return nil, err
92+
}
93+
if csv.Status.Phase == v1alpha1.CSVPhaseReplacing && replacementChainEndsInFailure {
94+
continue
95+
}
96+
}
97+
7898
entry, err := newEntryFromV1Alpha1CSV(csv)
7999
if err != nil {
80100
return nil, err

0 commit comments

Comments
 (0)