Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 5effa46

Browse files
committedAug 31, 2023
*: label k8s objects we own
Signed-off-by: Steve Kuznetsov <[email protected]>
1 parent 9e7031f commit 5effa46

File tree

6 files changed

+340
-7
lines changed

6 files changed

+340
-7
lines changed
 

‎go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/coreos/go-semver v0.3.0
88
github.com/davecgh/go-spew v1.1.1
99
github.com/distribution/distribution v2.7.1+incompatible
10+
github.com/evanphx/json-patch v5.6.0+incompatible
1011
github.com/fsnotify/fsnotify v1.6.0
1112
github.com/ghodss/yaml v1.0.0
1213
github.com/go-air/gini v1.0.4
@@ -95,7 +96,6 @@ require (
9596
github.com/docker/go-metrics v0.0.1 // indirect
9697
github.com/docker/go-units v0.5.0 // indirect
9798
github.com/emicklei/go-restful/v3 v3.10.2 // indirect
98-
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
9999
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
100100
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
101101
github.com/fatih/color v1.13.0 // indirect

‎pkg/controller/operators/catalog/operator.go

+120
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ import (
1111
"sync"
1212
"time"
1313

14+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
15+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
1416
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
1517
errorwrap "github.com/pkg/errors"
1618
"github.com/sirupsen/logrus"
1719
"google.golang.org/grpc/connectivity"
20+
batchv1 "k8s.io/api/batch/v1"
1821
corev1 "k8s.io/api/core/v1"
1922
rbacv1 "k8s.io/api/rbac/v1"
2023
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
@@ -32,6 +35,9 @@ import (
3235
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3336
"k8s.io/apimachinery/pkg/util/validation/field"
3437
"k8s.io/apimachinery/pkg/util/yaml"
38+
batchv1applyconfigurations "k8s.io/client-go/applyconfigurations/batch/v1"
39+
corev1applyconfigurations "k8s.io/client-go/applyconfigurations/core/v1"
40+
rbacv1applyconfigurations "k8s.io/client-go/applyconfigurations/rbac/v1"
3541
"k8s.io/client-go/dynamic"
3642
"k8s.io/client-go/informers"
3743
"k8s.io/client-go/metadata"
@@ -103,6 +109,7 @@ type Operator struct {
103109
client versioned.Interface
104110
dynamicClient dynamic.Interface
105111
lister operatorlister.OperatorLister
112+
k8sLabelQueueSets map[schema.GroupVersionResource]workqueue.RateLimitingInterface
106113
catsrcQueueSet *queueinformer.ResourceQueueSet
107114
subQueueSet *queueinformer.ResourceQueueSet
108115
ipQueueSet *queueinformer.ResourceQueueSet
@@ -191,6 +198,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
191198
lister: lister,
192199
namespace: operatorNamespace,
193200
recorder: eventRecorder,
201+
k8sLabelQueueSets: map[schema.GroupVersionResource]workqueue.RateLimitingInterface{},
194202
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
195203
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
196204
ipQueueSet: queueinformer.NewEmptyResourceQueueSet(),
@@ -363,21 +371,85 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
363371
op.lister.RbacV1().RegisterRoleLister(metav1.NamespaceAll, roleInformer.Lister())
364372
sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())
365373

374+
labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
375+
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
376+
Name: gvr.String(),
377+
})
378+
queueInformer, err := queueinformer.NewQueueInformer(
379+
ctx,
380+
queueinformer.WithLogger(op.logger),
381+
queueinformer.WithInformer(informer),
382+
queueinformer.WithSyncer(sync.ToSyncer()),
383+
)
384+
if err != nil {
385+
return err
386+
}
387+
388+
if err := op.RegisterQueueInformer(queueInformer); err != nil {
389+
return err
390+
}
391+
392+
return nil
393+
}
394+
395+
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("roles"), roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
396+
ctx, op.logger, labeller.HasOLMOwnerRef,
397+
rbacv1applyconfigurations.Role,
398+
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
399+
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
400+
},
401+
)); err != nil {
402+
return nil, err
403+
}
404+
366405
// Wire RoleBindings
367406
roleBindingInformer := k8sInformerFactory.Rbac().V1().RoleBindings()
368407
op.lister.RbacV1().RegisterRoleBindingLister(metav1.NamespaceAll, roleBindingInformer.Lister())
369408
sharedIndexInformers = append(sharedIndexInformers, roleBindingInformer.Informer())
370409

410+
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("rolebindings"), roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
411+
ctx, op.logger, labeller.HasOLMOwnerRef,
412+
rbacv1applyconfigurations.RoleBinding,
413+
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
414+
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
415+
},
416+
)); err != nil {
417+
return nil, err
418+
}
419+
371420
// Wire ServiceAccounts
372421
serviceAccountInformer := k8sInformerFactory.Core().V1().ServiceAccounts()
373422
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
374423
sharedIndexInformers = append(sharedIndexInformers, serviceAccountInformer.Informer())
375424

425+
if err := labelObjects(corev1.SchemeGroupVersion.WithResource("serviceaccounts"), serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
426+
ctx, op.logger, func(object metav1.Object) bool {
427+
return labeller.HasOLMOwnerRef(object) || labeller.HasOLMLabel(object)
428+
},
429+
corev1applyconfigurations.ServiceAccount,
430+
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
431+
return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
432+
},
433+
)); err != nil {
434+
return nil, err
435+
}
436+
376437
// Wire Services
377438
serviceInformer := k8sInformerFactory.Core().V1().Services()
378439
op.lister.CoreV1().RegisterServiceLister(metav1.NamespaceAll, serviceInformer.Lister())
379440
sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())
380441

442+
// TODO(skuznets): some services don't seem to have any marker to key off of, but they match the operator name
443+
if err := labelObjects(corev1.SchemeGroupVersion.WithResource("services"), serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
444+
ctx, op.logger, labeller.HasOLMOwnerRef,
445+
corev1applyconfigurations.Service,
446+
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
447+
return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
448+
},
449+
)); err != nil {
450+
return nil, err
451+
}
452+
381453
// Wire Pods for CatalogSource
382454
catsrcReq, err := labels.NewRequirement(reconciler.CatalogSourceLabelKey, selection.Exists, nil)
383455
if err != nil {
@@ -392,6 +464,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
392464
op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, csPodInformer.Lister())
393465
sharedIndexInformers = append(sharedIndexInformers, csPodInformer.Informer())
394466

467+
if err := labelObjects(corev1.SchemeGroupVersion.WithResource("pods"), csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
468+
ctx, op.logger, func(object metav1.Object) bool {
469+
_, ok := object.GetLabels()[reconciler.CatalogSourceLabelKey]
470+
return ok
471+
},
472+
corev1applyconfigurations.Pod,
473+
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
474+
return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
475+
},
476+
)); err != nil {
477+
return nil, err
478+
}
479+
395480
// Wire Pods for BundleUnpack job
396481
buReq, err := labels.NewRequirement(bundle.BundleUnpackPodLabel, selection.Exists, nil)
397482
if err != nil {
@@ -416,6 +501,27 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
416501
jobInformer := k8sInformerFactory.Batch().V1().Jobs()
417502
sharedIndexInformers = append(sharedIndexInformers, jobInformer.Informer())
418503

504+
if err := labelObjects(batchv1.SchemeGroupVersion.WithResource("jobs"), jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
505+
ctx, op.logger, func(object metav1.Object) bool {
506+
job, ok := object.(*batchv1.Job)
507+
if !ok {
508+
return false
509+
}
510+
for _, container := range job.Spec.Template.Spec.Containers {
511+
if strings.Join(container.Command[0:3], " ") == "opm alpha bundle extract" {
512+
return true
513+
}
514+
}
515+
return false
516+
},
517+
batchv1applyconfigurations.Job,
518+
func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
519+
return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
520+
},
521+
)); err != nil {
522+
return nil, err
523+
}
524+
419525
// Generate and register QueueInformers for k8s resources
420526
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
421527
for _, informer := range sharedIndexInformers {
@@ -480,6 +586,20 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
480586
return nil, err
481587
}
482588

589+
if err := labelObjects(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), crdInformer, labeller.ObjectPatchLabeler(
590+
ctx, op.logger, func(object metav1.Object) bool {
591+
for key := range object.GetAnnotations() {
592+
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
593+
return true
594+
}
595+
}
596+
return false
597+
},
598+
op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
599+
)); err != nil {
600+
return nil, err
601+
}
602+
483603
// Namespace sync for resolving subscriptions
484604
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
485605
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())

‎pkg/controller/operators/internal/alongside/alongside.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
const (
13-
prefix = "operatorframework.io/installed-alongside-"
13+
AnnotationPrefix = "operatorframework.io/installed-alongside-"
1414
)
1515

1616
// NamespacedName is a reference to an object by namespace and name.
@@ -33,7 +33,7 @@ type Annotator struct{}
3333
func (a Annotator) FromObject(o Annotatable) []NamespacedName {
3434
var result []NamespacedName
3535
for k, v := range o.GetAnnotations() {
36-
if !strings.HasPrefix(k, prefix) {
36+
if !strings.HasPrefix(k, AnnotationPrefix) {
3737
continue
3838
}
3939
tokens := strings.Split(v, "/")
@@ -55,7 +55,7 @@ func (a Annotator) ToObject(o Annotatable, nns []NamespacedName) {
5555
annotations := o.GetAnnotations()
5656

5757
for key := range annotations {
58-
if strings.HasPrefix(key, prefix) {
58+
if strings.HasPrefix(key, AnnotationPrefix) {
5959
delete(annotations, key)
6060
}
6161
}
@@ -82,5 +82,5 @@ func key(n NamespacedName) string {
8282
hasher.Write([]byte(n.Namespace))
8383
hasher.Write([]byte{'/'})
8484
hasher.Write([]byte(n.Name))
85-
return fmt.Sprintf("%s%x", prefix, hasher.Sum64())
85+
return fmt.Sprintf("%s%x", AnnotationPrefix, hasher.Sum64())
8686
}

‎pkg/controller/operators/internal/alongside/alongside_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestAnnotatorFromObject(t *testing.T) {
2323
NamespacedNames []NamespacedName
2424
}{
2525
{
26-
Name: "annotation without prefix ignored",
26+
Name: "annotation without AnnotationPrefix ignored",
2727
Object: TestAnnotatable{
2828
"foo": "namespace/name",
2929
},
@@ -66,7 +66,7 @@ func TestAnnotatorToObject(t *testing.T) {
6666
},
6767
},
6868
{
69-
Name: "annotation without prefix ignored",
69+
Name: "annotation without AnnotationPrefix ignored",
7070
Object: TestAnnotatable{
7171
"operatorframework.io/something-else": "",
7272
},
+151
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package labeller
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"strings"
8+
9+
jsonpatch "github.com/evanphx/json-patch"
10+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
11+
"github.com/sirupsen/logrus"
12+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
13+
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/runtime/schema"
16+
"k8s.io/apimachinery/pkg/types"
17+
18+
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
19+
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
20+
operatorsv1alpha2 "github.com/operator-framework/api/pkg/operators/v1alpha2"
21+
22+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
23+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/decorators"
24+
)
25+
26+
type ApplyConfig[T any] interface {
27+
WithLabels(map[string]string) T
28+
}
29+
30+
type Client[A ApplyConfig[A], T metav1.Object] interface {
31+
Apply(ctx context.Context, cfg ApplyConfig[A], opts metav1.ApplyOptions) (result T, err error)
32+
}
33+
34+
func hasLabel(obj metav1.Object) bool {
35+
value, ok := obj.GetLabels()[install.OLMManagedLabelKey]
36+
return ok && value == install.OLMManagedLabelValue
37+
}
38+
39+
func ObjectLabeler[T metav1.Object, A ApplyConfig[A]](
40+
ctx context.Context,
41+
logger *logrus.Logger,
42+
check func(metav1.Object) bool,
43+
applyConfigFor func(name, namespace string) A,
44+
apply func(namespace string, ctx context.Context, cfg A, opts metav1.ApplyOptions) (T, error),
45+
) queueinformer.LegacySyncHandler {
46+
return func(obj interface{}) error {
47+
cast, ok := obj.(T)
48+
if !ok {
49+
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(T), obj)
50+
logger.WithError(err).Error("casting failed")
51+
return fmt.Errorf("casting failed: %w", err)
52+
}
53+
54+
if !check(cast) || hasLabel(cast) {
55+
return nil
56+
}
57+
58+
cfg := applyConfigFor(cast.GetName(), cast.GetNamespace())
59+
cfg.WithLabels(map[string]string{
60+
install.OLMManagedLabelKey: install.OLMManagedLabelValue,
61+
})
62+
63+
_, err := apply(cast.GetNamespace(), ctx, cfg, metav1.ApplyOptions{})
64+
return err
65+
}
66+
}
67+
68+
// CRDs did not have applyconfigurations generated for them on accident, we can remove this when
69+
// https://github.com/kubernetes/kubernetes/pull/120177 lands
70+
func ObjectPatchLabeler(
71+
ctx context.Context,
72+
logger *logrus.Logger,
73+
check func(metav1.Object) bool,
74+
patch func(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *apiextensionsv1.CustomResourceDefinition, err error),
75+
) func(
76+
obj interface{},
77+
) error {
78+
return func(obj interface{}) error {
79+
cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
80+
if !ok {
81+
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(T), obj)
82+
logger.WithError(err).Error("casting failed")
83+
return fmt.Errorf("casting failed: %w", err)
84+
}
85+
86+
if !check(cast) || hasLabel(cast) {
87+
return nil
88+
}
89+
90+
uid := cast.GetUID()
91+
rv := cast.GetResourceVersion()
92+
93+
// to ensure they appear in the patch as preconditions
94+
previous := cast.DeepCopy()
95+
previous.SetUID("")
96+
previous.SetResourceVersion("")
97+
98+
oldData, err := json.Marshal(previous)
99+
if err != nil {
100+
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", previous.GetNamespace(), previous.GetName(), err)
101+
}
102+
103+
// to ensure they appear in the patch as preconditions
104+
updated := cast.DeepCopy()
105+
updated.SetUID(uid)
106+
updated.SetResourceVersion(rv)
107+
labels := updated.GetLabels()
108+
if labels == nil {
109+
labels = map[string]string{}
110+
}
111+
labels[install.OLMManagedLabelKey] = install.OLMManagedLabelValue
112+
updated.SetLabels(labels)
113+
114+
newData, err := json.Marshal(updated)
115+
if err != nil {
116+
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", updated.GetNamespace(), updated.GetName(), err)
117+
}
118+
119+
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
120+
if err != nil {
121+
return fmt.Errorf("failed to create patch for %s/%s: %w", cast.GetNamespace(), cast.GetName(), err)
122+
}
123+
124+
_, err = patch(ctx, cast.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
125+
return err
126+
}
127+
}
128+
129+
func HasOLMOwnerRef(object metav1.Object) bool {
130+
for _, ref := range object.GetOwnerReferences() {
131+
for _, gv := range []schema.GroupVersion{
132+
operatorsv1.GroupVersion,
133+
operatorsv1alpha1.SchemeGroupVersion,
134+
operatorsv1alpha2.GroupVersion,
135+
} {
136+
if ref.APIVersion == gv.String() {
137+
return true
138+
}
139+
}
140+
}
141+
return false
142+
}
143+
144+
func HasOLMLabel(object metav1.Object) bool {
145+
for key := range object.GetLabels() {
146+
if strings.HasPrefix(key, decorators.ComponentLabelKeyPrefix) {
147+
return true
148+
}
149+
}
150+
return false
151+
}

‎pkg/controller/operators/olm/operator.go

+62
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import (
77
"strings"
88
"time"
99

10+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
1011
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/plugins"
1112
"github.com/sirupsen/logrus"
1213
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
14+
appsv1 "k8s.io/api/apps/v1"
1315
corev1 "k8s.io/api/core/v1"
1416
rbacv1 "k8s.io/api/rbac/v1"
1517
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -18,9 +20,12 @@ import (
1820
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1921
"k8s.io/apimachinery/pkg/labels"
2022
"k8s.io/apimachinery/pkg/runtime"
23+
"k8s.io/apimachinery/pkg/runtime/schema"
2124
"k8s.io/apimachinery/pkg/selection"
2225
utilerrors "k8s.io/apimachinery/pkg/util/errors"
2326
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
27+
appsv1applyconfigurations "k8s.io/client-go/applyconfigurations/apps/v1"
28+
rbacv1applyconfigurations "k8s.io/client-go/applyconfigurations/rbac/v1"
2429
"k8s.io/client-go/informers"
2530
k8sscheme "k8s.io/client-go/kubernetes/scheme"
2631
"k8s.io/client-go/metadata/metadatainformer"
@@ -73,6 +78,7 @@ type Operator struct {
7378
opClient operatorclient.ClientInterface
7479
client versioned.Interface
7580
lister operatorlister.OperatorLister
81+
k8sLabelQueueSets map[schema.GroupVersionResource]workqueue.RateLimitingInterface
7682
protectedCopiedCSVNamespaces map[string]struct{}
7783
copiedCSVLister metadatalister.Lister
7884
ogQueueSet *queueinformer.ResourceQueueSet
@@ -151,6 +157,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
151157
resolver: config.strategyResolver,
152158
apiReconciler: config.apiReconciler,
153159
lister: lister,
160+
k8sLabelQueueSets: map[schema.GroupVersionResource]workqueue.RateLimitingInterface{},
154161
recorder: eventRecorder,
155162
apiLabeler: config.apiLabeler,
156163
csvIndexers: map[string]cache.Indexer{},
@@ -427,6 +434,37 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
427434
}
428435
}
429436

437+
labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
438+
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
439+
Name: gvr.String(),
440+
})
441+
queueInformer, err := queueinformer.NewQueueInformer(
442+
ctx,
443+
queueinformer.WithLogger(op.logger),
444+
queueinformer.WithInformer(informer),
445+
queueinformer.WithSyncer(sync.ToSyncer()),
446+
)
447+
if err != nil {
448+
return err
449+
}
450+
451+
if err := op.RegisterQueueInformer(queueInformer); err != nil {
452+
return err
453+
}
454+
455+
return nil
456+
}
457+
458+
if err := labelObjects(appsv1.SchemeGroupVersion.WithResource("deployments"), informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
459+
ctx, op.logger, labeller.HasOLMOwnerRef,
460+
appsv1applyconfigurations.Deployment,
461+
func(namespace string, ctx context.Context, cfg *appsv1applyconfigurations.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (*appsv1.Deployment, error) {
462+
return op.opClient.KubernetesInterface().AppsV1().Deployments(namespace).Apply(ctx, cfg, opts)
463+
},
464+
)); err != nil {
465+
return nil, err
466+
}
467+
430468
// add queue for all namespaces as well
431469
objGCQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/obj-gc", ""))
432470
op.objGCQueueSet.Set("", objGCQueue)
@@ -481,6 +519,18 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
481519
return nil, err
482520
}
483521

522+
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterroles"), clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
523+
ctx, op.logger, labeller.HasOLMOwnerRef,
524+
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleApplyConfiguration {
525+
return rbacv1applyconfigurations.ClusterRole(name)
526+
},
527+
func(_ string, ctx context.Context, cfg *rbacv1applyconfigurations.ClusterRoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.ClusterRole, error) {
528+
return op.opClient.KubernetesInterface().RbacV1().ClusterRoles().Apply(ctx, cfg, opts)
529+
},
530+
)); err != nil {
531+
return nil, err
532+
}
533+
484534
clusterRoleBindingInformer := k8sInformerFactory.Rbac().V1().ClusterRoleBindings()
485535
informersByNamespace[metav1.NamespaceAll].ClusterRoleBindingInformer = clusterRoleBindingInformer
486536
op.lister.RbacV1().RegisterClusterRoleBindingLister(clusterRoleBindingInformer.Lister())
@@ -497,6 +547,18 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
497547
return nil, err
498548
}
499549

550+
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"), clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
551+
ctx, op.logger, labeller.HasOLMOwnerRef,
552+
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration {
553+
return rbacv1applyconfigurations.ClusterRoleBinding(name)
554+
},
555+
func(_ string, ctx context.Context, cfg *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.ClusterRoleBinding, error) {
556+
return op.opClient.KubernetesInterface().RbacV1().ClusterRoleBindings().Apply(ctx, cfg, opts)
557+
},
558+
)); err != nil {
559+
return nil, err
560+
}
561+
500562
// register namespace queueinformer
501563
namespaceInformer := k8sInformerFactory.Core().V1().Namespaces()
502564
informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer

0 commit comments

Comments
 (0)
Please sign in to comment.