@@ -19,15 +19,8 @@ package com.squareup.workflow
19
19
20
20
import com.squareup.workflow.WorkflowHost.Factory
21
21
import com.squareup.workflow.WorkflowHost.Update
22
- import com.squareup.workflow.internal.WorkflowId
23
- import com.squareup.workflow.internal.WorkflowNode
24
- import com.squareup.workflow.internal.id
25
- import kotlinx.coroutines.CancellationException
26
- import kotlinx.coroutines.cancel
22
+ import com.squareup.workflow.internal.RealWorkflowHost
27
23
import kotlinx.coroutines.channels.ReceiveChannel
28
- import kotlinx.coroutines.channels.produce
29
- import kotlinx.coroutines.isActive
30
- import kotlinx.coroutines.selects.select
31
24
import org.jetbrains.annotations.TestOnly
32
25
import kotlin.coroutines.CoroutineContext
33
26
import kotlin.coroutines.EmptyCoroutineContext
@@ -37,7 +30,7 @@ import kotlin.coroutines.EmptyCoroutineContext
37
30
*
38
31
* Create these by injecting a [Factory] and calling [run][Factory.run].
39
32
*/
40
- interface WorkflowHost <out OutputT : Any , out RenderingT : Any > {
33
+ interface WorkflowHost <in InputT : Any , out OutputT : Any , out RenderingT : Any > {
41
34
42
35
/* *
43
36
* Output from a [WorkflowHost]. Emitted from [WorkflowHost.updates] after every compose pass.
@@ -55,6 +48,8 @@ interface WorkflowHost<out OutputT : Any, out RenderingT : Any> {
55
48
*/
56
49
val updates: ReceiveChannel <Update <OutputT , RenderingT >>
57
50
51
+ fun setInput (input : InputT )
52
+
58
53
/* *
59
54
* Inject one of these to start root [Workflow]s.
60
55
*/
@@ -78,13 +73,14 @@ interface WorkflowHost<out OutputT : Any, out RenderingT : Any> {
78
73
input : InputT ,
79
74
snapshot : Snapshot ? = null,
80
75
context : CoroutineContext = EmptyCoroutineContext
81
- ): WorkflowHost <OutputT , RenderingT > = run (workflow.id(), workflow, input, snapshot, context)
76
+ ): WorkflowHost <InputT , OutputT , RenderingT > =
77
+ RealWorkflowHost (workflow.asStatefulWorkflow(), baseContext + context, input, snapshot)
82
78
83
79
fun <OutputT : Any , RenderingT : Any > run (
84
80
workflow : Workflow <Unit , OutputT , RenderingT >,
85
81
snapshot : Snapshot ? = null,
86
82
context : CoroutineContext = EmptyCoroutineContext
87
- ): WorkflowHost <OutputT , RenderingT > = run (workflow.id(), workflow, Unit , snapshot, context)
83
+ ): WorkflowHost <Unit , OutputT , RenderingT > = run (workflow, Unit , snapshot, context)
88
84
89
85
/* *
90
86
* Creates a [WorkflowHost] that runs [workflow] starting from [initialState].
@@ -99,63 +95,8 @@ interface WorkflowHost<out OutputT : Any, out RenderingT : Any> {
99
95
workflow : StatefulWorkflow <InputT , StateT , OutputT , RenderingT >,
100
96
input : InputT ,
101
97
initialState : StateT
102
- ): WorkflowHost <OutputT , RenderingT > {
103
- val workflowId = workflow.id()
104
- return object : WorkflowHost <OutputT , RenderingT > {
105
- val node = WorkflowNode (workflowId, workflow, input, null , baseContext, initialState)
106
- override val updates: ReceiveChannel <Update <OutputT , RenderingT >> =
107
- node.start(workflow, input)
108
- }
109
- }
110
-
111
- internal fun <InputT : Any , OutputT : Any , RenderingT : Any > run (
112
- id : WorkflowId <InputT , OutputT , RenderingT >,
113
- workflow : Workflow <InputT , OutputT , RenderingT >,
114
- input : InputT ,
115
- snapshot : Snapshot ? ,
116
- context : CoroutineContext
117
- ): WorkflowHost <OutputT , RenderingT > = object : WorkflowHost <OutputT , RenderingT > {
118
- val node = WorkflowNode (
119
- id = id,
120
- workflow = workflow.asStatefulWorkflow(),
121
- initialInput = input,
122
- snapshot = snapshot,
123
- baseContext = baseContext + context
124
- )
125
- override val updates: ReceiveChannel <Update <OutputT , RenderingT >> =
126
- node.start(workflow.asStatefulWorkflow(), input)
127
- }
128
- }
129
- }
130
-
131
- /* *
132
- * Starts the coroutine that runs the coroutine loop.
133
- */
134
- internal fun <I : Any , O : Any , R : Any > WorkflowNode <I , * , O , R >.start (
135
- workflow : StatefulWorkflow <I , * , O , R >,
136
- input : I
137
- ): ReceiveChannel <Update <O , R >> = produce(capacity = 0 ) {
138
- try {
139
- var output: O ? = null
140
- while (isActive) {
141
- val rendering = compose(workflow, input)
142
- val snapshot = snapshot(workflow)
143
- send(Update (rendering, snapshot, output))
144
- // Tick _might_ return an output, but if it returns null, it means the state or a child
145
- // probably changed, so we should re-compose/snapshot and emit again.
146
- output = select {
147
- tick(this ) { it }
148
- }
98
+ ): WorkflowHost <InputT , OutputT , RenderingT > {
99
+ return RealWorkflowHost (workflow, baseContext, input, null , initialState)
149
100
}
150
- } catch (e: Throwable ) {
151
- // For some reason the exception gets masked if we don't explicitly pass it to cancel the
152
- // producer coroutine ourselves here.
153
- coroutineContext.cancel(if (e is CancellationException ) e else CancellationException (null , e))
154
- throw e
155
- } finally {
156
- // There's a potential race condition if the producer coroutine is cancelled before it has a chance
157
- // to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we actually
158
- // see this cause problems, I'm not too worried about it.
159
- cancel()
160
101
}
161
102
}
0 commit comments