@@ -19,25 +19,28 @@ package com.squareup.workflow
19
19
20
20
import com.squareup.workflow.WorkflowHost.Factory
21
21
import com.squareup.workflow.WorkflowHost.Update
22
- import com.squareup.workflow.internal.WorkflowId
23
22
import com.squareup.workflow.internal.WorkflowNode
24
23
import com.squareup.workflow.internal.id
25
24
import kotlinx.coroutines.CancellationException
25
+ import kotlinx.coroutines.CoroutineScope
26
+ import kotlinx.coroutines.GlobalScope
26
27
import kotlinx.coroutines.cancel
28
+ import kotlinx.coroutines.channels.Channel
27
29
import kotlinx.coroutines.channels.ReceiveChannel
28
30
import kotlinx.coroutines.channels.produce
29
- import kotlinx.coroutines.isActive
31
+ import kotlinx.coroutines.ensureActive
30
32
import kotlinx.coroutines.selects.select
31
33
import org.jetbrains.annotations.TestOnly
32
34
import kotlin.coroutines.CoroutineContext
33
35
import kotlin.coroutines.EmptyCoroutineContext
36
+ import kotlin.coroutines.coroutineContext
34
37
35
38
/* *
36
39
* Provides a stream of [updates][Update] from a tree of [Workflow]s.
37
40
*
38
41
* Create these by injecting a [Factory] and calling [run][Factory.run].
39
42
*/
40
- interface WorkflowHost <out OutputT : Any , out RenderingT : Any > {
43
+ interface WorkflowHost <in InputT : Any , out OutputT : Any , out RenderingT : Any > {
41
44
42
45
/* *
43
46
* Output from a [WorkflowHost]. Emitted from [WorkflowHost.updates] after every compose pass.
@@ -63,88 +66,132 @@ interface WorkflowHost<out OutputT : Any, out RenderingT : Any> {
63
66
/* *
64
67
* Creates a [WorkflowHost] to run [workflow].
65
68
*
66
- * The workflow's initial state is determined by passing [input] to
67
- * [StatefulWorkflow.initialState].
69
+ * The workflow's initial state is determined by passing the first value emitted by [inputs] to
70
+ * [StatefulWorkflow.initialState]. Subsequent values emitted from [inputs] will be used to
71
+ * re-render the workflow.
68
72
*
69
73
* @param workflow The workflow to start.
70
- * @param input Passed to [StatefulWorkflow.initialState] to determine the root workflow's
71
- * initial state. If [InputT] is `Unit`, you can just omit this argument.
74
+ * @param inputs Passed to [StatefulWorkflow.initialState] to determine the root workflow's
75
+ * initial state, and to pass input updates to the root workflow.
76
+ * If [InputT] is `Unit`, you can just omit this argument.
72
77
* @param snapshot If not null, used to restore the workflow.
73
78
* @param context The [CoroutineContext] used to run the workflow tree. Added to the [Factory]'s
74
79
* context.
75
80
*/
76
81
fun <InputT : Any , OutputT : Any , RenderingT : Any > run (
77
82
workflow : Workflow <InputT , OutputT , RenderingT >,
78
- input : InputT ,
83
+ inputs : ReceiveChannel < InputT > ,
79
84
snapshot : Snapshot ? = null,
80
85
context : CoroutineContext = EmptyCoroutineContext
81
- ): WorkflowHost <OutputT , RenderingT > = run (workflow.id(), workflow, input, snapshot, context)
86
+ ): WorkflowHost <InputT , OutputT , RenderingT > =
87
+ object : WorkflowHost <InputT , OutputT , RenderingT > {
88
+ private val scope = CoroutineScope (context)
89
+
90
+ override val updates: ReceiveChannel <Update <OutputT , RenderingT >> =
91
+ scope.produce(capacity = 0 ) {
92
+ runWorkflowTree(
93
+ workflow = workflow.asStatefulWorkflow(),
94
+ inputs = inputs,
95
+ initialSnapshot = snapshot,
96
+ onUpdate = ::send
97
+ )
98
+ }
99
+ }
82
100
83
101
fun <OutputT : Any , RenderingT : Any > run (
84
102
workflow : Workflow <Unit , OutputT , RenderingT >,
85
103
snapshot : Snapshot ? = null,
86
104
context : CoroutineContext = EmptyCoroutineContext
87
- ): WorkflowHost <OutputT , RenderingT > = run (workflow.id(), workflow, Unit , snapshot, context)
105
+ ): WorkflowHost <Unit , OutputT , RenderingT > = run (workflow, channelOf( Unit ) , snapshot, context)
88
106
89
107
/* *
90
108
* Creates a [WorkflowHost] that runs [workflow] starting from [initialState].
91
109
*
92
110
* **Don't call this directly.**
93
111
*
94
- * Instead, your module should have a test dependency on `pure-v2-testing` and you should call the
95
- * testing extension method defined there on your workflow itself.
112
+ * Instead, your module should have a test dependency on `pure-v2-testing` and you should call
113
+ * the testing extension method defined there on your workflow itself.
96
114
*/
97
115
@TestOnly
98
116
fun <InputT : Any , StateT : Any , OutputT : Any , RenderingT : Any > runTestFromState (
99
117
workflow : StatefulWorkflow <InputT , StateT , OutputT , RenderingT >,
100
118
input : InputT ,
101
119
initialState : StateT
102
- ): WorkflowHost <OutputT , RenderingT > {
103
- val workflowId = workflow.id()
104
- return object : WorkflowHost <OutputT , RenderingT > {
105
- val node = WorkflowNode (workflowId, workflow, input, null , baseContext, initialState)
120
+ ): WorkflowHost <InputT , OutputT , RenderingT > =
121
+ object : WorkflowHost <InputT , OutputT , RenderingT > {
106
122
override val updates: ReceiveChannel <Update <OutputT , RenderingT >> =
107
- node.start(workflow, input)
123
+ GlobalScope .produce(capacity = 0 , context = baseContext) {
124
+ runWorkflowTree(
125
+ workflow = workflow.asStatefulWorkflow(),
126
+ inputs = channelOf(input),
127
+ initialSnapshot = null ,
128
+ initialState = initialState,
129
+ onUpdate = ::send
130
+ )
131
+ }
108
132
}
109
- }
110
133
111
- internal fun <InputT : Any , OutputT : Any , RenderingT : Any > run (
112
- id : WorkflowId <InputT , OutputT , RenderingT >,
113
- workflow : Workflow <InputT , OutputT , RenderingT >,
114
- input : InputT ,
115
- snapshot : Snapshot ? ,
116
- context : CoroutineContext
117
- ): WorkflowHost <OutputT , RenderingT > = object : WorkflowHost <OutputT , RenderingT > {
118
- val node = WorkflowNode (
119
- id = id,
120
- workflow = workflow.asStatefulWorkflow(),
121
- initialInput = input,
122
- snapshot = snapshot,
123
- baseContext = baseContext + context
124
- )
125
- override val updates: ReceiveChannel <Update <OutputT , RenderingT >> =
126
- node.start(workflow.asStatefulWorkflow(), input)
127
- }
134
+ private fun <T > channelOf (value : T ) = Channel <T >(capacity = 1 )
135
+ .apply { offer(value) }
128
136
}
129
137
}
130
138
131
139
/* *
132
- * Starts the coroutine that runs the coroutine loop.
140
+ * Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting
141
+ * updates by calling [onUpdate].
142
+ *
143
+ * This function is the lowest-level entry point into the runtime. Don't call this directly, instead
144
+ * use [WorkflowHost.Factory] to create a [WorkflowHost], or one of the stream operators for your
145
+ * favorite Rx library to map a stream of [InputT]s into [Update]s.
133
146
*/
134
- internal fun <I : Any , O : Any , R : Any > WorkflowNode <I , * , O , R >.start (
135
- workflow : StatefulWorkflow <I , * , O , R >,
136
- input : I
137
- ): ReceiveChannel <Update <O , R >> = produce(capacity = 0 ) {
147
+ suspend fun <InputT : Any , StateT : Any , OutputT : Any , RenderingT : Any > runWorkflowTree (
148
+ workflow : StatefulWorkflow <InputT , StateT , OutputT , RenderingT >,
149
+ inputs : ReceiveChannel <InputT >,
150
+ initialSnapshot : Snapshot ? ,
151
+ initialState : StateT ? = null,
152
+ onUpdate : suspend (Update <OutputT , RenderingT >) -> Unit
153
+ ): Nothing {
154
+ var output: OutputT ? = null
155
+ var input: InputT = inputs.receive()
156
+ var inputsClosed = false
157
+ val rootNode = WorkflowNode (
158
+ id = workflow.id(),
159
+ workflow = workflow,
160
+ initialInput = input,
161
+ snapshot = initialSnapshot,
162
+ baseContext = coroutineContext,
163
+ initialState = initialState
164
+ )
165
+
138
166
try {
139
- var output: O ? = null
140
- while (isActive) {
141
- val rendering = compose(workflow, input)
142
- val snapshot = snapshot(workflow)
143
- send(Update (rendering, snapshot, output))
167
+ while (true ) {
168
+ coroutineContext.ensureActive()
169
+
170
+ val rendering = rootNode.compose(workflow, input)
171
+ val snapshot = rootNode.snapshot(workflow)
172
+
173
+ onUpdate(Update (rendering, snapshot, output))
174
+
144
175
// Tick _might_ return an output, but if it returns null, it means the state or a child
145
176
// probably changed, so we should re-compose/snapshot and emit again.
146
177
output = select {
147
- tick(this ) { it }
178
+ // Stop trying to read from the inputs channel after it's closed.
179
+ if (! inputsClosed) {
180
+ @Suppress(" EXPERIMENTAL_API_USAGE" )
181
+ inputs.onReceiveOrNull { newInput ->
182
+ if (newInput == null ) {
183
+ inputsClosed = true
184
+ } else {
185
+ input = newInput
186
+ }
187
+ // No output. Returning from the select will go to the top of the loop to do another
188
+ // compose pass.
189
+ return @onReceiveOrNull null
190
+ }
191
+ }
192
+
193
+ // Tick the workflow tree.
194
+ rootNode.tick(this ) { it }
148
195
}
149
196
}
150
197
} catch (e: Throwable ) {
@@ -153,9 +200,9 @@ internal fun <I : Any, O : Any, R : Any> WorkflowNode<I, *, O, R>.start(
153
200
coroutineContext.cancel(if (e is CancellationException ) e else CancellationException (null , e))
154
201
throw e
155
202
} finally {
156
- // There's a potential race condition if the producer coroutine is cancelled before it has a chance
157
- // to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we actually
158
- // see this cause problems, I'm not too worried about it.
159
- cancel()
203
+ // There's a potential race condition if the producer coroutine is cancelled before it has a
204
+ // chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we
205
+ // actually see this cause problems, I'm not too worried about it.
206
+ rootNode. cancel()
160
207
}
161
208
}
0 commit comments