Skip to content

Commit 78bebb4

Browse files
committed
label local objects with the agent name so that multiple agents can serve the same API
On-behalf-of: @SAP [email protected]
1 parent 997725a commit 78bebb4

File tree

8 files changed

+68
-3
lines changed

8 files changed

+68
-3
lines changed

Diff for: cmd/api-syncagent/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func run(ctx context.Context, log *zap.SugaredLogger, opts *Options) error {
138138
return fmt.Errorf("failed to add apiexport controller: %w", err)
139139
}
140140

141-
if err := syncmanager.Add(ctx, mgr, kcpCluster, kcpRestConfig, log, apiExport, opts.PublishedResourceSelector, opts.Namespace); err != nil {
141+
if err := syncmanager.Add(ctx, mgr, kcpCluster, kcpRestConfig, log, apiExport, opts.PublishedResourceSelector, opts.Namespace, opts.AgentName); err != nil {
142142
return fmt.Errorf("failed to add syncmanager controller: %w", err)
143143
}
144144

Diff for: internal/controller/sync/controller.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"sigs.k8s.io/controller-runtime/pkg/handler"
4343
"sigs.k8s.io/controller-runtime/pkg/kontext"
4444
"sigs.k8s.io/controller-runtime/pkg/manager"
45+
"sigs.k8s.io/controller-runtime/pkg/predicate"
4546
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4647
"sigs.k8s.io/controller-runtime/pkg/source"
4748
)
@@ -69,6 +70,7 @@ func Create(
6970
discoveryClient *discovery.Client,
7071
apiExportName string,
7172
stateNamespace string,
73+
agentName string,
7274
log *zap.SugaredLogger,
7375
numWorkers int,
7476
) (controller.Controller, error) {
@@ -92,7 +94,7 @@ func Create(
9294

9395
// create the syncer that holds the meat&potatoes of the synchronization logic
9496
mutator := mutation.NewMutator(nil) // pubRes.Spec.Mutation
95-
syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator, stateNamespace)
97+
syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator, stateNamespace, agentName)
9698
if err != nil {
9799
return nil, fmt.Errorf("failed to create syncer: %w", err)
98100
}
@@ -135,7 +137,12 @@ func Create(
135137
return []reconcile.Request{*req}
136138
})
137139

138-
if err := c.Watch(source.Kind(localManager.GetCache(), localDummy, enqueueRemoteObjForLocalObj)); err != nil {
140+
// only watch local objects that we own
141+
nameFilter := predicate.NewTypedPredicateFuncs(func(u *unstructured.Unstructured) bool {
142+
return sync.OwnedBy(u, agentName)
143+
})
144+
145+
if err := c.Watch(source.Kind(localManager.GetCache(), localDummy, enqueueRemoteObjForLocalObj, nameFilter)); err != nil {
139146
return nil, err
140147
}
141148

Diff for: internal/controller/syncmanager/controller.go

+4
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type Reconciler struct {
7070
discoveryClient *discovery.Client
7171
prFilter labels.Selector
7272
stateNamespace string
73+
agentName string
7374

7475
apiExport *kcpdevv1alpha1.APIExport
7576

@@ -95,6 +96,7 @@ func Add(
9596
apiExport *kcpdevv1alpha1.APIExport,
9697
prFilter labels.Selector,
9798
stateNamespace string,
99+
agentName string,
98100
) error {
99101
reconciler := &Reconciler{
100102
ctx: ctx,
@@ -108,6 +110,7 @@ func Add(
108110
discoveryClient: discovery.NewClient(localManager.GetClient()),
109111
prFilter: prFilter,
110112
stateNamespace: stateNamespace,
113+
agentName: agentName,
111114
}
112115

113116
_, err := builder.ControllerManagedBy(localManager).
@@ -280,6 +283,7 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
280283
r.discoveryClient,
281284
r.apiExport.Name,
282285
r.stateNamespace,
286+
r.agentName,
283287
r.log,
284288
numSyncWorkers,
285289
)

Diff for: internal/sync/object_syncer.go

+14
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ import (
3838
type objectCreatorFunc func(source *unstructured.Unstructured) *unstructured.Unstructured
3939

4040
type objectSyncer struct {
41+
// When set, the syncer will create a label on the destination object that contains
42+
// this value; used to allow multiple agents syncing *the same* API from one
43+
// service cluster onto multiple different kcp's.
44+
agentName string
4145
// creates a new destination object; does not need to perform cleanup like
4246
// removing unwanted metadata, that's done by the syncer automatically
4347
destCreator objectCreatorFunc
@@ -281,6 +285,9 @@ func (s *objectSyncer) ensureDestinationObject(log *zap.SugaredLogger, source, d
281285
sourceObjKey := newObjectKey(source.object, source.clusterName, source.workspacePath)
282286
ensureLabels(destObj, sourceObjKey.Labels())
283287

288+
// remember what agent synced this object
289+
s.labelWithAgent(destObj)
290+
284291
// put optional additional annotations on the new object
285292
ensureAnnotations(destObj, sourceObjKey.Annotations())
286293

@@ -326,6 +333,7 @@ func (s *objectSyncer) adoptExistingDestinationObject(log *zap.SugaredLogger, de
326333
// the destination object from another source object, which would then lead to the two source objects
327334
// "fighting" about the one destination object.
328335
ensureLabels(existingDestObj, sourceKey.Labels())
336+
s.labelWithAgent(existingDestObj)
329337
ensureAnnotations(existingDestObj, sourceKey.Annotations())
330338

331339
if err := dest.client.Update(dest.ctx, existingDestObj); err != nil {
@@ -425,3 +433,9 @@ func (s *objectSyncer) createMergePatch(base, revision *unstructured.Unstructure
425433
func (s *objectSyncer) isIrrelevantTopLevelField(fieldName string) bool {
426434
return fieldName == "kind" || fieldName == "apiVersion" || fieldName == "metadata" || slices.Contains(s.subresources, fieldName)
427435
}
436+
437+
func (s *objectSyncer) labelWithAgent(obj *unstructured.Unstructured) {
438+
if s.agentName != "" {
439+
ensureLabels(obj, map[string]string{agentNameLabel: s.agentName})
440+
}
441+
}

Diff for: internal/sync/syncer.go

+6
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ type ResourceSyncer struct {
4646

4747
mutator mutation.Mutator
4848

49+
agentName string
50+
4951
// newObjectStateStore is used for testing purposes
5052
newObjectStateStore newObjectStateStoreFunc
5153
}
@@ -59,6 +61,7 @@ func NewResourceSyncer(
5961
remoteAPIGroup string,
6062
mutator mutation.Mutator,
6163
stateNamespace string,
64+
agentName string,
6265
) (*ResourceSyncer, error) {
6366
// create a dummy that represents the type used on the local service cluster
6467
localGVK := projection.PublishedResourceSourceGVK(pubRes)
@@ -100,6 +103,7 @@ func NewResourceSyncer(
100103
subresources: subresources,
101104
destDummy: localDummy,
102105
mutator: mutator,
106+
agentName: agentName,
103107
newObjectStateStore: newKubernetesStateStoreCreator(stateNamespace),
104108
}, nil
105109
}
@@ -145,6 +149,8 @@ func (s *ResourceSyncer) Process(ctx Context, remoteObj *unstructured.Unstructur
145149
stateStore := s.newObjectStateStore(sourceSide, destSide)
146150

147151
syncer := objectSyncer{
152+
// The primary object should be labelled with the agent name.
153+
agentName: s.agentName,
148154
subresources: s.subresources,
149155
// use the projection and renaming rules configured in the PublishedResource
150156
destCreator: s.createLocalObjectCreator(ctx),

Diff for: internal/sync/syncer_related.go

+2
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ func (s *ResourceSyncer) processRelatedResource(log *zap.SugaredLogger, stateSto
127127
}
128128

129129
syncer := objectSyncer{
130+
// Related objects within kcp are not labelled with the agent name because it's unnecessary.
131+
// agentName: "",
130132
// use the same state store as we used for the main resource, to keep everything contained
131133
// in one place, on the service cluster side
132134
stateStore: stateStore,

Diff for: internal/sync/syncer_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
182182
ObjectMeta: metav1.ObjectMeta{
183183
Name: "testcluster-my-test-thing",
184184
Labels: map[string]string{
185+
agentNameLabel: "textor-the-doctor",
185186
remoteObjectClusterLabel: "testcluster",
186187
remoteObjectNamespaceLabel: "",
187188
remoteObjectNameLabel: "my-test-thing",
@@ -208,6 +209,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
208209
ObjectMeta: metav1.ObjectMeta{
209210
Name: "testcluster-my-test-thing",
210211
Labels: map[string]string{
212+
agentNameLabel: "textor-the-doctor",
211213
remoteObjectClusterLabel: "testcluster",
212214
remoteObjectNamespaceLabel: "",
213215
remoteObjectNameLabel: "my-test-thing",
@@ -255,6 +257,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
255257
ObjectMeta: metav1.ObjectMeta{
256258
Name: "testcluster-my-test-thing",
257259
Labels: map[string]string{
260+
agentNameLabel: "textor-the-doctor",
258261
remoteObjectClusterLabel: "testcluster",
259262
remoteObjectNamespaceLabel: "",
260263
remoteObjectNameLabel: "my-test-thing",
@@ -312,6 +315,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
312315
ObjectMeta: metav1.ObjectMeta{
313316
Name: "testcluster-my-test-thing",
314317
Labels: map[string]string{
318+
agentNameLabel: "textor-the-doctor",
315319
remoteObjectClusterLabel: "testcluster",
316320
remoteObjectNamespaceLabel: "",
317321
remoteObjectNameLabel: "my-test-thing",
@@ -348,6 +352,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
348352
ObjectMeta: metav1.ObjectMeta{
349353
Name: "testcluster-my-test-thing",
350354
Labels: map[string]string{
355+
agentNameLabel: "textor-the-doctor",
351356
remoteObjectClusterLabel: "testcluster",
352357
remoteObjectNamespaceLabel: "",
353358
remoteObjectNameLabel: "my-test-thing",
@@ -374,6 +379,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
374379
ObjectMeta: metav1.ObjectMeta{
375380
Name: "testcluster-my-test-thing",
376381
Labels: map[string]string{
382+
agentNameLabel: "textor-the-doctor",
377383
remoteObjectClusterLabel: "testcluster",
378384
remoteObjectNamespaceLabel: "",
379385
remoteObjectNameLabel: "my-test-thing",
@@ -410,6 +416,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
410416
ObjectMeta: metav1.ObjectMeta{
411417
Name: "testcluster-my-test-thing",
412418
Labels: map[string]string{
419+
agentNameLabel: "textor-the-doctor",
413420
remoteObjectClusterLabel: "testcluster",
414421
remoteObjectNamespaceLabel: "",
415422
remoteObjectNameLabel: "my-test-thing",
@@ -436,6 +443,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
436443
ObjectMeta: metav1.ObjectMeta{
437444
Name: "testcluster-my-test-thing",
438445
Labels: map[string]string{
446+
agentNameLabel: "textor-the-doctor",
439447
remoteObjectClusterLabel: "testcluster",
440448
remoteObjectNamespaceLabel: "",
441449
remoteObjectNameLabel: "my-test-thing",
@@ -484,6 +492,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
484492
"existing-annotation": "annotation-value",
485493
},
486494
Labels: map[string]string{
495+
agentNameLabel: "textor-the-doctor",
487496
remoteObjectClusterLabel: "testcluster",
488497
remoteObjectNamespaceLabel: "",
489498
remoteObjectNameLabel: "my-test-thing",
@@ -525,6 +534,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
525534
"new-annotation": "hei-verden",
526535
},
527536
Labels: map[string]string{
537+
agentNameLabel: "textor-the-doctor",
528538
remoteObjectClusterLabel: "testcluster",
529539
remoteObjectNamespaceLabel: "",
530540
remoteObjectNameLabel: "my-test-thing",
@@ -564,6 +574,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
564574
ObjectMeta: metav1.ObjectMeta{
565575
Name: "testcluster-my-test-thing",
566576
Labels: map[string]string{
577+
agentNameLabel: "textor-the-doctor",
567578
remoteObjectClusterLabel: "testcluster",
568579
remoteObjectNamespaceLabel: "",
569580
remoteObjectNameLabel: "my-test-thing",
@@ -591,6 +602,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
591602
ObjectMeta: metav1.ObjectMeta{
592603
Name: "testcluster-my-test-thing",
593604
Labels: map[string]string{
605+
agentNameLabel: "textor-the-doctor",
594606
remoteObjectClusterLabel: "testcluster",
595607
remoteObjectNamespaceLabel: "",
596608
remoteObjectNameLabel: "my-test-thing",
@@ -636,6 +648,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
636648
"prevent-instant-deletion-in-tests",
637649
},
638650
Labels: map[string]string{
651+
agentNameLabel: "textor-the-doctor",
639652
remoteObjectClusterLabel: "testcluster",
640653
remoteObjectNamespaceLabel: "",
641654
remoteObjectNameLabel: "my-test-thing",
@@ -667,6 +680,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
667680
},
668681
DeletionTimestamp: &nonEmptyTime,
669682
Labels: map[string]string{
683+
agentNameLabel: "textor-the-doctor",
670684
remoteObjectClusterLabel: "testcluster",
671685
remoteObjectNamespaceLabel: "",
672686
remoteObjectNameLabel: "my-test-thing",
@@ -748,6 +762,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
748762
},
749763
DeletionTimestamp: &nonEmptyTime,
750764
Labels: map[string]string{
765+
agentNameLabel: "textor-the-doctor",
751766
remoteObjectClusterLabel: "testcluster",
752767
remoteObjectNamespaceLabel: "",
753768
remoteObjectNameLabel: "my-test-thing",
@@ -778,6 +793,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
778793
},
779794
DeletionTimestamp: &nonEmptyTime,
780795
Labels: map[string]string{
796+
agentNameLabel: "textor-the-doctor",
781797
remoteObjectClusterLabel: "testcluster",
782798
remoteObjectNamespaceLabel: "",
783799
remoteObjectNameLabel: "my-test-thing",
@@ -809,6 +825,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
809825
testcase.remoteAPIGroup,
810826
nil,
811827
stateNamespace,
828+
"textor-the-doctor",
812829
)
813830
if err != nil {
814831
t.Fatalf("Failed to create syncer: %v", err)
@@ -970,6 +987,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
970987
ObjectMeta: metav1.ObjectMeta{
971988
Name: "testcluster-my-test-thing",
972989
Labels: map[string]string{
990+
agentNameLabel: "textor-the-doctor",
973991
remoteObjectClusterLabel: "testcluster",
974992
remoteObjectNamespaceLabel: "",
975993
remoteObjectNameLabel: "my-test-thing",
@@ -1002,6 +1020,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
10021020
ObjectMeta: metav1.ObjectMeta{
10031021
Name: "testcluster-my-test-thing",
10041022
Labels: map[string]string{
1023+
agentNameLabel: "textor-the-doctor",
10051024
remoteObjectClusterLabel: "testcluster",
10061025
remoteObjectNamespaceLabel: "",
10071026
remoteObjectNameLabel: "my-test-thing",
@@ -1041,6 +1060,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
10411060
ObjectMeta: metav1.ObjectMeta{
10421061
Name: "testcluster-my-test-thing",
10431062
Labels: map[string]string{
1063+
agentNameLabel: "textor-the-doctor",
10441064
remoteObjectClusterLabel: "testcluster",
10451065
remoteObjectNamespaceLabel: "",
10461066
remoteObjectNameLabel: "my-test-thing",
@@ -1073,6 +1093,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
10731093
ObjectMeta: metav1.ObjectMeta{
10741094
Name: "testcluster-my-test-thing",
10751095
Labels: map[string]string{
1096+
agentNameLabel: "textor-the-doctor",
10761097
remoteObjectClusterLabel: "testcluster",
10771098
remoteObjectNamespaceLabel: "",
10781099
remoteObjectNameLabel: "my-test-thing",
@@ -1106,6 +1127,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
11061127
testcase.remoteAPIGroup,
11071128
nil,
11081129
stateNamespace,
1130+
"textor-the-doctor",
11091131
)
11101132
if err != nil {
11111133
t.Fatalf("Failed to create syncer: %v", err)

Diff for: internal/sync/types.go

+10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ limitations under the License.
1616

1717
package sync
1818

19+
import ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
20+
1921
const (
2022
// deletionFinalizer is the finalizer put on remote objects to prevent
2123
// them from being deleted before the local objects can be cleaned up.
@@ -31,6 +33,10 @@ const (
3133

3234
remoteObjectWorkspacePathAnnotation = "syncagent.kcp.io/remote-object-workspace-path"
3335

36+
// agentNameLabel contains the Sync Agent's name and is used to allow multiple Sync Agents
37+
// on the same service cluster, syncing *the same* API to different kcp's.
38+
agentNameLabel = "syncagent.kcp.io/agent-name"
39+
3440
// objectStateLabelName is put on object state Secrets to allow for easier mass deletions
3541
// if ever necessary.
3642
objectStateLabelName = "syncagent.kcp.io/object-state"
@@ -45,3 +51,7 @@ const (
4551
// metadata of the related object.
4652
relatedObjectAnnotationPrefix = "related-resources.syncagent.kcp.io/"
4753
)
54+
55+
func OwnedBy(obj ctrlruntimeclient.Object, agentName string) bool {
56+
return obj.GetLabels()[agentNameLabel] == agentName
57+
}

0 commit comments

Comments
 (0)