@@ -11,10 +11,13 @@ import (
11
11
"sync"
12
12
"time"
13
13
14
+ "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
15
+ "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
14
16
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
15
17
errorwrap "github.com/pkg/errors"
16
18
"github.com/sirupsen/logrus"
17
19
"google.golang.org/grpc/connectivity"
20
+ batchv1 "k8s.io/api/batch/v1"
18
21
corev1 "k8s.io/api/core/v1"
19
22
rbacv1 "k8s.io/api/rbac/v1"
20
23
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
@@ -32,6 +35,9 @@ import (
32
35
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
33
36
"k8s.io/apimachinery/pkg/util/validation/field"
34
37
"k8s.io/apimachinery/pkg/util/yaml"
38
+ batchv1applyconfigurations "k8s.io/client-go/applyconfigurations/batch/v1"
39
+ corev1applyconfigurations "k8s.io/client-go/applyconfigurations/core/v1"
40
+ rbacv1applyconfigurations "k8s.io/client-go/applyconfigurations/rbac/v1"
35
41
"k8s.io/client-go/dynamic"
36
42
"k8s.io/client-go/informers"
37
43
"k8s.io/client-go/metadata"
@@ -103,6 +109,7 @@ type Operator struct {
103
109
client versioned.Interface
104
110
dynamicClient dynamic.Interface
105
111
lister operatorlister.OperatorLister
112
+ k8sLabelQueueSets map [schema.GroupVersionResource ]workqueue.RateLimitingInterface
106
113
catsrcQueueSet * queueinformer.ResourceQueueSet
107
114
subQueueSet * queueinformer.ResourceQueueSet
108
115
ipQueueSet * queueinformer.ResourceQueueSet
@@ -191,6 +198,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
191
198
lister : lister ,
192
199
namespace : operatorNamespace ,
193
200
recorder : eventRecorder ,
201
+ k8sLabelQueueSets : map [schema.GroupVersionResource ]workqueue.RateLimitingInterface {},
194
202
catsrcQueueSet : queueinformer .NewEmptyResourceQueueSet (),
195
203
subQueueSet : queueinformer .NewEmptyResourceQueueSet (),
196
204
ipQueueSet : queueinformer .NewEmptyResourceQueueSet (),
@@ -363,21 +371,84 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
363
371
op .lister .RbacV1 ().RegisterRoleLister (metav1 .NamespaceAll , roleInformer .Lister ())
364
372
sharedIndexInformers = append (sharedIndexInformers , roleInformer .Informer ())
365
373
374
+ labelObjects := func (gvr schema.GroupVersionResource , informer cache.SharedIndexInformer , sync queueinformer.LegacySyncHandler ) error {
375
+ op .k8sLabelQueueSets [gvr ] = workqueue .NewRateLimitingQueueWithConfig (workqueue .DefaultControllerRateLimiter (), workqueue.RateLimitingQueueConfig {
376
+ Name : gvr .String (),
377
+ })
378
+ queueInformer , err := queueinformer .NewQueueInformer (
379
+ ctx ,
380
+ queueinformer .WithLogger (op .logger ),
381
+ queueinformer .WithInformer (informer ),
382
+ queueinformer .WithSyncer (sync .ToSyncer ()),
383
+ )
384
+ if err != nil {
385
+ return err
386
+ }
387
+
388
+ if err := op .RegisterQueueInformer (queueInformer ); err != nil {
389
+ return err
390
+ }
391
+
392
+ return nil
393
+ }
394
+
395
+ if err := labelObjects (rbacv1 .SchemeGroupVersion .WithResource ("roles" ), roleInformer .Informer (), labeller .ObjectLabeler [* rbacv1.Role , * rbacv1applyconfigurations.RoleApplyConfiguration ](
396
+ ctx , op .logger , labeller .HasOLMOwnerRef ,
397
+ rbacv1applyconfigurations .Role ,
398
+ func (namespace string , ctx context.Context , cfg * rbacv1applyconfigurations.RoleApplyConfiguration , opts metav1.ApplyOptions ) (* rbacv1.Role , error ) {
399
+ return op .opClient .KubernetesInterface ().RbacV1 ().Roles (namespace ).Apply (ctx , cfg , opts )
400
+ },
401
+ )); err != nil {
402
+ return nil , err
403
+ }
404
+
366
405
// Wire RoleBindings
367
406
roleBindingInformer := k8sInformerFactory .Rbac ().V1 ().RoleBindings ()
368
407
op .lister .RbacV1 ().RegisterRoleBindingLister (metav1 .NamespaceAll , roleBindingInformer .Lister ())
369
408
sharedIndexInformers = append (sharedIndexInformers , roleBindingInformer .Informer ())
370
409
410
+ if err := labelObjects (rbacv1 .SchemeGroupVersion .WithResource ("rolebindings" ), roleBindingInformer .Informer (), labeller .ObjectLabeler [* rbacv1.RoleBinding , * rbacv1applyconfigurations.RoleBindingApplyConfiguration ](
411
+ ctx , op .logger , labeller .HasOLMOwnerRef ,
412
+ rbacv1applyconfigurations .RoleBinding ,
413
+ func (namespace string , ctx context.Context , cfg * rbacv1applyconfigurations.RoleBindingApplyConfiguration , opts metav1.ApplyOptions ) (* rbacv1.RoleBinding , error ) {
414
+ return op .opClient .KubernetesInterface ().RbacV1 ().RoleBindings (namespace ).Apply (ctx , cfg , opts )
415
+ },
416
+ )); err != nil {
417
+ return nil , err
418
+ }
419
+
371
420
// Wire ServiceAccounts
372
421
serviceAccountInformer := k8sInformerFactory .Core ().V1 ().ServiceAccounts ()
373
422
op .lister .CoreV1 ().RegisterServiceAccountLister (metav1 .NamespaceAll , serviceAccountInformer .Lister ())
374
423
sharedIndexInformers = append (sharedIndexInformers , serviceAccountInformer .Informer ())
375
424
425
+ if err := labelObjects (corev1 .SchemeGroupVersion .WithResource ("serviceaccounts" ), serviceAccountInformer .Informer (), labeller .ObjectLabeler [* corev1.ServiceAccount , * corev1applyconfigurations.ServiceAccountApplyConfiguration ](
426
+ ctx , op .logger , func (object metav1.Object ) bool {
427
+ return labeller .HasOLMOwnerRef (object ) || labeller .HasOLMLabel (object )
428
+ },
429
+ corev1applyconfigurations .ServiceAccount ,
430
+ func (namespace string , ctx context.Context , cfg * corev1applyconfigurations.ServiceAccountApplyConfiguration , opts metav1.ApplyOptions ) (* corev1.ServiceAccount , error ) {
431
+ return op .opClient .KubernetesInterface ().CoreV1 ().ServiceAccounts (namespace ).Apply (ctx , cfg , opts )
432
+ },
433
+ )); err != nil {
434
+ return nil , err
435
+ }
436
+
376
437
// Wire Services
377
438
serviceInformer := k8sInformerFactory .Core ().V1 ().Services ()
378
439
op .lister .CoreV1 ().RegisterServiceLister (metav1 .NamespaceAll , serviceInformer .Lister ())
379
440
sharedIndexInformers = append (sharedIndexInformers , serviceInformer .Informer ())
380
441
442
+ if err := labelObjects (corev1 .SchemeGroupVersion .WithResource ("services" ), serviceInformer .Informer (), labeller .ObjectLabeler [* corev1.Service , * corev1applyconfigurations.ServiceApplyConfiguration ](
443
+ ctx , op .logger , labeller .HasOLMOwnerRef ,
444
+ corev1applyconfigurations .Service ,
445
+ func (namespace string , ctx context.Context , cfg * corev1applyconfigurations.ServiceApplyConfiguration , opts metav1.ApplyOptions ) (* corev1.Service , error ) {
446
+ return op .opClient .KubernetesInterface ().CoreV1 ().Services (namespace ).Apply (ctx , cfg , opts )
447
+ },
448
+ )); err != nil {
449
+ return nil , err
450
+ }
451
+
381
452
// Wire Pods for CatalogSource
382
453
catsrcReq , err := labels .NewRequirement (reconciler .CatalogSourceLabelKey , selection .Exists , nil )
383
454
if err != nil {
@@ -392,6 +463,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
392
463
op .lister .CoreV1 ().RegisterPodLister (metav1 .NamespaceAll , csPodInformer .Lister ())
393
464
sharedIndexInformers = append (sharedIndexInformers , csPodInformer .Informer ())
394
465
466
+ if err := labelObjects (corev1 .SchemeGroupVersion .WithResource ("pods" ), csPodInformer .Informer (), labeller .ObjectLabeler [* corev1.Pod , * corev1applyconfigurations.PodApplyConfiguration ](
467
+ ctx , op .logger , func (object metav1.Object ) bool {
468
+ _ , ok := object .GetLabels ()[reconciler .CatalogSourceLabelKey ]
469
+ return ok
470
+ },
471
+ corev1applyconfigurations .Pod ,
472
+ func (namespace string , ctx context.Context , cfg * corev1applyconfigurations.PodApplyConfiguration , opts metav1.ApplyOptions ) (* corev1.Pod , error ) {
473
+ return op .opClient .KubernetesInterface ().CoreV1 ().Pods (namespace ).Apply (ctx , cfg , opts )
474
+ },
475
+ )); err != nil {
476
+ return nil , err
477
+ }
478
+
395
479
// Wire Pods for BundleUnpack job
396
480
buReq , err := labels .NewRequirement (bundle .BundleUnpackPodLabel , selection .Exists , nil )
397
481
if err != nil {
@@ -416,6 +500,27 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
416
500
jobInformer := k8sInformerFactory .Batch ().V1 ().Jobs ()
417
501
sharedIndexInformers = append (sharedIndexInformers , jobInformer .Informer ())
418
502
503
+ if err := labelObjects (batchv1 .SchemeGroupVersion .WithResource ("jobs" ), jobInformer .Informer (), labeller .ObjectLabeler [* batchv1.Job , * batchv1applyconfigurations.JobApplyConfiguration ](
504
+ ctx , op .logger , func (object metav1.Object ) bool {
505
+ for _ , ownerRef := range object .GetOwnerReferences () {
506
+ if ownerRef .APIVersion == corev1 .SchemeGroupVersion .String () && ownerRef .Kind == "ConfigMap" {
507
+ cm , err := configMapInformer .Lister ().ConfigMaps (object .GetNamespace ()).Get (ownerRef .Name )
508
+ if err != nil {
509
+ return false
510
+ }
511
+ return labeller .HasOLMOwnerRef (cm )
512
+ }
513
+ }
514
+ return false
515
+ },
516
+ batchv1applyconfigurations .Job ,
517
+ func (namespace string , ctx context.Context , cfg * batchv1applyconfigurations.JobApplyConfiguration , opts metav1.ApplyOptions ) (* batchv1.Job , error ) {
518
+ return op .opClient .KubernetesInterface ().BatchV1 ().Jobs (namespace ).Apply (ctx , cfg , opts )
519
+ },
520
+ )); err != nil {
521
+ return nil , err
522
+ }
523
+
419
524
// Generate and register QueueInformers for k8s resources
420
525
k8sSyncer := queueinformer .LegacySyncHandler (op .syncObject ).ToSyncerWithDelete (op .handleDeletion )
421
526
for _ , informer := range sharedIndexInformers {
@@ -480,6 +585,20 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
480
585
return nil , err
481
586
}
482
587
588
+ if err := labelObjects (apiextensionsv1 .SchemeGroupVersion .WithResource ("customresourcedefinitions" ), crdInformer , labeller .ObjectPatchLabeler (
589
+ ctx , op .logger , func (object metav1.Object ) bool {
590
+ for key := range object .GetAnnotations () {
591
+ if strings .HasPrefix (key , alongside .AnnotationPrefix ) {
592
+ return true
593
+ }
594
+ }
595
+ return false
596
+ },
597
+ op .opClient .ApiextensionsInterface ().ApiextensionsV1 ().CustomResourceDefinitions ().Patch ,
598
+ )); err != nil {
599
+ return nil , err
600
+ }
601
+
483
602
// Namespace sync for resolving subscriptions
484
603
namespaceInformer := informers .NewSharedInformerFactory (op .opClient .KubernetesInterface (), resyncPeriod ()).Core ().V1 ().Namespaces ()
485
604
op .lister .CoreV1 ().RegisterNamespaceLister (namespaceInformer .Lister ())
0 commit comments