Skip to content

Commit 31dc583

Browse files
csvirimetacosm
andcommitted
feat: support to handle different cluster for InformerEventSource (#2499)
Signed-off-by: Attila Mészáros <[email protected]> Signed-off-by: Chris Laprun <[email protected]> Co-authored-by: Chris Laprun <[email protected]>
1 parent 1af08aa commit 31dc583

File tree

8 files changed

+241
-12
lines changed

8 files changed

+241
-12
lines changed

Diff for: docs/content/en/docs/features/_index.md

+19
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,25 @@ parts of reconciliation logic and during the execution of the controller:
608608

609609
For more information about MDC see this [link](https://www.baeldung.com/mdc-in-log4j-2-logback).
610610

611+
## InformerEventSource Multi-Cluster Support
612+
613+
It is possible to handle resources for remote cluster with `InformerEventSource`. To do so,
614+
simply set a client that connects to a remote cluster:
615+
616+
```java
617+
618+
InformerEventSourceConfiguration<Tomcat> configuration =
619+
InformerEventSourceConfiguration.from(SecondaryResource.class, PrimaryResource.class)
620+
.withKubernetesClient(remoteClusterClient)
621+
.withSecondaryToPrimaryMapper(Mappers.fromDefaultAnnotations());
622+
623+
```
624+
625+
You will also need to specify a `SecondaryToPrimaryMapper`, since the default one
626+
is based on owner references and won't work across cluster instances. You could, for example, use the provided implementation that relies on annotations added to the secondary resources to identify the associated primary resource.
627+
628+
See related [integration test](https://github.com/operator-framework/java-operator-sdk/tree/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/informerremotecluster).
629+
611630
## Dynamically Changing Target Namespaces
612631

613632
A controller can be configured to watch a specific set of namespaces in addition of the

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java

+30-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
88
import io.fabric8.kubernetes.api.model.HasMetadata;
9+
import io.fabric8.kubernetes.client.KubernetesClient;
910
import io.javaoperatorsdk.operator.api.config.Informable;
1011
import io.javaoperatorsdk.operator.processing.GroupVersionKind;
1112
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
@@ -57,22 +58,33 @@ default String name() {
5758
return getInformerConfig().getName();
5859
}
5960

61+
/**
62+
* Optional, specific kubernetes client, typically to connect to a different cluster than the rest
63+
* of the operator. Note that this is solely for multi cluster support.
64+
*/
65+
default Optional<KubernetesClient> getKubernetesClient() {
66+
return Optional.empty();
67+
}
68+
6069
class DefaultInformerEventSourceConfiguration<R extends HasMetadata>
6170
implements InformerEventSourceConfiguration<R> {
6271
private final PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
6372
private final SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
6473
private final GroupVersionKind groupVersionKind;
6574
private final InformerConfiguration<R> informerConfig;
75+
private final KubernetesClient kubernetesClient;
6676

6777
protected DefaultInformerEventSourceConfiguration(
6878
GroupVersionKind groupVersionKind,
6979
PrimaryToSecondaryMapper<?> primaryToSecondaryMapper,
7080
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper,
71-
InformerConfiguration<R> informerConfig) {
81+
InformerConfiguration<R> informerConfig,
82+
KubernetesClient kubernetesClient) {
7283
this.informerConfig = Objects.requireNonNull(informerConfig);
7384
this.groupVersionKind = groupVersionKind;
7485
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
7586
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
87+
this.kubernetesClient = kubernetesClient;
7688
}
7789

7890
@Override
@@ -95,8 +107,12 @@ public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondary
95107
public Optional<GroupVersionKind> getGroupVersionKind() {
96108
return Optional.ofNullable(groupVersionKind);
97109
}
98-
}
99110

111+
@Override
112+
public Optional<KubernetesClient> getKubernetesClient() {
113+
return Optional.ofNullable(kubernetesClient);
114+
}
115+
}
100116

101117
@SuppressWarnings({"unused", "UnusedReturnValue"})
102118
class Builder<R extends HasMetadata> {
@@ -108,6 +124,7 @@ class Builder<R extends HasMetadata> {
108124
private String name;
109125
private PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
110126
private SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
127+
private KubernetesClient kubernetesClient;
111128

112129
private Builder(Class<R> resourceClass,
113130
Class<? extends HasMetadata> primaryResourceClass) {
@@ -152,6 +169,16 @@ public Builder<R> withSecondaryToPrimaryMapper(
152169
return this;
153170
}
154171

172+
/**
173+
* Use this is case want to create an InformerEventSource that handles resources from different
174+
* cluster.
175+
*/
176+
public Builder<R> withKubernetesClient(
177+
KubernetesClient kubernetesClient) {
178+
this.kubernetesClient = kubernetesClient;
179+
return this;
180+
}
181+
155182
public String getName() {
156183
return name;
157184
}
@@ -192,7 +219,7 @@ public InformerEventSourceConfiguration<R> build() {
192219
Objects.requireNonNullElse(secondaryToPrimaryMapper,
193220
Mappers.fromOwnerReferences(HasMetadata.getApiVersion(primaryResourceClass),
194221
HasMetadata.getKind(primaryResourceClass), false)),
195-
config.buildForInformerEventSource());
222+
config.buildForInformerEventSource(), kubernetesClient);
196223
}
197224
}
198225
}

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.javaoperatorsdk.operator.processing.event.source.informer;
22

3-
import java.util.*;
3+
import java.util.Optional;
4+
import java.util.Set;
5+
import java.util.UUID;
46
import java.util.stream.Collectors;
57

68
import org.slf4j.Logger;
@@ -67,18 +69,17 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
6769
extends ManagedInformerEventSource<R, P, InformerEventSourceConfiguration<R>>
6870
implements ResourceEventHandler<R> {
6971

70-
public static String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";
71-
7272
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
73-
73+
public static final String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";
7474
// we need direct control for the indexer to propagate the just update resource also to the index
7575
private final PrimaryToSecondaryIndex<R> primaryToSecondaryIndex;
7676
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
7777
private final String id = UUID.randomUUID().toString();
7878

7979
public InformerEventSource(
8080
InformerEventSourceConfiguration<R> configuration, EventSourceContext<P> context) {
81-
this(configuration, context.getClient(),
81+
this(configuration,
82+
configuration.getKubernetesClient().orElse(context.getClient()),
8283
context.getControllerConfiguration().getConfigurationService()
8384
.parseResourceVersionsForEventFilteringAndCaching());
8485
}
@@ -287,10 +288,6 @@ private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObj
287288
}
288289
}
289290

290-
private enum Operation {
291-
ADD, UPDATE
292-
}
293-
294291
private boolean acceptedByDeleteFilters(R resource, boolean b) {
295292
return (onDeleteFilter == null || onDeleteFilter.accept(resource, b)) &&
296293
(genericFilter == null || genericFilter.accept(resource));
@@ -307,4 +304,8 @@ public R addPreviousAnnotation(String resourceVersion, R target) {
307304
id + Optional.ofNullable(resourceVersion).map(rv -> "," + rv).orElse(""));
308305
return target;
309306
}
307+
308+
private enum Operation {
309+
ADD, UPDATE
310+
}
310311
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.javaoperatorsdk.operator.baseapi.informerremotecluster;
2+
3+
import io.fabric8.kubernetes.api.model.Namespaced;
4+
import io.fabric8.kubernetes.client.CustomResource;
5+
import io.fabric8.kubernetes.model.annotation.Group;
6+
import io.fabric8.kubernetes.model.annotation.ShortNames;
7+
import io.fabric8.kubernetes.model.annotation.Version;
8+
9+
@Group("sample.javaoperatorsdk")
10+
@Version("v1")
11+
@ShortNames("irc")
12+
public class InformerRemoteClusterCustomResource
13+
extends CustomResource<Void, InformerRemoteClusterStatus> implements Namespaced {
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package io.javaoperatorsdk.operator.baseapi.informerremotecluster;
2+
3+
import java.util.Map;
4+
5+
import org.junit.jupiter.api.Test;
6+
import org.junit.jupiter.api.extension.RegisterExtension;
7+
8+
import io.fabric8.kubeapitest.junit.EnableKubeAPIServer;
9+
import io.fabric8.kubernetes.api.model.ConfigMap;
10+
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
11+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
12+
import io.fabric8.kubernetes.client.KubernetesClient;
13+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
14+
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
15+
16+
import static io.javaoperatorsdk.operator.baseapi.informerremotecluster.InformerRemoteClusterReconciler.DATA_KEY;
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
import static org.awaitility.Awaitility.await;
19+
20+
@EnableKubeAPIServer
21+
class InformerRemoteClusterIT {
22+
23+
public static final String NAME = "test1";
24+
public static final String CONFIG_MAP_NAME = "testcm";
25+
public static final String INITIAL_VALUE = "initial_value";
26+
public static final String CHANGED_VALUE = "changed_value";
27+
public static final String CM_NAMESPACE = "default";
28+
29+
// injected by Kube API Test. Client for another cluster.
30+
static KubernetesClient kubernetesClient;
31+
32+
@RegisterExtension
33+
LocallyRunOperatorExtension extension =
34+
LocallyRunOperatorExtension.builder()
35+
.withReconciler(new InformerRemoteClusterReconciler(kubernetesClient))
36+
.build();
37+
38+
@Test
39+
void testRemoteClusterInformer() {
40+
var r = extension.create(testCustomResource());
41+
42+
var cm = kubernetesClient.configMaps()
43+
.resource(remoteConfigMap(r.getMetadata().getName(),
44+
r.getMetadata().getNamespace()))
45+
.create();
46+
47+
// config map does not exist on the primary resource cluster
48+
assertThat(extension.getKubernetesClient().configMaps()
49+
.inNamespace(CM_NAMESPACE)
50+
.withName(CONFIG_MAP_NAME).get()).isNull();
51+
52+
await().untilAsserted(() -> {
53+
var cr = extension.get(InformerRemoteClusterCustomResource.class, NAME);
54+
assertThat(cr.getStatus()).isNotNull();
55+
assertThat(cr.getStatus().getRemoteConfigMapMessage()).isEqualTo(INITIAL_VALUE);
56+
});
57+
58+
cm.getData().put(DATA_KEY, CHANGED_VALUE);
59+
kubernetesClient.configMaps().resource(cm).update();
60+
61+
await().untilAsserted(() -> {
62+
var cr = extension.get(InformerRemoteClusterCustomResource.class, NAME);
63+
assertThat(cr.getStatus().getRemoteConfigMapMessage()).isEqualTo(CHANGED_VALUE);
64+
});
65+
}
66+
67+
InformerRemoteClusterCustomResource testCustomResource() {
68+
var res = new InformerRemoteClusterCustomResource();
69+
res.setMetadata(new ObjectMetaBuilder()
70+
.withName(NAME)
71+
.build());
72+
return res;
73+
}
74+
75+
ConfigMap remoteConfigMap(String ownerName, String ownerNamespace) {
76+
return new ConfigMapBuilder()
77+
.withMetadata(new ObjectMetaBuilder()
78+
.withName(CONFIG_MAP_NAME)
79+
.withNamespace(CM_NAMESPACE)
80+
.withAnnotations(Map.of(
81+
Mappers.DEFAULT_ANNOTATION_FOR_NAME, ownerName,
82+
Mappers.DEFAULT_ANNOTATION_FOR_NAMESPACE, ownerNamespace))
83+
.build())
84+
.withData(Map.of(DATA_KEY, INITIAL_VALUE))
85+
.build();
86+
}
87+
88+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.javaoperatorsdk.operator.baseapi.informerremotecluster;
2+
3+
import java.util.List;
4+
5+
import io.fabric8.kubernetes.api.model.ConfigMap;
6+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
7+
import io.fabric8.kubernetes.client.KubernetesClient;
8+
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
9+
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
10+
import io.javaoperatorsdk.operator.api.reconciler.Context;
11+
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
12+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
13+
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
14+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
15+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
16+
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
17+
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
18+
19+
@ControllerConfiguration
20+
public class InformerRemoteClusterReconciler
21+
implements Reconciler<InformerRemoteClusterCustomResource> {
22+
23+
public static final String DATA_KEY = "key";
24+
25+
private final KubernetesClient remoteClient;
26+
27+
public InformerRemoteClusterReconciler(KubernetesClient remoteClient) {
28+
this.remoteClient = remoteClient;
29+
}
30+
31+
@Override
32+
public UpdateControl<InformerRemoteClusterCustomResource> reconcile(
33+
InformerRemoteClusterCustomResource resource,
34+
Context<InformerRemoteClusterCustomResource> context) throws Exception {
35+
36+
return context.getSecondaryResource(ConfigMap.class).map(cm -> {
37+
var r = new InformerRemoteClusterCustomResource();
38+
r.setMetadata(new ObjectMetaBuilder()
39+
.withName(resource.getMetadata().getName())
40+
.withNamespace(resource.getMetadata().getNamespace())
41+
.build());
42+
r.setStatus(new InformerRemoteClusterStatus());
43+
r.getStatus().setRemoteConfigMapMessage(cm.getData().get(DATA_KEY));
44+
return UpdateControl.patchStatus(r);
45+
}).orElseGet(UpdateControl::noUpdate);
46+
}
47+
48+
@Override
49+
public List<EventSource<?, InformerRemoteClusterCustomResource>> prepareEventSources(
50+
EventSourceContext<InformerRemoteClusterCustomResource> context) {
51+
52+
var es = new InformerEventSource<>(InformerEventSourceConfiguration
53+
.from(ConfigMap.class, InformerRemoteClusterCustomResource.class)
54+
// owner references do not work cross cluster, using
55+
// annotations here to reference primary resource
56+
.withSecondaryToPrimaryMapper(Mappers.fromDefaultAnnotations())
57+
// setting remote client for informer
58+
.withKubernetesClient(remoteClient)
59+
.withInformerConfiguration(
60+
InformerConfiguration.Builder::withWatchAllNamespaces)
61+
.build(), context);
62+
63+
return List.of(es);
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.javaoperatorsdk.operator.baseapi.informerremotecluster;
2+
3+
public class InformerRemoteClusterStatus {
4+
5+
private String remoteConfigMapMessage;
6+
7+
public String getRemoteConfigMapMessage() {
8+
return remoteConfigMapMessage;
9+
}
10+
11+
public void setRemoteConfigMapMessage(String remoteConfigMapMessage) {
12+
this.remoteConfigMapMessage = remoteConfigMapMessage;
13+
}
14+
}

Diff for: operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/labelselector/LabelSelectorTestReconciler.java

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public class LabelSelectorTestReconciler
2222
@Override
2323
public UpdateControl<LabelSelectorTestCustomResource> reconcile(
2424
LabelSelectorTestCustomResource resource, Context<LabelSelectorTestCustomResource> context) {
25+
2526
numberOfExecutions.addAndGet(1);
2627
return UpdateControl.noUpdate();
2728
}

0 commit comments

Comments
 (0)