Skip to content

Commit c6ac003

Browse files
committed
[Fix serverlessworkflow#490] Implement until condition for any
Signed-off-by: Francisco Javier Tirado Sarti <[email protected]>
1 parent 0c04f69 commit c6ac003

File tree

6 files changed

+181
-52
lines changed

6 files changed

+181
-52
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import com.fasterxml.jackson.databind.JsonNode;
19+
import com.fasterxml.jackson.databind.node.NullNode;
1920
import com.fasterxml.jackson.databind.node.ObjectNode;
2021
import io.cloudevents.CloudEvent;
2122
import io.cloudevents.CloudEventData;
@@ -77,6 +78,9 @@ public static CloudEventBuilder addExtension(
7778

7879
public static JsonNode toJsonNode(CloudEventData data) {
7980
try {
81+
if (data == null) {
82+
return NullNode.instance;
83+
}
8084
return data instanceof JsonCloudEventData
8185
? ((JsonCloudEventData) data).getNode()
8286
: JsonUtils.mapper().readTree(data.toBytes());

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java

Lines changed: 120 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,26 @@
1818
import com.fasterxml.jackson.databind.JsonNode;
1919
import com.fasterxml.jackson.databind.node.ArrayNode;
2020
import io.cloudevents.CloudEvent;
21+
import io.serverlessworkflow.api.types.AllEventConsumptionStrategy;
22+
import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy;
23+
import io.serverlessworkflow.api.types.EventConsumptionStrategy;
2124
import io.serverlessworkflow.api.types.EventFilter;
2225
import io.serverlessworkflow.api.types.ListenTask;
2326
import io.serverlessworkflow.api.types.ListenTaskConfiguration;
2427
import io.serverlessworkflow.api.types.ListenTaskConfiguration.ListenAndReadAs;
2528
import io.serverlessworkflow.api.types.ListenTo;
29+
import io.serverlessworkflow.api.types.OneEventConsumptionStrategy;
2630
import io.serverlessworkflow.api.types.SubscriptionIterator;
31+
import io.serverlessworkflow.api.types.Until;
2732
import io.serverlessworkflow.api.types.Workflow;
2833
import io.serverlessworkflow.impl.TaskContext;
2934
import io.serverlessworkflow.impl.WorkflowApplication;
3035
import io.serverlessworkflow.impl.WorkflowContext;
3136
import io.serverlessworkflow.impl.WorkflowFilter;
3237
import io.serverlessworkflow.impl.WorkflowPosition;
38+
import io.serverlessworkflow.impl.WorkflowUtils;
3339
import io.serverlessworkflow.impl.events.CloudEventUtils;
40+
import io.serverlessworkflow.impl.events.EventConsumer;
3441
import io.serverlessworkflow.impl.events.EventRegistration;
3542
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
3643
import io.serverlessworkflow.impl.json.JsonUtils;
@@ -40,24 +47,45 @@
4047
import java.util.List;
4148
import java.util.Optional;
4249
import java.util.concurrent.CompletableFuture;
43-
import java.util.function.Consumer;
50+
import java.util.concurrent.atomic.AtomicBoolean;
51+
import java.util.function.BiConsumer;
4452
import java.util.function.Function;
4553
import java.util.stream.Collectors;
4654

4755
public abstract class ListenExecutor extends RegularTaskExecutor<ListenTask> {
4856

49-
protected final Collection<EventRegistrationBuilder> regBuilders;
57+
protected final EventRegistrationBuilderCollection regBuilders;
58+
protected final EventRegistrationBuilderCollection untilRegBuilders;
5059
protected final Optional<WorkflowFilter> until;
5160
protected final Optional<TaskExecutor<?>> loop;
5261
protected final Function<CloudEvent, JsonNode> converter;
62+
protected final EventConsumer eventConsumer;
63+
protected final AtomicBoolean untilEvent = new AtomicBoolean(true);
64+
65+
private static record EventRegistrationBuilderCollection(
66+
Collection<EventRegistrationBuilder> registrations, boolean isAnd) {}
5367

5468
public static class ListenExecutorBuilder extends RegularTaskExecutorBuilder<ListenTask> {
5569

56-
private Collection<EventRegistrationBuilder> registrations;
70+
private EventRegistrationBuilderCollection registrations;
5771
private WorkflowFilter until;
72+
private EventRegistrationBuilderCollection untilRegistrations;
5873
private TaskExecutor<?> loop;
5974
private Function<CloudEvent, JsonNode> converter = this::defaultCEConverter;
60-
private boolean isAnd;
75+
76+
private EventRegistrationBuilderCollection allEvents(AllEventConsumptionStrategy allStrategy) {
77+
return new EventRegistrationBuilderCollection(from(allStrategy.getAll()), true);
78+
}
79+
80+
private EventRegistrationBuilderCollection anyEvents(AnyEventConsumptionStrategy anyStrategy) {
81+
List<EventFilter> eventFilters = anyStrategy.getAny();
82+
return new EventRegistrationBuilderCollection(
83+
eventFilters.isEmpty() ? registerToAll() : from(eventFilters), false);
84+
}
85+
86+
private EventRegistrationBuilderCollection oneEvent(OneEventConsumptionStrategy oneStrategy) {
87+
return new EventRegistrationBuilderCollection(List.of(from(oneStrategy.getOne())), false);
88+
}
6189

6290
protected ListenExecutorBuilder(
6391
WorkflowPosition position,
@@ -69,15 +97,29 @@ protected ListenExecutorBuilder(
6997
ListenTaskConfiguration listen = task.getListen();
7098
ListenTo to = listen.getTo();
7199
if (to.getAllEventConsumptionStrategy() != null) {
72-
isAnd = true;
73-
registrations = from(to.getAllEventConsumptionStrategy().getAll());
100+
registrations = allEvents(to.getAllEventConsumptionStrategy());
74101
} else if (to.getAnyEventConsumptionStrategy() != null) {
75-
isAnd = false;
76-
List<EventFilter> eventFilters = to.getAnyEventConsumptionStrategy().getAny();
77-
registrations = eventFilters.isEmpty() ? registerToAll() : from(eventFilters);
102+
AnyEventConsumptionStrategy any = to.getAnyEventConsumptionStrategy();
103+
registrations = anyEvents(any);
104+
Until untilDesc = any.getUntil();
105+
if (untilDesc != null) {
106+
if (untilDesc.getAnyEventUntilCondition() != null) {
107+
until =
108+
WorkflowUtils.buildWorkflowFilter(
109+
application.expressionFactory(), untilDesc.getAnyEventUntilCondition());
110+
} else if (untilDesc.getAnyEventUntilConsumed() != null) {
111+
EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed();
112+
if (strategy.getAllEventConsumptionStrategy() != null) {
113+
untilRegistrations = allEvents(strategy.getAllEventConsumptionStrategy());
114+
} else if (strategy.getAnyEventConsumptionStrategy() != null) {
115+
untilRegistrations = anyEvents(strategy.getAnyEventConsumptionStrategy());
116+
} else if (strategy.getOneEventConsumptionStrategy() != null) {
117+
untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy());
118+
}
119+
}
120+
}
78121
} else if (to.getOneEventConsumptionStrategy() != null) {
79-
isAnd = false;
80-
registrations = List.of(from(to.getOneEventConsumptionStrategy().getOne()));
122+
registrations = oneEvent(to.getOneEventConsumptionStrategy());
81123
}
82124
SubscriptionIterator forEach = task.getForeach();
83125
if (forEach != null) {
@@ -116,7 +158,7 @@ private EventRegistrationBuilder from(EventFilter filter) {
116158

117159
@Override
118160
public TaskExecutor<ListenTask> buildInstance() {
119-
return isAnd ? new AndListenExecutor(this) : new OrListenExecutor(this);
161+
return registrations.isAnd() ? new AndListenExecutor(this) : new OrListenExecutor(this);
120162
}
121163
}
122164

@@ -160,8 +202,11 @@ protected void internalProcessCe(
160202
TaskContext taskContext,
161203
CompletableFuture<JsonNode> future) {
162204
arrayNode.add(node);
163-
if (until.isEmpty()
164-
|| until.filter(u -> u.apply(workflow, taskContext, arrayNode).asBoolean()).isPresent()) {
205+
if ((until.isEmpty()
206+
|| until
207+
.filter(u -> u.apply(workflow, taskContext, arrayNode).asBoolean())
208+
.isPresent())
209+
&& untilEvent.get()) {
165210
future.complete(arrayNode);
166211
}
167212
}
@@ -176,6 +221,65 @@ protected abstract void internalProcessCe(
176221
TaskContext taskContext,
177222
CompletableFuture<JsonNode> future);
178223

224+
@Override
225+
protected CompletableFuture<JsonNode> internalExecute(
226+
WorkflowContext workflow, TaskContext taskContext) {
227+
ArrayNode output = JsonUtils.mapper().createArrayNode();
228+
Collection<EventRegistration> registrations = new ArrayList<>();
229+
if (untilRegBuilders != null) {
230+
untilEvent.set(false);
231+
}
232+
CompletableFuture<?> combinedFuture =
233+
combine(
234+
toCompletables(
235+
regBuilders,
236+
registrations,
237+
(ce, future) ->
238+
processCe(converter.apply(ce), output, workflow, taskContext, future)));
239+
CompletableFuture<JsonNode> resultFuture =
240+
combinedFuture.thenApply(
241+
v -> {
242+
registrations.forEach(reg -> eventConsumer.unregister(reg));
243+
return output;
244+
});
245+
if (untilRegBuilders != null) {
246+
Collection<EventRegistration> untilRegistrations = new ArrayList<>();
247+
CompletableFuture<?>[] futures =
248+
toCompletables(
249+
untilRegBuilders, untilRegistrations, (ce, future) -> future.complete(null));
250+
CompletableFuture<?> untilFuture =
251+
untilRegBuilders.isAnd()
252+
? CompletableFuture.allOf(futures)
253+
: CompletableFuture.anyOf(futures);
254+
untilFuture.thenAccept(
255+
v -> {
256+
untilEvent.set(true);
257+
combinedFuture.complete(null);
258+
untilRegistrations.forEach(reg -> eventConsumer.unregister(reg));
259+
});
260+
}
261+
return resultFuture;
262+
}
263+
264+
private <T> CompletableFuture<T>[] toCompletables(
265+
EventRegistrationBuilderCollection regCollection,
266+
Collection<EventRegistration> registrations,
267+
BiConsumer<CloudEvent, CompletableFuture<T>> consumer) {
268+
return regCollection.registrations().stream()
269+
.map(reg -> toCompletable(reg, registrations, consumer))
270+
.toArray(size -> new CompletableFuture[size]);
271+
}
272+
273+
private <T> CompletableFuture<T> toCompletable(
274+
EventRegistrationBuilder regBuilder,
275+
Collection<EventRegistration> registrations,
276+
BiConsumer<CloudEvent, CompletableFuture<T>> ceConsumer) {
277+
final CompletableFuture<T> future = new CompletableFuture<>();
278+
registrations.add(
279+
eventConsumer.register(regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future)));
280+
return future;
281+
}
282+
179283
private void processCe(
180284
JsonNode node,
181285
ArrayNode arrayNode,
@@ -199,48 +303,13 @@ private void processCe(
199303
() -> internalProcessCe(node, arrayNode, workflow, taskContext, future));
200304
}
201305

202-
protected CompletableFuture<JsonNode> toCompletable(
203-
WorkflowContext workflow,
204-
TaskContext taskContext,
205-
EventRegistrationBuilder regBuilder,
206-
Collection<EventRegistration> registrations,
207-
ArrayNode arrayNode) {
208-
final CompletableFuture<JsonNode> future = new CompletableFuture<>();
209-
registrations.add(
210-
workflow
211-
.definition()
212-
.application()
213-
.eventConsumer()
214-
.register(
215-
regBuilder,
216-
(Consumer<CloudEvent>)
217-
(ce ->
218-
processCe(converter.apply(ce), arrayNode, workflow, taskContext, future))));
219-
return future;
220-
}
221-
222-
@Override
223-
protected CompletableFuture<JsonNode> internalExecute(
224-
WorkflowContext workflow, TaskContext taskContext) {
225-
ArrayNode output = JsonUtils.mapper().createArrayNode();
226-
Collection<EventRegistration> registrations = new ArrayList<>();
227-
return combine(
228-
regBuilders.stream()
229-
.map(reg -> toCompletable(workflow, taskContext, reg, registrations, output))
230-
.toArray(size -> new CompletableFuture[size]))
231-
.thenApply(
232-
v -> {
233-
registrations.forEach(
234-
reg -> workflow.definition().application().eventConsumer().unregister(reg));
235-
return output;
236-
});
237-
}
238-
239306
protected ListenExecutor(ListenExecutorBuilder builder) {
240307
super(builder);
308+
this.eventConsumer = builder.application.eventConsumer();
241309
this.regBuilders = builder.registrations;
242310
this.until = Optional.ofNullable(builder.until);
243311
this.loop = Optional.ofNullable(builder.loop);
244312
this.converter = builder.converter;
313+
this.untilRegBuilders = builder.untilRegistrations;
245314
}
246315
}

impl/core/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.CompletableFuture;
2828
import java.util.stream.Stream;
2929
import org.junit.jupiter.api.BeforeAll;
30+
import org.junit.jupiter.api.Test;
3031
import org.junit.jupiter.params.ParameterizedTest;
3132
import org.junit.jupiter.params.provider.Arguments;
3233
import org.junit.jupiter.params.provider.MethodSource;
@@ -57,6 +58,27 @@ void testEventListened(String listen, String emit, JsonNode expectedResult, Obje
5758
assertThat(waitingInstance.outputAsJsonNode()).isEqualTo(expectedResult);
5859
}
5960

61+
@Test
62+
void testUntilConsumed() throws IOException {
63+
WorkflowDefinition listenDefinition =
64+
appl.workflowDefinition(
65+
WorkflowReader.readWorkflowFromClasspath("listen-to-any-until-consumed.yaml"));
66+
WorkflowDefinition emitDoctorDefinition =
67+
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("emit-doctor.yaml"));
68+
WorkflowDefinition emitOutDefinition =
69+
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("emit-out.yaml"));
70+
WorkflowInstance waitingInstance = listenDefinition.instance(Map.of());
71+
CompletableFuture<JsonNode> future = waitingInstance.start();
72+
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.RUNNING);
73+
emitDoctorDefinition.instance(Map.of("temperature", 35)).start().join();
74+
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.RUNNING);
75+
emitDoctorDefinition.instance(Map.of("temperature", 39)).start().join();
76+
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.RUNNING);
77+
emitOutDefinition.instance(Map.of()).start().join();
78+
assertThat(future).isCompleted();
79+
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.COMPLETED);
80+
}
81+
6082
private static Stream<Arguments> eventListenerParameters() {
6183
return Stream.of(
6284
Arguments.of("listen-to-any.yaml", "emit.yaml", cruellaDeVil(), Map.of()),
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: emit-out
5+
version: '0.1.0'
6+
do:
7+
- emitEvent:
8+
emit:
9+
event:
10+
with:
11+
source: https://hospital.com
12+
type: com.fake-hospital.patient.checked-out

impl/core/src/test/resources/listen-to-any-filter.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ do:
1414
- with:
1515
type: com.fake-hospital.vitals.measurements.bpm
1616
data: ${ .bpm < 60 or .bpm > 100 }
17+
until: ( . | length ) > 0
1718
foreach:
1819
item: event
1920
do:
2021
- isSick:
2122
set:
2223
temperature: ${$event.temperature}
23-
isSick: true
24+
isSick: true
25+
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: listen-to-any-until-consumed
5+
version: '0.1.0'
6+
do:
7+
- callDoctor:
8+
listen:
9+
to:
10+
any:
11+
- with:
12+
type: com.fake-hospital.vitals.measurements.temperature
13+
data: ${ .temperature > 38 }
14+
- with:
15+
type: com.fake-hospital.vitals.measurements.bpm
16+
data: ${ .bpm < 60 or .bpm > 100 }
17+
until:
18+
one:
19+
with:
20+
type: com.fake-hospital.patient.checked-out

0 commit comments

Comments
 (0)