Skip to content

Commit cf618d3

Browse files
committed
Hash and prune copied ClusterServiceVersions in olm-operator.
The olm-operator doesn't need to maintain a local cache of full copied ClusterServiceVersion objects because it also caches the source of truth (non-copied ClusterServiceVersions). Computing and storing a hash over all relevant fields of copies before pruning allows for efficient comparison between originals and copies. Signed-off-by: Ben Luddy <[email protected]>
1 parent 9950875 commit cf618d3

File tree

4 files changed

+464
-235
lines changed

4 files changed

+464
-235
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package pruning
2+
3+
import (
4+
"context"
5+
6+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
7+
"k8s.io/apimachinery/pkg/runtime"
8+
"k8s.io/apimachinery/pkg/watch"
9+
"k8s.io/client-go/tools/cache"
10+
11+
"github.com/operator-framework/api/pkg/operators/v1alpha1"
12+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
13+
)
14+
15+
type Pruner interface {
16+
Prune(*v1alpha1.ClusterServiceVersion)
17+
}
18+
19+
type PrunerFunc func(*v1alpha1.ClusterServiceVersion)
20+
21+
func (f PrunerFunc) Prune(csv *v1alpha1.ClusterServiceVersion) {
22+
f(csv)
23+
}
24+
25+
func NewListerWatcher(client versioned.Interface, namespace string, override func(*metav1.ListOptions), p Pruner) cache.ListerWatcher {
26+
return &cache.ListWatch{
27+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
28+
override(&options)
29+
list, err := client.OperatorsV1alpha1().ClusterServiceVersions(namespace).List(context.TODO(), options)
30+
if err != nil {
31+
return list, err
32+
}
33+
for i := range list.Items {
34+
p.Prune(&list.Items[i])
35+
}
36+
return list, nil
37+
},
38+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
39+
override(&options)
40+
w, err := client.OperatorsV1alpha1().ClusterServiceVersions(namespace).Watch(context.TODO(), options)
41+
if err != nil {
42+
return w, err
43+
}
44+
return watch.Filter(w, watch.FilterFunc(func(e watch.Event) (watch.Event, bool) {
45+
if csv, ok := e.Object.(*v1alpha1.ClusterServiceVersion); ok {
46+
p.Prune(csv)
47+
}
48+
return e, true
49+
})), nil
50+
},
51+
}
52+
}

pkg/controller/operators/olm/operator.go

+69-22
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ import (
3333
"github.com/operator-framework/api/pkg/operators/v1alpha1"
3434
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
3535
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
36+
operatorsv1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
3637
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/certs"
3738
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
39+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/pruning"
3840
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/overrides"
3941
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
4042
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/clients"
@@ -66,10 +68,11 @@ type Operator struct {
6668
opClient operatorclient.ClientInterface
6769
client versioned.Interface
6870
lister operatorlister.OperatorLister
71+
copiedCSVLister operatorsv1alpha1listers.ClusterServiceVersionLister
6972
ogQueueSet *queueinformer.ResourceQueueSet
7073
csvQueueSet *queueinformer.ResourceQueueSet
7174
csvCopyQueueSet *queueinformer.ResourceQueueSet
72-
csvGCQueueSet *queueinformer.ResourceQueueSet
75+
copiedCSVGCQueueSet *queueinformer.ResourceQueueSet
7376
objGCQueueSet *queueinformer.ResourceQueueSet
7477
nsQueueSet workqueue.RateLimitingInterface
7578
apiServiceQueue workqueue.RateLimitingInterface
@@ -125,7 +128,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
125128
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
126129
csvQueueSet: queueinformer.NewEmptyResourceQueueSet(),
127130
csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(),
128-
csvGCQueueSet: queueinformer.NewEmptyResourceQueueSet(),
131+
copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(),
129132
objGCQueueSet: queueinformer.NewEmptyResourceQueueSet(),
130133
apiServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "apiservice"),
131134
resolver: config.strategyResolver,
@@ -146,8 +149,14 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
146149
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
147150
for _, namespace := range config.watchedNamespaces {
148151
// Wire CSVs
149-
extInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, config.resyncPeriod(), externalversions.WithNamespace(namespace))
150-
csvInformer := extInformerFactory.Operators().V1alpha1().ClusterServiceVersions()
152+
csvInformer := externalversions.NewSharedInformerFactoryWithOptions(
153+
op.client,
154+
config.resyncPeriod(),
155+
externalversions.WithNamespace(namespace),
156+
externalversions.WithTweakListOptions(func(options *metav1.ListOptions) {
157+
options.LabelSelector = fmt.Sprintf("!%s", v1alpha1.CopiedLabelKey)
158+
}),
159+
).Operators().V1alpha1().ClusterServiceVersions()
151160
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
152161
csvQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/csv", namespace))
153162
op.csvQueueSet.Set(namespace, csvQueue)
@@ -188,24 +197,65 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
188197
return nil, err
189198
}
190199

191-
// Register separate queue for gcing csvs
192-
csvGCQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/csv-gc", namespace))
193-
op.csvGCQueueSet.Set(namespace, csvGCQueue)
194-
csvGCQueueInformer, err := queueinformer.NewQueueInformer(
200+
// A separate informer solely for CSV copies. Fields
201+
// are pruned from local copies of the objects managed
202+
// by this informer in order to reduce cached size.
203+
copiedCSVInformer := cache.NewSharedIndexInformer(
204+
pruning.NewListerWatcher(
205+
op.client,
206+
namespace,
207+
func(opts *metav1.ListOptions) {
208+
opts.LabelSelector = v1alpha1.CopiedLabelKey
209+
},
210+
pruning.PrunerFunc(func(csv *v1alpha1.ClusterServiceVersion) {
211+
nonstatus, status := copyableCSVHash(csv)
212+
*csv = v1alpha1.ClusterServiceVersion{
213+
TypeMeta: csv.TypeMeta,
214+
ObjectMeta: csv.ObjectMeta,
215+
Status: v1alpha1.ClusterServiceVersionStatus{
216+
Phase: csv.Status.Phase,
217+
Reason: csv.Status.Reason,
218+
},
219+
}
220+
if csv.Annotations == nil {
221+
csv.Annotations = make(map[string]string, 2)
222+
}
223+
// These annotation keys are
224+
// intentionally invalid -- all writes
225+
// to copied CSVs are regenerated from
226+
// the corresponding non-copied CSV,
227+
// so it should never be transmitted
228+
// back to the API server.
229+
csv.Annotations["$copyhash-nonstatus"] = nonstatus
230+
csv.Annotations["$copyhash-status"] = status
231+
}),
232+
),
233+
&v1alpha1.ClusterServiceVersion{},
234+
config.resyncPeriod(),
235+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
236+
)
237+
op.copiedCSVLister = operatorsv1alpha1listers.NewClusterServiceVersionLister(copiedCSVInformer.GetIndexer())
238+
239+
// Register separate queue for gcing copied csvs
240+
copiedCSVGCQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/csv-gc", namespace))
241+
op.copiedCSVGCQueueSet.Set(namespace, copiedCSVGCQueue)
242+
copiedCSVGCQueueInformer, err := queueinformer.NewQueueInformer(
195243
ctx,
244+
queueinformer.WithInformer(copiedCSVInformer),
196245
queueinformer.WithLogger(op.logger),
197-
queueinformer.WithQueue(csvGCQueue),
198-
queueinformer.WithIndexer(csvIndexer),
246+
queueinformer.WithQueue(copiedCSVGCQueue),
247+
queueinformer.WithIndexer(copiedCSVInformer.GetIndexer()),
199248
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncGcCsv).ToSyncer()),
200249
)
201250
if err != nil {
202251
return nil, err
203252
}
204-
if err := op.RegisterQueueInformer(csvGCQueueInformer); err != nil {
253+
if err := op.RegisterQueueInformer(copiedCSVGCQueueInformer); err != nil {
205254
return nil, err
206255
}
207256

208257
// Wire OperatorGroup reconciliation
258+
extInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, config.resyncPeriod(), externalversions.WithNamespace(namespace))
209259
operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups()
210260
op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister())
211261
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/og", namespace))
@@ -916,6 +966,11 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
916966

917967
metrics.DeleteCSVMetric(clusterServiceVersion)
918968

969+
if clusterServiceVersion.IsCopied() {
970+
logger.Warning("deleted csv is copied. skipping additional cleanup steps") // should not happen?
971+
return
972+
}
973+
919974
defer func(csv v1alpha1.ClusterServiceVersion) {
920975
if clusterServiceVersion.IsCopied() {
921976
logger.Debug("deleted csv is copied. skipping operatorgroup requeue")
@@ -956,11 +1011,6 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
9561011
return
9571012
}
9581013

959-
if clusterServiceVersion.IsCopied() {
960-
logger.Debug("deleted csv is copied. skipping additional cleanup steps")
961-
return
962-
}
963-
9641014
logger.Info("gcing children")
9651015
namespaces := make([]string, 0)
9661016
if targetNamespaces == "" {
@@ -978,7 +1028,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
9781028
for _, namespace := range namespaces {
9791029
if namespace != operatorNamespace {
9801030
logger.WithField("targetNamespace", namespace).Debug("requeueing child csv for deletion")
981-
if err := a.csvGCQueueSet.Requeue(namespace, clusterServiceVersion.GetName()); err != nil {
1031+
if err := a.copiedCSVGCQueueSet.Requeue(namespace, clusterServiceVersion.GetName()); err != nil {
9821032
logger.WithError(err).Warn("unable to requeue")
9831033
}
9841034
}
@@ -1060,7 +1110,7 @@ func (a *Operator) removeDanglingChildCSVs(csv *v1alpha1.ClusterServiceVersion)
10601110
})
10611111

10621112
if !csv.IsCopied() {
1063-
logger.Debug("removeDanglingChild called on a parent. this is a no-op but should be avoided.")
1113+
logger.Warning("removeDanglingChild called on a parent. this is a no-op but should be avoided.")
10641114
return nil
10651115
}
10661116

@@ -1124,10 +1174,7 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error)
11241174
}
11251175

11261176
if clusterServiceVersion.IsCopied() {
1127-
logger.Debug("skipping copied csv transition, schedule for gc check")
1128-
if err := a.csvGCQueueSet.Requeue(clusterServiceVersion.GetNamespace(), clusterServiceVersion.GetName()); err != nil {
1129-
logger.WithError(err).Warn("unable to requeue")
1130-
}
1177+
logger.Warning("skipping copied csv transition") // should not happen?
11311178
return
11321179
}
11331180

0 commit comments

Comments
 (0)