Skip to content

improve: named event sources and related changes #2340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.javaoperatorsdk.operator.processing.event.source.cache.sample;

import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
Expand Down Expand Up @@ -69,7 +70,7 @@ protected void createConfigMap(P resource, Context<P> context) {
}

@Override
public Map<String, EventSource> prepareEventSources(
public List<EventSource> prepareEventSources(
EventSourceContext<P> context) {

var boundedItemStore =
Expand All @@ -82,7 +83,7 @@ public Map<String, EventSource> prepareEventSources(
Mappers.fromOwnerReference(this instanceof BoundedCacheClusterScopeTestReconciler))
.build(), context);

return EventSourceUtils.nameEventSources(es);
return List.of(es);
}

private void ensureStatus(P resource) {
Expand Down
2 changes: 1 addition & 1 deletion docs/documentation/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ public class TomcatReconciler implements Reconciler<Tomcat> {
.withSecondaryToPrimaryMapper(
Mappers.fromAnnotation(ANNOTATION_NAME, ANNOTATION_NAMESPACE)
.build(), context));
return EventSourceUtils.nameEventSources(configMapEventSource);
return List.of(configMapEventSource);
}

}
Expand Down
33 changes: 24 additions & 9 deletions docs/documentation/v5-0-migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,22 @@ permalink: /docs/v5-0-migration
[`EventSourceUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceUtils.java#L11-L11)
now contains all the utility methods used for event sources naming that were previously defined in
the `EventSourceInitializer` interface.
3. Updates through `UpdateControl` now use [Server Side Apply (SSA)](https://kubernetes.io/docs/reference/using-api/server-side-apply/) by default to add the finalizer and for all
the patch operations in `UpdateControl`. The update operations were removed. If you do not wish to use SSA, you can deactivate the feature using `ConfigurationService.useSSAToPatchPrimaryResource` and related `ConfigurationServiceOverrider.withUseSSAToPatchPrimaryResource`.
3. Event sources are now explicitly named (via the `name` method of the `EventSource` interface). Built-in event sources
implementation have been updated to allow you to specify a name when instantiating them. If you don't provide a name
for your `EventSource` implementation (for example, by using its default, no-arg constructor), one will be
automatically generated. This simplifies the API to define event source to
`List<EventSource> prepareEventSources(EventSourceContext<P> context)`.
!!! IMPORTANT !!!
If you use dynamic registration of event sources, be sure to name your event sources explicitly as letting JOSDK name
them automatically might result in duplicated event sources being registered as JOSDK relies on the name to identify
event sources and concurrent, dynamic registration might lead to identical event sources having different generated
names, thus leading JOSDK to consider them as different and hence, register them multiple times.
4. Updates through `UpdateControl` now
use [Server Side Apply (SSA)](https://kubernetes.io/docs/reference/using-api/server-side-apply/) by default to add
the finalizer and for all
the patch operations in `UpdateControl`. The update operations were removed. If you do not wish to use SSA, you can
deactivate the feature using `ConfigurationService.useSSAToPatchPrimaryResource` and
related `ConfigurationServiceOverrider.withUseSSAToPatchPrimaryResource`.

!!! IMPORTANT !!!

Expand All @@ -27,15 +41,16 @@ permalink: /docs/v5-0-migration
where it is demonstrated. Also, the related part of
a [workaround](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/StatusPatchSSAMigrationIT.java#L110-L116).

Related automatic observed generation handling changes:
Related automatic observed generation handling changes:
Automated Observed Generation (see features in docs), is automatically handled for non-SSA, even if
the status sub-resource is not instructed to be updated. This is not true for SSA, observed generation is updated
the status sub-resource is not instructed to be updated. This is not true for SSA, observed generation is updated
only when patch status is instructed by `UpdateControl`.

4. `ManagedDependentResourceContext` has been renamed to `ManagedWorkflowAndDependentResourceContext` and is accessed
5. `ManagedDependentResourceContext` has been renamed to `ManagedWorkflowAndDependentResourceContext` and is accessed
via the accordingly renamed `managedWorkflowAndDependentResourceContext` method.
5. `ResourceDiscriminator` was removed. In most of the cases you can just delete the discriminator, everything should
work without it by default. To optimize and handle special cases see the relevant section in [Dependent Resource documentation](/docs/dependent-resources#multiple-dependent-resources-of-same-type).
6. `ConfigurationService.getTerminationTimeoutSeconds` and associated overriding mechanism have been removed,
6. `ResourceDiscriminator` was removed. In most of the cases you can just delete the discriminator, everything should
work without it by default. To optimize and handle special cases see the relevant section
in [Dependent Resource documentation](/docs/dependent-resources#multiple-dependent-resources-of-same-type).
7. `ConfigurationService.getTerminationTimeoutSeconds` and associated overriding mechanism have been removed,
use `Operator.stop(Duration)` instead.
7. `Operator.installShutdownHook()` has been removed, use `Operator.installShutdownHook(Duration)` instead
8. `Operator.installShutdownHook()` has been removed, use `Operator.installShutdownHook(Duration)` instead
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,5 @@ public Optional<ItemStore<R>> getItemStore() {
public Optional<Long> getInformerListLimit() {
return Optional.ofNullable(informerListLimit);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return getSecondaryResources(expectedType).stream();
}

@Deprecated(forRemoval = true)
<R> Optional<R> getSecondaryResource(Class<R> expectedType, String eventSourceName);

ControllerConfiguration<P> getControllerConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,66 +6,21 @@
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.processing.dependent.workflow.Workflow;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;

public class EventSourceUtils {
/**
* Utility method to easily create map with generated name for event sources. This is for the use
* case when the event sources are not access explicitly by name in the reconciler.
*
* @param eventSources to name
* @return even source with default names
*/
public static Map<String, EventSource> nameEventSources(EventSource... eventSources) {
Map<String, EventSource> eventSourceMap = new HashMap<>(eventSources.length);
for (EventSource eventSource : eventSources) {
eventSourceMap.put(generateNameFor(eventSource), eventSource);
}
return eventSourceMap;

@SuppressWarnings("unchecked")
public static <R extends HasMetadata> List<EventSource> dependentEventSources(
EventSourceContext<R> eventSourceContext, DependentResource... dependentResources) {
return Arrays.stream(dependentResources)
.flatMap(dr -> dr.eventSource(eventSourceContext).stream()).toList();
}

@SuppressWarnings("unchecked")
public static <K extends HasMetadata> Map<String, EventSource> eventSourcesFromWorkflow(
public static <K extends HasMetadata> List<EventSource> eventSourcesFromWorkflow(
EventSourceContext<K> context,
Workflow<K> workflow) {
Map<String, EventSource> result = new HashMap<>();
for (var e : workflow.getDependentResourcesByNameWithoutActivationCondition().entrySet()) {
var eventSource = e.getValue().eventSource(context);
eventSource.ifPresent(es -> result.put(e.getKey(), (EventSource) es));
}
return result;
}

@SuppressWarnings("rawtypes")
public static <K extends HasMetadata> Map<String, EventSource> nameEventSourcesFromDependentResource(
EventSourceContext<K> context, DependentResource... dependentResources) {
return nameEventSourcesFromDependentResource(context, Arrays.asList(dependentResources));
}

@SuppressWarnings("unchecked,rawtypes")
public static <K extends HasMetadata> Map<String, EventSource> nameEventSourcesFromDependentResource(
EventSourceContext<K> context, Collection<DependentResource> dependentResources) {

if (dependentResources != null) {
Map<String, EventSource> eventSourceMap = new HashMap<>(dependentResources.size());
for (DependentResource dependentResource : dependentResources) {
Optional<ResourceEventSource> es = dependentResource.eventSource(context);
es.ifPresent(e -> eventSourceMap.put(generateNameFor(e), e));
}
return eventSourceMap;
} else {
return Collections.emptyMap();
}
}

/**
* Used when event sources are not explicitly named when created/registered.
*
* @param eventSource EventSource
* @return generated name
*/
public static String generateNameFor(EventSource eventSource) {
// we can have multiple event sources for the same class
return eventSource.getClass().getName() + "#" + eventSource.hashCode();
return workflow.getDependentResourcesWithoutActivationCondition().stream()
.flatMap(dr -> dr.eventSource(context).stream()).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ public interface Reconciler<P extends HasMetadata> {
*/
UpdateControl<P> reconcile(P resource, Context<P> context) throws Exception;


/**
* Prepares a map of {@link EventSource} implementations keyed by the name with which they need to
* be registered by the SDK.
*
* @param context a {@link EventSourceContext} providing access to information useful to event
* sources
* @return a map of event sources to register
* @return a list of event sources
*/
default Map<String, EventSource> prepareEventSources(EventSourceContext<P> context) {
return Map.of();
default List<EventSource> prepareEventSources(EventSourceContext<P> context) {
return Collections.emptyList();
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.stream.Collectors;

import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;

@SuppressWarnings("rawtypes")
public class ControllerHealthInfo {
Expand All @@ -15,21 +16,21 @@ public ControllerHealthInfo(EventSourceManager eventSourceManager) {
}

public Map<String, EventSourceHealthIndicator> eventSourceHealthIndicators() {
return eventSourceManager.allEventSources().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return eventSourceManager.allEventSources().stream()
.collect(Collectors.toMap(EventSource::name, e -> e));
}

public Map<String, EventSourceHealthIndicator> unhealthyEventSources() {
return eventSourceManager.allEventSources().entrySet().stream()
.filter(e -> e.getValue().getStatus() == Status.UNHEALTHY)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return eventSourceManager.allEventSources().stream()
.filter(e -> e.getStatus() == Status.UNHEALTHY)
.collect(Collectors.toMap(EventSource::name, e -> e));
}

public Map<String, InformerWrappingEventSourceHealthIndicator> informerEventSourceHealthIndicators() {
return eventSourceManager.allEventSources().entrySet().stream()
.filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator)
.collect(Collectors.toMap(Map.Entry::getKey,
e -> (InformerWrappingEventSourceHealthIndicator) e.getValue()));
return eventSourceManager.allEventSources().stream()
.filter(e -> e instanceof InformerWrappingEventSourceHealthIndicator)
.collect(Collectors.toMap(EventSource::name,
e -> (InformerWrappingEventSourceHealthIndicator) e));

}

Expand All @@ -40,11 +41,11 @@ public Map<String, InformerWrappingEventSourceHealthIndicator> informerEventSour
* {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}.
*/
public Map<String, InformerWrappingEventSourceHealthIndicator> unhealthyInformerEventSourceHealthIndicators() {
return eventSourceManager.allEventSources().entrySet().stream()
.filter(e -> e.getValue().getStatus() == Status.UNHEALTHY)
.filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator)
.collect(Collectors.toMap(Map.Entry::getKey,
e -> (InformerWrappingEventSourceHealthIndicator) e.getValue()));
return eventSourceManager.allEventSources().stream()
.filter(e -> e.getStatus() == Status.UNHEALTHY)
.filter(e -> e instanceof InformerWrappingEventSourceHealthIndicator)
.collect(Collectors.toMap(EventSource::name,
e -> (InformerWrappingEventSourceHealthIndicator) e));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceNotFoundException;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext;
import io.javaoperatorsdk.operator.health.ControllerHealthInfo;
Expand Down Expand Up @@ -235,22 +234,17 @@ public void initAndRegisterEventSources(EventSourceContext<P> context) {

// register created event sources
final var dependentResourcesByName =
managedWorkflow.getDependentResourcesByNameWithoutActivationCondition();
managedWorkflow.getDependentResourcesWithoutActivationCondition();
final var size = dependentResourcesByName.size();
if (size > 0) {
dependentResourcesByName.forEach((key, dependentResource) -> {
if (dependentResource instanceof EventSourceProvider provider) {
final var source = provider.initEventSource(context);
eventSourceManager.registerEventSource(key, source);
} else {
Optional<ResourceEventSource> eventSource = dependentResource.eventSource(context);
eventSource.ifPresent(es -> eventSourceManager.registerEventSource(key, es));
}
dependentResourcesByName.forEach(dependentResource -> {
Optional<ResourceEventSource> eventSource = dependentResource.eventSource(context);
eventSource.ifPresent(eventSourceManager::registerEventSource);
});

// resolve event sources referenced by name for dependents that reuse an existing event source
final Map<String, List<EventSourceReferencer>> unresolvable = new HashMap<>(size);
dependentResourcesByName.values().stream()
dependentResourcesByName.stream()
.filter(EventSourceReferencer.class::isInstance)
.map(EventSourceReferencer.class::cast)
.forEach(dr -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.javaoperatorsdk.operator.processing.dependent.external;

import java.time.Duration;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.processing.dependent.AbstractExternalDependentResource;
Expand All @@ -11,23 +13,23 @@ public abstract class AbstractPollingDependentResource<R, P extends HasMetadata>
extends AbstractExternalDependentResource<R, P, ExternalResourceCachingEventSource<R, P>>
implements CacheKeyMapper<R> {

public static final int DEFAULT_POLLING_PERIOD = 5000;
private long pollingPeriod;
public static final Duration DEFAULT_POLLING_PERIOD = Duration.ofMillis(5000);
private Duration pollingPeriod;

protected AbstractPollingDependentResource(Class<R> resourceType) {
this(resourceType, DEFAULT_POLLING_PERIOD);
}

public AbstractPollingDependentResource(Class<R> resourceType, long pollingPeriod) {
public AbstractPollingDependentResource(Class<R> resourceType, Duration pollingPeriod) {
super(resourceType);
this.pollingPeriod = pollingPeriod;
}

public void setPollingPeriod(long pollingPeriod) {
public void setPollingPeriod(Duration pollingPeriod) {
this.pollingPeriod = pollingPeriod;
}

public long getPollingPeriod() {
public Duration getPollingPeriod() {
return pollingPeriod;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingConfigurationBuilder;
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;

@Ignore
Expand All @@ -18,15 +19,18 @@ public PerResourcePollingDependentResource(Class<R> resourceType) {
super(resourceType);
}

public PerResourcePollingDependentResource(Class<R> resourceType, long pollingPeriod) {
public PerResourcePollingDependentResource(Class<R> resourceType, Duration pollingPeriod) {
super(resourceType, pollingPeriod);
}

@Override
protected ExternalResourceCachingEventSource<R, P> createEventSource(
EventSourceContext<P> context) {
return new PerResourcePollingEventSource<>(this, context,
Duration.ofMillis(getPollingPeriod()), resourceType(), this);
}

return new PerResourcePollingEventSource<>(name(), resourceType(), context,
new PerResourcePollingConfigurationBuilder<>(
this, getPollingPeriod())
.withCacheKeyMapper(this)
.build());
}
}
Loading
Loading