Skip to content

Commit b278ebf

Browse files
csvirimetacosm
andauthored
fix: cache handling on update (#604)
Co-authored-by: Chris Laprun <[email protected]>
1 parent cb93092 commit b278ebf

File tree

9 files changed

+251
-25
lines changed

9 files changed

+251
-25
lines changed

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

+43-6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
2828

2929
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
30+
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
3031

3132
/**
3233
* Event handler that makes sure that events are processed in a "single threaded" way per resource
@@ -188,18 +189,18 @@ void eventProcessingFinished(
188189
if (!running) {
189190
return;
190191
}
191-
192+
CustomResourceID customResourceID = executionScope.getCustomResourceID();
192193
log.debug(
193194
"Event processing finished. Scope: {}, PostExecutionControl: {}",
194195
executionScope,
195196
postExecutionControl);
196-
unsetUnderExecution(executionScope.getCustomResourceID());
197+
unsetUnderExecution(customResourceID);
197198

198199
// If a delete event present at this phase, it was received during reconciliation.
199200
// So we either removed the finalizer during reconciliation or we don't use finalizers.
200201
// Either way we don't want to retry.
201202
if (retry != null && postExecutionControl.exceptionDuringExecution() &&
202-
!eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) {
203+
!eventMarker.deleteEventPresent(customResourceID)) {
203204
handleRetryOnException(executionScope);
204205
// todo revisit monitoring since events are not present anymore
205206
// final var monitor = monitor(); executionScope.getEvents().forEach(e ->
@@ -210,11 +211,15 @@ void eventProcessingFinished(
210211
if (retry != null) {
211212
handleSuccessfulExecutionRegardingRetry(executionScope);
212213
}
213-
if (eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) {
214+
if (eventMarker.deleteEventPresent(customResourceID)) {
214215
cleanupForDeletedEvent(executionScope.getCustomResourceID());
215216
} else {
216-
if (eventMarker.eventPresent(executionScope.getCustomResourceID())) {
217-
submitReconciliationExecution(executionScope.getCustomResourceID());
217+
if (eventMarker.eventPresent(customResourceID)) {
218+
if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) {
219+
submitReconciliationExecution(customResourceID);
220+
} else {
221+
postponeReconciliationAndHandleCacheSyncEvent(customResourceID);
222+
}
218223
} else {
219224
reScheduleExecutionIfInstructed(postExecutionControl,
220225
executionScope.getCustomResource());
@@ -225,6 +230,38 @@ void eventProcessingFinished(
225230
}
226231
}
227232

233+
private void postponeReconciliationAndHandleCacheSyncEvent(CustomResourceID customResourceID) {
234+
eventSourceManager.getCustomResourceEventSource().whitelistNextEvent(customResourceID);
235+
}
236+
237+
private boolean isCacheReadyForInstantReconciliation(ExecutionScope<R> executionScope,
238+
PostExecutionControl<R> postExecutionControl) {
239+
if (!postExecutionControl.customResourceUpdatedDuringExecution()) {
240+
return true;
241+
}
242+
String originalResourceVersion = getVersion(executionScope.getCustomResource());
243+
String customResourceVersionAfterExecution = getVersion(postExecutionControl
244+
.getUpdatedCustomResource()
245+
.orElseThrow(() -> new IllegalStateException(
246+
"Updated custom resource must be present at this point of time")));
247+
String cachedCustomResourceVersion = getVersion(resourceCache
248+
.getCustomResource(executionScope.getCustomResourceID())
249+
.orElseThrow(() -> new IllegalStateException(
250+
"Cached custom resource must be present at this point")));
251+
252+
if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) {
253+
return true;
254+
}
255+
if (cachedCustomResourceVersion.equals(originalResourceVersion)) {
256+
return false;
257+
}
258+
// If the cached resource version equals neither the version before nor after execution
259+
// probably an update happened on the custom resource independent of the framework during
260+
// reconciliation. We cannot tell at this point if it happened before our update or before.
261+
// (Well we could if we would parse resource version, but that should not be done by definition)
262+
return true;
263+
}
264+
228265
private void reScheduleExecutionIfInstructed(PostExecutionControl<R> postExecutionControl,
229266
R customResource) {
230267
postExecutionControl.getReScheduleDelay().ifPresent(delay -> eventSourceManager

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/CustomResourceID.java

+8
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,12 @@ public boolean equals(Object o) {
4747
public int hashCode() {
4848
return Objects.hash(name, namespace);
4949
}
50+
51+
@Override
52+
public String toString() {
53+
return "CustomResourceID{" +
54+
"name='" + name + '\'' +
55+
", namespace='" + namespace + '\'' +
56+
'}';
57+
}
5058
}

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public Set<EventSource> getRegisteredEventSources() {
108108
}
109109

110110
@Override
111-
public CustomResourceEventSource getCustomResourceEventSource() {
111+
public CustomResourceEventSource<R> getCustomResourceEventSource() {
112112
return customResourceEventSource;
113113
}
114114

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java

+33-8
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,31 @@ public class CustomResourceEventSource<T extends CustomResource<?, ?>> extends A
4141
private final Map<String, SharedIndexInformer<T>> sharedIndexInformers =
4242
new ConcurrentHashMap<>();
4343
private final ObjectMapper cloningObjectMapper;
44+
private final CustomResourceEventFilter<T> filter;
45+
private final OnceWhitelistEventFilterEventFilter<T> onceWhitelistEventFilterEventFilter;
46+
4447

4548
public CustomResourceEventSource(ConfiguredController<T> controller) {
4649
this.controller = controller;
4750
this.cloningObjectMapper =
4851
controller.getConfiguration().getConfigurationService().getObjectMapper();
52+
53+
var filters = new CustomResourceEventFilter[] {
54+
CustomResourceEventFilters.finalizerNeededAndApplied(),
55+
CustomResourceEventFilters.markedForDeletion(),
56+
CustomResourceEventFilters.and(
57+
controller.getConfiguration().getEventFilter(),
58+
CustomResourceEventFilters.generationAware()),
59+
null
60+
};
61+
62+
if (controller.getConfiguration().isGenerationAware()) {
63+
onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>();
64+
filters[filters.length - 1] = onceWhitelistEventFilterEventFilter;
65+
} else {
66+
onceWhitelistEventFilterEventFilter = null;
67+
}
68+
filter = CustomResourceEventFilters.or(filters);
4969
}
5070

5171
@Override
@@ -90,7 +110,7 @@ public void start() {
90110
@Override
91111
public void close() throws IOException {
92112
eventHandler.close();
93-
for (SharedIndexInformer informer : sharedIndexInformers.values()) {
113+
for (SharedIndexInformer<T> informer : sharedIndexInformers.values()) {
94114
try {
95115
log.info("Closing informer {} -> {}", controller, informer);
96116
informer.close();
@@ -104,13 +124,6 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource
104124
log.debug(
105125
"Event received for resource: {}", getName(customResource));
106126

107-
final CustomResourceEventFilter<T> filter = CustomResourceEventFilters.or(
108-
CustomResourceEventFilters.finalizerNeededAndApplied(),
109-
CustomResourceEventFilters.markedForDeletion(),
110-
CustomResourceEventFilters.and(
111-
controller.getConfiguration().getEventFilter(),
112-
CustomResourceEventFilters.generationAware()));
113-
114127
if (filter.acceptChange(controller.getConfiguration(), oldResource, customResource)) {
115128
eventHandler.handleEvent(
116129
new CustomResourceEvent(action, CustomResourceID.fromResource(customResource)));
@@ -171,4 +184,16 @@ private T clone(T customResource) {
171184
throw new IllegalStateException(e);
172185
}
173186
}
187+
188+
/**
189+
* This will ensure that the next event received after this method is called will not be filtered
190+
* out.
191+
*
192+
* @param customResourceID - to which the event is related
193+
*/
194+
public void whitelistNextEvent(CustomResourceID customResourceID) {
195+
if (onceWhitelistEventFilterEventFilter != null) {
196+
onceWhitelistEventFilterEventFilter.whitelistNextEvent(customResourceID);
197+
}
198+
}
174199
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.javaoperatorsdk.operator.processing.event.internal;
2+
3+
import java.util.concurrent.ConcurrentHashMap;
4+
import java.util.concurrent.ConcurrentMap;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import io.fabric8.kubernetes.client.CustomResource;
10+
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
11+
import io.javaoperatorsdk.operator.processing.event.CustomResourceID;
12+
13+
public class OnceWhitelistEventFilterEventFilter<T extends CustomResource<?, ?>>
14+
implements CustomResourceEventFilter<T> {
15+
16+
private static final Logger log =
17+
LoggerFactory.getLogger(OnceWhitelistEventFilterEventFilter.class);
18+
19+
private final ConcurrentMap<CustomResourceID, CustomResourceID> whiteList =
20+
new ConcurrentHashMap<>();
21+
22+
@Override
23+
public boolean acceptChange(ControllerConfiguration<T> configuration, T oldResource,
24+
T newResource) {
25+
CustomResourceID customResourceID = CustomResourceID.fromResource(newResource);
26+
boolean res = whiteList.remove(customResourceID, customResourceID);
27+
if (res) {
28+
log.debug("Accepting whitelisted event for CR id: {}", customResourceID);
29+
}
30+
return res;
31+
}
32+
33+
public void whitelistNextEvent(CustomResourceID customResourceID) {
34+
whiteList.putIfAbsent(customResourceID, customResourceID);
35+
}
36+
}

Diff for: operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public static TestCustomResource testCustomResource(CustomResourceID id) {
3232
resource.setMetadata(
3333
new ObjectMetaBuilder()
3434
.withName(id.getName())
35+
.withResourceVersion("1")
3536
.withGeneration(1L)
3637
.withNamespace(id.getNamespace().orElse(null))
3738
.build());

Diff for: operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java

+61-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
1818
import io.javaoperatorsdk.operator.processing.event.Event;
1919
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent;
20+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
2021
import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction;
2122
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
2223
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
@@ -39,15 +40,15 @@ class DefaultEventHandlerTest {
3940
private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class);
4041
private DefaultEventSourceManager defaultEventSourceManagerMock =
4142
mock(DefaultEventSourceManager.class);
42-
private ResourceCache resourceCache = mock(ResourceCache.class);
43+
private ResourceCache resourceCacheMock = mock(ResourceCache.class);
4344

4445
private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class);
4546

4647
private DefaultEventHandler defaultEventHandler =
47-
new DefaultEventHandler(eventDispatcherMock, resourceCache, "Test", null, eventMarker);
48+
new DefaultEventHandler(eventDispatcherMock, resourceCacheMock, "Test", null, eventMarker);
4849

4950
private DefaultEventHandler defaultEventHandlerWithRetry =
50-
new DefaultEventHandler(eventDispatcherMock, resourceCache, "Test",
51+
new DefaultEventHandler(eventDispatcherMock, resourceCacheMock, "Test",
5152
GenericRetry.defaultLimitedExponentialRetry(), eventMarker);
5253

5354
@BeforeEach
@@ -68,7 +69,7 @@ public void dispatchesEventsIfNoExecutionInProgress() {
6869
@Test
6970
public void skipProcessingIfLatestCustomResourceNotInCache() {
7071
Event event = prepareCREvent();
71-
when(resourceCache.getCustomResource(event.getRelatedCustomResourceID()))
72+
when(resourceCacheMock.getCustomResource(event.getRelatedCustomResourceID()))
7273
.thenReturn(Optional.empty());
7374

7475
defaultEventHandler.handleEvent(event);
@@ -213,7 +214,7 @@ public void cleansUpWhenDeleteEventReceivedAndNoEventPresent() {
213214

214215
@Test
215216
public void cleansUpAfterExecutionIfOnlyDeleteEventMarkLeft() {
216-
var cr = testCustomResource(new CustomResourceID(UUID.randomUUID().toString()));
217+
var cr = testCustomResource();
217218
var crEvent = prepareCREvent(CustomResourceID.fromResource(cr));
218219
eventMarker.markDeleteEventReceived(crEvent.getRelatedCustomResourceID());
219220
var executionScope = new ExecutionScope(cr, null);
@@ -225,6 +226,60 @@ public void cleansUpAfterExecutionIfOnlyDeleteEventMarkLeft() {
225226
.cleanupForCustomResource(eq(crEvent.getRelatedCustomResourceID()));
226227
}
227228

229+
@Test
230+
public void whitelistNextEventIfTheCacheIsNotPropagatedAfterAnUpdate() {
231+
var crID = new CustomResourceID("test-cr", TEST_NAMESPACE);
232+
var cr = testCustomResource(crID);
233+
var updatedCr = testCustomResource(crID);
234+
updatedCr.getMetadata().setResourceVersion("2");
235+
var mockCREventSource = mock(CustomResourceEventSource.class);
236+
eventMarker.markEventReceived(crID);
237+
when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(cr));
238+
when(defaultEventSourceManagerMock.getCustomResourceEventSource())
239+
.thenReturn(mockCREventSource);
240+
241+
defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null),
242+
PostExecutionControl.customResourceUpdated(updatedCr));
243+
244+
verify(mockCREventSource, times(1)).whitelistNextEvent(eq(crID));
245+
}
246+
247+
@Test
248+
public void dontWhitelistsEventWhenOtherChangeDuringExecution() {
249+
var crID = new CustomResourceID("test-cr", TEST_NAMESPACE);
250+
var cr = testCustomResource(crID);
251+
var updatedCr = testCustomResource(crID);
252+
updatedCr.getMetadata().setResourceVersion("2");
253+
var otherChangeCR = testCustomResource(crID);
254+
otherChangeCR.getMetadata().setResourceVersion("3");
255+
var mockCREventSource = mock(CustomResourceEventSource.class);
256+
eventMarker.markEventReceived(crID);
257+
when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(otherChangeCR));
258+
when(defaultEventSourceManagerMock.getCustomResourceEventSource())
259+
.thenReturn(mockCREventSource);
260+
261+
defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null),
262+
PostExecutionControl.customResourceUpdated(updatedCr));
263+
264+
verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID));
265+
}
266+
267+
@Test
268+
public void dontWhitelistsEventIfUpdatedEventInCache() {
269+
var crID = new CustomResourceID("test-cr", TEST_NAMESPACE);
270+
var cr = testCustomResource(crID);
271+
var mockCREventSource = mock(CustomResourceEventSource.class);
272+
eventMarker.markEventReceived(crID);
273+
when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(cr));
274+
when(defaultEventSourceManagerMock.getCustomResourceEventSource())
275+
.thenReturn(mockCREventSource);
276+
277+
defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null),
278+
PostExecutionControl.customResourceUpdated(cr));
279+
280+
verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID));
281+
}
282+
228283
private CustomResourceID eventAlreadyUnderProcessing() {
229284
when(eventDispatcherMock.handleExecution(any()))
230285
.then(
@@ -243,7 +298,7 @@ private CustomResourceEvent prepareCREvent() {
243298

244299
private CustomResourceEvent prepareCREvent(CustomResourceID uid) {
245300
TestCustomResource customResource = testCustomResource(uid);
246-
when(resourceCache.getCustomResource(eq(uid))).thenReturn(Optional.of(customResource));
301+
when(resourceCacheMock.getCustomResource(eq(uid))).thenReturn(Optional.of(customResource));
247302
return new CustomResourceEvent(ResourceAction.UPDATED,
248303
CustomResourceID.fromResource(customResource));
249304
}

Diff for: operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java

+25-4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
1515
import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration;
1616
import io.javaoperatorsdk.operator.processing.ConfiguredController;
17+
import io.javaoperatorsdk.operator.processing.event.CustomResourceID;
1718
import io.javaoperatorsdk.operator.processing.event.EventHandler;
1819
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
1920

@@ -103,16 +104,36 @@ public void handlesAllEventIfNotGenerationAware() {
103104
}
104105

105106
@Test
106-
public void eventNotMarkedForLastGenerationIfNoFinalizer() {
107+
public void eventWithNoGenerationProcessedIfNoFinalizer() {
107108
TestCustomResource customResource1 = TestUtils.testCustomResource();
108109

109110
customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource1,
110111
customResource1);
112+
111113
verify(eventHandler, times(1)).handleEvent(any());
114+
}
112115

113-
customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource1,
114-
customResource1);
115-
verify(eventHandler, times(2)).handleEvent(any());
116+
@Test
117+
public void handlesNextEventIfWhitelisted() {
118+
TestCustomResource customResource = TestUtils.testCustomResource();
119+
customResource.getMetadata().setFinalizers(List.of(FINALIZER));
120+
customResourceEventSource.whitelistNextEvent(CustomResourceID.fromResource(customResource));
121+
122+
customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource,
123+
customResource);
124+
125+
verify(eventHandler, times(1)).handleEvent(any());
126+
}
127+
128+
@Test
129+
public void notHandlesNextEventIfNotWhitelisted() {
130+
TestCustomResource customResource = TestUtils.testCustomResource();
131+
customResource.getMetadata().setFinalizers(List.of(FINALIZER));
132+
133+
customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource,
134+
customResource);
135+
136+
verify(eventHandler, times(0)).handleEvent(any());
116137
}
117138

118139
private static class TestConfiguredController extends ConfiguredController<TestCustomResource> {

0 commit comments

Comments
 (0)