Skip to content

Commit 7788801

Browse files
Merge pull request #948 from perdasilva/workqueue-fix-415
[release-4.15] OCPBUGS-48662: Fix concurrent namespace resolution
2 parents ba9df23 + 8c30132 commit 7788801

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+1075
-1553
lines changed

staging/operator-lifecycle-manager/cmd/olm/manager.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"context"
55

6+
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
67
appsv1 "k8s.io/api/apps/v1"
78
corev1 "k8s.io/api/core/v1"
89
rbacv1 "k8s.io/api/rbac/v1"
@@ -87,6 +88,12 @@ func Manager(ctx context.Context, debug bool) (ctrl.Manager, error) {
8788
&rbacv1.ClusterRoleBinding{}: {
8889
Label: labels.SelectorFromValidatedSet(map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}),
8990
},
91+
&admissionregistrationv1.MutatingWebhookConfiguration{}: {
92+
Label: labels.SelectorFromValidatedSet(map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}),
93+
},
94+
&admissionregistrationv1.ValidatingWebhookConfiguration{}: {
95+
Label: labels.SelectorFromValidatedSet(map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}),
96+
},
9097
&operatorsv1alpha1.ClusterServiceVersion{}: {
9198
Label: copiedLabelDoesNotExist,
9299
},

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"k8s.io/apimachinery/pkg/runtime"
3131
"k8s.io/apimachinery/pkg/runtime/schema"
3232
"k8s.io/apimachinery/pkg/selection"
33+
"k8s.io/apimachinery/pkg/types"
3334
utilerrors "k8s.io/apimachinery/pkg/util/errors"
3435
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3536
"k8s.io/apimachinery/pkg/util/sets"
@@ -270,7 +271,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
270271
// Wire InstallPlans
271272
ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans()
272273
op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister())
273-
ipQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ips")
274+
ipQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
275+
workqueue.RateLimitingQueueConfig{
276+
Name: "ips",
277+
})
274278
op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue)
275279
ipQueueInformer, err := queueinformer.NewQueueInformer(
276280
ctx,
@@ -289,7 +293,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
289293

290294
operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
291295
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
292-
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ogs")
296+
ogQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
297+
workqueue.RateLimitingQueueConfig{
298+
Name: "ogs",
299+
})
293300
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
294301
operatorGroupQueueInformer, err := queueinformer.NewQueueInformer(
295302
ctx,
@@ -308,15 +315,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
308315
// Wire CatalogSources
309316
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
310317
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
311-
catsrcQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "catsrcs")
318+
catsrcQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
319+
workqueue.RateLimitingQueueConfig{
320+
Name: "catsrcs",
321+
})
312322
op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue)
313323
catsrcQueueInformer, err := queueinformer.NewQueueInformer(
314324
ctx,
315325
queueinformer.WithMetricsProvider(metrics.NewMetricsCatalogSource(op.lister.OperatorsV1alpha1().CatalogSourceLister())),
316326
queueinformer.WithLogger(op.logger),
317327
queueinformer.WithQueue(catsrcQueue),
318328
queueinformer.WithInformer(catsrcInformer.Informer()),
319-
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncerWithDelete(op.handleCatSrcDeletion)),
329+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncer()),
330+
queueinformer.WithDeletionHandler(op.handleCatSrcDeletion),
320331
)
321332
if err != nil {
322333
return nil, err
@@ -334,7 +345,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
334345
subIndexer := subInformer.Informer().GetIndexer()
335346
op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer
336347

337-
subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "subs")
348+
subQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
349+
workqueue.RateLimitingQueueConfig{
350+
Name: "subs",
351+
})
338352
op.subQueueSet.Set(metav1.NamespaceAll, subQueue)
339353
subSyncer, err := subscription.NewSyncer(
340354
ctx,
@@ -345,7 +359,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
345359
subscription.WithCatalogInformer(catsrcInformer.Informer()),
346360
subscription.WithInstallPlanInformer(ipInformer.Informer()),
347361
subscription.WithSubscriptionQueue(subQueue),
348-
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions, nil)),
362+
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)),
349363
subscription.WithRegistryReconcilerFactory(op.reconciler),
350364
subscription.WithGlobalCatalogNamespace(op.namespace),
351365
subscription.WithOperatorCacheProvider(op.operatorCacheProvider),
@@ -660,13 +674,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
660674
}
661675

662676
// Generate and register QueueInformers for k8s resources
663-
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
677+
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncer()
664678
for _, informer := range sharedIndexInformers {
665679
queueInformer, err := queueinformer.NewQueueInformer(
666680
ctx,
667681
queueinformer.WithLogger(op.logger),
668682
queueinformer.WithInformer(informer),
669683
queueinformer.WithSyncer(k8sSyncer),
684+
queueinformer.WithDeletionHandler(op.handleDeletion),
670685
)
671686
if err != nil {
672687
return nil, err
@@ -714,7 +729,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
714729
ctx,
715730
queueinformer.WithLogger(op.logger),
716731
queueinformer.WithInformer(crdInformer),
717-
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)),
732+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncer()),
733+
queueinformer.WithDeletionHandler(op.handleDeletion),
718734
)
719735
if err != nil {
720736
return nil, err
@@ -735,7 +751,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
735751
// Namespace sync for resolving subscriptions
736752
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
737753
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
738-
op.nsResolveQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver")
754+
op.nsResolveQueue = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
755+
workqueue.RateLimitingQueueConfig{
756+
Name: "resolve",
757+
})
739758
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
740759
ctx,
741760
queueinformer.WithLogger(op.logger),
@@ -775,12 +794,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
775794

776795
if err == nil {
777796
for ns := range namespaces {
778-
o.nsResolveQueue.Add(ns)
797+
o.nsResolveQueue.Add(types.NamespacedName{Name: ns})
779798
}
780799
}
781800
}
782801

783-
o.nsResolveQueue.Add(state.Key.Namespace)
802+
o.nsResolveQueue.Add(types.NamespacedName{Name: state.Key.Namespace})
784803
}
785804
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
786805
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
@@ -861,18 +880,16 @@ func (o *Operator) handleDeletion(obj interface{}) {
861880
func (o *Operator) handleCatSrcDeletion(obj interface{}) {
862881
catsrc, ok := obj.(metav1.Object)
863882
if !ok {
883+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
864884
if !ok {
865-
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
866-
if !ok {
867-
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
868-
return
869-
}
885+
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
886+
return
887+
}
870888

871-
catsrc, ok = tombstone.Obj.(metav1.Object)
872-
if !ok {
873-
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
874-
return
875-
}
889+
catsrc, ok = tombstone.Obj.(metav1.Object)
890+
if !ok {
891+
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
892+
return
876893
}
877894
}
878895
sourceKey := registry.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}
@@ -1400,7 +1417,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14001417
}
14011418

14021419
logger.Info("unpacking is not complete yet, requeueing")
1403-
o.nsResolveQueue.AddAfter(namespace, 5*time.Second)
1420+
o.nsResolveQueue.AddAfter(types.NamespacedName{Name: namespace}, 5*time.Second)
14041421
return nil
14051422
}
14061423
}
@@ -1482,7 +1499,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
14821499
return fmt.Errorf("casting Subscription failed")
14831500
}
14841501

1485-
o.nsResolveQueue.Add(sub.GetNamespace())
1502+
o.nsResolveQueue.Add(types.NamespacedName{Name: sub.GetNamespace()})
14861503

14871504
return nil
14881505
}
@@ -1496,7 +1513,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {
14961513
return fmt.Errorf("casting OperatorGroup failed")
14971514
}
14981515

1499-
o.nsResolveQueue.Add(og.GetNamespace())
1516+
o.nsResolveQueue.Add(types.NamespacedName{Name: og.GetNamespace()})
15001517

15011518
return nil
15021519
}

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1999,12 +1999,15 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
19991999
client: clientFake,
20002000
lister: lister,
20012001
namespace: namespace,
2002-
nsResolveQueue: workqueue.NewNamedRateLimitingQueue(
2002+
nsResolveQueue: workqueue.NewRateLimitingQueueWithConfig(
20032003
workqueue.NewMaxOfRateLimiter(
20042004
workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 1000*time.Second),
20052005
// 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
20062006
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
2007-
), "resolver"),
2007+
),
2008+
workqueue.RateLimitingQueueConfig{
2009+
Name: "resolver",
2010+
}),
20082011
resolver: config.resolver,
20092012
reconciler: config.reconciler,
20102013
recorder: config.recorder,

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/reconciler.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,14 @@ import (
2828

2929
// ReconcilerFromLegacySyncHandler returns a reconciler that invokes the given legacy sync handler and on delete funcs.
3030
// Since the reconciler does not return an updated kubestate, it MUST be the last reconciler in a given chain.
31-
func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler, onDelete func(obj interface{})) kubestate.Reconciler {
31+
func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler) kubestate.Reconciler {
3232
var rec kubestate.ReconcilerFunc = func(ctx context.Context, in kubestate.State) (out kubestate.State, err error) {
3333
out = in
3434
switch s := in.(type) {
3535
case SubscriptionExistsState:
3636
if sync != nil {
3737
err = sync(s.Subscription())
3838
}
39-
case SubscriptionDeletedState:
40-
if onDelete != nil {
41-
onDelete(s.Subscription())
42-
}
4339
case SubscriptionState:
4440
if sync != nil {
4541
err = sync(s.Subscription())

staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription/state.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ type SubscriptionState interface {
2525
Subscription() *v1alpha1.Subscription
2626
Add() SubscriptionExistsState
2727
Update() SubscriptionExistsState
28-
Delete() SubscriptionDeletedState
2928
}
3029

3130
// SubscriptionExistsState describes subscription states in which the subscription exists on the cluster.
@@ -49,13 +48,6 @@ type SubscriptionUpdatedState interface {
4948
isSubscriptionUpdatedState()
5049
}
5150

52-
// SubscriptionDeletedState describes subscription states in which the subscription no longer exists and was deleted from the cluster.
53-
type SubscriptionDeletedState interface {
54-
SubscriptionState
55-
56-
isSubscriptionDeletedState()
57-
}
58-
5951
// CatalogHealthState describes subscription states that represent a subscription with respect to catalog health.
6052
type CatalogHealthState interface {
6153
SubscriptionExistsState
@@ -176,12 +168,6 @@ func (s *subscriptionState) Update() SubscriptionExistsState {
176168
}
177169
}
178170

179-
func (s *subscriptionState) Delete() SubscriptionDeletedState {
180-
return &subscriptionDeletedState{
181-
SubscriptionState: s,
182-
}
183-
}
184-
185171
func NewSubscriptionState(sub *v1alpha1.Subscription) SubscriptionState {
186172
return &subscriptionState{
187173
State: kubestate.NewState(),
@@ -207,12 +193,6 @@ type subscriptionUpdatedState struct {
207193

208194
func (c *subscriptionUpdatedState) isSubscriptionUpdatedState() {}
209195

210-
type subscriptionDeletedState struct {
211-
SubscriptionState
212-
}
213-
214-
func (c *subscriptionDeletedState) isSubscriptionDeletedState() {}
215-
216196
type catalogHealthState struct {
217197
SubscriptionExistsState
218198
}

0 commit comments

Comments
 (0)