From 2b29265d395b420a9e8579d65a6e00e24e087ebd Mon Sep 17 00:00:00 2001
From: Joe Lanford <joe.lanford@gmail.com>
Date: Wed, 26 Mar 2025 21:35:19 -0400
Subject: [PATCH] contentmanager: use namespace-scoped informers

Signed-off-by: Joe Lanford <joe.lanford@gmail.com>
---
 .../contentmanager/cache/cache.go             | 112 +++++++++------
 .../contentmanager/cache/cache_test.go        | 131 +++++++++++++++---
 .../contentmanager/contentmanager.go          |   6 +-
 .../contentmanager/sourcerer.go               |   6 +-
 4 files changed, 187 insertions(+), 68 deletions(-)

diff --git a/internal/operator-controller/contentmanager/cache/cache.go b/internal/operator-controller/contentmanager/cache/cache.go
index f56cfd575..0cf8bc9a1 100644
--- a/internal/operator-controller/contentmanager/cache/cache.go
+++ b/internal/operator-controller/contentmanager/cache/cache.go
@@ -10,6 +10,8 @@ import (
 	"sync"
 	"time"
 
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -41,27 +43,29 @@ type CloserSyncingSource interface {
 }
 
 type sourcerer interface {
-	// Source returns a CloserSyncingSource for the provided
-	// GroupVersionKind. If the CloserSyncingSource encounters an
+	// Source returns a CloserSyncingSource for the provided namespace
+	// and GroupVersionKind. If the CloserSyncingSource encounters an
 	// error after having initially synced, it should requeue the
 	// provided client.Object and call the provided callback function
-	Source(schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error)
+	Source(string, schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error)
 }
 
 type cache struct {
-	sources     map[schema.GroupVersionKind]CloserSyncingSource
+	sources     map[sourceKey]CloserSyncingSource
 	sourcerer   sourcerer
 	owner       client.Object
 	syncTimeout time.Duration
 	mu          sync.Mutex
+	restMapper  meta.RESTMapper
 }
 
-func NewCache(sourcerer sourcerer, owner client.Object, syncTimeout time.Duration) Cache {
+func NewCache(sourcerer sourcerer, owner client.Object, syncTimeout time.Duration, rm meta.RESTMapper) Cache {
 	return &cache{
-		sources:     make(map[schema.GroupVersionKind]CloserSyncingSource),
+		sources:     make(map[sourceKey]CloserSyncingSource),
 		sourcerer:   sourcerer,
 		owner:       owner,
 		syncTimeout: syncTimeout,
+		restMapper:  rm,
 	}
 }
 
@@ -70,15 +74,15 @@ var _ Cache = (*cache)(nil)
 func (c *cache) Watch(ctx context.Context, watcher Watcher, objs ...client.Object) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	gvkSet, err := gvksForObjects(objs...)
+	sourceKeySet, err := c.sourceKeysForObjects(objs...)
 	if err != nil {
 		return fmt.Errorf("getting set of GVKs for managed objects: %w", err)
 	}
 
-	if err := c.removeStaleSources(gvkSet); err != nil {
+	if err := c.removeStaleSources(sourceKeySet); err != nil {
 		return fmt.Errorf("removing stale sources: %w", err)
 	}
-	return c.startNewSources(ctx, gvkSet, watcher)
+	return c.startNewSources(ctx, sourceKeySet, watcher)
 }
 
 func (c *cache) Close() error {
@@ -99,29 +103,35 @@ func (c *cache) Close() error {
 	return errors.Join(errs...)
 }
 
-func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupVersionKind], watcher Watcher) error {
-	cacheGvks := c.getCacheGVKs()
-	gvksToCreate := gvks.Difference(cacheGvks)
+type sourceKey struct {
+	namespace string
+	gvk       schema.GroupVersionKind
+}
 
+func (c *cache) startNewSources(ctx context.Context, sources sets.Set[sourceKey], watcher Watcher) error {
 	type startResult struct {
 		source CloserSyncingSource
-		gvk    schema.GroupVersionKind
+		key    sourceKey
 		err    error
 	}
 	startResults := make(chan startResult)
 	wg := sync.WaitGroup{}
-	for _, gvk := range gvksToCreate.UnsortedList() {
+
+	existingSourceKeys := c.getCacheKeys()
+	sourcesToCreate := sources.Difference(existingSourceKeys)
+	for _, srcKey := range sourcesToCreate.UnsortedList() {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			source, err := c.startNewSource(ctx, gvk, watcher)
+			source, err := c.startNewSource(ctx, srcKey, watcher)
 			startResults <- startResult{
 				source: source,
-				gvk:    gvk,
+				key:    srcKey,
 				err:    err,
 			}
 		}()
 	}
+
 	go func() {
 		wg.Wait()
 		close(startResults)
@@ -134,7 +144,7 @@ func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupV
 			continue
 		}
 
-		err := c.addSource(result.gvk, result.source)
+		err := c.addSource(result.key, result.source)
 		if err != nil {
 			// If we made it here then there is a logic error in
 			// calculating the diffs between what is currently being
@@ -146,20 +156,19 @@ func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupV
 	slices.SortFunc(sourcesErrors, func(a, b error) int {
 		return strings.Compare(a.Error(), b.Error())
 	})
-
 	return errors.Join(sourcesErrors...)
 }
 
-func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind, watcher Watcher) (CloserSyncingSource, error) {
-	s, err := c.sourcerer.Source(gvk, c.owner, func(ctx context.Context) {
+func (c *cache) startNewSource(ctx context.Context, srcKey sourceKey, watcher Watcher) (CloserSyncingSource, error) {
+	s, err := c.sourcerer.Source(srcKey.namespace, srcKey.gvk, c.owner, func(ctx context.Context) {
 		// this callback function ensures that we remove the source from the
 		// cache if it encounters an error after it initially synced successfully
 		c.mu.Lock()
 		defer c.mu.Unlock()
-		err := c.removeSource(gvk)
+		err := c.removeSource(srcKey)
 		if err != nil {
 			logr := log.FromContext(ctx)
-			logr.Error(err, "managed content cache postSyncError removing source failed", "gvk", gvk)
+			logr.Error(err, "managed content cache postSyncError removing source failed", "namespace", srcKey.namespace, "gvk", srcKey.gvk)
 		}
 	})
 	if err != nil {
@@ -168,7 +177,7 @@ func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind,
 
 	err = watcher.Watch(s)
 	if err != nil {
-		return nil, fmt.Errorf("establishing watch for GVK %q: %w", gvk, err)
+		return nil, fmt.Errorf("establishing watch for GVK %q in namespace %q: %w", srcKey.gvk, srcKey.namespace, err)
 	}
 
 	syncCtx, syncCancel := context.WithTimeout(ctx, c.syncTimeout)
@@ -181,19 +190,19 @@ func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind,
 	return s, nil
 }
 
-func (c *cache) addSource(gvk schema.GroupVersionKind, source CloserSyncingSource) error {
-	if _, ok := c.sources[gvk]; !ok {
-		c.sources[gvk] = source
+func (c *cache) addSource(key sourceKey, source CloserSyncingSource) error {
+	if _, ok := c.sources[key]; !ok {
+		c.sources[key] = source
 		return nil
 	}
 	return errors.New("source already exists")
 }
 
-func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error {
-	cacheGvks := c.getCacheGVKs()
+func (c *cache) removeStaleSources(srcKeys sets.Set[sourceKey]) error {
+	existingSrcKeys := c.getCacheKeys()
 	removeErrs := []error{}
-	gvksToRemove := cacheGvks.Difference(gvks)
-	for _, gvk := range gvksToRemove.UnsortedList() {
+	srcKeysToRemove := existingSrcKeys.Difference(srcKeys)
+	for _, gvk := range srcKeysToRemove.UnsortedList() {
 		err := c.removeSource(gvk)
 		if err != nil {
 			removeErrs = append(removeErrs, err)
@@ -207,23 +216,23 @@ func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error
 	return errors.Join(removeErrs...)
 }
 
-func (c *cache) removeSource(gvk schema.GroupVersionKind) error {
-	if source, ok := c.sources[gvk]; ok {
-		err := source.Close()
+func (c *cache) removeSource(srcKey sourceKey) error {
+	if src, ok := c.sources[srcKey]; ok {
+		err := src.Close()
 		if err != nil {
-			return fmt.Errorf("closing source for GVK %q: %w", gvk, err)
+			return fmt.Errorf("closing source for GVK %q in namespace %q: %w", srcKey.gvk, srcKey.namespace, err)
 		}
 	}
-	delete(c.sources, gvk)
+	delete(c.sources, srcKey)
 	return nil
 }
 
-func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] {
-	cacheGvks := sets.New[schema.GroupVersionKind]()
-	for gvk := range c.sources {
-		cacheGvks.Insert(gvk)
+func (c *cache) getCacheKeys() sets.Set[sourceKey] {
+	sourceKeys := sets.New[sourceKey]()
+	for key := range c.sources {
+		sourceKeys.Insert(key)
 	}
-	return cacheGvks
+	return sourceKeys
 }
 
 // gvksForObjects builds a sets.Set of GroupVersionKinds for
@@ -233,8 +242,8 @@ func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] {
 //
 // An empty Group is assumed to be the "core" Kubernetes
 // API group.
-func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], error) {
-	gvkSet := sets.New[schema.GroupVersionKind]()
+func (c *cache) sourceKeysForObjects(objs ...client.Object) (sets.Set[sourceKey], error) {
+	sourceKeys := sets.New[sourceKey]()
 	for _, obj := range objs {
 		gvk := obj.GetObjectKind().GroupVersionKind()
 
@@ -257,8 +266,23 @@ func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], e
 			)
 		}
 
-		gvkSet.Insert(gvk)
+		// We shouldn't blindly accept the namespace value provided by the object.
+		// If the object is cluster-scoped, but includes a namespace for some reason,
+		// we need to make sure to create the source key with namespace set to
+		// corev1.NamespaceAll to ensure that the informer we start actually ends up
+		// watch the cluster-scoped object with a cluster-scoped informer.
+		mapping, err := c.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+		if err != nil {
+			return nil, fmt.Errorf("adding %q with GVK %q to set; rest mapping failed: %w", obj.GetName(), gvk, err)
+		}
+
+		ns := corev1.NamespaceAll
+		if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
+			ns = obj.GetNamespace()
+		}
+
+		sourceKeys.Insert(sourceKey{ns, gvk})
 	}
 
-	return gvkSet, nil
+	return sourceKeys, nil
 }
diff --git a/internal/operator-controller/contentmanager/cache/cache_test.go b/internal/operator-controller/contentmanager/cache/cache_test.go
index da4455168..c39400b91 100644
--- a/internal/operator-controller/contentmanager/cache/cache_test.go
+++ b/internal/operator-controller/contentmanager/cache/cache_test.go
@@ -8,7 +8,9 @@ import (
 
 	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/client-go/util/workqueue"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -27,6 +29,70 @@ func (mw *mockWatcher) Watch(source.Source) error {
 	return mw.err
 }
 
+type mockRESTMapper struct {
+	mappings map[schema.GroupVersionKind]*meta.RESTMapping
+}
+
+var _ meta.RESTMapper = (*mockRESTMapper)(nil)
+
+func (m *mockRESTMapper) KindFor(_ schema.GroupVersionResource) (schema.GroupVersionKind, error) {
+	panic("unused")
+}
+
+func (m *mockRESTMapper) KindsFor(_ schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
+	panic("unused")
+}
+
+func (m *mockRESTMapper) ResourceFor(_ schema.GroupVersionResource) (schema.GroupVersionResource, error) {
+	panic("unused")
+}
+
+func (m *mockRESTMapper) ResourcesFor(_ schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
+	panic("unused")
+}
+
+func (m *mockRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
+	if len(versions) != 1 {
+		panic("always expect 1 version for mock rest mapping")
+	}
+	mapping, ok := m.mappings[gk.WithVersion(versions[0])]
+	if !ok {
+		return nil, &meta.NoKindMatchError{
+			GroupKind:        gk,
+			SearchedVersions: versions,
+		}
+	}
+	return mapping, nil
+}
+
+func (m *mockRESTMapper) RESTMappings(_ schema.GroupKind, _ ...string) ([]*meta.RESTMapping, error) {
+	panic("unused")
+}
+
+func (m *mockRESTMapper) ResourceSingularizer(_ string) (string, error) {
+	panic("unused")
+}
+
+var testRESTMapper = &mockRESTMapper{
+	mappings: map[schema.GroupVersionKind]*meta.RESTMapping{
+		corev1.SchemeGroupVersion.WithKind("Pod"): {
+			Resource:         corev1.SchemeGroupVersion.WithResource("pods"),
+			GroupVersionKind: corev1.SchemeGroupVersion.WithKind("Pod"),
+			Scope:            meta.RESTScopeNamespace,
+		},
+		corev1.SchemeGroupVersion.WithKind("Secret"): {
+			Resource:         corev1.SchemeGroupVersion.WithResource("secrets"),
+			GroupVersionKind: corev1.SchemeGroupVersion.WithKind("Secret"),
+			Scope:            meta.RESTScopeNamespace,
+		},
+		corev1.SchemeGroupVersion.WithKind("Namespace"): {
+			Resource:         corev1.SchemeGroupVersion.WithResource("namespaces"),
+			GroupVersionKind: corev1.SchemeGroupVersion.WithKind("Namespace"),
+			Scope:            meta.RESTScopeRoot,
+		},
+	},
+}
+
 type mockSourcerer struct {
 	err    error
 	source CloserSyncingSource
@@ -34,7 +100,7 @@ type mockSourcerer struct {
 
 var _ sourcerer = (*mockSourcerer)(nil)
 
-func (ms *mockSourcerer) Source(_ schema.GroupVersionKind, _ client.Object, _ func(context.Context)) (CloserSyncingSource, error) {
+func (ms *mockSourcerer) Source(_ string, _ schema.GroupVersionKind, _ client.Object, _ func(context.Context)) (CloserSyncingSource, error) {
 	if ms.err != nil {
 		return nil, ms.err
 	}
@@ -66,14 +132,33 @@ func TestCacheWatch(t *testing.T) {
 		},
 		&ocv1.ClusterExtension{},
 		time.Second,
+		testRESTMapper,
 	)
 
 	pod := &corev1.Pod{}
-	podGvk := corev1.SchemeGroupVersion.WithKind("Pod")
-	pod.SetGroupVersionKind(podGvk)
+	pod.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod"))
+	pod.SetNamespace(rand.String(8))
 
 	require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, pod))
-	require.Contains(t, c.(*cache).sources, podGvk, "sources", c.(*cache).sources)
+	require.Contains(t, c.(*cache).sources, sourceKey{pod.Namespace, pod.GroupVersionKind()}, "sources", c.(*cache).sources)
+}
+
+func TestCacheWatchClusterScopedIgnoresNamespace(t *testing.T) {
+	c := NewCache(
+		&mockSourcerer{
+			source: &mockSource{},
+		},
+		&ocv1.ClusterExtension{},
+		time.Second,
+		testRESTMapper,
+	)
+
+	ns := &corev1.Namespace{}
+	ns.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Namespace"))
+	ns.SetNamespace(rand.String(8))
+
+	require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, ns))
+	require.Contains(t, c.(*cache).sources, sourceKey{corev1.NamespaceAll, ns.GroupVersionKind()}, "sources", c.(*cache).sources)
 }
 
 func TestCacheWatchInvalidGVK(t *testing.T) {
@@ -83,6 +168,7 @@ func TestCacheWatchInvalidGVK(t *testing.T) {
 		},
 		&ocv1.ClusterExtension{},
 		time.Second,
+		testRESTMapper,
 	)
 
 	pod := &corev1.Pod{}
@@ -96,6 +182,7 @@ func TestCacheWatchSourcererError(t *testing.T) {
 		},
 		&ocv1.ClusterExtension{},
 		time.Second,
+		testRESTMapper,
 	)
 
 	pod := &corev1.Pod{}
@@ -111,6 +198,7 @@ func TestCacheWatchWatcherError(t *testing.T) {
 		},
 		&ocv1.ClusterExtension{},
 		time.Second,
+		testRESTMapper,
 	)
 
 	pod := &corev1.Pod{}
@@ -128,6 +216,7 @@ func TestCacheWatchSourceWaitForSyncError(t *testing.T) {
 		},
 		&ocv1.ClusterExtension{},
 		time.Second,
+		testRESTMapper,
 	)
 
 	pod := &corev1.Pod{}
@@ -144,12 +233,13 @@ func TestCacheWatchExistingSourceNotPanic(t *testing.T) {
 		},
 		&ocv1.ClusterExtension{},
 		time.Second,
+		testRESTMapper,
 	)
 
 	pod := &corev1.Pod{}
-	podGvk := corev1.SchemeGroupVersion.WithKind("Pod")
-	pod.SetGroupVersionKind(podGvk)
-	require.NoError(t, c.(*cache).addSource(podGvk, &mockSource{}))
+	pod.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod"))
+	pod.SetNamespace(rand.String(8))
+	require.NoError(t, c.(*cache).addSource(sourceKey{pod.Namespace, pod.GroupVersionKind()}, &mockSource{}))
 
 	// In this case, a panic means there is a logic error somewhere in the
 	// cache.Watch() method. It should never hit the condition where it panics
@@ -164,21 +254,22 @@ func TestCacheWatchRemovesStaleSources(t *testing.T) {
 		},
 		&ocv1.ClusterExtension{},
 		time.Second,
+		testRESTMapper,
 	)
 
 	pod := &corev1.Pod{}
-	podGvk := corev1.SchemeGroupVersion.WithKind("Pod")
-	pod.SetGroupVersionKind(podGvk)
+	pod.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod"))
+	pod.SetNamespace(rand.String(8))
 
 	require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, pod))
-	require.Contains(t, c.(*cache).sources, podGvk)
+	require.Contains(t, c.(*cache).sources, sourceKey{pod.Namespace, pod.GroupVersionKind()})
 
 	secret := &corev1.Secret{}
-	secretGvk := corev1.SchemeGroupVersion.WithKind("Secret")
-	secret.SetGroupVersionKind(secretGvk)
+	secret.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Secret"))
+	secret.SetNamespace(rand.String(8))
 	require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, secret))
-	require.Contains(t, c.(*cache).sources, secretGvk)
-	require.NotContains(t, c.(*cache).sources, podGvk)
+	require.Contains(t, c.(*cache).sources, sourceKey{secret.Namespace, secret.GroupVersionKind()})
+	require.NotContains(t, c.(*cache).sources, sourceKey{pod.Namespace, pod.GroupVersionKind()})
 }
 
 func TestCacheWatchRemovingStaleSourcesError(t *testing.T) {
@@ -188,15 +279,19 @@ func TestCacheWatchRemovingStaleSourcesError(t *testing.T) {
 		},
 		&ocv1.ClusterExtension{},
 		time.Second,
+		testRESTMapper,
 	)
 
-	podGvk := corev1.SchemeGroupVersion.WithKind("Pod")
-	require.NoError(t, c.(*cache).addSource(podGvk, &mockSource{
+	podSourceKey := sourceKey{
+		namespace: rand.String(8),
+		gvk:       corev1.SchemeGroupVersion.WithKind("Pod"),
+	}
+	require.NoError(t, c.(*cache).addSource(podSourceKey, &mockSource{
 		err: errors.New("error"),
 	}))
 
 	secret := &corev1.Secret{}
-	secretGvk := corev1.SchemeGroupVersion.WithKind("Secret")
-	secret.SetGroupVersionKind(secretGvk)
+	secret.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Secret"))
+	secret.SetNamespace(rand.String(8))
 	require.Error(t, c.Watch(context.Background(), &mockWatcher{}, secret))
 }
diff --git a/internal/operator-controller/contentmanager/contentmanager.go b/internal/operator-controller/contentmanager/contentmanager.go
index d488bdb53..b9753426f 100644
--- a/internal/operator-controller/contentmanager/contentmanager.go
+++ b/internal/operator-controller/contentmanager/contentmanager.go
@@ -116,14 +116,14 @@ func (i *managerImpl) Get(ctx context.Context, ce *ocv1.ClusterExtension) (cmcac
 		// related to reusing an informer factory, we return a new informer
 		// factory every time to ensure we are not attempting to configure or
 		// start an already started informer
-		informerFactoryCreateFunc: func() dynamicinformer.DynamicSharedInformerFactory {
-			return dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, time.Hour*10, metav1.NamespaceAll, func(lo *metav1.ListOptions) {
+		informerFactoryCreateFunc: func(namespace string) dynamicinformer.DynamicSharedInformerFactory {
+			return dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, time.Hour*10, namespace, func(lo *metav1.ListOptions) {
 				lo.LabelSelector = tgtLabels.AsSelector().String()
 			})
 		},
 		mapper: i.mapper,
 	}
-	cache = cmcache.NewCache(dynamicSourcerer, ce, i.syncTimeout)
+	cache = cmcache.NewCache(dynamicSourcerer, ce, i.syncTimeout, i.mapper)
 	i.caches[ce.Name] = cache
 	return cache, nil
 }
diff --git a/internal/operator-controller/contentmanager/sourcerer.go b/internal/operator-controller/contentmanager/sourcerer.go
index 050de8785..b4c952edf 100644
--- a/internal/operator-controller/contentmanager/sourcerer.go
+++ b/internal/operator-controller/contentmanager/sourcerer.go
@@ -21,11 +21,11 @@ import (
 )
 
 type dynamicSourcerer struct {
-	informerFactoryCreateFunc func() dynamicinformer.DynamicSharedInformerFactory
+	informerFactoryCreateFunc func(namespace string) dynamicinformer.DynamicSharedInformerFactory
 	mapper                    meta.RESTMapper
 }
 
-func (ds *dynamicSourcerer) Source(gvk schema.GroupVersionKind, owner client.Object, onPostSyncError func(context.Context)) (cache.CloserSyncingSource, error) {
+func (ds *dynamicSourcerer) Source(namespace string, gvk schema.GroupVersionKind, owner client.Object, onPostSyncError func(context.Context)) (cache.CloserSyncingSource, error) {
 	scheme, err := buildScheme(gvk)
 	if err != nil {
 		return nil, fmt.Errorf("building scheme: %w", err)
@@ -48,7 +48,7 @@ func (ds *dynamicSourcerer) Source(gvk schema.GroupVersionKind, owner client.Obj
 				GenericFunc: func(tge event.TypedGenericEvent[client.Object]) bool { return true },
 			},
 		},
-		DynamicInformerFactory: ds.informerFactoryCreateFunc(),
+		DynamicInformerFactory: ds.informerFactoryCreateFunc(namespace),
 		OnPostSyncError:        onPostSyncError,
 	})
 	return s, nil