Skip to content

Commit 4a20ee3

Browse files
committed
Identify fail forward in csvSources
This commit undos changes to the Resolve and ResolveSteps methods and updats the csvSourceProvider to infer whether or not fail forward is enabled in a namespace. Signed-off-by: Alexander Greene <[email protected]>
1 parent 6048250 commit 4a20ee3

13 files changed

+204
-249
lines changed

Diff for: pkg/controller/operators/catalog/operator.go

+2-17
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import (
4242
"k8s.io/client-go/util/workqueue"
4343

4444
"github.com/operator-framework/api/pkg/operators/reference"
45-
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
4645
"github.com/operator-framework/api/pkg/operators/v1alpha1"
4746
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
4847
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
@@ -903,20 +902,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
903902
return
904903
}
905904

906-
func (o *Operator) isFailForwardEnabled(namespace string) (bool, error) {
907-
ogs, err := o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace).List(labels.Everything())
908-
if err != nil {
909-
o.logger.Debugf("failed to list operatorgroups in the %s namespace: %v", namespace, err)
910-
// Couldn't list operatorGroups, assuming default upgradeStrategy
911-
// so existing behavior is observed for failed CSVs.
912-
return false, nil
913-
}
914-
if len(ogs) != 1 {
915-
return false, fmt.Errorf("found %d operatorGroups in namespace %s, expected 1", len(ogs), namespace)
916-
}
917-
return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil
918-
}
919-
920905
func (o *Operator) syncResolvingNamespace(obj interface{}) error {
921906
ns, ok := obj.(*corev1.Namespace)
922907
if !ok {
@@ -943,7 +928,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
943928
return err
944929
}
945930

946-
failForwardEnabled, err := o.isFailForwardEnabled(namespace)
931+
failForwardEnabled, err := resolver.IsFailForwardEnabled(o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace))
947932
if err != nil {
948933
return err
949934
}
@@ -998,7 +983,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
998983
logger.Debug("resolving subscriptions in namespace")
999984

1000985
// resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
1001-
steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace, failForwardEnabled)
986+
steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace)
1002987
if err != nil {
1003988
go o.recorder.Event(ns, corev1.EventTypeWarning, "ResolutionFailed", err.Error())
1004989
// If the error is constraints not satisfiable, then simply project the

Diff for: pkg/controller/operators/catalog/operator_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1254,7 +1254,7 @@ func TestSyncResolvingNamespace(t *testing.T) {
12541254

12551255
o.sourcesLastUpdate.Set(tt.fields.sourcesLastUpdate.Time)
12561256
o.resolver = &fakes.FakeStepResolver{
1257-
ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
1257+
ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
12581258
return nil, nil, nil, tt.fields.resolveErr
12591259
},
12601260
}

Diff for: pkg/controller/operators/catalog/subscriptions_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1031,7 +1031,7 @@ func TestSyncSubscriptions(t *testing.T) {
10311031

10321032
o.sourcesLastUpdate.Set(tt.fields.sourcesLastUpdate.Time)
10331033
o.resolver = &fakes.FakeStepResolver{
1034-
ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
1034+
ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
10351035
return tt.fields.resolveSteps, tt.fields.bundleLookups, tt.fields.resolveSubs, tt.fields.resolveErr
10361036
},
10371037
}
@@ -1168,7 +1168,7 @@ func BenchmarkSyncResolvingNamespace(b *testing.B) {
11681168
},
11691169
},
11701170
resolver: &fakes.FakeStepResolver{
1171-
ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
1171+
ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
11721172
steps := []*v1alpha1.Step{
11731173
{
11741174
Resolving: "csv.v.2",

Diff for: pkg/controller/registry/resolver/fail_forward.go

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package resolver
2+
3+
import (
4+
"fmt"
5+
6+
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
7+
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
8+
v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"
9+
"k8s.io/apimachinery/pkg/labels"
10+
)
11+
12+
// IsFailForwardEnabled takes a namespaced operatorGroup lister and returns
13+
// True if an operatorGroup exists in the namespace and its upgradeStrategy
14+
// is set to UnsafeFailForward and false otherwise. An error is returned if
15+
// an more than one operatorGroup exists in the namespace.
16+
// No error is returned if no OperatorGroups are found to keep the resolver
17+
// backwards compatible.
18+
func IsFailForwardEnabled(ogLister v1listers.OperatorGroupNamespaceLister) (bool, error) {
19+
ogs, err := ogLister.List(labels.Everything())
20+
if err != nil || len(ogs) == 0 {
21+
return false, nil
22+
}
23+
if len(ogs) != 1 {
24+
return false, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs))
25+
}
26+
return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil
27+
}
28+
29+
type walkOption func(csv *operatorsv1alpha1.ClusterServiceVersion) error
30+
31+
func WithCSVPhase(phase operatorsv1alpha1.ClusterServiceVersionPhase) walkOption {
32+
return func(csv *operatorsv1alpha1.ClusterServiceVersion) error {
33+
if csv == nil || csv.Status.Phase != phase {
34+
return fmt.Errorf("csv %s/%s in phase %s instead of %s", csv.GetNamespace(), csv.GetName(), csv.Status.Phase, phase)
35+
}
36+
return nil
37+
}
38+
}
39+
40+
func WithUniqueCSVs() walkOption {
41+
visited := map[string]struct{}{}
42+
return func(csv *operatorsv1alpha1.ClusterServiceVersion) error {
43+
// Check if we have visited the CSV before
44+
if _, ok := visited[csv.GetName()]; ok {
45+
return fmt.Errorf("infinite replacement chain detected")
46+
}
47+
48+
visited[csv.GetName()] = struct{}{}
49+
return nil
50+
}
51+
}
52+
53+
// WalkReplacementChain walks along the chain of clusterServiceVersions being replaced and returns
54+
// the last clusterServiceVersions in the replacement chain. An error is returned if any of the
55+
// clusterServiceVersions before the last is not in the replaces phase or if an infinite replacement
56+
// chain is detected.
57+
func WalkReplacementChain(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion, options ...walkOption) (*operatorsv1alpha1.ClusterServiceVersion, error) {
58+
if csv == nil {
59+
return nil, fmt.Errorf("csv cannot be nil")
60+
}
61+
62+
for {
63+
// Check if there is a CSV that replaces this CSVs
64+
next, ok := csvToReplacement[csv.GetName()]
65+
if !ok {
66+
break
67+
}
68+
69+
// Check walk options
70+
for _, o := range options {
71+
if err := o(csv); err != nil {
72+
return nil, err
73+
}
74+
}
75+
76+
// Move along replacement chain.
77+
csv = next
78+
}
79+
return csv, nil
80+
}
81+
82+
// isReplacementChainThatEndsInFailure returns true if the last CSV in the chain is in the failed phase and all other
83+
// CSVs are in the replacing phase.
84+
func isReplacementChainThatEndsInFailure(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion) (bool, error) {
85+
lastCSV, err := WalkReplacementChain(csv, csvToReplacement, WithCSVPhase(operatorsv1alpha1.CSVPhaseReplacing), WithUniqueCSVs())
86+
if err != nil {
87+
return false, err
88+
}
89+
return (lastCSV != nil && lastCSV.Status.Phase == operatorsv1alpha1.CSVPhaseFailed), nil
90+
}
91+
92+
// ReplacementMapping takes a list of CSVs and returns a map that maps a CSV's name to the CSV that replaces it.
93+
func ReplacementMapping(csvs []*operatorsv1alpha1.ClusterServiceVersion) map[string]*operatorsv1alpha1.ClusterServiceVersion {
94+
replacementMapping := map[string]*operatorsv1alpha1.ClusterServiceVersion{}
95+
for _, csv := range csvs {
96+
if csv.Spec.Replaces != "" {
97+
replacementMapping[csv.Spec.Replaces] = csv
98+
}
99+
}
100+
return replacementMapping
101+
}

Diff for: pkg/controller/registry/resolver/instrumented_resolver.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ func NewInstrumentedResolver(resolver StepResolver, successMetricsEmitter, failu
2222
}
2323
}
2424

25-
func (ir *InstrumentedResolver) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
25+
func (ir *InstrumentedResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
2626
start := time.Now()
27-
steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace, failForwardEnabled)
27+
steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace)
2828
if err != nil {
2929
ir.failureMetricsEmitter(time.Since(start))
3030
} else {

Diff for: pkg/controller/registry/resolver/instrumented_resolver_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ const (
1717
type fakeResolverWithError struct{}
1818
type fakeResolverWithoutError struct{}
1919

20-
func (r *fakeResolverWithError) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
20+
func (r *fakeResolverWithError) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
2121
return nil, nil, nil, errors.New("Fake error")
2222
}
2323

24-
func (r *fakeResolverWithoutError) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
24+
func (r *fakeResolverWithoutError) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
2525
return nil, nil, nil, nil
2626
}
2727

@@ -45,7 +45,7 @@ func TestInstrumentedResolverFailure(t *testing.T) {
4545
}
4646

4747
instrumentedResolver := NewInstrumentedResolver(newFakeResolverWithError(), changeToSuccess, changeToFailure)
48-
instrumentedResolver.ResolveSteps("", false)
48+
instrumentedResolver.ResolveSteps("")
4949
require.Equal(t, len(result), 1) // check that only one call was made to a change function
5050
require.Equal(t, result[0], failure) // check that the call was made to changeToFailure function
5151
}
@@ -62,7 +62,7 @@ func TestInstrumentedResolverSuccess(t *testing.T) {
6262
}
6363

6464
instrumentedResolver := NewInstrumentedResolver(newFakeResolverWithoutError(), changeToSuccess, changeToFailure)
65-
instrumentedResolver.ResolveSteps("", false)
65+
instrumentedResolver.ResolveSteps("")
6666
require.Equal(t, len(result), 1) // check that only one call was made to a change function
6767
require.Equal(t, result[0], success) // check that the call was made to changeToSuccess function
6868
}

Diff for: pkg/controller/registry/resolver/resolver.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (w *debugWriter) Write(b []byte) (int, error) {
5555
return n, nil
5656
}
5757

58-
func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, existingEntryPredicates ...cache.Predicate) ([]*cache.Entry, error) {
58+
func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription) ([]*cache.Entry, error) {
5959
var errs []error
6060

6161
variables := make(map[solver.Identifier]solver.Variable)
@@ -72,8 +72,7 @@ func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, e
7272
}
7373

7474
preferredNamespace := namespaces[0]
75-
existingEntryPredicates = append(existingEntryPredicates, cache.True())
76-
_, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(existingEntryPredicates...), namespacedCache, visited)
75+
_, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(cache.True()), namespacedCache, visited)
7776
if err != nil {
7877
return nil, err
7978
}

Diff for: pkg/controller/registry/resolver/resolver_test.go

+1-79
Original file line numberDiff line numberDiff line change
@@ -191,85 +191,6 @@ func TestSolveOperators_WithSystemConstraints(t *testing.T) {
191191
}
192192
}
193193

194-
func WithInstalledCSV(sub *v1alpha1.Subscription, csvName string) *v1alpha1.Subscription {
195-
sub.Status.InstalledCSV = csvName
196-
return sub
197-
}
198-
199-
func TestSolveOperators_WithFailForward(t *testing.T) {
200-
const namespace = "test-namespace"
201-
catalog := cache.SourceKey{Name: "test-catalog", Namespace: namespace}
202-
203-
packageASubV2 := newSub(namespace, "packageA", "alpha", catalog)
204-
APISet := cache.APISet{opregistry.APIKey{Group: "g", Version: "v", Kind: "k", Plural: "ks"}: struct{}{}}
205-
206-
// packageA provides an API
207-
packageAV1 := genEntry("packageA.v1", "0.0.1", "", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false)
208-
packageAV2 := genEntry("packageA.v2", "0.0.2", "packageA.v1", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false)
209-
packageAV3 := genEntry("packageA.v3", "0.0.3", "packageA.v2", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false)
210-
211-
existingPackageAV1 := existingOperator(namespace, "packageA.v1", "packageA", "alpha", "", APISet, nil, nil, nil)
212-
existingPackageAV2 := existingOperator(namespace, "packageA.v2", "packageA", "alpha", "packageA.v1", APISet, nil, nil, nil)
213-
214-
testCases := []struct {
215-
title string
216-
expectedOperators []*cache.Entry
217-
csvs []*v1alpha1.ClusterServiceVersion
218-
subs []*v1alpha1.Subscription
219-
snapshotEntries []*cache.Entry
220-
failForwardPredicates []cache.Predicate
221-
err string
222-
}{
223-
{
224-
title: "Resolver fails if v1 and v2 provide the same APIs and v1 is not omitted from the resolver",
225-
snapshotEntries: []*cache.Entry{packageAV1, packageAV2},
226-
expectedOperators: nil,
227-
csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2},
228-
subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)},
229-
err: "provide k (g/v)",
230-
},
231-
{
232-
title: "Resolver succeeds if v1 and v2 provide the same APIs and v1 is omitted from the resolver",
233-
snapshotEntries: []*cache.Entry{packageAV1, packageAV2},
234-
expectedOperators: nil,
235-
csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2},
236-
subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)},
237-
failForwardPredicates: []cache.Predicate{cache.Not(cache.CSVNamePredicate("packageA.v1"))},
238-
err: "",
239-
},
240-
{
241-
title: "Resolver succeeds if v1 and v2 provide the same APIs, v1 is omitted from the resolver, and an upgrade for v2 exists",
242-
snapshotEntries: []*cache.Entry{packageAV1, packageAV2, packageAV3},
243-
expectedOperators: []*cache.Entry{packageAV3},
244-
csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2},
245-
subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)},
246-
failForwardPredicates: []cache.Predicate{cache.Not(cache.CSVNamePredicate("packageA.v1"))},
247-
err: "",
248-
},
249-
}
250-
251-
for _, testCase := range testCases {
252-
resolver := Resolver{
253-
cache: cache.New(cache.StaticSourceProvider{
254-
catalog: &cache.Snapshot{
255-
Entries: testCase.snapshotEntries,
256-
},
257-
cache.NewVirtualSourceKey(namespace): csvSnapshotOrPanic(namespace, testCase.subs, testCase.csvs...),
258-
}),
259-
log: logrus.New(),
260-
}
261-
operators, err := resolver.Resolve([]string{namespace}, testCase.subs, testCase.failForwardPredicates...)
262-
263-
if testCase.err != "" {
264-
require.Error(t, err)
265-
require.Containsf(t, err.Error(), testCase.err, "Test %s failed", testCase.title)
266-
} else {
267-
require.NoErrorf(t, err, "Test %s failed", testCase.title)
268-
}
269-
require.ElementsMatch(t, testCase.expectedOperators, operators, "Test %s failed", testCase.title)
270-
}
271-
}
272-
273194
func TestDisjointChannelGraph(t *testing.T) {
274195
const namespace = "test-namespace"
275196
catalog := cache.SourceKey{Name: "test-catalog", Namespace: namespace}
@@ -1521,6 +1442,7 @@ func TestSolveOperators_TransferApiOwnership(t *testing.T) {
15211442
key: cache.NewVirtualSourceKey(namespace),
15221443
csvLister: &csvs,
15231444
subLister: fakeSubscriptionLister(p.subs),
1445+
ogLister: fakeOperatorGroupLister{},
15241446
logger: logger,
15251447
},
15261448
}),

0 commit comments

Comments
 (0)