Skip to content

Commit 0c4386b

Browse files
committed
[Fix serverlessworkflow#490] Emit executor
1 parent 88f24fe commit 0c4386b

8 files changed

+311
-89
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
import com.github.f4b6a3.ulid.UlidCreator;
1919
import io.serverlessworkflow.api.types.Document;
2020
import io.serverlessworkflow.api.types.Workflow;
21-
import io.serverlessworkflow.impl.events.DefaultEventConsumer;
2221
import io.serverlessworkflow.impl.events.EventConsumer;
22+
import io.serverlessworkflow.impl.events.EventPublisher;
23+
import io.serverlessworkflow.impl.events.InMemoryEvents;
2324
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
2425
import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
2526
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
@@ -50,6 +51,7 @@ public class WorkflowApplication implements AutoCloseable {
5051
private final ExecutorServiceFactory executorFactory;
5152
private final RuntimeDescriptorFactory runtimeDescriptorFactory;
5253
private final EventConsumer<?, ?> eventConsumer;
54+
private final EventPublisher eventPublisher;
5355

5456
private ExecutorService executorService;
5557

@@ -65,6 +67,7 @@ private WorkflowApplication(Builder builder) {
6567
this.listeners = builder.listeners != null ? builder.listeners : Collections.emptySet();
6668
this.definitions = new ConcurrentHashMap<>();
6769
this.eventConsumer = builder.eventConsumer;
70+
this.eventPublisher = builder.eventPublisher;
6871
}
6972

7073
public TaskExecutorFactory taskFactory() {
@@ -91,6 +94,10 @@ public Collection<WorkflowExecutionListener> listeners() {
9194
return listeners;
9295
}
9396

97+
public EventPublisher eventPublisher() {
98+
return eventPublisher;
99+
}
100+
94101
public WorkflowIdFactory idFactory() {
95102
return idFactory;
96103
}
@@ -104,7 +111,8 @@ public static class Builder {
104111
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
105112
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
106113
private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool();
107-
private EventConsumer<?, ?> eventConsumer = DefaultEventConsumer.get();
114+
private EventConsumer<?, ?> eventConsumer = InMemoryEvents.get();
115+
private EventPublisher eventPublisher = InMemoryEvents.get();
108116
private RuntimeDescriptorFactory descriptorFactory =
109117
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
110118

@@ -163,6 +171,11 @@ public Builder withEventConsumer(EventConsumer<?, ?> eventConsumer) {
163171
return this;
164172
}
165173

174+
public Builder withEventPublisher(EventPublisher eventPublisher) {
175+
this.eventPublisher = eventPublisher;
176+
return this;
177+
}
178+
166179
public WorkflowApplication build() {
167180
return new WorkflowApplication(this);
168181
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventUtils.java

+13
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.fasterxml.jackson.databind.node.ObjectNode;
2020
import io.cloudevents.CloudEvent;
2121
import io.cloudevents.CloudEventData;
22+
import io.cloudevents.core.builder.CloudEventBuilder;
2223
import io.cloudevents.jackson.JsonCloudEventData;
2324
import io.serverlessworkflow.impl.json.JsonUtils;
2425
import java.io.IOException;
@@ -55,6 +56,18 @@ public static JsonNode toJsonNode(CloudEvent event) {
5556
return result;
5657
}
5758

59+
public static CloudEventBuilder addExtension(
60+
CloudEventBuilder builder, String name, JsonNode value) {
61+
if (value.isTextual()) {
62+
builder.withExtension(name, value.asText());
63+
} else if (value.isBoolean()) {
64+
builder.withExtension(name, value.isBoolean());
65+
} else if (value.isNumber()) {
66+
builder.withExtension(name, value.numberValue());
67+
}
68+
return builder;
69+
}
70+
5871
public static JsonNode toJsonNode(CloudEventData data) {
5972
try {
6073
return data instanceof JsonCloudEventData

impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java

+11-78
Original file line numberDiff line numberDiff line change
@@ -17,102 +17,35 @@
1717

1818
import com.fasterxml.jackson.databind.JsonNode;
1919
import io.cloudevents.CloudEvent;
20-
import io.serverlessworkflow.api.types.EventData;
21-
import io.serverlessworkflow.api.types.EventDataschema;
2220
import io.serverlessworkflow.api.types.EventProperties;
23-
import io.serverlessworkflow.api.types.EventSource;
24-
import io.serverlessworkflow.api.types.EventTime;
2521
import io.serverlessworkflow.impl.ExpressionHolder;
26-
import io.serverlessworkflow.impl.StringFilter;
2722
import io.serverlessworkflow.impl.TaskContext;
2823
import io.serverlessworkflow.impl.WorkflowContext;
2924
import io.serverlessworkflow.impl.WorkflowFilter;
30-
import io.serverlessworkflow.impl.WorkflowUtils;
3125
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
3226
import io.serverlessworkflow.impl.json.JsonUtils;
33-
import java.time.OffsetDateTime;
34-
import java.time.ZoneOffset;
35-
import java.util.Map;
3627
import java.util.Optional;
3728

3829
public class DefaultCloudEventPredicate implements CloudEventPredicate {
3930

40-
private final Optional<StringFilter> idFilter;
41-
private final Optional<StringFilter> sourceFilter;
42-
private final Optional<StringFilter> subjectFilter;
43-
private final Optional<StringFilter> contentTypeFilter;
44-
private final Optional<StringFilter> typeFilter;
45-
private final Optional<StringFilter> dataSchemaFilter;
46-
private final Optional<ExpressionHolder<OffsetDateTime>> timeFilter;
47-
private final Optional<WorkflowFilter> dataFilter;
48-
private final Optional<WorkflowFilter> additionalFilter;
31+
private final EventPropertiesFilter props;
4932

5033
public DefaultCloudEventPredicate(EventProperties properties, ExpressionFactory exprFactory) {
51-
this.idFilter = buildFilter(exprFactory, properties.getId());
52-
EventSource source = properties.getSource();
53-
this.sourceFilter =
54-
source == null
55-
? Optional.empty()
56-
: Optional.of(
57-
WorkflowUtils.buildStringFilter(
58-
exprFactory,
59-
source.getRuntimeExpression(),
60-
WorkflowUtils.toString(source.getUriTemplate())));
61-
this.subjectFilter = buildFilter(exprFactory, properties.getSubject());
62-
this.contentTypeFilter = buildFilter(exprFactory, properties.getDatacontenttype());
63-
this.typeFilter = buildFilter(exprFactory, properties.getType());
64-
EventDataschema dataSchema = properties.getDataschema();
65-
this.dataSchemaFilter =
66-
dataSchema == null
67-
? Optional.empty()
68-
: Optional.of(
69-
WorkflowUtils.buildStringFilter(
70-
exprFactory,
71-
dataSchema.getExpressionDataSchema(),
72-
WorkflowUtils.toString(dataSchema.getLiteralDataSchema())));
73-
EventTime time = properties.getTime();
74-
this.timeFilter =
75-
time == null
76-
? Optional.empty()
77-
: Optional.of(
78-
WorkflowUtils.buildExpressionHolder(
79-
exprFactory,
80-
time.getRuntimeExpression(),
81-
time.getLiteralTime().toInstant().atOffset(ZoneOffset.UTC),
82-
JsonUtils::toOffsetDateTime));
83-
84-
EventData data = properties.getData();
85-
this.dataFilter =
86-
properties.getData() == null
87-
? Optional.empty()
88-
: Optional.of(
89-
WorkflowUtils.buildWorkflowFilter(
90-
exprFactory, data.getRuntimeExpression(), data.getObject()));
91-
Map<String, Object> ceAttrs = properties.getAdditionalProperties();
92-
this.additionalFilter =
93-
ceAttrs == null || ceAttrs.isEmpty()
94-
? Optional.empty()
95-
: Optional.of(WorkflowUtils.buildWorkflowFilter(exprFactory, null, ceAttrs));
96-
}
97-
98-
private Optional<StringFilter> buildFilter(ExpressionFactory exprFactory, String str) {
99-
return str == null
100-
? Optional.empty()
101-
: Optional.of(WorkflowUtils.buildStringFilter(exprFactory, str));
34+
this.props = EventPropertiesFilter.build(properties, exprFactory);
10235
}
10336

10437
@Override
10538
public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task) {
106-
return test(idFilter, event.getId(), workflow, task)
107-
&& test(sourceFilter, event.getSource().toString(), workflow, task)
108-
&& test(subjectFilter, event.getSubject(), workflow, task)
109-
&& test(contentTypeFilter, event.getDataContentType(), workflow, task)
110-
&& test(typeFilter, event.getType(), workflow, task)
111-
&& test(dataSchemaFilter, event.getDataSchema().toString(), workflow, task)
112-
&& test(timeFilter, event.getTime(), workflow, task)
113-
&& test(dataFilter, CloudEventUtils.toJsonNode(event.getData()), workflow, task)
39+
return test(props.idFilter(), event.getId(), workflow, task)
40+
&& test(props.sourceFilter(), event.getSource().toString(), workflow, task)
41+
&& test(props.subjectFilter(), event.getSubject(), workflow, task)
42+
&& test(props.contentTypeFilter(), event.getDataContentType(), workflow, task)
43+
&& test(props.typeFilter(), event.getType(), workflow, task)
44+
&& test(props.dataSchemaFilter(), event.getDataSchema().toString(), workflow, task)
45+
&& test(props.timeFilter(), event.getTime(), workflow, task)
46+
&& test(props.dataFilter(), CloudEventUtils.toJsonNode(event.getData()), workflow, task)
11447
&& test(
115-
additionalFilter,
48+
props.additionalFilter(),
11649
JsonUtils.fromValue(CloudEventUtils.extensions(event)),
11750
workflow,
11851
task);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.events;
17+
18+
import io.serverlessworkflow.api.types.EventData;
19+
import io.serverlessworkflow.api.types.EventDataschema;
20+
import io.serverlessworkflow.api.types.EventProperties;
21+
import io.serverlessworkflow.api.types.EventSource;
22+
import io.serverlessworkflow.api.types.EventTime;
23+
import io.serverlessworkflow.impl.ExpressionHolder;
24+
import io.serverlessworkflow.impl.StringFilter;
25+
import io.serverlessworkflow.impl.WorkflowFilter;
26+
import io.serverlessworkflow.impl.WorkflowUtils;
27+
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
28+
import io.serverlessworkflow.impl.json.JsonUtils;
29+
import java.time.OffsetDateTime;
30+
import java.time.ZoneOffset;
31+
import java.util.Map;
32+
import java.util.Optional;
33+
34+
public record EventPropertiesFilter(
35+
Optional<StringFilter> idFilter,
36+
Optional<StringFilter> sourceFilter,
37+
Optional<StringFilter> subjectFilter,
38+
Optional<StringFilter> contentTypeFilter,
39+
Optional<StringFilter> typeFilter,
40+
Optional<StringFilter> dataSchemaFilter,
41+
Optional<ExpressionHolder<OffsetDateTime>> timeFilter,
42+
Optional<WorkflowFilter> dataFilter,
43+
Optional<WorkflowFilter> additionalFilter) {
44+
45+
public static EventPropertiesFilter build(
46+
EventProperties properties, ExpressionFactory exprFactory) {
47+
Optional<StringFilter> idFilter = buildFilter(exprFactory, properties.getId());
48+
EventSource source = properties.getSource();
49+
Optional<StringFilter> sourceFilter =
50+
source == null
51+
? Optional.empty()
52+
: Optional.of(
53+
WorkflowUtils.buildStringFilter(
54+
exprFactory,
55+
source.getRuntimeExpression(),
56+
WorkflowUtils.toString(source.getUriTemplate())));
57+
Optional<StringFilter> subjectFilter = buildFilter(exprFactory, properties.getSubject());
58+
Optional<StringFilter> contentTypeFilter =
59+
buildFilter(exprFactory, properties.getDatacontenttype());
60+
Optional<StringFilter> typeFilter = buildFilter(exprFactory, properties.getType());
61+
EventDataschema dataSchema = properties.getDataschema();
62+
Optional<StringFilter> dataSchemaFilter =
63+
dataSchema == null
64+
? Optional.empty()
65+
: Optional.of(
66+
WorkflowUtils.buildStringFilter(
67+
exprFactory,
68+
dataSchema.getExpressionDataSchema(),
69+
WorkflowUtils.toString(dataSchema.getLiteralDataSchema())));
70+
EventTime time = properties.getTime();
71+
Optional<ExpressionHolder<OffsetDateTime>> timeFilter =
72+
time == null
73+
? Optional.empty()
74+
: Optional.of(
75+
WorkflowUtils.buildExpressionHolder(
76+
exprFactory,
77+
time.getRuntimeExpression(),
78+
time.getLiteralTime().toInstant().atOffset(ZoneOffset.UTC),
79+
JsonUtils::toOffsetDateTime));
80+
81+
EventData data = properties.getData();
82+
Optional<WorkflowFilter> dataFilter =
83+
properties.getData() == null
84+
? Optional.empty()
85+
: Optional.of(
86+
WorkflowUtils.buildWorkflowFilter(
87+
exprFactory, data.getRuntimeExpression(), data.getObject()));
88+
Map<String, Object> ceAttrs = properties.getAdditionalProperties();
89+
Optional<WorkflowFilter> additionalFilter =
90+
ceAttrs == null || ceAttrs.isEmpty()
91+
? Optional.empty()
92+
: Optional.of(WorkflowUtils.buildWorkflowFilter(exprFactory, null, ceAttrs));
93+
return new EventPropertiesFilter(
94+
idFilter,
95+
sourceFilter,
96+
subjectFilter,
97+
contentTypeFilter,
98+
typeFilter,
99+
dataSchemaFilter,
100+
timeFilter,
101+
dataFilter,
102+
additionalFilter);
103+
}
104+
105+
private static Optional<StringFilter> buildFilter(ExpressionFactory exprFactory, String str) {
106+
return str == null
107+
? Optional.empty()
108+
: Optional.of(WorkflowUtils.buildStringFilter(exprFactory, str));
109+
}
110+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.events;
17+
18+
import io.cloudevents.CloudEvent;
19+
import java.util.concurrent.CompletableFuture;
20+
21+
public interface EventPublisher {
22+
CompletableFuture<Void> publish(CloudEvent event);
23+
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultEventConsumer.java renamed to impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java

+14-9
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,21 @@
1717

1818
import io.cloudevents.CloudEvent;
1919
import java.util.Map;
20+
import java.util.concurrent.CompletableFuture;
2021
import java.util.concurrent.ConcurrentHashMap;
2122
import java.util.function.Consumer;
2223

2324
/*
2425
* Straighforward implementation of in memory event broker.
2526
* User might invoke notifyCE to simulate event reception.
2627
*/
27-
public class DefaultEventConsumer extends AbstractTypeConsumer {
28+
public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher {
2829

29-
private static DefaultEventConsumer instance = new DefaultEventConsumer();
30+
private static InMemoryEvents instance = new InMemoryEvents();
3031

31-
private DefaultEventConsumer() {}
32+
private InMemoryEvents() {}
3233

33-
public static DefaultEventConsumer get() {
34+
public static InMemoryEvents get() {
3435
return instance;
3536
}
3637

@@ -46,10 +47,14 @@ protected void unregister(String topicName) {
4647
topicMap.remove(topicName);
4748
}
4849

49-
public void notifyCE(CloudEvent ce) {
50-
Consumer<CloudEvent> consumer = topicMap.get(ce.getType());
51-
if (consumer != null) {
52-
consumer.accept(ce);
53-
}
50+
@Override
51+
public CompletableFuture<Void> publish(CloudEvent ce) {
52+
return CompletableFuture.runAsync(
53+
() -> {
54+
Consumer<CloudEvent> consumer = topicMap.get(ce.getType());
55+
if (consumer != null) {
56+
consumer.accept(ce);
57+
}
58+
});
5459
}
5560
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.serverlessworkflow.impl.WorkflowPosition;
2929
import io.serverlessworkflow.impl.executors.CallTaskExecutor.CallTaskExecutorBuilder;
3030
import io.serverlessworkflow.impl.executors.DoExecutor.DoExecutorBuilder;
31+
import io.serverlessworkflow.impl.executors.EmitExecutor.EmitExecutorBuilder;
3132
import io.serverlessworkflow.impl.executors.ForExecutor.ForExecutorBuilder;
3233
import io.serverlessworkflow.impl.executors.ForkExecutor.ForkExecutorBuilder;
3334
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
@@ -129,6 +130,9 @@ public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
129130
} else if (task.getListenTask() != null) {
130131
return new ListenExecutorBuilder(
131132
position, task.getListenTask(), workflow, application, resourceLoader);
133+
} else if (task.getEmitTask() != null) {
134+
return new EmitExecutorBuilder(
135+
position, task.getEmitTask(), workflow, application, resourceLoader);
132136
}
133137
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
134138
}

0 commit comments

Comments
 (0)