@@ -19,25 +19,28 @@ package com.squareup.workflow
19
19
20
20
import com.squareup.workflow.WorkflowHost.Factory
21
21
import com.squareup.workflow.WorkflowHost.Update
22
- import com.squareup.workflow.internal.WorkflowId
23
22
import com.squareup.workflow.internal.WorkflowNode
24
23
import com.squareup.workflow.internal.id
25
24
import kotlinx.coroutines.CancellationException
25
+ import kotlinx.coroutines.CoroutineScope
26
+ import kotlinx.coroutines.GlobalScope
26
27
import kotlinx.coroutines.cancel
28
+ import kotlinx.coroutines.channels.Channel
27
29
import kotlinx.coroutines.channels.ReceiveChannel
28
30
import kotlinx.coroutines.channels.produce
29
- import kotlinx.coroutines.isActive
31
+ import kotlinx.coroutines.ensureActive
30
32
import kotlinx.coroutines.selects.select
31
33
import org.jetbrains.annotations.TestOnly
32
34
import kotlin.coroutines.CoroutineContext
33
35
import kotlin.coroutines.EmptyCoroutineContext
36
+ import kotlin.coroutines.coroutineContext
34
37
35
38
/* *
36
39
* Provides a stream of [updates][Update] from a tree of [Workflow]s.
37
40
*
38
41
* Create these by injecting a [Factory] and calling [run][Factory.run].
39
42
*/
40
- interface WorkflowHost <out OutputT : Any , out RenderingT : Any > {
43
+ interface WorkflowHost <in InputT : Any , out OutputT : Any , out RenderingT : Any > {
41
44
42
45
/* *
43
46
* Output from a [WorkflowHost]. Emitted from [WorkflowHost.updates] after every compose pass.
@@ -75,76 +78,118 @@ interface WorkflowHost<out OutputT : Any, out RenderingT : Any> {
75
78
*/
76
79
fun <InputT : Any , OutputT : Any , RenderingT : Any > run (
77
80
workflow : Workflow <InputT , OutputT , RenderingT >,
78
- input : InputT ,
81
+ inputs : ReceiveChannel < InputT > ,
79
82
snapshot : Snapshot ? = null,
80
83
context : CoroutineContext = EmptyCoroutineContext
81
- ): WorkflowHost <OutputT , RenderingT > = run (workflow.id(), workflow, input, snapshot, context)
84
+ ): WorkflowHost <InputT , OutputT , RenderingT > =
85
+ object : WorkflowHost <InputT , OutputT , RenderingT > {
86
+ private val scope = CoroutineScope (context)
87
+
88
+ override val updates: ReceiveChannel <Update <OutputT , RenderingT >> =
89
+ scope.produce(capacity = 0 ) {
90
+ runWorkflowTree(
91
+ workflow = workflow.asStatefulWorkflow(),
92
+ inputs = inputs,
93
+ initialSnapshot = snapshot,
94
+ onUpdate = ::send
95
+ )
96
+ }
97
+ }
82
98
83
99
fun <OutputT : Any , RenderingT : Any > run (
84
100
workflow : Workflow <Unit , OutputT , RenderingT >,
85
101
snapshot : Snapshot ? = null,
86
102
context : CoroutineContext = EmptyCoroutineContext
87
- ): WorkflowHost <OutputT , RenderingT > = run (workflow.id(), workflow, Unit , snapshot, context)
103
+ ): WorkflowHost <Unit , OutputT , RenderingT > = run (workflow, channelOf( Unit ) , snapshot, context)
88
104
89
105
/* *
90
106
* Creates a [WorkflowHost] that runs [workflow] starting from [initialState].
91
107
*
92
108
* **Don't call this directly.**
93
109
*
94
- * Instead, your module should have a test dependency on `pure-v2-testing` and you should call the
95
- * testing extension method defined there on your workflow itself.
110
+ * Instead, your module should have a test dependency on `pure-v2-testing` and you should call
111
+ * the testing extension method defined there on your workflow itself.
96
112
*/
97
113
@TestOnly
98
114
fun <InputT : Any , StateT : Any , OutputT : Any , RenderingT : Any > runTestFromState (
99
115
workflow : StatefulWorkflow <InputT , StateT , OutputT , RenderingT >,
100
116
input : InputT ,
101
117
initialState : StateT
102
- ): WorkflowHost <OutputT , RenderingT > {
103
- val workflowId = workflow.id()
104
- return object : WorkflowHost <OutputT , RenderingT > {
105
- val node = WorkflowNode (workflowId, workflow, input, null , baseContext, initialState)
118
+ ): WorkflowHost <InputT , OutputT , RenderingT > =
119
+ object : WorkflowHost <InputT , OutputT , RenderingT > {
106
120
override val updates: ReceiveChannel <Update <OutputT , RenderingT >> =
107
- node.start(workflow, input)
121
+ GlobalScope .produce(capacity = 0 , context = baseContext) {
122
+ runWorkflowTree(
123
+ workflow = workflow.asStatefulWorkflow(),
124
+ inputs = channelOf(input),
125
+ initialSnapshot = null ,
126
+ initialState = initialState,
127
+ onUpdate = ::send
128
+ )
129
+ }
108
130
}
109
- }
110
131
111
- internal fun <InputT : Any , OutputT : Any , RenderingT : Any > run (
112
- id : WorkflowId <InputT , OutputT , RenderingT >,
113
- workflow : Workflow <InputT , OutputT , RenderingT >,
114
- input : InputT ,
115
- snapshot : Snapshot ? ,
116
- context : CoroutineContext
117
- ): WorkflowHost <OutputT , RenderingT > = object : WorkflowHost <OutputT , RenderingT > {
118
- val node = WorkflowNode (
119
- id = id,
120
- workflow = workflow.asStatefulWorkflow(),
121
- initialInput = input,
122
- snapshot = snapshot,
123
- baseContext = baseContext + context
124
- )
125
- override val updates: ReceiveChannel <Update <OutputT , RenderingT >> =
126
- node.start(workflow.asStatefulWorkflow(), input)
127
- }
132
+ private fun <T > channelOf (value : T ) = Channel <T >(capacity = 1 )
133
+ .apply { offer(value) }
128
134
}
129
135
}
130
136
131
137
/* *
132
- * Starts the coroutine that runs the coroutine loop.
138
+ * Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting
139
+ * updates by calling [onUpdate].
140
+ *
141
+ * This function is the lowest-level entry point into the runtime. Don't call this directly, instead
142
+ * use [WorkflowHost.Factory] to create a [WorkflowHost], or one of the stream operators for your
143
+ * favorite Rx library to map a stream of [InputT]s into [Update]s.
133
144
*/
134
- internal fun <I : Any , O : Any , R : Any > WorkflowNode <I , * , O , R >.start (
135
- workflow : StatefulWorkflow <I , * , O , R >,
136
- input : I
137
- ): ReceiveChannel <Update <O , R >> = produce(capacity = 0 ) {
145
+ suspend fun <InputT : Any , StateT : Any , OutputT : Any , RenderingT : Any > runWorkflowTree (
146
+ workflow : StatefulWorkflow <InputT , StateT , OutputT , RenderingT >,
147
+ inputs : ReceiveChannel <InputT >,
148
+ initialSnapshot : Snapshot ? ,
149
+ initialState : StateT ? = null,
150
+ onUpdate : suspend (Update <OutputT , RenderingT >) -> Unit
151
+ ): Nothing {
152
+ var output: OutputT ? = null
153
+ var input: InputT = inputs.receive()
154
+ var inputsClosed = false
155
+ val workflowNode = WorkflowNode (
156
+ id = workflow.id(),
157
+ workflow = workflow,
158
+ initialInput = input,
159
+ snapshot = initialSnapshot,
160
+ baseContext = coroutineContext,
161
+ initialState = initialState
162
+ )
163
+
138
164
try {
139
- var output: O ? = null
140
- while (isActive) {
141
- val rendering = compose(workflow, input)
142
- val snapshot = snapshot(workflow)
143
- send(Update (rendering, snapshot, output))
165
+ while (true ) {
166
+ coroutineContext.ensureActive()
167
+
168
+ val rendering = workflowNode.compose(workflow, input)
169
+ val snapshot = workflowNode.snapshot(workflow)
170
+
171
+ onUpdate(Update (rendering, snapshot, output))
172
+
144
173
// Tick _might_ return an output, but if it returns null, it means the state or a child
145
174
// probably changed, so we should re-compose/snapshot and emit again.
146
175
output = select {
147
- tick(this ) { it }
176
+ // Stop trying to read from the inputs channel after it's closed.
177
+ if (! inputsClosed) {
178
+ @Suppress(" EXPERIMENTAL_API_USAGE" )
179
+ inputs.onReceiveOrNull { newInput ->
180
+ if (newInput == null ) {
181
+ inputsClosed = true
182
+ } else {
183
+ input = newInput
184
+ }
185
+ // No output. Returning from the select will go to the top of the loop to do another
186
+ // compose pass.
187
+ return @onReceiveOrNull null
188
+ }
189
+ }
190
+
191
+ // Tick the workflow tree.
192
+ workflowNode.tick(this ) { it }
148
193
}
149
194
}
150
195
} catch (e: Throwable ) {
@@ -153,9 +198,9 @@ internal fun <I : Any, O : Any, R : Any> WorkflowNode<I, *, O, R>.start(
153
198
coroutineContext.cancel(if (e is CancellationException ) e else CancellationException (null , e))
154
199
throw e
155
200
} finally {
156
- // There's a potential race condition if the producer coroutine is cancelled before it has a chance
157
- // to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we actually
158
- // see this cause problems, I'm not too worried about it.
159
- cancel()
201
+ // There's a potential race condition if the producer coroutine is cancelled before it has a
202
+ // chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we
203
+ // actually see this cause problems, I'm not too worried about it.
204
+ workflowNode. cancel()
160
205
}
161
206
}
0 commit comments