15
15
*/
16
16
package io .serverlessworkflow .impl ;
17
17
18
+ import static io .serverlessworkflow .impl .WorkflowUtils .*;
18
19
import static io .serverlessworkflow .impl .json .JsonUtils .*;
19
20
20
21
import com .fasterxml .jackson .databind .JsonNode ;
22
+ import io .serverlessworkflow .api .types .Input ;
23
+ import io .serverlessworkflow .api .types .Output ;
21
24
import io .serverlessworkflow .api .types .TaskBase ;
22
25
import io .serverlessworkflow .api .types .TaskItem ;
23
26
import io .serverlessworkflow .api .types .Workflow ;
24
27
import io .serverlessworkflow .impl .executors .DefaultTaskExecutorFactory ;
25
28
import io .serverlessworkflow .impl .executors .TaskExecutor ;
26
29
import io .serverlessworkflow .impl .executors .TaskExecutorFactory ;
30
+ import io .serverlessworkflow .impl .expressions .ExpressionFactory ;
31
+ import io .serverlessworkflow .impl .expressions .JQExpressionFactory ;
27
32
import io .serverlessworkflow .impl .json .JsonUtils ;
33
+ import io .serverlessworkflow .impl .jsonschema .DefaultSchemaValidatorFactory ;
34
+ import io .serverlessworkflow .impl .jsonschema .SchemaValidator ;
35
+ import io .serverlessworkflow .impl .jsonschema .SchemaValidatorFactory ;
36
+ import io .serverlessworkflow .resources .DefaultResourceLoader ;
37
+ import io .serverlessworkflow .resources .ResourceLoader ;
28
38
import java .util .Collection ;
29
39
import java .util .Collections ;
30
40
import java .util .HashSet ;
31
41
import java .util .List ;
32
42
import java .util .Map ;
43
+ import java .util .Optional ;
33
44
import java .util .concurrent .ConcurrentHashMap ;
34
45
35
46
public class WorkflowDefinition {
36
47
37
48
private WorkflowDefinition (
38
49
Workflow workflow ,
39
- TaskExecutorFactory taskFactory ,
40
- Collection < WorkflowExecutionListener > listeners ) {
50
+ Collection < WorkflowExecutionListener > listeners ,
51
+ WorkflowFactories factories ) {
41
52
this .workflow = workflow ;
42
- this .taskFactory = taskFactory ;
43
53
this .listeners = listeners ;
54
+ this .factories = factories ;
55
+ if (workflow .getInput () != null ) {
56
+ Input input = workflow .getInput ();
57
+ this .inputSchemaValidator =
58
+ getSchemaValidator (
59
+ factories .getValidatorFactory (), schemaToNode (factories , input .getSchema ()));
60
+ this .inputFilter = buildWorkflowFilter (factories .getExpressionFactory (), input .getFrom ());
61
+ }
62
+ if (workflow .getOutput () != null ) {
63
+ Output output = workflow .getOutput ();
64
+ this .outputSchemaValidator =
65
+ getSchemaValidator (
66
+ factories .getValidatorFactory (), schemaToNode (factories , output .getSchema ()));
67
+ this .outputFilter = buildWorkflowFilter (factories .getExpressionFactory (), output .getAs ());
68
+ }
44
69
}
45
70
46
71
private final Workflow workflow ;
47
72
private final Collection <WorkflowExecutionListener > listeners ;
48
- private final TaskExecutorFactory taskFactory ;
73
+ private final WorkflowFactories factories ;
74
+ private Optional <SchemaValidator > inputSchemaValidator = Optional .empty ();
75
+ private Optional <SchemaValidator > outputSchemaValidator = Optional .empty ();
76
+ private Optional <WorkflowFilter > inputFilter = Optional .empty ();
77
+ private Optional <WorkflowFilter > outputFilter = Optional .empty ();
78
+
49
79
private final Map <String , TaskExecutor <? extends TaskBase >> taskExecutors =
50
80
new ConcurrentHashMap <>();
51
81
52
82
public static class Builder {
53
83
private final Workflow workflow ;
54
84
private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory .get ();
85
+ private ExpressionFactory exprFactory = JQExpressionFactory .get ();
55
86
private Collection <WorkflowExecutionListener > listeners ;
87
+ private ResourceLoader resourceLoader = DefaultResourceLoader .get ();
88
+ private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory .get ();
56
89
57
90
private Builder (Workflow workflow ) {
58
91
this .workflow = workflow ;
@@ -71,13 +104,28 @@ public Builder withTaskExecutorFactory(TaskExecutorFactory factory) {
71
104
return this ;
72
105
}
73
106
107
+ public Builder withExpressionFactory (ExpressionFactory factory ) {
108
+ this .exprFactory = factory ;
109
+ return this ;
110
+ }
111
+
112
+ public Builder withResourceLoader (ResourceLoader resourceLoader ) {
113
+ this .resourceLoader = resourceLoader ;
114
+ return this ;
115
+ }
116
+
117
+ public Builder withSchemaValidatorFactory (SchemaValidatorFactory factory ) {
118
+ this .schemaValidatorFactory = factory ;
119
+ return this ;
120
+ }
121
+
74
122
public WorkflowDefinition build () {
75
123
return new WorkflowDefinition (
76
124
workflow ,
77
- taskFactory ,
78
125
listeners == null
79
126
? Collections .emptySet ()
80
- : Collections .unmodifiableCollection (listeners ));
127
+ : Collections .unmodifiableCollection (listeners ),
128
+ new WorkflowFactories (taskFactory , resourceLoader , exprFactory , schemaValidatorFactory ));
81
129
}
82
130
}
83
131
@@ -86,7 +134,7 @@ public static Builder builder(Workflow workflow) {
86
134
}
87
135
88
136
public WorkflowInstance execute (Object input ) {
89
- return new WorkflowInstance (taskFactory , JsonUtils .fromValue (input ));
137
+ return new WorkflowInstance (JsonUtils .fromValue (input ));
90
138
}
91
139
92
140
enum State {
@@ -101,11 +149,15 @@ public class WorkflowInstance {
101
149
private State state ;
102
150
private WorkflowContext context ;
103
151
104
- private WorkflowInstance (TaskExecutorFactory factory , JsonNode input ) {
152
+ private WorkflowInstance (JsonNode input ) {
105
153
this .output = input ;
106
- this . state = State . STARTED ;
154
+ inputSchemaValidator . ifPresent ( v -> v . validate ( input )) ;
107
155
this .context = WorkflowContext .builder (input ).build ();
156
+ inputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
157
+ this .state = State .STARTED ;
108
158
processDo (workflow .getDo ());
159
+ outputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
160
+ outputSchemaValidator .ifPresent (v -> v .validate (output ));
109
161
}
110
162
111
163
private void processDo (List <TaskItem > tasks ) {
@@ -118,7 +170,7 @@ private void processDo(List<TaskItem> tasks) {
118
170
taskExecutors
119
171
.computeIfAbsent (
120
172
context .position ().jsonPointer (),
121
- k -> taskFactory . getTaskExecutor (task .getTask ()))
173
+ k -> factories . getTaskFactory (). getTaskExecutor (task .getTask (), factories ))
122
174
.apply (context , output );
123
175
listeners .forEach (l -> l .onTaskEnded (context .position (), task .getTask ()));
124
176
context .position ().back ().back ();
0 commit comments