@@ -19,24 +19,29 @@ package com.squareup.workflow
19
19
20
20
import com.squareup.workflow.WorkflowHost.Factory
21
21
import com.squareup.workflow.WorkflowHost.Update
22
- import com.squareup.workflow.internal.WorkflowId
23
22
import com.squareup.workflow.internal.WorkflowNode
24
23
import com.squareup.workflow.internal.id
24
+ import kotlinx.coroutines.CoroutineScope
25
+ import kotlinx.coroutines.GlobalScope
26
+ import kotlinx.coroutines.InternalCoroutinesApi
27
+ import kotlinx.coroutines.Job
28
+ import kotlinx.coroutines.NonCancellable.isActive
25
29
import kotlinx.coroutines.cancel
30
+ import kotlinx.coroutines.channels.Channel
26
31
import kotlinx.coroutines.channels.ReceiveChannel
27
32
import kotlinx.coroutines.channels.produce
28
- import kotlinx.coroutines.isActive
29
33
import kotlinx.coroutines.selects.select
30
34
import org.jetbrains.annotations.TestOnly
31
35
import kotlin.coroutines.CoroutineContext
32
36
import kotlin.coroutines.EmptyCoroutineContext
37
+ import kotlin.coroutines.coroutineContext
33
38
34
39
/* *
35
40
* Provides a stream of [updates][Update] from a tree of [Workflow]s.
36
41
*
37
42
* Create these by injecting a [Factory] and calling [run][Factory.run].
38
43
*/
39
- interface WorkflowHost <out OutputT : Any , out RenderingT : Any > {
44
+ interface WorkflowHost <in InputT : Any , out OutputT : Any , out RenderingT : Any > {
40
45
41
46
/* *
42
47
* Output from a [WorkflowHost]. Emitted from [WorkflowHost.updates] after every compose pass.
@@ -62,88 +67,134 @@ interface WorkflowHost<out OutputT : Any, out RenderingT : Any> {
62
67
/* *
63
68
* Creates a [WorkflowHost] to run [workflow].
64
69
*
65
- * The workflow's initial state is determined by passing [input] to
66
- * [StatefulWorkflow.initialState].
70
+ * The workflow's initial state is determined by passing the first value emitted by [inputs] to
71
+ * [StatefulWorkflow.initialState]. Subsequent values emitted from [inputs] will be used to
72
+ * re-render the workflow.
67
73
*
68
74
* @param workflow The workflow to start.
69
- * @param input Passed to [StatefulWorkflow.initialState] to determine the root workflow's
70
- * initial state. If [InputT] is `Unit`, you can just omit this argument.
75
+ * @param inputs Passed to [StatefulWorkflow.initialState] to determine the root workflow's
76
+ * initial state, and to pass input updates to the root workflow.
77
+ * If [InputT] is `Unit`, you can just omit this argument.
71
78
* @param snapshot If not null, used to restore the workflow.
72
79
* @param context The [CoroutineContext] used to run the workflow tree. Added to the [Factory]'s
73
80
* context.
74
81
*/
75
82
fun <InputT : Any , OutputT : Any , RenderingT : Any > run (
76
83
workflow : Workflow <InputT , OutputT , RenderingT >,
77
- input : InputT ,
84
+ inputs : ReceiveChannel < InputT > ,
78
85
snapshot : Snapshot ? = null,
79
86
context : CoroutineContext = EmptyCoroutineContext
80
- ): WorkflowHost <OutputT , RenderingT > = run (workflow.id(), workflow, input, snapshot, context)
87
+ ): WorkflowHost <InputT , OutputT , RenderingT > =
88
+ object : WorkflowHost <InputT , OutputT , RenderingT > {
89
+ private val scope = CoroutineScope (context)
90
+
91
+ override val updates: ReceiveChannel <Update <OutputT , RenderingT >> =
92
+ scope.produce(capacity = 0 ) {
93
+ runWorkflowTree(
94
+ workflow = workflow.asStatefulWorkflow(),
95
+ inputs = inputs,
96
+ initialSnapshot = snapshot,
97
+ onUpdate = ::send
98
+ )
99
+ }
100
+ }
81
101
82
102
fun <OutputT : Any , RenderingT : Any > run (
83
103
workflow : Workflow <Unit , OutputT , RenderingT >,
84
104
snapshot : Snapshot ? = null,
85
105
context : CoroutineContext = EmptyCoroutineContext
86
- ): WorkflowHost <OutputT , RenderingT > = run (workflow.id(), workflow, Unit , snapshot, context)
106
+ ): WorkflowHost <Unit , OutputT , RenderingT > = run (workflow, channelOf( Unit ) , snapshot, context)
87
107
88
108
/* *
89
109
* Creates a [WorkflowHost] that runs [workflow] starting from [initialState].
90
110
*
91
111
* **Don't call this directly.**
92
112
*
93
- * Instead, your module should have a test dependency on `pure-v2-testing` and you should call the
94
- * testing extension method defined there on your workflow itself.
113
+ * Instead, your module should have a test dependency on `pure-v2-testing` and you should call
114
+ * the testing extension method defined there on your workflow itself.
95
115
*/
96
116
@TestOnly
97
117
fun <InputT : Any , StateT : Any , OutputT : Any , RenderingT : Any > runTestFromState (
98
118
workflow : StatefulWorkflow <InputT , StateT , OutputT , RenderingT >,
99
119
input : InputT ,
100
120
initialState : StateT
101
- ): WorkflowHost <OutputT , RenderingT > {
102
- val workflowId = workflow.id()
103
- return object : WorkflowHost <OutputT , RenderingT > {
104
- val node = WorkflowNode (workflowId, workflow, input, null , baseContext, initialState)
121
+ ): WorkflowHost <InputT , OutputT , RenderingT > =
122
+ object : WorkflowHost <InputT , OutputT , RenderingT > {
105
123
override val updates: ReceiveChannel <Update <OutputT , RenderingT >> =
106
- node.start(workflow, input)
124
+ GlobalScope .produce(capacity = 0 , context = baseContext) {
125
+ runWorkflowTree(
126
+ workflow = workflow.asStatefulWorkflow(),
127
+ inputs = channelOf(input),
128
+ initialSnapshot = null ,
129
+ initialState = initialState,
130
+ onUpdate = ::send
131
+ )
132
+ }
107
133
}
108
- }
109
134
110
- internal fun <InputT : Any , OutputT : Any , RenderingT : Any > run (
111
- id : WorkflowId <InputT , OutputT , RenderingT >,
112
- workflow : Workflow <InputT , OutputT , RenderingT >,
113
- input : InputT ,
114
- snapshot : Snapshot ? ,
115
- context : CoroutineContext
116
- ): WorkflowHost <OutputT , RenderingT > = object : WorkflowHost <OutputT , RenderingT > {
117
- val node = WorkflowNode (
118
- id = id,
119
- workflow = workflow.asStatefulWorkflow(),
120
- initialInput = input,
121
- snapshot = snapshot,
122
- baseContext = baseContext + context
123
- )
124
- override val updates: ReceiveChannel <Update <OutputT , RenderingT >> =
125
- node.start(workflow.asStatefulWorkflow(), input)
126
- }
135
+ private fun <T > channelOf (value : T ) = Channel <T >(capacity = 1 )
136
+ .apply { offer(value) }
127
137
}
128
138
}
129
139
130
140
/* *
131
- * Starts the coroutine that runs the coroutine loop.
141
+ * Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting
142
+ * updates by calling [onUpdate].
143
+ *
144
+ * This function is the lowest-level entry point into the runtime. Don't call this directly, instead
145
+ * use [WorkflowHost.Factory] to create a [WorkflowHost], or one of the stream operators for your
146
+ * favorite Rx library to map a stream of [InputT]s into [Update]s.
132
147
*/
133
- internal fun <I : Any , O : Any , R : Any > WorkflowNode <I , * , O , R >.start (
134
- workflow : StatefulWorkflow <I , * , O , R >,
135
- input : I
136
- ): ReceiveChannel <Update <O , R >> = produce(capacity = 0 ) {
148
+ @UseExperimental(InternalCoroutinesApi ::class )
149
+ suspend fun <InputT : Any , StateT : Any , OutputT : Any , RenderingT : Any > runWorkflowTree (
150
+ workflow : StatefulWorkflow <InputT , StateT , OutputT , RenderingT >,
151
+ inputs : ReceiveChannel <InputT >,
152
+ initialSnapshot : Snapshot ? ,
153
+ initialState : StateT ? = null,
154
+ onUpdate : suspend (Update <OutputT , RenderingT >) -> Unit
155
+ ): Nothing {
156
+ var output: OutputT ? = null
157
+ var input: InputT = inputs.receive()
158
+ var inputsClosed = false
159
+ val rootNode = WorkflowNode (
160
+ id = workflow.id(),
161
+ workflow = workflow,
162
+ initialInput = input,
163
+ snapshot = initialSnapshot,
164
+ baseContext = coroutineContext,
165
+ initialState = initialState
166
+ )
167
+
137
168
try {
138
- var output: O ? = null
139
- while (isActive) {
140
- val rendering = compose(workflow, input)
141
- val snapshot = snapshot(workflow)
142
- send(Update (rendering, snapshot, output))
169
+ while (true ) {
170
+ // Manually implement `ensureActive()` until we're on coroutines 1.2.x.
171
+ if (! isActive) throw coroutineContext[Job ]!! .getCancellationException()
172
+
173
+ val rendering = rootNode.compose(workflow, input)
174
+ val snapshot = rootNode.snapshot(workflow)
175
+
176
+ onUpdate(Update (rendering, snapshot, output))
177
+
143
178
// Tick _might_ return an output, but if it returns null, it means the state or a child
144
179
// probably changed, so we should re-compose/snapshot and emit again.
145
180
output = select {
146
- tick(this ) { it }
181
+ // Stop trying to read from the inputs channel after it's closed.
182
+ if (! inputsClosed) {
183
+ @Suppress(" EXPERIMENTAL_API_USAGE" )
184
+ inputs.onReceiveOrNull { newInput ->
185
+ if (newInput == null ) {
186
+ inputsClosed = true
187
+ } else {
188
+ input = newInput
189
+ }
190
+ // No output. Returning from the select will go to the top of the loop to do another
191
+ // compose pass.
192
+ return @onReceiveOrNull null
193
+ }
194
+ }
195
+
196
+ // Tick the workflow tree.
197
+ rootNode.tick(this ) { it }
147
198
}
148
199
}
149
200
} catch (e: Throwable ) {
@@ -154,9 +205,9 @@ internal fun <I : Any, O : Any, R : Any> WorkflowNode<I, *, O, R>.start(
154
205
coroutineContext.cancel(e)
155
206
throw e
156
207
} finally {
157
- // There's a potential race condition if the producer coroutine is cancelled before it has a chance
158
- // to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we actually
159
- // see this cause problems, I'm not too worried about it.
160
- cancel()
208
+ // There's a potential race condition if the producer coroutine is cancelled before it has a
209
+ // chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we
210
+ // actually see this cause problems, I'm not too worried about it.
211
+ rootNode. cancel()
161
212
}
162
213
}
0 commit comments