-
Notifications
You must be signed in to change notification settings - Fork 218
refactor: rename internal package to source, moving LifecycleAware #716
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
907ec2a
cfe6bc0
35023c5
c96072d
7113e98
87a8ce2
134dc67
0a48972
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
package io.javaoperatorsdk.operator.processing; | ||
package io.javaoperatorsdk.operator.processing.event; | ||
|
||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
|
@@ -14,17 +14,16 @@ | |
|
||
import io.fabric8.kubernetes.api.model.HasMetadata; | ||
import io.javaoperatorsdk.operator.OperatorException; | ||
import io.javaoperatorsdk.operator.api.LifecycleAware; | ||
import io.javaoperatorsdk.operator.api.config.ConfigurationService; | ||
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; | ||
import io.javaoperatorsdk.operator.api.monitoring.Metrics; | ||
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; | ||
import io.javaoperatorsdk.operator.processing.event.Event; | ||
import io.javaoperatorsdk.operator.processing.event.EventHandler; | ||
import io.javaoperatorsdk.operator.processing.event.EventSourceManager; | ||
import io.javaoperatorsdk.operator.processing.event.ResourceID; | ||
import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction; | ||
import io.javaoperatorsdk.operator.processing.event.internal.ResourceEvent; | ||
import io.javaoperatorsdk.operator.processing.LifecycleAware; | ||
import io.javaoperatorsdk.operator.processing.MDCUtils; | ||
import io.javaoperatorsdk.operator.processing.ResourceCache; | ||
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; | ||
import io.javaoperatorsdk.operator.processing.event.source.ResourceEvent; | ||
import io.javaoperatorsdk.operator.processing.event.source.TimerEventSource; | ||
import io.javaoperatorsdk.operator.processing.retry.GenericRetry; | ||
import io.javaoperatorsdk.operator.processing.retry.Retry; | ||
import io.javaoperatorsdk.operator.processing.retry.RetryExecution; | ||
|
@@ -36,8 +35,7 @@ | |
* Event handler that makes sure that events are processed in a "single threaded" way per resource | ||
* UID, while buffering events which are received during an execution. | ||
*/ | ||
public class EventProcessor<R extends HasMetadata> | ||
implements EventHandler, LifecycleAware { | ||
class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAware { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); | ||
|
||
|
@@ -51,32 +49,35 @@ public class EventProcessor<R extends HasMetadata> | |
private final Metrics metrics; | ||
private volatile boolean running; | ||
private final ResourceCache<R> resourceCache; | ||
private EventSourceManager<R> eventSourceManager; | ||
private final EventSourceManager<R> eventSourceManager; | ||
private final EventMarker eventMarker; | ||
private final TimerEventSource<R> retryAndRescheduleTimerEventSource; | ||
|
||
public EventProcessor(Controller<R> controller, ResourceCache<R> resourceCache) { | ||
EventProcessor(EventSourceManager<R> eventSourceManager) { | ||
this( | ||
resourceCache, | ||
eventSourceManager.getControllerResourceEventSource(), | ||
ExecutorServiceManager.instance().executorService(), | ||
controller.getConfiguration().getName(), | ||
new ReconciliationDispatcher<>(controller), | ||
GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()), | ||
controller.getConfiguration().getConfigurationService().getMetrics(), | ||
new EventMarker()); | ||
eventSourceManager.getController().getConfiguration().getName(), | ||
new ReconciliationDispatcher<>(eventSourceManager.getController()), | ||
GenericRetry.fromConfiguration( | ||
eventSourceManager.getController().getConfiguration().getRetryConfiguration()), | ||
eventSourceManager.getController().getConfiguration().getConfigurationService() | ||
.getMetrics(), | ||
eventSourceManager); | ||
} | ||
|
||
EventProcessor(ReconciliationDispatcher<R> reconciliationDispatcher, | ||
ResourceCache<R> resourceCache, | ||
EventSourceManager<R> eventSourceManager, | ||
String relatedControllerName, | ||
Retry retry, EventMarker eventMarker) { | ||
this(resourceCache, null, relatedControllerName, reconciliationDispatcher, retry, null, | ||
eventMarker); | ||
Retry retry) { | ||
this(eventSourceManager.getControllerResourceEventSource(), null, relatedControllerName, | ||
reconciliationDispatcher, retry, null, eventSourceManager); | ||
} | ||
|
||
private EventProcessor(ResourceCache<R> resourceCache, ExecutorService executor, | ||
String relatedControllerName, | ||
ReconciliationDispatcher<R> reconciliationDispatcher, Retry retry, Metrics metrics, | ||
EventMarker eventMarker) { | ||
EventSourceManager<R> eventSourceManager) { | ||
this.running = true; | ||
this.executor = | ||
executor == null | ||
|
@@ -88,11 +89,14 @@ private EventProcessor(ResourceCache<R> resourceCache, ExecutorService executor, | |
this.retry = retry; | ||
this.resourceCache = resourceCache; | ||
this.metrics = metrics != null ? metrics : Metrics.NOOP; | ||
this.eventMarker = eventMarker; | ||
this.eventMarker = new EventMarker(); | ||
this.retryAndRescheduleTimerEventSource = new TimerEventSource<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know, but the thinking was that EventSourceManager encapsulates all the event sources, so manages also the lifecycle etc of the. This breaks that logic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand but that makes the code more complex. I tend to think that things that are only needed in one spot should be managed there. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is kinda a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem is that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reverted. |
||
this.retryAndRescheduleTimerEventSource.setEventHandler(this); | ||
this.eventSourceManager = eventSourceManager; | ||
} | ||
|
||
public void setEventSourceManager(EventSourceManager<R> eventSourceManager) { | ||
this.eventSourceManager = eventSourceManager; | ||
EventMarker getEventMarker() { | ||
return eventMarker; | ||
} | ||
|
||
@Override | ||
|
@@ -243,9 +247,12 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope<R> execution | |
|
||
private void reScheduleExecutionIfInstructed(PostExecutionControl<R> postExecutionControl, | ||
R customResource) { | ||
postExecutionControl.getReScheduleDelay().ifPresent(delay -> eventSourceManager | ||
.getRetryAndRescheduleTimerEventSource() | ||
.scheduleOnce(customResource, delay)); | ||
postExecutionControl.getReScheduleDelay() | ||
.ifPresent(delay -> retryEventSource().scheduleOnce(customResource, delay)); | ||
} | ||
|
||
TimerEventSource<R> retryEventSource() { | ||
return retryAndRescheduleTimerEventSource; | ||
} | ||
|
||
/** | ||
|
@@ -275,9 +282,7 @@ private void handleRetryOnException(ExecutionScope<R> executionScope, | |
delay, | ||
customResourceID); | ||
metrics.failedReconciliation(customResourceID, exception); | ||
eventSourceManager | ||
.getRetryAndRescheduleTimerEventSource() | ||
.scheduleOnce(executionScope.getResource(), delay); | ||
retryEventSource().scheduleOnce(executionScope.getResource(), delay); | ||
}, | ||
() -> log.error("Exhausted retries for {}", executionScope)); | ||
} | ||
|
@@ -289,9 +294,7 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) { | |
if (isRetryConfigured()) { | ||
retryState.remove(executionScope.getCustomResourceID()); | ||
} | ||
eventSourceManager | ||
.getRetryAndRescheduleTimerEventSource() | ||
.cancelOnceSchedule(executionScope.getCustomResourceID()); | ||
retryEventSource().cancelOnceSchedule(executionScope.getCustomResourceID()); | ||
} | ||
|
||
private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope) { | ||
|
@@ -329,6 +332,7 @@ private boolean isRetryConfigured() { | |
public void stop() { | ||
lock.lock(); | ||
try { | ||
retryEventSource().stop(); | ||
this.running = false; | ||
} finally { | ||
lock.unlock(); | ||
|
@@ -339,6 +343,7 @@ public void stop() { | |
public void start() throws OperatorException { | ||
lock.lock(); | ||
try { | ||
retryEventSource().start(); | ||
this.running = true; | ||
} finally { | ||
lock.unlock(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,11 +12,11 @@ | |
import io.fabric8.kubernetes.api.model.HasMetadata; | ||
import io.javaoperatorsdk.operator.MissingCRDException; | ||
import io.javaoperatorsdk.operator.OperatorException; | ||
import io.javaoperatorsdk.operator.api.LifecycleAware; | ||
import io.javaoperatorsdk.operator.processing.Controller; | ||
import io.javaoperatorsdk.operator.processing.EventProcessor; | ||
import io.javaoperatorsdk.operator.processing.event.internal.ControllerResourceEventSource; | ||
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; | ||
import io.javaoperatorsdk.operator.processing.LifecycleAware; | ||
import io.javaoperatorsdk.operator.processing.event.source.ControllerResourceEventSource; | ||
import io.javaoperatorsdk.operator.processing.event.source.EventSource; | ||
import io.javaoperatorsdk.operator.processing.event.source.EventSourceRegistry; | ||
|
||
public class EventSourceManager<R extends HasMetadata> | ||
implements EventSourceRegistry<R>, LifecycleAware { | ||
|
@@ -25,38 +25,25 @@ public class EventSourceManager<R extends HasMetadata> | |
|
||
private final ReentrantLock lock = new ReentrantLock(); | ||
private final Set<EventSource> eventSources = Collections.synchronizedSet(new HashSet<>()); | ||
private EventProcessor<R> eventProcessor; | ||
private TimerEventSource<R> retryAndRescheduleTimerEventSource; | ||
private final EventProcessor<R> eventProcessor; | ||
private ControllerResourceEventSource<R> controllerResourceEventSource; | ||
private final Controller<R> controller; | ||
|
||
EventSourceManager() { | ||
init(); | ||
EventSourceManager(EventProcessor<R> eventProcessor) { | ||
this.eventProcessor = eventProcessor; | ||
controller = null; | ||
} | ||
|
||
public EventSourceManager(Controller<R> controller) { | ||
init(); | ||
this.controller = controller; | ||
controllerResourceEventSource = new ControllerResourceEventSource<>(controller); | ||
this.eventProcessor = new EventProcessor<>(this); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The event source manager should not instantiate the processor. I that the controller manages the lifecycle is much cleaner. This is what was moved out as part of the design changes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't make things cleaner: look at how ugly the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. except for those :D |
||
registerEventSource(controllerResourceEventSource); | ||
} | ||
|
||
private void init() { | ||
this.retryAndRescheduleTimerEventSource = new TimerEventSource<>(); | ||
registerEventSource(retryAndRescheduleTimerEventSource); | ||
} | ||
|
||
public EventSourceManager<R> setEventProcessor(EventProcessor<R> eventProcessor) { | ||
this.eventProcessor = eventProcessor; | ||
if (controllerResourceEventSource != null) { | ||
controllerResourceEventSource.setEventHandler(eventProcessor); | ||
} | ||
if (retryAndRescheduleTimerEventSource != null) { | ||
retryAndRescheduleTimerEventSource.setEventHandler(eventProcessor); | ||
} | ||
return this; | ||
} | ||
|
||
@Override | ||
public void start() throws OperatorException { | ||
eventProcessor.start(); | ||
lock.lock(); | ||
try { | ||
log.debug("Starting event sources."); | ||
|
@@ -88,6 +75,7 @@ public void stop() { | |
} finally { | ||
lock.unlock(); | ||
} | ||
eventProcessor.stop(); | ||
} | ||
|
||
@Override | ||
|
@@ -121,10 +109,6 @@ public void cleanupForCustomResource(ResourceID customResourceUid) { | |
} | ||
} | ||
|
||
public TimerEventSource<R> getRetryAndRescheduleTimerEventSource() { | ||
return retryAndRescheduleTimerEventSource; | ||
} | ||
|
||
@Override | ||
public Set<EventSource> getRegisteredEventSources() { | ||
return Collections.unmodifiableSet(eventSources); | ||
|
@@ -135,4 +119,7 @@ public ControllerResourceEventSource<R> getControllerResourceEventSource() { | |
return controllerResourceEventSource; | ||
} | ||
|
||
Controller<R> getController() { | ||
return controller; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea as desribed here is that the controller maneges the lifecycle of every other component top level component:
#655
So it is the highest level agregate, therefore will be no multiple layers for managing LifecycleAware
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that it doesn't really do anything with it, it's just needed by the event manager so I don't think we should expose it since the lifecycle of the
EventProcessor
is tied to theEventManager
(you cannot have one without the other). MakingEventManager
handle theEventProcessor
life cycle also allows to avoid the rather ugly setters and prevent forgetting to start/stop one or the other.If the
Controller
was using theEventProcessor
directly, it would make sense to keep there. At the very least, if we wanted to keep the top-level aggregate concept, theEventManager
should retrieve theEventProcessor
from theController
(or vice-versa) and that's actually what I tried doing yesterday without complicating things even more…There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But in general the Controller does not do anything just instantiates and starts/stops the major components. The circular dependency is ugly, but the EventManager is just a class to encapsulate a list of EventSources. For me that it creates a central component, probably the most complex part is quite a smell.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Except that
EventSourceManager
andEventProcessor
are not really separate components. Again, one cannot exist without the other. If we want to keep them separate, we should make it as cleanly as possible and I think the current solution is cleaner because it avoids the mess of having incomplete objects for no good reason. Maybe we should combineEventSourceManager
andEventProcessor
, renaming them toEventManager
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with that is even now the EventProcessor is too big, with bunch of logic, what little bugs me. Actually wanted to somehow break it down to more components.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is indeed quite complex, but some things are 😄
It doesn't make sense to split things that are not coherent if we end up with more complexity to manage the different pieces. I could see the
TimerEventSource
moving back toEventSourceManager
but I really think that we should tie the lifecycle ofEventSourceManager
andEventProcessor
by makingEventSourceManager
handle the processor's lifecycle because, again, the manager cannot work without the processor.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then the event source manager will handle the lifecycle of event processor?
ok that is fine by me, we can evaluate further later.