diff --git a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/AbstractTestReconciler.java b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/AbstractTestReconciler.java
index 7a53db8bd9..c3743ef409 100644
--- a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/AbstractTestReconciler.java
+++ b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/AbstractTestReconciler.java
@@ -1,6 +1,7 @@
package io.javaoperatorsdk.operator.processing.event.source.cache.sample;
import java.time.Duration;
+import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
@@ -69,7 +70,7 @@ protected void createConfigMap(P resource, Context
context) {
}
@Override
- public Map prepareEventSources(
+ public List prepareEventSources(
EventSourceContext context) {
var boundedItemStore =
@@ -82,7 +83,7 @@ public Map prepareEventSources(
Mappers.fromOwnerReference(this instanceof BoundedCacheClusterScopeTestReconciler))
.build(), context);
- return EventSourceUtils.nameEventSources(es);
+ return List.of(es);
}
private void ensureStatus(P resource) {
diff --git a/docs/documentation/features.md b/docs/documentation/features.md
index d4f52482b7..d2ead0710b 100644
--- a/docs/documentation/features.md
+++ b/docs/documentation/features.md
@@ -533,7 +533,7 @@ public class TomcatReconciler implements Reconciler {
.withSecondaryToPrimaryMapper(
Mappers.fromAnnotation(ANNOTATION_NAME, ANNOTATION_NAMESPACE)
.build(), context));
- return EventSourceUtils.nameEventSources(configMapEventSource);
+ return List.of(configMapEventSource);
}
}
diff --git a/docs/documentation/v5-0-migration.md b/docs/documentation/v5-0-migration.md
index 36748ea1dc..a76b191851 100644
--- a/docs/documentation/v5-0-migration.md
+++ b/docs/documentation/v5-0-migration.md
@@ -17,8 +17,22 @@ permalink: /docs/v5-0-migration
[`EventSourceUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceUtils.java#L11-L11)
now contains all the utility methods used for event sources naming that were previously defined in
the `EventSourceInitializer` interface.
-3. Updates through `UpdateControl` now use [Server Side Apply (SSA)](https://kubernetes.io/docs/reference/using-api/server-side-apply/) by default to add the finalizer and for all
- the patch operations in `UpdateControl`. The update operations were removed. If you do not wish to use SSA, you can deactivate the feature using `ConfigurationService.useSSAToPatchPrimaryResource` and related `ConfigurationServiceOverrider.withUseSSAToPatchPrimaryResource`.
+3. Event sources are now explicitly named (via the `name` method of the `EventSource` interface). Built-in event sources
+ implementation have been updated to allow you to specify a name when instantiating them. If you don't provide a name
+ for your `EventSource` implementation (for example, by using its default, no-arg constructor), one will be
+ automatically generated. This simplifies the API to define event source to
+ `List prepareEventSources(EventSourceContext context)`.
+ !!! IMPORTANT !!!
+ If you use dynamic registration of event sources, be sure to name your event sources explicitly as letting JOSDK name
+ them automatically might result in duplicated event sources being registered as JOSDK relies on the name to identify
+ event sources and concurrent, dynamic registration might lead to identical event sources having different generated
+ names, thus leading JOSDK to consider them as different and hence, register them multiple times.
+4. Updates through `UpdateControl` now
+ use [Server Side Apply (SSA)](https://kubernetes.io/docs/reference/using-api/server-side-apply/) by default to add
+ the finalizer and for all
+ the patch operations in `UpdateControl`. The update operations were removed. If you do not wish to use SSA, you can
+ deactivate the feature using `ConfigurationService.useSSAToPatchPrimaryResource` and
+ related `ConfigurationServiceOverrider.withUseSSAToPatchPrimaryResource`.
!!! IMPORTANT !!!
@@ -27,15 +41,16 @@ permalink: /docs/v5-0-migration
where it is demonstrated. Also, the related part of
a [workaround](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/StatusPatchSSAMigrationIT.java#L110-L116).
- Related automatic observed generation handling changes:
+ Related automatic observed generation handling changes:
Automated Observed Generation (see features in docs), is automatically handled for non-SSA, even if
- the status sub-resource is not instructed to be updated. This is not true for SSA, observed generation is updated
+ the status sub-resource is not instructed to be updated. This is not true for SSA, observed generation is updated
only when patch status is instructed by `UpdateControl`.
-4. `ManagedDependentResourceContext` has been renamed to `ManagedWorkflowAndDependentResourceContext` and is accessed
+5. `ManagedDependentResourceContext` has been renamed to `ManagedWorkflowAndDependentResourceContext` and is accessed
via the accordingly renamed `managedWorkflowAndDependentResourceContext` method.
-5. `ResourceDiscriminator` was removed. In most of the cases you can just delete the discriminator, everything should
- work without it by default. To optimize and handle special cases see the relevant section in [Dependent Resource documentation](/docs/dependent-resources#multiple-dependent-resources-of-same-type).
-6. `ConfigurationService.getTerminationTimeoutSeconds` and associated overriding mechanism have been removed,
+6. `ResourceDiscriminator` was removed. In most of the cases you can just delete the discriminator, everything should
+ work without it by default. To optimize and handle special cases see the relevant section
+ in [Dependent Resource documentation](/docs/dependent-resources#multiple-dependent-resources-of-same-type).
+7. `ConfigurationService.getTerminationTimeoutSeconds` and associated overriding mechanism have been removed,
use `Operator.stop(Duration)` instead.
-7. `Operator.installShutdownHook()` has been removed, use `Operator.installShutdownHook(Duration)` instead
+8. `Operator.installShutdownHook()` has been removed, use `Operator.installShutdownHook(Duration)` instead
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java
index f8ee9f4e84..61ec044694 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java
@@ -88,4 +88,5 @@ public Optional> getItemStore() {
public Optional getInformerListLimit() {
return Optional.ofNullable(informerListLimit);
}
+
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
index 27547703b7..0134ea0a04 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
@@ -26,7 +26,6 @@ default Stream getSecondaryResourcesAsStream(Class expectedType) {
return getSecondaryResources(expectedType).stream();
}
- @Deprecated(forRemoval = true)
Optional getSecondaryResource(Class expectedType, String eventSourceName);
ControllerConfiguration getControllerConfiguration();
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceUtils.java
index 8b89d95b71..4294fee405 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceUtils.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceUtils.java
@@ -6,66 +6,21 @@
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.processing.dependent.workflow.Workflow;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
-import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
public class EventSourceUtils {
- /**
- * Utility method to easily create map with generated name for event sources. This is for the use
- * case when the event sources are not access explicitly by name in the reconciler.
- *
- * @param eventSources to name
- * @return even source with default names
- */
- public static Map nameEventSources(EventSource... eventSources) {
- Map eventSourceMap = new HashMap<>(eventSources.length);
- for (EventSource eventSource : eventSources) {
- eventSourceMap.put(generateNameFor(eventSource), eventSource);
- }
- return eventSourceMap;
+
+ @SuppressWarnings("unchecked")
+ public static List dependentEventSources(
+ EventSourceContext eventSourceContext, DependentResource... dependentResources) {
+ return Arrays.stream(dependentResources)
+ .flatMap(dr -> dr.eventSource(eventSourceContext).stream()).toList();
}
@SuppressWarnings("unchecked")
- public static Map eventSourcesFromWorkflow(
+ public static List eventSourcesFromWorkflow(
EventSourceContext context,
Workflow workflow) {
- Map result = new HashMap<>();
- for (var e : workflow.getDependentResourcesByNameWithoutActivationCondition().entrySet()) {
- var eventSource = e.getValue().eventSource(context);
- eventSource.ifPresent(es -> result.put(e.getKey(), (EventSource) es));
- }
- return result;
- }
-
- @SuppressWarnings("rawtypes")
- public static Map nameEventSourcesFromDependentResource(
- EventSourceContext context, DependentResource... dependentResources) {
- return nameEventSourcesFromDependentResource(context, Arrays.asList(dependentResources));
- }
-
- @SuppressWarnings("unchecked,rawtypes")
- public static Map nameEventSourcesFromDependentResource(
- EventSourceContext context, Collection dependentResources) {
-
- if (dependentResources != null) {
- Map eventSourceMap = new HashMap<>(dependentResources.size());
- for (DependentResource dependentResource : dependentResources) {
- Optional es = dependentResource.eventSource(context);
- es.ifPresent(e -> eventSourceMap.put(generateNameFor(e), e));
- }
- return eventSourceMap;
- } else {
- return Collections.emptyMap();
- }
- }
-
- /**
- * Used when event sources are not explicitly named when created/registered.
- *
- * @param eventSource EventSource
- * @return generated name
- */
- public static String generateNameFor(EventSource eventSource) {
- // we can have multiple event sources for the same class
- return eventSource.getClass().getName() + "#" + eventSource.hashCode();
+ return workflow.getDependentResourcesWithoutActivationCondition().stream()
+ .flatMap(dr -> dr.eventSource(context).stream()).toList();
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Reconciler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Reconciler.java
index 2047762c35..40a8a3b407 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Reconciler.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Reconciler.java
@@ -19,17 +19,16 @@ public interface Reconciler {
*/
UpdateControl
reconcile(P resource, Context
context) throws Exception;
-
/**
* Prepares a map of {@link EventSource} implementations keyed by the name with which they need to
* be registered by the SDK.
*
* @param context a {@link EventSourceContext} providing access to information useful to event
* sources
- * @return a map of event sources to register
+ * @return a list of event sources
*/
- default Map prepareEventSources(EventSourceContext context) {
- return Map.of();
+ default List prepareEventSources(EventSourceContext context) {
+ return Collections.emptyList();
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/EventSourceProvider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/EventSourceProvider.java
deleted file mode 100644
index c83af1270a..0000000000
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/EventSourceProvider.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package io.javaoperatorsdk.operator.api.reconciler.dependent;
-
-import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
-import io.javaoperatorsdk.operator.processing.event.source.EventSource;
-
-/**
- * @deprecated now event source related methods are directly on {@link DependentResource}
- * @param
primary resource
- */
-@Deprecated(forRemoval = true)
-public interface EventSourceProvider
{
- /**
- * @param context - event source context where the event source is initialized
- * @return the initiated event source.
- */
- EventSource initEventSource(EventSourceContext
context);
-}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java
index fe90b99ef3..f873a6d870 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java
@@ -4,6 +4,7 @@
import java.util.stream.Collectors;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
@SuppressWarnings("rawtypes")
public class ControllerHealthInfo {
@@ -15,21 +16,21 @@ public ControllerHealthInfo(EventSourceManager eventSourceManager) {
}
public Map eventSourceHealthIndicators() {
- return eventSourceManager.allEventSources().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ return eventSourceManager.allEventSources().stream()
+ .collect(Collectors.toMap(EventSource::name, e -> e));
}
public Map unhealthyEventSources() {
- return eventSourceManager.allEventSources().entrySet().stream()
- .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY)
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ return eventSourceManager.allEventSources().stream()
+ .filter(e -> e.getStatus() == Status.UNHEALTHY)
+ .collect(Collectors.toMap(EventSource::name, e -> e));
}
public Map informerEventSourceHealthIndicators() {
- return eventSourceManager.allEventSources().entrySet().stream()
- .filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator)
- .collect(Collectors.toMap(Map.Entry::getKey,
- e -> (InformerWrappingEventSourceHealthIndicator) e.getValue()));
+ return eventSourceManager.allEventSources().stream()
+ .filter(e -> e instanceof InformerWrappingEventSourceHealthIndicator)
+ .collect(Collectors.toMap(EventSource::name,
+ e -> (InformerWrappingEventSourceHealthIndicator) e));
}
@@ -40,11 +41,11 @@ public Map informerEventSour
* {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}.
*/
public Map unhealthyInformerEventSourceHealthIndicators() {
- return eventSourceManager.allEventSources().entrySet().stream()
- .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY)
- .filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator)
- .collect(Collectors.toMap(Map.Entry::getKey,
- e -> (InformerWrappingEventSourceHealthIndicator) e.getValue()));
+ return eventSourceManager.allEventSources().stream()
+ .filter(e -> e.getStatus() == Status.UNHEALTHY)
+ .filter(e -> e instanceof InformerWrappingEventSourceHealthIndicator)
+ .collect(Collectors.toMap(EventSource::name,
+ e -> (InformerWrappingEventSourceHealthIndicator) e));
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
index c0bed7acdb..f9bac91a63 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
@@ -36,7 +36,6 @@
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceNotFoundException;
-import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext;
import io.javaoperatorsdk.operator.health.ControllerHealthInfo;
@@ -235,22 +234,17 @@ public void initAndRegisterEventSources(EventSourceContext context) {
// register created event sources
final var dependentResourcesByName =
- managedWorkflow.getDependentResourcesByNameWithoutActivationCondition();
+ managedWorkflow.getDependentResourcesWithoutActivationCondition();
final var size = dependentResourcesByName.size();
if (size > 0) {
- dependentResourcesByName.forEach((key, dependentResource) -> {
- if (dependentResource instanceof EventSourceProvider provider) {
- final var source = provider.initEventSource(context);
- eventSourceManager.registerEventSource(key, source);
- } else {
- Optional eventSource = dependentResource.eventSource(context);
- eventSource.ifPresent(es -> eventSourceManager.registerEventSource(key, es));
- }
+ dependentResourcesByName.forEach(dependentResource -> {
+ Optional eventSource = dependentResource.eventSource(context);
+ eventSource.ifPresent(eventSourceManager::registerEventSource);
});
// resolve event sources referenced by name for dependents that reuse an existing event source
final Map> unresolvable = new HashMap<>(size);
- dependentResourcesByName.values().stream()
+ dependentResourcesByName.stream()
.filter(EventSourceReferencer.class::isInstance)
.map(EventSourceReferencer.class::cast)
.forEach(dr -> {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java
index 6355ec39c7..659b8b4720 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java
@@ -1,5 +1,7 @@
package io.javaoperatorsdk.operator.processing.dependent.external;
+import java.time.Duration;
+
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.processing.dependent.AbstractExternalDependentResource;
@@ -11,23 +13,23 @@ public abstract class AbstractPollingDependentResource
extends AbstractExternalDependentResource>
implements CacheKeyMapper {
- public static final int DEFAULT_POLLING_PERIOD = 5000;
- private long pollingPeriod;
+ public static final Duration DEFAULT_POLLING_PERIOD = Duration.ofMillis(5000);
+ private Duration pollingPeriod;
protected AbstractPollingDependentResource(Class resourceType) {
this(resourceType, DEFAULT_POLLING_PERIOD);
}
- public AbstractPollingDependentResource(Class resourceType, long pollingPeriod) {
+ public AbstractPollingDependentResource(Class resourceType, Duration pollingPeriod) {
super(resourceType);
this.pollingPeriod = pollingPeriod;
}
- public void setPollingPeriod(long pollingPeriod) {
+ public void setPollingPeriod(Duration pollingPeriod) {
this.pollingPeriod = pollingPeriod;
}
- public long getPollingPeriod() {
+ public Duration getPollingPeriod() {
return pollingPeriod;
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PerResourcePollingDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PerResourcePollingDependentResource.java
index affc63cfd3..581698ffd6 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PerResourcePollingDependentResource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PerResourcePollingDependentResource.java
@@ -6,6 +6,7 @@
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
+import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingConfigurationBuilder;
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;
@Ignore
@@ -18,15 +19,18 @@ public PerResourcePollingDependentResource(Class resourceType) {
super(resourceType);
}
- public PerResourcePollingDependentResource(Class resourceType, long pollingPeriod) {
+ public PerResourcePollingDependentResource(Class resourceType, Duration pollingPeriod) {
super(resourceType, pollingPeriod);
}
@Override
protected ExternalResourceCachingEventSource createEventSource(
EventSourceContext context) {
- return new PerResourcePollingEventSource<>(this, context,
- Duration.ofMillis(getPollingPeriod()), resourceType(), this);
- }
+ return new PerResourcePollingEventSource<>(name(), resourceType(), context,
+ new PerResourcePollingConfigurationBuilder<>(
+ this, getPollingPeriod())
+ .withCacheKeyMapper(this)
+ .build());
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PollingDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PollingDependentResource.java
index 3df1390d69..519771d82d 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PollingDependentResource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PollingDependentResource.java
@@ -1,10 +1,13 @@
package io.javaoperatorsdk.operator.processing.dependent.external;
+import java.time.Duration;
+
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
+import io.javaoperatorsdk.operator.processing.event.source.polling.PollingConfiguration;
import io.javaoperatorsdk.operator.processing.event.source.polling.PollingEventSource;
@Ignore
@@ -19,7 +22,7 @@ public PollingDependentResource(Class resourceType, CacheKeyMapper cacheKe
this.cacheKeyMapper = cacheKeyMapper;
}
- public PollingDependentResource(Class resourceType, long pollingPeriod,
+ public PollingDependentResource(Class resourceType, Duration pollingPeriod,
CacheKeyMapper cacheKeyMapper) {
super(resourceType, pollingPeriod);
this.cacheKeyMapper = cacheKeyMapper;
@@ -28,7 +31,8 @@ public PollingDependentResource(Class resourceType, long pollingPeriod,
@Override
protected ExternalResourceCachingEventSource createEventSource(
EventSourceContext context) {
- return new PollingEventSource<>(this, getPollingPeriod(), resourceType(), cacheKeyMapper);
+ return new PollingEventSource<>(name(), resourceType(),
+ new PollingConfiguration<>(this, getPollingPeriod(), cacheKeyMapper));
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java
index 49f8faee04..f2c5c0b1a1 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java
@@ -86,7 +86,7 @@ private void configureWith(String labelSelector, Set namespaces,
.withNamespaces(namespaces, inheritNamespacesOnChange)
.build();
- configureWith(new InformerEventSource<>(ic, context));
+ configureWith(new InformerEventSource<>(name(), ic, context));
}
// just to seamlessly handle GenericKubernetesDependentResource
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java
index 05b546553c..c22bf9d666 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java
@@ -139,8 +139,7 @@ protected void registerOrDeregisterEventSourceBasedOnActivation(
var eventSource =
dr.eventSource(eventSourceRetriever.eventSourceContextForDynamicRegistration());
var es = eventSource.orElseThrow();
- eventSourceRetriever.dynamicallyRegisterEventSource(dr.name(), es);
-
+ eventSourceRetriever.dynamicallyRegisterEventSource(es);
} else {
eventSourceRetriever.dynamicallyDeRegisterEventSource(dr.name());
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultWorkflow.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultWorkflow.java
index 1c241aebbc..5ed5b5cb9e 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultWorkflow.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultWorkflow.java
@@ -148,14 +148,10 @@ public Map getDependentResourcesByName() {
return resources;
}
- public Map getDependentResourcesByNameWithoutActivationCondition() {
- final var resources = new HashMap(dependentResourceNodes.size());
- dependentResourceNodes
- .forEach((name, node) -> {
- if (node.getActivationCondition().isEmpty()) {
- resources.put(name, node.getDependentResource());
- }
- });
- return resources;
+ public List getDependentResourcesWithoutActivationCondition() {
+ return dependentResourceNodes.values().stream()
+ .filter(n -> n.getActivationCondition().isEmpty())
+ .map(DependentResourceNode::getDependentResource)
+ .toList();
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java
index 839844256e..7f30ba6c9e 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java
@@ -1,6 +1,7 @@
package io.javaoperatorsdk.operator.processing.dependent.workflow;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,7 +45,7 @@ default Map getDependentResourcesByName() {
}
@SuppressWarnings("rawtypes")
- default Map getDependentResourcesByNameWithoutActivationCondition() {
- return Collections.emptyMap();
+ default List getDependentResourcesWithoutActivationCondition() {
+ return Collections.emptyList();
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
index 2b1005af4a..512ab3bde1 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
@@ -2,13 +2,11 @@
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -19,7 +17,6 @@
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
-import io.javaoperatorsdk.operator.api.reconciler.EventSourceUtils;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
@@ -70,27 +67,23 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
*/
@Override
public synchronized void start() {
- startEventSource(eventSources.namedControllerResourceEventSource());
+ startEventSource(eventSources.controllerResourceEventSource());
executorServiceManager.boundedExecuteAndWaitForAllToComplete(
- eventSources.additionalNamedEventSources()
+ eventSources.additionalEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)),
this::startEventSource,
getThreadNamer("start"));
executorServiceManager.boundedExecuteAndWaitForAllToComplete(
- eventSources.additionalNamedEventSources()
+ eventSources.additionalEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)),
this::startEventSource,
getThreadNamer("start"));
}
- private static Function getThreadNamer(String stage) {
- return es -> {
- final var name = es.name();
- return es.priority() + " " + stage + " -> "
- + (es.isNameSet() ? name + " " + es.original().getClass() : es.original());
- };
+ private static Function getThreadNamer(String stage) {
+ return es -> es.priority() + " " + stage + " -> " + es.name();
}
private static Function getEventSourceThreadNamer(String stage) {
@@ -99,28 +92,26 @@ private static Function getEventSourceThreadNamer(S
@Override
public synchronized void stop() {
- stopEventSource(eventSources.namedControllerResourceEventSource());
+ stopEventSource(eventSources.controllerResourceEventSource());
executorServiceManager.boundedExecuteAndWaitForAllToComplete(
- eventSources.additionalNamedEventSources(),
+ eventSources.additionalEventSources(),
this::stopEventSource,
getThreadNamer("stop"));
}
@SuppressWarnings("rawtypes")
- private void logEventSourceEvent(NamedEventSource eventSource, String event) {
+ private void logEventSourceEvent(EventSource eventSource, String event) {
if (log.isDebugEnabled()) {
- if (eventSource.original() instanceof ResourceEventSource source) {
- log.debug("{} event source {} for {}", event,
- eventSource.isNameSet() ? eventSource.name() : eventSource,
+ if (eventSource instanceof ResourceEventSource source) {
+ log.debug("{} event source {} for {}", event, eventSource.name(),
source.resourceType());
} else {
- log.debug("{} event source {}", event,
- eventSource.isNameSet() ? eventSource.name() : eventSource);
+ log.debug("{} event source {}", event, eventSource.name());
}
}
}
- private Void startEventSource(NamedEventSource eventSource) {
+ private Void startEventSource(EventSource eventSource) {
try {
logEventSourceEvent(eventSource, "Starting");
eventSource.start();
@@ -133,7 +124,7 @@ private Void startEventSource(NamedEventSource eventSource) {
return null;
}
- private Void stopEventSource(NamedEventSource eventSource) {
+ private Void stopEventSource(EventSource eventSource) {
try {
logEventSourceEvent(eventSource, "Stopping");
eventSource.stop();
@@ -144,37 +135,28 @@ private Void stopEventSource(NamedEventSource eventSource) {
return null;
}
- public final void registerEventSource(EventSource eventSource) throws OperatorException {
- registerEventSource(null, eventSource);
- }
-
@SuppressWarnings("rawtypes")
- public final synchronized void registerEventSource(String name, EventSource eventSource)
+ public final synchronized void registerEventSource(EventSource eventSource)
throws OperatorException {
Objects.requireNonNull(eventSource, "EventSource must not be null");
try {
- if (name == null || name.isBlank()) {
- name = EventSourceUtils.generateNameFor(eventSource);
- }
if (eventSource instanceof ManagedInformerEventSource managedInformerEventSource) {
managedInformerEventSource.setConfigurationService(
controller.getConfiguration().getConfigurationService());
}
- final var named = new NamedEventSource(eventSource, name);
- eventSources.add(named);
- named.setEventHandler(controller.getEventProcessor());
+ eventSources.add(eventSource);
+ eventSource.setEventHandler(controller.getEventProcessor());
} catch (IllegalStateException | MissingCRDException e) {
throw e; // leave untouched
} catch (Exception e) {
- throw new OperatorException("Couldn't register event source: " + name + " for "
+ throw new OperatorException("Couldn't register event source: " + eventSource.name() + " for "
+ controller.getConfiguration().getName() + " controller", e);
}
}
@SuppressWarnings("unchecked")
public void broadcastOnResourceEvent(ResourceAction action, P resource, P oldResource) {
- eventSources.additionalNamedEventSources()
- .map(NamedEventSource::original)
+ eventSources.additionalEventSources()
.forEach(source -> {
if (source instanceof ResourceEventAware) {
var lifecycleAwareES = ((ResourceEventAware) source);
@@ -209,18 +191,12 @@ public void changeNamespaces(Set namespaces) {
public Set getRegisteredEventSources() {
return eventSources.flatMappedSources()
- .map(NamedEventSource::original)
- .collect(Collectors.toCollection(LinkedHashSet::new));
- }
- public Map allEventSources() {
- return eventSources.allNamedEventSources().collect(Collectors.toMap(NamedEventSource::name,
- NamedEventSource::original));
+ .collect(Collectors.toCollection(LinkedHashSet::new));
}
- @SuppressWarnings("unused")
- public Stream extends EventSourceMetadata> getNamedEventSourcesStream() {
- return eventSources.flatMappedSources();
+ public List allEventSources() {
+ return eventSources.allEventSources().toList();
}
public ControllerResourceEventSource getControllerResourceEventSource() {
@@ -232,14 +208,13 @@ public List> getResourceEventSourcesFor(Class d
}
@Override
- public EventSource dynamicallyRegisterEventSource(String name,
- EventSource eventSource) {
+ public EventSource dynamicallyRegisterEventSource(EventSource eventSource) {
synchronized (this) {
- var actual = eventSources.existing(name, eventSource);
+ var actual = eventSources.existingEventSourceOfSameNameAndType(eventSource);
if (actual != null) {
- eventSource = actual.eventSource();
+ eventSource = actual;
} else {
- registerEventSource(name, eventSource);
+ registerEventSource(eventSource);
}
}
// The start itself is blocking thus blocking only the threads which are attempt to start the
@@ -276,9 +251,9 @@ public List> getEventSourcesFor(Class dependent
@Override
public ResourceEventSource getResourceEventSourceFor(
- Class dependentType, String qualifier) {
+ Class dependentType, String name) {
Objects.requireNonNull(dependentType, "dependentType is Mandatory");
- return eventSources.get(dependentType, qualifier);
+ return eventSources.get(dependentType, name);
}
TimerEventSource retryEventSource() {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java
index 7ed2777998..c687c93acd 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java
@@ -19,10 +19,11 @@ default ResourceEventSource getResourceEventSourceFor(Class depende
List> getResourceEventSourcesFor(Class dependentType);
/**
+ *
* Registers (and starts) the specified {@link EventSource} dynamically during the reconciliation.
* If an EventSource is already registered with the specified name, the registration will be
- * ignored. It is the user's responsibility to handle the naming correctly, thus to not try to
- * register different event source with same name that is already registered.
+ * ignored. It is the user's responsibility to handle the naming correctly.
+ *
*
* This is only needed when your operator needs to adapt dynamically based on optional resources
* that may or may not be present on the target cluster. Even in this situation, it should be
@@ -31,20 +32,25 @@ default ResourceEventSource getResourceEventSourceFor(Class depende
* activation conditions of dependents, for example.
*
*
- * This method will block until the event source is synced, if needed (as is the case for
+ * This method will block until the event source is synced (if needed, as it is the case for
* {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}).
*
*
- * Should multiple reconciliations happen concurrently, only one EventSource with the specified
- * name will ever be registered.
+ * IMPORTANT: Should multiple reconciliations happen concurrently, only one
+ * EventSource with the specified name will ever be registered. It is therefore important to
+ * explicitly name the event sources that you want to reuse because the name will be used to
+ * identify which event sources need to be created or not. If you let JOSDK implicitly name event
+ * sources, then you might end up with duplicated event sources because concurrent registration of
+ * event sources will lead to 2 (or more) event sources for the same resource type to be attempted
+ * to be registered under different, automatically generated names. If you clearly identify your
+ * event sources with names, then, if the concurrent process determines that an event source with
+ * the specified name, it won't register it again.
*
*
- * @param name of the event source
* @param eventSource to register
* @return the actual event source registered. Might not be the same as the parameter.
*/
- EventSource dynamicallyRegisterEventSource(String name, EventSource eventSource);
-
+ EventSource dynamicallyRegisterEventSource(EventSource eventSource);
/**
* De-registers (and stops) the {@link EventSource} associated with the specified name. If no such
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java
index b53b92e122..c6b5a83377 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java
@@ -19,14 +19,10 @@
class EventSources {
- public static final String CONTROLLER_RESOURCE_EVENT_SOURCE_NAME =
- "ControllerResourceEventSource";
- public static final String RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME =
- "RetryAndRescheduleTimerEventSource";
-
- private final ConcurrentNavigableMap> sources =
+ private final ConcurrentNavigableMap> sources =
new ConcurrentSkipListMap<>();
- private final TimerEventSource retryAndRescheduleTimerEventSource = new TimerEventSource<>();
+ private final TimerEventSource retryAndRescheduleTimerEventSource =
+ new TimerEventSource<>("RetryAndRescheduleTimerEventSource");
private ControllerResourceEventSource controllerResourceEventSource;
@@ -42,32 +38,19 @@ TimerEventSource retryEventSource() {
return retryAndRescheduleTimerEventSource;
}
- public Stream additionalNamedEventSources() {
- return Stream.concat(Stream.of(
- new NamedEventSource(retryAndRescheduleTimerEventSource,
- RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)),
- flatMappedSources());
- }
-
- public Stream allNamedEventSources() {
- return Stream.concat(Stream.of(namedControllerResourceEventSource(),
- new NamedEventSource(retryAndRescheduleTimerEventSource,
- RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)),
+ public Stream allEventSources() {
+ return Stream.concat(
+ Stream.of(controllerResourceEventSource(), retryAndRescheduleTimerEventSource),
flatMappedSources());
}
Stream additionalEventSources() {
return Stream.concat(
Stream.of(retryEventSource()).filter(Objects::nonNull),
- flatMappedSources().map(NamedEventSource::original));
- }
-
- NamedEventSource namedControllerResourceEventSource() {
- return new NamedEventSource(controllerResourceEventSource,
- CONTROLLER_RESOURCE_EVENT_SOURCE_NAME);
+ flatMappedSources());
}
- Stream flatMappedSources() {
+ Stream flatMappedSources() {
return sources.values().stream().flatMap(c -> c.values().stream());
}
@@ -75,25 +58,23 @@ public void clear() {
sources.clear();
}
- public NamedEventSource existing(String name, EventSource source) {
- final var eventSources = sources.get(keyFor(source));
- if (eventSources == null || eventSources.isEmpty()) {
- return null;
- }
- return eventSources.get(name);
+ public EventSource existingEventSourceOfSameNameAndType(EventSource source) {
+ return existingEventSourceOfSameType(source).get(source.name());
+ }
+
+ public Map existingEventSourceOfSameType(EventSource source) {
+ return sources.getOrDefault(keyFor(source), Collections.emptyMap());
}
- public void add(NamedEventSource eventSource) {
+ public void add(EventSource eventSource) {
final var name = eventSource.name();
- final var original = eventSource.original();
- final var existing = existing(name, original);
- if (existing != null && !eventSource.equals(existing)) {
- throw new IllegalArgumentException("Event source " + existing.original()
- + " is already registered for the "
- + keyAsString(getResourceType(original), name)
- + " class/name combination");
+ final var existing = existingEventSourceOfSameType(eventSource);
+ if (existing.get(name) != null) {
+ throw new IllegalArgumentException("Event source " + existing
+ + " is already registered with name: " + name);
}
- sources.computeIfAbsent(keyFor(original), k -> new HashMap<>()).put(name, eventSource);
+
+ sources.computeIfAbsent(keyFor(eventSource), k -> new HashMap<>()).put(name, eventSource);
}
@SuppressWarnings("rawtypes")
@@ -104,10 +85,6 @@ private Class> getResourceType(EventSource source) {
}
private String keyFor(EventSource source) {
- if (source instanceof NamedEventSource) {
- source = ((NamedEventSource) source).original();
- }
-
return keyFor(getResourceType(source));
}
@@ -128,7 +105,7 @@ public ResourceEventSource get(Class dependentType, String name) {
}
final var size = sourcesForType.size();
- NamedEventSource source;
+ EventSource source;
if (size == 1 && name == null) {
source = sourcesForType.values().stream().findFirst().orElseThrow();
} else {
@@ -146,16 +123,15 @@ public ResourceEventSource get(Class dependentType, String name) {
}
}
- EventSource original = source.original();
- if (!(original instanceof ResourceEventSource)) {
+ if (!(source instanceof ResourceEventSource)) {
throw new IllegalArgumentException(source + " associated with "
+ keyAsString(dependentType, name) + " is not a "
+ ResourceEventSource.class.getSimpleName());
}
- final var res = (ResourceEventSource) original;
+ final var res = (ResourceEventSource) source;
final var resourceClass = res.resourceType();
if (!resourceClass.isAssignableFrom(dependentType)) {
- throw new IllegalArgumentException(original + " associated with "
+ throw new IllegalArgumentException(source + " associated with "
+ keyAsString(dependentType, name)
+ " is handling " + resourceClass.getName() + " resources but asked for "
+ dependentType.getName());
@@ -178,7 +154,6 @@ public List> getEventSources(Class dependentTyp
}
return sourcesForType.values().stream()
- .map(NamedEventSource::original)
.filter(ResourceEventSource.class::isInstance)
.map(es -> (ResourceEventSource) es)
.collect(Collectors.toList());
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java
deleted file mode 100644
index 1ad6efa929..0000000000
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package io.javaoperatorsdk.operator.processing.event;
-
-import java.util.Objects;
-import java.util.Optional;
-
-import io.javaoperatorsdk.operator.OperatorException;
-import io.javaoperatorsdk.operator.api.reconciler.EventSourceUtils;
-import io.javaoperatorsdk.operator.processing.event.source.Configurable;
-import io.javaoperatorsdk.operator.processing.event.source.EventSource;
-import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
-import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
-
-class NamedEventSource implements EventSource, EventSourceMetadata {
-
- private final EventSource original;
- private final String name;
- private final boolean nameSet;
-
- NamedEventSource(EventSource original, String name) {
- this.original = original;
- this.name = name;
- nameSet = !name.equals(EventSourceUtils.generateNameFor(original));
- }
-
- @Override
- public void start() throws OperatorException {
- original.start();
- }
-
- @Override
- public void stop() throws OperatorException {
- original.stop();
- }
-
- @Override
- public void setEventHandler(EventHandler handler) {
- original.setEventHandler(handler);
- }
-
- public String name() {
- return name;
- }
-
- @Override
- public Class> type() {
- return original.getClass();
- }
-
- @Override
- @SuppressWarnings({"rawtypes", "unchecked"})
- public Optional> resourceType() {
- if (original instanceof ResourceEventSource resourceEventSource) {
- return Optional.of(resourceEventSource.resourceType());
- }
- return Optional.empty();
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public Optional> configuration() {
- if (original instanceof Configurable configurable) {
- return Optional.ofNullable(configurable.configuration());
- }
- return Optional.empty();
- }
-
- public EventSource eventSource() {
- return original;
- }
-
- @Override
- public String toString() {
- return original + " named: " + name;
- }
-
- public EventSource original() {
- return original;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
- NamedEventSource that = (NamedEventSource) o;
- return Objects.equals(original, that.original) && Objects.equals(name, that.name);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(original, name);
- }
-
- @Override
- public EventSourceStartPriority priority() {
- return original.priority();
- }
-
- public boolean isNameSet() {
- return nameSet;
- }
-}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java
index 752e3ed0c2..531917adac 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java
@@ -361,7 +361,6 @@ private P patchResource(P resource, P originalResource) {
getVersion(resource));
log.trace("Resource before update: {}", resource);
- // todo unit test
final var finalizerName = configuration().getFinalizerName();
if (useSSA && controller.useFinalizer()) {
// addFinalizer already prevents adding an already present finalizer so no need to check
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java
index 4eaee91add..b2398ab6ff 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.processing.event.source;
+
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
@@ -7,6 +8,20 @@ public abstract class AbstractEventSource implements EventSource {
private EventHandler handler;
private volatile boolean running = false;
private EventSourceStartPriority eventSourceStartPriority = EventSourceStartPriority.DEFAULT;
+ private final String name;
+
+ protected AbstractEventSource() {
+ this(null);
+ }
+
+ protected AbstractEventSource(String name) {
+ this.name = name == null ? EventSource.super.name() : name;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
protected EventHandler getEventHandler() {
return handler;
@@ -41,4 +56,5 @@ public AbstractEventSource setEventSourcePriority(
this.eventSourceStartPriority = eventSourceStartPriority;
return this;
}
+
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java
index 65294c1625..73420c0e5e 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java
@@ -17,6 +17,11 @@ public abstract class AbstractResourceEventSource
protected GenericFilter super R> genericFilter;
protected AbstractResourceEventSource(Class resourceClass) {
+ this(resourceClass, resourceClass.getName());
+ }
+
+ protected AbstractResourceEventSource(Class resourceClass, String name) {
+ super(name);
this.resourceClass = resourceClass;
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java
index 05a034a7a7..e368ec3a94 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java
@@ -28,4 +28,13 @@ default EventSourceStartPriority priority() {
default Status getStatus() {
return Status.UNKNOWN;
}
+
+ default String name() {
+ return generateName(this);
+ }
+
+ static String generateName(EventSource eventSource) {
+ return eventSource.getClass().getName() + "@" + Integer.toHexString(eventSource.hashCode());
+ }
+
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java
index 3ccc41a77d..b4bb44d957 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java
@@ -51,7 +51,12 @@ public abstract class ExternalResourceCachingEventSource resourceClass,
CacheKeyMapper cacheKeyMapper) {
- super(resourceClass);
+ this(null, resourceClass, cacheKeyMapper);
+ }
+
+ protected ExternalResourceCachingEventSource(String name, Class resourceClass,
+ CacheKeyMapper cacheKeyMapper) {
+ super(resourceClass, name);
this.cacheKeyMapper = cacheKeyMapper;
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java
index 722e260878..52215cdcf7 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java
@@ -39,4 +39,5 @@ default Optional getSecondaryResource(P primary) {
void setOnDeleteFilter(OnDeleteFilter super R> onDeleteFilter);
void setGenericFilter(GenericFilter super R> genericFilter);
+
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java
index 614525e970..9a2f51cf37 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java
@@ -18,7 +18,7 @@
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
import static io.javaoperatorsdk.operator.ReconcilerUtils.handleKubernetesClientException;
-import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.*;
+import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.*;
public class ControllerResourceEventSource
@@ -26,12 +26,13 @@ public class ControllerResourceEventSource
implements ResourceEventHandler {
private static final Logger log = LoggerFactory.getLogger(ControllerResourceEventSource.class);
+ private static final String NAME = "ControllerResourceEventSource";
private final Controller controller;
@SuppressWarnings({"unchecked", "rawtypes"})
public ControllerResourceEventSource(Controller controller) {
- super(controller.getCRClient(), controller.getConfiguration(), false);
+ super(NAME, controller.getCRClient(), controller.getConfiguration(), false);
this.controller = controller;
final var config = controller.getConfiguration();
@@ -130,4 +131,9 @@ public void setOnDeleteFilter(OnDeleteFilter super T> onDeleteFilter) {
throw new IllegalStateException(
"onDeleteFilter is not supported for controller resource event source");
}
+
+ @Override
+ public String name() {
+ return NAME;
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/SimpleInboundEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/SimpleInboundEventSource.java
index a441684f0f..d13c032f88 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/SimpleInboundEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/SimpleInboundEventSource.java
@@ -11,6 +11,12 @@ public class SimpleInboundEventSource extends AbstractEventSource {
private static final Logger log = LoggerFactory.getLogger(SimpleInboundEventSource.class);
+ public SimpleInboundEventSource() {}
+
+ public SimpleInboundEventSource(String name) {
+ super(name);
+ }
+
public void propagateEvent(ResourceID resourceID) {
if (isRunning()) {
getEventHandler().handleEvent(new Event(resourceID));
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
index 81d31f7407..8759410b81 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
@@ -76,20 +76,28 @@ public class InformerEventSource
private final PrimaryToSecondaryMapper primaryToSecondaryMapper;
private final String id = UUID.randomUUID().toString();
+ public InformerEventSource(String name,
+ InformerConfiguration configuration, EventSourceContext context) {
+ this(name, configuration, context.getClient(),
+ context.getControllerConfiguration().getConfigurationService()
+ .parseResourceVersionsForEventFilteringAndCaching());
+ }
+
public InformerEventSource(
InformerConfiguration configuration, EventSourceContext context) {
- this(configuration, context.getClient(),
+ this(null, configuration, context.getClient(),
context.getControllerConfiguration().getConfigurationService()
.parseResourceVersionsForEventFilteringAndCaching());
}
public InformerEventSource(InformerConfiguration configuration, KubernetesClient client) {
- this(configuration, client, false);
+ this(null, configuration, client, false);
}
- public InformerEventSource(InformerConfiguration configuration, KubernetesClient client,
+ public InformerEventSource(String name, InformerConfiguration configuration,
+ KubernetesClient client,
boolean parseResourceVersions) {
- super(
+ super(name,
configuration.getGroupVersionKind()
.map(gvk -> client.genericKubernetesResources(gvk.apiVersion(), gvk.getKind()))
.orElseGet(() -> (MixedOperation) client.resources(configuration.getResourceClass())),
@@ -316,5 +324,4 @@ public R addPreviousAnnotation(String resourceVersion, R target) {
id + Optional.ofNullable(resourceVersion).map(rv -> "," + rv).orElse(""));
return target;
}
-
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
index f5efd0a68c..ec8e980871 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
@@ -24,10 +24,7 @@
import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
-import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource;
-import io.javaoperatorsdk.operator.processing.event.source.Cache;
-import io.javaoperatorsdk.operator.processing.event.source.Configurable;
-import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
+import io.javaoperatorsdk.operator.processing.event.source.*;
@SuppressWarnings("rawtypes")
public abstract class ManagedInformerEventSource>
@@ -45,10 +42,10 @@ public abstract class ManagedInformerEventSource temporaryResourceCache;
protected MixedOperation client;
- protected ManagedInformerEventSource(
+ protected ManagedInformerEventSource(String name,
MixedOperation client, C configuration,
boolean parseResourceVersions) {
- super(configuration.getResourceClass());
+ super(configuration.getResourceClass(), name);
this.parseResourceVersions = parseResourceVersions;
this.client = client;
this.configuration = configuration;
@@ -197,4 +194,5 @@ public String toString() {
public void setConfigurationService(ConfigurationService configurationService) {
this.configurationService = configurationService;
}
+
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingConfiguration.java
new file mode 100644
index 0000000000..23fa9e023a
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingConfiguration.java
@@ -0,0 +1,30 @@
+package io.javaoperatorsdk.operator.processing.event.source.polling;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.function.Predicate;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
+
+public record PerResourcePollingConfiguration(ScheduledExecutorService executorService, CacheKeyMapper cacheKeyMapper,
+ PerResourcePollingEventSource.ResourceFetcher resourceFetcher,
+ Predicate registerPredicate, Duration defaultPollingPeriod) {
+
+ public static final int DEFAULT_EXECUTOR_THREAD_NUMBER = 1;
+
+ public PerResourcePollingConfiguration(ScheduledExecutorService executorService,
+ CacheKeyMapper cacheKeyMapper,
+ PerResourcePollingEventSource.ResourceFetcher resourceFetcher,
+ Predicate registerPredicate,
+ Duration defaultPollingPeriod) {
+ this.executorService = executorService == null ? new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREAD_NUMBER)
+ : executorService;
+ this.cacheKeyMapper = cacheKeyMapper == null ? CacheKeyMapper.singleResourceCacheKeyMapper() : cacheKeyMapper;
+ this.resourceFetcher = Objects.requireNonNull(resourceFetcher);
+ this.registerPredicate = registerPredicate;
+ this.defaultPollingPeriod = defaultPollingPeriod;
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingConfigurationBuilder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingConfigurationBuilder.java
new file mode 100644
index 0000000000..ece10d347e
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingConfigurationBuilder.java
@@ -0,0 +1,49 @@
+package io.javaoperatorsdk.operator.processing.event.source.polling;
+
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Predicate;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
+
+public final class PerResourcePollingConfigurationBuilder {
+
+ private final Duration defaultPollingPeriod;
+ private final PerResourcePollingEventSource.ResourceFetcher resourceFetcher;
+
+ private Predicate registerPredicate;
+ private ScheduledExecutorService executorService;
+ private CacheKeyMapper cacheKeyMapper;
+
+ public PerResourcePollingConfigurationBuilder(
+ PerResourcePollingEventSource.ResourceFetcher resourceFetcher,
+ Duration defaultPollingPeriod) {
+ this.resourceFetcher = resourceFetcher;
+ this.defaultPollingPeriod = defaultPollingPeriod;
+ }
+
+ @SuppressWarnings("unused")
+ public PerResourcePollingConfigurationBuilder withExecutorService(
+ ScheduledExecutorService executorService) {
+ this.executorService = executorService;
+ return this;
+ }
+
+ public PerResourcePollingConfigurationBuilder withRegisterPredicate(
+ Predicate registerPredicate) {
+ this.registerPredicate = registerPredicate;
+ return this;
+ }
+
+ public PerResourcePollingConfigurationBuilder withCacheKeyMapper(
+ CacheKeyMapper cacheKeyMapper) {
+ this.cacheKeyMapper = cacheKeyMapper;
+ return this;
+ }
+
+ public PerResourcePollingConfiguration build() {
+ return new PerResourcePollingConfiguration<>(executorService, cacheKeyMapper,
+ resourceFetcher, registerPredicate, defaultPollingPeriod);
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java
index 6da1ec0e58..2288e7eb75 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java
@@ -9,7 +9,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -21,10 +20,10 @@
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.Cache;
-import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
+
/**
*
* Polls the supplier for each controlled resource registered. Resource is registered when created
@@ -42,138 +41,29 @@ public class PerResourcePollingEventSource
private static final Logger log = LoggerFactory.getLogger(PerResourcePollingEventSource.class);
- public static final int DEFAULT_EXECUTOR_THREAD_NUMBER = 1;
+ private final Map> scheduledFutures = new ConcurrentHashMap<>();
+ private final Cache primaryResourceCache;
+ private final Set fetchedForPrimaries = ConcurrentHashMap.newKeySet();
private final ScheduledExecutorService executorService;
- private final Map> scheduledFutures = new ConcurrentHashMap<>();
private final ResourceFetcher resourceFetcher;
- private final Cache resourceCache;
private final Predicate
registerPredicate;
- private final long period;
- private final Set fetchedForPrimaries = ConcurrentHashMap.newKeySet();
-
- public PerResourcePollingEventSource(ResourceFetcher resourceFetcher,
- EventSourceContext context, Duration defaultPollingPeriod,
- Class resourceClass) {
- this(resourceFetcher, context.getPrimaryCache(), defaultPollingPeriod.toMillis(),
- null, resourceClass,
- CacheKeyMapper.singleResourceCacheKeyMapper());
- }
-
- /**
- * @deprecated use the variant which uses {@link EventSourceContext} instead of {@link Cache} and
- * {@link Duration} for period parameter as it provides a more intuitive API.
- *
- * @param resourceFetcher fetches resource related to a primary resource
- * @param resourceCache cache of the primary resource
- * @param period default polling period
- * @param resourceClass class of the target resource
- */
- @Deprecated(forRemoval = true)
- public PerResourcePollingEventSource(ResourceFetcher resourceFetcher,
- Cache resourceCache, long period, Class resourceClass) {
- this(resourceFetcher, resourceCache, period, null, resourceClass,
- CacheKeyMapper.singleResourceCacheKeyMapper());
- }
+ private final Duration period;
- public PerResourcePollingEventSource(ResourceFetcher resourceFetcher,
- EventSourceContext context,
- Duration defaultPollingPeriod,
- Class resourceClass,
- CacheKeyMapper cacheKeyMapper) {
- this(resourceFetcher, context.getPrimaryCache(), defaultPollingPeriod.toMillis(),
- null, resourceClass, cacheKeyMapper);
- }
-
- /**
- * @deprecated use the variant which uses {@link EventSourceContext} instead of {@link Cache} and
- * {@link Duration} for period parameter as it provides a more intuitive API.
- *
- * @param resourceFetcher fetches resource related to a primary resource
- * @param resourceCache cache of the primary resource
- * @param period default polling period
- * @param resourceClass class of the target resource
- * @param cacheKeyMapper use to distinguish resource in case more resources are handled for a
- * single primary resource
- */
- @Deprecated(forRemoval = true)
- public PerResourcePollingEventSource(ResourceFetcher resourceFetcher,
- Cache resourceCache, long period, Class resourceClass,
- CacheKeyMapper cacheKeyMapper) {
- this(resourceFetcher, resourceCache, period, null, resourceClass, cacheKeyMapper);
+ public PerResourcePollingEventSource(Class resourceClass, EventSourceContext context,
+ PerResourcePollingConfiguration config) {
+ this(null, resourceClass, context, config);
}
- public PerResourcePollingEventSource(ResourceFetcher resourceFetcher,
+ public PerResourcePollingEventSource(String name, Class resourceClass,
EventSourceContext context,
- Duration defaultPollingPeriod,
- Predicate
registerPredicate,
- Class resourceClass,
- CacheKeyMapper cacheKeyMapper) {
- this(resourceFetcher, context.getPrimaryCache(), defaultPollingPeriod.toMillis(),
- registerPredicate, resourceClass, cacheKeyMapper,
- new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREAD_NUMBER));
- }
-
- /**
- * @deprecated use the variant which uses {@link EventSourceContext} instead of {@link Cache} and
- * {@link Duration} for period parameter as it provides a more intuitive API.
- *
- * @param resourceFetcher fetches resource related to a primary resource
- * @param resourceCache cache of the primary resource
- * @param period default polling period
- * @param resourceClass class of the target resource
- * @param cacheKeyMapper use to distinguish resource in case more resources are handled for a
- * single primary resource
- * @param registerPredicate used to determine if the related resource for a custom resource should
- * be polled or not.
- */
- @Deprecated(forRemoval = true)
- public PerResourcePollingEventSource(ResourceFetcher resourceFetcher,
- Cache resourceCache, long period,
- Predicate
registerPredicate, Class resourceClass,
- CacheKeyMapper cacheKeyMapper) {
- this(resourceFetcher, resourceCache, period, registerPredicate, resourceClass, cacheKeyMapper,
- new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREAD_NUMBER));
- }
-
-
- public PerResourcePollingEventSource(
- ResourceFetcher resourceFetcher,
- EventSourceContext context, Duration defaultPollingPeriod,
- Predicate
registerPredicate, Class resourceClass,
- CacheKeyMapper cacheKeyMapper, ScheduledExecutorService executorService) {
- this(resourceFetcher, context.getPrimaryCache(), defaultPollingPeriod.toMillis(),
- registerPredicate,
- resourceClass, cacheKeyMapper, executorService);
- }
-
- /**
- * @deprecated use the variant which uses {@link EventSourceContext} instead of {@link Cache} and
- * {@link Duration} for period parameter as it provides a more intuitive API.
- *
- * @param resourceFetcher fetches resource related to a primary resource
- * @param resourceCache cache of the primary resource
- * @param period default polling period
- * @param resourceClass class of the target resource
- * @param cacheKeyMapper use to distinguish resource in case more resources are handled for a
- * single primary resource
- * @param registerPredicate used to determine if the related resource for a custom resource should
- * be polled or not.
- * @param executorService custom executor service
- */
-
- @Deprecated(forRemoval = true)
- public PerResourcePollingEventSource(
- ResourceFetcher resourceFetcher,
- Cache resourceCache, long period,
- Predicate
registerPredicate, Class resourceClass,
- CacheKeyMapper cacheKeyMapper, ScheduledExecutorService executorService) {
- super(resourceClass, cacheKeyMapper);
- this.resourceFetcher = resourceFetcher;
- this.resourceCache = resourceCache;
- this.period = period;
- this.registerPredicate = registerPredicate;
- this.executorService = executorService;
+ PerResourcePollingConfiguration config) {
+ super(name, resourceClass, config.cacheKeyMapper());
+ this.primaryResourceCache = context.getPrimaryCache();
+ this.resourceFetcher = config.resourceFetcher();
+ this.registerPredicate = config.registerPredicate();
+ this.executorService = config.executorService();
+ this.period = config.defaultPollingPeriod();
}
private Set getAndCacheResource(P primary, boolean fromGetter) {
@@ -187,7 +77,7 @@ private Set getAndCacheResource(P primary, boolean fromGetter) {
private void scheduleNextExecution(P primary, Set actualResources) {
var primaryID = ResourceID.fromResource(primary);
var fetchDelay = resourceFetcher.fetchDelay(actualResources, primary);
- var fetchDuration = fetchDelay.orElse(Duration.ofMillis(period));
+ var fetchDuration = fetchDelay.orElse(period);
ScheduledFuture scheduledFuture = (ScheduledFuture) executorService
.schedule(new FetchingExecutor(primaryID), fetchDuration.toMillis(), TimeUnit.MILLISECONDS);
@@ -246,7 +136,7 @@ public void run() {
return;
}
// always use up-to-date resource from cache
- var primary = resourceCache.get(primaryID);
+ var primary = primaryResourceCache.get(primaryID);
if (primary.isEmpty()) {
log.warn("No resource in cache for resource ID: {}", primaryID);
// no new execution is scheduled in this case, an on delete event should be received shortly
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingConfiguration.java
new file mode 100644
index 0000000000..516d0546f7
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingConfiguration.java
@@ -0,0 +1,18 @@
+package io.javaoperatorsdk.operator.processing.event.source.polling;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
+
+public record PollingConfiguration(PollingEventSource.GenericResourceFetcher genericResourceFetcher,
+ Duration period, CacheKeyMapper cacheKeyMapper) {
+
+ public PollingConfiguration(PollingEventSource.GenericResourceFetcher genericResourceFetcher, Duration period,
+ CacheKeyMapper cacheKeyMapper) {
+ this.genericResourceFetcher = Objects.requireNonNull(genericResourceFetcher);
+ this.period = period;
+ this.cacheKeyMapper =
+ cacheKeyMapper == null ? CacheKeyMapper.singleResourceCacheKeyMapper() : cacheKeyMapper;
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingConfigurationBuilder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingConfigurationBuilder.java
new file mode 100644
index 0000000000..576f8fdb56
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingConfigurationBuilder.java
@@ -0,0 +1,26 @@
+package io.javaoperatorsdk.operator.processing.event.source.polling;
+
+import java.time.Duration;
+
+import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
+
+public final class PollingConfigurationBuilder {
+ private final Duration period;
+ private final PollingEventSource.GenericResourceFetcher genericResourceFetcher;
+ private CacheKeyMapper cacheKeyMapper;
+
+ public PollingConfigurationBuilder(PollingEventSource.GenericResourceFetcher fetcher,
+ Duration period) {
+ this.genericResourceFetcher = fetcher;
+ this.period = period;
+ }
+
+ public PollingConfigurationBuilder withCacheKeyMapper(CacheKeyMapper cacheKeyMapper) {
+ this.cacheKeyMapper = cacheKeyMapper;
+ return this;
+ }
+
+ public PollingConfiguration build() {
+ return new PollingConfiguration<>(genericResourceFetcher, period, cacheKeyMapper);
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java
index 9ef889ecb6..060128576c 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java
@@ -1,6 +1,10 @@
package io.javaoperatorsdk.operator.processing.event.source.polling;
-import java.util.*;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
@@ -10,7 +14,6 @@
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
-import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
/**
@@ -46,26 +49,17 @@ public class PollingEventSource
private final Timer timer = new Timer();
private final GenericResourceFetcher genericResourceFetcher;
- private final long period;
+ private final Duration period;
private final AtomicBoolean healthy = new AtomicBoolean(true);
- public PollingEventSource(
- GenericResourceFetcher supplier,
- long period,
- Class resourceClass) {
- super(resourceClass, CacheKeyMapper.singleResourceCacheKeyMapper());
- this.genericResourceFetcher = supplier;
- this.period = period;
+ public PollingEventSource(Class resourceClass, PollingConfiguration config) {
+ this(null, resourceClass, config);
}
- public PollingEventSource(
- GenericResourceFetcher supplier,
- long period,
- Class resourceClass,
- CacheKeyMapper cacheKeyMapper) {
- super(resourceClass, cacheKeyMapper);
- this.genericResourceFetcher = supplier;
- this.period = period;
+ public PollingEventSource(String name, Class resourceClass, PollingConfiguration config) {
+ super(name, resourceClass, config.cacheKeyMapper());
+ this.genericResourceFetcher = config.genericResourceFetcher();
+ this.period = config.period();
}
@Override
@@ -89,8 +83,8 @@ public void run() {
}
}
},
- period,
- period);
+ period.toMillis(),
+ period.toMillis());
}
protected synchronized void getStateAndFillCache() {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java
index fe641e0b0b..f228c9935c 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java
@@ -22,6 +22,12 @@ public class TimerEventSource
private Timer timer;
private final Map onceTasks = new ConcurrentHashMap<>();
+ public TimerEventSource() {}
+
+ public TimerEventSource(String name) {
+ super(name);
+ }
+
@SuppressWarnings("unused")
public void scheduleOnce(R resource, long delay) {
scheduleOnce(ResourceID.fromResource(resource), delay);
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceUtilsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceUtilsTest.java
deleted file mode 100644
index b606f1fc1c..0000000000
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceUtilsTest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package io.javaoperatorsdk.operator.api.reconciler;
-
-import java.util.HashMap;
-
-import org.junit.jupiter.api.Test;
-
-import io.javaoperatorsdk.operator.processing.event.source.polling.PollingEventSource;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-class EventSourceUtilsTest {
-
- @Test
- @SuppressWarnings({"rawtypes", "unchecked"})
- void defaultNameDifferentForOtherInstance() {
- var eventSource1 = new PollingEventSource(HashMap::new, 1000, String.class);
- var eventSource2 = new PollingEventSource(HashMap::new, 1000, String.class);
- var eventSourceName1 = EventSourceUtils.generateNameFor(eventSource1);
- var eventSourceName2 = EventSourceUtils.generateNameFor(eventSource2);
-
- assertThat(eventSourceName1).isNotEqualTo(eventSourceName2);
- }
-
-}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java
index 44bdf8c85b..17a61b987b 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java
@@ -352,7 +352,7 @@ void newResourceAfterMissedDeleteEvent() {
@Test
void rateLimitsReconciliationSubmission() {
- // the refresh period value does not matter here
+ // the refresh defaultPollingPeriod value does not matter here
var refreshPeriod = Duration.ofMillis(100);
var event = prepareCREvent();
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java
index 39a3192d95..66ccf47e81 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java
@@ -92,25 +92,25 @@ void retrievingEventSourceForClassShouldWork() {
}
@Test
- void shouldNotBePossibleToAddEventSourcesForSameTypeAndName() {
+ void notPossibleAddEventSourcesForSameName() {
EventSourceManager manager = initManager();
final var name = "name1";
ManagedInformerEventSource eventSource = mock(ManagedInformerEventSource.class);
+ when(eventSource.name()).thenReturn(name);
when(eventSource.resourceType()).thenReturn(TestCustomResource.class);
- manager.registerEventSource(name, eventSource);
+ manager.registerEventSource(eventSource);
eventSource = mock(ManagedInformerEventSource.class);
when(eventSource.resourceType()).thenReturn(TestCustomResource.class);
+ when(eventSource.name()).thenReturn(name);
final var source = eventSource;
final var exception = assertThrows(OperatorException.class,
- () -> manager.registerEventSource(name, source));
+ () -> manager.registerEventSource(source));
final var cause = exception.getCause();
assertInstanceOf(IllegalArgumentException.class, cause);
- assertThat(cause.getMessage()).contains(
- "is already registered for the (io.javaoperatorsdk.operator.sample.simple.TestCustomResource, "
- + name + ") class/name combination");
+ assertThat(cause.getMessage()).contains("is already registered with name");
}
@Test
@@ -119,11 +119,14 @@ void retrievingAnEventSourceWhenMultipleAreRegisteredForATypeShouldRequireAQuali
ManagedInformerEventSource eventSource = mock(ManagedInformerEventSource.class);
when(eventSource.resourceType()).thenReturn(TestCustomResource.class);
- manager.registerEventSource("name1", eventSource);
+ when(eventSource.name()).thenReturn("name1");
+ manager.registerEventSource(eventSource);
+
ManagedInformerEventSource eventSource2 = mock(ManagedInformerEventSource.class);
+ when(eventSource2.name()).thenReturn("name2");
when(eventSource2.resourceType()).thenReturn(TestCustomResource.class);
- manager.registerEventSource("name2", eventSource2);
+ manager.registerEventSource(eventSource2);
final var exception = assertThrows(IllegalArgumentException.class,
() -> manager.getResourceEventSourceFor(TestCustomResource.class));
@@ -157,10 +160,11 @@ void changesNamespacesOnControllerAndInformerEventSources() {
InformerConfiguration informerConfigurationMock = mock(InformerConfiguration.class);
when(informerConfigurationMock.followControllerNamespaceChanges()).thenReturn(true);
InformerEventSource informerEventSource = mock(InformerEventSource.class);
+ when(informerEventSource.name()).thenReturn("ies");
when(informerEventSource.resourceType()).thenReturn(TestCustomResource.class);
when(informerEventSource.configuration()).thenReturn(informerConfigurationMock);
when(informerEventSource.allowsNamespaceChanges()).thenCallRealMethod();
- manager.registerEventSource("ies", informerEventSource);
+ manager.registerEventSource(informerEventSource);
manager.changeNamespaces(Set.of(newNamespaces));
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java
index db2a69609d..f26f915d00 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java
@@ -13,7 +13,6 @@
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
-import static io.javaoperatorsdk.operator.processing.event.EventSources.RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -29,42 +28,46 @@ class EventSourcesTest {
@Test
void cannotAddTwoDifferentEventSourcesWithSameName() {
final var eventSources = new EventSources();
+ var es1 = mock(EventSource.class);
+ when(es1.name()).thenReturn(EVENT_SOURCE_NAME);
+ var es2 = mock(EventSource.class);
+ when(es2.name()).thenReturn(EVENT_SOURCE_NAME);
+
+ eventSources.add(es1);
assertThrows(IllegalArgumentException.class, () -> {
- eventSources.add(new NamedEventSource(mock(EventSource.class), "name"));
- eventSources.add(new NamedEventSource(mock(EventSource.class), "name"));
+ eventSources.add(es2);
});
}
@Test
- void cannotAddTwoEventSourcesWithSameNameUnlessTheyAreEqual() {
+ void cannotAddTwoEventSourcesWithSame() {
final var eventSources = new EventSources();
final var source = mock(EventSource.class);
- eventSources.add(new NamedEventSource(source, "name"));
- eventSources.add(new NamedEventSource(source, "name"));
- assertThat(eventSources.flatMappedSources())
- .containsExactly(new NamedEventSource(source, "name"));
- }
+ when(source.name()).thenReturn("name");
+ eventSources.add(source);
+ assertThrows(IllegalArgumentException.class, () -> eventSources.add(source));
+ }
@Test
void eventSourcesStreamShouldNotReturnControllerEventSource() {
final var eventSources = new EventSources();
final var source = mock(EventSource.class);
- final var namedEventSource = new NamedEventSource(source, EVENT_SOURCE_NAME);
- eventSources.add(namedEventSource);
+ when(source.name()).thenReturn(EVENT_SOURCE_NAME);
+
+ eventSources.add(source);
- assertThat(eventSources.additionalNamedEventSources()).containsExactly(
- new NamedEventSource(eventSources.retryEventSource(),
- RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME),
- namedEventSource);
+ assertThat(eventSources.additionalEventSources()).containsExactly(
+ eventSources.retryEventSource(),
+ source);
}
@Test
void additionalEventSourcesShouldNotContainNamedEventSources() {
final var eventSources = new EventSources();
final var source = mock(EventSource.class);
- final var namedEventSource = new NamedEventSource(source, EVENT_SOURCE_NAME);
- eventSources.add(namedEventSource);
+ when(source.name()).thenReturn(EVENT_SOURCE_NAME);
+ eventSources.add(source);
assertThat(eventSources.additionalEventSources()).containsExactly(
eventSources.retryEventSource(), source);
@@ -83,45 +86,37 @@ void checkControllerResourceEventSource() {
assertEquals(HasMetadata.class, controllerResourceEventSource.resourceType());
assertEquals(controllerResourceEventSource,
- eventSources.namedControllerResourceEventSource().eventSource());
+ eventSources.controllerResourceEventSource());
}
@Test
void flatMappedSourcesShouldReturnOnlyUserRegisteredEventSources() {
final var eventSources = new EventSources();
- final var mock1 = mock(ResourceEventSource.class);
- when(mock1.resourceType()).thenReturn(HasMetadata.class);
- final var mock2 = mock(ResourceEventSource.class);
- when(mock2.resourceType()).thenReturn(HasMetadata.class);
- final var mock3 = mock(ResourceEventSource.class);
- when(mock3.resourceType()).thenReturn(ConfigMap.class);
-
- final var named1 = new NamedEventSource(mock1, "name1");
- final var named2 = new NamedEventSource(mock2, "name2");
- final var named3 = new NamedEventSource(mock3, "name2");
- eventSources.add(named1);
- eventSources.add(named2);
- eventSources.add(named3);
-
- assertThat(eventSources.flatMappedSources()).contains(named1, named2, named3);
+ final var mock1 =
+ eventSourceMockWithName(ResourceEventSource.class, "name1", HasMetadata.class);
+ final var mock2 =
+ eventSourceMockWithName(ResourceEventSource.class, "name2", HasMetadata.class);
+ final var mock3 = eventSourceMockWithName(ResourceEventSource.class, "name3", ConfigMap.class);
+
+ eventSources.add(mock1);
+ eventSources.add(mock2);
+ eventSources.add(mock3);
+
+ assertThat(eventSources.flatMappedSources()).contains(mock1, mock2, mock3);
}
@Test
void clearShouldWork() {
final var eventSources = new EventSources();
- final var mock1 = mock(ResourceEventSource.class);
- when(mock1.resourceType()).thenReturn(HasMetadata.class);
- final var mock2 = mock(ResourceEventSource.class);
- when(mock2.resourceType()).thenReturn(HasMetadata.class);
- final var mock3 = mock(ResourceEventSource.class);
- when(mock3.resourceType()).thenReturn(ConfigMap.class);
-
- final var named1 = new NamedEventSource(mock1, "name1");
- final var named2 = new NamedEventSource(mock2, "name2");
- final var named3 = new NamedEventSource(mock3, "name2");
- eventSources.add(named1);
- eventSources.add(named2);
- eventSources.add(named3);
+ final var mock1 =
+ eventSourceMockWithName(ResourceEventSource.class, "name1", HasMetadata.class);
+ final var mock2 =
+ eventSourceMockWithName(ResourceEventSource.class, "name2", HasMetadata.class);
+ final var mock3 = eventSourceMockWithName(ResourceEventSource.class, "name3", ConfigMap.class);
+
+ eventSources.add(mock1);
+ eventSources.add(mock2);
+ eventSources.add(mock3);
eventSources.clear();
assertThat(eventSources.flatMappedSources()).isEmpty();
@@ -130,19 +125,15 @@ void clearShouldWork() {
@Test
void getShouldWork() {
final var eventSources = new EventSources();
- final var mock1 = mock(ResourceEventSource.class);
- when(mock1.resourceType()).thenReturn(HasMetadata.class);
- final var mock2 = mock(ResourceEventSource.class);
- when(mock2.resourceType()).thenReturn(HasMetadata.class);
- final var mock3 = mock(ResourceEventSource.class);
- when(mock3.resourceType()).thenReturn(ConfigMap.class);
-
- final var named1 = new NamedEventSource(mock1, "name1");
- final var named2 = new NamedEventSource(mock2, "name2");
- final var named3 = new NamedEventSource(mock3, "name2");
- eventSources.add(named1);
- eventSources.add(named2);
- eventSources.add(named3);
+ final var mock1 =
+ eventSourceMockWithName(ResourceEventSource.class, "name1", HasMetadata.class);
+ final var mock2 =
+ eventSourceMockWithName(ResourceEventSource.class, "name2", HasMetadata.class);
+ final var mock3 = eventSourceMockWithName(ResourceEventSource.class, "name2", ConfigMap.class);
+
+ eventSources.add(mock1);
+ eventSources.add(mock2);
+ eventSources.add(mock3);
assertEquals(mock1, eventSources.get(HasMetadata.class, "name1"));
assertEquals(mock2, eventSources.get(HasMetadata.class, "name2"));
@@ -160,19 +151,15 @@ void getShouldWork() {
@Test
void getEventSourcesShouldWork() {
final var eventSources = new EventSources();
- final var mock1 = mock(ResourceEventSource.class);
- when(mock1.resourceType()).thenReturn(HasMetadata.class);
- final var mock2 = mock(ResourceEventSource.class);
- when(mock2.resourceType()).thenReturn(HasMetadata.class);
- final var mock3 = mock(ResourceEventSource.class);
- when(mock3.resourceType()).thenReturn(ConfigMap.class);
-
- final var named1 = new NamedEventSource(mock1, "name1");
- final var named2 = new NamedEventSource(mock2, "name2");
- final var named3 = new NamedEventSource(mock3, "name2");
- eventSources.add(named1);
- eventSources.add(named2);
- eventSources.add(named3);
+ final var mock1 =
+ eventSourceMockWithName(ResourceEventSource.class, "name1", HasMetadata.class);
+ final var mock2 =
+ eventSourceMockWithName(ResourceEventSource.class, "name2", HasMetadata.class);
+ final var mock3 = eventSourceMockWithName(ResourceEventSource.class, "name3", ConfigMap.class);
+
+ eventSources.add(mock1);
+ eventSources.add(mock2);
+ eventSources.add(mock3);
var sources = eventSources.getEventSources(HasMetadata.class);
assertThat(sources.size()).isEqualTo(2);
@@ -184,4 +171,15 @@ void getEventSourcesShouldWork() {
assertThat(eventSources.getEventSources(Service.class)).isEmpty();
}
+
+
+
+ EventSource eventSourceMockWithName(Class clazz, String name,
+ Class resourceType) {
+ var mockedES = mock(clazz);
+ when(mockedES.name()).thenReturn(name);
+ when(mockedES.resourceType()).thenReturn(resourceType);
+ return mockedES;
+ }
+
}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java
index 70249f6125..fd5b85aa16 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java
@@ -11,19 +11,17 @@
import io.javaoperatorsdk.operator.TestUtils;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
-import io.javaoperatorsdk.operator.processing.event.source.*;
+import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase;
+import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
+import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
+import io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
class PerResourcePollingEventSourceTest extends
AbstractEventSourceTestBase, EventHandler> {
@@ -45,8 +43,10 @@ public void setup() {
.thenReturn(Set.of(SampleExternalResource.testResource1()));
when(context.getPrimaryCache()).thenReturn(resourceCache);
- setUpSource(new PerResourcePollingEventSource<>(supplier, context, Duration.ofMillis(PERIOD),
- SampleExternalResource.class, r -> r.getName() + "#" + r.getValue()));
+ setUpSource(new PerResourcePollingEventSource<>(SampleExternalResource.class, context,
+ new PerResourcePollingConfigurationBuilder<>(supplier, Duration.ofMillis(PERIOD))
+ .withCacheKeyMapper(r -> r.getName() + "#" + r.getValue())
+ .build()));
}
@Test
@@ -62,9 +62,14 @@ void pollsTheResourceAfterAwareOfIt() {
@Test
void registeringTaskOnAPredicate() {
- setUpSource(new PerResourcePollingEventSource<>(supplier, context, Duration.ofMillis(PERIOD),
- testCustomResource -> testCustomResource.getMetadata().getGeneration() > 1,
- SampleExternalResource.class, CacheKeyMapper.singleResourceCacheKeyMapper()));
+ setUpSource(new PerResourcePollingEventSource<>(SampleExternalResource.class, context,
+ new PerResourcePollingConfigurationBuilder<>(
+ supplier, Duration.ofMillis(PERIOD))
+ .withRegisterPredicate(
+ testCustomResource -> testCustomResource.getMetadata().getGeneration() > 1)
+ .withCacheKeyMapper(CacheKeyMapper.singleResourceCacheKeyMapper())
+ .build()));
+
source.onResourceCreated(testCustomResource);
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java
index bd0179d4cb..5dffa65ae7 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java
@@ -25,14 +25,15 @@ class PollingEventSourceTest
AbstractEventSourceTestBase, EventHandler> {
public static final int DEFAULT_WAIT_PERIOD = 100;
- public static final long POLL_PERIOD = 30L;
+ public static final Duration POLL_PERIOD = Duration.ofMillis(30L);
@SuppressWarnings("unchecked")
private final PollingEventSource.GenericResourceFetcher