Skip to content

Commit 181eb31

Browse files
committed
refinements mentioned on operator-framework#2012
provides two options - to control if the annotation is used (to omit events that come too quickly) - to parse the resource version (to keep the cache up-to-date and omit events if they come too slowly) Signed-off-by: Steve Hawkins <[email protected]>
1 parent a3e2c24 commit 181eb31

File tree

10 files changed

+145
-28
lines changed

10 files changed

+145
-28
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

+27
Original file line numberDiff line numberDiff line change
@@ -345,4 +345,31 @@ default boolean ssaBasedCreateUpdateMatchForDependentResources() {
345345
return true;
346346
}
347347

348+
/**
349+
* If an annotation should be used so that the operator sdk can detect events from its own updates
350+
* of dependent resources and then filter them.
351+
* <p>
352+
* Disable this if you want to react to your own dependent resource updates
353+
*
354+
* @since 4.5.0
355+
*/
356+
default boolean previousAnnotationForDependentResources() {
357+
return true;
358+
}
359+
360+
/**
361+
* If the event logic should parse the resourceVersion to determine the ordering of events. This
362+
* is typically not needed.
363+
* <p>
364+
* Disabled by default as Kubernetes does not officially support this interpretation of
365+
* resourceVersions. Enable only if your api server event processing seems to lag the operator
366+
* logic and you want to further minimize the the amount of work done / updates issued by the
367+
* operator.
368+
*
369+
* @since 4.5.0
370+
*/
371+
default boolean parseResourceVersions() {
372+
return false;
373+
}
374+
348375
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

+28
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public class ConfigurationServiceOverrider {
3535
private Duration cacheSyncTimeout;
3636
private ResourceClassResolver resourceClassResolver;
3737
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
38+
private Boolean previousAnnotationForDependentResources;
39+
private Boolean parseResourceVersions;
3840

3941
ConfigurationServiceOverrider(ConfigurationService original) {
4042
this.original = original;
@@ -150,6 +152,18 @@ public ConfigurationServiceOverrider withSSABasedCreateUpdateMatchForDependentRe
150152
return this;
151153
}
152154

155+
public ConfigurationServiceOverrider withPreviousAnnotationForDependentResources(
156+
boolean value) {
157+
this.previousAnnotationForDependentResources = value;
158+
return this;
159+
}
160+
161+
public ConfigurationServiceOverrider wihtParseResourceVersions(
162+
boolean value) {
163+
this.parseResourceVersions = value;
164+
return this;
165+
}
166+
153167
public ConfigurationService build() {
154168
return new BaseConfigurationService(original.getVersion(), cloner, client) {
155169
@Override
@@ -256,6 +270,20 @@ public boolean ssaBasedCreateUpdateMatchForDependentResources() {
256270
? ssaBasedCreateUpdateMatchForDependentResources
257271
: super.ssaBasedCreateUpdateMatchForDependentResources();
258272
}
273+
274+
@Override
275+
public boolean previousAnnotationForDependentResources() {
276+
return previousAnnotationForDependentResources != null
277+
? previousAnnotationForDependentResources
278+
: super.previousAnnotationForDependentResources();
279+
}
280+
281+
@Override
282+
public boolean parseResourceVersions() {
283+
return parseResourceVersions != null
284+
? parseResourceVersions
285+
: super.parseResourceVersions();
286+
}
259287
};
260288
}
261289

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java

+4
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper() {
6262
return secondaryToPrimaryMapper;
6363
}
6464

65+
@Override
6566
public Optional<OnDeleteFilter<? super R>> onDeleteFilter() {
6667
return Optional.ofNullable(onDeleteFilter);
6768
}
@@ -95,12 +96,15 @@ public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondary
9596
*/
9697
SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper();
9798

99+
@Override
98100
Optional<OnAddFilter<? super R>> onAddFilter();
99101

102+
@Override
100103
Optional<OnUpdateFilter<? super R>> onUpdateFilter();
101104

102105
Optional<OnDeleteFilter<? super R>> onDeleteFilter();
103106

107+
@Override
104108
Optional<GenericFilter<? super R>> genericFilter();
105109

106110
<P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondaryMapper();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public R create(R target, P primary, Context<P> context) {
113113
target.getMetadata().setResourceVersion("1");
114114
}
115115
}
116-
addMetadata(false, null, target, primary);
116+
addMetadata(false, null, target, primary, context);
117117
final var resource = prepare(target, primary, "Creating");
118118
return useSSA(context)
119119
? resource
@@ -129,7 +129,7 @@ public R update(R actual, R target, P primary, Context<P> context) {
129129
actual.getMetadata().getResourceVersion());
130130
}
131131
R updatedResource;
132-
addMetadata(false, actual, target, primary);
132+
addMetadata(false, actual, target, primary, context);
133133
if (useSSA(context)) {
134134
updatedResource = prepare(target, primary, "Updating")
135135
.fieldManager(context.getControllerConfiguration().fieldManager())
@@ -160,7 +160,7 @@ public Result<R> match(R actualResource, R desired, P primary, Context<P> contex
160160
public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMatcher<R> matcher,
161161
Context<P> context) {
162162
final boolean matches;
163-
addMetadata(true, actualResource, desired, primary);
163+
addMetadata(true, actualResource, desired, primary, context);
164164
if (useSSA(context)) {
165165
matches = SSABasedGenericKubernetesResourceMatcher.getInstance()
166166
.matches(actualResource, desired, context);
@@ -170,8 +170,9 @@ public Result<R> match(R actualResource, R desired, P primary, ResourceUpdaterMa
170170
return Result.computed(matches, desired);
171171
}
172172

173-
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary) {
174-
if (forMatch) { // keep the current
173+
protected void addMetadata(boolean forMatch, R actualResource, final R target, P primary,
174+
Context<P> context) {
175+
if (forMatch) { // keep the current previous annotation
175176
String actual = actualResource.getMetadata().getAnnotations()
176177
.get(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
177178
Map<String, String> annotations = target.getMetadata().getAnnotations();
@@ -180,7 +181,7 @@ protected void addMetadata(boolean forMatch, R actualResource, final R target, P
180181
} else {
181182
annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
182183
}
183-
} else { // set a new one
184+
} else if (usePreviousAnnotation(context)) { // set a new one
184185
eventSource().orElseThrow().addPreviousAnnotation(
185186
Optional.ofNullable(actualResource).map(r -> r.getMetadata().getResourceVersion())
186187
.orElse(null),
@@ -196,6 +197,11 @@ private boolean useSSA(Context<P> context) {
196197
.ssaBasedCreateUpdateMatchForDependentResources());
197198
}
198199

200+
private boolean usePreviousAnnotation(Context<P> context) {
201+
return context.getControllerConfiguration().getConfigurationService()
202+
.previousAnnotationForDependentResources();
203+
}
204+
199205
@Override
200206
protected void handleDelete(P primary, R secondary, Context<P> context) {
201207
if (secondary != null) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ public class ControllerResourceEventSource<T extends HasMetadata>
3232

3333
@SuppressWarnings({"unchecked", "rawtypes"})
3434
public ControllerResourceEventSource(Controller<T> controller) {
35-
super(controller.getCRClient(), controller.getConfiguration());
35+
super(controller.getCRClient(), controller.getConfiguration(), false);
3636
this.controller = controller;
3737

3838
final var config = controller.getConfiguration();
3939
OnUpdateFilter internalOnUpdateFilter =
40-
(OnUpdateFilter<T>) onUpdateFinalizerNeededAndApplied(controller.useFinalizer(),
40+
onUpdateFinalizerNeededAndApplied(controller.useFinalizer(),
4141
config.getFinalizerName())
4242
.or(onUpdateGenerationAware(config.isGenerationAware()))
4343
.or(onUpdateMarkedForDeletion());

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
6969
extends ManagedInformerEventSource<R, P, InformerConfiguration<R>>
7070
implements ResourceEventHandler<R> {
7171

72+
private static final int MAX_RESOURCE_VERSIONS = 256;
73+
7274
public static String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";
7375

7476
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
@@ -78,14 +80,31 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
7880
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
7981
private Map<String, Function<R, List<String>>> indexerBuffer = new HashMap<>();
8082
private final String id = UUID.randomUUID().toString();
83+
private final Set<String> knownResourceVersions;
8184

8285
public InformerEventSource(
8386
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
84-
this(configuration, context.getClient());
87+
this(configuration, context.getClient(),
88+
context.getControllerConfiguration().getConfigurationService().parseResourceVersions());
8589
}
8690

8791
public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client) {
88-
super(client.resources(configuration.getResourceClass()), configuration);
92+
this(configuration, client, false);
93+
}
94+
95+
public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client,
96+
boolean parseResourceVersions) {
97+
super(client.resources(configuration.getResourceClass()), configuration, parseResourceVersions);
98+
if (parseResourceVersions) {
99+
knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
100+
@Override
101+
protected boolean removeEldestEntry(java.util.Map.Entry<String, Boolean> eldest) {
102+
return size() >= MAX_RESOURCE_VERSIONS;
103+
}
104+
});
105+
} else {
106+
knownResourceVersions = null;
107+
}
89108

90109
// If there is a primary to secondary mapper there is no need for primary to secondary index.
91110
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
@@ -169,6 +188,10 @@ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldO
169188
}
170189

171190
private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
191+
if (knownResourceVersions != null
192+
&& knownResourceVersions.contains(newObject.getMetadata().getResourceVersion())) {
193+
return true;
194+
}
172195
var res = temporaryResourceCache.getResourceFromCache(resourceID);
173196
if (res.isEmpty()) {
174197
return isEventKnownFromAnnotation(newObject, oldObject);
@@ -262,6 +285,9 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
262285

263286
private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
264287
primaryToSecondaryIndex.onAddOrUpdate(newResource);
288+
if (knownResourceVersions != null) {
289+
knownResourceVersions.add(newResource.getMetadata().getResourceVersion());
290+
}
265291
temporaryResourceCache.putResource(newResource, Optional.ofNullable(oldResource)
266292
.map(r -> r.getMetadata().getResourceVersion()).orElse(null));
267293
}
@@ -275,7 +301,6 @@ public boolean allowsNamespaceChanges() {
275301
return configuration().followControllerNamespaceChanges();
276302
}
277303

278-
279304
private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) {
280305
if (genericFilter != null && !genericFilter.accept(newObject)) {
281306
return false;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -42,26 +42,27 @@ public abstract class ManagedInformerEventSource<R extends HasMetadata, P extend
4242
protected MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client;
4343

4444
protected ManagedInformerEventSource(
45-
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration) {
45+
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client, C configuration,
46+
boolean parseResourceVersions) {
4647
super(configuration.getResourceClass());
4748
this.client = client;
48-
temporaryResourceCache = new TemporaryResourceCache<>(this);
49+
temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
4950
this.cache = new InformerManager<>(client, configuration, this);
5051
}
5152

5253
@Override
5354
public void onAdd(R resource) {
54-
temporaryResourceCache.removeResourceFromCache(resource);
55+
temporaryResourceCache.onEvent(resource, false);
5556
}
5657

5758
@Override
5859
public void onUpdate(R oldObj, R newObj) {
59-
temporaryResourceCache.removeResourceFromCache(newObj);
60+
temporaryResourceCache.onEvent(newObj, false);
6061
}
6162

6263
@Override
6364
public void onDelete(R obj, boolean deletedFinalStateUnknown) {
64-
temporaryResourceCache.removeResourceFromCache(obj);
65+
temporaryResourceCache.onEvent(obj, deletedFinalStateUnknown);
6566
}
6667

6768
protected InformerManager<R, C> manager() {
@@ -127,6 +128,7 @@ void setTemporalResourceCache(TemporaryResourceCache<R> temporaryResourceCache)
127128
this.temporaryResourceCache = temporaryResourceCache;
128129
}
129130

131+
@Override
130132
public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
131133
cache.addIndexers(indexers);
132134
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

+34-9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.slf4j.LoggerFactory;
99

1010
import io.fabric8.kubernetes.api.model.HasMetadata;
11+
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
1112
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
1213
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1314

@@ -36,13 +37,18 @@ public class TemporaryResourceCache<T extends HasMetadata> {
3637

3738
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
3839
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
40+
private final boolean parseResourceVersions;
3941

40-
public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
42+
public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource,
43+
boolean parseResourceVersions) {
4144
this.managedInformerEventSource = managedInformerEventSource;
45+
this.parseResourceVersions = parseResourceVersions;
4246
}
4347

44-
public synchronized Optional<T> removeResourceFromCache(T resource) {
45-
return Optional.ofNullable(cache.remove(ResourceID.fromResource(resource)));
48+
public synchronized void onEvent(T resource, boolean unknownState) {
49+
cache.computeIfPresent(ResourceID.fromResource(resource),
50+
(id, cached) -> (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null
51+
: cached);
4652
}
4753

4854
public synchronized void putAddedResource(T newResource) {
@@ -61,18 +67,37 @@ public synchronized void putResource(T newResource, String previousResourceVersi
6167
.orElse(managedInformerEventSource.get(resourceId).orElse(null));
6268

6369
if ((previousResourceVersion == null && cachedResource == null)
64-
|| (cachedResource != null && previousResourceVersion != null
65-
&& cachedResource.getMetadata().getResourceVersion()
66-
.equals(previousResourceVersion))) {
70+
|| (cachedResource != null
71+
&& (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion))
72+
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
6773
log.debug(
6874
"Temporarily moving ahead to target version {} for resource id: {}",
6975
newResource.getMetadata().getResourceVersion(), resourceId);
7076
putToCache(newResource, resourceId);
71-
} else {
72-
if (cache.remove(resourceId) != null) {
73-
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
77+
} else if (cache.remove(resourceId) != null) {
78+
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
79+
}
80+
}
81+
82+
/**
83+
* @return true if {@link InformerConfiguration#parseResourceVersions()} is enabled and the
84+
* resourceVersion of newResource is numerically greater than cachedResource, otherwise
85+
* false
86+
*/
87+
private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
88+
try {
89+
if (parseResourceVersions
90+
&& Long.compare(Long.parseLong(newResource.getMetadata().getResourceVersion()),
91+
Long.parseLong(cachedResource.getMetadata().getResourceVersion())) > 0) {
92+
return true;
7493
}
94+
} catch (NumberFormatException e) {
95+
log.debug(
96+
"Could not compare resourceVersions {} and {} for {}",
97+
newResource.getMetadata().getResourceVersion(),
98+
cachedResource.getMetadata().getResourceVersion(), resourceId);
7599
}
100+
return false;
76101
}
77102

78103
private void putToCache(T resource, ResourceID resourceID) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
119119
informerEventSource.onUpdate(cachedDeployment, testDeployment());
120120

121121
verify(eventHandlerMock, times(1)).handleEvent(any());
122-
verify(temporaryResourceCacheMock, times(1)).removeResourceFromCache(any());
122+
verify(temporaryResourceCacheMock, times(1)).onEvent(testDeployment(), false);
123123
}
124124

125125
@Test

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class TemporaryResourceCacheTest {
2121
private final InformerEventSource<ConfigMap, ?> informerEventSource =
2222
mock(InformerEventSource.class);
2323
private final TemporaryResourceCache<ConfigMap> temporaryResourceCache =
24-
new TemporaryResourceCache<>(informerEventSource);
24+
new TemporaryResourceCache<>(informerEventSource, false);
2525

2626
@Test
2727
void updateAddsTheResourceIntoCacheIfTheInformerHasThePreviousResourceVersion() {
@@ -75,7 +75,7 @@ void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() {
7575
void removesResourceFromCache() {
7676
ConfigMap testResource = propagateTestResourceToCache();
7777

78-
temporaryResourceCache.removeResourceFromCache(testResource());
78+
temporaryResourceCache.onEvent(testResource(), false);
7979

8080
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
8181
.isNotPresent();

0 commit comments

Comments
 (0)