Skip to content

Commit 806bb2e

Browse files
committed
improve: mamed event sources and equality of event sources and their implications
Signed-off-by: Attila Mészáros <[email protected]>
1 parent d7da94e commit 806bb2e

File tree

52 files changed

+233
-159
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+233
-159
lines changed

caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/AbstractTestReconciler.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.javaoperatorsdk.operator.processing.event.source.cache.sample;
22

33
import java.time.Duration;
4+
import java.util.List;
45
import java.util.Map;
56

67
import org.slf4j.Logger;
@@ -69,7 +70,7 @@ protected void createConfigMap(P resource, Context<P> context) {
6970
}
7071

7172
@Override
72-
public Map<String, EventSource> prepareEventSources(
73+
public List<EventSource> prepareEventSources(
7374
EventSourceContext<P> context) {
7475

7576
var boundedItemStore =
@@ -82,7 +83,7 @@ public Map<String, EventSource> prepareEventSources(
8283
Mappers.fromOwnerReference(this instanceof BoundedCacheClusterScopeTestReconciler))
8384
.build(), context);
8485

85-
return EventSourceUtils.nameEventSources(es);
86+
return List.of(es);
8687
}
8788

8889
private void ensureStatus(P resource) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceUtils.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,20 @@
55
import io.fabric8.kubernetes.api.model.HasMetadata;
66
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
77
import io.javaoperatorsdk.operator.processing.dependent.workflow.Workflow;
8+
import io.javaoperatorsdk.operator.processing.event.Event;
89
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
910
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
11+
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
1012

13+
// todo cleanup / delete
1114
public class EventSourceUtils {
15+
16+
@SuppressWarnings("unchecked")
17+
public static <R extends HasMetadata> List<EventSource> dependentEventSources(EventSourceContext<R> eventSourceContext,DependentResource... dependentResources) {
18+
return Arrays.stream(dependentResources)
19+
.flatMap(dr-> dr.eventSource(eventSourceContext).stream()).toList();
20+
}
21+
1222
/**
1323
* Utility method to easily create map with generated name for event sources. This is for the use
1424
* case when the event sources are not access explicitly by name in the reconciler.
@@ -25,15 +35,11 @@ public static Map<String, EventSource> nameEventSources(EventSource... eventSour
2535
}
2636

2737
@SuppressWarnings("unchecked")
28-
public static <K extends HasMetadata> Map<String, EventSource> eventSourcesFromWorkflow(
38+
public static <K extends HasMetadata> List<EventSource> eventSourcesFromWorkflow(
2939
EventSourceContext<K> context,
3040
Workflow<K> workflow) {
31-
Map<String, EventSource> result = new HashMap<>();
32-
for (var e : workflow.getDependentResourcesByNameWithoutActivationCondition().entrySet()) {
33-
var eventSource = e.getValue().eventSource(context);
34-
eventSource.ifPresent(es -> result.put(e.getKey(), (EventSource) es));
35-
}
36-
return result;
41+
return workflow.getDependentResourcesByNameWithoutActivationCondition().stream()
42+
.flatMap(dr->dr.eventSource(context).stream()).toList();
3743
}
3844

3945
@SuppressWarnings("rawtypes")

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Reconciler.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,17 @@ public interface Reconciler<P extends HasMetadata> {
1919
*/
2020
UpdateControl<P> reconcile(P resource, Context<P> context) throws Exception;
2121

22-
2322
/**
2423
* Prepares a map of {@link EventSource} implementations keyed by the name with which they need to
2524
* be registered by the SDK.
2625
*
2726
* @param context a {@link EventSourceContext} providing access to information useful to event
2827
* sources
29-
* @return a map of event sources to register
28+
* @return a list of event sources
3029
*/
31-
default Map<String, EventSource> prepareEventSources(EventSourceContext<P> context) {
32-
return Map.of();
30+
// todo should be ? extends EventSource
31+
default List<EventSource> prepareEventSources(EventSourceContext<P> context) {
32+
return Collections.emptyList();
3333
}
3434

3535
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ default Optional<? extends ResourceEventSource<R, P>> eventSource(
4949
return Optional.empty();
5050
}
5151

52+
53+
5254
/**
5355
* Retrieves the secondary resource (if it exists) associated with the specified primary resource
5456
* for this DependentResource.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -238,20 +238,20 @@ public void initAndRegisterEventSources(EventSourceContext<P> context) {
238238
managedWorkflow.getDependentResourcesByNameWithoutActivationCondition();
239239
final var size = dependentResourcesByName.size();
240240
if (size > 0) {
241-
dependentResourcesByName.forEach((key, dependentResource) -> {
241+
dependentResourcesByName.forEach(dependentResource -> {
242242
if (dependentResource instanceof EventSourceProvider) {
243243
final var provider = (EventSourceProvider) dependentResource;
244244
final var source = provider.initEventSource(context);
245-
eventSourceManager.registerEventSource(key, source);
245+
eventSourceManager.registerEventSource(source);
246246
} else {
247247
Optional<ResourceEventSource> eventSource = dependentResource.eventSource(context);
248-
eventSource.ifPresent(es -> eventSourceManager.registerEventSource(key, es));
248+
eventSource.ifPresent(es -> eventSourceManager.registerEventSource(es));
249249
}
250250
});
251251

252252
// resolve event sources referenced by name for dependents that reuse an existing event source
253253
final Map<String, List<EventSourceReferencer>> unresolvable = new HashMap<>(size);
254-
dependentResourcesByName.values().stream()
254+
dependentResourcesByName.stream()
255255
.filter(EventSourceReferencer.class::isInstance)
256256
.map(EventSourceReferencer.class::cast)
257257
.forEach(dr -> {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ protected <R> void registerOrDeregisterEventSourceBasedOnActivation(
139139
var eventSource =
140140
dr.eventSource(eventSourceRetriever.eventSourceContextForDynamicRegistration());
141141
var es = eventSource.orElseThrow();
142-
eventSourceRetriever.dynamicallyRegisterEventSource(dr.name(), es);
143-
142+
eventSourceRetriever.dynamicallyRegisterEventSource(es);
144143
} else {
145144
eventSourceRetriever.dynamicallyDeRegisterEventSource(dr.name());
146145
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultWorkflow.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,8 @@ public Map<String, DependentResource> getDependentResourcesByName() {
148148
return resources;
149149
}
150150

151-
public Map<String, DependentResource> getDependentResourcesByNameWithoutActivationCondition() {
152-
final var resources = new HashMap<String, DependentResource>(dependentResourceNodes.size());
153-
dependentResourceNodes
154-
.forEach((name, node) -> {
155-
if (node.getActivationCondition().isEmpty()) {
156-
resources.put(name, node.getDependentResource());
157-
}
158-
});
159-
return resources;
151+
public List<DependentResource> getDependentResourcesByNameWithoutActivationCondition() {
152+
return dependentResourceNodes.values().stream().filter(n->n.getActivationCondition().isEmpty())
153+
.map(DependentResourceNode::getDependentResource).toList();
160154
}
161155
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.javaoperatorsdk.operator.processing.dependent.workflow;
22

33
import java.util.Collections;
4+
import java.util.List;
45
import java.util.Map;
56
import java.util.Set;
67

@@ -44,7 +45,7 @@ default Map<String, DependentResource> getDependentResourcesByName() {
4445
}
4546

4647
@SuppressWarnings("rawtypes")
47-
default Map<String, DependentResource> getDependentResourcesByNameWithoutActivationCondition() {
48-
return Collections.emptyMap();
48+
default List<DependentResource> getDependentResourcesByNameWithoutActivationCondition() {
49+
return Collections.emptyList();
4950
}
5051
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

+6-14
Original file line numberDiff line numberDiff line change
@@ -140,30 +140,23 @@ private Void stopEventSource(NamedEventSource eventSource) {
140140
return null;
141141
}
142142

143-
public final void registerEventSource(EventSource eventSource) throws OperatorException {
144-
registerEventSource(null, eventSource);
145-
}
146-
147143
@SuppressWarnings("rawtypes")
148-
public final synchronized void registerEventSource(String name, EventSource eventSource)
144+
public final synchronized void registerEventSource(EventSource eventSource)
149145
throws OperatorException {
150146
Objects.requireNonNull(eventSource, "EventSource must not be null");
151147
try {
152-
if (name == null || name.isBlank()) {
153-
name = EventSourceUtils.generateNameFor(eventSource);
154-
}
155148
if (eventSource instanceof ManagedInformerEventSource) {
156149
var managedInformerEventSource = ((ManagedInformerEventSource) eventSource);
157150
managedInformerEventSource.setConfigurationService(
158151
controller.getConfiguration().getConfigurationService());
159152
}
160-
final var named = new NamedEventSource(eventSource, name);
153+
final var named = new NamedEventSource(eventSource, eventSource.name());
161154
eventSources.add(named);
162155
named.setEventHandler(controller.getEventProcessor());
163156
} catch (IllegalStateException | MissingCRDException e) {
164157
throw e; // leave untouched
165158
} catch (Exception e) {
166-
throw new OperatorException("Couldn't register event source: " + name + " for "
159+
throw new OperatorException("Couldn't register event source: " + eventSource.name() + " for "
167160
+ controller.getConfiguration().getName() + " controller", e);
168161
}
169162
}
@@ -229,14 +222,13 @@ public <R> List<ResourceEventSource<R, P>> getResourceEventSourcesFor(Class<R> d
229222
}
230223

231224
@Override
232-
public EventSource dynamicallyRegisterEventSource(String name,
233-
EventSource eventSource) {
225+
public EventSource dynamicallyRegisterEventSource(EventSource eventSource) {
234226
synchronized (this) {
235-
var actual = eventSources.existing(name, eventSource);
227+
var actual = eventSources.existing(eventSource.name(), eventSource);
236228
if (actual != null) {
237229
eventSource = actual.eventSource();
238230
} else {
239-
registerEventSource(name, eventSource);
231+
registerEventSource(eventSource);
240232
}
241233
}
242234
// The start itself is blocking thus blocking only the threads which are attempt to start the

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,10 @@ default <R> ResourceEventSource<R, P> getResourceEventSourceFor(Class<R> depende
3939
* name will ever be registered.
4040
* </p>
4141
*
42-
* @param name of the event source
4342
* @param eventSource to register
4443
* @return the actual event source registered. Might not be the same as the parameter.
4544
*/
46-
EventSource dynamicallyRegisterEventSource(String name, EventSource eventSource);
45+
EventSource dynamicallyRegisterEventSource(EventSource eventSource);
4746

4847

4948
/**

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void add(NamedEventSource eventSource) {
8787
final var name = eventSource.name();
8888
final var original = eventSource.original();
8989
final var existing = existing(name, original);
90-
if (existing != null && !eventSource.equals(existing)) {
90+
if (existing != null && !eventSource.scopeEquals(existing)) {
9191
throw new IllegalArgumentException("Event source " + existing.original()
9292
+ " is already registered for the "
9393
+ keyAsString(getResourceType(original), name)

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java

+12
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,15 @@ public abstract class AbstractResourceEventSource<R, P extends HasMetadata>
1616
protected OnDeleteFilter<? super R> onDeleteFilter;
1717
protected GenericFilter<? super R> genericFilter;
1818

19+
private final String name;
20+
1921
protected AbstractResourceEventSource(Class<R> resourceClass) {
22+
this(resourceClass,resourceClass.getName());
23+
}
24+
25+
protected AbstractResourceEventSource(Class<R> resourceClass, String name) {
2026
this.resourceClass = resourceClass;
27+
this.name = name == null ? resourceClass.getName() : name;
2128
}
2229

2330
@Override
@@ -42,4 +49,9 @@ public void setOnDeleteFilter(
4249
public void setGenericFilter(GenericFilter<? super R> genericFilter) {
4350
this.genericFilter = genericFilter;
4451
}
52+
53+
@Override
54+
public String name() {
55+
return name;
56+
}
4557
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java

+10
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,14 @@ default EventSourceStartPriority priority() {
2828
default Status getStatus() {
2929
return Status.UNKNOWN;
3030
}
31+
32+
default String name() {
33+
return getClass().getSimpleName();
34+
}
35+
36+
// todo maybe different special method name?
37+
default boolean scopeEquals(EventSource es) {
38+
return this == es;
39+
}
40+
3141
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,13 @@ public abstract class ExternalResourceCachingEventSource<R, P extends HasMetadat
4343
protected Map<ResourceID, Map<String, R>> cache = new ConcurrentHashMap<>();
4444

4545
protected ExternalResourceCachingEventSource(Class<R> resourceClass,
46+
CacheKeyMapper<R> cacheKeyMapper) {
47+
this(null,resourceClass,cacheKeyMapper);
48+
}
49+
50+
protected ExternalResourceCachingEventSource(String name,Class<R> resourceClass,
4651
CacheKeyMapper<R> cacheKeyMapper) {
47-
super(resourceClass);
52+
super(resourceClass,name);
4853
this.cacheKeyMapper = cacheKeyMapper;
4954
}
5055

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java

+6
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,10 @@ default Optional<R> getSecondaryResource(P primary) {
3939
void setOnDeleteFilter(OnDeleteFilter<? super R> onDeleteFilter);
4040

4141
void setGenericFilter(GenericFilter<? super R> genericFilter);
42+
43+
@Override
44+
default String name() {
45+
return resourceType().getSimpleName();
46+
}
47+
4248
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ public class ControllerResourceEventSource<T extends HasMetadata>
2626
implements ResourceEventHandler<T> {
2727

2828
private static final Logger log = LoggerFactory.getLogger(ControllerResourceEventSource.class);
29+
public static final String NAME = "ControllerResourceEventSource";
2930

3031
private final Controller<T> controller;
3132

3233
@SuppressWarnings({"unchecked", "rawtypes"})
3334
public ControllerResourceEventSource(Controller<T> controller) {
34-
super(controller.getCRClient(), controller.getConfiguration(), false);
35+
super(NAME,controller.getCRClient(), controller.getConfiguration(), false);
3536
this.controller = controller;
3637

3738
final var config = controller.getConfiguration();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,27 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
7676
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
7777
private final String id = UUID.randomUUID().toString();
7878

79+
public InformerEventSource(String name,
80+
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
81+
this(name,configuration, context.getClient(),
82+
context.getControllerConfiguration().getConfigurationService()
83+
.parseResourceVersionsForEventFilteringAndCaching());
84+
}
85+
7986
public InformerEventSource(
8087
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
81-
this(configuration, context.getClient(),
88+
this(null,configuration, context.getClient(),
8289
context.getControllerConfiguration().getConfigurationService()
8390
.parseResourceVersionsForEventFilteringAndCaching());
8491
}
8592

8693
public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client) {
87-
this(configuration, client, false);
94+
this(null,configuration, client, false);
8895
}
8996

90-
public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client,
97+
public InformerEventSource(String name,InformerConfiguration<R> configuration, KubernetesClient client,
9198
boolean parseResourceVersions) {
92-
super(
99+
super(name,
93100
configuration.getGroupVersionKind()
94101
.map(gvk -> client.genericKubernetesResources(gvk.apiVersion(), gvk.getKind()))
95102
.orElseGet(() -> (MixedOperation) client.resources(configuration.getResourceClass())),

0 commit comments

Comments
 (0)