Skip to content

Commit 16da916

Browse files
authored
helm-operator: reduce cache memory footprint (operator-framework#6377)
1 parent d21ed64 commit 16da916

File tree

8 files changed

+149
-106
lines changed

8 files changed

+149
-106
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
entries:
2+
- description: >
3+
(helm): Use informer cache label selectors to reduce memory consumption.
4+
kind: bugfix
5+
breaking: false
6+
- description: >
7+
(helm): Fix bug with detection of owner reference support when setting up dynamic watches
8+
kind: bugfix
9+
breaking: false

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ require (
4141
k8s.io/client-go v0.26.2
4242
k8s.io/kubectl v0.26.2
4343
k8s.io/utils v0.0.0-20230711102312-30195339c3c7
44-
sigs.k8s.io/controller-runtime v0.14.5
44+
sigs.k8s.io/controller-runtime v0.14.6
4545
sigs.k8s.io/controller-tools v0.11.3
4646
sigs.k8s.io/kubebuilder/v3 v3.9.1
4747
sigs.k8s.io/yaml v1.3.0

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -1658,8 +1658,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
16581658
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
16591659
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 h1:+xBL5uTc+BkPBwmMi3vYfUJjq+N3K+H6PXeETwf5cPI=
16601660
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35/go.mod h1:WxjusMwXlKzfAs4p9km6XJRndVt2FROgMVCE4cdohFo=
1661-
sigs.k8s.io/controller-runtime v0.14.5 h1:6xaWFqzT5KuAQ9ufgUaj1G/+C4Y1GRkhrxl+BJ9i+5s=
1662-
sigs.k8s.io/controller-runtime v0.14.5/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
1661+
sigs.k8s.io/controller-runtime v0.14.6 h1:oxstGVvXGNnMvY7TAESYk+lzr6S3V5VFxQ6d92KcwQA=
1662+
sigs.k8s.io/controller-runtime v0.14.6/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
16631663
sigs.k8s.io/controller-tools v0.11.3 h1:T1xzLkog9saiyQSLz1XOImu4OcbdXWytc5cmYsBeBiE=
16641664
sigs.k8s.io/controller-tools v0.11.3/go.mod h1:qcfX7jfcfYD/b7lAhvqAyTbt/px4GpvN88WKLFFv7p8=
16651665
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=

internal/cmd/helm-operator/run/cmd.go

+86-27
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/spf13/cobra"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
apimachruntime "k8s.io/apimachinery/pkg/runtime"
2829
"k8s.io/client-go/rest"
2930
ctrl "sigs.k8s.io/controller-runtime"
3031
"sigs.k8s.io/controller-runtime/pkg/cache"
@@ -45,6 +46,10 @@ import (
4546
"github.com/operator-framework/operator-sdk/internal/helm/watches"
4647
"github.com/operator-framework/operator-sdk/internal/util/k8sutil"
4748
sdkVersion "github.com/operator-framework/operator-sdk/internal/version"
49+
"helm.sh/helm/v3/pkg/chart/loader"
50+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
51+
"k8s.io/apimachinery/pkg/labels"
52+
"k8s.io/apimachinery/pkg/selection"
4853
)
4954

5055
var log = logf.Log.WithName("cmd")
@@ -136,6 +141,23 @@ func run(cmd *cobra.Command, f *flags.Flags) {
136141
// Set default manager options
137142
options = f.ToManagerOptions(options)
138143

144+
if options.Scheme == nil {
145+
options.Scheme = apimachruntime.NewScheme()
146+
}
147+
148+
ws, err := watches.Load(f.WatchesFile)
149+
if err != nil {
150+
log.Error(err, "Failed to load watches file.")
151+
os.Exit(1)
152+
}
153+
154+
watchNamespaces := getWatchNamespaces(options.Namespace)
155+
options.NewCache, err = buildNewCacheFunc(watchNamespaces, ws, options.Scheme)
156+
if err != nil {
157+
log.Error(err, "Failed to create NewCache function for manager.")
158+
os.Exit(1)
159+
}
160+
139161
if options.NewClient == nil {
140162
options.NewClient = func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
141163
// Create the Client for Write operations.
@@ -152,27 +174,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
152174
})
153175
}
154176
}
155-
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
156-
log = log.WithValues("Namespace", namespace)
157-
if found {
158-
log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar))
159-
if namespace == metav1.NamespaceAll {
160-
log.Info("Watching all namespaces.")
161-
options.Namespace = metav1.NamespaceAll
162-
} else {
163-
if strings.Contains(namespace, ",") {
164-
log.Info("Watching multiple namespaces.")
165-
options.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(namespace, ","))
166-
} else {
167-
log.Info("Watching single namespace.")
168-
options.Namespace = namespace
169-
}
170-
}
171-
} else if options.Namespace == "" {
172-
log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+
173-
"Watching all namespaces.", k8sutil.WatchNamespaceEnvVar))
174-
options.Namespace = metav1.NamespaceAll
175-
}
176177

177178
mgr, err := manager.New(cfg, options)
178179
if err != nil {
@@ -189,11 +190,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
189190
os.Exit(1)
190191
}
191192

192-
ws, err := watches.Load(f.WatchesFile)
193-
if err != nil {
194-
log.Error(err, "Failed to create new manager factories.")
195-
os.Exit(1)
196-
}
197193
acg, err := helmClient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), mgr.GetLogger())
198194
if err != nil {
199195
log.Error(err, "Failed to create Helm action config getter")
@@ -207,7 +203,6 @@ func run(cmd *cobra.Command, f *flags.Flags) {
207203
}
208204

209205
err := controller.Add(mgr, controller.WatchOptions{
210-
Namespace: namespace,
211206
GVK: w.GroupVersionKind,
212207
ManagerFactory: release.NewManagerFactory(mgr, acg, w.ChartDir),
213208
ReconcilePeriod: reconcilePeriod,
@@ -250,3 +245,67 @@ func exitIfUnsupported(options manager.Options) {
250245
os.Exit(1)
251246
}
252247
}
248+
249+
func getWatchNamespaces(defaultNamespace string) []string {
250+
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar)
251+
log = log.WithValues("Namespace", namespace)
252+
if found {
253+
log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar))
254+
if namespace == metav1.NamespaceAll {
255+
log.Info("Watching all namespaces.")
256+
return []string{metav1.NamespaceAll}
257+
}
258+
if strings.Contains(namespace, ",") {
259+
log.Info("Watching multiple namespaces.")
260+
return strings.Split(namespace, ",")
261+
}
262+
log.Info("Watching single namespace.")
263+
return []string{namespace}
264+
}
265+
if defaultNamespace == "" {
266+
log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+
267+
"Watching all namespaces.", k8sutil.WatchNamespaceEnvVar))
268+
return []string{metav1.NamespaceAll}
269+
}
270+
return []string{defaultNamespace}
271+
}
272+
273+
func buildNewCacheFunc(watchNamespaces []string, ws []watches.Watch, sch *apimachruntime.Scheme) (cache.NewCacheFunc, error) {
274+
selectorsByObject := cache.SelectorsByObject{}
275+
chartNames := make([]string, 0, len(ws))
276+
for _, w := range ws {
277+
sch.AddKnownTypeWithName(w.GroupVersionKind, &unstructured.Unstructured{})
278+
279+
crObj := &unstructured.Unstructured{}
280+
crObj.SetGroupVersionKind(w.GroupVersionKind)
281+
sel, err := metav1.LabelSelectorAsSelector(&w.Selector)
282+
if err != nil {
283+
return nil, fmt.Errorf("unable to parse watch selector for %s: %v", w.GroupVersionKind, err)
284+
}
285+
selectorsByObject[crObj] = cache.ObjectSelector{Label: sel}
286+
287+
chrt, err := loader.LoadDir(w.ChartDir)
288+
if err != nil {
289+
return nil, fmt.Errorf("unable to load chart for %s: %v", w.GroupVersionKind, err)
290+
}
291+
chartNames = append(chartNames, chrt.Name())
292+
293+
}
294+
req, err := labels.NewRequirement("helm.sdk.operatorframework.io/chart", selection.In, chartNames)
295+
if err != nil {
296+
return nil, fmt.Errorf("unable to create label requirement for cache default selector: %v", err)
297+
}
298+
defaultSelector := labels.NewSelector().Add(*req)
299+
300+
return func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
301+
opts.SelectorsByObject = selectorsByObject
302+
opts.DefaultSelector = cache.ObjectSelector{Label: defaultSelector}
303+
if len(watchNamespaces) > 1 {
304+
return cache.MultiNamespacedCacheBuilder(watchNamespaces)(config, opts)
305+
}
306+
if len(watchNamespaces) == 1 {
307+
opts.Namespace = watchNamespaces[0]
308+
}
309+
return cache.New(config, opts)
310+
}, nil
311+
}

internal/helm/client/client.go

+42
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,45 @@ func containsResourcePolicyKeep(annotations map[string]string) bool {
105105
resourcePolicyType = strings.ToLower(strings.TrimSpace(resourcePolicyType))
106106
return resourcePolicyType == kube.KeepPolicy
107107
}
108+
109+
type labelInjectingClient struct {
110+
kube.Interface
111+
labels map[string]string
112+
}
113+
114+
func NewLabelInjectingClient(base kube.Interface, labels map[string]string) kube.Interface {
115+
return &labelInjectingClient{
116+
Interface: base,
117+
labels: labels,
118+
}
119+
}
120+
121+
func (c *labelInjectingClient) Build(reader io.Reader, validate bool) (kube.ResourceList, error) {
122+
resourceList, err := c.Interface.Build(reader, validate)
123+
if err != nil {
124+
return resourceList, err
125+
}
126+
err = resourceList.Visit(func(r *resource.Info, err error) error {
127+
if err != nil {
128+
return err
129+
}
130+
objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(r.Object)
131+
if err != nil {
132+
return err
133+
}
134+
u := &unstructured.Unstructured{Object: objMap}
135+
labels := u.GetLabels()
136+
if labels == nil {
137+
labels = map[string]string{}
138+
}
139+
for k, v := range c.labels {
140+
labels[k] = v
141+
}
142+
u.SetLabels(labels)
143+
return nil
144+
})
145+
if err != nil {
146+
return nil, err
147+
}
148+
return resourceList, nil
149+
}

internal/helm/controller/controller.go

+5-37
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package controller
1616

1717
import (
1818
"fmt"
19-
"reflect"
2019
"strings"
2120
"sync"
2221
"time"
@@ -31,7 +30,6 @@ import (
3130
crthandler "sigs.k8s.io/controller-runtime/pkg/handler"
3231
logf "sigs.k8s.io/controller-runtime/pkg/log"
3332
"sigs.k8s.io/controller-runtime/pkg/manager"
34-
ctrlpredicate "sigs.k8s.io/controller-runtime/pkg/predicate"
3533
"sigs.k8s.io/controller-runtime/pkg/source"
3634
"sigs.k8s.io/yaml"
3735

@@ -46,7 +44,6 @@ var log = logf.Log.WithName("helm.controller")
4644
// WatchOptions contains the necessary values to create a new controller that
4745
// manages helm releases in a particular namespace based on a GVK watch.
4846
type WatchOptions struct {
49-
Namespace string
5047
GVK schema.GroupVersionKind
5148
ManagerFactory release.ManagerFactory
5249
ReconcilePeriod time.Duration
@@ -71,10 +68,6 @@ func Add(mgr manager.Manager, options WatchOptions) error {
7168
SuppressOverrideValues: options.SuppressOverrideValues,
7269
}
7370

74-
// Register the GVK with the schema
75-
mgr.GetScheme().AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{})
76-
metav1.AddToGroupVersion(mgr.GetScheme(), options.GVK.GroupVersion())
77-
7871
c, err := controller.New(controllerName, mgr, controller.Options{
7972
Reconciler: r,
8073
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
@@ -86,18 +79,7 @@ func Add(mgr manager.Manager, options WatchOptions) error {
8679
o := &unstructured.Unstructured{}
8780
o.SetGroupVersionKind(options.GVK)
8881

89-
var preds []ctrlpredicate.Predicate
90-
p, err := parsePredicateSelector(options.Selector)
91-
92-
if err != nil {
93-
return err
94-
}
95-
96-
if p != nil {
97-
preds = append(preds, p)
98-
}
99-
100-
if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}, preds...); err != nil {
82+
if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}); err != nil {
10183
return err
10284
}
10385

@@ -106,33 +88,19 @@ func Add(mgr manager.Manager, options WatchOptions) error {
10688
}
10789

10890
log.Info("Watching resource", "apiVersion", options.GVK.GroupVersion(), "kind",
109-
options.GVK.Kind, "namespace", options.Namespace, "reconcilePeriod", options.ReconcilePeriod.String())
91+
options.GVK.Kind, "reconcilePeriod", options.ReconcilePeriod.String())
11092
return nil
11193
}
11294

113-
// parsePredicateSelector parses the selector in the WatchOptions and creates a predicate
114-
// that is used to filter resources based on the specified selector
115-
func parsePredicateSelector(selector metav1.LabelSelector) (ctrlpredicate.Predicate, error) {
116-
// If a selector has been specified in watches.yaml, add it to the watch's predicates.
117-
if !reflect.ValueOf(selector).IsZero() {
118-
p, err := ctrlpredicate.LabelSelectorPredicate(selector)
119-
if err != nil {
120-
return nil, fmt.Errorf("error constructing predicate from watches selector: %v", err)
121-
}
122-
return p, nil
123-
}
124-
return nil, nil
125-
}
126-
12795
// watchDependentResources adds a release hook function to the HelmOperatorReconciler
12896
// that adds watches for resources in released Helm charts.
12997
func watchDependentResources(mgr manager.Manager, r *HelmOperatorReconciler, c controller.Controller) {
130-
owner := &unstructured.Unstructured{}
131-
owner.SetGroupVersionKind(r.GVK)
132-
13398
var m sync.RWMutex
13499
watches := map[schema.GroupVersionKind]struct{}{}
135100
releaseHook := func(release *rpb.Release) error {
101+
owner := &unstructured.Unstructured{}
102+
owner.SetGroupVersionKind(r.GVK)
103+
owner.SetNamespace(release.Namespace)
136104
resources := releaseutil.SplitManifests(release.Manifest)
137105
for _, resource := range resources {
138106
var u unstructured.Unstructured

internal/helm/controller/controller_test.go

-39
This file was deleted.

internal/helm/release/manager_factory.go

+4
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ func (f managerFactory) NewManager(cr *unstructured.Unstructured, overrideValues
5858
return nil, fmt.Errorf("failed to load chart dir: %w", err)
5959
}
6060

61+
actionConfig.KubeClient = client.NewLabelInjectingClient(actionConfig.KubeClient, map[string]string{
62+
"helm.sdk.operatorframework.io/chart": crChart.Name(),
63+
})
64+
6165
releaseName, err := getReleaseName(actionConfig.Releases, crChart.Name(), cr)
6266
if err != nil {
6367
return nil, fmt.Errorf("failed to get helm release name: %w", err)

0 commit comments

Comments
 (0)