Skip to content

Commit 488a1b6

Browse files
*: filter informers when preconditions are met
When we can detect at startup time that all of the objects we're about to look at have the labels we're expecting, we can filter our informer factories upfront. Signed-off-by: Steve Kuznetsov <[email protected]>
1 parent 629e370 commit 488a1b6

File tree

4 files changed

+174
-52
lines changed

4 files changed

+174
-52
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ require (
3737
github.com/spf13/pflag v1.0.5
3838
github.com/stretchr/testify v1.8.3
3939
golang.org/x/net v0.10.0
40+
golang.org/x/sync v0.2.0
4041
golang.org/x/time v0.3.0
4142
google.golang.org/grpc v1.54.0
4243
gopkg.in/yaml.v2 v2.4.0
@@ -208,7 +209,6 @@ require (
208209
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
209210
golang.org/x/mod v0.10.0 // indirect
210211
golang.org/x/oauth2 v0.5.0 // indirect
211-
golang.org/x/sync v0.2.0 // indirect
212212
golang.org/x/sys v0.8.0 // indirect
213213
golang.org/x/term v0.8.0 // indirect
214214
golang.org/x/text v0.9.0 // indirect

pkg/controller/operators/catalog/operator.go

+34-43
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"sync"
1212
"time"
1313

14-
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
1514
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
1615
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
1716
errorwrap "github.com/pkg/errors"
@@ -187,6 +186,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
187186
return nil, err
188187
}
189188

189+
canFilter, err := labeller.Validate(ctx, logger, metadataClient)
190+
if err != nil {
191+
return nil, err
192+
}
193+
190194
// Allocate the new instance of an Operator.
191195
op := &Operator{
192196
Operator: queueOperator,
@@ -363,7 +367,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
363367
}
364368

365369
// Wire k8s sharedIndexInformers
366-
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod())
370+
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod(), func() []informers.SharedInformerOption {
371+
if !canFilter {
372+
return nil
373+
}
374+
return []informers.SharedInformerOption{informers.WithTweakListOptions(func(options *metav1.ListOptions) {
375+
options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
376+
})}
377+
}()...)
367378
sharedIndexInformers := []cache.SharedIndexInformer{}
368379

369380
// Wire Roles
@@ -392,8 +403,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
392403
return nil
393404
}
394405

395-
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("roles"), roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
396-
ctx, op.logger, labeller.HasOLMOwnerRef,
406+
rolesgvk := rbacv1.SchemeGroupVersion.WithResource("roles")
407+
if err := labelObjects(rolesgvk, roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
408+
ctx, op.logger, labeller.Filter(rolesgvk),
397409
rbacv1applyconfigurations.Role,
398410
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
399411
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
@@ -407,8 +419,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
407419
op.lister.RbacV1().RegisterRoleBindingLister(metav1.NamespaceAll, roleBindingInformer.Lister())
408420
sharedIndexInformers = append(sharedIndexInformers, roleBindingInformer.Informer())
409421

410-
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("rolebindings"), roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
411-
ctx, op.logger, labeller.HasOLMOwnerRef,
422+
rolebindingsgvk := rbacv1.SchemeGroupVersion.WithResource("rolebindings")
423+
if err := labelObjects(rolebindingsgvk, roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
424+
ctx, op.logger, labeller.Filter(rolebindingsgvk),
412425
rbacv1applyconfigurations.RoleBinding,
413426
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
414427
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
@@ -422,10 +435,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
422435
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
423436
sharedIndexInformers = append(sharedIndexInformers, serviceAccountInformer.Informer())
424437

425-
if err := labelObjects(corev1.SchemeGroupVersion.WithResource("serviceaccounts"), serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
426-
ctx, op.logger, func(object metav1.Object) bool {
427-
return labeller.HasOLMOwnerRef(object) || labeller.HasOLMLabel(object)
428-
},
438+
serviceaccountsgvk := corev1.SchemeGroupVersion.WithResource("serviceaccounts")
439+
if err := labelObjects(serviceaccountsgvk, serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
440+
ctx, op.logger, labeller.Filter(serviceaccountsgvk),
429441
corev1applyconfigurations.ServiceAccount,
430442
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
431443
return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
@@ -440,8 +452,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
440452
sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())
441453

442454
// TODO(skuznets): some services don't seem to have any marker to key off of, but they match the operator name
443-
if err := labelObjects(corev1.SchemeGroupVersion.WithResource("services"), serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
444-
ctx, op.logger, labeller.HasOLMOwnerRef,
455+
servicesgvk := corev1.SchemeGroupVersion.WithResource("services")
456+
if err := labelObjects(servicesgvk, serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
457+
ctx, op.logger, labeller.Filter(servicesgvk),
445458
corev1applyconfigurations.Service,
446459
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
447460
return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
@@ -464,15 +477,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
464477
op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, csPodInformer.Lister())
465478
sharedIndexInformers = append(sharedIndexInformers, csPodInformer.Informer())
466479

467-
if err := labelObjects(corev1.SchemeGroupVersion.WithResource("pods"), csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
468-
ctx, op.logger, func(object metav1.Object) bool {
469-
if labels := object.GetLabels(); labels != nil {
470-
if _, ok := labels[reconciler.CatalogSourceLabelKey]; ok {
471-
return true
472-
}
473-
}
474-
return false
475-
},
480+
podsgvk := corev1.SchemeGroupVersion.WithResource("pods")
481+
if err := labelObjects(podsgvk, csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
482+
ctx, op.logger, labeller.Filter(podsgvk),
476483
corev1applyconfigurations.Pod,
477484
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
478485
return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
@@ -505,19 +512,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
505512
jobInformer := k8sInformerFactory.Batch().V1().Jobs()
506513
sharedIndexInformers = append(sharedIndexInformers, jobInformer.Informer())
507514

508-
if err := labelObjects(batchv1.SchemeGroupVersion.WithResource("jobs"), jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
509-
ctx, op.logger, func(object metav1.Object) bool {
510-
job, ok := object.(*batchv1.Job)
511-
if !ok {
512-
return false
513-
}
514-
for _, container := range job.Spec.Template.Spec.Containers {
515-
if strings.Join(container.Command[0:3], " ") == "opm alpha bundle extract" {
516-
return true
517-
}
518-
}
519-
return false
520-
},
515+
jobsgvk := batchv1.SchemeGroupVersion.WithResource("jobs")
516+
if err := labelObjects(jobsgvk, jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
517+
ctx, op.logger, labeller.Filter(jobsgvk),
521518
batchv1applyconfigurations.Job,
522519
func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
523520
return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
@@ -590,15 +587,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
590587
return nil, err
591588
}
592589

593-
if err := labelObjects(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), crdInformer, labeller.ObjectPatchLabeler[*apiextensionsv1.CustomResourceDefinition](
594-
ctx, op.logger, func(object metav1.Object) bool {
595-
for key := range object.GetAnnotations() {
596-
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
597-
return true
598-
}
599-
}
600-
return false
601-
},
590+
customresourcedefinitionsgvk := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
591+
if err := labelObjects(customresourcedefinitionsgvk, crdInformer, labeller.ObjectPatchLabeler[*apiextensionsv1.CustomResourceDefinition](
592+
ctx, op.logger, labeller.Filter(customresourcedefinitionsgvk),
602593
op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
603594
)); err != nil {
604595
return nil, err
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package labeller
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"sync"
8+
9+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
10+
"github.com/sirupsen/logrus"
11+
"golang.org/x/sync/errgroup"
12+
appsv1 "k8s.io/api/apps/v1"
13+
batchv1 "k8s.io/api/batch/v1"
14+
corev1 "k8s.io/api/core/v1"
15+
rbacv1 "k8s.io/api/rbac/v1"
16+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
17+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"k8s.io/apimachinery/pkg/runtime/schema"
19+
"k8s.io/client-go/metadata"
20+
21+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
22+
)
23+
24+
func Filter(gvr schema.GroupVersionResource) func(metav1.Object) bool {
25+
if f, ok := filters[gvr]; ok {
26+
return f
27+
}
28+
return func(object metav1.Object) bool {
29+
return false
30+
}
31+
}
32+
33+
var filters = map[schema.GroupVersionResource]func(metav1.Object) bool{
34+
corev1.SchemeGroupVersion.WithResource("services"): HasOLMOwnerRef,
35+
corev1.SchemeGroupVersion.WithResource("pods"): func(object metav1.Object) bool {
36+
if labels := object.GetLabels(); labels != nil {
37+
if _, ok := labels[reconciler.CatalogSourceLabelKey]; ok {
38+
return true
39+
}
40+
}
41+
return false
42+
},
43+
corev1.SchemeGroupVersion.WithResource("serviceaccounts"): func(object metav1.Object) bool {
44+
return HasOLMOwnerRef(object) || HasOLMLabel(object)
45+
},
46+
batchv1.SchemeGroupVersion.WithResource("jobs"): func(object metav1.Object) bool {
47+
job, ok := object.(*batchv1.Job)
48+
if !ok {
49+
return false
50+
}
51+
for _, container := range job.Spec.Template.Spec.Containers {
52+
if strings.Join(container.Command[0:3], " ") == "opm alpha bundle extract" {
53+
return true
54+
}
55+
}
56+
return false
57+
},
58+
appsv1.SchemeGroupVersion.WithResource("deployments"): HasOLMOwnerRef,
59+
rbacv1.SchemeGroupVersion.WithResource("roles"): HasOLMOwnerRef,
60+
rbacv1.SchemeGroupVersion.WithResource("rolebindings"): HasOLMOwnerRef,
61+
rbacv1.SchemeGroupVersion.WithResource("clusterroles"): HasOLMOwnerRef,
62+
rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"): HasOLMOwnerRef,
63+
apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"): func(object metav1.Object) bool {
64+
for key := range object.GetAnnotations() {
65+
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
66+
return true
67+
}
68+
}
69+
return false
70+
},
71+
}
72+
73+
func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadata.Interface) (bool, error) {
74+
okLock := sync.Mutex{}
75+
var ok bool
76+
g, ctx := errgroup.WithContext(ctx)
77+
for gvr, filter := range filters {
78+
gvr, filter := gvr, filter
79+
g.Go(func() error {
80+
list, err := metadataClient.Resource(gvr).List(ctx, metav1.ListOptions{})
81+
if err != nil {
82+
return fmt.Errorf("failed to list %s: %w", gvr.String(), err)
83+
}
84+
var count int
85+
for _, item := range list.Items {
86+
if filter(&item) && !hasLabel(&item) {
87+
count++
88+
}
89+
}
90+
if count > 0 {
91+
logger.WithFields(logrus.Fields{
92+
"gvr": gvr.String(),
93+
"nonconforming": count,
94+
}).Info("found nonconforming items")
95+
}
96+
okLock.Lock()
97+
ok = ok && count == 0
98+
okLock.Unlock()
99+
return nil
100+
})
101+
}
102+
if err := g.Wait(); err != nil {
103+
return false, err
104+
}
105+
return ok, nil
106+
}

pkg/controller/operators/olm/operator.go

+33-8
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
141141
return nil, err
142142
}
143143

144+
canFilter, err := labeller.Validate(ctx, config.logger, config.metadataClient)
145+
if err != nil {
146+
return nil, err
147+
}
148+
144149
op := &Operator{
145150
Operator: queueOperator,
146151
clock: config.clock,
@@ -315,7 +320,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
315320
}
316321

317322
// Wire Deployments
318-
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), informers.WithNamespace(namespace))
323+
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), func() []informers.SharedInformerOption {
324+
opts := []informers.SharedInformerOption{
325+
informers.WithNamespace(namespace),
326+
}
327+
if canFilter {
328+
opts = append(opts, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
329+
options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
330+
}))
331+
}
332+
return opts
333+
}()...)
319334
depInformer := k8sInformerFactory.Apps().V1().Deployments()
320335
informersByNamespace[namespace].DeploymentInformer = depInformer
321336
op.lister.AppsV1().RegisterDeploymentLister(namespace, depInformer.Lister())
@@ -455,8 +470,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
455470
return nil
456471
}
457472

458-
if err := labelObjects(appsv1.SchemeGroupVersion.WithResource("deployments"), informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
459-
ctx, op.logger, labeller.HasOLMOwnerRef,
473+
deploymentsgvk := appsv1.SchemeGroupVersion.WithResource("deployments")
474+
if err := labelObjects(deploymentsgvk, informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
475+
ctx, op.logger, labeller.Filter(deploymentsgvk),
460476
appsv1applyconfigurations.Deployment,
461477
func(namespace string, ctx context.Context, cfg *appsv1applyconfigurations.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (*appsv1.Deployment, error) {
462478
return op.opClient.KubernetesInterface().AppsV1().Deployments(namespace).Apply(ctx, cfg, opts)
@@ -502,7 +518,14 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
502518
return nil, err
503519
}
504520

505-
k8sInformerFactory := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod())
521+
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), func() []informers.SharedInformerOption {
522+
if !canFilter {
523+
return nil
524+
}
525+
return []informers.SharedInformerOption{informers.WithTweakListOptions(func(options *metav1.ListOptions) {
526+
options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
527+
})}
528+
}()...)
506529
clusterRoleInformer := k8sInformerFactory.Rbac().V1().ClusterRoles()
507530
informersByNamespace[metav1.NamespaceAll].ClusterRoleInformer = clusterRoleInformer
508531
op.lister.RbacV1().RegisterClusterRoleLister(clusterRoleInformer.Lister())
@@ -519,8 +542,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
519542
return nil, err
520543
}
521544

522-
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterroles"), clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
523-
ctx, op.logger, labeller.HasOLMOwnerRef,
545+
clusterrolesgvk := rbacv1.SchemeGroupVersion.WithResource("clusterroles")
546+
if err := labelObjects(clusterrolesgvk, clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
547+
ctx, op.logger, labeller.Filter(clusterrolesgvk),
524548
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleApplyConfiguration {
525549
return rbacv1applyconfigurations.ClusterRole(name)
526550
},
@@ -547,8 +571,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
547571
return nil, err
548572
}
549573

550-
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"), clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
551-
ctx, op.logger, labeller.HasOLMOwnerRef,
574+
clusterrolebindingssgvk := rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings")
575+
if err := labelObjects(clusterrolebindingssgvk, clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
576+
ctx, op.logger, labeller.Filter(clusterrolebindingssgvk),
552577
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration {
553578
return rbacv1applyconfigurations.ClusterRoleBinding(name)
554579
},

0 commit comments

Comments
 (0)