Skip to content

Commit c55c24d

Browse files
authoredSep 5, 2023
*: label k8s objects we own (#3020)
Signed-off-by: Steve Kuznetsov <[email protected]>
1 parent 6e0d407 commit c55c24d

File tree

6 files changed

+348
-7
lines changed

6 files changed

+348
-7
lines changed
 

Diff for: ‎go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/coreos/go-semver v0.3.0
88
github.com/davecgh/go-spew v1.1.1
99
github.com/distribution/distribution v2.7.1+incompatible
10+
github.com/evanphx/json-patch v5.6.0+incompatible
1011
github.com/fsnotify/fsnotify v1.6.0
1112
github.com/ghodss/yaml v1.0.0
1213
github.com/go-air/gini v1.0.4
@@ -95,7 +96,6 @@ require (
9596
github.com/docker/go-metrics v0.0.1 // indirect
9697
github.com/docker/go-units v0.5.0 // indirect
9798
github.com/emicklei/go-restful/v3 v3.10.2 // indirect
98-
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
9999
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
100100
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
101101
github.com/fatih/color v1.13.0 // indirect

Diff for: ‎pkg/controller/operators/catalog/operator.go

+119
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ import (
1111
"sync"
1212
"time"
1313

14+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
15+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
1416
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
1517
errorwrap "github.com/pkg/errors"
1618
"github.com/sirupsen/logrus"
1719
"google.golang.org/grpc/connectivity"
20+
batchv1 "k8s.io/api/batch/v1"
1821
corev1 "k8s.io/api/core/v1"
1922
rbacv1 "k8s.io/api/rbac/v1"
2023
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
@@ -32,6 +35,9 @@ import (
3235
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3336
"k8s.io/apimachinery/pkg/util/validation/field"
3437
"k8s.io/apimachinery/pkg/util/yaml"
38+
batchv1applyconfigurations "k8s.io/client-go/applyconfigurations/batch/v1"
39+
corev1applyconfigurations "k8s.io/client-go/applyconfigurations/core/v1"
40+
rbacv1applyconfigurations "k8s.io/client-go/applyconfigurations/rbac/v1"
3541
"k8s.io/client-go/dynamic"
3642
"k8s.io/client-go/informers"
3743
"k8s.io/client-go/metadata"
@@ -103,6 +109,7 @@ type Operator struct {
103109
client versioned.Interface
104110
dynamicClient dynamic.Interface
105111
lister operatorlister.OperatorLister
112+
k8sLabelQueueSets map[schema.GroupVersionResource]workqueue.RateLimitingInterface
106113
catsrcQueueSet *queueinformer.ResourceQueueSet
107114
subQueueSet *queueinformer.ResourceQueueSet
108115
ipQueueSet *queueinformer.ResourceQueueSet
@@ -191,6 +198,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
191198
lister: lister,
192199
namespace: operatorNamespace,
193200
recorder: eventRecorder,
201+
k8sLabelQueueSets: map[schema.GroupVersionResource]workqueue.RateLimitingInterface{},
194202
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
195203
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
196204
ipQueueSet: queueinformer.NewEmptyResourceQueueSet(),
@@ -363,21 +371,84 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
363371
op.lister.RbacV1().RegisterRoleLister(metav1.NamespaceAll, roleInformer.Lister())
364372
sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())
365373

374+
labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
375+
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
376+
Name: gvr.String(),
377+
})
378+
queueInformer, err := queueinformer.NewQueueInformer(
379+
ctx,
380+
queueinformer.WithLogger(op.logger),
381+
queueinformer.WithInformer(informer),
382+
queueinformer.WithSyncer(sync.ToSyncer()),
383+
)
384+
if err != nil {
385+
return err
386+
}
387+
388+
if err := op.RegisterQueueInformer(queueInformer); err != nil {
389+
return err
390+
}
391+
392+
return nil
393+
}
394+
395+
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("roles"), roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
396+
ctx, op.logger, labeller.HasOLMOwnerRef,
397+
rbacv1applyconfigurations.Role,
398+
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
399+
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
400+
},
401+
)); err != nil {
402+
return nil, err
403+
}
404+
366405
// Wire RoleBindings
367406
roleBindingInformer := k8sInformerFactory.Rbac().V1().RoleBindings()
368407
op.lister.RbacV1().RegisterRoleBindingLister(metav1.NamespaceAll, roleBindingInformer.Lister())
369408
sharedIndexInformers = append(sharedIndexInformers, roleBindingInformer.Informer())
370409

410+
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("rolebindings"), roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
411+
ctx, op.logger, labeller.HasOLMOwnerRef,
412+
rbacv1applyconfigurations.RoleBinding,
413+
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
414+
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
415+
},
416+
)); err != nil {
417+
return nil, err
418+
}
419+
371420
// Wire ServiceAccounts
372421
serviceAccountInformer := k8sInformerFactory.Core().V1().ServiceAccounts()
373422
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
374423
sharedIndexInformers = append(sharedIndexInformers, serviceAccountInformer.Informer())
375424

425+
if err := labelObjects(corev1.SchemeGroupVersion.WithResource("serviceaccounts"), serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
426+
ctx, op.logger, func(object metav1.Object) bool {
427+
return labeller.HasOLMOwnerRef(object) || labeller.HasOLMLabel(object)
428+
},
429+
corev1applyconfigurations.ServiceAccount,
430+
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
431+
return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
432+
},
433+
)); err != nil {
434+
return nil, err
435+
}
436+
376437
// Wire Services
377438
serviceInformer := k8sInformerFactory.Core().V1().Services()
378439
op.lister.CoreV1().RegisterServiceLister(metav1.NamespaceAll, serviceInformer.Lister())
379440
sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())
380441

442+
if err := labelObjects(corev1.SchemeGroupVersion.WithResource("services"), serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
443+
ctx, op.logger, labeller.HasOLMOwnerRef,
444+
corev1applyconfigurations.Service,
445+
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
446+
return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
447+
},
448+
)); err != nil {
449+
return nil, err
450+
}
451+
381452
// Wire Pods for CatalogSource
382453
catsrcReq, err := labels.NewRequirement(reconciler.CatalogSourceLabelKey, selection.Exists, nil)
383454
if err != nil {
@@ -392,6 +463,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
392463
op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, csPodInformer.Lister())
393464
sharedIndexInformers = append(sharedIndexInformers, csPodInformer.Informer())
394465

466+
if err := labelObjects(corev1.SchemeGroupVersion.WithResource("pods"), csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
467+
ctx, op.logger, func(object metav1.Object) bool {
468+
_, ok := object.GetLabels()[reconciler.CatalogSourceLabelKey]
469+
return ok
470+
},
471+
corev1applyconfigurations.Pod,
472+
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
473+
return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
474+
},
475+
)); err != nil {
476+
return nil, err
477+
}
478+
395479
// Wire Pods for BundleUnpack job
396480
buReq, err := labels.NewRequirement(bundle.BundleUnpackPodLabel, selection.Exists, nil)
397481
if err != nil {
@@ -416,6 +500,27 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
416500
jobInformer := k8sInformerFactory.Batch().V1().Jobs()
417501
sharedIndexInformers = append(sharedIndexInformers, jobInformer.Informer())
418502

503+
if err := labelObjects(batchv1.SchemeGroupVersion.WithResource("jobs"), jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
504+
ctx, op.logger, func(object metav1.Object) bool {
505+
for _, ownerRef := range object.GetOwnerReferences() {
506+
if ownerRef.APIVersion == corev1.SchemeGroupVersion.String() && ownerRef.Kind == "ConfigMap" {
507+
cm, err := configMapInformer.Lister().ConfigMaps(object.GetNamespace()).Get(ownerRef.Name)
508+
if err != nil {
509+
return false
510+
}
511+
return labeller.HasOLMOwnerRef(cm)
512+
}
513+
}
514+
return false
515+
},
516+
batchv1applyconfigurations.Job,
517+
func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
518+
return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
519+
},
520+
)); err != nil {
521+
return nil, err
522+
}
523+
419524
// Generate and register QueueInformers for k8s resources
420525
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
421526
for _, informer := range sharedIndexInformers {
@@ -480,6 +585,20 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
480585
return nil, err
481586
}
482587

588+
if err := labelObjects(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), crdInformer, labeller.ObjectPatchLabeler(
589+
ctx, op.logger, func(object metav1.Object) bool {
590+
for key := range object.GetAnnotations() {
591+
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
592+
return true
593+
}
594+
}
595+
return false
596+
},
597+
op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
598+
)); err != nil {
599+
return nil, err
600+
}
601+
483602
// Namespace sync for resolving subscriptions
484603
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
485604
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())

Diff for: ‎pkg/controller/operators/internal/alongside/alongside.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
const (
13-
prefix = "operatorframework.io/installed-alongside-"
13+
AnnotationPrefix = "operatorframework.io/installed-alongside-"
1414
)
1515

1616
// NamespacedName is a reference to an object by namespace and name.
@@ -33,7 +33,7 @@ type Annotator struct{}
3333
func (a Annotator) FromObject(o Annotatable) []NamespacedName {
3434
var result []NamespacedName
3535
for k, v := range o.GetAnnotations() {
36-
if !strings.HasPrefix(k, prefix) {
36+
if !strings.HasPrefix(k, AnnotationPrefix) {
3737
continue
3838
}
3939
tokens := strings.Split(v, "/")
@@ -55,7 +55,7 @@ func (a Annotator) ToObject(o Annotatable, nns []NamespacedName) {
5555
annotations := o.GetAnnotations()
5656

5757
for key := range annotations {
58-
if strings.HasPrefix(key, prefix) {
58+
if strings.HasPrefix(key, AnnotationPrefix) {
5959
delete(annotations, key)
6060
}
6161
}
@@ -82,5 +82,5 @@ func key(n NamespacedName) string {
8282
hasher.Write([]byte(n.Namespace))
8383
hasher.Write([]byte{'/'})
8484
hasher.Write([]byte(n.Name))
85-
return fmt.Sprintf("%s%x", prefix, hasher.Sum64())
85+
return fmt.Sprintf("%s%x", AnnotationPrefix, hasher.Sum64())
8686
}

Diff for: ‎pkg/controller/operators/internal/alongside/alongside_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestAnnotatorFromObject(t *testing.T) {
2323
NamespacedNames []NamespacedName
2424
}{
2525
{
26-
Name: "annotation without prefix ignored",
26+
Name: "annotation without AnnotationPrefix ignored",
2727
Object: TestAnnotatable{
2828
"foo": "namespace/name",
2929
},
@@ -66,7 +66,7 @@ func TestAnnotatorToObject(t *testing.T) {
6666
},
6767
},
6868
{
69-
Name: "annotation without prefix ignored",
69+
Name: "annotation without AnnotationPrefix ignored",
7070
Object: TestAnnotatable{
7171
"operatorframework.io/something-else": "",
7272
},

Diff for: ‎pkg/controller/operators/labeller/labels.go

+160
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package labeller
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"strings"
8+
9+
jsonpatch "github.com/evanphx/json-patch"
10+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
11+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
12+
"github.com/sirupsen/logrus"
13+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
14+
15+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16+
"k8s.io/apimachinery/pkg/runtime/schema"
17+
"k8s.io/apimachinery/pkg/types"
18+
19+
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
20+
operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
21+
operatorsv1alpha2 "github.com/operator-framework/api/pkg/operators/v1alpha2"
22+
23+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
24+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/decorators"
25+
)
26+
27+
type ApplyConfig[T any] interface {
28+
WithLabels(map[string]string) T
29+
}
30+
31+
type Client[A ApplyConfig[A], T metav1.Object] interface {
32+
Apply(ctx context.Context, cfg ApplyConfig[A], opts metav1.ApplyOptions) (result T, err error)
33+
}
34+
35+
func hasLabel(obj metav1.Object) bool {
36+
value, ok := obj.GetLabels()[install.OLMManagedLabelKey]
37+
return ok && value == install.OLMManagedLabelValue
38+
}
39+
40+
func ObjectLabeler[T metav1.Object, A ApplyConfig[A]](
41+
ctx context.Context,
42+
logger *logrus.Logger,
43+
check func(metav1.Object) bool,
44+
applyConfigFor func(name, namespace string) A,
45+
apply func(namespace string, ctx context.Context, cfg A, opts metav1.ApplyOptions) (T, error),
46+
) queueinformer.LegacySyncHandler {
47+
return func(obj interface{}) error {
48+
cast, ok := obj.(T)
49+
if !ok {
50+
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(T), obj)
51+
logger.WithError(err).Error("casting failed")
52+
return fmt.Errorf("casting failed: %w", err)
53+
}
54+
55+
if !check(cast) || hasLabel(cast) {
56+
return nil
57+
}
58+
59+
cfg := applyConfigFor(cast.GetName(), cast.GetNamespace())
60+
cfg.WithLabels(map[string]string{
61+
install.OLMManagedLabelKey: install.OLMManagedLabelValue,
62+
})
63+
64+
_, err := apply(cast.GetNamespace(), ctx, cfg, metav1.ApplyOptions{})
65+
return err
66+
}
67+
}
68+
69+
// CRDs did not have applyconfigurations generated for them on accident, we can remove this when
70+
// https://github.com/kubernetes/kubernetes/pull/120177 lands
71+
func ObjectPatchLabeler(
72+
ctx context.Context,
73+
logger *logrus.Logger,
74+
check func(metav1.Object) bool,
75+
patch func(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *apiextensionsv1.CustomResourceDefinition, err error),
76+
) func(
77+
obj interface{},
78+
) error {
79+
return func(obj interface{}) error {
80+
cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
81+
if !ok {
82+
err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(*apiextensionsv1.CustomResourceDefinition), obj)
83+
logger.WithError(err).Error("casting failed")
84+
return fmt.Errorf("casting failed: %w", err)
85+
}
86+
87+
if !check(cast) || hasLabel(cast) {
88+
return nil
89+
}
90+
91+
uid := cast.GetUID()
92+
rv := cast.GetResourceVersion()
93+
94+
// to ensure they appear in the patch as preconditions
95+
previous := cast.DeepCopy()
96+
previous.SetUID("")
97+
previous.SetResourceVersion("")
98+
99+
oldData, err := json.Marshal(previous)
100+
if err != nil {
101+
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", previous.GetNamespace(), previous.GetName(), err)
102+
}
103+
104+
// to ensure they appear in the patch as preconditions
105+
updated := cast.DeepCopy()
106+
updated.SetUID(uid)
107+
updated.SetResourceVersion(rv)
108+
labels := updated.GetLabels()
109+
if labels == nil {
110+
labels = map[string]string{}
111+
}
112+
labels[install.OLMManagedLabelKey] = install.OLMManagedLabelValue
113+
updated.SetLabels(labels)
114+
115+
newData, err := json.Marshal(updated)
116+
if err != nil {
117+
return fmt.Errorf("failed to Marshal old data for %s/%s: %w", updated.GetNamespace(), updated.GetName(), err)
118+
}
119+
120+
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
121+
if err != nil {
122+
return fmt.Errorf("failed to create patch for %s/%s: %w", cast.GetNamespace(), cast.GetName(), err)
123+
}
124+
125+
_, err = patch(ctx, cast.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
126+
return err
127+
}
128+
}
129+
130+
// HasOLMOwnerRef determines if an object is owned by another object in the OLM Groups.
131+
// This checks both classical OwnerRefs and the "OLM OwnerRef" in labels to handle
132+
// cluster-scoped resources.
133+
func HasOLMOwnerRef(object metav1.Object) bool {
134+
for _, ref := range object.GetOwnerReferences() {
135+
for _, gv := range []schema.GroupVersion{
136+
operatorsv1.GroupVersion,
137+
operatorsv1alpha1.SchemeGroupVersion,
138+
operatorsv1alpha2.GroupVersion,
139+
} {
140+
if ref.APIVersion == gv.String() {
141+
return true
142+
}
143+
}
144+
}
145+
hasOLMOwnerLabels := true
146+
for _, label := range []string{ownerutil.OwnerKey, ownerutil.OwnerNamespaceKey, ownerutil.OwnerKind} {
147+
_, exists := object.GetLabels()[label]
148+
hasOLMOwnerLabels = hasOLMOwnerLabels && exists
149+
}
150+
return hasOLMOwnerLabels
151+
}
152+
153+
func HasOLMLabel(object metav1.Object) bool {
154+
for key := range object.GetLabels() {
155+
if strings.HasPrefix(key, decorators.ComponentLabelKeyPrefix) {
156+
return true
157+
}
158+
}
159+
return false
160+
}

Diff for: ‎pkg/controller/operators/olm/operator.go

+62
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ import (
77
"strings"
88
"time"
99

10+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
1011
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/plugins"
1112
"github.com/sirupsen/logrus"
1213
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
14+
appsv1 "k8s.io/api/apps/v1"
1315
corev1 "k8s.io/api/core/v1"
1416
rbacv1 "k8s.io/api/rbac/v1"
1517
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -18,9 +20,12 @@ import (
1820
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1921
"k8s.io/apimachinery/pkg/labels"
2022
"k8s.io/apimachinery/pkg/runtime"
23+
"k8s.io/apimachinery/pkg/runtime/schema"
2124
"k8s.io/apimachinery/pkg/selection"
2225
utilerrors "k8s.io/apimachinery/pkg/util/errors"
2326
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
27+
appsv1applyconfigurations "k8s.io/client-go/applyconfigurations/apps/v1"
28+
rbacv1applyconfigurations "k8s.io/client-go/applyconfigurations/rbac/v1"
2429
"k8s.io/client-go/informers"
2530
k8sscheme "k8s.io/client-go/kubernetes/scheme"
2631
"k8s.io/client-go/metadata/metadatainformer"
@@ -73,6 +78,7 @@ type Operator struct {
7378
opClient operatorclient.ClientInterface
7479
client versioned.Interface
7580
lister operatorlister.OperatorLister
81+
k8sLabelQueueSets map[schema.GroupVersionResource]workqueue.RateLimitingInterface
7682
protectedCopiedCSVNamespaces map[string]struct{}
7783
copiedCSVLister metadatalister.Lister
7884
ogQueueSet *queueinformer.ResourceQueueSet
@@ -151,6 +157,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
151157
resolver: config.strategyResolver,
152158
apiReconciler: config.apiReconciler,
153159
lister: lister,
160+
k8sLabelQueueSets: map[schema.GroupVersionResource]workqueue.RateLimitingInterface{},
154161
recorder: eventRecorder,
155162
apiLabeler: config.apiLabeler,
156163
csvIndexers: map[string]cache.Indexer{},
@@ -427,6 +434,37 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
427434
}
428435
}
429436

437+
labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
438+
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
439+
Name: gvr.String(),
440+
})
441+
queueInformer, err := queueinformer.NewQueueInformer(
442+
ctx,
443+
queueinformer.WithLogger(op.logger),
444+
queueinformer.WithInformer(informer),
445+
queueinformer.WithSyncer(sync.ToSyncer()),
446+
)
447+
if err != nil {
448+
return err
449+
}
450+
451+
if err := op.RegisterQueueInformer(queueInformer); err != nil {
452+
return err
453+
}
454+
455+
return nil
456+
}
457+
458+
if err := labelObjects(appsv1.SchemeGroupVersion.WithResource("deployments"), informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
459+
ctx, op.logger, labeller.HasOLMOwnerRef,
460+
appsv1applyconfigurations.Deployment,
461+
func(namespace string, ctx context.Context, cfg *appsv1applyconfigurations.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (*appsv1.Deployment, error) {
462+
return op.opClient.KubernetesInterface().AppsV1().Deployments(namespace).Apply(ctx, cfg, opts)
463+
},
464+
)); err != nil {
465+
return nil, err
466+
}
467+
430468
// add queue for all namespaces as well
431469
objGCQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/obj-gc", ""))
432470
op.objGCQueueSet.Set("", objGCQueue)
@@ -481,6 +519,18 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
481519
return nil, err
482520
}
483521

522+
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterroles"), clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
523+
ctx, op.logger, labeller.HasOLMOwnerRef,
524+
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleApplyConfiguration {
525+
return rbacv1applyconfigurations.ClusterRole(name)
526+
},
527+
func(_ string, ctx context.Context, cfg *rbacv1applyconfigurations.ClusterRoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.ClusterRole, error) {
528+
return op.opClient.KubernetesInterface().RbacV1().ClusterRoles().Apply(ctx, cfg, opts)
529+
},
530+
)); err != nil {
531+
return nil, err
532+
}
533+
484534
clusterRoleBindingInformer := k8sInformerFactory.Rbac().V1().ClusterRoleBindings()
485535
informersByNamespace[metav1.NamespaceAll].ClusterRoleBindingInformer = clusterRoleBindingInformer
486536
op.lister.RbacV1().RegisterClusterRoleBindingLister(clusterRoleBindingInformer.Lister())
@@ -497,6 +547,18 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
497547
return nil, err
498548
}
499549

550+
if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"), clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
551+
ctx, op.logger, labeller.HasOLMOwnerRef,
552+
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration {
553+
return rbacv1applyconfigurations.ClusterRoleBinding(name)
554+
},
555+
func(_ string, ctx context.Context, cfg *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.ClusterRoleBinding, error) {
556+
return op.opClient.KubernetesInterface().RbacV1().ClusterRoleBindings().Apply(ctx, cfg, opts)
557+
},
558+
)); err != nil {
559+
return nil, err
560+
}
561+
500562
// register namespace queueinformer
501563
namespaceInformer := k8sInformerFactory.Core().V1().Namespaces()
502564
informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer

0 commit comments

Comments
 (0)
Please sign in to comment.