Skip to content

Commit 33726c6

Browse files
committed
[Fix serverlessworkflow#490] Delaying registration
1 parent 16b01af commit 33726c6

13 files changed

+296
-70
lines changed

api/src/main/resources/schema/workflow.yaml

+42
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,17 @@ $defs:
558558
$ref: '#/$defs/eventConsumptionStrategy'
559559
title: ListenTo
560560
description: Defines the event(s) to listen to.
561+
read:
562+
type: string
563+
enum: [ data, envelope, raw ]
564+
default: data
565+
title: ListenAndReadAs
566+
description: Specifies how events are read during the listen operation.
561567
required: [ to ]
568+
foreach:
569+
$ref: '#/$defs/subscriptionIterator'
570+
title: ListenIterator
571+
description: Configures the iterator, if any, for processing consumed event(s).
562572
raiseTask:
563573
type: object
564574
$ref: '#/$defs/taskBase'
@@ -1710,6 +1720,10 @@ $defs:
17101720
$ref: '#/$defs/asyncApiMessageConsumptionPolicy'
17111721
title: AsyncApiMessageConsumptionPolicy
17121722
description: An object used to configure the subscription's message consumption policy.
1723+
foreach:
1724+
$ref: '#/$defs/subscriptionIterator'
1725+
title: AsyncApiSubscriptionIterator
1726+
description: Configures the iterator, if any, for processing consumed messages(s).
17131727
required: [ consume ]
17141728
asyncApiMessageConsumptionPolicy:
17151729
type: object
@@ -1740,3 +1754,31 @@ $defs:
17401754
title: AsyncApiMessageConsumptionPolicyUntil
17411755
description: A runtime expression evaluated before each consumed (filtered) message to decide if message consumption should continue.
17421756
required: [ until ]
1757+
subscriptionIterator:
1758+
type: object
1759+
title: SubscriptionIterator
1760+
description: Configures the iteration over each item (event or message) consumed by a subscription.
1761+
unevaluatedProperties: false
1762+
properties:
1763+
item:
1764+
type: string
1765+
title: SubscriptionIteratorItem
1766+
description: The name of the variable used to store the current item being enumerated.
1767+
default: item
1768+
at:
1769+
type: string
1770+
title: SubscriptionIteratorIndex
1771+
description: The name of the variable used to store the index of the current item being enumerated.
1772+
default: index
1773+
do:
1774+
$ref: '#/$defs/taskList'
1775+
title: SubscriptionIteratorTasks
1776+
description: The tasks to perform for each consumed item.
1777+
output:
1778+
$ref: '#/$defs/output'
1779+
title: SubscriptionIteratorOutput
1780+
description: An object, if any, used to customize the item's output and to document its schema.
1781+
export:
1782+
$ref: '#/$defs/export'
1783+
title: SubscriptionIteratorExport
1784+
description: An object, if any, used to customize the content of the workflow context.

api/src/test/resources/features/emit.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ do:
1010
with:
1111
source: https://fake-source.com
1212
type: com.fake-source.user.greeted.v1
13-
data:
14-
greetings: ${ "Hello \(.user.firstName) \(.user.lastName)!" }
13+
data: [1,2,3,4]
14+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import io.serverlessworkflow.api.types.EventFilter;
19+
20+
public abstract class AbstractTypeConsumer implements EventConsumer<TypeEventRegistration> {
21+
22+
@Override
23+
public EventRegistrationBuilder<TypeEventRegistration> register(EventFilter register) {
24+
// TODO Auto-generated method stub
25+
return null;
26+
}
27+
28+
@Override
29+
public void unregister(TypeEventRegistration register) {
30+
// TODO Auto-generated method stub
31+
32+
}
33+
}

impl/core/src/main/java/io/serverlessworkflow/impl/DefaultEventConsumer.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.impl;
1717

18-
public class DefaultEventConsumer implements EventConsumer {
18+
public class DefaultEventConsumer extends AbstractTypeConsumer {
1919

2020
private static DefaultEventConsumer instance = new DefaultEventConsumer();
2121

@@ -24,10 +24,4 @@ private DefaultEventConsumer() {}
2424
public static DefaultEventConsumer get() {
2525
return instance;
2626
}
27-
28-
@Override
29-
public void register(EventRegistration register) {}
30-
31-
@Override
32-
public void unregister(EventRegistration register) {}
3327
}

impl/core/src/main/java/io/serverlessworkflow/impl/EventConsumer.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
*/
1616
package io.serverlessworkflow.impl;
1717

18-
public interface EventConsumer {
19-
void register(EventRegistration register);
18+
import io.serverlessworkflow.api.types.EventFilter;
2019

21-
void unregister(EventRegistration register);
20+
public interface EventConsumer<T extends EventRegistration> {
21+
EventRegistrationBuilder<T> register(EventFilter register);
22+
23+
void unregister(T register);
2224
}

impl/core/src/main/java/io/serverlessworkflow/impl/EventRegistration.java

+1-39
Original file line numberDiff line numberDiff line change
@@ -15,42 +15,4 @@
1515
*/
1616
package io.serverlessworkflow.impl;
1717

18-
import io.cloudevents.CloudEvent;
19-
import java.util.Objects;
20-
import java.util.Optional;
21-
import java.util.function.Consumer;
22-
import java.util.function.Predicate;
23-
24-
public record EventRegistration(
25-
Consumer<CloudEvent> consumer, String type, Optional<Predicate<CloudEvent>> filter) {
26-
27-
public static EventRegistrationBuilder builder(String type) {
28-
return new EventRegistrationBuilder(type);
29-
}
30-
31-
public static class EventRegistrationBuilder {
32-
33-
private final String type;
34-
private Consumer<CloudEvent> consumer;
35-
private Predicate<CloudEvent> predicate;
36-
37-
private EventRegistrationBuilder(String type) {
38-
this.type = type;
39-
}
40-
41-
public EventRegistrationBuilder withConsumer(Consumer<CloudEvent> consumer) {
42-
this.consumer = consumer;
43-
return this;
44-
}
45-
46-
public EventRegistrationBuilder withFilter(Predicate<CloudEvent> predicate) {
47-
this.predicate = predicate;
48-
return this;
49-
}
50-
51-
public EventRegistration build() {
52-
return new EventRegistration(
53-
Objects.requireNonNull(consumer), type, Optional.ofNullable(predicate));
54-
}
55-
}
56-
}
18+
public interface EventRegistration {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import io.cloudevents.CloudEvent;
19+
import java.util.function.Consumer;
20+
import java.util.function.Function;
21+
22+
public interface EventRegistrationBuilder<T extends EventRegistration>
23+
extends Function<Consumer<CloudEvent>, T> {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl;
17+
18+
import io.cloudevents.CloudEvent;
19+
import java.util.function.Consumer;
20+
21+
public record TypeEventRegistration(String type, Consumer<CloudEvent> consumer)
22+
implements EventRegistration {}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class WorkflowApplication implements AutoCloseable {
4747
private final WorkflowPositionFactory positionFactory;
4848
private final ExecutorServiceFactory executorFactory;
4949
private final RuntimeDescriptorFactory runtimeDescriptorFactory;
50-
private final EventConsumer eventConsumer;
50+
private final EventConsumer<?> eventConsumer;
5151

5252
private ExecutorService executorService;
5353

@@ -102,7 +102,7 @@ public static class Builder {
102102
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
103103
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
104104
private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool();
105-
private EventConsumer eventConsumer = DefaultEventConsumer.get();
105+
private EventConsumer<?> eventConsumer = DefaultEventConsumer.get();
106106
private RuntimeDescriptorFactory descriptorFactory =
107107
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());
108108

@@ -156,6 +156,11 @@ public Builder withDescriptorFactory(RuntimeDescriptorFactory factory) {
156156
return this;
157157
}
158158

159+
public Builder withEventConsumer(EventConsumer<?> eventConsumer) {
160+
this.eventConsumer = eventConsumer;
161+
return this;
162+
}
163+
159164
public WorkflowApplication build() {
160165
return new WorkflowApplication(this);
161166
}
@@ -188,6 +193,7 @@ public RuntimeDescriptorFactory runtimeDescriptorFactory() {
188193
return runtimeDescriptorFactory;
189194
}
190195

196+
@SuppressWarnings("rawtypes")
191197
public EventConsumer eventConsumer() {
192198
return eventConsumer;
193199
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,6 @@ public Optional<WorkflowFilter> outputFilter() {
101101
return outputFilter;
102102
}
103103

104-
public WorkflowIdFactory idFactory() {
105-
return application.idFactory();
106-
}
107-
108104
public Optional<SchemaValidator> outputSchemaValidator() {
109105
return outputSchemaValidator;
110106
}
@@ -113,6 +109,10 @@ public RuntimeDescriptorFactory runtimeDescriptorFactory() {
113109
return application.runtimeDescriptorFactory();
114110
}
115111

112+
public WorkflowApplication application() {
113+
return application;
114+
}
115+
116116
@Override
117117
public void close() {
118118
// TODO close resourcers hold for uncompleted process instances, if any

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class WorkflowInstance {
3636
private CompletableFuture<JsonNode> completableFuture;
3737

3838
WorkflowInstance(WorkflowDefinition definition, JsonNode input) {
39-
this.id = definition.idFactory().get();
39+
this.id = definition.application().idFactory().get();
4040
this.input = input;
4141
this.definition = definition;
4242
this.status = new AtomicReference<>(WorkflowStatus.PENDING);

0 commit comments

Comments
 (0)