Skip to content

Commit 2b29265

Browse files
committed
contentmanager: use namespace-scoped informers
Signed-off-by: Joe Lanford <[email protected]>
1 parent a2ae8b8 commit 2b29265

File tree

4 files changed

+187
-68
lines changed

4 files changed

+187
-68
lines changed

internal/operator-controller/contentmanager/cache/cache.go

+68-44
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"sync"
1111
"time"
1212

13+
corev1 "k8s.io/api/core/v1"
14+
"k8s.io/apimachinery/pkg/api/meta"
1315
"k8s.io/apimachinery/pkg/runtime/schema"
1416
"k8s.io/apimachinery/pkg/util/sets"
1517
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -41,27 +43,29 @@ type CloserSyncingSource interface {
4143
}
4244

4345
type sourcerer interface {
44-
// Source returns a CloserSyncingSource for the provided
45-
// GroupVersionKind. If the CloserSyncingSource encounters an
46+
// Source returns a CloserSyncingSource for the provided namespace
47+
// and GroupVersionKind. If the CloserSyncingSource encounters an
4648
// error after having initially synced, it should requeue the
4749
// provided client.Object and call the provided callback function
48-
Source(schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error)
50+
Source(string, schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error)
4951
}
5052

5153
type cache struct {
52-
sources map[schema.GroupVersionKind]CloserSyncingSource
54+
sources map[sourceKey]CloserSyncingSource
5355
sourcerer sourcerer
5456
owner client.Object
5557
syncTimeout time.Duration
5658
mu sync.Mutex
59+
restMapper meta.RESTMapper
5760
}
5861

59-
func NewCache(sourcerer sourcerer, owner client.Object, syncTimeout time.Duration) Cache {
62+
func NewCache(sourcerer sourcerer, owner client.Object, syncTimeout time.Duration, rm meta.RESTMapper) Cache {
6063
return &cache{
61-
sources: make(map[schema.GroupVersionKind]CloserSyncingSource),
64+
sources: make(map[sourceKey]CloserSyncingSource),
6265
sourcerer: sourcerer,
6366
owner: owner,
6467
syncTimeout: syncTimeout,
68+
restMapper: rm,
6569
}
6670
}
6771

@@ -70,15 +74,15 @@ var _ Cache = (*cache)(nil)
7074
func (c *cache) Watch(ctx context.Context, watcher Watcher, objs ...client.Object) error {
7175
c.mu.Lock()
7276
defer c.mu.Unlock()
73-
gvkSet, err := gvksForObjects(objs...)
77+
sourceKeySet, err := c.sourceKeysForObjects(objs...)
7478
if err != nil {
7579
return fmt.Errorf("getting set of GVKs for managed objects: %w", err)
7680
}
7781

78-
if err := c.removeStaleSources(gvkSet); err != nil {
82+
if err := c.removeStaleSources(sourceKeySet); err != nil {
7983
return fmt.Errorf("removing stale sources: %w", err)
8084
}
81-
return c.startNewSources(ctx, gvkSet, watcher)
85+
return c.startNewSources(ctx, sourceKeySet, watcher)
8286
}
8387

8488
func (c *cache) Close() error {
@@ -99,29 +103,35 @@ func (c *cache) Close() error {
99103
return errors.Join(errs...)
100104
}
101105

102-
func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupVersionKind], watcher Watcher) error {
103-
cacheGvks := c.getCacheGVKs()
104-
gvksToCreate := gvks.Difference(cacheGvks)
106+
type sourceKey struct {
107+
namespace string
108+
gvk schema.GroupVersionKind
109+
}
105110

111+
func (c *cache) startNewSources(ctx context.Context, sources sets.Set[sourceKey], watcher Watcher) error {
106112
type startResult struct {
107113
source CloserSyncingSource
108-
gvk schema.GroupVersionKind
114+
key sourceKey
109115
err error
110116
}
111117
startResults := make(chan startResult)
112118
wg := sync.WaitGroup{}
113-
for _, gvk := range gvksToCreate.UnsortedList() {
119+
120+
existingSourceKeys := c.getCacheKeys()
121+
sourcesToCreate := sources.Difference(existingSourceKeys)
122+
for _, srcKey := range sourcesToCreate.UnsortedList() {
114123
wg.Add(1)
115124
go func() {
116125
defer wg.Done()
117-
source, err := c.startNewSource(ctx, gvk, watcher)
126+
source, err := c.startNewSource(ctx, srcKey, watcher)
118127
startResults <- startResult{
119128
source: source,
120-
gvk: gvk,
129+
key: srcKey,
121130
err: err,
122131
}
123132
}()
124133
}
134+
125135
go func() {
126136
wg.Wait()
127137
close(startResults)
@@ -134,7 +144,7 @@ func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupV
134144
continue
135145
}
136146

137-
err := c.addSource(result.gvk, result.source)
147+
err := c.addSource(result.key, result.source)
138148
if err != nil {
139149
// If we made it here then there is a logic error in
140150
// calculating the diffs between what is currently being
@@ -146,20 +156,19 @@ func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupV
146156
slices.SortFunc(sourcesErrors, func(a, b error) int {
147157
return strings.Compare(a.Error(), b.Error())
148158
})
149-
150159
return errors.Join(sourcesErrors...)
151160
}
152161

153-
func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind, watcher Watcher) (CloserSyncingSource, error) {
154-
s, err := c.sourcerer.Source(gvk, c.owner, func(ctx context.Context) {
162+
func (c *cache) startNewSource(ctx context.Context, srcKey sourceKey, watcher Watcher) (CloserSyncingSource, error) {
163+
s, err := c.sourcerer.Source(srcKey.namespace, srcKey.gvk, c.owner, func(ctx context.Context) {
155164
// this callback function ensures that we remove the source from the
156165
// cache if it encounters an error after it initially synced successfully
157166
c.mu.Lock()
158167
defer c.mu.Unlock()
159-
err := c.removeSource(gvk)
168+
err := c.removeSource(srcKey)
160169
if err != nil {
161170
logr := log.FromContext(ctx)
162-
logr.Error(err, "managed content cache postSyncError removing source failed", "gvk", gvk)
171+
logr.Error(err, "managed content cache postSyncError removing source failed", "namespace", srcKey.namespace, "gvk", srcKey.gvk)
163172
}
164173
})
165174
if err != nil {
@@ -168,7 +177,7 @@ func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind,
168177

169178
err = watcher.Watch(s)
170179
if err != nil {
171-
return nil, fmt.Errorf("establishing watch for GVK %q: %w", gvk, err)
180+
return nil, fmt.Errorf("establishing watch for GVK %q in namespace %q: %w", srcKey.gvk, srcKey.namespace, err)
172181
}
173182

174183
syncCtx, syncCancel := context.WithTimeout(ctx, c.syncTimeout)
@@ -181,19 +190,19 @@ func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind,
181190
return s, nil
182191
}
183192

184-
func (c *cache) addSource(gvk schema.GroupVersionKind, source CloserSyncingSource) error {
185-
if _, ok := c.sources[gvk]; !ok {
186-
c.sources[gvk] = source
193+
func (c *cache) addSource(key sourceKey, source CloserSyncingSource) error {
194+
if _, ok := c.sources[key]; !ok {
195+
c.sources[key] = source
187196
return nil
188197
}
189198
return errors.New("source already exists")
190199
}
191200

192-
func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error {
193-
cacheGvks := c.getCacheGVKs()
201+
func (c *cache) removeStaleSources(srcKeys sets.Set[sourceKey]) error {
202+
existingSrcKeys := c.getCacheKeys()
194203
removeErrs := []error{}
195-
gvksToRemove := cacheGvks.Difference(gvks)
196-
for _, gvk := range gvksToRemove.UnsortedList() {
204+
srcKeysToRemove := existingSrcKeys.Difference(srcKeys)
205+
for _, gvk := range srcKeysToRemove.UnsortedList() {
197206
err := c.removeSource(gvk)
198207
if err != nil {
199208
removeErrs = append(removeErrs, err)
@@ -207,23 +216,23 @@ func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error
207216
return errors.Join(removeErrs...)
208217
}
209218

210-
func (c *cache) removeSource(gvk schema.GroupVersionKind) error {
211-
if source, ok := c.sources[gvk]; ok {
212-
err := source.Close()
219+
func (c *cache) removeSource(srcKey sourceKey) error {
220+
if src, ok := c.sources[srcKey]; ok {
221+
err := src.Close()
213222
if err != nil {
214-
return fmt.Errorf("closing source for GVK %q: %w", gvk, err)
223+
return fmt.Errorf("closing source for GVK %q in namespace %q: %w", srcKey.gvk, srcKey.namespace, err)
215224
}
216225
}
217-
delete(c.sources, gvk)
226+
delete(c.sources, srcKey)
218227
return nil
219228
}
220229

221-
func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] {
222-
cacheGvks := sets.New[schema.GroupVersionKind]()
223-
for gvk := range c.sources {
224-
cacheGvks.Insert(gvk)
230+
func (c *cache) getCacheKeys() sets.Set[sourceKey] {
231+
sourceKeys := sets.New[sourceKey]()
232+
for key := range c.sources {
233+
sourceKeys.Insert(key)
225234
}
226-
return cacheGvks
235+
return sourceKeys
227236
}
228237

229238
// gvksForObjects builds a sets.Set of GroupVersionKinds for
@@ -233,8 +242,8 @@ func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] {
233242
//
234243
// An empty Group is assumed to be the "core" Kubernetes
235244
// API group.
236-
func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], error) {
237-
gvkSet := sets.New[schema.GroupVersionKind]()
245+
func (c *cache) sourceKeysForObjects(objs ...client.Object) (sets.Set[sourceKey], error) {
246+
sourceKeys := sets.New[sourceKey]()
238247
for _, obj := range objs {
239248
gvk := obj.GetObjectKind().GroupVersionKind()
240249

@@ -257,8 +266,23 @@ func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], e
257266
)
258267
}
259268

260-
gvkSet.Insert(gvk)
269+
// We shouldn't blindly accept the namespace value provided by the object.
270+
// If the object is cluster-scoped, but includes a namespace for some reason,
271+
// we need to make sure to create the source key with namespace set to
272+
// corev1.NamespaceAll to ensure that the informer we start actually ends up
273+
// watch the cluster-scoped object with a cluster-scoped informer.
274+
mapping, err := c.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
275+
if err != nil {
276+
return nil, fmt.Errorf("adding %q with GVK %q to set; rest mapping failed: %w", obj.GetName(), gvk, err)
277+
}
278+
279+
ns := corev1.NamespaceAll
280+
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
281+
ns = obj.GetNamespace()
282+
}
283+
284+
sourceKeys.Insert(sourceKey{ns, gvk})
261285
}
262286

263-
return gvkSet, nil
287+
return sourceKeys, nil
264288
}

0 commit comments

Comments
 (0)