Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 44e9d41

Browse files
committedApr 16, 2019
Add the ability to update the input for a WorkflowHost.
Closes #247.
1 parent bff3229 commit 44e9d41

File tree

21 files changed

+485
-150
lines changed

21 files changed

+485
-150
lines changed
 

‎kotlin/samples/tictactoe/common/src/test/java/com/squareup/sample/gameworkflow/TakeTurnsWorkflowTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,6 @@ class TakeTurnsWorkflowTest {
6969
}
7070
}
7171

72-
private fun WorkflowTester<*, GamePlayScreen>.takeSquare(event: TakeSquare) {
72+
private fun WorkflowTester<*, *, GamePlayScreen>.takeSquare(event: TakeSquare) {
7373
withNextRendering { it.onEvent(event) }
7474
}

‎kotlin/samples/tictactoe/common/src/test/java/com/squareup/sample/mainworkflow/MainWorkflowTest.kt

+3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ class MainWorkflowTest {
4040
}
4141

4242
MainWorkflow(authWorkflow, runGameWorkflow()).testFromStart { tester ->
43+
tester.withNextRendering { screen ->
44+
assertThat(screen.panels).containsOnly(DEFAULT_AUTH)
45+
}
4346
tester.withNextRendering { screen ->
4447
assertThat(screen.panels).isEmpty()
4548
assertThat(screen.body).isEqualTo(DEFAULT_RUN_GAME)

‎kotlin/settings.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ include ':samples:tictactoe:common'
2323
include ':workflow-core'
2424
include ':workflow-runtime'
2525
include ':workflow-rx2'
26+
include ':workflow-rx2-runtime'
2627
include ':workflow-testing'
2728
include ':workflow-ui-core'
2829
include ':workflow-ui-android'

‎kotlin/workflow-core/src/main/java/com/squareup/workflow/WorkflowContext.kt

+21-8
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ import com.squareup.workflow.util.ChannelUpdate
2121
import com.squareup.workflow.util.ChannelUpdate.Value
2222
import com.squareup.workflow.util.KTypes
2323
import kotlinx.coroutines.CoroutineScope
24+
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
2425
import kotlinx.coroutines.Deferred
26+
import kotlinx.coroutines.channels.Channel
2527
import kotlinx.coroutines.channels.ReceiveChannel
26-
import kotlinx.coroutines.channels.produce
27-
import kotlinx.coroutines.suspendCancellableCoroutine
28+
import kotlinx.coroutines.launch
2829
import kotlin.reflect.KType
2930

3031
/**
@@ -259,6 +260,9 @@ fun <T, StateT : Any, OutputT : Any> WorkflowContext<StateT, OutputT>.onDeferred
259260
* This function is provided as a helper for writing [WorkflowContext] extension functions, it
260261
* should not be used by general application code.
261262
*
263+
* The suspending function will be executed in the current stack frame ([UNDISPATCHED]). When this
264+
* workflow is being torn down, the coroutine running the function will be cancelled.
265+
*
262266
* @param type The [KType] that represents both the type of data source (e.g. `Deferred`) and the
263267
* element type [T].
264268
* @param key An optional string key that is used to distinguish between subscriptions of the same
@@ -289,10 +293,19 @@ fun <T, StateT : Any, OutputT : Any> WorkflowContext<StateT, OutputT>.onSuspendi
289293
*/
290294
private fun <T> CoroutineScope.wrapInNeverClosingChannel(
291295
function: suspend () -> T
292-
): ReceiveChannel<T> =
293-
produce {
294-
send(function())
295-
// We explicitly don't want to close the channel, because that would trigger an infinite loop.
296-
// Instead, just suspend forever.
297-
suspendCancellableCoroutine<Nothing> { }
296+
): ReceiveChannel<T> {
297+
// We can't use produce because we want the coroutine to start UNDISPATCHED, and the produce
298+
// builder doesn't let us do that.
299+
val channel = Channel<T>()
300+
val channelJob = launch(start = UNDISPATCHED) {
301+
channel.send(function())
302+
}
303+
304+
// Since we're not using produce, we need to manually cancel the coroutine when the channel is
305+
// cancelled so the function can do its own cleanup.
306+
channel.invokeOnClose { cause ->
307+
@Suppress("DEPRECATION")
308+
channelJob.cancel(cause)
298309
}
310+
return channel
311+
}

‎kotlin/workflow-runtime/src/main/java/com/squareup/workflow/WorkflowHost.kt

+100-50
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,29 @@ package com.squareup.workflow
1919

2020
import com.squareup.workflow.WorkflowHost.Factory
2121
import com.squareup.workflow.WorkflowHost.Update
22-
import com.squareup.workflow.internal.WorkflowId
2322
import com.squareup.workflow.internal.WorkflowNode
2423
import com.squareup.workflow.internal.id
24+
import kotlinx.coroutines.CoroutineScope
25+
import kotlinx.coroutines.GlobalScope
26+
import kotlinx.coroutines.InternalCoroutinesApi
27+
import kotlinx.coroutines.Job
28+
import kotlinx.coroutines.NonCancellable.isActive
2529
import kotlinx.coroutines.cancel
30+
import kotlinx.coroutines.channels.Channel
2631
import kotlinx.coroutines.channels.ReceiveChannel
2732
import kotlinx.coroutines.channels.produce
28-
import kotlinx.coroutines.isActive
2933
import kotlinx.coroutines.selects.select
3034
import org.jetbrains.annotations.TestOnly
3135
import kotlin.coroutines.CoroutineContext
3236
import kotlin.coroutines.EmptyCoroutineContext
37+
import kotlin.coroutines.coroutineContext
3338

3439
/**
3540
* Provides a stream of [updates][Update] from a tree of [Workflow]s.
3641
*
3742
* Create these by injecting a [Factory] and calling [run][Factory.run].
3843
*/
39-
interface WorkflowHost<out OutputT : Any, out RenderingT : Any> {
44+
interface WorkflowHost<in InputT : Any, out OutputT : Any, out RenderingT : Any> {
4045

4146
/**
4247
* Output from a [WorkflowHost]. Emitted from [WorkflowHost.updates] after every compose pass.
@@ -62,88 +67,133 @@ interface WorkflowHost<out OutputT : Any, out RenderingT : Any> {
6267
/**
6368
* Creates a [WorkflowHost] to run [workflow].
6469
*
65-
* The workflow's initial state is determined by passing [input] to
66-
* [StatefulWorkflow.initialState].
70+
* The workflow's initial state is determined by passing the first value emitted by [inputs] to
71+
* [StatefulWorkflow.initialState]. Subsequent values emitted from [inputs] will be used to
72+
* re-render the workflow.
6773
*
6874
* @param workflow The workflow to start.
69-
* @param input Passed to [StatefulWorkflow.initialState] to determine the root workflow's
70-
* initial state. If [InputT] is `Unit`, you can just omit this argument.
75+
* @param inputs Passed to [StatefulWorkflow.initialState] to determine the root workflow's
76+
* initial state, and to pass input updates to the root workflow.
77+
* If [InputT] is `Unit`, you can just omit this argument.
7178
* @param snapshot If not null, used to restore the workflow.
7279
* @param context The [CoroutineContext] used to run the workflow tree. Added to the [Factory]'s
7380
* context.
7481
*/
7582
fun <InputT : Any, OutputT : Any, RenderingT : Any> run(
7683
workflow: Workflow<InputT, OutputT, RenderingT>,
77-
input: InputT,
84+
inputs: ReceiveChannel<InputT>,
7885
snapshot: Snapshot? = null,
7986
context: CoroutineContext = EmptyCoroutineContext
80-
): WorkflowHost<OutputT, RenderingT> = run(workflow.id(), workflow, input, snapshot, context)
87+
): WorkflowHost<InputT, OutputT, RenderingT> =
88+
object : WorkflowHost<InputT, OutputT, RenderingT> {
89+
private val scope = CoroutineScope(context)
90+
91+
override val updates: ReceiveChannel<Update<OutputT, RenderingT>> =
92+
scope.produce(capacity = 0) {
93+
runWorkflowTree(
94+
workflow = workflow.asStatefulWorkflow(),
95+
inputs = inputs,
96+
initialSnapshot = snapshot,
97+
onUpdate = ::send
98+
)
99+
}
100+
}
81101

82102
fun <OutputT : Any, RenderingT : Any> run(
83103
workflow: Workflow<Unit, OutputT, RenderingT>,
84104
snapshot: Snapshot? = null,
85105
context: CoroutineContext = EmptyCoroutineContext
86-
): WorkflowHost<OutputT, RenderingT> = run(workflow.id(), workflow, Unit, snapshot, context)
106+
): WorkflowHost<Unit, OutputT, RenderingT> = run(workflow, channelOf(Unit), snapshot, context)
87107

88108
/**
89109
* Creates a [WorkflowHost] that runs [workflow] starting from [initialState].
90110
*
91111
* **Don't call this directly.**
92112
*
93-
* Instead, your module should have a test dependency on `pure-v2-testing` and you should call the
94-
* testing extension method defined there on your workflow itself.
113+
* Instead, your module should have a test dependency on `pure-v2-testing` and you should call
114+
* the testing extension method defined there on your workflow itself.
95115
*/
96116
@TestOnly
97117
fun <InputT : Any, StateT : Any, OutputT : Any, RenderingT : Any> runTestFromState(
98118
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
99119
input: InputT,
100120
initialState: StateT
101-
): WorkflowHost<OutputT, RenderingT> {
102-
val workflowId = workflow.id()
103-
return object : WorkflowHost<OutputT, RenderingT> {
104-
val node = WorkflowNode(workflowId, workflow, input, null, baseContext, initialState)
121+
): WorkflowHost<InputT, OutputT, RenderingT> =
122+
object : WorkflowHost<InputT, OutputT, RenderingT> {
105123
override val updates: ReceiveChannel<Update<OutputT, RenderingT>> =
106-
node.start(workflow, input)
124+
GlobalScope.produce(capacity = 0, context = baseContext) {
125+
runWorkflowTree(
126+
workflow = workflow.asStatefulWorkflow(),
127+
inputs = channelOf(input),
128+
initialSnapshot = null,
129+
initialState = initialState,
130+
onUpdate = ::send
131+
)
132+
}
107133
}
108-
}
109134

110-
internal fun <InputT : Any, OutputT : Any, RenderingT : Any> run(
111-
id: WorkflowId<InputT, OutputT, RenderingT>,
112-
workflow: Workflow<InputT, OutputT, RenderingT>,
113-
input: InputT,
114-
snapshot: Snapshot?,
115-
context: CoroutineContext
116-
): WorkflowHost<OutputT, RenderingT> = object : WorkflowHost<OutputT, RenderingT> {
117-
val node = WorkflowNode(
118-
id = id,
119-
workflow = workflow.asStatefulWorkflow(),
120-
initialInput = input,
121-
snapshot = snapshot,
122-
baseContext = baseContext + context
123-
)
124-
override val updates: ReceiveChannel<Update<OutputT, RenderingT>> =
125-
node.start(workflow.asStatefulWorkflow(), input)
126-
}
135+
private fun <T> channelOf(value: T) = Channel<T>(capacity = 1)
136+
.apply { offer(value) }
127137
}
128138
}
129139

130140
/**
131-
* Starts the coroutine that runs the coroutine loop.
141+
* Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting
142+
* updates by calling [onUpdate].
143+
*
144+
* This function is the lowest-level entry point into the runtime. Don't call this directly, instead
145+
* use [WorkflowHost.Factory] to create a [WorkflowHost], or one of the stream operators for your
146+
* favorite Rx library to map a stream of [InputT]s into [Update]s.
132147
*/
133-
internal fun <I : Any, O : Any, R : Any> WorkflowNode<I, *, O, R>.start(
134-
workflow: StatefulWorkflow<I, *, O, R>,
135-
input: I
136-
): ReceiveChannel<Update<O, R>> = produce(capacity = 0) {
148+
@UseExperimental(InternalCoroutinesApi::class)
149+
suspend fun <InputT : Any, StateT : Any, OutputT : Any, RenderingT : Any> runWorkflowTree(
150+
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
151+
inputs: ReceiveChannel<InputT>,
152+
initialSnapshot: Snapshot?,
153+
initialState: StateT? = null,
154+
onUpdate: suspend (Update<OutputT, RenderingT>) -> Unit
155+
): Nothing {
156+
var output: OutputT? = null
157+
var input: InputT = inputs.receive()
158+
var inputsClosed = false
159+
val rootNode = WorkflowNode(
160+
id = workflow.id(),
161+
workflow = workflow,
162+
initialInput = input,
163+
snapshot = initialSnapshot,
164+
baseContext = coroutineContext,
165+
initialState = initialState
166+
)
167+
137168
try {
138-
var output: O? = null
139-
while (isActive) {
140-
val rendering = compose(workflow, input)
141-
val snapshot = snapshot(workflow)
142-
send(Update(rendering, snapshot, output))
169+
while (true) {
170+
if (!isActive) throw coroutineContext[Job]!!.getCancellationException()
171+
172+
val rendering = rootNode.compose(workflow, input)
173+
val snapshot = rootNode.snapshot(workflow)
174+
175+
onUpdate(Update(rendering, snapshot, output))
176+
143177
// Tick _might_ return an output, but if it returns null, it means the state or a child
144178
// probably changed, so we should re-compose/snapshot and emit again.
145179
output = select {
146-
tick(this) { it }
180+
// Stop trying to read from the inputs channel after it's closed.
181+
if (!inputsClosed) {
182+
@Suppress("EXPERIMENTAL_API_USAGE")
183+
inputs.onReceiveOrNull { newInput ->
184+
if (newInput == null) {
185+
inputsClosed = true
186+
} else {
187+
input = newInput
188+
}
189+
// No output. Returning from the select will go to the top of the loop to do another
190+
// compose pass.
191+
return@onReceiveOrNull null
192+
}
193+
}
194+
195+
// Tick the workflow tree.
196+
rootNode.tick(this) { it }
147197
}
148198
}
149199
} catch (e: Throwable) {
@@ -154,9 +204,9 @@ internal fun <I : Any, O : Any, R : Any> WorkflowNode<I, *, O, R>.start(
154204
coroutineContext.cancel(e)
155205
throw e
156206
} finally {
157-
// There's a potential race condition if the producer coroutine is cancelled before it has a chance
158-
// to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we actually
159-
// see this cause problems, I'm not too worried about it.
160-
cancel()
207+
// There's a potential race condition if the producer coroutine is cancelled before it has a
208+
// chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we
209+
// actually see this cause problems, I'm not too worried about it.
210+
rootNode.cancel()
161211
}
162212
}

‎kotlin/workflow-rx2-runtime/README.md

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# workflow-rx2-runtime
2+
3+
This module contains adapters to use the Workflow runtime with RxJava2.
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2019 Square Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
apply plugin: 'java-library'
17+
apply plugin: 'kotlin'
18+
apply plugin: 'com.vanniktech.maven.publish'
19+
apply plugin: 'org.jetbrains.dokka'
20+
21+
sourceCompatibility = JavaVersion.VERSION_1_7
22+
targetCompatibility = JavaVersion.VERSION_1_7
23+
24+
dokka rootProject.ext.defaultDokkaConfig
25+
26+
dependencies {
27+
compileOnly deps.annotations.intellij
28+
29+
api deps.kotlin.stdLib.jdk6
30+
api deps.kotlin.coroutines.core
31+
api deps.rxjava2.rxjava2
32+
33+
implementation project(':workflow-runtime')
34+
implementation deps.kotlin.coroutines.rx2
35+
36+
testImplementation project(':workflow-testing')
37+
testImplementation deps.kotlin.test.jdk
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Copyright 2019 Square Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
POM_ARTIFACT_ID=workflow-rx2-runtime
17+
POM_NAME=Workflow RxJava2 Runtime
18+
POM_PACKAGING=jar
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2019 Square Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
@file:Suppress("EXPERIMENTAL_API_USAGE")
17+
18+
package com.squareup.workflow.rx2
19+
20+
import com.squareup.workflow.Snapshot
21+
import com.squareup.workflow.Workflow
22+
import com.squareup.workflow.WorkflowHost.Update
23+
import com.squareup.workflow.runWorkflowTree
24+
import io.reactivex.Flowable
25+
import io.reactivex.Observable
26+
import kotlinx.coroutines.CoroutineDispatcher
27+
import kotlinx.coroutines.Dispatchers
28+
import kotlinx.coroutines.GlobalScope
29+
import kotlinx.coroutines.channels.consume
30+
import kotlinx.coroutines.reactive.openSubscription
31+
import kotlinx.coroutines.rx2.rxFlowable
32+
33+
/**
34+
* Given a stream of [InputT] values to use as inputs for the top-level [workflow], returns a
35+
* [Flowable] that, when subscribed to, will start [workflow] and emit its [Update]s. When the
36+
* subscription is disposed, the workflow will be terminated.
37+
*
38+
* The returned [Flowable] is _not_ multicasted – **every subscription will start a new workflow
39+
* session.** It is recommended to use a multicasting operator on the resulting stream, such as
40+
* [Flowable.replay], to share the updates from a single workflow session.
41+
*
42+
* The workflow's logic will run in whatever threading context the source [Flowable] is being
43+
* observed on.
44+
*
45+
* This operator is an alternative to using
46+
* [WorkflowHost.Factory][com.squareup.workflow.WorkflowHost.Factory] that is more convenient to
47+
* use with a stream of inputs.
48+
*
49+
* This function operates on [Flowable] instead of [Observable] because the workflow runtime
50+
* inherently supports backpressure. [Flowable] supports backpressure, [Observable] does not.
51+
* RxJava provides operators to adapt between [Flowable]s and [Observable]s by explicitly specifying
52+
* how to use backpressure. By operating on [Flowable], this operator leaves it up to the caller to
53+
* specify strategies for handling backpressure, instead of assuming any particular behavior.
54+
*/
55+
fun <InputT : Any, OutputT : Any, RenderingT : Any> Flowable<InputT>.flatMapWorkflow(
56+
workflow: Workflow<InputT, OutputT, RenderingT>,
57+
initialSnapshot: Snapshot? = null,
58+
dispatcher: CoroutineDispatcher = Dispatchers.Unconfined
59+
): Flowable<Update<OutputT, RenderingT>> =
60+
// We're ok not having a job here because the lifetime of the coroutine will be controlled by the
61+
// subscription to the resulting flowable.
62+
GlobalScope.rxFlowable(context = dispatcher) {
63+
// Convert the input stream into a channel.
64+
openSubscription().consume {
65+
runWorkflowTree(
66+
workflow = workflow.asStatefulWorkflow(),
67+
inputs = this,
68+
initialSnapshot = initialSnapshot,
69+
onUpdate = ::send
70+
)
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2019 Square Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.squareup.workflow.rx2
17+
18+
import com.squareup.workflow.Workflow
19+
import com.squareup.workflow.stateless
20+
import io.reactivex.BackpressureStrategy.BUFFER
21+
import io.reactivex.subjects.PublishSubject
22+
import io.reactivex.subscribers.TestSubscriber
23+
import kotlin.test.Test
24+
25+
class FlatMapWorkflowTest {
26+
27+
private val workflow = Workflow.stateless<String, Nothing, String> { input, _ ->
28+
"rendered: $input"
29+
}
30+
private val inputs = PublishSubject.create<String>()
31+
private val renderings: TestSubscriber<String> =
32+
inputs.toFlowable(BUFFER)
33+
.flatMapWorkflow(workflow)
34+
.map { it.rendering }
35+
.test()
36+
37+
@Test fun `doesn't emit until input emitted`() {
38+
renderings.assertNoValues()
39+
renderings.assertNotTerminated()
40+
}
41+
42+
@Test fun `single input`() {
43+
inputs.onNext("input")
44+
45+
renderings.assertValue("rendered: input")
46+
renderings.assertNotTerminated()
47+
}
48+
49+
@Test fun `multiple inputs`() {
50+
inputs.onNext("one")
51+
inputs.onNext("two")
52+
inputs.onNext("three")
53+
renderings.assertValues("rendered: one", "rendered: two", "rendered: three")
54+
renderings.assertNotTerminated()
55+
}
56+
57+
@Test fun `output doesn't complete after input completes`() {
58+
inputs.onNext("input")
59+
inputs.onComplete()
60+
renderings.assertNotTerminated()
61+
}
62+
63+
@Test fun `output errors when input completes before emitting`() {
64+
inputs.onComplete()
65+
renderings.assertError { it is NoSuchElementException }
66+
}
67+
}

‎kotlin/workflow-rx2/src/test/java/com/squareup/workflow/rx2/SubscriptionsTest.kt

+20-11
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import kotlin.test.Test
3333
import kotlin.test.assertEquals
3434
import kotlin.test.assertFailsWith
3535
import kotlin.test.assertFalse
36-
import kotlin.test.assertSame
36+
import kotlin.test.assertTrue
3737

3838
class SubscriptionsTest {
3939

@@ -102,10 +102,12 @@ class SubscriptionsTest {
102102
setSubscribed(false)
103103
}
104104

105-
assertEquals(1, workflow.subscriptions)
106-
// For some reason the observable is actually disposed twice. Seems like a coroutines bug, but
105+
host.withNextRendering {
106+
assertEquals(1, workflow.subscriptions)
107+
// For some reason the observable is actually disposed twice. Seems like a coroutines bug, but
107108
// Disposable.dispose() is an idempotent operation so it should be fine.
108109
assertEquals(2, workflow.disposals)
110+
}
109111
}
110112
}
111113

@@ -117,10 +119,12 @@ class SubscriptionsTest {
117119
setSubscribed(false)
118120
}
119121

120-
assertEquals(1, workflow.subscriptions)
121-
// For some reason the observable is actually disposed twice. Seems like a coroutines bug, but
122+
host.withNextRendering {
123+
assertEquals(1, workflow.subscriptions)
124+
// For some reason the observable is actually disposed twice. Seems like a coroutines bug, but
122125
// Disposable.dispose() is an idempotent operation so it should be fine.
123126
assertEquals(2, workflow.disposals)
127+
}
124128
}
125129
}
126130

@@ -179,6 +183,8 @@ class SubscriptionsTest {
179183

180184
@Test fun `observable reports close`() {
181185
workflow.testFromStart(true) { host ->
186+
// Get the next rendering to unblock the pipeline so the subscription can actually occur.
187+
host.awaitNextRendering()
182188
assertFalse(host.hasOutput)
183189

184190
subject.onComplete()
@@ -194,6 +200,8 @@ class SubscriptionsTest {
194200

195201
@Test fun `observable reports close after emission`() {
196202
workflow.testFromStart(true) { host ->
203+
// Get the next rendering to unblock the pipeline so the subscription can actually occur.
204+
host.awaitNextRendering()
197205
assertFalse(host.hasOutput)
198206

199207
subject.onNext("foo")
@@ -206,13 +214,14 @@ class SubscriptionsTest {
206214
}
207215

208216
@Test fun `observable reports error`() {
209-
assertFailsWith<IOException> {
210-
workflow.testFromStart(true) { host ->
211-
assertFalse(host.hasOutput)
217+
workflow.testFromStart(true) { host ->
218+
assertFalse(host.hasOutput)
219+
220+
subject.onError(IOException("fail"))
212221

213-
subject.onError(IOException("fail"))
214-
assertSame(Closed, host.awaitNextOutput())
215-
assertFalse(host.hasOutput)
222+
host.withFailure { error ->
223+
val causeChain = generateSequence(error) { it.cause }
224+
assertTrue(causeChain.any { it is IOException })
216225
}
217226
}
218227
}

‎kotlin/workflow-rx2/src/test/java/com/squareup/workflow/rx2/WorkflowContextsTest.kt

+15-10
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ import com.squareup.workflow.invoke
2727
import com.squareup.workflow.stateless
2828
import com.squareup.workflow.testing.testFromStart
2929
import io.reactivex.subjects.SingleSubject
30+
import kotlinx.coroutines.CompletableDeferred
3031
import kotlinx.coroutines.CoroutineScope
32+
import kotlinx.coroutines.runBlocking
3133
import kotlin.test.Test
3234
import kotlin.test.assertEquals
3335
import kotlin.test.assertFalse
@@ -50,12 +52,14 @@ class WorkflowContextsTest {
5052
assertEquals(0, disposals)
5153

5254
workflow.testFromStart { host ->
55+
host.awaitNextRendering()
5356
assertEquals(1, subscriptions)
5457
assertEquals(0, disposals)
5558
assertFalse(host.hasOutput)
5659

5760
singleSubject.onSuccess("done!")
5861

62+
host.awaitNextRendering()
5963
assertTrue(host.hasOutput)
6064
assertEquals("done!", host.awaitNextOutput())
6165
assertEquals(1, subscriptions)
@@ -64,13 +68,13 @@ class WorkflowContextsTest {
6468
}
6569

6670
@Test fun `onSuccess unsubscribes`() {
67-
var subscriptions = 0
68-
var disposals = 0
71+
val subscribed = CompletableDeferred<Unit>()
72+
val disposed = CompletableDeferred<Unit>()
6973
lateinit var doClose: EventHandler<Unit>
7074
val singleSubject = SingleSubject.create<Unit>()
7175
val single = singleSubject
72-
.doOnSubscribe { subscriptions++ }
73-
.doOnDispose { disposals++ }
76+
.doOnSubscribe { subscribed.complete(Unit) }
77+
.doOnDispose { disposed.complete(Unit) }
7478
val workflow = object : StatefulWorkflow<Unit, Boolean, Nothing, Unit>() {
7579
override fun initialState(
7680
input: Unit,
@@ -92,18 +96,19 @@ class WorkflowContextsTest {
9296
override fun snapshotState(state: Boolean): Snapshot = Snapshot.EMPTY
9397
}
9498

95-
assertEquals(0, subscriptions)
96-
assertEquals(0, disposals)
99+
assertFalse(subscribed.isCompleted)
100+
assertFalse(disposed.isCompleted)
97101

98102
workflow.testFromStart { host ->
99-
assertEquals(1, subscriptions)
100-
assertEquals(0, disposals)
103+
host.awaitNextRendering()
104+
runBlocking { subscribed.await() }
105+
assertFalse(disposed.isCompleted)
101106
assertFalse(host.hasOutput)
102107

103108
doClose()
104109

105-
assertEquals(1, subscriptions)
106-
assertEquals(1, disposals)
110+
host.awaitNextRendering()
111+
runBlocking { disposed.await() }
107112
}
108113
}
109114
}

‎kotlin/workflow-testing/src/main/java/com/squareup/workflow/testing/WorkflowTester.kt

+37-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import kotlinx.coroutines.channels.consumeEach
2727
import kotlinx.coroutines.launch
2828
import kotlinx.coroutines.runBlocking
2929
import kotlinx.coroutines.withTimeout
30+
import kotlinx.coroutines.yield
3031
import org.jetbrains.annotations.TestOnly
3132
import kotlin.coroutines.CoroutineContext
3233

@@ -43,8 +44,8 @@ import kotlin.coroutines.CoroutineContext
4344
* - [hasRendering], [hasOutput], [hasSnapshot]
4445
* - Return `true` if the previous methods won't block.
4546
*/
46-
class WorkflowTester<OutputT : Any, RenderingT : Any> @TestOnly internal constructor(
47-
private val host: WorkflowHost<OutputT, RenderingT>,
47+
class WorkflowTester<InputT : Any, OutputT : Any, RenderingT : Any> @TestOnly internal constructor(
48+
private val host: WorkflowHost<InputT, OutputT, RenderingT>,
4849
context: CoroutineContext
4950
) {
5051

@@ -162,6 +163,39 @@ class WorkflowTester<OutputT : Any, RenderingT : Any> @TestOnly internal constru
162163
block: (OutputT) -> T
163164
): T = awaitNextOutput(timeoutMs).let(block)
164165

166+
/**
167+
* Blocks until the workflow fails by throwing an exception, then returns that exception.
168+
*
169+
* @param timeoutMs The maximum amount of time to wait for an output to be emitted. If null,
170+
* [DEFAULT_TIMEOUT_MS] will be used instead.
171+
*/
172+
fun awaitFailure(timeoutMs: Long? = null): Throwable {
173+
var error: Throwable? = null
174+
runBlocking {
175+
withTimeout(timeoutMs ?: DEFAULT_TIMEOUT_MS) {
176+
try {
177+
while (true) renderings.receive()
178+
} catch (e: Throwable) {
179+
error = e
180+
}
181+
}
182+
}
183+
return error!!
184+
}
185+
186+
/**
187+
* Blocks until the workflow fails by throwing an exception, then passes that exception to
188+
* [block].
189+
*
190+
* @param timeoutMs The maximum amount of time to wait for an output to be emitted. If null,
191+
* [DEFAULT_TIMEOUT_MS] will be used instead.
192+
* @return The value returned from [block].
193+
*/
194+
fun <T> withFailure(
195+
timeoutMs: Long? = null,
196+
block: (Throwable) -> T
197+
): T = awaitFailure(timeoutMs).let(block)
198+
165199
/**
166200
* @param drain If true, this function will consume all the values currently in the channel, and
167201
* return the last one.
@@ -182,6 +216,6 @@ class WorkflowTester<OutputT : Any, RenderingT : Any> @TestOnly internal constru
182216
}
183217

184218
companion object {
185-
const val DEFAULT_TIMEOUT_MS: Long = 500
219+
const val DEFAULT_TIMEOUT_MS: Long = 5000
186220
}
187221
}

‎kotlin/workflow-testing/src/main/java/com/squareup/workflow/testing/WorkflowTesting.kt

+12-8
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import kotlinx.coroutines.Dispatchers
2626
import kotlinx.coroutines.InternalCoroutinesApi
2727
import kotlinx.coroutines.Job
2828
import kotlinx.coroutines.cancel
29+
import kotlinx.coroutines.channels.Channel
2930
import org.jetbrains.annotations.TestOnly
3031
import kotlin.coroutines.CoroutineContext
3132
import kotlin.coroutines.EmptyCoroutineContext
@@ -42,8 +43,11 @@ fun <T, InputT : Any, OutputT : Any, RenderingT : Any>
4243
input: InputT,
4344
snapshot: Snapshot? = null,
4445
context: CoroutineContext = EmptyCoroutineContext,
45-
block: (WorkflowTester<OutputT, RenderingT>) -> T
46-
): T = test(block, context) { it.run(this, input, snapshot) }
46+
block: (WorkflowTester<InputT, OutputT, RenderingT>) -> T
47+
): T = test(block, context) {
48+
val inputs = Channel<InputT>(1).apply { offer(input) }
49+
it.run(this, inputs, snapshot)
50+
}
4751
// @formatter:on
4852

4953
/**
@@ -55,7 +59,7 @@ fun <T, InputT : Any, OutputT : Any, RenderingT : Any>
5559
fun <T, OutputT : Any, RenderingT : Any> Workflow<Unit, OutputT, RenderingT>.testFromStart(
5660
snapshot: Snapshot? = null,
5761
context: CoroutineContext = EmptyCoroutineContext,
58-
block: (WorkflowTester<OutputT, RenderingT>) -> T
62+
block: (WorkflowTester<Unit, OutputT, RenderingT>) -> T
5963
): T = testFromStart(Unit, snapshot, context, block)
6064

6165
/**
@@ -72,7 +76,7 @@ fun <T, InputT : Any, StateT : Any, OutputT : Any, RenderingT : Any>
7276
input: InputT,
7377
initialState: StateT,
7478
context: CoroutineContext = EmptyCoroutineContext,
75-
block: (WorkflowTester<OutputT, RenderingT>) -> T
79+
block: (WorkflowTester<InputT, OutputT, RenderingT>) -> T
7680
): T = test(block, context) { it.runTestFromState(this, input, initialState) }
7781
// @formatter:on
7882

@@ -89,15 +93,15 @@ fun <StateT : Any, OutputT : Any, RenderingT : Any>
8993
StatefulWorkflow<Unit, StateT, OutputT, RenderingT>.testFromState(
9094
initialState: StateT,
9195
context: CoroutineContext = EmptyCoroutineContext,
92-
block: (WorkflowTester<OutputT, RenderingT>) -> Unit
96+
block: (WorkflowTester<Unit, OutputT, RenderingT>) -> Unit
9397
) = testFromState(Unit, initialState, context, block)
9498
// @formatter:on
9599

96100
@UseExperimental(InternalCoroutinesApi::class)
97-
private fun <T, O : Any, R : Any> test(
98-
testBlock: (WorkflowTester<O, R>) -> T,
101+
private fun <T, I : Any, O : Any, R : Any> test(
102+
testBlock: (WorkflowTester<I, O, R>) -> T,
99103
baseContext: CoroutineContext,
100-
starter: (Factory) -> WorkflowHost<O, R>
104+
starter: (Factory) -> WorkflowHost<I, O, R>
101105
): T {
102106
val context = Dispatchers.Unconfined + baseContext + Job(parent = baseContext[Job])
103107
val host = WorkflowHost.Factory(context)

‎kotlin/workflow-testing/src/test/java/com/squareup/workflow/ChannelSubscriptionsIntegrationTest.kt

+8-4
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,10 @@ class ChannelSubscriptionsIntegrationTest {
107107
setSubscribed(false)
108108
}
109109

110-
assertEquals(1, workflow.subscriptions)
111-
assertEquals(1, workflow.cancellations)
110+
host.withNextRendering {
111+
assertEquals(1, workflow.subscriptions)
112+
assertEquals(1, workflow.cancellations)
113+
}
112114
}
113115
}
114116

@@ -120,8 +122,10 @@ class ChannelSubscriptionsIntegrationTest {
120122
setSubscribed(false)
121123
}
122124

123-
assertEquals(1, workflow.subscriptions)
124-
assertEquals(1, workflow.cancellations)
125+
host.withNextRendering {
126+
assertEquals(1, workflow.subscriptions)
127+
assertEquals(1, workflow.cancellations)
128+
}
125129
}
126130
}
127131

‎kotlin/workflow-testing/src/test/java/com/squareup/workflow/CompositionIntegrationTest.kt

+17-14
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@ package com.squareup.workflow
1717

1818
import com.squareup.workflow.WorkflowAction.Companion.enterState
1919
import com.squareup.workflow.testing.testFromStart
20-
import kotlinx.coroutines.CancellationException
2120
import kotlinx.coroutines.CoroutineScope
22-
import kotlinx.coroutines.launch
23-
import kotlinx.coroutines.suspendCancellableCoroutine
21+
import kotlinx.coroutines.Job
2422
import kotlin.test.Test
2523
import kotlin.test.assertEquals
26-
import kotlin.test.assertFailsWith
24+
import kotlin.test.assertFails
2725
import kotlin.test.assertTrue
2826

2927
class CompositionIntegrationTest {
@@ -91,10 +89,13 @@ class CompositionIntegrationTest {
9189
)
9290

9391
// Setup initial state and change the state the workflow in the tree.
94-
root.testFromStart("initial input") {
95-
assertFailsWith<IllegalArgumentException> {
96-
it.awaitNextRendering()
92+
assertFails {
93+
root.testFromStart("initial input") { tester ->
94+
tester.awaitNextRendering()
9795
}
96+
}.let { error ->
97+
val causeChain = generateSequence(error) { it.cause }
98+
assertTrue(causeChain.any { it is IllegalArgumentException })
9899
}
99100
}
100101

@@ -135,7 +136,9 @@ class CompositionIntegrationTest {
135136
assertTrue(teardowns.isEmpty())
136137

137138
teardownChildren()
139+
}
138140

141+
tester.withNextRendering {
139142
assertEquals(listOf("child1", "child2"), teardowns)
140143
}
141144
}
@@ -178,7 +181,9 @@ class CompositionIntegrationTest {
178181
assertTrue(teardowns.isEmpty())
179182

180183
teardownChildren()
184+
}
181185

186+
tester.withNextRendering {
182187
assertEquals(listOf("grandchild", "child"), teardowns)
183188
}
184189
}
@@ -193,13 +198,9 @@ class CompositionIntegrationTest {
193198
snapshot: Snapshot?,
194199
scope: CoroutineScope
195200
) {
196-
scope.launch {
197-
starts++
198-
try {
199-
suspendCancellableCoroutine<Nothing> { }
200-
} catch (e: CancellationException) {
201-
cancels++
202-
}
201+
starts++
202+
scope.coroutineContext[Job]!!.invokeOnCompletion { cause ->
203+
if (cause != null) cancels++
203204
}
204205
}
205206

@@ -249,7 +250,9 @@ class CompositionIntegrationTest {
249250
assertEquals(0, cancels)
250251

251252
runChildren(false)
253+
}
252254

255+
tester.withNextRendering {
253256
assertEquals(1, starts)
254257
assertEquals(1, cancels)
255258
}

‎kotlin/workflow-testing/src/test/java/com/squareup/workflow/SuspendingSubscriptionIntegrationTest.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import kotlinx.coroutines.CompletableDeferred
2121
import kotlin.reflect.full.starProjectedType
2222
import kotlin.test.Test
2323
import kotlin.test.assertEquals
24-
import kotlin.test.assertFailsWith
2524
import kotlin.test.assertFalse
2625
import kotlin.test.fail
2726

@@ -59,11 +58,12 @@ class SuspendingSubscriptionIntegrationTest {
5958
}
6059
}
6160

62-
assertFailsWith<ExpectedException> {
63-
workflow.testFromStart { host ->
64-
assertFalse(host.hasOutput)
61+
workflow.testFromStart { host ->
62+
assertFalse(host.hasOutput)
6563

66-
host.awaitNextOutput()
64+
host.withFailure { error ->
65+
val causeChain = generateSequence(error) { it.cause }
66+
assertEquals(1, causeChain.count { it is ExpectedException })
6767
}
6868
}
6969
}

‎kotlin/workflow-testing/src/test/java/com/squareup/workflow/WorkflowTesterTest.kt

+34-22
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
@file:Suppress("EXPERIMENTAL_API_USAGE")
17+
1618
package com.squareup.workflow
1719

1820
import com.squareup.workflow.testing.testFromStart
@@ -21,8 +23,10 @@ import kotlinx.coroutines.CompletableDeferred
2123
import kotlinx.coroutines.CoroutineScope
2224
import kotlinx.coroutines.Job
2325
import kotlin.test.Test
26+
import kotlin.test.assertEquals
2427
import kotlin.test.assertFailsWith
2528
import kotlin.test.assertNull
29+
import kotlin.test.assertTrue
2630
import kotlin.test.fail
2731

2832
class WorkflowTesterTest {
@@ -42,9 +46,10 @@ class WorkflowTesterTest {
4246
throw ExpectedException()
4347
}
4448

45-
assertFailsWith<ExpectedException> {
46-
workflow.testFromStart {
47-
it.awaitNextRendering()
49+
workflow.testFromStart {
50+
it.withFailure { error ->
51+
val causeChain = generateSequence(error) { it.cause }
52+
assertEquals(1, causeChain.count { it is ExpectedException })
4853
}
4954
}
5055
}
@@ -53,10 +58,12 @@ class WorkflowTesterTest {
5358
val job = Job()
5459
val workflow = Workflow.stateless<Unit, Unit> { }
5560

56-
assertFailsWith<CancellationException> {
57-
workflow.testFromStart(context = job) {
58-
job.cancel()
59-
it.awaitNextRendering()
61+
workflow.testFromStart(context = job) { tester ->
62+
@Suppress("DEPRECATION")
63+
job.cancel(ExpectedException())
64+
tester.withFailure { error ->
65+
val causeChain = generateSequence(error) { it.cause }
66+
assertEquals(1, causeChain.count { it is ExpectedException })
6067
}
6168
}
6269
}
@@ -65,9 +72,9 @@ class WorkflowTesterTest {
6572
val job = Job().apply { cancel() }
6673
val workflow = Workflow.stateless<Unit, Unit> { }
6774

68-
assertFailsWith<CancellationException> {
69-
workflow.testFromStart(context = job) {
70-
it.awaitNextRendering()
75+
workflow.testFromStart(context = job) {
76+
it.withFailure { error ->
77+
assertTrue(error is CancellationException)
7178
}
7279
}
7380
}
@@ -94,9 +101,10 @@ class WorkflowTesterTest {
94101
override fun snapshotState(state: Unit): Snapshot = fail()
95102
}
96103

97-
assertFailsWith<ExpectedException> {
98-
workflow.testFromStart {
99-
it.awaitNextRendering()
104+
workflow.testFromStart { tester ->
105+
tester.withFailure { error ->
106+
val causeChain = generateSequence(error) { it.cause }
107+
assertEquals(1, causeChain.count { it is ExpectedException })
100108
}
101109
}
102110
}
@@ -123,9 +131,10 @@ class WorkflowTesterTest {
123131
override fun snapshotState(state: Unit): Snapshot = throw ExpectedException()
124132
}
125133

126-
assertFailsWith<ExpectedException> {
127-
workflow.testFromStart {
128-
it.awaitNextRendering()
134+
workflow.testFromStart { tester ->
135+
tester.withFailure { error ->
136+
val causeChain = generateSequence(error) { it.cause }
137+
assertEquals(1, causeChain.count { it is ExpectedException })
129138
}
130139
}
131140
}
@@ -156,9 +165,11 @@ class WorkflowTesterTest {
156165
val snapshot = workflow.testFromStart {
157166
it.awaitNextSnapshot()
158167
}
159-
assertFailsWith<ExpectedException> {
160-
workflow.testFromStart(snapshot) {
161-
// Workflow should never start.
168+
169+
workflow.testFromStart(snapshot) { tester ->
170+
tester.withFailure { error ->
171+
val causeChain = generateSequence(error) { it.cause }
172+
assertEquals(1, causeChain.count { it is ExpectedException })
162173
}
163174
}
164175
}
@@ -170,9 +181,10 @@ class WorkflowTesterTest {
170181
it.onDeferred(deferred) { fail("Shouldn't get here.") }
171182
}
172183

173-
assertFailsWith<ExpectedException> {
174-
workflow.testFromStart {
175-
it.awaitNextRendering()
184+
workflow.testFromStart { tester ->
185+
tester.withFailure { error ->
186+
val causeChain = generateSequence(error) { it.cause }
187+
assertEquals(1, causeChain.count { it is ExpectedException })
176188
}
177189
}
178190
}

‎kotlin/workflow-ui-android/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ dependencies {
3232
api deps.rxjava2.rxjava2
3333
api deps.transition
3434

35-
implementation project(':workflow-runtime')
35+
implementation project(':workflow-rx2-runtime')
3636
implementation deps.appcompatv7
3737
implementation deps.kotlin.coroutines.android
3838
implementation deps.kotlin.coroutines.core

‎kotlin/workflow-ui-android/src/main/java/com/squareup/workflow/ui/WorkflowActivityRunner.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import android.os.Parcelable
2121
import android.support.annotation.CheckResult
2222
import android.support.v4.app.FragmentActivity
2323
import com.squareup.workflow.Workflow
24+
import io.reactivex.Flowable
2425
import io.reactivex.Observable
2526

2627
/**
@@ -106,10 +107,10 @@ internal constructor(private val model: WorkflowViewModel<OutputT, RenderingT>)
106107
fun <InputT : Any, OutputT : Any, RenderingT : Any> FragmentActivity.setContentWorkflow(
107108
viewRegistry: ViewRegistry,
108109
workflow: Workflow<InputT, OutputT, RenderingT>,
109-
initialInput: InputT,
110+
inputs: Flowable<InputT>,
110111
restored: PickledWorkflow?
111112
): WorkflowActivityRunner<OutputT, RenderingT> {
112-
val factory = WorkflowViewModel.Factory(viewRegistry, workflow, initialInput, restored)
113+
val factory = WorkflowViewModel.Factory(viewRegistry, workflow, inputs, restored)
113114

114115
// We use an Android lifecycle ViewModel to shield ourselves from configuration changes.
115116
// ViewModelProviders.of() uses the factory to instantiate a new instance only
@@ -141,5 +142,5 @@ fun <OutputT : Any, RenderingT : Any> FragmentActivity.setContentWorkflow(
141142
workflow: Workflow<Unit, OutputT, RenderingT>,
142143
restored: PickledWorkflow?
143144
): WorkflowActivityRunner<OutputT, RenderingT> {
144-
return setContentWorkflow(viewRegistry, workflow, Unit, restored)
145+
return setContentWorkflow(viewRegistry, workflow, Flowable.fromArray(Unit), restored)
145146
}

‎kotlin/workflow-ui-android/src/main/java/com/squareup/workflow/ui/WorkflowViewModel.kt

+8-10
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ import android.arch.lifecycle.ViewModel
1919
import android.arch.lifecycle.ViewModelProvider
2020
import com.squareup.workflow.Snapshot
2121
import com.squareup.workflow.Workflow
22-
import com.squareup.workflow.WorkflowHost
22+
import com.squareup.workflow.WorkflowHost.Update
23+
import com.squareup.workflow.rx2.flatMapWorkflow
24+
import io.reactivex.Flowable
2325
import io.reactivex.disposables.Disposable
24-
import kotlinx.coroutines.Dispatchers
25-
import kotlinx.coroutines.rx2.asObservable
2626

2727
/**
2828
* The guts of [WorkflowActivityRunner]. We could have made that class itself a
@@ -31,22 +31,20 @@ import kotlinx.coroutines.rx2.asObservable
3131
*/
3232
internal class WorkflowViewModel<OutputT : Any, RenderingT : Any>(
3333
val viewRegistry: ViewRegistry,
34-
host: WorkflowHost<OutputT, RenderingT>
34+
workflowUpdates: Flowable<Update<OutputT, RenderingT>>
3535
) : ViewModel() {
3636

3737
internal class Factory<InputT : Any, OutputT : Any, RenderingT : Any>(
3838
private val viewRegistry: ViewRegistry,
3939
private val workflow: Workflow<InputT, OutputT, RenderingT>,
40-
private val initialInput: InputT,
40+
private val inputs: Flowable<InputT>,
4141
private val restored: PickledWorkflow?
4242
) : ViewModelProvider.Factory {
43-
@Suppress("EXPERIMENTAL_API_USAGE")
44-
private val hostFactory = WorkflowHost.Factory(Dispatchers.Unconfined)
4543

4644
override fun <T : ViewModel> create(modelClass: Class<T>): T {
47-
val host = hostFactory.run(workflow, initialInput, restored?.snapshot)
45+
val workflowUpdates = inputs.flatMapWorkflow(workflow, restored?.snapshot)
4846
@Suppress("UNCHECKED_CAST")
49-
return WorkflowViewModel(viewRegistry, host) as T
47+
return WorkflowViewModel(viewRegistry, workflowUpdates) as T
5048
}
5149
}
5250

@@ -56,7 +54,7 @@ internal class WorkflowViewModel<OutputT : Any, RenderingT : Any>(
5654

5755
@Suppress("EXPERIMENTAL_API_USAGE")
5856
val updates =
59-
host.updates.asObservable(Dispatchers.Main.immediate)
57+
workflowUpdates.toObservable()
6058
.doOnNext { lastSnapshot = it.snapshot }
6159
.replay(1)
6260
.autoConnect(1) { sub = it }

0 commit comments

Comments
 (0)
Please sign in to comment.