Skip to content

Commit 6c62e31

Browse files
committed
feat: custom event filter for controllers operator-framework#404
1 parent cb0870e commit 6c62e31

File tree

10 files changed

+161
-73
lines changed

10 files changed

+161
-73
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractControllerConfiguration.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.javaoperatorsdk.operator.api.config;
22

33
import io.fabric8.kubernetes.client.CustomResource;
4+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourcePredicate;
45
import java.util.Set;
56

67
/**
@@ -24,9 +25,10 @@ public AbstractControllerConfiguration(String associatedControllerClassName, Str
2425
String crdName, String finalizer, boolean generationAware,
2526
Set<String> namespaces,
2627
RetryConfiguration retryConfiguration, String labelSelector,
28+
CustomResourcePredicate<R> customResourcePredicate,
2729
Class<R> customResourceClass,
2830
ConfigurationService service) {
2931
super(associatedControllerClassName, name, crdName, finalizer, generationAware, namespaces,
30-
retryConfiguration, labelSelector, customResourceClass, service);
32+
retryConfiguration, labelSelector, customResourcePredicate, customResourceClass, service);
3133
}
3234
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java

+8
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import io.javaoperatorsdk.operator.ControllerUtils;
55
import io.javaoperatorsdk.operator.api.Controller;
66
import java.lang.reflect.ParameterizedType;
7+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourcePredicate;
8+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourcePredicates;
79
import java.util.Collections;
810
import java.util.Set;
911

@@ -97,4 +99,10 @@ default void setConfigurationService(ConfigurationService service) {}
9799
default boolean useFinalizer() {
98100
return !Controller.NO_FINALIZER.equals(getFinalizer());
99101
}
102+
103+
default CustomResourcePredicate<R> getCustomResourcePredicate() {
104+
return isGenerationAware()
105+
? CustomResourcePredicates.generationAware()
106+
: CustomResourcePredicates.passthrough();
107+
}
100108
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourcePredicate;
35
import java.util.HashSet;
46
import java.util.List;
57
import java.util.Set;
68

7-
import io.fabric8.kubernetes.client.CustomResource;
8-
99
public class ControllerConfigurationOverrider<R extends CustomResource<?, ?>> {
1010

1111
private String finalizer;
1212
private boolean generationAware;
1313
private final Set<String> namespaces;
1414
private RetryConfiguration retry;
1515
private String labelSelector;
16+
private CustomResourcePredicate<R> customResourcePredicate;
1617
private final ControllerConfiguration<R> original;
1718

1819
private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
@@ -21,6 +22,7 @@ private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
2122
namespaces = new HashSet<>(original.getNamespaces());
2223
retry = original.getRetryConfiguration();
2324
labelSelector = original.getLabelSelector();
25+
customResourcePredicate = original.getCustomResourcePredicate();
2426
this.original = original;
2527
}
2628

@@ -65,6 +67,12 @@ public ControllerConfigurationOverrider<R> withLabelSelector(String labelSelecto
6567
return this;
6668
}
6769

70+
public ControllerConfigurationOverrider<R> withCustomResourcePredicate(
71+
CustomResourcePredicate<R> customResourcePredicate) {
72+
this.customResourcePredicate = customResourcePredicate;
73+
return this;
74+
}
75+
6876
public ControllerConfiguration<R> build() {
6977
return new DefaultControllerConfiguration<>(
7078
original.getAssociatedControllerClassName(),
@@ -75,6 +83,7 @@ public ControllerConfiguration<R> build() {
7583
namespaces,
7684
retry,
7785
labelSelector,
86+
customResourcePredicate,
7887
original.getCustomResourceClass(),
7988
original.getConfigurationService());
8089
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java

+25-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourcePredicate;
4+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourcePredicates;
35
import java.util.Collections;
46
import java.util.Set;
57

@@ -17,6 +19,7 @@ public class DefaultControllerConfiguration<R extends CustomResource<?, ?>>
1719
private final boolean watchAllNamespaces;
1820
private final RetryConfiguration retryConfiguration;
1921
private final String labelSelector;
22+
private final CustomResourcePredicate<R> customResourcePredicate;
2023
private Class<R> customResourceClass;
2124
private ConfigurationService service;
2225

@@ -29,6 +32,7 @@ public DefaultControllerConfiguration(
2932
Set<String> namespaces,
3033
RetryConfiguration retryConfiguration,
3134
String labelSelector,
35+
CustomResourcePredicate<R> customResourcePredicate,
3236
Class<R> customResourceClass,
3337
ConfigurationService service) {
3438
this.associatedControllerClassName = associatedControllerClassName;
@@ -44,6 +48,7 @@ public DefaultControllerConfiguration(
4448
? ControllerConfiguration.super.getRetryConfiguration()
4549
: retryConfiguration;
4650
this.labelSelector = labelSelector;
51+
this.customResourcePredicate = customResourcePredicate;
4752
this.customResourceClass =
4853
customResourceClass == null ? ControllerConfiguration.super.getCustomResourceClass()
4954
: customResourceClass;
@@ -52,7 +57,7 @@ public DefaultControllerConfiguration(
5257

5358
/**
5459
* @deprecated use
55-
* {@link #DefaultControllerConfiguration(String, String, String, String, boolean, Set, RetryConfiguration, String, Class, ConfigurationService)}
60+
* {@link #DefaultControllerConfiguration(String, String, String, String, boolean, Set, RetryConfiguration)}
5661
* instead
5762
*/
5863
@Deprecated
@@ -64,8 +69,20 @@ public DefaultControllerConfiguration(
6469
boolean generationAware,
6570
Set<String> namespaces,
6671
RetryConfiguration retryConfiguration) {
67-
this(associatedControllerClassName, name, crdName, finalizer, generationAware, namespaces,
68-
retryConfiguration, null, null, null);
72+
this(
73+
associatedControllerClassName,
74+
name,
75+
crdName,
76+
finalizer,
77+
generationAware,
78+
namespaces,
79+
retryConfiguration,
80+
null,
81+
generationAware
82+
? CustomResourcePredicates.generationAware()
83+
: CustomResourcePredicates.passthrough(),
84+
null,
85+
null);
6986
}
7087

7188
@Override
@@ -131,4 +148,9 @@ public String getLabelSelector() {
131148
public Class<R> getCustomResourceClass() {
132149
return customResourceClass;
133150
}
151+
152+
@Override
153+
public CustomResourcePredicate<R> getCustomResourcePredicate() {
154+
return customResourcePredicate;
155+
}
134156
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/CustomResourceCache.java

+15-11
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
import org.slf4j.LoggerFactory;
2020

2121
@SuppressWarnings("rawtypes")
22-
public class CustomResourceCache {
22+
public class CustomResourceCache<T extends CustomResource<?, ?>> {
2323

2424
private static final Logger log = LoggerFactory.getLogger(CustomResourceCache.class);
2525

2626
private final ObjectMapper objectMapper;
27-
private final ConcurrentMap<String, CustomResource> resources = new ConcurrentHashMap<>();
27+
private final ConcurrentMap<String, T> resources = new ConcurrentHashMap<>();
2828
private final Lock lock = new ReentrantLock();
2929

3030
public CustomResourceCache() {
@@ -35,21 +35,24 @@ public CustomResourceCache(ObjectMapper objectMapper) {
3535
this.objectMapper = objectMapper;
3636
}
3737

38-
public void cacheResource(CustomResource resource) {
38+
public void cacheResource(T resource) {
3939
try {
4040
lock.lock();
41-
resources.put(KubernetesResourceUtils.getUID(resource), resource);
41+
42+
// defensive copy
43+
resources.put(KubernetesResourceUtils.getUID(resource), clone(resource));
4244
} finally {
4345
lock.unlock();
4446
}
4547
}
4648

47-
public void cacheResource(CustomResource resource, Predicate<CustomResource> predicate) {
49+
public void cacheResource(T resource, Predicate<CustomResource> predicate) {
4850
try {
4951
lock.lock();
5052
if (predicate.test(resources.get(KubernetesResourceUtils.getUID(resource)))) {
5153
log.trace("Update cache after condition is true: {}", getName(resource));
52-
resources.put(getUID(resource), resource);
54+
// defensive copy
55+
resources.put(getUID(resource), clone(resource));
5356
}
5457
} finally {
5558
lock.unlock();
@@ -63,11 +66,11 @@ public void cacheResource(CustomResource resource, Predicate<CustomResource> pre
6366
* @param uuid
6467
* @return
6568
*/
66-
public Optional<CustomResource> getLatestResource(String uuid) {
69+
public Optional<T> getLatestResource(String uuid) {
6770
return Optional.ofNullable(resources.get(uuid)).map(this::clone);
6871
}
6972

70-
public List<CustomResource> getLatestResources(Predicate<CustomResource> selector) {
73+
public List<T> getLatestResources(Predicate<CustomResource> selector) {
7174
try {
7275
lock.lock();
7376
return resources.values().stream()
@@ -91,16 +94,17 @@ public Set<String> getLatestResourcesUids(Predicate<CustomResource> selector) {
9194
}
9295
}
9396

94-
private CustomResource clone(CustomResource customResource) {
97+
@SuppressWarnings("unchecked")
98+
private T clone(CustomResource customResource) {
9599
try {
96-
return objectMapper.readValue(
100+
return (T) objectMapper.readValue(
97101
objectMapper.writeValueAsString(customResource), customResource.getClass());
98102
} catch (JsonProcessingException e) {
99103
throw new IllegalStateException(e);
100104
}
101105
}
102106

103-
public CustomResource cleanup(String customResourceUid) {
107+
public T cleanup(String customResourceUid) {
104108
return resources.remove(customResourceUid);
105109
}
106110
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java

+18-48
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,6 @@
44
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
55
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
66

7-
import java.io.IOException;
8-
import java.util.LinkedList;
9-
import java.util.List;
10-
import java.util.Map;
11-
import java.util.concurrent.ConcurrentHashMap;
12-
13-
import org.slf4j.Logger;
14-
import org.slf4j.LoggerFactory;
15-
167
import io.fabric8.kubernetes.api.model.ListOptions;
178
import io.fabric8.kubernetes.client.CustomResource;
189
import io.fabric8.kubernetes.client.Watch;
@@ -24,22 +15,28 @@
2415
import io.javaoperatorsdk.operator.processing.CustomResourceCache;
2516
import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils;
2617
import io.javaoperatorsdk.operator.processing.event.AbstractEventSource;
18+
import java.io.IOException;
19+
import java.util.LinkedList;
20+
import java.util.List;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
2723

28-
/** This is a special case since is not bound to a single custom resource */
24+
/**
25+
* This is a special case since is not bound to a single custom resource
26+
*/
2927
public class CustomResourceEventSource<T extends CustomResource<?, ?>> extends AbstractEventSource
3028
implements Watcher<T> {
3129

3230
private static final Logger log = LoggerFactory.getLogger(CustomResourceEventSource.class);
3331

3432
private final ConfiguredController<T> controller;
35-
private final Map<String, Long> lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>();
3633
private final List<Watch> watches;
37-
private final CustomResourceCache customResourceCache;
34+
private final CustomResourceCache<T> customResourceCache;
3835

3936
public CustomResourceEventSource(ConfiguredController<T> controller) {
4037
this.controller = controller;
4138
this.watches = new LinkedList<>();
42-
this.customResourceCache = new CustomResourceCache(
39+
this.customResourceCache = new CustomResourceCache<>(
4340
controller.getConfiguration().getConfigurationService().getObjectMapper());
4441
}
4542

@@ -86,6 +83,9 @@ public void eventReceived(Watcher.Action action, T customResource) {
8683
log.debug(
8784
"Event received for action: {}, resource: {}", action.name(), getName(customResource));
8885

86+
final String uuid = KubernetesResourceUtils.getUID(customResource);
87+
final T oldResource = customResourceCache.getLatestResource(uuid).orElse(null);
88+
8989
// cache the latest version of the CR
9090
customResourceCache.cacheResource(customResource);
9191

@@ -98,9 +98,11 @@ public void eventReceived(Watcher.Action action, T customResource) {
9898
return;
9999
}
100100

101-
if (!skipBecauseOfGeneration(customResource)) {
101+
boolean fire = controller.getConfiguration().getCustomResourcePredicate().test(
102+
controller.getConfiguration(), oldResource, customResource);
103+
104+
if (fire) {
102105
eventHandler.handleEvent(new CustomResourceEvent(action, customResource, this));
103-
markLastGenerationProcessed(customResource);
104106
} else {
105107
log.debug(
106108
"Skipping event handling resource {} with version: {}",
@@ -109,38 +111,6 @@ public void eventReceived(Watcher.Action action, T customResource) {
109111
}
110112
}
111113

112-
private void markLastGenerationProcessed(T resource) {
113-
if (controller.getConfiguration().isGenerationAware()
114-
&& resource.hasFinalizer(controller.getConfiguration().getFinalizer())) {
115-
lastGenerationProcessedSuccessfully.put(
116-
KubernetesResourceUtils.getUID(resource), resource.getMetadata().getGeneration());
117-
}
118-
}
119-
120-
private boolean skipBecauseOfGeneration(T customResource) {
121-
if (!controller.getConfiguration().isGenerationAware()) {
122-
return false;
123-
}
124-
// if CR being deleted generation is naturally not changing, so we process all the events
125-
if (customResource.isMarkedForDeletion()) {
126-
return false;
127-
}
128-
129-
// only proceed if we haven't already seen this custom resource generation
130-
Long lastGeneration =
131-
lastGenerationProcessedSuccessfully.get(customResource.getMetadata().getUid());
132-
if (lastGeneration == null) {
133-
return false;
134-
} else {
135-
return customResource.getMetadata().getGeneration() <= lastGeneration;
136-
}
137-
}
138-
139-
@Override
140-
public void eventSourceDeRegisteredForResource(String customResourceUid) {
141-
lastGenerationProcessedSuccessfully.remove(customResourceUid);
142-
}
143-
144114
@Override
145115
public void onClose(WatcherException e) {
146116
if (e == null) {
@@ -164,7 +134,7 @@ public void onClose(WatcherException e) {
164134
}
165135

166136
// todo: remove
167-
public CustomResourceCache getCache() {
137+
public CustomResourceCache<T> getCache() {
168138
return customResourceCache;
169139
}
170140
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.javaoperatorsdk.operator.processing.event.internal;
2+
3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
5+
6+
@FunctionalInterface
7+
public interface CustomResourcePredicate<T extends CustomResource> {
8+
boolean test(ControllerConfiguration<T> configuration, T oldResource, T newResource);
9+
}

0 commit comments

Comments
 (0)