
Commit d2c8252

Committed Oct 3, 2023
*: detect when all objects are labelled, restart
When all of the k8s objects that need labels have them, we can exit the process. The next Pod that starts up will detect that all labels are present and will be able to filter its informers going forward.

Signed-off-by: Steve Kuznetsov <[email protected]>
1 parent 072243f commit d2c8252
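
For orientation, the change reduces to a small completion-tracking idea: each labelling sync handler reports when its resource kind is fully labelled, and the process exits once every tracked kind has reported done, so the next Pod can start with filtered informers. The sketch below is a minimal, self-contained illustration of that idea; the names and types are stand-ins, not the actual OLM identifiers.

package main

import (
	"fmt"
	"os"
	"sync"
)

// completionTracker records, per resource kind, whether labelling is finished.
type completionTracker struct {
	mu       sync.Mutex
	complete map[string]bool
}

func newCompletionTracker(kinds ...string) *completionTracker {
	t := &completionTracker{complete: map[string]bool{}}
	for _, kind := range kinds {
		t.complete[kind] = false
	}
	return t
}

// markDone flags one kind as fully labelled and reports whether every
// tracked kind is now done.
func (t *completionTracker) markDone(kind string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.complete[kind] = true
	for _, done := range t.complete {
		if !done {
			return false
		}
	}
	return true
}

func main() {
	tracker := newCompletionTracker("roles", "rolebindings", "services")

	// Imagine each sync handler calling markDone once it observes that every
	// object of its kind already carries the managed label.
	for _, kind := range []string{"roles", "rolebindings", "services"} {
		if tracker.markDone(kind) {
			fmt.Println("every object is labelled; exiting so the next Pod can filter informers")
			os.Exit(0)
		}
	}
}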

File tree

14 files changed (+461 -127 lines)
 

‎cmd/olm/main.go

+1 -1

@@ -173,7 +173,7 @@ func main() {
 		olm.WithExternalClient(crClient),
 		olm.WithMetadataClient(metadataClient),
 		olm.WithOperatorClient(opClient),
-		olm.WithRestConfig(config),
+		olm.WithRestConfig(validatingConfig),
 		olm.WithConfigClient(versionedConfigClient),
 		olm.WithProtectedCopiedCSVNamespaces(*protectedCopiedCSVNamespaces),
 	)

‎pkg/controller/install/apiservice.go

+1

@@ -61,6 +61,7 @@ func (i *StrategyDeploymentInstaller) createOrUpdateAPIService(caPEM []byte, des
 	if err := ownerutil.AddOwnerLabels(apiService, i.owner); err != nil {
 		return err
 	}
+	apiService.Labels[OLMManagedLabelKey] = OLMManagedLabelValue
 
 	// Create a service for the deployment
 	containerPort := int32(443)

‎pkg/controller/install/deployment.go

+1 -1

@@ -152,11 +152,11 @@ func (i *StrategyDeploymentInstaller) deploymentForSpec(name string, spec appsv1
 	dep.Spec.Template.SetAnnotations(annotations)
 
 	// Set custom labels before CSV owner labels
+	dep.SetLabels(specLabels)
 	if dep.Labels == nil {
 		dep.Labels = map[string]string{}
 	}
 	dep.Labels[OLMManagedLabelKey] = OLMManagedLabelValue
-	dep.SetLabels(specLabels)
 
 	ownerutil.AddNonBlockingOwner(dep, i.owner)
 	ownerutil.AddOwnerLabelsForKind(dep, i.owner, v1alpha1.ClusterServiceVersionKind)

‎pkg/controller/install/deployment_test.go

+1

@@ -353,6 +353,7 @@ func TestInstallStrategyDeploymentCheckInstallErrors(t *testing.T) {
 	dep.Spec.Template.SetAnnotations(map[string]string{"test": "annotation"})
 	dep.Spec.RevisionHistoryLimit = &revisionHistoryLimit
 	dep.SetLabels(labels.CloneAndAddLabel(dep.ObjectMeta.GetLabels(), DeploymentSpecHashLabelKey, HashDeploymentSpec(dep.Spec)))
+	dep.Labels[OLMManagedLabelKey] = OLMManagedLabelValue
 	dep.Status.Conditions = append(dep.Status.Conditions, appsv1.DeploymentCondition{
 		Type: appsv1.DeploymentAvailable,
 		Status: corev1.ConditionTrue,

‎pkg/controller/operators/catalog/operator.go

+36 -4

@@ -207,10 +207,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
 		catalogSubscriberIndexer: map[string]cache.Indexer{},
 		serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
-		clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient),
+		clientAttenuator: scoped.NewClientAttenuator(logger, validatingConfig, opClient),
 		installPlanTimeout: installPlanTimeout,
 		bundleUnpackTimeout: bundleUnpackTimeout,
-		clientFactory: clients.NewFactory(config),
+		clientFactory: clients.NewFactory(validatingConfig),
 	}
 	op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
 	op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)

@@ -380,10 +380,22 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	op.lister.RbacV1().RegisterRoleLister(metav1.NamespaceAll, roleInformer.Lister())
 	sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())
 
-	labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync queueinformer.LegacySyncHandler) error {
+	complete := map[schema.GroupVersionResource][]bool{}
+	completeLock := &sync.Mutex{}
+
+	labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync func(done func() bool) queueinformer.LegacySyncHandler) error {
 		if canFilter {
 			return nil
 		}
+		var idx int
+		if _, exists := complete[gvr]; exists {
+			idx = len(complete[gvr])
+			complete[gvr] = append(complete[gvr], false)
+		} else {
+			idx = 0
+			complete[gvr] = []bool{false}
+		}
+
 		queue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
 			Name: gvr.String(),
 		})

@@ -392,7 +404,18 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 			queueinformer.WithQueue(queue),
 			queueinformer.WithLogger(op.logger),
 			queueinformer.WithInformer(informer),
-			queueinformer.WithSyncer(sync.ToSyncer()),
+			queueinformer.WithSyncer(sync(func() bool {
+				completeLock.Lock()
+				complete[gvr][idx] = true
+				allDone := true
+				for _, items := range complete {
+					for _, done := range items {
+						allDone = allDone && done
+					}
+				}
+				completeLock.Unlock()
+				return allDone
+			}).ToSyncer()),
 		)
 		if err != nil {
 			return err

@@ -408,6 +431,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	rolesgvk := rbacv1.SchemeGroupVersion.WithResource("roles")
 	if err := labelObjects(rolesgvk, roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
 		ctx, op.logger, labeller.Filter(rolesgvk),
+		roleInformer.Lister().List,
 		rbacv1applyconfigurations.Role,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
 			return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)

@@ -420,6 +444,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		func(role *rbacv1.Role) (string, error) {
 			return resolver.PolicyRuleHashLabelValue(role.Rules)
 		},
+		roleInformer.Lister().List,
 		rbacv1applyconfigurations.Role,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
 			return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)

@@ -436,6 +461,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	rolebindingsgvk := rbacv1.SchemeGroupVersion.WithResource("rolebindings")
 	if err := labelObjects(rolebindingsgvk, roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
 		ctx, op.logger, labeller.Filter(rolebindingsgvk),
+		roleBindingInformer.Lister().List,
 		rbacv1applyconfigurations.RoleBinding,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
 			return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)

@@ -448,6 +474,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		func(roleBinding *rbacv1.RoleBinding) (string, error) {
 			return resolver.RoleReferenceAndSubjectHashLabelValue(roleBinding.RoleRef, roleBinding.Subjects)
 		},
+		roleBindingInformer.Lister().List,
 		rbacv1applyconfigurations.RoleBinding,
 		func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
 			return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)

@@ -464,6 +491,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	serviceaccountsgvk := corev1.SchemeGroupVersion.WithResource("serviceaccounts")
 	if err := labelObjects(serviceaccountsgvk, serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
 		ctx, op.logger, labeller.Filter(serviceaccountsgvk),
+		serviceAccountInformer.Lister().List,
 		corev1applyconfigurations.ServiceAccount,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
 			return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)

@@ -480,6 +508,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	servicesgvk := corev1.SchemeGroupVersion.WithResource("services")
 	if err := labelObjects(servicesgvk, serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
 		ctx, op.logger, labeller.Filter(servicesgvk),
+		serviceInformer.Lister().List,
 		corev1applyconfigurations.Service,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
 			return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)

@@ -505,6 +534,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	podsgvk := corev1.SchemeGroupVersion.WithResource("pods")
 	if err := labelObjects(podsgvk, csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
 		ctx, op.logger, labeller.Filter(podsgvk),
+		csPodInformer.Lister().List,
 		corev1applyconfigurations.Pod,
 		func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
 			return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)

@@ -542,6 +572,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 		ctx, op.logger, labeller.JobFilter(func(namespace, name string) (metav1.Object, error) {
 			return configMapInformer.Lister().ConfigMaps(namespace).Get(name)
 		}),
+		jobInformer.Lister().List,
 		batchv1applyconfigurations.Job,
 		func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
 			return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)

@@ -617,6 +648,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
 	customresourcedefinitionsgvk := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
 	if err := labelObjects(customresourcedefinitionsgvk, crdInformer, labeller.ObjectPatchLabeler(
 		ctx, op.logger, labeller.Filter(customresourcedefinitionsgvk),
+		crdLister.List,
 		op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
 	)); err != nil {
 		return nil, err
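
The done() closure handed to each sync-handler factory above is easier to see in isolation. The following standalone sketch (simplified types and invented names, not the real queueinformer wiring) shows how reserving one completion slot per labeller — several labellers may share a GroupVersionResource, as the roles labellers do above — lets a single callback answer whether every labeller has finished.

package main

import (
	"fmt"
	"sync"
)

// gvr is a simplified stand-in for schema.GroupVersionResource.
type gvr struct{ group, version, resource string }

func main() {
	complete := map[gvr][]bool{}
	lock := &sync.Mutex{}

	// register reserves a slot for one labeller of the given resource during
	// sequential setup and returns the done() callback that marks the slot
	// and reports whether every slot across every resource is now marked.
	register := func(resource gvr) func() bool {
		idx := len(complete[resource])
		complete[resource] = append(complete[resource], false)
		return func() bool {
			lock.Lock()
			defer lock.Unlock()
			complete[resource][idx] = true
			for _, slots := range complete {
				for _, done := range slots {
					if !done {
						return false
					}
				}
			}
			return true
		}
	}

	roles := gvr{"rbac.authorization.k8s.io", "v1", "roles"}
	services := gvr{"", "v1", "services"}

	roleDone := register(roles)     // e.g. the OLM-managed-label labeller
	roleHashDone := register(roles) // e.g. the policy-rule-hash labeller
	serviceDone := register(services)

	fmt.Println(roleDone())     // false: other slots still pending
	fmt.Println(roleHashDone()) // false
	fmt.Println(serviceDone())  // true: every slot is marked
}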

‎pkg/controller/operators/labeller/filters.go

+2 -1

@@ -75,7 +75,7 @@ var filters = map[schema.GroupVersionResource]func(metav1.Object) bool{
 
 func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadata.Interface) (bool, error) {
 	okLock := sync.Mutex{}
-	var ok bool
+	ok := true
 	g, ctx := errgroup.WithContext(ctx)
 	allFilters := map[schema.GroupVersionResource]func(metav1.Object) bool{}
 	for gvr, filter := range filters {

@@ -124,5 +124,6 @@ func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadat
 	if err := g.Wait(); err != nil {
 		return false, err
 	}
+	logger.WithField("canFilter", ok).Info("detected ability to filter informers")
 	return ok, nil
 }

‎pkg/controller/operators/labeller/labels.go

+103 -60

@@ -4,12 +4,14 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"os"
 	"strings"
 
 	jsonpatch "github.com/evanphx/json-patch"
 	"github.com/sirupsen/logrus"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 

@@ -40,28 +42,50 @@ func ObjectLabeler[T metav1.Object, A ApplyConfig[A]](
 	ctx context.Context,
 	logger *logrus.Logger,
 	check func(metav1.Object) bool,
+	list func(options labels.Selector) ([]T, error),
 	applyConfigFor func(name, namespace string) A,
 	apply func(namespace string, ctx context.Context, cfg A, opts metav1.ApplyOptions) (T, error),
-) queueinformer.LegacySyncHandler {
-	return func(obj interface{}) error {
-		cast, ok := obj.(T)
-		if !ok {
-			err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(T), obj)
-			logger.WithError(err).Error("casting failed")
-			return fmt.Errorf("casting failed: %w", err)
-		}
+) func(done func() bool) queueinformer.LegacySyncHandler {
+	return func(done func() bool) queueinformer.LegacySyncHandler {
+		return func(obj interface{}) error {
+			cast, ok := obj.(T)
+			if !ok {
+				err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(T), obj)
+				logger.WithError(err).Error("casting failed")
+				return fmt.Errorf("casting failed: %w", err)
+			}
 
-		if !check(cast) || hasLabel(cast) {
-			return nil
-		}
+			if !check(cast) || hasLabel(cast) {
+				// if the object we're processing does not need us to label it, it's possible that every object that requires
+				// the label already has it; in which case we should exit the process, so the Pod that succeeds us can filter
+				// the informers used to drive the controller and stop having to track extraneous objects
+				items, err := list(labels.Everything())
+				if err != nil {
+					logger.WithError(err).Warn("failed to list all objects to check for labelling completion")
+					return nil
+				}
+				gvrFullyLabelled := true
+				for _, item := range items {
+					gvrFullyLabelled = gvrFullyLabelled && (!check(item) || hasLabel(item))
+				}
+				if gvrFullyLabelled {
+					allObjectsLabelled := done()
+					if allObjectsLabelled {
+						logrus.Info("detected that every object is labelled, exiting to re-start the process...")
+						os.Exit(0)
+					}
+				}
+				return nil
+			}
 
-		cfg := applyConfigFor(cast.GetName(), cast.GetNamespace())
-		cfg.WithLabels(map[string]string{
-			install.OLMManagedLabelKey: install.OLMManagedLabelValue,
-		})
+			cfg := applyConfigFor(cast.GetName(), cast.GetNamespace())
+			cfg.WithLabels(map[string]string{
+				install.OLMManagedLabelKey: install.OLMManagedLabelValue,
+			})
 
-		_, err := apply(cast.GetNamespace(), ctx, cfg, metav1.ApplyOptions{})
-		return err
+			_, err := apply(cast.GetNamespace(), ctx, cfg, metav1.ApplyOptions{})
+			return err
+		}
 	}
 }
 

@@ -71,58 +95,77 @@ func ObjectPatchLabeler(
 	ctx context.Context,
 	logger *logrus.Logger,
 	check func(metav1.Object) bool,
+	list func(selector labels.Selector) (ret []*metav1.PartialObjectMetadata, err error),
 	patch func(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *apiextensionsv1.CustomResourceDefinition, err error),
-) func(
-	obj interface{},
-) error {
-	return func(obj interface{}) error {
-		cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
-		if !ok {
-			err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(*apiextensionsv1.CustomResourceDefinition), obj)
-			logger.WithError(err).Error("casting failed")
-			return fmt.Errorf("casting failed: %w", err)
-		}
+) func(done func() bool) queueinformer.LegacySyncHandler {
+	return func(done func() bool) queueinformer.LegacySyncHandler {
+		return func(obj interface{}) error {
+			cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
+			if !ok {
+				err := fmt.Errorf("wrong type %T, expected %T: %#v", obj, new(*apiextensionsv1.CustomResourceDefinition), obj)
+				logger.WithError(err).Error("casting failed")
+				return fmt.Errorf("casting failed: %w", err)
+			}
 
-		if !check(cast) || hasLabel(cast) {
-			return nil
-		}
+			if !check(cast) || hasLabel(cast) {
+				// if the object we're processing does not need us to label it, it's possible that every object that requires
+				// the label already has it; in which case we should exit the process, so the Pod that succeeds us can filter
+				// the informers used to drive the controller and stop having to track extraneous objects
+				items, err := list(labels.Everything())
+				if err != nil {
+					logger.WithError(err).Warn("failed to list all objects to check for labelling completion")
+					return nil
+				}
+				gvrFullyLabelled := true
+				for _, item := range items {
+					gvrFullyLabelled = gvrFullyLabelled && (!check(item) || hasLabel(item))
+				}
+				if gvrFullyLabelled {
+					allObjectsLabelled := done()
+					if allObjectsLabelled {
+						logrus.Fatal("detected that every object is labelled, exiting...")
+					}
+				}
+				return nil
+			}
 
-		uid := cast.GetUID()
-		rv := cast.GetResourceVersion()
+			uid := cast.GetUID()
+			rv := cast.GetResourceVersion()
 
-		// to ensure they appear in the patch as preconditions
-		previous := cast.DeepCopy()
-		previous.SetUID("")
-		previous.SetResourceVersion("")
+			// to ensure they appear in the patch as preconditions
+			previous := cast.DeepCopy()
+			previous.SetUID("")
+			previous.SetResourceVersion("")
 
-		oldData, err := json.Marshal(previous)
-		if err != nil {
-			return fmt.Errorf("failed to Marshal old data for %s/%s: %w", previous.GetNamespace(), previous.GetName(), err)
-		}
+			oldData, err := json.Marshal(previous)
+			if err != nil {
+				return fmt.Errorf("failed to Marshal old data for %s/%s: %w", previous.GetNamespace(), previous.GetName(), err)
+			}
 
-		// to ensure they appear in the patch as preconditions
-		updated := cast.DeepCopy()
-		updated.SetUID(uid)
-		updated.SetResourceVersion(rv)
-		labels := updated.GetLabels()
-		if labels == nil {
-			labels = map[string]string{}
-		}
-		labels[install.OLMManagedLabelKey] = install.OLMManagedLabelValue
-		updated.SetLabels(labels)
+			// to ensure they appear in the patch as preconditions
+			updated := cast.DeepCopy()
+			updated.SetUID(uid)
+			updated.SetResourceVersion(rv)
+			labels := updated.GetLabels()
+			if labels == nil {
+				labels = map[string]string{}
+			}
+			labels[install.OLMManagedLabelKey] = install.OLMManagedLabelValue
+			updated.SetLabels(labels)
 
-		newData, err := json.Marshal(updated)
-		if err != nil {
-			return fmt.Errorf("failed to Marshal old data for %s/%s: %w", updated.GetNamespace(), updated.GetName(), err)
-		}
+			newData, err := json.Marshal(updated)
+			if err != nil {
+				return fmt.Errorf("failed to Marshal old data for %s/%s: %w", updated.GetNamespace(), updated.GetName(), err)
+			}
 
-		patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
-		if err != nil {
-			return fmt.Errorf("failed to create patch for %s/%s: %w", cast.GetNamespace(), cast.GetName(), err)
-		}
+			patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
+			if err != nil {
+				return fmt.Errorf("failed to create patch for %s/%s: %w", cast.GetNamespace(), cast.GetName(), err)
+			}
 
-		_, err = patch(ctx, cast.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
-		return err
+			_, err = patch(ctx, cast.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{})
+			return err
+		}
 	}
 }
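
The per-kind completion test that both handlers above run before calling done() boils down to a single predicate: every listed object either does not need the label or already carries it. Here is a small illustrative sketch; the label key and helper names are placeholders, not the real install.OLMManagedLabelKey or labeller filters.

package main

import "fmt"

// object is a simplified stand-in for a Kubernetes object's metadata.
type object struct {
	name   string
	labels map[string]string
}

// managedLabelKey is a placeholder for install.OLMManagedLabelKey.
const managedLabelKey = "example.com/managed"

// needsLabel stands in for the per-GVR filter (labeller.Filter in the diff).
func needsLabel(object) bool { return true }

// hasLabel reports whether the object already carries the managed label.
func hasLabel(o object) bool {
	_, ok := o.labels[managedLabelKey]
	return ok
}

// fullyLabelled mirrors the gvrFullyLabelled loop: the kind is done once no
// object both needs the label and lacks it.
func fullyLabelled(items []object) bool {
	for _, item := range items {
		if needsLabel(item) && !hasLabel(item) {
			return false
		}
	}
	return true
}

func main() {
	items := []object{
		{name: "a", labels: map[string]string{managedLabelKey: "true"}},
		{name: "b", labels: map[string]string{}},
	}
	fmt.Println(fullyLabelled(items)) // false: "b" still needs the label
	items[1].labels[managedLabelKey] = "true"
	fmt.Println(fullyLabelled(items)) // true: safe to call done()
}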
