Skip to content

Commit 488fd2c

Browse files
committed
(fix) List catalogsource using client, instead of referring to cache
Using the information in the resolver cache to list the available catalogsources leads to the very common and widely known problem of using caches: invalid data due to a stale cache. This has showed up multiple times in production environments over the years, manifesting itself in the form of the all subscriptions in a namespace being transitioned into an error state when a Catalogsource that the cache claims to exist, has actually been deleted from the cluster, but the cache was not updated. The Subscriptions are transitioned to an error state because of the deleted catalogsource with the follwing error message: "message": "failed to populate resolver cache from source <deleted-catalogsource>: failed to list bundles: rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing dial tcp: lookup <deleted-catalogsource>.<ns>.svc on 172.....: no such host\"", "reason": "ErrorPreventedResolution", "status": "True", "type": "ResolutionFailed" This PR switches the information lookup from the cache, to using a client to list the CatalogSources present in the cluster.
1 parent 8089266 commit 488fd2c

File tree

7 files changed

+83
-21
lines changed

7 files changed

+83
-21
lines changed

pkg/controller/operators/catalog/operator.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
214214
clientFactory: clients.NewFactory(validatingConfig),
215215
}
216216
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
217-
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
217+
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, lister.OperatorsV1alpha1().CatalogSourceLister(), logger)
218218
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
219219
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient, workloadUserID, opmImage, utilImage)
220220
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)

pkg/controller/operators/catalog/subscription/reconciler.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,13 @@ func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.St
9191

9292
var healthUpdated, deprecationUpdated bool
9393
next, healthUpdated = s.UpdateHealth(c.now(), catalogHealth...)
94-
deprecationUpdated, err = c.updateDeprecatedStatus(ctx, s.Subscription())
95-
if err != nil {
96-
return next, err
94+
if healthUpdated {
95+
if _, err := c.client.OperatorsV1alpha1().Subscriptions(ns).UpdateStatus(ctx, s.Subscription(), metav1.UpdateOptions{}); err != nil {
96+
return nil, err
97+
}
9798
}
98-
if healthUpdated || deprecationUpdated {
99+
deprecationUpdated, err = c.updateDeprecatedStatus(ctx, s.Subscription())
100+
if deprecationUpdated {
99101
_, err = c.client.OperatorsV1alpha1().Subscriptions(ns).UpdateStatus(ctx, s.Subscription(), metav1.UpdateOptions{})
100102
}
101103
case SubscriptionExistsState:

pkg/controller/registry/resolver/cache/cache.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (c *NamespacedOperatorCache) Error() error {
139139
err := snapshot.err
140140
snapshot.m.RUnlock()
141141
if err != nil {
142-
errs = append(errs, fmt.Errorf("failed to populate resolver cache from source %v: %w", key.String(), err))
142+
errs = append(errs, fmt.Errorf("error using catalog %s (in namespace %s): %w", key.Name, key.Namespace, err))
143143
}
144144
}
145145
return errors.NewAggregate(errs)

pkg/controller/registry/resolver/cache/cache_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -238,5 +238,5 @@ func TestNamespaceOperatorCacheError(t *testing.T) {
238238
key: ErrorSource{Error: errors.New("testing")},
239239
})
240240

241-
require.EqualError(t, c.Namespaced("dummynamespace").Error(), "failed to populate resolver cache from source dummyname/dummynamespace: testing")
241+
require.EqualError(t, c.Namespaced("dummynamespace").Error(), "error using catalog dummyname (in namespace dummynamespace): testing")
242242
}

pkg/controller/registry/resolver/source_registry.go

+49-12
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@ import (
88
"time"
99

1010
"github.com/blang/semver/v4"
11+
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
12+
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
1113
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
1214
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
1315
"github.com/operator-framework/operator-registry/pkg/api"
1416
"github.com/operator-framework/operator-registry/pkg/client"
1517
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
1618
"github.com/sirupsen/logrus"
19+
"k8s.io/apimachinery/pkg/labels"
1720
)
1821

1922
// todo: move to pkg/controller/operators/catalog
@@ -65,31 +68,65 @@ func (i *sourceInvalidator) GetValidChannel(key cache.SourceKey) <-chan struct{}
6568
}
6669

6770
type RegistrySourceProvider struct {
68-
rcp RegistryClientProvider
69-
logger logrus.StdLogger
70-
invalidator *sourceInvalidator
71+
rcp RegistryClientProvider
72+
catsrcLister v1alpha1listers.CatalogSourceLister
73+
logger logrus.StdLogger
74+
invalidator *sourceInvalidator
7175
}
7276

73-
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, logger logrus.StdLogger) *RegistrySourceProvider {
77+
func SourceProviderFromRegistryClientProvider(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, logger logrus.StdLogger) *RegistrySourceProvider {
7478
return &RegistrySourceProvider{
75-
rcp: rcp,
76-
logger: logger,
79+
rcp: rcp,
80+
logger: logger,
81+
catsrcLister: catsrcLister,
7782
invalidator: &sourceInvalidator{
7883
validChans: make(map[cache.SourceKey]chan struct{}),
7984
ttl: 5 * time.Minute,
8085
},
8186
}
8287
}
8388

89+
type errorSource struct {
90+
error
91+
}
92+
93+
func (s errorSource) Snapshot(_ context.Context) (*cache.Snapshot, error) {
94+
return nil, s.error
95+
}
96+
8497
func (a *RegistrySourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source {
8598
result := make(map[cache.SourceKey]cache.Source)
86-
for key, client := range a.rcp.ClientsForNamespaces(namespaces...) {
87-
result[cache.SourceKey(key)] = &registrySource{
88-
key: cache.SourceKey(key),
89-
client: client,
90-
logger: a.logger,
91-
invalidator: a.invalidator,
99+
100+
cats := []*operatorsv1alpha1.CatalogSource{}
101+
for _, ns := range namespaces {
102+
catsInNamespace, err := a.catsrcLister.CatalogSources(ns).List(labels.Everything())
103+
if err != nil {
104+
result[cache.SourceKey{Name: "", Namespace: ns}] = errorSource{
105+
error: fmt.Errorf("failed to list catalogsources for namespace %q: %w", ns, err),
106+
}
107+
return result
92108
}
109+
cats = append(cats, catsInNamespace...)
110+
}
111+
112+
clients := a.rcp.ClientsForNamespaces(namespaces...)
113+
for _, cat := range cats {
114+
key := cache.SourceKey{Name: cat.Name, Namespace: cat.Namespace}
115+
if client, ok := clients[registry.CatalogKey{Name: cat.Name, Namespace: cat.Namespace}]; ok {
116+
result[key] = &registrySource{
117+
key: key,
118+
client: client,
119+
logger: a.logger,
120+
invalidator: a.invalidator,
121+
}
122+
} else {
123+
result[key] = errorSource{
124+
error: fmt.Errorf("no registry client established for catalogsource %s/%s", cat.Namespace, cat.Name),
125+
}
126+
}
127+
}
128+
if len(result) == 0 {
129+
return nil
93130
}
94131
return result
95132
}

pkg/controller/registry/resolver/step_resolver.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/operator-framework/api/pkg/operators/v1alpha1"
99
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
10+
v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"
1011
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
1112
controllerbundle "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/bundle"
1213
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
@@ -16,10 +17,12 @@ import (
1617
corev1 "k8s.io/api/core/v1"
1718
"k8s.io/apimachinery/pkg/api/errors"
1819
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"k8s.io/apimachinery/pkg/labels"
1921
)
2022

2123
const (
2224
BundleLookupConditionPacked v1alpha1.BundleLookupConditionType = "BundleLookupNotPersisted"
25+
exclusionAnnotation string = "olm.operatorframework.io/exclude-global-namespace-resolution"
2326
)
2427

2528
// init hooks provides the downstream a way to modify the upstream behavior
@@ -32,6 +35,7 @@ type StepResolver interface {
3235
type OperatorStepResolver struct {
3336
subLister v1alpha1listers.SubscriptionLister
3437
csvLister v1alpha1listers.ClusterServiceVersionLister
38+
ogLister v1listers.OperatorGroupLister
3539
client versioned.Interface
3640
globalCatalogNamespace string
3741
resolver *Resolver
@@ -69,6 +73,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
6973
stepResolver := &OperatorStepResolver{
7074
subLister: lister.OperatorsV1alpha1().SubscriptionLister(),
7175
csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(),
76+
ogLister: lister.OperatorsV1().OperatorGroupLister(),
7277
client: client,
7378
globalCatalogNamespace: globalCatalogNamespace,
7479
resolver: NewDefaultResolver(cacheSourceProvider, catsrcPriorityProvider{lister: lister.OperatorsV1alpha1().CatalogSourceLister()}, log),
@@ -91,7 +96,22 @@ func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step,
9196
return nil, nil, nil, err
9297
}
9398

94-
namespaces := []string{namespace, r.globalCatalogNamespace}
99+
namespaces := []string{namespace}
100+
ogs, err := r.ogLister.OperatorGroups(namespace).List(labels.Everything())
101+
if err != nil {
102+
return nil, nil, nil, fmt.Errorf("listing operatorgroups in namespace %s: %s", namespace, err)
103+
}
104+
if len(ogs) != 1 {
105+
return nil, nil, nil, fmt.Errorf("expected 1 OperatorGroup in the namespace, found %d", len(ogs))
106+
}
107+
og := ogs[0]
108+
if val, ok := og.Annotations[exclusionAnnotation]; ok && val == "true" {
109+
// Exclusion specified
110+
// Ignore the globalNamespace for the purposes of resolution in this namespace
111+
r.log.Printf("excluding global catalogs from resolution in namespace %s", namespace)
112+
} else {
113+
namespaces = append(namespaces, r.globalCatalogNamespace)
114+
}
95115
operators, err := r.resolver.Resolve(namespaces, subs)
96116
if err != nil {
97117
return nil, nil, nil, err

pkg/controller/registry/resolver/step_resolver_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -1218,7 +1218,7 @@ func TestResolver(t *testing.T) {
12181218
steps: [][]*v1alpha1.Step{},
12191219
subs: []*v1alpha1.Subscription{},
12201220
errAssert: func(t *testing.T, err error) {
1221-
assert.Contains(t, err.Error(), "failed to populate resolver cache from source @existing/catsrc-namespace: csv")
1221+
assert.Contains(t, err.Error(), "error using catalog @existing (in namespace catsrc-namespace): csv")
12221222
assert.Contains(t, err.Error(), "in phase Failed instead of Replacing")
12231223
},
12241224
},
@@ -1377,6 +1377,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
13771377
name: "NewSubscription/Permissions/ClusterPermissions",
13781378
clusterState: []runtime.Object{
13791379
newSub(namespace, "a", "alpha", catalog),
1380+
newOperatorGroup("test-og", namespace),
13801381
},
13811382
bundlesInCatalog: []*api.Bundle{bundle},
13821383
out: out{
@@ -1392,6 +1393,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
13921393
name: "don't create default service accounts",
13931394
clusterState: []runtime.Object{
13941395
newSub(namespace, "a", "alpha", catalog),
1396+
newOperatorGroup("test-og", namespace),
13951397
},
13961398
bundlesInCatalog: []*api.Bundle{bundleWithDefaultServiceAccount},
13971399
out: out{
@@ -1418,6 +1420,7 @@ func TestNamespaceResolverRBAC(t *testing.T) {
14181420
lister := operatorlister.NewLister()
14191421
lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, informerFactory.Operators().V1alpha1().Subscriptions().Lister())
14201422
lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, informerFactory.Operators().V1alpha1().ClusterServiceVersions().Lister())
1423+
lister.OperatorsV1().RegisterOperatorGroupLister(namespace, informerFactory.Operators().V1().OperatorGroups().Lister())
14211424

14221425
stubSnapshot := &resolvercache.Snapshot{}
14231426
for _, bundle := range tt.bundlesInCatalog {

0 commit comments

Comments
 (0)