Skip to content

Commit c20b41b

Browse files
committed
(bugfix): rewrite the contentmanager implementation
to fix bugs associated with insufficient permissions resulting in halting reconciliation of all ClusterExtension and informer sync errors not being reported via the ClusterExtension status conditions. Signed-off-by: everettraven <[email protected]>
1 parent 2490a01 commit c20b41b

12 files changed

+837
-378
lines changed

Diff for: api/v1alpha1/clusterextension_types.go

+5
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ const (
116116
// TODO(user): add more Types, here and into init()
117117
TypeInstalled = "Installed"
118118
TypeResolved = "Resolved"
119+
TypeManaged = "Managed"
119120

120121
// TypeDeprecated is a rollup condition that is present when
121122
// any of the deprecated conditions are present.
@@ -139,6 +140,8 @@ const (
139140
ReasonUnpackFailed = "UnpackFailed"
140141

141142
ReasonErrorGettingReleaseState = "ErrorGettingReleaseState"
143+
144+
ReasonManagementFailed = "ManagementFailed"
142145
)
143146

144147
func init() {
@@ -151,6 +154,7 @@ func init() {
151154
TypeChannelDeprecated,
152155
TypeBundleDeprecated,
153156
TypeUnpacked,
157+
TypeManaged,
154158
)
155159
// TODO(user): add Reasons from above
156160
conditionsets.ConditionReasons = append(conditionsets.ConditionReasons,
@@ -164,6 +168,7 @@ func init() {
164168
ReasonUnpackSuccess,
165169
ReasonUnpackFailed,
166170
ReasonErrorGettingReleaseState,
171+
ReasonManagementFailed,
167172
)
168173
}
169174

Diff for: cmd/manager/main.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -254,14 +254,22 @@ func main() {
254254
Preflights: preflights,
255255
}
256256

257+
cm := contentmanager.NewManager(clientRestConfigMapper, mgr.GetConfig(), mgr.GetRESTMapper())
258+
contentManagerFinalizerKey := fmt.Sprintf("%s/contentmanager-cleanup", domain)
259+
clusterExtensionFinalizers.Register(contentManagerFinalizerKey, finalizerFunc(func(ctx context.Context, obj client.Object) (crfinalizer.Result, error) {
260+
ext := obj.(*ocv1alpha1.ClusterExtension)
261+
cm.Delete(ext)
262+
return crfinalizer.Result{}, nil
263+
}))
264+
257265
if err = (&controllers.ClusterExtensionReconciler{
258266
Client: cl,
259267
Resolver: resolver,
260268
Unpacker: unpacker,
261269
Applier: applier,
262270
InstalledBundleGetter: &controllers.DefaultInstalledBundleGetter{ActionClientGetter: acg},
263271
Finalizers: clusterExtensionFinalizers,
264-
Watcher: contentmanager.New(clientRestConfigMapper, mgr.GetConfig(), mgr.GetRESTMapper()),
272+
Manager: cm,
265273
}).SetupWithManager(mgr); err != nil {
266274
setupLog.Error(err, "unable to create controller", "controller", "ClusterExtension")
267275
os.Exit(1)

Diff for: config/samples/olm_v1alpha1_clusterextension.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ rules:
3636
# Manage ArgoCD CRDs
3737
- apiGroups: [apiextensions.k8s.io]
3838
resources: [customresourcedefinitions]
39-
verbs: [create]
39+
verbs: [create, list, watch]
4040
- apiGroups: [apiextensions.k8s.io]
4141
resources: [customresourcedefinitions]
42-
verbs: [get, list, watch, update, patch, delete]
42+
verbs: [get, update, patch, delete]
4343
resourceNames:
4444
- appprojects.argoproj.io
4545
- argocds.argoproj.io

Diff for: internal/applier/helm.go

+45-8
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package applier
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
8+
"io"
79
"io/fs"
810
"strings"
911

1012
"helm.sh/helm/v3/pkg/action"
1113
"helm.sh/helm/v3/pkg/chart"
1214
"helm.sh/helm/v3/pkg/chartutil"
15+
"helm.sh/helm/v3/pkg/postrender"
1316
"helm.sh/helm/v3/pkg/release"
1417
"helm.sh/helm/v3/pkg/storage/driver"
1518
corev1 "k8s.io/api/core/v1"
19+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1620
"sigs.k8s.io/controller-runtime/pkg/client"
1721

1822
helmclient "github.com/operator-framework/helm-operator-plugins/pkg/client"
@@ -21,6 +25,7 @@ import (
2125
"github.com/operator-framework/operator-controller/internal/rukpak/convert"
2226
"github.com/operator-framework/operator-controller/internal/rukpak/preflights/crdupgradesafety"
2327
"github.com/operator-framework/operator-controller/internal/rukpak/util"
28+
apimachyaml "k8s.io/apimachinery/pkg/util/yaml"
2429
)
2530

2631
const (
@@ -63,7 +68,11 @@ func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.Clust
6368
return nil, "", err
6469
}
6570

66-
rel, desiredRel, state, err := h.getReleaseState(ac, ext, chrt, values, labels)
71+
post := &postrenderer{
72+
labels: labels,
73+
}
74+
75+
rel, desiredRel, state, err := h.getReleaseState(ac, ext, chrt, values, post)
6776
if err != nil {
6877
return nil, "", err
6978
}
@@ -96,7 +105,7 @@ func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.Clust
96105
install.CreateNamespace = false
97106
install.Labels = labels
98107
return nil
99-
})
108+
}, helmclient.AppendInstallPostRenderer(post))
100109
if err != nil {
101110
return nil, state, err
102111
}
@@ -105,7 +114,7 @@ func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.Clust
105114
upgrade.MaxHistory = maxHelmReleaseHistory
106115
upgrade.Labels = labels
107116
return nil
108-
})
117+
}, helmclient.AppendUpgradePostRenderer(post))
109118
if err != nil {
110119
return nil, state, err
111120
}
@@ -125,7 +134,7 @@ func (h *Helm) Apply(ctx context.Context, contentFS fs.FS, ext *ocv1alpha1.Clust
125134
return relObjects, state, nil
126135
}
127136

128-
func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.ClusterExtension, chrt *chart.Chart, values chartutil.Values, labels map[string]string) (*release.Release, *release.Release, string, error) {
137+
func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.ClusterExtension, chrt *chart.Chart, values chartutil.Values, post postrender.PostRenderer) (*release.Release, *release.Release, string, error) {
129138
currentRelease, err := cl.Get(ext.GetName())
130139
if err != nil && !errors.Is(err, driver.ErrReleaseNotFound) {
131140
return nil, nil, StateError, err
@@ -138,9 +147,8 @@ func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.Cl
138147
desiredRelease, err := cl.Install(ext.GetName(), ext.Spec.InstallNamespace, chrt, values, func(i *action.Install) error {
139148
i.DryRun = true
140149
i.DryRunOption = "server"
141-
i.Labels = labels
142150
return nil
143-
})
151+
}, helmclient.AppendInstallPostRenderer(post))
144152
if err != nil {
145153
return nil, nil, StateError, err
146154
}
@@ -150,9 +158,8 @@ func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.Cl
150158
upgrade.MaxHistory = maxHelmReleaseHistory
151159
upgrade.DryRun = true
152160
upgrade.DryRunOption = "server"
153-
upgrade.Labels = labels
154161
return nil
155-
})
162+
}, helmclient.AppendUpgradePostRenderer(post))
156163
if err != nil {
157164
return currentRelease, nil, StateError, err
158165
}
@@ -164,3 +171,33 @@ func (h *Helm) getReleaseState(cl helmclient.ActionInterface, ext *ocv1alpha1.Cl
164171
}
165172
return currentRelease, desiredRelease, relState, nil
166173
}
174+
175+
type postrenderer struct {
176+
labels map[string]string
177+
cascade postrender.PostRenderer
178+
}
179+
180+
func (p *postrenderer) Run(renderedManifests *bytes.Buffer) (*bytes.Buffer, error) {
181+
var buf bytes.Buffer
182+
dec := apimachyaml.NewYAMLOrJSONDecoder(renderedManifests, 1024)
183+
for {
184+
obj := unstructured.Unstructured{}
185+
err := dec.Decode(&obj)
186+
if errors.Is(err, io.EOF) {
187+
break
188+
}
189+
if err != nil {
190+
return nil, err
191+
}
192+
obj.SetLabels(util.MergeMaps(obj.GetLabels(), p.labels))
193+
b, err := obj.MarshalJSON()
194+
if err != nil {
195+
return nil, err
196+
}
197+
buf.Write(b)
198+
}
199+
if p.cascade != nil {
200+
return p.cascade.Run(&buf)
201+
}
202+
return &buf, nil
203+
}

Diff for: internal/contentmanager/cache.go

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package contentmanager
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
7+
"k8s.io/apimachinery/pkg/labels"
8+
"k8s.io/apimachinery/pkg/runtime"
9+
"k8s.io/apimachinery/pkg/runtime/schema"
10+
"sigs.k8s.io/controller-runtime/pkg/client"
11+
"sigs.k8s.io/controller-runtime/pkg/source"
12+
)
13+
14+
// Cache is a storage mechanism for keeping track of
15+
// managed content sources. It also exposes methods
16+
// for retrieving the stored references of the managed
17+
// content
18+
type Cache interface {
19+
// Get attempts to fetch the object with the provided namespace/name and GroupVersionKind
20+
Get(client.ObjectKey, schema.GroupVersionKind) (runtime.Object, error)
21+
// List attempts to list all the objects with the provided GroupVersionKind and label selector
22+
List(schema.GroupVersionKind, labels.Selector) ([]runtime.Object, error)
23+
// Close stops all sources belonging to the cache
24+
Close()
25+
// AddSource adds a new ReaderCloserSyncincSource to the cache for the provided GroupVersionKind
26+
AddSource(schema.GroupVersionKind, ReaderCloserSyncingSource)
27+
// RemoveSource stops and removes a source from cache for the provided GroupVersionKind
28+
RemoveSource(schema.GroupVersionKind)
29+
// Sources returns the current mapping of GroupVersionKind to ReaderCloserSyncingSource
30+
Sources() map[schema.GroupVersionKind]ReaderCloserSyncingSource
31+
}
32+
33+
// ReaderCloserSyncingSource is a wrapper of the controller-runtime
34+
// source.SyncingSource that includes methods for:
35+
// - Getting a specific resource synced by the source
36+
// - Listing resources synced by the source
37+
// - Closing the source, stopping it's interaction with the Kubernetes API server and reaction to events
38+
type ReaderCloserSyncingSource interface {
39+
source.SyncingSource
40+
// Get attempts to fetch the object with the provided namespace/name
41+
Get(client.ObjectKey) (runtime.Object, error)
42+
// List attempts to list all the objects with the provided label selector
43+
List(labels.Selector) ([]runtime.Object, error)
44+
// Close stops the source
45+
Close()
46+
}
47+
48+
type cache struct {
49+
sources map[schema.GroupVersionKind]ReaderCloserSyncingSource
50+
mu sync.Mutex
51+
}
52+
53+
func NewCache() Cache {
54+
return &cache{
55+
sources: make(map[schema.GroupVersionKind]ReaderCloserSyncingSource),
56+
}
57+
}
58+
59+
var _ Cache = (*cache)(nil)
60+
61+
func (c *cache) Close() {
62+
c.mu.Lock()
63+
defer c.mu.Unlock()
64+
65+
for _, source := range c.sources {
66+
source.Close()
67+
}
68+
}
69+
70+
func (c *cache) Get(key client.ObjectKey, gvk schema.GroupVersionKind) (runtime.Object, error) {
71+
c.mu.Lock()
72+
defer c.mu.Unlock()
73+
74+
source, ok := c.sources[gvk]
75+
if !ok {
76+
return nil, fmt.Errorf("no source for GVK %q", gvk)
77+
}
78+
79+
return source.Get(key)
80+
}
81+
82+
func (c *cache) List(gvk schema.GroupVersionKind, selector labels.Selector) ([]runtime.Object, error) {
83+
c.mu.Lock()
84+
defer c.mu.Unlock()
85+
86+
source, ok := c.sources[gvk]
87+
if !ok {
88+
return nil, fmt.Errorf("no source for GVK %q", gvk)
89+
}
90+
91+
return source.List(selector)
92+
}
93+
94+
func (c *cache) AddSource(gvk schema.GroupVersionKind, source ReaderCloserSyncingSource) {
95+
c.mu.Lock()
96+
defer c.mu.Unlock()
97+
if _, ok := c.sources[gvk]; !ok {
98+
c.sources[gvk] = source
99+
}
100+
}
101+
102+
func (c *cache) RemoveSource(gvk schema.GroupVersionKind) {
103+
c.mu.Lock()
104+
defer c.mu.Unlock()
105+
if source, ok := c.sources[gvk]; ok {
106+
source.Close()
107+
}
108+
delete(c.sources, gvk)
109+
}
110+
111+
func (c *cache) Sources() map[schema.GroupVersionKind]ReaderCloserSyncingSource {
112+
c.mu.Lock()
113+
defer c.mu.Unlock()
114+
return c.sources
115+
}

0 commit comments

Comments
 (0)