Skip to content

fix: cache handling on update #604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;

import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;

/**
* Event handler that makes sure that events are processed in a "single threaded" way per resource
Expand Down Expand Up @@ -188,18 +189,18 @@ void eventProcessingFinished(
if (!running) {
return;
}

CustomResourceID customResourceID = executionScope.getCustomResourceID();
log.debug(
"Event processing finished. Scope: {}, PostExecutionControl: {}",
executionScope,
postExecutionControl);
unsetUnderExecution(executionScope.getCustomResourceID());
unsetUnderExecution(customResourceID);

// If a delete event present at this phase, it was received during reconciliation.
// So we either removed the finalizer during reconciliation or we don't use finalizers.
// Either way we don't want to retry.
if (retry != null && postExecutionControl.exceptionDuringExecution() &&
!eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) {
!eventMarker.deleteEventPresent(customResourceID)) {
handleRetryOnException(executionScope);
// todo revisit monitoring since events are not present anymore
// final var monitor = monitor(); executionScope.getEvents().forEach(e ->
Expand All @@ -210,11 +211,15 @@ void eventProcessingFinished(
if (retry != null) {
handleSuccessfulExecutionRegardingRetry(executionScope);
}
if (eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) {
if (eventMarker.deleteEventPresent(customResourceID)) {
cleanupForDeletedEvent(executionScope.getCustomResourceID());
} else {
if (eventMarker.eventPresent(executionScope.getCustomResourceID())) {
submitReconciliationExecution(executionScope.getCustomResourceID());
if (eventMarker.eventPresent(customResourceID)) {
if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) {
submitReconciliationExecution(customResourceID);
} else {
postponeReconciliationAndHandleCacheSyncEvent(customResourceID);
}
} else {
reScheduleExecutionIfInstructed(postExecutionControl,
executionScope.getCustomResource());
Expand All @@ -225,6 +230,38 @@ void eventProcessingFinished(
}
}

private void postponeReconciliationAndHandleCacheSyncEvent(CustomResourceID customResourceID) {
eventSourceManager.getCustomResourceEventSource().whitelistNextEvent(customResourceID);
}

private boolean isCacheReadyForInstantReconciliation(ExecutionScope<R> executionScope,
PostExecutionControl<R> postExecutionControl) {
if (!postExecutionControl.customResourceUpdatedDuringExecution()) {
return true;
}
String originalResourceVersion = getVersion(executionScope.getCustomResource());
String customResourceVersionAfterExecution = getVersion(postExecutionControl
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like how we access an Optional without checking if it's present or not. If we never check for the presence, we might as well not use an Optional.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is quite a gray are, because we are checking implicitly above. So returning when there was no execution:

   if (!postExecutionControl.customResourceUpdatedDuringExecution()) {
      return true;
    }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not nice, but not sure how we could make it nicer.

.getUpdatedCustomResource()
.orElseThrow(() -> new IllegalStateException(
"Updated custom resource must be present at this point of time")));
String cachedCustomResourceVersion = getVersion(resourceCache
.getCustomResource(executionScope.getCustomResourceID())
.orElseThrow(() -> new IllegalStateException(
"Cached custom resource must be present at this point")));

if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) {
return true;
}
if (cachedCustomResourceVersion.equals(originalResourceVersion)) {
return false;
}
// If the cached resource version equals neither the version before nor after execution
// probably an update happened on the custom resource independent of the framework during
// reconciliation. We cannot tell at this point if it happened before our update or before.
// (Well we could if we would parse resource version, but that should not be done by definition)
return true;
}

private void reScheduleExecutionIfInstructed(PostExecutionControl<R> postExecutionControl,
R customResource) {
postExecutionControl.getReScheduleDelay().ifPresent(delay -> eventSourceManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,12 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(name, namespace);
}

@Override
public String toString() {
return "CustomResourceID{" +
"name='" + name + '\'' +
", namespace='" + namespace + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Set<EventSource> getRegisteredEventSources() {
}

@Override
public CustomResourceEventSource getCustomResourceEventSource() {
public CustomResourceEventSource<R> getCustomResourceEventSource() {
return customResourceEventSource;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,31 @@ public class CustomResourceEventSource<T extends CustomResource<?, ?>> extends A
private final Map<String, SharedIndexInformer<T>> sharedIndexInformers =
new ConcurrentHashMap<>();
private final ObjectMapper cloningObjectMapper;
private final CustomResourceEventFilter<T> filter;
private final OnceWhitelistEventFilterEventFilter<T> onceWhitelistEventFilterEventFilter;


public CustomResourceEventSource(ConfiguredController<T> controller) {
this.controller = controller;
this.cloningObjectMapper =
controller.getConfiguration().getConfigurationService().getObjectMapper();

var filters = new CustomResourceEventFilter[] {
CustomResourceEventFilters.finalizerNeededAndApplied(),
CustomResourceEventFilters.markedForDeletion(),
CustomResourceEventFilters.and(
controller.getConfiguration().getEventFilter(),
CustomResourceEventFilters.generationAware()),
null
};

if (controller.getConfiguration().isGenerationAware()) {
onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>();
filters[filters.length - 1] = onceWhitelistEventFilterEventFilter;
} else {
onceWhitelistEventFilterEventFilter = null;
}
filter = CustomResourceEventFilters.or(filters);
}

@Override
Expand Down Expand Up @@ -90,7 +110,7 @@ public void start() {
@Override
public void close() throws IOException {
eventHandler.close();
for (SharedIndexInformer informer : sharedIndexInformers.values()) {
for (SharedIndexInformer<T> informer : sharedIndexInformers.values()) {
try {
log.info("Closing informer {} -> {}", controller, informer);
informer.close();
Expand All @@ -104,13 +124,6 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource
log.debug(
"Event received for resource: {}", getName(customResource));

final CustomResourceEventFilter<T> filter = CustomResourceEventFilters.or(
CustomResourceEventFilters.finalizerNeededAndApplied(),
CustomResourceEventFilters.markedForDeletion(),
CustomResourceEventFilters.and(
controller.getConfiguration().getEventFilter(),
CustomResourceEventFilters.generationAware()));

if (filter.acceptChange(controller.getConfiguration(), oldResource, customResource)) {
eventHandler.handleEvent(
new CustomResourceEvent(action, CustomResourceID.fromResource(customResource)));
Expand Down Expand Up @@ -171,4 +184,16 @@ private T clone(T customResource) {
throw new IllegalStateException(e);
}
}

/**
* This will ensure that the next event received after this method is called will not be filtered
* out.
*
* @param customResourceID - to which the event is related
*/
public void whitelistNextEvent(CustomResourceID customResourceID) {
if (onceWhitelistEventFilterEventFilter != null) {
onceWhitelistEventFilterEventFilter.whitelistNextEvent(customResourceID);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.javaoperatorsdk.operator.processing.event.internal;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.processing.event.CustomResourceID;

public class OnceWhitelistEventFilterEventFilter<T extends CustomResource<?, ?>>
implements CustomResourceEventFilter<T> {

private static final Logger log =
LoggerFactory.getLogger(OnceWhitelistEventFilterEventFilter.class);

private final ConcurrentMap<CustomResourceID, CustomResourceID> whiteList =
new ConcurrentHashMap<>();

@Override
public boolean acceptChange(ControllerConfiguration<T> configuration, T oldResource,
T newResource) {
CustomResourceID customResourceID = CustomResourceID.fromResource(newResource);
boolean res = whiteList.remove(customResourceID, customResourceID);
if (res) {
log.debug("Accepting whitelisted event for CR id: {}", customResourceID);
}
return res;
}

public void whitelistNextEvent(CustomResourceID customResourceID) {
whiteList.putIfAbsent(customResourceID, customResourceID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static TestCustomResource testCustomResource(CustomResourceID id) {
resource.setMetadata(
new ObjectMetaBuilder()
.withName(id.getName())
.withResourceVersion("1")
.withGeneration(1L)
.withNamespace(id.getNamespace().orElse(null))
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent;
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
Expand All @@ -39,15 +40,15 @@ class DefaultEventHandlerTest {
private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class);
private DefaultEventSourceManager defaultEventSourceManagerMock =
mock(DefaultEventSourceManager.class);
private ResourceCache resourceCache = mock(ResourceCache.class);
private ResourceCache resourceCacheMock = mock(ResourceCache.class);

private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class);

private DefaultEventHandler defaultEventHandler =
new DefaultEventHandler(eventDispatcherMock, resourceCache, "Test", null, eventMarker);
new DefaultEventHandler(eventDispatcherMock, resourceCacheMock, "Test", null, eventMarker);

private DefaultEventHandler defaultEventHandlerWithRetry =
new DefaultEventHandler(eventDispatcherMock, resourceCache, "Test",
new DefaultEventHandler(eventDispatcherMock, resourceCacheMock, "Test",
GenericRetry.defaultLimitedExponentialRetry(), eventMarker);

@BeforeEach
Expand All @@ -68,7 +69,7 @@ public void dispatchesEventsIfNoExecutionInProgress() {
@Test
public void skipProcessingIfLatestCustomResourceNotInCache() {
Event event = prepareCREvent();
when(resourceCache.getCustomResource(event.getRelatedCustomResourceID()))
when(resourceCacheMock.getCustomResource(event.getRelatedCustomResourceID()))
.thenReturn(Optional.empty());

defaultEventHandler.handleEvent(event);
Expand Down Expand Up @@ -213,7 +214,7 @@ public void cleansUpWhenDeleteEventReceivedAndNoEventPresent() {

@Test
public void cleansUpAfterExecutionIfOnlyDeleteEventMarkLeft() {
var cr = testCustomResource(new CustomResourceID(UUID.randomUUID().toString()));
var cr = testCustomResource();
var crEvent = prepareCREvent(CustomResourceID.fromResource(cr));
eventMarker.markDeleteEventReceived(crEvent.getRelatedCustomResourceID());
var executionScope = new ExecutionScope(cr, null);
Expand All @@ -225,6 +226,60 @@ public void cleansUpAfterExecutionIfOnlyDeleteEventMarkLeft() {
.cleanupForCustomResource(eq(crEvent.getRelatedCustomResourceID()));
}

@Test
public void whitelistNextEventIfTheCacheIsNotPropagatedAfterAnUpdate() {
var crID = new CustomResourceID("test-cr", TEST_NAMESPACE);
var cr = testCustomResource(crID);
var updatedCr = testCustomResource(crID);
updatedCr.getMetadata().setResourceVersion("2");
var mockCREventSource = mock(CustomResourceEventSource.class);
eventMarker.markEventReceived(crID);
when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(cr));
when(defaultEventSourceManagerMock.getCustomResourceEventSource())
.thenReturn(mockCREventSource);

defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null),
PostExecutionControl.customResourceUpdated(updatedCr));

verify(mockCREventSource, times(1)).whitelistNextEvent(eq(crID));
}

@Test
public void dontWhitelistsEventWhenOtherChangeDuringExecution() {
var crID = new CustomResourceID("test-cr", TEST_NAMESPACE);
var cr = testCustomResource(crID);
var updatedCr = testCustomResource(crID);
updatedCr.getMetadata().setResourceVersion("2");
var otherChangeCR = testCustomResource(crID);
otherChangeCR.getMetadata().setResourceVersion("3");
var mockCREventSource = mock(CustomResourceEventSource.class);
eventMarker.markEventReceived(crID);
when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(otherChangeCR));
when(defaultEventSourceManagerMock.getCustomResourceEventSource())
.thenReturn(mockCREventSource);

defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null),
PostExecutionControl.customResourceUpdated(updatedCr));

verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID));
}

@Test
public void dontWhitelistsEventIfUpdatedEventInCache() {
var crID = new CustomResourceID("test-cr", TEST_NAMESPACE);
var cr = testCustomResource(crID);
var mockCREventSource = mock(CustomResourceEventSource.class);
eventMarker.markEventReceived(crID);
when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(cr));
when(defaultEventSourceManagerMock.getCustomResourceEventSource())
.thenReturn(mockCREventSource);

defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null),
PostExecutionControl.customResourceUpdated(cr));

verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID));
}

private CustomResourceID eventAlreadyUnderProcessing() {
when(eventDispatcherMock.handleExecution(any()))
.then(
Expand All @@ -243,7 +298,7 @@ private CustomResourceEvent prepareCREvent() {

private CustomResourceEvent prepareCREvent(CustomResourceID uid) {
TestCustomResource customResource = testCustomResource(uid);
when(resourceCache.getCustomResource(eq(uid))).thenReturn(Optional.of(customResource));
when(resourceCacheMock.getCustomResource(eq(uid))).thenReturn(Optional.of(customResource));
return new CustomResourceEvent(ResourceAction.UPDATED,
CustomResourceID.fromResource(customResource));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration;
import io.javaoperatorsdk.operator.processing.ConfiguredController;
import io.javaoperatorsdk.operator.processing.event.CustomResourceID;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;

Expand Down Expand Up @@ -103,16 +104,36 @@ public void handlesAllEventIfNotGenerationAware() {
}

@Test
public void eventNotMarkedForLastGenerationIfNoFinalizer() {
public void eventWithNoGenerationProcessedIfNoFinalizer() {
TestCustomResource customResource1 = TestUtils.testCustomResource();

customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource1,
customResource1);

verify(eventHandler, times(1)).handleEvent(any());
}

customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource1,
customResource1);
verify(eventHandler, times(2)).handleEvent(any());
@Test
public void handlesNextEventIfWhitelisted() {
TestCustomResource customResource = TestUtils.testCustomResource();
customResource.getMetadata().setFinalizers(List.of(FINALIZER));
customResourceEventSource.whitelistNextEvent(CustomResourceID.fromResource(customResource));

customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource,
customResource);

verify(eventHandler, times(1)).handleEvent(any());
}

@Test
public void notHandlesNextEventIfNotWhitelisted() {
TestCustomResource customResource = TestUtils.testCustomResource();
customResource.getMetadata().setFinalizers(List.of(FINALIZER));

customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource,
customResource);

verify(eventHandler, times(0)).handleEvent(any());
}

private static class TestConfiguredController extends ConfiguredController<TestCustomResource> {
Expand Down
Loading