@@ -11,10 +11,13 @@ import (
11
11
"sync"
12
12
"time"
13
13
14
+ "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
15
+ "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
14
16
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
15
17
errorwrap "github.com/pkg/errors"
16
18
"github.com/sirupsen/logrus"
17
19
"google.golang.org/grpc/connectivity"
20
+ batchv1 "k8s.io/api/batch/v1"
18
21
corev1 "k8s.io/api/core/v1"
19
22
rbacv1 "k8s.io/api/rbac/v1"
20
23
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
@@ -32,6 +35,9 @@ import (
32
35
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
33
36
"k8s.io/apimachinery/pkg/util/validation/field"
34
37
"k8s.io/apimachinery/pkg/util/yaml"
38
+ batchv1applyconfigurations "k8s.io/client-go/applyconfigurations/batch/v1"
39
+ corev1applyconfigurations "k8s.io/client-go/applyconfigurations/core/v1"
40
+ rbacv1applyconfigurations "k8s.io/client-go/applyconfigurations/rbac/v1"
35
41
"k8s.io/client-go/dynamic"
36
42
"k8s.io/client-go/informers"
37
43
"k8s.io/client-go/metadata"
@@ -103,6 +109,7 @@ type Operator struct {
103
109
client versioned.Interface
104
110
dynamicClient dynamic.Interface
105
111
lister operatorlister.OperatorLister
112
+ k8sLabelQueueSets map [schema.GroupVersionResource ]workqueue.RateLimitingInterface
106
113
catsrcQueueSet * queueinformer.ResourceQueueSet
107
114
subQueueSet * queueinformer.ResourceQueueSet
108
115
ipQueueSet * queueinformer.ResourceQueueSet
@@ -191,6 +198,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
191
198
lister : lister ,
192
199
namespace : operatorNamespace ,
193
200
recorder : eventRecorder ,
201
+ k8sLabelQueueSets : map [schema.GroupVersionResource ]workqueue.RateLimitingInterface {},
194
202
catsrcQueueSet : queueinformer .NewEmptyResourceQueueSet (),
195
203
subQueueSet : queueinformer .NewEmptyResourceQueueSet (),
196
204
ipQueueSet : queueinformer .NewEmptyResourceQueueSet (),
@@ -363,21 +371,85 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
363
371
op .lister .RbacV1 ().RegisterRoleLister (metav1 .NamespaceAll , roleInformer .Lister ())
364
372
sharedIndexInformers = append (sharedIndexInformers , roleInformer .Informer ())
365
373
374
+ labelObjects := func (gvr schema.GroupVersionResource , informer cache.SharedIndexInformer , sync func (obj interface {}) error ) error {
375
+ op .k8sLabelQueueSets [gvr ] = workqueue .NewRateLimitingQueueWithConfig (workqueue .DefaultControllerRateLimiter (), workqueue.RateLimitingQueueConfig {
376
+ Name : gvr .String (),
377
+ })
378
+ queueInformer , err := queueinformer .NewQueueInformer (
379
+ ctx ,
380
+ queueinformer .WithLogger (op .logger ),
381
+ queueinformer .WithInformer (informer ),
382
+ queueinformer .WithSyncer (queueinformer .LegacySyncHandler (sync ).ToSyncer ()),
383
+ )
384
+ if err != nil {
385
+ return err
386
+ }
387
+
388
+ if err := op .RegisterQueueInformer (queueInformer ); err != nil {
389
+ return err
390
+ }
391
+
392
+ return nil
393
+ }
394
+
395
+ if err := labelObjects (rbacv1 .SchemeGroupVersion .WithResource ("roles" ), roleInformer .Informer (), labeller .ObjectLabeler [* rbacv1.Role , * rbacv1applyconfigurations.RoleApplyConfiguration ](
396
+ ctx , op .logger , labeller .HasOLMOwnerRef ,
397
+ rbacv1applyconfigurations .Role ,
398
+ func (namespace string , ctx context.Context , cfg * rbacv1applyconfigurations.RoleApplyConfiguration , opts metav1.ApplyOptions ) (* rbacv1.Role , error ) {
399
+ return op .opClient .KubernetesInterface ().RbacV1 ().Roles (namespace ).Apply (ctx , cfg , opts )
400
+ },
401
+ )); err != nil {
402
+ return nil , err
403
+ }
404
+
366
405
// Wire RoleBindings
367
406
roleBindingInformer := k8sInformerFactory .Rbac ().V1 ().RoleBindings ()
368
407
op .lister .RbacV1 ().RegisterRoleBindingLister (metav1 .NamespaceAll , roleBindingInformer .Lister ())
369
408
sharedIndexInformers = append (sharedIndexInformers , roleBindingInformer .Informer ())
370
409
410
+ if err := labelObjects (rbacv1 .SchemeGroupVersion .WithResource ("rolebindings" ), roleBindingInformer .Informer (), labeller .ObjectLabeler [* rbacv1.RoleBinding , * rbacv1applyconfigurations.RoleBindingApplyConfiguration ](
411
+ ctx , op .logger , labeller .HasOLMOwnerRef ,
412
+ rbacv1applyconfigurations .RoleBinding ,
413
+ func (namespace string , ctx context.Context , cfg * rbacv1applyconfigurations.RoleBindingApplyConfiguration , opts metav1.ApplyOptions ) (* rbacv1.RoleBinding , error ) {
414
+ return op .opClient .KubernetesInterface ().RbacV1 ().RoleBindings (namespace ).Apply (ctx , cfg , opts )
415
+ },
416
+ )); err != nil {
417
+ return nil , err
418
+ }
419
+
371
420
// Wire ServiceAccounts
372
421
serviceAccountInformer := k8sInformerFactory .Core ().V1 ().ServiceAccounts ()
373
422
op .lister .CoreV1 ().RegisterServiceAccountLister (metav1 .NamespaceAll , serviceAccountInformer .Lister ())
374
423
sharedIndexInformers = append (sharedIndexInformers , serviceAccountInformer .Informer ())
375
424
425
+ if err := labelObjects (corev1 .SchemeGroupVersion .WithResource ("serviceaccounts" ), serviceAccountInformer .Informer (), labeller .ObjectLabeler [* corev1.ServiceAccount , * corev1applyconfigurations.ServiceAccountApplyConfiguration ](
426
+ ctx , op .logger , func (object metav1.Object ) bool {
427
+ return labeller .HasOLMOwnerRef (object ) || labeller .HasOLMLabel (object )
428
+ },
429
+ corev1applyconfigurations .ServiceAccount ,
430
+ func (namespace string , ctx context.Context , cfg * corev1applyconfigurations.ServiceAccountApplyConfiguration , opts metav1.ApplyOptions ) (* corev1.ServiceAccount , error ) {
431
+ return op .opClient .KubernetesInterface ().CoreV1 ().ServiceAccounts (namespace ).Apply (ctx , cfg , opts )
432
+ },
433
+ )); err != nil {
434
+ return nil , err
435
+ }
436
+
376
437
// Wire Services
377
438
serviceInformer := k8sInformerFactory .Core ().V1 ().Services ()
378
439
op .lister .CoreV1 ().RegisterServiceLister (metav1 .NamespaceAll , serviceInformer .Lister ())
379
440
sharedIndexInformers = append (sharedIndexInformers , serviceInformer .Informer ())
380
441
442
+ // TODO(skuznets): some services don't seem to have any marker to key off of, but they match the operator name
443
+ if err := labelObjects (corev1 .SchemeGroupVersion .WithResource ("services" ), serviceInformer .Informer (), labeller .ObjectLabeler [* corev1.Service , * corev1applyconfigurations.ServiceApplyConfiguration ](
444
+ ctx , op .logger , labeller .HasOLMOwnerRef ,
445
+ corev1applyconfigurations .Service ,
446
+ func (namespace string , ctx context.Context , cfg * corev1applyconfigurations.ServiceApplyConfiguration , opts metav1.ApplyOptions ) (* corev1.Service , error ) {
447
+ return op .opClient .KubernetesInterface ().CoreV1 ().Services (namespace ).Apply (ctx , cfg , opts )
448
+ },
449
+ )); err != nil {
450
+ return nil , err
451
+ }
452
+
381
453
// Wire Pods for CatalogSource
382
454
catsrcReq , err := labels .NewRequirement (reconciler .CatalogSourceLabelKey , selection .Exists , nil )
383
455
if err != nil {
@@ -392,6 +464,23 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
392
464
op .lister .CoreV1 ().RegisterPodLister (metav1 .NamespaceAll , csPodInformer .Lister ())
393
465
sharedIndexInformers = append (sharedIndexInformers , csPodInformer .Informer ())
394
466
467
+ if err := labelObjects (corev1 .SchemeGroupVersion .WithResource ("pods" ), csPodInformer .Informer (), labeller .ObjectLabeler [* corev1.Pod , * corev1applyconfigurations.PodApplyConfiguration ](
468
+ ctx , op .logger , func (object metav1.Object ) bool {
469
+ if labels := object .GetLabels (); labels != nil {
470
+ if _ , ok := labels [reconciler .CatalogSourceLabelKey ]; ok {
471
+ return true
472
+ }
473
+ }
474
+ return false
475
+ },
476
+ corev1applyconfigurations .Pod ,
477
+ func (namespace string , ctx context.Context , cfg * corev1applyconfigurations.PodApplyConfiguration , opts metav1.ApplyOptions ) (* corev1.Pod , error ) {
478
+ return op .opClient .KubernetesInterface ().CoreV1 ().Pods (namespace ).Apply (ctx , cfg , opts )
479
+ },
480
+ )); err != nil {
481
+ return nil , err
482
+ }
483
+
395
484
// Wire Pods for BundleUnpack job
396
485
buReq , err := labels .NewRequirement (bundle .BundleUnpackPodLabel , selection .Exists , nil )
397
486
if err != nil {
@@ -416,6 +505,27 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
416
505
jobInformer := k8sInformerFactory .Batch ().V1 ().Jobs ()
417
506
sharedIndexInformers = append (sharedIndexInformers , jobInformer .Informer ())
418
507
508
+ if err := labelObjects (batchv1 .SchemeGroupVersion .WithResource ("jobs" ), jobInformer .Informer (), labeller .ObjectLabeler [* batchv1.Job , * batchv1applyconfigurations.JobApplyConfiguration ](
509
+ ctx , op .logger , func (object metav1.Object ) bool {
510
+ job , ok := object .(* batchv1.Job )
511
+ if ! ok {
512
+ return false
513
+ }
514
+ for _ , container := range job .Spec .Template .Spec .Containers {
515
+ if strings .Join (container .Command [0 :3 ], " " ) == "opm alpha bundle extract" {
516
+ return true
517
+ }
518
+ }
519
+ return false
520
+ },
521
+ batchv1applyconfigurations .Job ,
522
+ func (namespace string , ctx context.Context , cfg * batchv1applyconfigurations.JobApplyConfiguration , opts metav1.ApplyOptions ) (* batchv1.Job , error ) {
523
+ return op .opClient .KubernetesInterface ().BatchV1 ().Jobs (namespace ).Apply (ctx , cfg , opts )
524
+ },
525
+ )); err != nil {
526
+ return nil , err
527
+ }
528
+
419
529
// Generate and register QueueInformers for k8s resources
420
530
k8sSyncer := queueinformer .LegacySyncHandler (op .syncObject ).ToSyncerWithDelete (op .handleDeletion )
421
531
for _ , informer := range sharedIndexInformers {
@@ -480,6 +590,20 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
480
590
return nil , err
481
591
}
482
592
593
+ if err := labelObjects (apiextensionsv1 .SchemeGroupVersion .WithResource ("customresourcedefinitions" ), crdInformer , labeller .ObjectPatchLabeler [* apiextensionsv1.CustomResourceDefinition ](
594
+ ctx , op .logger , func (object metav1.Object ) bool {
595
+ for key := range object .GetAnnotations () {
596
+ if strings .HasPrefix (key , alongside .AnnotationPrefix ) {
597
+ return true
598
+ }
599
+ }
600
+ return false
601
+ },
602
+ op .opClient .ApiextensionsInterface ().ApiextensionsV1 ().CustomResourceDefinitions ().Patch ,
603
+ )); err != nil {
604
+ return nil , err
605
+ }
606
+
483
607
// Namespace sync for resolving subscriptions
484
608
namespaceInformer := informers .NewSharedInformerFactory (op .opClient .KubernetesInterface (), resyncPeriod ()).Core ().V1 ().Namespaces ()
485
609
op .lister .CoreV1 ().RegisterNamespaceLister (namespaceInformer .Lister ())
0 commit comments