Skip to content

Commit b8f147f

Browse files
pkg/controller: use a metadata watch for CRDs
Using a full LIST+WATCH is an optimization, with trade-offs. Holding the state of the world for CRDs in memory when we rarely, if ever, actually need to access them is a bad use of that trade-off, especially when the sum total size of CRDs on even the most basic cluster is O(20MiB). Signed-off-by: Steve Kuznetsov <[email protected]>
1 parent ff0baf4 commit b8f147f

File tree

24 files changed

+115
-927
lines changed

24 files changed

+115
-927
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ ifeq (, $(wildcard $(KUBEBUILDER_ASSETS)/kube-apiserver))
6464
endif
6565

6666
cover.out:
67-
go test $(MOD_FLAGS) -tags "json1" -v -race -coverprofile=cover.out -covermode=atomic \
67+
go test $(MOD_FLAGS) -tags "json1" -race -coverprofile=cover.out -covermode=atomic \
6868
-coverpkg ./pkg/controller/... ./pkg/...
6969

7070
coverage: cover.out

pkg/controller/operators/catalog/operator.go

+22-5
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
2121
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
2222
"k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
23-
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
2423
apierrors "k8s.io/apimachinery/pkg/api/errors"
2524
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2625
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -34,6 +33,9 @@ import (
3433
"k8s.io/apimachinery/pkg/util/yaml"
3534
"k8s.io/client-go/dynamic"
3635
"k8s.io/client-go/informers"
36+
"k8s.io/client-go/metadata"
37+
"k8s.io/client-go/metadata/metadatainformer"
38+
"k8s.io/client-go/metadata/metadatalister"
3739
"k8s.io/client-go/tools/cache"
3840
"k8s.io/client-go/tools/clientcmd"
3941
"k8s.io/client-go/tools/record"
@@ -144,6 +146,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
144146
return nil, err
145147
}
146148

149+
metadataClient, err := metadata.NewForConfig(config)
150+
if err != nil {
151+
return nil, err
152+
}
153+
147154
// Create a new queueinformer-based operator.
148155
opClient, err := operatorclient.NewClientFromRestConfig(config)
149156
if err != nil {
@@ -443,13 +450,23 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
443450
return nil, err
444451
}
445452

446-
// Register CustomResourceDefinition QueueInformer
447-
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsInterface(), resyncPeriod()).Apiextensions().V1().CustomResourceDefinitions()
448-
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
453+
// Register CustomResourceDefinition QueueInformer. Object metadata requests are used
454+
// by this informer in order to reduce cached size.
455+
gvr := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
456+
crdInformer := metadatainformer.NewFilteredMetadataInformer(
457+
metadataClient,
458+
gvr,
459+
metav1.NamespaceAll,
460+
resyncPeriod(),
461+
cache.Indexers{},
462+
nil,
463+
).Informer()
464+
crdLister := metadatalister.New(crdInformer.GetIndexer(), gvr)
465+
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdLister)
449466
crdQueueInformer, err := queueinformer.NewQueueInformer(
450467
ctx,
451468
queueinformer.WithLogger(op.logger),
452-
queueinformer.WithInformer(crdInformer.Informer()),
469+
queueinformer.WithInformer(crdInformer),
453470
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)),
454471
)
455472
if err != nil {

pkg/controller/operators/olm/operator.go

+16-6
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
corev1 "k8s.io/api/core/v1"
1414
rbacv1 "k8s.io/api/rbac/v1"
1515
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
16-
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
1716
apierrors "k8s.io/apimachinery/pkg/api/errors"
1817
"k8s.io/apimachinery/pkg/api/meta"
1918
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -541,14 +540,25 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
541540
return nil, err
542541
}
543542

544-
// Register CustomResourceDefinition QueueInformer
545-
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsInterface(), config.resyncPeriod()).Apiextensions().V1().CustomResourceDefinitions()
543+
// Register CustomResourceDefinition QueueInformer. Object metadata requests are used
544+
// by this informer in order to reduce cached size.
545+
gvr := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
546+
crdInformer := metadatainformer.NewFilteredMetadataInformer(
547+
config.metadataClient,
548+
gvr,
549+
metav1.NamespaceAll,
550+
config.resyncPeriod(),
551+
cache.Indexers{},
552+
nil,
553+
).Informer()
554+
crdLister := metadatalister.New(crdInformer.GetIndexer(), gvr)
546555
informersByNamespace[metav1.NamespaceAll].CRDInformer = crdInformer
547-
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
556+
informersByNamespace[metav1.NamespaceAll].CRDLister = crdLister
557+
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdLister)
548558
crdQueueInformer, err := queueinformer.NewQueueInformer(
549559
ctx,
550560
queueinformer.WithLogger(op.logger),
551-
queueinformer.WithInformer(crdInformer.Informer()),
561+
queueinformer.WithInformer(crdInformer),
552562
queueinformer.WithSyncer(k8sSyncer),
553563
)
554564
if err != nil {
@@ -1183,7 +1193,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
11831193
}
11841194

11851195
for i, crdName := range desc.ConversionCRDs {
1186-
crd, err := a.lister.APIExtensionsV1().CustomResourceDefinitionLister().Get(crdName)
1196+
crd, err := a.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), crdName, metav1.GetOptions{})
11871197
if err != nil {
11881198
logger.Errorf("error getting CRD %v which was defined in CSVs spec.WebhookDefinition[%d]: %v\n", crdName, i, err)
11891199
continue

pkg/controller/operators/olm/operator_test.go

+30-13
Original file line numberDiff line numberDiff line change
@@ -298,13 +298,14 @@ func NewFakeOperator(ctx context.Context, options ...fakeOperatorOption) (*Opera
298298
*config.actionLog = append(*config.actionLog, action)
299299
return false, nil, nil
300300
}))
301-
config.operatorClient = operatorclient.NewClient(k8sClientFake, apiextensionsfake.NewSimpleClientset(config.extObjs...), apiregistrationfake.NewSimpleClientset(config.regObjs...))
301+
apiextensionsFake := apiextensionsfake.NewSimpleClientset(config.extObjs...)
302+
config.operatorClient = operatorclient.NewClient(k8sClientFake, apiextensionsFake, apiregistrationfake.NewSimpleClientset(config.regObjs...))
302303
config.configClient = configfake.NewSimpleClientset()
303304
metadataFake := metadatafake.NewSimpleMetadataClient(scheme, config.partialMetadata...)
304305
config.metadataClient = metadataFake
305306
// It's a travesty that we need to do this, but the fakes leave us no other option. In the API server, of course
306307
// changes to objects are transparently exposed in the metadata client. In fake-land, we need to enforce that ourselves.
307-
externalFake.PrependReactor("*", "*", func(action clienttesting.Action) (bool, runtime.Object, error) {
308+
propagate := func(action clienttesting.Action) (bool, runtime.Object, error) {
308309
var err error
309310
switch action.GetVerb() {
310311
case "create":
@@ -320,7 +321,9 @@ func NewFakeOperator(ctx context.Context, options ...fakeOperatorOption) (*Opera
320321
err = metadataFake.Resource(action.GetResource()).Delete(context.TODO(), a.GetName(), metav1.DeleteOptions{})
321322
}
322323
return false, nil, err
323-
})
324+
}
325+
externalFake.PrependReactor("*", "*", propagate)
326+
apiextensionsFake.PrependReactor("*", "*", propagate)
324327

325328
for _, ns := range config.namespaces {
326329
_, err := config.operatorClient.KubernetesInterface().CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}, metav1.CreateOptions{})
@@ -4397,7 +4400,7 @@ func TestSyncOperatorGroups(t *testing.T) {
43974400
operatorGroup *operatorsv1.OperatorGroup
43984401
csvs []*v1alpha1.ClusterServiceVersion
43994402
clientObjs []runtime.Object
4400-
crds []runtime.Object
4403+
crds []*apiextensionsv1.CustomResourceDefinition
44014404
k8sObjs []runtime.Object
44024405
apis []runtime.Object
44034406
}
@@ -4474,7 +4477,7 @@ func TestSyncOperatorGroups(t *testing.T) {
44744477
role,
44754478
roleBinding,
44764479
},
4477-
crds: []runtime.Object{crd},
4480+
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
44784481
},
44794482
expectedStatus: operatorsv1.OperatorGroupStatus{},
44804483
final: final{objects: map[string][]runtime.Object{
@@ -4553,7 +4556,7 @@ func TestSyncOperatorGroups(t *testing.T) {
45534556
role,
45544557
roleBinding,
45554558
},
4556-
crds: []runtime.Object{crd},
4559+
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
45574560
},
45584561
expectedStatus: operatorsv1.OperatorGroupStatus{
45594562
Namespaces: []string{operatorNamespace, targetNamespace},
@@ -4656,7 +4659,7 @@ func TestSyncOperatorGroups(t *testing.T) {
46564659
role,
46574660
roleBinding,
46584661
},
4659-
crds: []runtime.Object{crd},
4662+
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
46604663
},
46614664
expectedStatus: operatorsv1.OperatorGroupStatus{
46624665
Namespaces: []string{operatorNamespace, targetNamespace},
@@ -4762,7 +4765,7 @@ func TestSyncOperatorGroups(t *testing.T) {
47624765
role,
47634766
roleBinding,
47644767
},
4765-
crds: []runtime.Object{crd},
4768+
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
47664769
},
47674770
expectedStatus: operatorsv1.OperatorGroupStatus{
47684771
Namespaces: []string{corev1.NamespaceAll},
@@ -4925,7 +4928,7 @@ func TestSyncOperatorGroups(t *testing.T) {
49254928
role,
49264929
roleBinding,
49274930
},
4928-
crds: []runtime.Object{crd},
4931+
crds: []*apiextensionsv1.CustomResourceDefinition{crd},
49294932
},
49304933
expectedStatus: operatorsv1.OperatorGroupStatus{
49314934
Namespaces: []string{corev1.NamespaceAll},
@@ -4982,7 +4985,7 @@ func TestSyncOperatorGroups(t *testing.T) {
49824985
operatorGroup = tt.initial.operatorGroup.DeepCopy()
49834986
clientObjs = copyObjs(append(tt.initial.clientObjs, operatorGroup))
49844987
k8sObjs = copyObjs(tt.initial.k8sObjs)
4985-
extObjs = copyObjs(tt.initial.crds)
4988+
extObjs []runtime.Object
49864989
regObjs = copyObjs(tt.initial.apis)
49874990
)
49884991

@@ -4992,11 +4995,25 @@ func TestSyncOperatorGroups(t *testing.T) {
49924995

49934996
var partials []runtime.Object
49944997
for _, csv := range tt.initial.csvs {
4995-
clientObjs = append(clientObjs, csv)
4998+
clientObjs = append(clientObjs, csv.DeepCopy())
49964999
partials = append(partials, &metav1.PartialObjectMetadata{
5000+
TypeMeta: metav1.TypeMeta{
5001+
Kind: "ClusterServiceVersion",
5002+
APIVersion: v1alpha1.SchemeGroupVersion.String(),
5003+
},
49975004
ObjectMeta: csv.ObjectMeta,
49985005
})
49995006
}
5007+
for _, crd := range tt.initial.crds {
5008+
extObjs = append(extObjs, crd.DeepCopy())
5009+
partials = append(partials, &metav1.PartialObjectMetadata{
5010+
TypeMeta: metav1.TypeMeta{
5011+
Kind: "CustomResourceDefinition",
5012+
APIVersion: apiextensionsv1.SchemeGroupVersion.String(),
5013+
},
5014+
ObjectMeta: crd.ObjectMeta,
5015+
})
5016+
}
50005017
l := logrus.New()
50015018
l.SetLevel(logrus.DebugLevel)
50025019
l = l.WithField("test", tt.name).Logger
@@ -5094,12 +5111,12 @@ func TestSyncOperatorGroups(t *testing.T) {
50945111

50955112
t.Log("op.syncClusterServiceVersion")
50965113
if err := op.syncClusterServiceVersion(&csv); err != nil {
5097-
return false, err
5114+
return false, fmt.Errorf("failed to syncClusterServiceVersion: %w", err)
50985115
}
50995116

51005117
t.Log("op.syncCopyCSV")
51015118
if err := op.syncCopyCSV(&csv); err != nil && !tt.ignoreCopyError {
5102-
return false, err
5119+
return false, fmt.Errorf("failed to syncCopyCSV: %w", err)
51035120
}
51045121
}
51055122

pkg/controller/operators/olm/operatorgroup.go

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/sirupsen/logrus"
1212
corev1 "k8s.io/api/core/v1"
1313
rbacv1 "k8s.io/api/rbac/v1"
14+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
1415
apierrors "k8s.io/apimachinery/pkg/api/errors"
1516
"k8s.io/apimachinery/pkg/api/meta"
1617
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -422,6 +423,7 @@ func (a *Operator) ensureClusterRolesForCSV(csv *v1alpha1.ClusterServiceVersion)
422423
if err != nil {
423424
return fmt.Errorf("crd %q not found: %s", owned.Name, err.Error())
424425
}
426+
crd.SetGroupVersionKind(apiextensionsv1.SchemeGroupVersion.WithKind("customresourcedefinition"))
425427
nameGroupPair := strings.SplitN(owned.Name, ".", 2) // -> etcdclusters etcd.database.coreos.com
426428
if len(nameGroupPair) != 2 {
427429
return fmt.Errorf("invalid parsing of name '%v', got %v", owned.Name, nameGroupPair)

pkg/controller/operators/olm/plugins/operator_plugin.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
1212
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
1313
"github.com/sirupsen/logrus"
14-
extensionsv1informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
1514
appsv1informers "k8s.io/client-go/informers/apps/v1"
1615
corev1informers "k8s.io/client-go/informers/core/v1"
1716
rbacv1informers "k8s.io/client-go/informers/rbac/v1"
@@ -46,7 +45,8 @@ type Informers struct {
4645
ClusterRoleBindingInformer rbacv1informers.ClusterRoleBindingInformer
4746
NamespaceInformer corev1informers.NamespaceInformer
4847
APIServiceInformer apiregistrationv1informers.APIServiceInformer
49-
CRDInformer extensionsv1informers.CustomResourceDefinitionInformer
48+
CRDInformer cache.SharedIndexInformer
49+
CRDLister metadatalister.Lister
5050
}
5151

5252
// OperatorConfig gives access to required configuration from the host operator

pkg/controller/operators/olm/requirements.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package olm
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"strings"
@@ -94,7 +95,7 @@ func (a *Operator) requirementStatus(strategyDetailsDeployment *v1alpha1.Strateg
9495
}
9596

9697
// check if CRD exists - this verifies group, version, and kind, so no need for GVK check via discovery
97-
crd, err := a.lister.APIExtensionsV1().CustomResourceDefinitionLister().Get(r.Name)
98+
crd, err := a.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), r.Name, metav1.GetOptions{})
9899
if err != nil {
99100
status.Status = v1alpha1.RequirementStatusReasonNotPresent
100101
status.Message = "CRD is not present"

pkg/lib/operatorlister/customresourcedefinition.go

+19-9
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,30 @@ import (
44
"fmt"
55
"sync"
66

7-
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
8-
aextv1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
7+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
98
"k8s.io/apimachinery/pkg/labels"
9+
"k8s.io/client-go/metadata/metadatalister"
1010
)
1111

1212
// UnionCustomResourceDefinitionLister is a custom implementation of an CustomResourceDefinition lister that allows a new
13-
// Lister to be registered on the fly. This Lister lists both v1 and v1beta1 APIVersion (at the newer version) CRDs.
13+
// Lister to be registered on the fly.
1414
type UnionCustomResourceDefinitionLister struct {
15-
CustomResourceDefinitionLister aextv1.CustomResourceDefinitionLister
15+
CustomResourceDefinitionLister metadatalister.Lister
1616
CustomResourceDefinitionLock sync.RWMutex
1717
}
1818

19+
func (ucl *UnionCustomResourceDefinitionLister) Namespace(namespace string) metadatalister.NamespaceLister {
20+
ucl.CustomResourceDefinitionLock.RLock()
21+
defer ucl.CustomResourceDefinitionLock.RUnlock()
22+
23+
if ucl.CustomResourceDefinitionLister == nil {
24+
panic(fmt.Errorf("no CustomResourceDefinition lister registered"))
25+
}
26+
return ucl.CustomResourceDefinitionLister.Namespace(namespace)
27+
}
28+
1929
// List lists all CustomResourceDefinitions in the indexer.
20-
func (ucl *UnionCustomResourceDefinitionLister) List(selector labels.Selector) (ret []*apiextensionsv1.CustomResourceDefinition, err error) {
30+
func (ucl *UnionCustomResourceDefinitionLister) List(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error) {
2131
ucl.CustomResourceDefinitionLock.RLock()
2232
defer ucl.CustomResourceDefinitionLock.RUnlock()
2333

@@ -28,7 +38,7 @@ func (ucl *UnionCustomResourceDefinitionLister) List(selector labels.Selector) (
2838
}
2939

3040
// Get retrieves the CustomResourceDefinition with the given name
31-
func (ucl *UnionCustomResourceDefinitionLister) Get(name string) (*apiextensionsv1.CustomResourceDefinition, error) {
41+
func (ucl *UnionCustomResourceDefinitionLister) Get(name string) (*metav1.PartialObjectMetadata, error) {
3242
ucl.CustomResourceDefinitionLock.RLock()
3343
defer ucl.CustomResourceDefinitionLock.RUnlock()
3444

@@ -39,17 +49,17 @@ func (ucl *UnionCustomResourceDefinitionLister) Get(name string) (*apiextensions
3949
}
4050

4151
// RegisterCustomResourceDefinitionLister registers a new CustomResourceDefinitionLister
42-
func (ucl *UnionCustomResourceDefinitionLister) RegisterCustomResourceDefinitionLister(lister aextv1.CustomResourceDefinitionLister) {
52+
func (ucl *UnionCustomResourceDefinitionLister) RegisterCustomResourceDefinitionLister(lister metadatalister.Lister) {
4353
ucl.CustomResourceDefinitionLock.Lock()
4454
defer ucl.CustomResourceDefinitionLock.Unlock()
4555

4656
ucl.CustomResourceDefinitionLister = lister
4757
}
4858

49-
func (l *apiExtensionsV1Lister) RegisterCustomResourceDefinitionLister(lister aextv1.CustomResourceDefinitionLister) {
59+
func (l *apiExtensionsV1Lister) RegisterCustomResourceDefinitionLister(lister metadatalister.Lister) {
5060
l.customResourceDefinitionLister.RegisterCustomResourceDefinitionLister(lister)
5161
}
5262

53-
func (l *apiExtensionsV1Lister) CustomResourceDefinitionLister() aextv1.CustomResourceDefinitionLister {
63+
func (l *apiExtensionsV1Lister) CustomResourceDefinitionLister() metadatalister.Lister {
5464
return l.customResourceDefinitionLister
5565
}

pkg/lib/operatorlister/lister.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package operatorlister
22

33
import (
4-
aextv1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
54
appsv1 "k8s.io/client-go/listers/apps/v1"
65
corev1 "k8s.io/client-go/listers/core/v1"
76
rbacv1 "k8s.io/client-go/listers/rbac/v1"
7+
"k8s.io/client-go/metadata/metadatalister"
88
aregv1 "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
99

1010
v1 "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"
@@ -88,8 +88,8 @@ type APIRegistrationV1Lister interface {
8888

8989
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . APIExtensionsV1Lister
9090
type APIExtensionsV1Lister interface {
91-
RegisterCustomResourceDefinitionLister(lister aextv1.CustomResourceDefinitionLister)
92-
CustomResourceDefinitionLister() aextv1.CustomResourceDefinitionLister
91+
RegisterCustomResourceDefinitionLister(lister metadatalister.Lister)
92+
CustomResourceDefinitionLister() metadatalister.Lister
9393
}
9494

9595
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . OperatorsV1alpha1Lister

0 commit comments

Comments
 (0)