Skip to content

Refined Interface of EventSource and EventSourceManager #597

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 45 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
898e2c2
WIP
csviri Sep 30, 2021
be4e5cf
Addressing Custom Resource by Name and Namespace refactor + Informer …
csviri Oct 1, 2021
b2ab4b9
Build is fixed, (test failing)
csviri Oct 4, 2021
9145b52
Test fixes
csviri Oct 4, 2021
4ef27bf
Merge branch 'master' into informer-creventsource
csviri Oct 4, 2021
34cc2a1
minor update
csviri Oct 4, 2021
4b15974
EventSourceManager small fix
csviri Oct 4, 2021
77033e6
Merge branch 'access-event-source-manager' into informer-creventsource
csviri Oct 4, 2021
e1b5926
Unit tests fixed
csviri Oct 4, 2021
e7d1b99
fix: DefaultEventHandler init from EventSourceManager
csviri Oct 4, 2021
a1f92c6
fix: custom resource selector test improvement
csviri Oct 4, 2021
e928a3e
fix: wip test imrpovements
csviri Oct 5, 2021
f35a340
fix: test improvements
csviri Oct 5, 2021
5d5817e
fix: further improvements
csviri Oct 5, 2021
544ce35
Merge branch 'v2' into informer-creventsource
csviri Oct 5, 2021
75ad7d2
fix: build
csviri Oct 5, 2021
7012fb3
feature: add mvn jar to gitignore
csviri Oct 5, 2021
3746122
Exposing CustomResourceEventSource and informers
csviri Oct 6, 2021
f0f2e91
fix: cleanup
csviri Oct 6, 2021
6638a48
fix: remove caching optimization since it not possible anymore with i…
csviri Oct 6, 2021
1d786ef
fix: formatting
csviri Oct 6, 2021
a5343b7
refactor: make name/namespace final
metacosm Oct 6, 2021
9e0430a
feature: Simple label selector support
csviri Oct 7, 2021
7f48b9a
Merge branch 'informer-creventsource' of github.com:java-operator-sdk…
csviri Oct 7, 2021
0aa29a1
fix: formatting
csviri Oct 7, 2021
3ad2fc5
fix: code inspection reports
csviri Oct 7, 2021
86b8185
Merge branch 'v2' into informer-creventsource
csviri Oct 7, 2021
92d3ed3
fix: merge from v2
csviri Oct 7, 2021
d194d25
fix: removed most deprecated apis
csviri Oct 7, 2021
2e8f219
chore: renaming vars named k8sClient to kubernetsClient
iocanel Oct 6, 2021
9ebfff0
chore(deps): bump jandex-maven-plugin from 1.1.1 to 1.2.1 (#592)
dependabot[bot] Oct 8, 2021
f1f3867
chore(deps-dev): bump mockito-core from 3.12.4 to 4.0.0 (#591)
dependabot[bot] Oct 8, 2021
b99ff10
feature: Build PR on v2
csviri Oct 7, 2021
326f82f
chore(ci): use Java 17
metacosm Sep 23, 2021
98e3def
chore(ci): use only Temurin distribution
metacosm Sep 24, 2021
a03cfb9
fix: Updated informer mapping to CustomResourceID
csviri Oct 8, 2021
746f0c1
chore: add generics to PostExecutionControl to reduce IDEs noise (#594)
lburgazzoli Oct 11, 2021
bb6c00e
chore: polish the junit5 extension (#593)
lburgazzoli Oct 11, 2021
80e8238
fix: EventSourceManager API wip
csviri Oct 11, 2021
bbecf4c
fix: code review fixes
csviri Oct 12, 2021
26b1c3e
Merge branch 'master' into informer-creventsource
csviri Oct 12, 2021
f350f46
Merge branch 'informer-creventsource' into event-source-register-as-list
csviri Oct 12, 2021
d3e2469
fix: improvements of Event Source related APIs
csviri Oct 12, 2021
974c0eb
Merge branch 'v2' into event-source-register-as-list
csviri Oct 12, 2021
b8478e2
fix: remarks from code review
csviri Oct 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope)
}

private void cleanupAfterDeletedEvent(CustomResourceID customResourceUid) {
eventSourceManager.cleanup(customResourceUid);
eventSourceManager.cleanupForCustomResource(customResourceUid);
eventBuffer.cleanup(customResourceUid);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package io.javaoperatorsdk.operator.processing.event;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
Expand All @@ -22,119 +17,81 @@
public class DefaultEventSourceManager<R extends CustomResource<?, ?>>
implements EventSourceManager {

public static final String RETRY_TIMER_EVENT_SOURCE_NAME = "retry-timer-event-source";
public static final String CUSTOM_RESOURCE_EVENT_SOURCE_NAME = "custom-resource-event-source";
private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class);

private final ReentrantLock lock = new ReentrantLock();
private final Map<String, EventSource> eventSources = new ConcurrentHashMap<>();
private final Set<EventSource> eventSources = Collections.synchronizedSet(new HashSet<>());
private DefaultEventHandler<R> defaultEventHandler;
private TimerEventSource<R> retryTimerEventSource;
private CustomResourceEventSource customResourceEventSource;

DefaultEventSourceManager(DefaultEventHandler<R> defaultEventHandler) {
init(defaultEventHandler);
}

public DefaultEventSourceManager(ConfiguredController<R> controller) {
CustomResourceEventSource customResourceEventSource =
new CustomResourceEventSource<>(controller);
customResourceEventSource = new CustomResourceEventSource<>(controller);
init(new DefaultEventHandler<>(controller, customResourceEventSource));
registerEventSource(CUSTOM_RESOURCE_EVENT_SOURCE_NAME, customResourceEventSource);
registerEventSource(customResourceEventSource);
}

private void init(DefaultEventHandler<R> defaultEventHandler) {
this.defaultEventHandler = defaultEventHandler;
defaultEventHandler.setEventSourceManager(this);

this.retryTimerEventSource = new TimerEventSource<>();
registerEventSource(RETRY_TIMER_EVENT_SOURCE_NAME, retryTimerEventSource);
registerEventSource(retryTimerEventSource);
}

@Override
public void close() {
lock.lock();
try {
lock.lock();

try {
defaultEventHandler.close();
} catch (Exception e) {
log.warn("Error closing event handler", e);
}

for (var entry : eventSources.entrySet()) {
log.debug("Closing event sources.");
for (var eventSource : eventSources) {
try {
log.debug("Closing {} -> {}", entry.getKey(), entry.getValue());
entry.getValue().close();
eventSource.close();
} catch (Exception e) {
log.warn("Error closing {} -> {}", entry.getKey(), entry.getValue(), e);
log.warn("Error closing {} -> {}", eventSource);
}
}

eventSources.clear();
} finally {
lock.unlock();
}
}

@Override
public final void registerEventSource(String name, EventSource eventSource)
public final void registerEventSource(EventSource eventSource)
throws OperatorException {
Objects.requireNonNull(eventSource, "EventSource must not be null");

lock.lock();
try {
lock.lock();
if (eventSources.containsKey(name)) {
throw new IllegalStateException(
"Event source with name already registered. Event source name: " + name);
}
eventSources.put(name, eventSource);
eventSources.add(eventSource);
eventSource.setEventHandler(defaultEventHandler);
eventSource.start();
} catch (Throwable e) {
if (e instanceof IllegalStateException || e instanceof MissingCRDException) {
// leave untouched
throw e;
}
throw new OperatorException("Couldn't register event source named '" + name + "'", e);
} finally {
lock.unlock();
}
}

@Override
public Optional<EventSource> deRegisterEventSource(String name) {
try {
lock.lock();
EventSource currentEventSource = eventSources.remove(name);
if (currentEventSource != null) {
try {
currentEventSource.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

return Optional.ofNullable(currentEventSource);
throw new OperatorException(
"Couldn't register event source: " + eventSource.getClass().getName(), e);
} finally {
lock.unlock();
}
}

@Override
public Optional<EventSource> deRegisterCustomResourceFromEventSource(
String eventSourceName, CustomResourceID customResourceUid) {
public void cleanupForCustomResource(CustomResourceID customResourceUid) {
lock.lock();
try {
lock.lock();
EventSource eventSource = this.eventSources.get(eventSourceName);
if (eventSource == null) {
log.warn(
"Event producer: {} not found for custom resource: {}",
eventSourceName,
customResourceUid);
return Optional.empty();
} else {
eventSource.eventSourceDeRegisteredForResource(customResourceUid);
return Optional.of(eventSource);
for (EventSource eventSource : this.eventSources) {
eventSource.cleanupForCustomResource(customResourceUid);
}
} finally {
lock.unlock();
Expand All @@ -146,19 +103,13 @@ public TimerEventSource getRetryTimerEventSource() {
}

@Override
public Map<String, EventSource> getRegisteredEventSources() {
return Collections.unmodifiableMap(eventSources);
public Set<EventSource> getRegisteredEventSources() {
return Collections.unmodifiableSet(eventSources);
}

@Override
public CustomResourceEventSource getCustomResourceEventSource() {
return (CustomResourceEventSource) getRegisteredEventSources()
.get(CUSTOM_RESOURCE_EVENT_SOURCE_NAME);
return customResourceEventSource;
}

public void cleanup(CustomResourceID customResourceUid) {
getRegisteredEventSources()
.keySet()
.forEach(k -> deRegisterCustomResourceFromEventSource(k, customResourceUid));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,10 @@ default void close() throws IOException {}

void setEventHandler(EventHandler eventHandler);

default void eventSourceDeRegisteredForResource(CustomResourceID customResourceUid) {}
/**
* Automatically called when a custom resource is deleted from the cluster.
*
* @param customResourceUid - id of custom resource
*/
default void cleanupForCustomResource(CustomResourceID customResourceUid) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.OperatorException;
Expand All @@ -14,29 +13,15 @@ public interface EventSourceManager<T extends CustomResource<?, ?>> extends Clos
/**
* Add the {@link EventSource} identified by the given <code>name</code> to the event manager.
*
* @param name the name of the {@link EventSource} to add
* @param eventSource the {@link EventSource} to register
* @throws IllegalStateException if an {@link EventSource} with the same name is already
* registered.
* @throws OperatorException if an error occurred during the registration process
*/
void registerEventSource(String name, EventSource eventSource)
void registerEventSource(EventSource eventSource)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we also make it possible to remove an EventSource?

Copy link
Collaborator Author

@csviri csviri Oct 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should not. So were thinking how to handle some dynamic registration and de-registration of event sources. But that would make sense only per custom resource. Since we want to cover events for all related custom resources which are in differenet lifecycle state. So deregistering an event source will mean that new custom resources would not receive events about related dependent resource anymore, even the custom resource was just created.

I see it rather this way: notifications for event sources about custom resource lifecycle, what we already partially cover:

  • now we have cleanupForCustomResource when a custom resource delete,
  • similarly we could have an event that customResourceCreated. So event source could execute some related logic. Like register a webhook or etc. This would make it very general. But did not want to do it until we have some actual request. We already have for the cleanup.

throws IllegalStateException, OperatorException;

/**
* Remove the {@link EventSource} identified by the given <code>name</code> from the event
* manager.
*
* @param name the name of the {@link EventSource} to remove
* @return an optional {@link EventSource} which would be empty if no {@link EventSource} have
* been registered with the given name.
*/
Optional<EventSource> deRegisterEventSource(String name);

Optional<EventSource> deRegisterCustomResourceFromEventSource(
String name, CustomResourceID customResourceUid);

Map<String, EventSource> getRegisteredEventSources();
Set<EventSource> getRegisteredEventSources();

CustomResourceEventSource<T> getCustomResourceEventSource();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void scheduleOnce(R customResource, long delay) {
}

@Override
public void eventSourceDeRegisteredForResource(CustomResourceID customResourceUid) {
public void cleanupForCustomResource(CustomResourceID customResourceUid) {
cancelSchedule(customResourceUid);
cancelOnceSchedule(customResourceUid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void cleanUpAfterDeleteEvent() {

waitMinimalTime();
verify(defaultEventSourceManagerMock, times(1))
.cleanup(CustomResourceID.fromResource(customResource));
.cleanupForCustomResource(CustomResourceID.fromResource(customResource));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.javaoperatorsdk.operator.processing.event;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

import org.junit.jupiter.api.Test;

Expand All @@ -10,16 +10,13 @@
import io.javaoperatorsdk.operator.processing.DefaultEventHandler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

class DefaultEventSourceManagerTest {

public static final String CUSTOM_EVENT_SOURCE_NAME = "CustomEventSource";

private DefaultEventHandler defaultEventHandlerMock = mock(DefaultEventHandler.class);
private DefaultEventSourceManager defaultEventSourceManager =
new DefaultEventSourceManager(defaultEventHandlerMock);
Expand All @@ -28,12 +25,12 @@ class DefaultEventSourceManagerTest {
public void registersEventSource() {
EventSource eventSource = mock(EventSource.class);

defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource);
defaultEventSourceManager.registerEventSource(eventSource);

Map<String, EventSource> registeredSources =
Set<EventSource> registeredSources =
defaultEventSourceManager.getRegisteredEventSources();
assertThat(registeredSources.entrySet()).hasSize(2);
assertThat(registeredSources.get(CUSTOM_EVENT_SOURCE_NAME)).isEqualTo(eventSource);
assertThat(registeredSources).hasSize(2);

verify(eventSource, times(1)).setEventHandler(eq(defaultEventHandlerMock));
verify(eventSource, times(1)).start();
}
Expand All @@ -42,37 +39,25 @@ public void registersEventSource() {
public void closeShouldCascadeToEventSources() throws IOException {
EventSource eventSource = mock(EventSource.class);
EventSource eventSource2 = mock(EventSource.class);
defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource);
defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME + "2", eventSource2);
defaultEventSourceManager.registerEventSource(eventSource);
defaultEventSourceManager.registerEventSource(eventSource2);

defaultEventSourceManager.close();

verify(eventSource, times(1)).close();
verify(eventSource2, times(1)).close();
}

@Test
public void throwExceptionIfRegisteringEventSourceWithSameName() {
EventSource eventSource = mock(EventSource.class);
EventSource eventSource2 = mock(EventSource.class);

defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource);
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(
() -> defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME,
eventSource2));
}

@Test
public void deRegistersEventSources() {
CustomResource customResource = TestUtils.testCustomResource();
EventSource eventSource = mock(EventSource.class);
defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource);
defaultEventSourceManager.registerEventSource(eventSource);

defaultEventSourceManager.deRegisterCustomResourceFromEventSource(
CUSTOM_EVENT_SOURCE_NAME, CustomResourceID.fromResource(customResource));
defaultEventSourceManager
.cleanupForCustomResource(CustomResourceID.fromResource(customResource));

verify(eventSource, times(1))
.eventSourceDeRegisteredForResource(eq(CustomResourceID.fromResource(customResource)));
.cleanupForCustomResource(eq(CustomResourceID.fromResource(customResource)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void deRegistersPeriodicalEventSources() {
untilAsserted(() -> assertThat(eventHandlerMock.events).hasSizeGreaterThan(1));

timerEventSource
.eventSourceDeRegisteredForResource(CustomResourceID.fromResource(customResource));
.cleanupForCustomResource(CustomResourceID.fromResource(customResource));

int size = eventHandlerMock.events.size();
untilAsserted(() -> assertThat(eventHandlerMock.events).hasSize(size));
Expand Down Expand Up @@ -103,7 +103,7 @@ public void deRegistersOnceEventSources() {

timerEventSource.scheduleOnce(customResource, PERIOD);
timerEventSource
.eventSourceDeRegisteredForResource(CustomResourceID.fromResource(customResource));
.cleanupForCustomResource(CustomResourceID.fromResource(customResource));

untilAsserted(() -> assertThat(eventHandlerMock.events).isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class EventSourceTestCustomResourceController

@Override
public void init(EventSourceManager eventSourceManager) {
eventSourceManager.registerEventSource("Timer", timerEventSource);
eventSourceManager.registerEventSource(timerEventSource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class InformerEventSourceTestCustomResourceController implements
public void init(EventSourceManager eventSourceManager) {
eventSource = new InformerEventSource<>(kubernetesClient, ConfigMap.class,
Mappers.fromAnnotation(RELATED_RESOURCE_UID));
eventSourceManager.registerEventSource("configmap", eventSource);
eventSourceManager.registerEventSource(eventSource);
}

@Override
Expand Down