15
15
*/
16
16
package io .serverlessworkflow .impl ;
17
17
18
+ import static io .serverlessworkflow .impl .WorkflowUtils .*;
18
19
import static io .serverlessworkflow .impl .json .JsonUtils .*;
19
20
20
21
import com .fasterxml .jackson .databind .JsonNode ;
22
+ import io .serverlessworkflow .api .types .Input ;
23
+ import io .serverlessworkflow .api .types .Output ;
21
24
import io .serverlessworkflow .api .types .TaskBase ;
22
25
import io .serverlessworkflow .api .types .TaskItem ;
23
26
import io .serverlessworkflow .api .types .Workflow ;
24
27
import io .serverlessworkflow .impl .executors .DefaultTaskExecutorFactory ;
25
28
import io .serverlessworkflow .impl .executors .TaskExecutor ;
26
29
import io .serverlessworkflow .impl .executors .TaskExecutorFactory ;
30
+ import io .serverlessworkflow .impl .expressions .ExpressionFactory ;
31
+ import io .serverlessworkflow .impl .expressions .JQExpressionFactory ;
27
32
import io .serverlessworkflow .impl .json .JsonUtils ;
33
+ import io .serverlessworkflow .impl .jsonschema .DefaultSchemaValidatorFactory ;
34
+ import io .serverlessworkflow .impl .jsonschema .SchemaValidator ;
35
+ import io .serverlessworkflow .impl .jsonschema .SchemaValidatorFactory ;
36
+ import io .serverlessworkflow .resources .DefaultResourceLoader ;
37
+ import io .serverlessworkflow .resources .ResourceLoader ;
28
38
import java .util .Collection ;
29
39
import java .util .Collections ;
30
40
import java .util .HashSet ;
31
41
import java .util .List ;
32
42
import java .util .Map ;
43
+ import java .util .Optional ;
33
44
import java .util .concurrent .ConcurrentHashMap ;
34
45
35
46
public class WorkflowDefinition {
36
47
37
48
private WorkflowDefinition (
38
49
Workflow workflow ,
39
50
TaskExecutorFactory taskFactory ,
40
- Collection <WorkflowExecutionListener > listeners ) {
51
+ Collection <WorkflowExecutionListener > listeners ,
52
+ WorkflowFactories factories ) {
41
53
this .workflow = workflow ;
42
54
this .taskFactory = taskFactory ;
43
55
this .listeners = listeners ;
56
+ this .factories = factories ;
57
+ if (workflow .getInput () != null ) {
58
+ Input input = workflow .getInput ();
59
+ this .inputSchemaValidator =
60
+ getSchemaValidator (
61
+ factories .getValidatorFactory (), schemaToNode (factories , input .getSchema ()));
62
+ this .inputFilter = buildWorkflowFilter (factories .getExpressionFactory (), input .getFrom ());
63
+ }
64
+ if (workflow .getOutput () != null ) {
65
+ Output output = workflow .getOutput ();
66
+ this .outputSchemaValidator =
67
+ getSchemaValidator (
68
+ factories .getValidatorFactory (), schemaToNode (factories , output .getSchema ()));
69
+ this .outputFilter = buildWorkflowFilter (factories .getExpressionFactory (), output .getAs ());
70
+ }
44
71
}
45
72
46
73
private final Workflow workflow ;
47
74
private final Collection <WorkflowExecutionListener > listeners ;
48
75
private final TaskExecutorFactory taskFactory ;
76
+ private final WorkflowFactories factories ;
77
+ private Optional <SchemaValidator > inputSchemaValidator = Optional .empty ();
78
+ private Optional <SchemaValidator > outputSchemaValidator = Optional .empty ();
79
+ private Optional <WorkflowFilter > inputFilter = Optional .empty ();
80
+ private Optional <WorkflowFilter > outputFilter = Optional .empty ();
81
+
49
82
private final Map <String , TaskExecutor <? extends TaskBase >> taskExecutors =
50
83
new ConcurrentHashMap <>();
51
84
52
85
public static class Builder {
53
86
private final Workflow workflow ;
54
87
private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory .get ();
88
+ private ExpressionFactory exprFactory = JQExpressionFactory .get ();
55
89
private Collection <WorkflowExecutionListener > listeners ;
90
+ private ResourceLoader resourceLoader = DefaultResourceLoader .get ();
91
+ private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory .get ();
56
92
57
93
private Builder (Workflow workflow ) {
58
94
this .workflow = workflow ;
@@ -71,13 +107,29 @@ public Builder withTaskExecutorFactory(TaskExecutorFactory factory) {
71
107
return this ;
72
108
}
73
109
110
+ public Builder withExpressionFactory (ExpressionFactory factory ) {
111
+ this .exprFactory = factory ;
112
+ return this ;
113
+ }
114
+
115
+ public Builder withResourceLoader (ResourceLoader resourceLoader ) {
116
+ this .resourceLoader = resourceLoader ;
117
+ return this ;
118
+ }
119
+
120
+ public Builder withSchemaValidatorFactory (SchemaValidatorFactory factory ) {
121
+ this .schemaValidatorFactory = factory ;
122
+ return this ;
123
+ }
124
+
74
125
public WorkflowDefinition build () {
75
126
return new WorkflowDefinition (
76
127
workflow ,
77
128
taskFactory ,
78
129
listeners == null
79
130
? Collections .emptySet ()
80
- : Collections .unmodifiableCollection (listeners ));
131
+ : Collections .unmodifiableCollection (listeners ),
132
+ new WorkflowFactories (resourceLoader , exprFactory , schemaValidatorFactory ));
81
133
}
82
134
}
83
135
@@ -103,9 +155,13 @@ public class WorkflowInstance {
103
155
104
156
private WorkflowInstance (TaskExecutorFactory factory , JsonNode input ) {
105
157
this .output = input ;
106
- this . state = State . STARTED ;
158
+ inputSchemaValidator . ifPresent ( v -> v . validate ( input )) ;
107
159
this .context = WorkflowContext .builder (input ).build ();
160
+ inputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
161
+ this .state = State .STARTED ;
108
162
processDo (workflow .getDo ());
163
+ outputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
164
+ outputSchemaValidator .ifPresent (v -> v .validate (output ));
109
165
}
110
166
111
167
private void processDo (List <TaskItem > tasks ) {
@@ -118,7 +174,7 @@ private void processDo(List<TaskItem> tasks) {
118
174
taskExecutors
119
175
.computeIfAbsent (
120
176
context .position ().jsonPointer (),
121
- k -> taskFactory .getTaskExecutor (task .getTask ()))
177
+ k -> taskFactory .getTaskExecutor (task .getTask (), factories ))
122
178
.apply (context , output );
123
179
listeners .forEach (l -> l .onTaskEnded (context .position (), task .getTask ()));
124
180
context .position ().back ().back ();
0 commit comments