diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java new file mode 100644 index 0000000000..8b960434ee --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -0,0 +1,232 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import java.util.function.BiFunction; +import java.util.function.UnaryOperator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.base.PatchContext; +import io.fabric8.kubernetes.client.dsl.base.PatchType; +import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class PrimaryUpdateAndCacheUtils { + + private PrimaryUpdateAndCacheUtils() {} + + private static final Logger log = LoggerFactory.getLogger(PrimaryUpdateAndCacheUtils.class); + + /** + * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * Using update (PUT) method. + * + * @param primary resource + * @param context of reconciliation + * @return updated resource + * @param

primary resource type + */ + public static

P updateAndCacheStatusWith(P primary, Context

context) { + return patchAndCacheStatusWith(primary, context, (p, c) -> c.resource(primary).updateStatus()); + } + + /** + * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * Using JSON Merge patch. + * + * @param primary resource + * @param context of reconciliation + * @return updated resource + * @param

primary resource type + */ + public static

P patchAndCacheStatusWith(P primary, Context

context) { + return patchAndCacheStatusWith(primary, context, (p, c) -> c.resource(primary).patchStatus()); + } + + /** + * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * Using JSON Patch. + * + * @param primary resource + * @param context of reconciliation + * @return updated resource + * @param

primary resource type + */ + public static

P editAndCacheStatusWith( + P primary, Context

context, UnaryOperator

operation) { + return patchAndCacheStatusWith( + primary, context, (p, c) -> c.resource(primary).editStatus(operation)); + } + + /** + * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * + * @param primary resource + * @param context of reconciliation + * @param patch free implementation of cache - make sure you use optimistic locking during the + * update + * @return the updated resource. + * @param

primary resource type + */ + public static

P patchAndCacheStatusWith( + P primary, Context

context, BiFunction patch) { + checkResourceVersionPresent(primary); + var updatedResource = patch.apply(primary, context.getClient()); + context + .eventSourceRetriever() + .getControllerEventSource() + .handleRecentResourceUpdate(ResourceID.fromResource(primary), updatedResource, primary); + return updatedResource; + } + + /** + * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * Using Server Side Apply. + * + * @param primary resource + * @param freshResourceWithStatus - fresh resource with target state + * @param context of reconciliation + * @return the updated resource. + * @param

primary resource type + */ + public static

P ssaPatchAndCacheStatusWith( + P primary, P freshResourceWithStatus, Context

context) { + checkResourceVersionPresent(freshResourceWithStatus); + var res = + context + .getClient() + .resource(freshResourceWithStatus) + .subresource("status") + .patch( + new PatchContext.Builder() + .withForce(true) + .withFieldManager(context.getControllerConfiguration().fieldManager()) + .withPatchType(PatchType.SERVER_SIDE_APPLY) + .build()); + + context + .eventSourceRetriever() + .getControllerEventSource() + .handleRecentResourceUpdate(ResourceID.fromResource(primary), res, primary); + return res; + } + + /** + * Patches the resource and adds it to the {@link PrimaryResourceCache} provided. Optimistic + * locking is not required. + * + * @param primary resource + * @param freshResourceWithStatus - fresh resource with target state + * @param context of reconciliation + * @param cache - resource cache managed by user + * @return the updated resource. + * @param

primary resource type + */ + public static

P ssaPatchAndCacheStatus( + P primary, P freshResourceWithStatus, Context

context, PrimaryResourceCache

cache) { + logWarnIfResourceVersionPresent(freshResourceWithStatus); + return patchAndCacheStatus( + primary, + context.getClient(), + cache, + (P p, KubernetesClient c) -> + c.resource(freshResourceWithStatus) + .subresource("status") + .patch( + new PatchContext.Builder() + .withForce(true) + .withFieldManager(context.getControllerConfiguration().fieldManager()) + .withPatchType(PatchType.SERVER_SIDE_APPLY) + .build())); + } + + /** + * Patches the resource with JSON Patch and adds it to the {@link PrimaryResourceCache} provided. + * Optimistic locking is not required. + * + * @param primary resource* + * @param context of reconciliation + * @param cache - resource cache managed by user + * @return the updated resource. + * @param

primary resource type + */ + public static

P editAndCacheStatus( + P primary, Context

context, PrimaryResourceCache

cache, UnaryOperator

operation) { + logWarnIfResourceVersionPresent(primary); + return patchAndCacheStatus( + primary, + context.getClient(), + cache, + (P p, KubernetesClient c) -> c.resource(primary).editStatus(operation)); + } + + /** + * Patches the resource with JSON Merge patch and adds it to the {@link PrimaryResourceCache} + * provided. Optimistic locking is not required. + * + * @param primary resource* + * @param context of reconciliation + * @param cache - resource cache managed by user + * @return the updated resource. + * @param

primary resource type + */ + public static

P patchAndCacheStatus( + P primary, Context

context, PrimaryResourceCache

cache) { + logWarnIfResourceVersionPresent(primary); + return patchAndCacheStatus( + primary, + context.getClient(), + cache, + (P p, KubernetesClient c) -> c.resource(primary).patchStatus()); + } + + /** + * Updates the resource and adds it to the {@link PrimaryResourceCache} provided. Optimistic + * locking is not required. + * + * @param primary resource* + * @param context of reconciliation + * @param cache - resource cache managed by user + * @return the updated resource. + * @param

primary resource type + */ + public static

P updateAndCacheStatus( + P primary, Context

context, PrimaryResourceCache

cache) { + logWarnIfResourceVersionPresent(primary); + return patchAndCacheStatus( + primary, + context.getClient(), + cache, + (P p, KubernetesClient c) -> c.resource(primary).updateStatus()); + } + + public static

P patchAndCacheStatus( + P primary, + KubernetesClient client, + PrimaryResourceCache

cache, + BiFunction patch) { + var updatedResource = patch.apply(primary, client); + cache.cacheResource(primary, updatedResource); + return updatedResource; + } + + private static

void checkResourceVersionPresent(P primary) { + if (primary.getMetadata().getResourceVersion() == null) { + throw new IllegalStateException( + "Primary resource version is null, it is expected to set resource version for updates for caching. Name: %s namespace: %s" + .formatted(primary.getMetadata().getName(), primary.getMetadata().getNamespace())); + } + } + + private static

void logWarnIfResourceVersionPresent(P primary) { + if (primary.getMetadata().getResourceVersion() != null) { + log.warn( + "Primary resource version is NOT null, for caching with optimistic locking use" + + " alternative methods. Name: {} namespace: {}", + primary.getMetadata().getName(), + primary.getMetadata().getNamespace()); + } + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java new file mode 100644 index 0000000000..4da73ab8b1 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java @@ -0,0 +1,65 @@ +package io.javaoperatorsdk.operator.api.reconciler.support; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiPredicate; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class PrimaryResourceCache

{ + + private final BiPredicate, P> evictionPredicate; + private final ConcurrentHashMap> cache = new ConcurrentHashMap<>(); + + public PrimaryResourceCache(BiPredicate, P> evictionPredicate) { + this.evictionPredicate = evictionPredicate; + } + + public PrimaryResourceCache() { + this(new ResourceVersionParsingEvictionPredicate<>()); + } + + public void cacheResource(P afterUpdate) { + var resourceId = ResourceID.fromResource(afterUpdate); + cache.put(resourceId, new Pair<>(null, afterUpdate)); + } + + public void cacheResource(P beforeUpdate, P afterUpdate) { + var resourceId = ResourceID.fromResource(beforeUpdate); + cache.put(resourceId, new Pair<>(beforeUpdate, afterUpdate)); + } + + public P getFreshResource(P newVersion) { + var resourceId = ResourceID.fromResource(newVersion); + var pair = cache.get(resourceId); + if (pair == null) { + return newVersion; + } + if (!newVersion.getMetadata().getUid().equals(pair.afterUpdate().getMetadata().getUid())) { + cache.remove(resourceId); + return newVersion; + } + if (evictionPredicate.test(pair, newVersion)) { + cache.remove(resourceId); + return newVersion; + } else { + return pair.afterUpdate(); + } + } + + public void cleanup(P resource) { + cache.remove(ResourceID.fromResource(resource)); + } + + public record Pair(T beforeUpdate, T afterUpdate) {} + + /** This works in general, but it does not strictly follow the contract with k8s API */ + public static class ResourceVersionParsingEvictionPredicate + implements BiPredicate, T> { + @Override + public boolean test(Pair updatePair, T newVersion) { + return Long.parseLong(updatePair.afterUpdate().getMetadata().getResourceVersion()) + <= Long.parseLong(newVersion.getMetadata().getResourceVersion()); + } + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 02b91f6dd0..8b07bf110b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -208,6 +208,7 @@ public Stream> getEventSourcesStream() { return eventSources.flatMappedSources(); } + @Override public ControllerEventSource

getControllerEventSource() { return eventSources.controllerEventSource(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java index 066a7f5808..c5a219a026 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java @@ -6,6 +6,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource; public interface EventSourceRetriever

{ @@ -17,6 +18,8 @@ default EventSource getEventSourceFor(Class dependentType) { List> getEventSourcesFor(Class dependentType); + ControllerEventSource

getControllerEventSource(); + /** * Registers (and starts) the specified {@link EventSource} dynamically during the reconciliation. * If an EventSource is already registered with the specified name, the registration will be diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 688a88ae22..40ceaa975b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -13,6 +13,7 @@ import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; @@ -20,50 +21,53 @@ import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; /** - * Wraps informer(s) so it is connected to the eventing system of the framework. Note that since - * it's it is built on top of Informers, it also support caching resources using caching from - * fabric8 client Informer caches and additional caches described below. + * Wraps informer(s) so they are connected to the eventing system of the framework. Note that since + * this is built on top of Fabric8 client Informers, it also supports caching resources using caching from + * informer caches as well as additional caches described below. * *

InformerEventSource also supports two features to better handle events and caching of - * resources on top of Informers from fabric8 Kubernetes client. These two features implementation - * wise are related to each other:
+ * resources on top of Informers from the Fabric8 Kubernetes client. These two features + * are related to each other as follows: * - *

1. API that allows to make sure the cache contains the fresh resource after an update. This is - * important for {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} and - * mainly for {@link - * io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource} so after - * reconcile if getResource() called always return the fresh resource. To achieve this - * handleRecentResourceUpdate() and handleRecentResourceCreate() needs to be called explicitly after - * resource created/updated using the kubernetes client. (These calls are done automatically by - * KubernetesDependentResource implementation.). In the background this will store the new resource - * in a temporary cache {@link TemporaryResourceCache} which do additional checks. After a new event - * is received the cachec object is removed from this cache, since in general then it is already in - * the cache of informer.
+ *

    + *
  1. Ensuring the cache contains the fresh resource after an update. This is + * important for {@link + * io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} and mainly for + * {@link + * io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource} so + * that {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource#getSecondaryResource(HasMetadata, Context)} always returns the latest version of the resource after a reconciliation. To achieve this + * {@link #handleRecentResourceUpdate(ResourceID, HasMetadata, HasMetadata)} and {@link #handleRecentResourceCreate(ResourceID, HasMetadata)} need to be called explicitly + * after a resource is created or updated using the kubernetes client. These calls are done + * automatically by the KubernetesDependentResource implementation. In the background this will + * store the new resource in a temporary cache {@link TemporaryResourceCache} which does + * additional checks. After a new event is received the cached object is removed from this + * cache, since it is then usually already in the informer cache. + *
  2. Avoiding unneeded reconciliations after resources are created or updated. This filters out events that are the results of updates and creates made + * by the controller itself because we typically don't want the associated informer to trigger an event causing a useless reconciliation (as the change originates from the reconciler itself). This is achieved + * by + * TODO: update as this mentions methods that don't exist anymore and isn't very clear * - *

    2. Additional API is provided that is meant to be used with the combination of the previous - * one, and the goal is to filter out events that are the results of updates and creates made by the - * controller itself. For example if in reconciler a ConfigMaps is created, there should be an - * Informer in place to handle change events of that ConfigMap, but since it has bean created (or - * updated) by the reconciler this should not trigger an additional reconciliation by default. In - * order to achieve this prepareForCreateOrUpdateEventFiltering(..) method needs to be called before - * the operation of the k8s client. And the operation from point 1. after the k8s client call. See - * it's usage in CreateUpdateEventFilterTestReconciler integration test for the usage. (Again this - * is managed for the developer if using dependent resources.)
    - * Roughly it works in a way that before the K8S API call is made, we set mark the resource ID, and - * from that point informer won't propagate events further just will start record them. After the - * client operation is done, it's checked and analysed what events were received and based on that - * it will propagate event or not and/or put the new resource into the temporal cache - so if the - * event not arrived yet about the update will be able to filter it in the future. + * {@link #prepareForCreateOrUpdateEventFiltering(..) method needs to be called before the operation + * of the k8s client. And the operation from point 1. after the k8s client call. See its + * usage in CreateUpdateEventFilterTestReconciler integration test for the usage. (Again this + * is managed for the developer if using dependent resources.)
    + * Roughly it works in a way that before the K8S API call is made, we set mark the resource + * ID, and from that point informer won't propagate events further just will start record + * them. After the client operation is done, it's checked and analysed what events were + * received and based on that it will propagate event or not and/or put the new resource into + * the temporal cache - so if the event not arrived yet about the update will be able to + * filter it in the future. + *

* - * @param resource type watching - * @param

type of the primary resource + * @param resource type being watched + * @param

type of the associated primary resource */ public class InformerEventSource extends ManagedInformerEventSource> implements ResourceEventHandler { - private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); public static final String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous"; + private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); // we need direct control for the indexer to propagate the just update resource also to the index private final PrimaryToSecondaryIndex primaryToSecondaryIndex; private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 247cdb9aa5..9ec5b3694c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -9,7 +9,7 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -167,9 +167,9 @@ public synchronized boolean isKnownResourceVersion(T resource) { } /** - * @return true if {@link InformerEventSourceConfiguration#parseResourceVersions()} is enabled and - * the resourceVersion of newResource is numerically greater than cachedResource, otherwise - * false + * @return true if {@link ConfigurationService#parseResourceVersionsForEventFilteringAndCaching()} + * is enabled and the resourceVersion of newResource is numerically greater than + * cachedResource, otherwise false */ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) { try { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCacheTest.java new file mode 100644 index 0000000000..58e3ce8a0a --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCacheTest.java @@ -0,0 +1,87 @@ +package io.javaoperatorsdk.operator.api.reconciler.support; + +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; +import io.javaoperatorsdk.operator.sample.simple.TestCustomResourceSpec; + +import static org.assertj.core.api.Assertions.assertThat; + +class PrimaryResourceCacheTest { + + PrimaryResourceCache versionParsingCache = + new PrimaryResourceCache<>( + new PrimaryResourceCache.ResourceVersionParsingEvictionPredicate<>()); + + @Test + void returnsThePassedValueIfCacheIsEmpty() { + var cr = customResource("1"); + + var res = versionParsingCache.getFreshResource(cr); + + assertThat(cr).isSameAs(res); + } + + @Test + void returnsTheCachedIfNotEvictedAccordingToPredicate() { + var cr = customResource("2"); + + versionParsingCache.cacheResource(cr); + + var res = versionParsingCache.getFreshResource(customResource("1")); + assertThat(cr).isSameAs(res); + } + + @Test + void ifMoreFreshPassedCachedIsEvicted() { + var cr = customResource("2"); + versionParsingCache.cacheResource(cr); + var newCR = customResource("3"); + + var res = versionParsingCache.getFreshResource(newCR); + var resOnOlder = versionParsingCache.getFreshResource(cr); + + assertThat(newCR).isSameAs(res); + assertThat(resOnOlder).isSameAs(cr); + assertThat(newCR).isNotSameAs(cr); + } + + @Test + void cleanupRemovesCachedResources() { + var cr = customResource("2"); + versionParsingCache.cacheResource(cr); + + versionParsingCache.cleanup(customResource("3")); + + var olderCR = customResource("1"); + var res = versionParsingCache.getFreshResource(olderCR); + assertThat(olderCR).isSameAs(res); + } + + @Test + void removesIfNewResourceWithDifferentUid() { + var cr = customResource("2"); + versionParsingCache.cacheResource(cr); + var crWithDifferentUid = customResource("1"); + cr.getMetadata().setUid("otheruid"); + + var res = versionParsingCache.getFreshResource(crWithDifferentUid); + + assertThat(res).isSameAs(crWithDifferentUid); + } + + private TestCustomResource customResource(String resourceVersion) { + var cr = new TestCustomResource(); + cr.setMetadata( + new ObjectMetaBuilder() + .withName("test1") + .withNamespace("default") + .withUid("uid") + .withResourceVersion(resourceVersion) + .build()); + cr.setSpec(new TestCustomResourceSpec()); + cr.getSpec().setKey("key"); + return cr; + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java similarity index 99% rename from operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java rename to operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index d31408beb6..e62888832f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -19,7 +19,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -class TemporaryResourceCacheTest { +class TemporaryPrimaryResourceCacheTest { public static final String RESOURCE_VERSION = "2"; diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/PeriodicTriggerEventSource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/PeriodicTriggerEventSource.java new file mode 100644 index 0000000000..366777409a --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/PeriodicTriggerEventSource.java @@ -0,0 +1,52 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache; + +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.processing.event.Event; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSource; +import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; + +public class PeriodicTriggerEventSource

+ extends AbstractEventSource { + + public static final int DEFAULT_PERIOD = 30; + private final Timer timer = new Timer(); + private final IndexerResourceCache

primaryCache; + private final int period; + + public PeriodicTriggerEventSource(IndexerResourceCache

primaryCache) { + this(primaryCache, DEFAULT_PERIOD); + } + + public PeriodicTriggerEventSource(IndexerResourceCache

primaryCache, int period) { + super(Void.class); + this.primaryCache = primaryCache; + this.period = period; + } + + @Override + public Set getSecondaryResources(P primary) { + return Set.of(); + } + + @Override + public void start() throws OperatorException { + super.start(); + timer.schedule( + new TimerTask() { + @Override + public void run() { + primaryCache + .list() + .forEach(r -> getEventHandler().handleEvent(new Event(ResourceID.fromResource(r)))); + } + }, + 0, + period); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheCustomResource.java new file mode 100644 index 0000000000..84b145cac3 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheCustomResource.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("spc") +public class StatusPatchPrimaryCacheCustomResource + extends CustomResource + implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheIT.java new file mode 100644 index 0000000000..a884ec0758 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheIT.java @@ -0,0 +1,48 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class StatusPatchPrimaryCacheIT { + + public static final String TEST_1 = "test1"; + + @RegisterExtension + LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder() + .withReconciler(StatusPatchPrimaryCacheReconciler.class) + .build(); + + @Test + void testStatusAlwaysUpToDate() { + var reconciler = extension.getReconcilerOfType(StatusPatchPrimaryCacheReconciler.class); + + extension.create(testResource()); + + // the reconciliation is periodically triggered, the status values should be increasing + // monotonically + await() + .pollDelay(Duration.ofSeconds(1)) + .pollInterval(Duration.ofMillis(30)) + .untilAsserted( + () -> { + assertThat(reconciler.errorPresent).isFalse(); + assertThat(reconciler.latestValue).isGreaterThan(10); + }); + } + + StatusPatchPrimaryCacheCustomResource testResource() { + var res = new StatusPatchPrimaryCacheCustomResource(); + res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build()); + res.setSpec(new StatusPatchPrimaryCacheSpec()); + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java new file mode 100644 index 0000000000..879cc1e3d8 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java @@ -0,0 +1,92 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; + +import java.util.List; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.Cleaner; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache; +import io.javaoperatorsdk.operator.baseapi.statuscache.PeriodicTriggerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; + +@ControllerConfiguration +public class StatusPatchPrimaryCacheReconciler + implements Reconciler, + Cleaner { + + public volatile int latestValue = 0; + public volatile boolean errorPresent = false; + + // We on purpose don't use the provided predicate to show what a custom one could look like. + private final PrimaryResourceCache cache = + new PrimaryResourceCache<>( + (statusPatchCacheCustomResourcePair, statusPatchCacheCustomResource) -> + statusPatchCacheCustomResource.getStatus().getValue() + >= statusPatchCacheCustomResourcePair.afterUpdate().getStatus().getValue()); + + @Override + public UpdateControl reconcile( + StatusPatchPrimaryCacheCustomResource primary, + Context context) + throws InterruptedException { + + primary = cache.getFreshResource(primary); + + if (primary.getStatus() != null && primary.getStatus().getValue() != latestValue) { + errorPresent = true; + throw new IllegalStateException( + "status is not up to date. Latest value: " + + latestValue + + " status values: " + + primary.getStatus().getValue()); + } + + var freshCopy = createFreshCopy(primary); + // setting the resource version + freshCopy.getMetadata().setResourceVersion(primary.getMetadata().getResourceVersion()); + freshCopy + .getStatus() + .setValue(primary.getStatus() == null ? 1 : primary.getStatus().getValue() + 1); + + var updated = + PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache); + latestValue = updated.getStatus().getValue(); + + return UpdateControl.noUpdate(); + } + + @Override + public List> prepareEventSources( + EventSourceContext context) { + // periodic event triggering for testing purposes + return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache())); + } + + private StatusPatchPrimaryCacheCustomResource createFreshCopy( + StatusPatchPrimaryCacheCustomResource resource) { + var res = new StatusPatchPrimaryCacheCustomResource(); + res.setMetadata( + new ObjectMetaBuilder() + .withName(resource.getMetadata().getName()) + .withNamespace(resource.getMetadata().getNamespace()) + .build()); + res.setStatus(new StatusPatchPrimaryCacheStatus()); + + return res; + } + + @Override + public DeleteControl cleanup( + StatusPatchPrimaryCacheCustomResource resource, + Context context) + throws Exception { + cache.cleanup(resource); + return DeleteControl.defaultDelete(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java new file mode 100644 index 0000000000..90630c1ae8 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; + +public class StatusPatchPrimaryCacheSpec { + + private boolean messageInStatus = true; + + public boolean isMessageInStatus() { + return messageInStatus; + } + + public StatusPatchPrimaryCacheSpec setMessageInStatus(boolean messageInStatus) { + this.messageInStatus = messageInStatus; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheStatus.java new file mode 100644 index 0000000000..0687d5576a --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheStatus.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; + +public class StatusPatchPrimaryCacheStatus { + + private Integer value = 0; + + public Integer getValue() { + return value; + } + + public StatusPatchPrimaryCacheStatus setValue(Integer value) { + this.value = value; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockCustomResource.java new file mode 100644 index 0000000000..d84a992a13 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockCustomResource.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.withlock; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("spcl") +public class StatusPatchCacheWithLockCustomResource + extends CustomResource + implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockIT.java new file mode 100644 index 0000000000..ec687c45d5 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockIT.java @@ -0,0 +1,48 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.withlock; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class StatusPatchCacheWithLockIT { + + public static final String TEST_1 = "test1"; + + @RegisterExtension + LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder() + .withReconciler(StatusPatchCacheWithLockReconciler.class) + .build(); + + @Test + void testStatusAlwaysUpToDate() { + var reconciler = extension.getReconcilerOfType(StatusPatchCacheWithLockReconciler.class); + + extension.create(testResource()); + + // the reconciliation is periodically triggered, the status values should be increasing + // monotonically + await() + .pollDelay(Duration.ofSeconds(1)) + .pollInterval(Duration.ofMillis(30)) + .untilAsserted( + () -> { + assertThat(reconciler.errorPresent).isFalse(); + assertThat(reconciler.latestValue).isGreaterThan(10); + }); + } + + StatusPatchCacheWithLockCustomResource testResource() { + var res = new StatusPatchCacheWithLockCustomResource(); + res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build()); + res.setSpec(new StatusPatchCacheWithLockSpec()); + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockReconciler.java new file mode 100644 index 0000000000..8e6e87fba0 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockReconciler.java @@ -0,0 +1,71 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.withlock; + +import java.util.List; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.baseapi.statuscache.PeriodicTriggerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; + +@ControllerConfiguration +public class StatusPatchCacheWithLockReconciler + implements Reconciler { + + public volatile int latestValue = 0; + public volatile boolean errorPresent = false; + + @Override + public UpdateControl reconcile( + StatusPatchCacheWithLockCustomResource resource, + Context context) + throws InterruptedException { + + if (resource.getStatus() != null && resource.getStatus().getValue() != latestValue) { + errorPresent = true; + throw new IllegalStateException( + "status is not up to date. Latest value: " + + latestValue + + " status values: " + + resource.getStatus().getValue()); + } + + var freshCopy = createFreshCopy(resource); + // setting the resource version + freshCopy.getMetadata().setResourceVersion(resource.getMetadata().getResourceVersion()); + freshCopy + .getStatus() + .setValue(resource.getStatus() == null ? 1 : resource.getStatus().getValue() + 1); + + resource.getMetadata().setResourceVersion(null); + var updated = + PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatusWith(resource, freshCopy, context); + latestValue = updated.getStatus().getValue(); + + return UpdateControl.noUpdate(); + } + + @Override + public List> prepareEventSources( + EventSourceContext context) { + // periodic event triggering for testing purposes + return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache())); + } + + private StatusPatchCacheWithLockCustomResource createFreshCopy( + StatusPatchCacheWithLockCustomResource resource) { + var res = new StatusPatchCacheWithLockCustomResource(); + res.setMetadata( + new ObjectMetaBuilder() + .withName(resource.getMetadata().getName()) + .withNamespace(resource.getMetadata().getNamespace()) + .build()); + res.setStatus(new StatusPatchCacheWithLockStatus()); + + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockSpec.java new file mode 100644 index 0000000000..12cd1ac3e1 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockSpec.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.withlock; + +public class StatusPatchCacheWithLockSpec { + + private int counter = 0; + + public int getCounter() { + return counter; + } + + public void setCounter(int counter) { + this.counter = counter; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockStatus.java new file mode 100644 index 0000000000..8d1e559308 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/withlock/StatusPatchCacheWithLockStatus.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.withlock; + +public class StatusPatchCacheWithLockStatus { + + private Integer value = 0; + + public Integer getValue() { + return value; + } + + public StatusPatchCacheWithLockStatus setValue(Integer value) { + this.value = value; + return this; + } +}