diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Metrics.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Metrics.java index 6edc599e47..0dd021b51f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Metrics.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Metrics.java @@ -1,19 +1,29 @@ package io.javaoperatorsdk.operator; -import io.fabric8.kubernetes.client.CustomResource; -import io.javaoperatorsdk.operator.api.Context; -import io.javaoperatorsdk.operator.api.DeleteControl; -import io.javaoperatorsdk.operator.api.ResourceController; -import io.javaoperatorsdk.operator.api.UpdateControl; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.micrometer.core.instrument.*; -import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; -import io.micrometer.core.instrument.distribution.pause.PauseDetector; -import io.micrometer.core.instrument.noop.*; import java.util.concurrent.TimeUnit; import java.util.function.ToDoubleFunction; import java.util.function.ToLongFunction; +import io.micrometer.core.instrument.Clock; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.FunctionCounter; +import io.micrometer.core.instrument.FunctionTimer; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Measurement; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; +import io.micrometer.core.instrument.distribution.pause.PauseDetector; +import io.micrometer.core.instrument.noop.NoopCounter; +import io.micrometer.core.instrument.noop.NoopDistributionSummary; +import io.micrometer.core.instrument.noop.NoopFunctionCounter; +import io.micrometer.core.instrument.noop.NoopFunctionTimer; +import io.micrometer.core.instrument.noop.NoopGauge; +import io.micrometer.core.instrument.noop.NoopMeter; +import io.micrometer.core.instrument.noop.NoopTimer; + public class Metrics { public static final Metrics NOOP = new Metrics(new NoopMeterRegistry(Clock.SYSTEM)); private final MeterRegistry registry; @@ -22,82 +32,42 @@ public Metrics(MeterRegistry registry) { this.registry = registry; } - public UpdateControl timeControllerCreateOrUpdate( - ResourceController controller, - ControllerConfiguration configuration, - R resource, - Context context) { - final var name = configuration.getName(); - final var timer = - Timer.builder("operator.sdk.controllers.execution.createorupdate") - .tags("controller", name) - .publishPercentiles(0.3, 0.5, 0.95) - .publishPercentileHistogram() - .register(registry); - try { - final var result = timer.record(() -> controller.createOrUpdateResource(resource, context)); - String successType = "cr"; - if (result.isUpdateStatusSubResource()) { - successType = "status"; - } - if (result.isUpdateCustomResourceAndStatusSubResource()) { - successType = "both"; - } - registry - .counter( - "operator.sdk.controllers.execution.success", "controller", name, "type", successType) - .increment(); - return result; - } catch (Exception e) { - registry - .counter( - "operator.sdk.controllers.execution.failure", - "controller", - name, - "exception", - e.getClass().getSimpleName()) - .increment(); - throw e; - } + public interface ControllerExecution { + String name(); + + String controllerName(); + + String successTypeName(T result); + + T execute(); } - public DeleteControl timeControllerDelete( - ResourceController controller, - ControllerConfiguration configuration, - CustomResource resource, - Context context) { - final var name = configuration.getName(); + public T timeControllerExecution(ControllerExecution execution) { + final var name = execution.controllerName(); + final var execName = "operator.sdk.controllers.execution." + execution.name(); final var timer = - Timer.builder("operator.sdk.controllers.execution.delete") + Timer.builder(execName) .tags("controller", name) .publishPercentiles(0.3, 0.5, 0.95) .publishPercentileHistogram() .register(registry); try { - final var result = timer.record(() -> controller.deleteResource(resource, context)); - String successType = "notDelete"; - if (result == DeleteControl.DEFAULT_DELETE) { - successType = "delete"; - } + final var result = timer.record(execution::execute); + final var successType = execution.successTypeName(result); registry - .counter( - "operator.sdk.controllers.execution.success", "controller", name, "type", successType) + .counter(execName + ".success", "controller", name, "type", successType) .increment(); return result; } catch (Exception e) { + final var exception = e.getClass().getSimpleName(); registry - .counter( - "operator.sdk.controllers.execution.failure", - "controller", - name, - "exception", - e.getClass().getSimpleName()) + .counter(execName + ".failure", "controller", name, "exception", exception) .increment(); throw e; } } - public void timeControllerRetry() { + public void incrementControllerRetriesNumber() { registry .counter( @@ -107,7 +77,7 @@ public void timeControllerRetry() { } - public void timeControllerEvents() { + public void incrementProcessedEventsNumber() { registry .counter( diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 930db02e73..c3a2360be4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -1,41 +1,47 @@ package io.javaoperatorsdk.operator; -import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition; +import java.io.Closeable; +import java.io.IOException; +import java.net.ConnectException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.Version; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import io.javaoperatorsdk.operator.processing.ConfiguredController; +import io.javaoperatorsdk.operator.processing.DefaultEventHandler; +import io.javaoperatorsdk.operator.processing.DefaultEventHandler.EventMonitor; +import io.javaoperatorsdk.operator.processing.event.Event; @SuppressWarnings("rawtypes") public class Operator implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(Operator.class); private final KubernetesClient k8sClient; private final ConfigurationService configurationService; - private final List closeables; - private final Object lock; - private final List controllers; - private volatile boolean started; - private final Metrics metrics; + private final ControllerManager controllers = new ControllerManager(); - public Operator( - KubernetesClient k8sClient, ConfigurationService configurationService, Metrics metrics) { + public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) { this.k8sClient = k8sClient; this.configurationService = configurationService; - this.closeables = new ArrayList<>(); - this.lock = new Object(); - this.controllers = new ArrayList<>(); - this.started = false; - this.metrics = metrics; + DefaultEventHandler.setEventMonitor(new EventMonitor() { + @Override + public void processedEvent(String uid, Event event) { + configurationService.getMetrics().incrementProcessedEventsNumber(); + } + + @Override + public void failedEvent(String uid, Event event) { + configurationService.getMetrics().incrementControllerRetriesNumber(); + } + }); } /** Adds a shutdown hook that automatically calls {@link #close()} when the app shuts down. */ @@ -43,10 +49,6 @@ public void installShutdownHook() { Runtime.getRuntime().addShutdownHook(new Thread(this::close)); } - public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) { - this(k8sClient, configurationService, Metrics.NOOP); - } - public KubernetesClient getKubernetesClient() { return k8sClient; } @@ -60,65 +62,43 @@ public ConfigurationService getConfigurationService() { * where there is no obvious entrypoint to the application which can trigger the injection process * and start the cluster monitoring processes. */ - @SuppressWarnings("unchecked") public void start() { - synchronized (lock) { - if (started) { - return; - } - - final var version = configurationService.getVersion(); - log.info( - "Operator SDK {} (commit: {}) built on {} starting...", - version.getSdkVersion(), - version.getCommit(), - version.getBuiltTime()); - - if (controllers.isEmpty()) { - throw new OperatorException("No ResourceController exists. Exiting!"); + controllers.shouldStart(); + + final var version = configurationService.getVersion(); + log.info( + "Operator SDK {} (commit: {}) built on {} starting...", + version.getSdkVersion(), + version.getCommit(), + version.getBuiltTime()); + + log.info("Client version: {}", Version.clientVersion()); + try { + final var k8sVersion = k8sClient.getVersion(); + if (k8sVersion != null) { + log.info("Server version: {}.{}", k8sVersion.getMajor(), k8sVersion.getMinor()); } - - log.info("Client version: {}", Version.clientVersion()); - try { - final var k8sVersion = k8sClient.getVersion(); - if (k8sVersion != null) { - log.info("Server version: {}.{}", k8sVersion.getMajor(), k8sVersion.getMinor()); - } - } catch (Exception e) { - log.error("Error retrieving the server version. Exiting!", e); - throw new OperatorException("Error retrieving the server version", e); - } - - for (ControllerRef ref : controllers) { - startController(ref.controller, ref.configuration); + } catch (Exception e) { + final String error; + if (e.getCause() instanceof ConnectException) { + error = "Cannot connect to cluster"; + } else { + error = "Error retrieving the server version"; } - - started = true; + log.error(error, e); + throw new OperatorException(error, e); } + + controllers.start(); } /** Stop the operator. */ @Override public void close() { - synchronized (lock) { - if (!started) { - return; - } + log.info( + "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); - log.info( - "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); - - for (Closeable closeable : this.closeables) { - try { - log.debug("closing {}", closeable); - closeable.close(); - } catch (IOException e) { - log.warn("Error closing {}", closeable, e); - } - } - - started = false; - } + controllers.close(); } /** @@ -150,32 +130,6 @@ public void register(ResourceController controller public void register( ResourceController controller, ControllerConfiguration configuration) throws OperatorException { - synchronized (lock) { - if (!started) { - this.controllers.add(new ControllerRef(controller, configuration)); - } else { - this.controllers.add(new ControllerRef(controller, configuration)); - startController(controller, configuration); - } - } - } - - /** - * Registers the specified controller with this operator, overriding its default configuration by - * the specified one (usually created via - * {@link io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider#override(ControllerConfiguration)}, - * passing it the controller's original configuration. - * - * @param controller the controller to register - * @param configuration the configuration with which we want to register the controller, if {@code - * null}, the controller's original configuration is used - * @param the {@code CustomResource} type associated with the controller - * @throws OperatorException if a problem occurred during the registration process - */ - private void startController( - ResourceController controller, ControllerConfiguration configuration) - throws OperatorException { - final var existing = configurationService.getConfigurationFor(controller); if (existing == null) { log.warn( @@ -188,40 +142,9 @@ private void startController( if (configuration == null) { configuration = existing; } - - final Class resClass = configuration.getCustomResourceClass(); - final String controllerName = configuration.getName(); - final var crdName = configuration.getCRDName(); - final var specVersion = "v1"; - - // check that the custom resource is known by the cluster if configured that way - final CustomResourceDefinition crd; // todo: check proper CRD spec version based on config - if (configurationService.checkCRDAndValidateLocalModel()) { - crd = k8sClient.apiextensions().v1().customResourceDefinitions().withName(crdName).get(); - if (crd == null) { - throwMissingCRDException(crdName, specVersion, controllerName); - } - - // Apply validations that are not handled by fabric8 - CustomResourceUtils.assertCustomResource(resClass, crd); - } - - try { - DefaultEventSourceManager eventSourceManager = - new DefaultEventSourceManager( - controller, configuration, k8sClient.customResources(resClass)); - controller.init(eventSourceManager); - closeables.add(eventSourceManager); - } catch (MissingCRDException e) { - throwMissingCRDException(crdName, specVersion, controllerName); - } - - if (failOnMissingCurrentNS(configuration)) { - throw new OperatorException( - "Controller '" - + controllerName - + "' is configured to watch the current namespace but it couldn't be inferred from the current configuration."); - } + final var configuredController = + new ConfiguredController(controller, configuration, k8sClient); + controllers.add(configuredController); final var watchedNS = configuration.watchAllNamespaces() @@ -229,49 +152,54 @@ private void startController( : configuration.getEffectiveNamespaces(); log.info( "Registered Controller: '{}' for CRD: '{}' for namespace(s): {}", - controllerName, - resClass, + configuration.getName(), + configuration.getCustomResourceClass(), watchedNS); } } - private void throwMissingCRDException(String crdName, String specVersion, String controllerName) { - throw new MissingCRDException( - crdName, - specVersion, - "'" - + crdName - + "' " - + specVersion - + " CRD was not found on the cluster, controller '" - + controllerName - + "' cannot be registered"); - } + private static class ControllerManager implements Closeable { + private final List controllers = new LinkedList<>(); + private boolean started = false; - /** - * Determines whether we should fail because the current namespace is request as target namespace - * but is missing - * - * @return {@code true} if the current namespace is requested but is missing, {@code false} - * otherwise - */ - private static boolean failOnMissingCurrentNS( - ControllerConfiguration configuration) { - if (configuration.watchCurrentNamespace()) { - final var effectiveNamespaces = configuration.getEffectiveNamespaces(); - return effectiveNamespaces.size() == 1 - && effectiveNamespaces.stream().allMatch(Objects::isNull); + + public synchronized void shouldStart() { + if (started) { + return; + } + if (controllers.isEmpty()) { + throw new OperatorException("No ResourceController exists. Exiting!"); + } } - return false; - } - private static class ControllerRef { - public final ResourceController controller; - public final ControllerConfiguration configuration; + public synchronized void start() { + controllers.parallelStream().forEach(ConfiguredController::start); + started = true; + } + + @Override + public synchronized void close() { + if (!started) { + return; + } + + this.controllers.parallelStream().forEach(closeable -> { + try { + log.debug("closing {}", closeable); + closeable.close(); + } catch (IOException e) { + log.warn("Error closing {}", closeable, e); + } + }); + + started = false; + } - public ControllerRef(ResourceController controller, ControllerConfiguration configuration) { - this.controller = controller; - this.configuration = configuration; + public synchronized void add(ConfiguredController configuredController) { + this.controllers.add(configuredController); + if (started) { + configuredController.start(); + } } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractControllerConfiguration.java index d6e6371750..cc8a96fcf1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractControllerConfiguration.java @@ -1,10 +1,11 @@ package io.javaoperatorsdk.operator.api.config; -import io.fabric8.kubernetes.client.CustomResource; import java.util.Collections; import java.util.Set; -public abstract class AbstractControllerConfiguration +import io.fabric8.kubernetes.client.CustomResource; + +public class AbstractControllerConfiguration> implements ControllerConfiguration { private final String associatedControllerClassName; @@ -16,6 +17,7 @@ public abstract class AbstractControllerConfiguration private final boolean watchAllNamespaces; private final RetryConfiguration retryConfiguration; private final String labelSelector; + private Class customResourceClass; private ConfigurationService service; public AbstractControllerConfiguration( @@ -26,7 +28,9 @@ public AbstractControllerConfiguration( boolean generationAware, Set namespaces, RetryConfiguration retryConfiguration, - String labelSelector) { + String labelSelector, + Class customResourceClass, + ConfigurationService service) { this.associatedControllerClassName = associatedControllerClassName; this.name = name; this.crdName = crdName; @@ -40,11 +44,15 @@ public AbstractControllerConfiguration( ? ControllerConfiguration.super.getRetryConfiguration() : retryConfiguration; this.labelSelector = labelSelector; + this.customResourceClass = + customResourceClass == null ? ControllerConfiguration.super.getCustomResourceClass() + : customResourceClass; + setConfigurationService(service); } /** * @deprecated use - * {@link #AbstractControllerConfiguration(String, String, String, String, boolean, Set, RetryConfiguration, String)} + * {@link #AbstractControllerConfiguration(String, String, String, String, boolean, Set, RetryConfiguration, String, Class, ConfigurationService)} * instead */ @Deprecated @@ -57,7 +65,7 @@ public AbstractControllerConfiguration( Set namespaces, RetryConfiguration retryConfiguration) { this(associatedControllerClassName, name, crdName, finalizer, generationAware, namespaces, - retryConfiguration, null); + retryConfiguration, null, null, null); } @Override @@ -107,6 +115,10 @@ public ConfigurationService getConfigurationService() { @Override public void setConfigurationService(ConfigurationService service) { + if (this.service != null) { + throw new RuntimeException("A ConfigurationService is already associated with '" + name + + "' ControllerConfiguration. Cannot change it once set!"); + } this.service = service; } @@ -114,4 +126,9 @@ public void setConfigurationService(ConfigurationService service) { public String getLabelSelector() { return labelSelector; } + + @Override + public Class getCustomResourceClass() { + return customResourceClass; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java index cc6d5f75ec..51ff31af00 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java @@ -1,15 +1,16 @@ package io.javaoperatorsdk.operator.api.config; -import io.fabric8.kubernetes.client.CustomResource; import java.util.HashSet; import java.util.List; import java.util.Set; -public class ControllerConfigurationOverrider { +import io.fabric8.kubernetes.client.CustomResource; + +public class ControllerConfigurationOverrider> { private String finalizer; private boolean generationAware; - private Set namespaces; + private final Set namespaces; private RetryConfiguration retry; private String labelSelector; private final ControllerConfiguration original; @@ -65,7 +66,7 @@ public ControllerConfigurationOverrider withLabelSelector(String labelSelecto } public ControllerConfiguration build() { - return new AbstractControllerConfiguration( + return new AbstractControllerConfiguration<>( original.getAssociatedControllerClassName(), original.getName(), original.getCRDName(), @@ -73,20 +74,12 @@ public ControllerConfiguration build() { generationAware, namespaces, retry, - labelSelector) { - @Override - public Class getCustomResourceClass() { - return original.getCustomResourceClass(); - } - - @Override - public ConfigurationService getConfigurationService() { - return original.getConfigurationService(); - } - }; + labelSelector, + original.getCustomResourceClass(), + original.getConfigurationService()); } - public static ControllerConfigurationOverrider override( + public static > ControllerConfigurationOverrider override( ControllerConfiguration original) { return new ControllerConfigurationOverrider<>(original); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java index d47837593b..83a537cedc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java @@ -1,11 +1,11 @@ package io.javaoperatorsdk.operator.api.config; import java.io.IOException; -import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Instant; import java.util.Date; import java.util.Properties; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +41,8 @@ public static Version loadFromProperties() { // RFC 822 date is the default format used by git-commit-id-plugin new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ") .parse(properties.getProperty("git.build.time")); - } catch (ParseException e) { + } catch (Exception e) { + log.debug("Couldn't parse git.build.time property", e); builtTime = Date.from(Instant.EPOCH); } return new Version( diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ConfiguredController.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ConfiguredController.java new file mode 100644 index 0000000000..31b06dd7af --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ConfiguredController.java @@ -0,0 +1,228 @@ +package io.javaoperatorsdk.operator.processing; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; + +import io.fabric8.kubernetes.api.model.KubernetesResourceList; +import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.javaoperatorsdk.operator.CustomResourceUtils; +import io.javaoperatorsdk.operator.Metrics.ControllerExecution; +import io.javaoperatorsdk.operator.MissingCRDException; +import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.api.Context; +import io.javaoperatorsdk.operator.api.DeleteControl; +import io.javaoperatorsdk.operator.api.ResourceController; +import io.javaoperatorsdk.operator.api.UpdateControl; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; +import io.javaoperatorsdk.operator.processing.event.EventSourceManager; + +public class ConfiguredController> implements ResourceController, + Closeable { + private final ResourceController controller; + private final ControllerConfiguration configuration; + private final KubernetesClient k8sClient; + private EventSourceManager manager; + + public ConfiguredController(ResourceController controller, + ControllerConfiguration configuration, + KubernetesClient k8sClient) { + this.controller = controller; + this.configuration = configuration; + this.k8sClient = k8sClient; + } + + @Override + public DeleteControl deleteResource(R resource, Context context) { + return configuration.getConfigurationService().getMetrics().timeControllerExecution( + new ControllerExecution<>() { + @Override + public String name() { + return "delete"; + } + + @Override + public String controllerName() { + return configuration.getName(); + } + + @Override + public String successTypeName(DeleteControl result) { + switch (result) { + case DEFAULT_DELETE: + return "delete"; + case NO_FINALIZER_REMOVAL: + return "finalizerNotRemoved"; + default: + return "unknown"; + } + } + + @Override + public DeleteControl execute() { + return controller.deleteResource(resource, context); + } + }); + } + + @Override + public UpdateControl createOrUpdateResource(R resource, Context context) { + return configuration.getConfigurationService().getMetrics().timeControllerExecution( + new ControllerExecution<>() { + @Override + public String name() { + return "createOrUpdate"; + } + + @Override + public String controllerName() { + return configuration.getName(); + } + + @Override + public String successTypeName(UpdateControl result) { + String successType = "cr"; + if (result.isUpdateStatusSubResource()) { + successType = "status"; + } + if (result.isUpdateCustomResourceAndStatusSubResource()) { + successType = "both"; + } + return successType; + } + + @Override + public UpdateControl execute() { + return controller.createOrUpdateResource(resource, context); + } + }); + } + + @Override + public void init(EventSourceManager eventSourceManager) { + this.manager = eventSourceManager; + controller.init(eventSourceManager); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ConfiguredController that = (ConfiguredController) o; + return configuration.getName().equals(that.configuration.getName()); + } + + @Override + public int hashCode() { + return configuration.getName().hashCode(); + } + + @Override + public String toString() { + return "'" + configuration.getName() + "' Controller"; + } + + public ResourceController getController() { + return controller; + } + + public ControllerConfiguration getConfiguration() { + return configuration; + } + + public KubernetesClient getClient() { + return k8sClient; + } + + public MixedOperation, Resource> getCRClient() { + return k8sClient.resources(configuration.getCustomResourceClass()); + } + + /** + * Registers the specified controller with this operator, overriding its default configuration by + * the specified one (usually created via + * {@link io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider#override(ControllerConfiguration)}, + * passing it the controller's original configuration. + * + * @throws OperatorException if a problem occurred during the registration process + */ + public void start() throws OperatorException { + final Class resClass = configuration.getCustomResourceClass(); + final String controllerName = configuration.getName(); + final var crdName = configuration.getCRDName(); + final var specVersion = "v1"; + + // check that the custom resource is known by the cluster if configured that way + final CustomResourceDefinition crd; // todo: check proper CRD spec version based on config + if (configuration.getConfigurationService().checkCRDAndValidateLocalModel()) { + crd = k8sClient.apiextensions().v1().customResourceDefinitions().withName(crdName).get(); + if (crd == null) { + throwMissingCRDException(crdName, specVersion, controllerName); + } + + // Apply validations that are not handled by fabric8 + CustomResourceUtils.assertCustomResource(resClass, crd); + } + + try { + DefaultEventSourceManager eventSourceManager = new DefaultEventSourceManager<>(this); + controller.init(eventSourceManager); + } catch (MissingCRDException e) { + throwMissingCRDException(crdName, specVersion, controllerName); + } + + if (failOnMissingCurrentNS()) { + throw new OperatorException( + "Controller '" + + controllerName + + "' is configured to watch the current namespace but it couldn't be inferred from the current configuration."); + } + } + + private void throwMissingCRDException(String crdName, String specVersion, String controllerName) { + throw new MissingCRDException( + crdName, + specVersion, + "'" + + crdName + + "' " + + specVersion + + " CRD was not found on the cluster, controller '" + + controllerName + + "' cannot be registered"); + } + + /** + * Determines whether we should fail because the current namespace is request as target namespace + * but is missing + * + * @return {@code true} if the current namespace is requested but is missing, {@code false} + * otherwise + */ + private boolean failOnMissingCurrentNS() { + if (configuration.watchCurrentNamespace()) { + final var effectiveNamespaces = configuration.getEffectiveNamespaces(); + return effectiveNamespaces.size() == 1 + && effectiveNamespaces.stream().allMatch(Objects::isNull); + } + return false; + } + + + @Override + public void close() throws IOException { + if (manager != null) { + manager.close(); + } + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index c59ca2dc9e..4c33ec23b1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -4,20 +4,6 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; -import io.fabric8.kubernetes.client.CustomResource; -import io.fabric8.kubernetes.client.dsl.MixedOperation; -import io.javaoperatorsdk.operator.Metrics; -import io.javaoperatorsdk.operator.api.ResourceController; -import io.javaoperatorsdk.operator.api.RetryInfo; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; -import io.javaoperatorsdk.operator.processing.event.Event; -import io.javaoperatorsdk.operator.processing.event.EventHandler; -import io.javaoperatorsdk.operator.processing.retry.GenericRetry; -import io.javaoperatorsdk.operator.processing.retry.Retry; -import io.javaoperatorsdk.operator.processing.retry.RetryExecution; -import io.micrometer.core.instrument.Clock; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -27,58 +13,70 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.fabric8.kubernetes.client.CustomResource; +import io.javaoperatorsdk.operator.api.RetryInfo; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; +import io.javaoperatorsdk.operator.processing.event.Event; +import io.javaoperatorsdk.operator.processing.event.EventHandler; +import io.javaoperatorsdk.operator.processing.retry.GenericRetry; +import io.javaoperatorsdk.operator.processing.retry.Retry; +import io.javaoperatorsdk.operator.processing.retry.RetryExecution; + /** * Event handler that makes sure that events are processed in a "single threaded" way per resource * UID, while buffering events which are received during an execution. */ -public class DefaultEventHandler implements EventHandler { +public class DefaultEventHandler> implements EventHandler { private static final Logger log = LoggerFactory.getLogger(DefaultEventHandler.class); + private static EventMonitor monitor = new EventMonitor() { + @Override + public void processedEvent(String uid, Event event) {} + + @Override + public void failedEvent(String uid, Event event) {} + }; private final EventBuffer eventBuffer; private final Set underProcessing = new HashSet<>(); private final ScheduledThreadPoolExecutor executor; - private final EventDispatcher eventDispatcher; + private final EventDispatcher eventDispatcher; private final Retry retry; private final Map retryState = new HashMap<>(); private final String controllerName; private final int terminationTimeout; private final ReentrantLock lock = new ReentrantLock(); - private DefaultEventSourceManager eventSourceManager; - private ControllerConfiguration configuration; + private DefaultEventSourceManager eventSourceManager; - public DefaultEventHandler( - ResourceController controller, ControllerConfiguration configuration, MixedOperation client) { + public DefaultEventHandler(ConfiguredController controller) { this( - new EventDispatcher(controller, configuration, client), - configuration.getName(), - GenericRetry.fromConfiguration(configuration.getRetryConfiguration()), - configuration.getConfigurationService().concurrentReconciliationThreads(), - configuration.getConfigurationService().getTerminationTimeoutSeconds(), configuration); + new EventDispatcher<>(controller), + controller.getConfiguration().getName(), + GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()), + controller.getConfiguration().getConfigurationService().concurrentReconciliationThreads(), + controller.getConfiguration().getConfigurationService().getTerminationTimeoutSeconds()); } - DefaultEventHandler( - EventDispatcher eventDispatcher, - String relatedControllerName, - Retry retry, - int concurrentReconciliationThreads, ControllerConfiguration configuration) { + DefaultEventHandler(EventDispatcher dispatcher, String relatedControllerName, Retry retry) { this( - eventDispatcher, + dispatcher, relatedControllerName, retry, - concurrentReconciliationThreads, - ConfigurationService.DEFAULT_TERMINATION_TIMEOUT_SECONDS, configuration); + ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER, + ConfigurationService.DEFAULT_TERMINATION_TIMEOUT_SECONDS); } private DefaultEventHandler( - EventDispatcher eventDispatcher, + EventDispatcher eventDispatcher, String relatedControllerName, Retry retry, int concurrentReconciliationThreads, - int terminationTimeout, ControllerConfiguration configuration) { + int terminationTimeout) { this.eventDispatcher = eventDispatcher; this.retry = retry; this.controllerName = relatedControllerName; @@ -88,7 +86,6 @@ private DefaultEventHandler( new ScheduledThreadPoolExecutor( concurrentReconciliationThreads, runnable -> new Thread(runnable, "EventHandler-" + relatedControllerName)); - this.configuration = configuration; } @Override @@ -104,10 +101,20 @@ public void close() { } } - public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) { + public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) { this.eventSourceManager = eventSourceManager; } + public static void setEventMonitor(EventMonitor monitor) { + DefaultEventHandler.monitor = monitor; + } + + public interface EventMonitor { + void processedEvent(String uid, Event event); + + void failedEvent(String uid, Event event); + } + @Override public void handleEvent(Event event) { try { @@ -117,10 +124,7 @@ public void handleEvent(Event event) { final Predicate selector = event.getCustomResourcesSelector(); for (String uid : eventSourceManager.getLatestResourceUids(selector)) { eventBuffer.addEvent(uid, event); - configuration - .getConfigurationService() - .getMetrics() - .timeControllerEvents(); + monitor.processedEvent(uid, event); executeBufferedEvents(uid); } } finally { @@ -159,7 +163,7 @@ private RetryInfo retryInfo(String customResourceUid) { } void eventProcessingFinished( - ExecutionScope executionScope, PostExecutionControl postExecutionControl) { + ExecutionScope executionScope, PostExecutionControl postExecutionControl) { try { lock.lock(); log.debug( @@ -170,10 +174,8 @@ void eventProcessingFinished( if (retry != null && postExecutionControl.exceptionDuringExecution()) { handleRetryOnException(executionScope); - configuration - .getConfigurationService() - .getMetrics() - .timeControllerRetry(); + executionScope.getEvents() + .forEach(e -> monitor.failedEvent(executionScope.getCustomResourceUid(), e)); return; } @@ -196,7 +198,7 @@ void eventProcessingFinished( * events (received meanwhile retry is in place or already in buffer) instantly or always wait * according to the retry timing if there was an exception. */ - private void handleRetryOnException(ExecutionScope executionScope) { + private void handleRetryOnException(ExecutionScope executionScope) { RetryExecution execution = getOrInitRetryExecution(executionScope); boolean newEventsExists = eventBuffer.newEventsExists(executionScope.getCustomResourceUid()); eventBuffer.putBackEvents(executionScope.getCustomResourceUid(), executionScope.getEvents()); @@ -218,12 +220,10 @@ private void handleRetryOnException(ExecutionScope executionScope) { .getRetryTimerEventSource() .scheduleOnce(executionScope.getCustomResource(), delay); }, - () -> { - log.error("Exhausted retries for {}", executionScope); - }); + () -> log.error("Exhausted retries for {}", executionScope)); } - private void markSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) { + private void markSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) { log.debug( "Marking successful execution for resource: {}", getName(executionScope.getCustomResource())); @@ -233,7 +233,7 @@ private void markSuccessfulExecutionRegardingRetry(ExecutionScope executionScope .cancelOnceSchedule(executionScope.getCustomResourceUid()); } - private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) { + private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) { RetryExecution retryExecution = retryState.get(executionScope.getCustomResourceUid()); if (retryExecution == null) { retryExecution = retry.initExecution(); @@ -259,11 +259,10 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) { * would override an additional change coming from a different client. */ private void cacheUpdatedResourceIfChanged( - ExecutionScope executionScope, PostExecutionControl postExecutionControl) { + ExecutionScope executionScope, PostExecutionControl postExecutionControl) { if (postExecutionControl.customResourceUpdatedDuringExecution()) { - CustomResource originalCustomResource = executionScope.getCustomResource(); - CustomResource customResourceAfterExecution = - postExecutionControl.getUpdatedCustomResource().get(); + R originalCustomResource = executionScope.getCustomResource(); + R customResourceAfterExecution = postExecutionControl.getUpdatedCustomResource().get(); String originalResourceVersion = getVersion(originalCustomResource); log.debug( diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java index e69d69ffb7..8eef425dad 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java @@ -5,6 +5,9 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.KubernetesClientException; @@ -17,35 +20,28 @@ import io.javaoperatorsdk.operator.api.UpdateControl; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.event.EventList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Dispatches events to the Controller and handles Finalizers for a single type of Custom Resource. */ -class EventDispatcher { +class EventDispatcher> { private static final Logger log = LoggerFactory.getLogger(EventDispatcher.class); - private final ResourceController controller; - private final ControllerConfiguration configuration; + private final ConfiguredController controller; private final CustomResourceFacade customResourceFacade; - EventDispatcher( - ResourceController controller, - ControllerConfiguration configuration, + EventDispatcher(ConfiguredController controller, CustomResourceFacade customResourceFacade) { this.controller = controller; this.customResourceFacade = customResourceFacade; - this.configuration = configuration; } - public EventDispatcher( - ResourceController controller, ControllerConfiguration configuration, MixedOperation client) { - this(controller, configuration, new CustomResourceFacade<>(client)); + public EventDispatcher(ConfiguredController controller) { + this(controller, new CustomResourceFacade<>(controller.getCRClient())); } - public PostExecutionControl handleExecution(ExecutionScope executionScope) { + public PostExecutionControl handleExecution(ExecutionScope executionScope) { try { return handleDispatch(executionScope); } catch (KubernetesClientException e) { @@ -61,7 +57,7 @@ public PostExecutionControl handleExecution(ExecutionScope executionScope) { } } - private PostExecutionControl handleDispatch(ExecutionScope executionScope) { + private PostExecutionControl handleDispatch(ExecutionScope executionScope) { R resource = executionScope.getCustomResource(); log.debug("Handling events: {} for resource {}", executionScope.getEvents(), getName(resource)); @@ -92,6 +88,10 @@ private PostExecutionControl handleDispatch(ExecutionScope executionScope) { } } + private ControllerConfiguration configuration() { + return controller.getConfiguration(); + } + /** * Determines whether the given resource should be dispatched to the controller's * {@link ResourceController#deleteResource(CustomResource, Context)} method @@ -103,12 +103,12 @@ private PostExecutionControl handleDispatch(ExecutionScope executionScope) { private boolean shouldNotDispatchToDelete(R resource) { // we don't dispatch to delete if the controller is configured to use a finalizer but that // finalizer is not present (which means it's already been removed) - return configuration.useFinalizer() && !resource.hasFinalizer(configuration.getFinalizer()); + return configuration().useFinalizer() && !resource.hasFinalizer(configuration().getFinalizer()); } - private PostExecutionControl handleCreateOrUpdate( + private PostExecutionControl handleCreateOrUpdate( ExecutionScope executionScope, R resource, Context context) { - if (configuration.useFinalizer() && !resource.hasFinalizer(configuration.getFinalizer())) { + if (configuration().useFinalizer() && !resource.hasFinalizer(configuration().getFinalizer())) { /* * We always add the finalizer if missing and the controller is configured to use a finalizer. * We execute the controller processing only for processing the event sent as a results of the @@ -124,11 +124,7 @@ private PostExecutionControl handleCreateOrUpdate( getVersion(resource), executionScope); - UpdateControl updateControl = - configuration - .getConfigurationService() - .getMetrics() - .timeControllerCreateOrUpdate(controller, configuration, resource, context); + UpdateControl updateControl = controller.createOrUpdateResource(resource, context); R updatedCustomResource = null; if (updateControl.isUpdateCustomResourceAndStatusSubResource()) { updatedCustomResource = updateCustomResource(updateControl.getCustomResource()); @@ -153,21 +149,17 @@ private PostExecutionControl handleCreateOrUpdate( } } - private PostExecutionControl handleDelete(R resource, Context context) { + private PostExecutionControl handleDelete(R resource, Context context) { log.debug( "Executing delete for resource: {} with version: {}", getName(resource), getVersion(resource)); - DeleteControl deleteControl = - configuration - .getConfigurationService() - .getMetrics() - .timeControllerDelete(controller, configuration, resource, context); - final var useFinalizer = configuration.useFinalizer(); + DeleteControl deleteControl = controller.deleteResource(resource, context); + final var useFinalizer = configuration().useFinalizer(); if (useFinalizer) { if (deleteControl == DeleteControl.DEFAULT_DELETE - && resource.hasFinalizer(configuration.getFinalizer())) { + && resource.hasFinalizer(configuration().getFinalizer())) { R customResource = removeFinalizer(resource); // todo: should we patch the resource to remove the finalizer instead of updating it return PostExecutionControl.customResourceUpdated(customResource); @@ -185,7 +177,7 @@ private PostExecutionControl handleDelete(R resource, Context context) { private void updateCustomResourceWithFinalizer(R resource) { log.debug( "Adding finalizer for resource: {} version: {}", getUID(resource), getVersion(resource)); - resource.addFinalizer(configuration.getFinalizer()); + resource.addFinalizer(configuration().getFinalizer()); replace(resource); } @@ -200,7 +192,7 @@ private R removeFinalizer(R resource) { "Removing finalizer on resource: {} with version: {}", getUID(resource), getVersion(resource)); - resource.removeFinalizer(configuration.getFinalizer()); + resource.removeFinalizer(configuration().getFinalizer()); return customResourceFacade.replaceWithLock(resource); } @@ -213,7 +205,7 @@ private R replace(R resource) { } // created to support unit testing - static class CustomResourceFacade { + static class CustomResourceFacade> { private final MixedOperation, Resource> resourceOperation; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java index 2b66ad68d4..f648b82955 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionConsumer.java @@ -1,20 +1,17 @@ package io.javaoperatorsdk.operator.processing; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import io.fabric8.kubernetes.client.CustomResource; -class ExecutionConsumer implements Runnable { +class ExecutionConsumer> implements Runnable { - private static final Logger log = LoggerFactory.getLogger(ExecutionConsumer.class); - - private final ExecutionScope executionScope; - private final EventDispatcher eventDispatcher; - private final DefaultEventHandler defaultEventHandler; + private final ExecutionScope executionScope; + private final EventDispatcher eventDispatcher; + private final DefaultEventHandler defaultEventHandler; ExecutionConsumer( - ExecutionScope executionScope, - EventDispatcher eventDispatcher, - DefaultEventHandler defaultEventHandler) { + ExecutionScope executionScope, + EventDispatcher eventDispatcher, + DefaultEventHandler defaultEventHandler) { this.executionScope = executionScope; this.eventDispatcher = eventDispatcher; this.defaultEventHandler = defaultEventHandler; @@ -22,7 +19,7 @@ class ExecutionConsumer implements Runnable { @Override public void run() { - PostExecutionControl postExecutionControl = eventDispatcher.handleExecution(executionScope); + PostExecutionControl postExecutionControl = eventDispatcher.handleExecution(executionScope); defaultEventHandler.eventProcessingFinished(executionScope, postExecutionControl); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java index bb2ad813e3..53917cc094 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java @@ -1,11 +1,12 @@ package io.javaoperatorsdk.operator.processing; +import java.util.List; + import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.api.RetryInfo; import io.javaoperatorsdk.operator.processing.event.Event; -import java.util.List; -public class ExecutionScope { +public class ExecutionScope> { private final List events; // the latest custom resource from cache diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java index 1e0c82f1b2..b36bcedd90 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/PostExecutionControl.java @@ -1,19 +1,20 @@ package io.javaoperatorsdk.operator.processing; -import io.fabric8.kubernetes.client.CustomResource; import java.util.Optional; -public final class PostExecutionControl { +import io.fabric8.kubernetes.client.CustomResource; + +public final class PostExecutionControl> { private final boolean onlyFinalizerHandled; - private final CustomResource updatedCustomResource; + private final R updatedCustomResource; private final RuntimeException runtimeException; private PostExecutionControl( boolean onlyFinalizerHandled, - CustomResource updatedCustomResource, + R updatedCustomResource, RuntimeException runtimeException) { this.onlyFinalizerHandled = onlyFinalizerHandled; this.updatedCustomResource = updatedCustomResource; @@ -28,8 +29,9 @@ public static PostExecutionControl defaultDispatch() { return new PostExecutionControl(false, null, null); } - public static PostExecutionControl customResourceUpdated(CustomResource updatedCustomResource) { - return new PostExecutionControl(false, updatedCustomResource, null); + public static > PostExecutionControl customResourceUpdated( + R updatedCustomResource) { + return new PostExecutionControl<>(false, updatedCustomResource, null); } public static PostExecutionControl exceptionDuringExecution(RuntimeException exception) { @@ -40,7 +42,7 @@ public boolean isOnlyFinalizerHandled() { return onlyFinalizerHandled; } - public Optional getUpdatedCustomResource() { + public Optional getUpdatedCustomResource() { return Optional.ofNullable(updatedCustomResource); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index 3fc060e716..47542eb96d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -1,16 +1,6 @@ package io.javaoperatorsdk.operator.processing.event; -import io.fabric8.kubernetes.client.CustomResource; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.dsl.MixedOperation; -import io.javaoperatorsdk.operator.MissingCRDException; -import io.javaoperatorsdk.operator.OperatorException; -import io.javaoperatorsdk.operator.api.ResourceController; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.CustomResourceCache; -import io.javaoperatorsdk.operator.processing.DefaultEventHandler; -import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; -import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -20,10 +10,22 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DefaultEventSourceManager implements EventSourceManager { +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.javaoperatorsdk.operator.MissingCRDException; +import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.processing.ConfiguredController; +import io.javaoperatorsdk.operator.processing.CustomResourceCache; +import io.javaoperatorsdk.operator.processing.DefaultEventHandler; +import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; +import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; + +public class DefaultEventSourceManager> + implements EventSourceManager { public static final String RETRY_TIMER_EVENT_SOURCE_NAME = "retry-timer-event-source"; private static final String CUSTOM_RESOURCE_EVENT_SOURCE_NAME = "custom-resource-event-source"; @@ -31,10 +33,10 @@ public class DefaultEventSourceManager implements EventSourceManager { private final ReentrantLock lock = new ReentrantLock(); private final Map eventSources = new ConcurrentHashMap<>(); - private final DefaultEventHandler defaultEventHandler; + private final DefaultEventHandler defaultEventHandler; private TimerEventSource retryTimerEventSource; - DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) { + DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) { this.defaultEventHandler = defaultEventHandler; defaultEventHandler.setEventSourceManager(this); if (supportRetry) { @@ -43,12 +45,14 @@ public class DefaultEventSourceManager implements EventSourceManager { } } - @SuppressWarnings({"rawtypes", "unchecked"}) - public > DefaultEventSourceManager( - ResourceController controller, ControllerConfiguration configuration, MixedOperation client) { - this(new DefaultEventHandler(controller, configuration, client), true); - registerEventSource( - CUSTOM_RESOURCE_EVENT_SOURCE_NAME, new CustomResourceEventSource<>(client, configuration)); + public DefaultEventSourceManager(ConfiguredController controller) { + this(new DefaultEventHandler<>(controller), true); + registerEventSource(CUSTOM_RESOURCE_EVENT_SOURCE_NAME, + new CustomResourceEventSource<>(controller)); + } + + public DefaultEventHandler getEventHandler() { + return defaultEventHandler; } @Override @@ -107,7 +111,11 @@ public Optional deRegisterEventSource(String name) { lock.lock(); EventSource currentEventSource = eventSources.remove(name); if (currentEventSource != null) { - currentEventSource.close(); + try { + currentEventSource.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } } return Optional.ofNullable(currentEventSource); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventHandler.java index d09a1c6d31..e0a657e1d1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventHandler.java @@ -1,11 +1,12 @@ package io.javaoperatorsdk.operator.processing.event; import java.io.Closeable; +import java.io.IOException; public interface EventHandler extends Closeable { void handleEvent(Event event); @Override - default void close() {} + default void close() throws IOException {} } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java index dd695ea439..149e0acc1c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java @@ -1,6 +1,7 @@ package io.javaoperatorsdk.operator.processing.event; import java.io.Closeable; +import java.io.IOException; public interface EventSource extends Closeable { @@ -15,7 +16,7 @@ default void start() {} * {@link EventSourceManager}. */ @Override - default void close() {} + default void close() throws IOException {} void setEventHandler(EventHandler eventHandler); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 241bad90b2..b59491042b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -1,10 +1,12 @@ package io.javaoperatorsdk.operator.processing.event; -import io.javaoperatorsdk.operator.OperatorException; import java.io.Closeable; +import java.io.IOException; import java.util.Map; import java.util.Optional; +import io.javaoperatorsdk.operator.OperatorException; + public interface EventSourceManager extends Closeable { /** @@ -35,5 +37,5 @@ Optional deRegisterCustomResourceFromEventSource( Map getRegisteredEventSources(); @Override - default void close() {} + default void close() throws IOException {} } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 728b52823c..05960e71f0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -4,26 +4,26 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; -import io.fabric8.kubernetes.api.model.KubernetesResourceList; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; -import io.fabric8.kubernetes.client.dsl.MixedOperation; -import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.utils.Utils; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.processing.ConfiguredController; import io.javaoperatorsdk.operator.processing.CustomResourceCache; import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils; import io.javaoperatorsdk.operator.processing.event.AbstractEventSource; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** This is a special case since is not bound to a single custom resource */ public class CustomResourceEventSource> extends AbstractEventSource @@ -31,66 +31,24 @@ public class CustomResourceEventSource> extends A private static final Logger log = LoggerFactory.getLogger(CustomResourceEventSource.class); - private final MixedOperation, Resource> client; - private final Set targetNamespaces; - private final boolean generationAware; - private final String resourceFinalizer; - private final String labelSelector; + private final ConfiguredController controller; private final Map lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>(); private final List watches; - private final String resClass; private final CustomResourceCache customResourceCache; - public CustomResourceEventSource( - MixedOperation, Resource> client, - ControllerConfiguration configuration) { - this( - client, - configuration.getEffectiveNamespaces(), - configuration.isGenerationAware(), - configuration.getFinalizer(), - configuration.getLabelSelector(), - configuration.getCustomResourceClass(), - new CustomResourceCache(configuration.getConfigurationService().getObjectMapper())); - } - - CustomResourceEventSource( - MixedOperation, Resource> client, - Set targetNamespaces, - boolean generationAware, - String resourceFinalizer, - String labelSelector, - Class resClass) { - this( - client, - targetNamespaces, - generationAware, - resourceFinalizer, - labelSelector, - resClass, - new CustomResourceCache()); - } - - CustomResourceEventSource( - MixedOperation, Resource> client, - Set targetNamespaces, - boolean generationAware, - String resourceFinalizer, - String labelSelector, - Class resClass, - CustomResourceCache customResourceCache) { - this.client = client; - this.targetNamespaces = targetNamespaces; - this.generationAware = generationAware; - this.resourceFinalizer = resourceFinalizer; - this.labelSelector = labelSelector; - this.watches = new ArrayList<>(); - this.resClass = resClass.getName(); - this.customResourceCache = customResourceCache; + public CustomResourceEventSource(ConfiguredController controller) { + this.controller = controller; + this.watches = new LinkedList<>(); + this.customResourceCache = new CustomResourceCache( + controller.getConfiguration().getConfigurationService().getObjectMapper()); } @Override public void start() { + final var configuration = controller.getConfiguration(); + final var targetNamespaces = configuration.getEffectiveNamespaces(); + final var client = controller.getCRClient(); + final var labelSelector = configuration.getLabelSelector(); var options = new ListOptions(); if (Utils.isNotNullOrEmpty(labelSelector)) { options.setLabelSelector(labelSelector); @@ -99,26 +57,26 @@ public void start() { if (ControllerConfiguration.allNamespacesWatched(targetNamespaces)) { var w = client.inAnyNamespace().watch(options, this); watches.add(w); - log.debug("Registered controller {} -> {} for any namespace", resClass, w); + log.debug("Registered {} -> {} for any namespace", controller, w); } else { targetNamespaces.forEach( ns -> { var w = client.inNamespace(ns).watch(options, this); watches.add(w); - log.debug("Registered controller {} -> {} for namespace: {}", resClass, w, ns); + log.debug("Registered {} -> {} for namespace: {}", controller, w, ns); }); } } @Override - public void close() { + public void close() throws IOException { eventHandler.close(); for (Watch watch : this.watches) { try { - log.info("Closing watch {} -> {}", resClass, watch); + log.info("Closing watch {} -> {}", controller, watch); watch.close(); } catch (Exception e) { - log.warn("Error closing watcher {} -> {}", resClass, watch, e); + log.warn("Error closing watcher {} -> {}", controller, watch, e); } } } @@ -152,14 +110,15 @@ public void eventReceived(Watcher.Action action, T customResource) { } private void markLastGenerationProcessed(T resource) { - if (generationAware && resource.hasFinalizer(resourceFinalizer)) { + if (controller.getConfiguration().isGenerationAware() + && resource.hasFinalizer(controller.getConfiguration().getFinalizer())) { lastGenerationProcessedSuccessfully.put( KubernetesResourceUtils.getUID(resource), resource.getMetadata().getGeneration()); } } private boolean skipBecauseOfGeneration(T customResource) { - if (!generationAware) { + if (!controller.getConfiguration().isGenerationAware()) { return false; } // if CR being deleted generation is naturally not changing, so we process all the events diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/CustomResourceSelectorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/CustomResourceSelectorTest.java index 72c681b857..695a985ce0 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/CustomResourceSelectorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/CustomResourceSelectorTest.java @@ -10,24 +10,19 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import io.fabric8.kubernetes.client.Watcher; -import io.javaoperatorsdk.operator.Metrics; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.event.DefaultEvent; -import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; -import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; -import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; -import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import java.util.Objects; import java.util.UUID; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import io.javaoperatorsdk.operator.processing.event.DefaultEvent; +import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; +import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; + class CustomResourceSelectorTest { - public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250; public static final int SEPARATE_EXECUTION_TIMEOUT = 450; private final EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); @@ -36,17 +31,8 @@ class CustomResourceSelectorTest { private final DefaultEventSourceManager defaultEventSourceManagerMock = mock(DefaultEventSourceManager.class); - private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); - private ControllerConfiguration configuration = - mock(ControllerConfiguration.class); - private final ConfigurationService configService = mock(ConfigurationService.class); - private final DefaultEventHandler defaultEventHandler = - new DefaultEventHandler( - eventDispatcherMock, - "Test", - null, - ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER, configuration); + new DefaultEventHandler(eventDispatcherMock, "Test", null); @BeforeEach public void setup() { @@ -66,10 +52,6 @@ public void setup() { }) .when(defaultEventSourceManagerMock) .cleanup(any()); - - when(configuration.getName()).thenReturn("DefaultEventHandlerTest"); - when(configService.getMetrics()).thenReturn(Metrics.NOOP); - when(configuration.getConfigurationService()).thenReturn(configService); } @Test @@ -125,13 +107,4 @@ private void waitMinimalTime() { } } - private CustomResourceEvent prepareCREvent() { - return prepareCREvent(UUID.randomUUID().toString()); - } - - private CustomResourceEvent prepareCREvent(String uid) { - TestCustomResource customResource = testCustomResource(uid); - customResourceCache.cacheResource(customResource); - return new CustomResourceEvent(Watcher.Action.MODIFIED, customResource, null); - } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 699784180f..bba02b14fd 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -13,21 +13,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import io.fabric8.kubernetes.client.CustomResource; -import io.fabric8.kubernetes.client.Watcher; -import io.javaoperatorsdk.operator.Metrics; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; -import io.javaoperatorsdk.operator.processing.event.Event; -import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; -import io.javaoperatorsdk.operator.processing.event.internal.TimerEvent; -import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; -import io.javaoperatorsdk.operator.processing.retry.GenericRetry; -import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import java.util.Arrays; import java.util.List; import java.util.UUID; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -35,6 +24,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.fabric8.kubernetes.client.Watcher; +import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; +import io.javaoperatorsdk.operator.processing.event.Event; +import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; +import io.javaoperatorsdk.operator.processing.event.internal.TimerEvent; +import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; +import io.javaoperatorsdk.operator.processing.retry.GenericRetry; +import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; + class DefaultEventHandlerTest { private static final Logger log = LoggerFactory.getLogger(DefaultEventHandlerTest.class); @@ -47,25 +45,13 @@ class DefaultEventHandlerTest { mock(DefaultEventSourceManager.class); private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); - private ControllerConfiguration configuration = - mock(ControllerConfiguration.class); - private final ConfigurationService configService = mock(ConfigurationService.class); private DefaultEventHandler defaultEventHandler = - new DefaultEventHandler( - eventDispatcherMock, - "Test", - null, - ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER, - configuration); + new DefaultEventHandler(eventDispatcherMock, "Test", null); private DefaultEventHandler defaultEventHandlerWithRetry = - new DefaultEventHandler( - eventDispatcherMock, - "Test", - GenericRetry.defaultLimitedExponentialRetry(), - ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER, - configuration); + new DefaultEventHandler(eventDispatcherMock, "Test", + GenericRetry.defaultLimitedExponentialRetry()); @BeforeEach public void setup() { @@ -74,10 +60,6 @@ public void setup() { defaultEventHandler.setEventSourceManager(defaultEventSourceManagerMock); defaultEventHandlerWithRetry.setEventSourceManager(defaultEventSourceManagerMock); - when(configuration.getName()).thenReturn("DefaultEventHandlerTest"); - when(configService.getMetrics()).thenReturn(Metrics.NOOP); - when(configuration.getConfigurationService()).thenReturn(configService); - // todo: remove when(defaultEventSourceManagerMock.getCache()).thenReturn(customResourceCache); doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any()); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java index fa44508227..721aaaa7ef 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/EventDispatcherTest.java @@ -13,6 +13,15 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; + import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.Watcher; import io.javaoperatorsdk.operator.Metrics; @@ -26,13 +35,6 @@ import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatchers; class EventDispatcherTest { @@ -40,15 +42,17 @@ class EventDispatcherTest { private CustomResource testCustomResource; private EventDispatcher eventDispatcher; private final ResourceController controller = mock(ResourceController.class); - private ControllerConfiguration configuration = + private final ControllerConfiguration configuration = mock(ControllerConfiguration.class); private final ConfigurationService configService = mock(ConfigurationService.class); + private final ConfiguredController> configuredController = + new ConfiguredController(controller, configuration, null); private final EventDispatcher.CustomResourceFacade customResourceFacade = mock(EventDispatcher.CustomResourceFacade.class); @BeforeEach void setup() { - eventDispatcher = new EventDispatcher(controller, configuration, customResourceFacade); + eventDispatcher = new EventDispatcher(configuredController, customResourceFacade); testCustomResource = TestUtils.testCustomResource(); @@ -165,7 +169,8 @@ private void configureToNotUseFinalizer() { when(configService.getMetrics()).thenReturn(Metrics.NOOP); when(configuration.getConfigurationService()).thenReturn(configService); when(configuration.useFinalizer()).thenReturn(false); - eventDispatcher = new EventDispatcher(controller, configuration, customResourceFacade); + eventDispatcher = new EventDispatcher(new ConfiguredController(controller, configuration, null), + customResourceFacade); } @Test diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java index 42fa9fa95d..f8f58e0aed 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java @@ -8,12 +8,15 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.io.IOException; +import java.util.Map; + +import org.junit.jupiter.api.Test; + import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.TestUtils; import io.javaoperatorsdk.operator.processing.DefaultEventHandler; import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils; -import java.util.Map; -import org.junit.jupiter.api.Test; class DefaultEventSourceManagerTest { @@ -38,7 +41,7 @@ public void registersEventSource() { } @Test - public void closeShouldCascadeToEventSources() { + public void closeShouldCascadeToEventSources() throws IOException { EventSource eventSource = mock(EventSource.class); EventSource eventSource2 = mock(EventSource.class); defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java index 1e1a3513b5..1448982379 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java @@ -4,29 +4,34 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.LocalDateTime; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.Resource; import io.javaoperatorsdk.operator.TestUtils; +import io.javaoperatorsdk.operator.api.config.AbstractControllerConfiguration; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.processing.ConfiguredController; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; -import java.time.LocalDateTime; -import java.util.Arrays; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; class CustomResourceEventSourceTest { public static final String FINALIZER = "finalizer"; - MixedOperation, Resource> client = + private static final MixedOperation, Resource> client = mock(MixedOperation.class); EventHandler eventHandler = mock(EventHandler.class); private CustomResourceEventSource customResourceEventSource = - new CustomResourceEventSource<>( - client, null, true, FINALIZER, null, TestCustomResource.class); + new CustomResourceEventSource<>(new TestConfiguredController(true)); @BeforeEach public void setup() { @@ -36,7 +41,7 @@ public void setup() { @Test public void skipsEventHandlingIfGenerationNotIncreased() { TestCustomResource customResource1 = TestUtils.testCustomResource(); - customResource1.getMetadata().setFinalizers(Arrays.asList(FINALIZER)); + customResource1.getMetadata().setFinalizers(List.of(FINALIZER)); customResourceEventSource.eventReceived(Watcher.Action.MODIFIED, customResource1); verify(eventHandler, times(1)).handleEvent(any()); @@ -73,8 +78,7 @@ public void normalExecutionIfGenerationChanges() { @Test public void handlesAllEventIfNotGenerationAware() { customResourceEventSource = - new CustomResourceEventSource<>( - client, null, false, FINALIZER, null, TestCustomResource.class); + new CustomResourceEventSource<>(new TestConfiguredController(false)); setup(); TestCustomResource customResource1 = TestUtils.testCustomResource(); @@ -96,4 +100,27 @@ public void eventNotMarkedForLastGenerationIfNoFinalizer() { customResourceEventSource.eventReceived(Watcher.Action.MODIFIED, customResource1); verify(eventHandler, times(2)).handleEvent(any()); } + + private static class TestConfiguredController extends ConfiguredController { + + public TestConfiguredController(boolean generationAware) { + super(null, new TestConfiguration(generationAware), null); + } + + @Override + public MixedOperation, Resource> getCRClient() { + return client; + } + } + private static class TestConfiguration extends + AbstractControllerConfiguration { + + public TestConfiguration(boolean generationAware) { + super(null, null, null, FINALIZER, generationAware, null, null, null, + TestCustomResource.class, + mock(ConfigurationService.class)); + when(getConfigurationService().getObjectMapper()) + .thenReturn(ConfigurationService.OBJECT_MAPPER); + } + } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java index bbdb510d8e..271c50514b 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java @@ -3,6 +3,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.Namespace; import io.fabric8.kubernetes.api.model.NamespaceBuilder; @@ -19,11 +26,6 @@ import io.javaoperatorsdk.operator.processing.retry.Retry; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import io.javaoperatorsdk.operator.sample.simple.TestCustomResourceSpec; -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class IntegrationTestSupport { @@ -52,14 +54,14 @@ public void initialize(KubernetesClient k8sClient, ResourceController controller loadCRDAndApplyToCluster(crdPath); final var customResourceClass = config.getCustomResourceClass(); - this.crOperations = k8sClient.customResources(customResourceClass); + this.crOperations = k8sClient.resources(customResourceClass); final var namespaces = k8sClient.namespaces(); if (namespaces.withName(TEST_NAMESPACE).get() == null) { namespaces.create( new NamespaceBuilder().withNewMetadata().withName(TEST_NAMESPACE).endMetadata().build()); } - operator = new Operator(k8sClient, configurationService, Metrics.NOOP); + operator = new Operator(k8sClient, configurationService); final var overriddenConfig = ControllerConfigurationOverrider.override(config).settingNamespace(TEST_NAMESPACE); if (retry != null) {