Skip to content

Commit 03d423d

Browse files
committed
refinements mentioned on operator-framework#2012
provides two options - to control if the annotation is used (to omit events that come too quickly) - to parse the resource version (to keep the cache up-to-date and omit events if they come too slowly) Signed-off-by: Steve Hawkins <[email protected]>
1 parent 2827206 commit 03d423d

File tree

12 files changed

+226
-57
lines changed

12 files changed

+226
-57
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

+27
Original file line numberDiff line numberDiff line change
@@ -364,4 +364,31 @@ default Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
364364
return Set.of(ConfigMap.class, Secret.class);
365365
}
366366

367+
/**
368+
* If an annotation should be used so that the operator sdk can detect events from its own updates
369+
* of dependent resources and then filter them.
370+
* <p>
371+
* Disable this if you want to react to your own dependent resource updates
372+
*
373+
* @since 4.5.0
374+
*/
375+
default boolean previousAnnotationForDependentResources() {
376+
return true;
377+
}
378+
379+
/**
380+
* If the event logic should parse the resourceVersion to determine the ordering of events. This
381+
* is typically not needed.
382+
* <p>
383+
* Disabled by default as Kubernetes does not support, and discourages, this interpretation of
384+
* resourceVersions. Enable only if your api server event processing seems to lag the operator
385+
* logic and you want to further minimize the the amount of work done / updates issued by the
386+
* operator.
387+
*
388+
* @since 4.5.0
389+
*/
390+
default boolean parseResourceVersions() {
391+
return false;
392+
}
393+
367394
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

+28
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public class ConfigurationServiceOverrider {
3737
private ResourceClassResolver resourceClassResolver;
3838
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
3939
private Set<Class<? extends HasMetadata>> defaultNonSSAResource;
40+
private Boolean previousAnnotationForDependentResources;
41+
private Boolean parseResourceVersions;
4042

4143
ConfigurationServiceOverrider(ConfigurationService original) {
4244
this.original = original;
@@ -158,6 +160,18 @@ public ConfigurationServiceOverrider withDefaultNonSSAResource(
158160
return this;
159161
}
160162

163+
public ConfigurationServiceOverrider withPreviousAnnotationForDependentResources(
164+
boolean value) {
165+
this.previousAnnotationForDependentResources = value;
166+
return this;
167+
}
168+
169+
public ConfigurationServiceOverrider wihtParseResourceVersions(
170+
boolean value) {
171+
this.parseResourceVersions = value;
172+
return this;
173+
}
174+
161175
public ConfigurationService build() {
162176
return new BaseConfigurationService(original.getVersion(), cloner, client) {
163177
@Override
@@ -270,6 +284,20 @@ public Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
270284
return defaultNonSSAResource != null ? defaultNonSSAResource
271285
: super.defaultNonSSAResource();
272286
}
287+
288+
@Override
289+
public boolean previousAnnotationForDependentResources() {
290+
return previousAnnotationForDependentResources != null
291+
? previousAnnotationForDependentResources
292+
: super.previousAnnotationForDependentResources();
293+
}
294+
295+
@Override
296+
public boolean parseResourceVersions() {
297+
return parseResourceVersions != null
298+
? parseResourceVersions
299+
: super.parseResourceVersions();
300+
}
273301
};
274302
}
275303

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java

+4
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper() {
6262
return secondaryToPrimaryMapper;
6363
}
6464

65+
@Override
6566
public Optional<OnDeleteFilter<? super R>> onDeleteFilter() {
6667
return Optional.ofNullable(onDeleteFilter);
6768
}
@@ -95,12 +96,15 @@ public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondary
9596
*/
9697
SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper();
9798

99+
@Override
98100
Optional<OnAddFilter<? super R>> onAddFilter();
99101

102+
@Override
100103
Optional<OnUpdateFilter<? super R>> onUpdateFilter();
101104

102105
Optional<OnDeleteFilter<? super R>> onDeleteFilter();
103106

107+
@Override
104108
Optional<GenericFilter<? super R>> genericFilter();
105109

106110
<P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondaryMapper();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public R create(R desired, P primary, Context<P> context) {
113113
desired.getMetadata().setResourceVersion("1");
114114
}
115115
}
116-
addMetadata(false, null, desired, primary);
116+
addMetadata(false, null, desired, primary, context);
117117
sanitizeDesired(desired, null, primary, context);
118118
final var resource = prepare(desired, primary, "Creating");
119119
return useSSA(context)
@@ -130,7 +130,7 @@ public R update(R actual, R desired, P primary, Context<P> context) {
130130
actual.getMetadata().getResourceVersion());
131131
}
132132
R updatedResource;
133-
addMetadata(false, actual, desired, primary);
133+
addMetadata(false, actual, desired, primary, context);
134134
sanitizeDesired(desired, actual, primary, context);
135135
if (useSSA(context)) {
136136
updatedResource = prepare(desired, primary, "Updating")
@@ -163,7 +163,7 @@ public Result<R> match(R actualResource, R desired, P primary, Context<P> contex
163163
public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMatcher<R> matcher,
164164
Context<P> context) {
165165
final boolean matches;
166-
addMetadata(true, actualResource, desired, primary);
166+
addMetadata(true, actualResource, desired, primary, context);
167167
if (useSSA(context)) {
168168
matches = SSABasedGenericKubernetesResourceMatcher.getInstance()
169169
.matches(actualResource, desired, context);
@@ -173,8 +173,9 @@ public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMa
173173
return Result.computed(matches, desired);
174174
}
175175

176-
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary) {
177-
if (forMatch) { // keep the current
176+
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary,
177+
Context<P> context) {
178+
if (forMatch) { // keep the current previous annotation
178179
String actual = actualResource.getMetadata().getAnnotations()
179180
.get(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
180181
Map<String, String> annotations = target.getMetadata().getAnnotations();
@@ -183,7 +184,7 @@ protected void addMetadata(boolean forMatch, R actualResource, final R target, P
183184
} else {
184185
annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
185186
}
186-
} else { // set a new one
187+
} else if (usePreviousAnnotation(context)) { // set a new one
187188
eventSource().orElseThrow().addPreviousAnnotation(
188189
Optional.ofNullable(actualResource).map(r -> r.getMetadata().getResourceVersion())
189190
.orElse(null),
@@ -208,6 +209,11 @@ protected boolean useSSA(Context<P> context) {
208209
.ssaBasedCreateUpdateMatchForDependentResources());
209210
}
210211

212+
private boolean usePreviousAnnotation(Context<P> context) {
213+
return context.getControllerConfiguration().getConfigurationService()
214+
.previousAnnotationForDependentResources();
215+
}
216+
211217
@Override
212218
protected void handleDelete(P primary, R secondary, Context<P> context) {
213219
if (secondary != null) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ public class ControllerResourceEventSource<T extends HasMetadata>
3232

3333
@SuppressWarnings({"unchecked", "rawtypes"})
3434
public ControllerResourceEventSource(Controller<T> controller) {
35-
super(controller.getCRClient(), controller.getConfiguration());
35+
super(controller.getCRClient(), controller.getConfiguration(), false);
3636
this.controller = controller;
3737

3838
final var config = controller.getConfiguration();
3939
OnUpdateFilter internalOnUpdateFilter =
40-
(OnUpdateFilter<T>) onUpdateFinalizerNeededAndApplied(controller.useFinalizer(),
40+
onUpdateFinalizerNeededAndApplied(controller.useFinalizer(),
4141
config.getFinalizerName())
4242
.or(onUpdateGenerationAware(config.isGenerationAware()))
4343
.or(onUpdateMarkedForDeletion());

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
6969
extends ManagedInformerEventSource<R, P, InformerConfiguration<R>>
7070
implements ResourceEventHandler<R> {
7171

72+
private static final int MAX_RESOURCE_VERSIONS = 256;
73+
7274
public static String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";
7375

7476
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
@@ -78,14 +80,31 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
7880
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
7981
private Map<String, Function<R, List<String>>> indexerBuffer = new HashMap<>();
8082
private final String id = UUID.randomUUID().toString();
83+
private final Set<String> knownResourceVersions;
8184

8285
public InformerEventSource(
8386
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
84-
this(configuration, context.getClient());
87+
this(configuration, context.getClient(),
88+
context.getControllerConfiguration().getConfigurationService().parseResourceVersions());
8589
}
8690

8791
public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client) {
88-
super(client.resources(configuration.getResourceClass()), configuration);
92+
this(configuration, client, false);
93+
}
94+
95+
public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client,
96+
boolean parseResourceVersions) {
97+
super(client.resources(configuration.getResourceClass()), configuration, parseResourceVersions);
98+
if (parseResourceVersions) {
99+
knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
100+
@Override
101+
protected boolean removeEldestEntry(java.util.Map.Entry<String, Boolean> eldest) {
102+
return size() >= MAX_RESOURCE_VERSIONS;
103+
}
104+
});
105+
} else {
106+
knownResourceVersions = null;
107+
}
89108

90109
// If there is a primary to secondary mapper there is no need for primary to secondary index.
91110
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
@@ -169,6 +188,10 @@ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldO
169188
}
170189

171190
private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
191+
if (knownResourceVersions != null
192+
&& knownResourceVersions.contains(newObject.getMetadata().getResourceVersion())) {
193+
return true;
194+
}
172195
var res = temporaryResourceCache.getResourceFromCache(resourceID);
173196
if (res.isEmpty()) {
174197
return isEventKnownFromAnnotation(newObject, oldObject);
@@ -262,6 +285,9 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
262285

263286
private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
264287
primaryToSecondaryIndex.onAddOrUpdate(newResource);
288+
if (knownResourceVersions != null) {
289+
knownResourceVersions.add(newResource.getMetadata().getResourceVersion());
290+
}
265291
temporaryResourceCache.putResource(newResource, Optional.ofNullable(oldResource)
266292
.map(r -> r.getMetadata().getResourceVersion()).orElse(null));
267293
}
@@ -275,7 +301,6 @@ public boolean allowsNamespaceChanges() {
275301
return configuration().followControllerNamespaceChanges();
276302
}
277303

278-
279304
private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) {
280305
if (genericFilter != null && !genericFilter.accept(newObject)) {
281306
return false;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -42,26 +42,27 @@ public abstract class ManagedInformerEventSource<R extends HasMetadata, P extend
4242
protected MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client;
4343

4444
protected ManagedInformerEventSource(
45-
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration) {
45+
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration,
46+
boolean parseResourceVersions) {
4647
super(configuration.getResourceClass());
4748
this.client = client;
48-
temporaryResourceCache = new TemporaryResourceCache<>(this);
49+
temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
4950
this.cache = new InformerManager<>(client, configuration, this);
5051
}
5152

5253
@Override
5354
public void onAdd(R resource) {
54-
temporaryResourceCache.removeResourceFromCache(resource);
55+
temporaryResourceCache.onEvent(resource, false);
5556
}
5657

5758
@Override
5859
public void onUpdate(R oldObj, R newObj) {
59-
temporaryResourceCache.removeResourceFromCache(newObj);
60+
temporaryResourceCache.onEvent(newObj, false);
6061
}
6162

6263
@Override
6364
public void onDelete(R obj, boolean deletedFinalStateUnknown) {
64-
temporaryResourceCache.removeResourceFromCache(obj);
65+
temporaryResourceCache.onEvent(obj, deletedFinalStateUnknown);
6566
}
6667

6768
protected InformerManager<R, C> manager() {
@@ -127,6 +128,7 @@ void setTemporalResourceCache(TemporaryResourceCache<R> temporaryResourceCache)
127128
this.temporaryResourceCache = temporaryResourceCache;
128129
}
129130

131+
@Override
130132
public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
131133
cache.addIndexers(indexers);
132134
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

+34-9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.slf4j.LoggerFactory;
99

1010
import io.fabric8.kubernetes.api.model.HasMetadata;
11+
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
1112
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
1213
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1314

@@ -36,13 +37,18 @@ public class TemporaryResourceCache<T extends HasMetadata> {
3637

3738
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
3839
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
40+
private final boolean parseResourceVersions;
3941

40-
public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
42+
public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource,
43+
boolean parseResourceVersions) {
4144
this.managedInformerEventSource = managedInformerEventSource;
45+
this.parseResourceVersions = parseResourceVersions;
4246
}
4347

44-
public synchronized Optional<T> removeResourceFromCache(T resource) {
45-
return Optional.ofNullable(cache.remove(ResourceID.fromResource(resource)));
48+
public synchronized void onEvent(T resource, boolean unknownState) {
49+
cache.computeIfPresent(ResourceID.fromResource(resource),
50+
(id, cached) -> (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null
51+
: cached);
4652
}
4753

4854
public synchronized void putAddedResource(T newResource) {
@@ -61,18 +67,37 @@ public synchronized void putResource(T newResource, String previousResourceVersi
6167
.orElse(managedInformerEventSource.get(resourceId).orElse(null));
6268

6369
if ((previousResourceVersion == null && cachedResource == null)
64-
|| (cachedResource != null && previousResourceVersion != null
65-
&& cachedResource.getMetadata().getResourceVersion()
66-
.equals(previousResourceVersion))) {
70+
|| (cachedResource != null
71+
&& (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion))
72+
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
6773
log.debug(
6874
"Temporarily moving ahead to target version {} for resource id: {}",
6975
newResource.getMetadata().getResourceVersion(), resourceId);
7076
putToCache(newResource, resourceId);
71-
} else {
72-
if (cache.remove(resourceId) != null) {
73-
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
77+
} else if (cache.remove(resourceId) != null) {
78+
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
79+
}
80+
}
81+
82+
/**
83+
* @return true if {@link InformerConfiguration#parseResourceVersions()} is enabled and the
84+
* resourceVersion of newResource is numerically greater than cachedResource, otherwise
85+
* false
86+
*/
87+
private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
88+
try {
89+
if (parseResourceVersions
90+
&& Long.compare(Long.parseLong(newResource.getMetadata().getResourceVersion()),
91+
Long.parseLong(cachedResource.getMetadata().getResourceVersion())) > 0) {
92+
return true;
7493
}
94+
} catch (NumberFormatException e) {
95+
log.debug(
96+
"Could not compare resourceVersions {} and {} for {}",
97+
newResource.getMetadata().getResourceVersion(),
98+
cachedResource.getMetadata().getResourceVersion(), resourceId);
7599
}
100+
return false;
76101
}
77102

78103
private void putToCache(T resource, ResourceID resourceID) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
119119
informerEventSource.onUpdate(cachedDeployment, testDeployment());
120120

121121
verify(eventHandlerMock, times(1)).handleEvent(any());
122-
verify(temporaryResourceCacheMock, times(1)).removeResourceFromCache(any());
122+
verify(temporaryResourceCacheMock, times(1)).onEvent(testDeployment(), false);
123123
}
124124

125125
@Test

0 commit comments

Comments
 (0)