Skip to content

Removing events from context #596

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 48 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
898e2c2
WIP
csviri Sep 30, 2021
be4e5cf
Addressing Custom Resource by Name and Namespace refactor + Informer …
csviri Oct 1, 2021
b2ab4b9
Build is fixed, (test failing)
csviri Oct 4, 2021
9145b52
Test fixes
csviri Oct 4, 2021
4ef27bf
Merge branch 'master' into informer-creventsource
csviri Oct 4, 2021
34cc2a1
minor update
csviri Oct 4, 2021
4b15974
EventSourceManager small fix
csviri Oct 4, 2021
77033e6
Merge branch 'access-event-source-manager' into informer-creventsource
csviri Oct 4, 2021
e1b5926
Unit tests fixed
csviri Oct 4, 2021
e7d1b99
fix: DefaultEventHandler init from EventSourceManager
csviri Oct 4, 2021
a1f92c6
fix: custom resource selector test improvement
csviri Oct 4, 2021
e928a3e
fix: wip test imrpovements
csviri Oct 5, 2021
f35a340
fix: test improvements
csviri Oct 5, 2021
5d5817e
fix: further improvements
csviri Oct 5, 2021
544ce35
Merge branch 'v2' into informer-creventsource
csviri Oct 5, 2021
75ad7d2
fix: build
csviri Oct 5, 2021
7012fb3
feature: add mvn jar to gitignore
csviri Oct 5, 2021
3746122
Exposing CustomResourceEventSource and informers
csviri Oct 6, 2021
f0f2e91
fix: cleanup
csviri Oct 6, 2021
6638a48
fix: remove caching optimization since it not possible anymore with i…
csviri Oct 6, 2021
1d786ef
fix: formatting
csviri Oct 6, 2021
a5343b7
refactor: make name/namespace final
metacosm Oct 6, 2021
9e0430a
feature: Simple label selector support
csviri Oct 7, 2021
7f48b9a
Merge branch 'informer-creventsource' of github.com:java-operator-sdk…
csviri Oct 7, 2021
0aa29a1
fix: formatting
csviri Oct 7, 2021
3ad2fc5
fix: code inspection reports
csviri Oct 7, 2021
86b8185
Merge branch 'v2' into informer-creventsource
csviri Oct 7, 2021
92d3ed3
fix: merge from v2
csviri Oct 7, 2021
d194d25
fix: removed most deprecated apis
csviri Oct 7, 2021
7ee0b7d
wip: started to remove events from variouse layers
csviri Oct 7, 2021
150e875
fix: progress with implementation and tests
csviri Oct 8, 2021
a03cfb9
fix: Updated informer mapping to CustomResourceID
csviri Oct 8, 2021
6b1d7fe
Merge branch 'informer-creventsource' into removing-events-from-context
csviri Oct 8, 2021
f97ced5
fix: imports
csviri Oct 8, 2021
109b7bc
fix: decorational changes
csviri Oct 11, 2021
874c25e
fix: event marker unit test
csviri Oct 11, 2021
ff2b32a
Default Event Handler Unit tests
csviri Oct 11, 2021
1b66865
Merge branch 'v2' into removing-events-from-context
csviri Oct 12, 2021
b6c87f0
fix: fixes after merge
csviri Oct 12, 2021
af50089
fix: changes from code review
csviri Oct 13, 2021
58c3f5c
fix: method naming
csviri Oct 13, 2021
6635b54
Update operator-framework-core/src/main/java/io/javaoperatorsdk/opera…
csviri Oct 13, 2021
f40ee4e
Update operator-framework-core/src/main/java/io/javaoperatorsdk/opera…
csviri Oct 13, 2021
73c32a8
fix: comment
csviri Oct 13, 2021
a8a1ea6
Merge branch 'v2' into removing-events-from-context
csviri Oct 13, 2021
e75cad4
fix: fixes from merge
csviri Oct 13, 2021
b9fc1fc
fix: remove not used method
csviri Oct 13, 2021
0d409ee
fix: formatting
csviri Oct 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import java.util.Optional;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.processing.event.EventList;

public interface Context<T extends CustomResource> {

EventList getEvents();

Optional<RetryInfo> getRetryInfo();

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,13 @@
import java.util.Optional;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.processing.event.EventList;

public class DefaultContext<T extends CustomResource> implements Context<T> {

private final RetryInfo retryInfo;
private final EventList events;

public DefaultContext(EventList events, RetryInfo retryInfo) {
public DefaultContext(RetryInfo retryInfo) {
this.retryInfo = retryInfo;
this.events = events;
}

@Override
public EventList getEvents() {
return events;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent;
import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;

import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;

/**
Expand All @@ -38,7 +39,6 @@ public class DefaultEventHandler<R extends CustomResource<?, ?>> implements Even
@Deprecated
private static EventMonitor monitor = EventMonitor.NOOP;

private final EventBuffer eventBuffer;
private final Set<CustomResourceID> underProcessing = new HashSet<>();
private final EventDispatcher<R> eventDispatcher;
private final Retry retry;
Expand All @@ -50,6 +50,7 @@ public class DefaultEventHandler<R extends CustomResource<?, ?>> implements Even
private volatile boolean running;
private final ResourceCache<R> resourceCache;
private DefaultEventSourceManager<R> eventSourceManager;
private final EventMarker eventMarker;

public DefaultEventHandler(ConfiguredController<R> controller, ResourceCache<R> resourceCache) {
this(
Expand All @@ -58,18 +59,20 @@ public DefaultEventHandler(ConfiguredController<R> controller, ResourceCache<R>
controller.getConfiguration().getName(),
new EventDispatcher<>(controller),
GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()),
controller.getConfiguration().getConfigurationService().getMetrics().getEventMonitor());
controller.getConfiguration().getConfigurationService().getMetrics().getEventMonitor(),
new EventMarker());
}

DefaultEventHandler(EventDispatcher<R> eventDispatcher, ResourceCache<R> resourceCache,
String relatedControllerName,
Retry retry) {
this(resourceCache, null, relatedControllerName, eventDispatcher, retry, null);
Retry retry, EventMarker eventMarker) {
this(resourceCache, null, relatedControllerName, eventDispatcher, retry, null, eventMarker);
}

private DefaultEventHandler(ResourceCache<R> resourceCache, ExecutorService executor,
String relatedControllerName,
EventDispatcher<R> eventDispatcher, Retry retry, EventMonitor monitor) {
EventDispatcher<R> eventDispatcher, Retry retry, EventMonitor monitor,
EventMarker eventMarker) {
this.running = true;
this.executor =
executor == null
Expand All @@ -79,9 +82,9 @@ private DefaultEventHandler(ResourceCache<R> resourceCache, ExecutorService exec
this.controllerName = relatedControllerName;
this.eventDispatcher = eventDispatcher;
this.retry = retry;
eventBuffer = new EventBuffer();
this.resourceCache = resourceCache;
this.eventMonitor = monitor != null ? monitor : EventMonitor.NOOP;
this.eventMarker = eventMarker;
}

public void setEventSourceManager(DefaultEventSourceManager<R> eventSourceManager) {
Expand Down Expand Up @@ -113,45 +116,41 @@ private EventMonitor monitor() {

@Override
public void handleEvent(Event event) {
lock.lock();
try {
lock.lock();
log.debug("Received event: {}", event);
if (!this.running) {
log.debug("Skipping event: {} because the event handler is shutting down", event);
return;
}
final var monitor = monitor();
eventBuffer.addEvent(event.getRelatedCustomResourceID(), event);
monitor.processedEvent(event.getRelatedCustomResourceID(), event);
executeBufferedEvents(event.getRelatedCustomResourceID());
} finally {
lock.unlock();
}
}

@Override
public void close() {
try {
lock.lock();
this.running = false;
markEvent(event);
if (eventMarker.isEventPresent(event.getRelatedCustomResourceID())) {
submitReconciliationExecution(event.getRelatedCustomResourceID());
} else {
cleanupForDeletedEvent(event.getRelatedCustomResourceID());
}
} finally {
lock.unlock();
}
}

private boolean executeBufferedEvents(CustomResourceID customResourceUid) {
boolean newEventForResourceId = eventBuffer.containsEvents(customResourceUid);
private boolean submitReconciliationExecution(CustomResourceID customResourceUid) {
boolean newEventForResourceId = eventMarker.isEventPresent(customResourceUid);
boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid);
Optional<R> latestCustomResource =
resourceCache.getCustomResource(customResourceUid);

if (!controllerUnderExecution && newEventForResourceId && latestCustomResource.isPresent()) {
if (!controllerUnderExecution && newEventForResourceId
&& latestCustomResource.isPresent()) {
setUnderExecutionProcessing(customResourceUid);
ExecutionScope executionScope =
new ExecutionScope(
eventBuffer.getAndRemoveEventsForExecution(customResourceUid),
latestCustomResource.get(),
retryInfo(customResourceUid));
eventMarker.unMarkEventReceived(customResourceUid);
log.debug("Executing events for custom resource. Scope: {}", executionScope);
executor.execute(new ControllerExecution(executionScope));
return true;
Expand All @@ -164,20 +163,30 @@ private boolean executeBufferedEvents(CustomResourceID customResourceUid) {
controllerUnderExecution,
latestCustomResource.isPresent());
if (latestCustomResource.isEmpty()) {
log.warn("no custom resource found in cache for CustomResourceID: {}", customResourceUid);
log.warn("no custom resource found in cache for CustomResourceID: {}",
customResourceUid);
}
return false;
}
}

private void markEvent(Event event) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a method on EventMarker?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not want to put it there, since Event marker should not know that there is special CustomResourceEvent . Will renamet his method so it's more descriptive.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, good point!

if (event instanceof CustomResourceEvent &&
((CustomResourceEvent) event).getAction() == ResourceAction.DELETED) {
eventMarker.markDeleteEventReceived(event);
} else if (!eventMarker.isDeleteEventPresent(event.getRelatedCustomResourceID())) {
eventMarker.markEventReceived(event);
}
}

private RetryInfo retryInfo(CustomResourceID customResourceUid) {
return retryState.get(customResourceUid);
}

void eventProcessingFinished(
ExecutionScope<R> executionScope, PostExecutionControl<R> postExecutionControl) {
lock.lock();
try {
lock.lock();
if (!running) {
return;
}
Expand All @@ -190,28 +199,34 @@ void eventProcessingFinished(

if (retry != null && postExecutionControl.exceptionDuringExecution()) {
handleRetryOnException(executionScope);
final var monitor = monitor();
executionScope.getEvents()
.forEach(e -> monitor.failedEvent(executionScope.getCustomResourceID(), e));
// todo revisit monitoring since events are not present anymore
// final var monitor = monitor(); executionScope.getEvents().forEach(e ->
// monitor.failedEvent(executionScope.getCustomResourceID(), e));
return;
}

if (retry != null) {
markSuccessfulExecutionRegardingRetry(executionScope);
handleSuccessfulExecutionRegardingRetry(executionScope);
}
if (containsCustomResourceDeletedEvent(executionScope.getEvents())) {
cleanupAfterDeletedEvent(executionScope.getCustomResourceID());
if (readyForCleanup(executionScope.getCustomResourceID())) {
cleanupForDeletedEvent(executionScope.getCustomResourceID());
} else {
var executed = executeBufferedEvents(executionScope.getCustomResourceID());
var executed = submitReconciliationExecution(executionScope.getCustomResourceID());
if (!executed) {
reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getCustomResource());
reScheduleExecutionIfInstructed(postExecutionControl,
executionScope.getCustomResource());
}
}
} finally {
lock.unlock();
}
}

private boolean readyForCleanup(CustomResourceID customResourceID) {
return eventMarker
.getEventingState(customResourceID) == EventMarker.EventingState.ONLY_DELETE_EVENT_PRESENT;
}

private void reScheduleExecutionIfInstructed(PostExecutionControl<R> postExecutionControl,
R customResource) {
postExecutionControl.getReScheduleDelay().ifPresent(delay -> eventSourceManager
Expand All @@ -227,13 +242,17 @@ private void reScheduleExecutionIfInstructed(PostExecutionControl<R> postExecuti
private void handleRetryOnException(ExecutionScope<R> executionScope) {
RetryExecution execution = getOrInitRetryExecution(executionScope);
var customResourceID = executionScope.getCustomResourceID();
boolean newEventsExists = eventBuffer
.newEventsExists(customResourceID);
eventBuffer.putBackEvents(customResourceID, executionScope.getEvents());
EventMarker.EventingState eventingState =
eventMarker.getEventingState(customResourceID);
boolean newEventsExists =
eventingState == EventMarker.EventingState.EVENT_PRESENT ||
eventingState == EventMarker.EventingState.DELETE_AND_NON_DELETE_EVENT_PRESENT;
eventMarker.markEventReceived(customResourceID);

if (newEventsExists) {
log.debug("New events exists for for resource id: {}", customResourceID);
executeBufferedEvents(customResourceID);
log.debug("New events exists for for resource id: {}",
customResourceID);
submitReconciliationExecution(customResourceID);
return;
}
Optional<Long> nextDelay = execution.nextDelay();
Expand All @@ -251,7 +270,7 @@ private void handleRetryOnException(ExecutionScope<R> executionScope) {
() -> log.error("Exhausted retries for {}", executionScope));
}

private void markSuccessfulExecutionRegardingRetry(ExecutionScope<R> executionScope) {
private void handleSuccessfulExecutionRegardingRetry(ExecutionScope<R> executionScope) {
log.debug(
"Marking successful execution for resource: {}",
getName(executionScope.getCustomResource()));
Expand All @@ -270,9 +289,9 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope)
return retryExecution;
}

private void cleanupAfterDeletedEvent(CustomResourceID customResourceUid) {
private void cleanupForDeletedEvent(CustomResourceID customResourceUid) {
eventSourceManager.cleanup(customResourceUid);
eventBuffer.cleanup(customResourceUid);
eventMarker.cleanup(customResourceUid);
}

private boolean isControllerUnderExecution(CustomResourceID customResourceUid) {
Expand All @@ -287,6 +306,15 @@ private void unsetUnderExecution(CustomResourceID customResourceUid) {
underProcessing.remove(customResourceUid);
}

@Override
public void close() {
lock.lock();
try {
this.running = false;
} finally {
lock.unlock();
}
}

private class ControllerExecution implements Runnable {
private final ExecutionScope<R> executionScope;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import io.javaoperatorsdk.operator.api.ResourceController;
import io.javaoperatorsdk.operator.api.UpdateControl;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.processing.event.EventList;

import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
Expand Down Expand Up @@ -59,15 +57,7 @@ public PostExecutionControl<R> handleExecution(ExecutionScope<R> executionScope)

private PostExecutionControl<R> handleDispatch(ExecutionScope<R> executionScope) {
R resource = executionScope.getCustomResource();
log.debug("Handling events: {} for resource {}", executionScope.getEvents(), getName(resource));

if (containsCustomResourceDeletedEvent(executionScope.getEvents())) {
log.debug(
"Skipping dispatch processing because of a Delete event: {} with version: {}",
getName(resource),
getVersion(resource));
return PostExecutionControl.defaultDispatch();
}
log.debug("Handling dispatch for resource {}", getName(resource));

final var markedForDeletion = resource.isMarkedForDeletion();
if (markedForDeletion && shouldNotDispatchToDelete(resource)) {
Expand All @@ -79,8 +69,7 @@ private PostExecutionControl<R> handleDispatch(ExecutionScope<R> executionScope)
}

Context<R> context =
new DefaultContext<>(
new EventList(executionScope.getEvents()), executionScope.getRetryInfo());
new DefaultContext<>(executionScope.getRetryInfo());
if (markedForDeletion) {
return handleDelete(resource, context);
} else {
Expand Down
Loading