Skip to content

Commit ffefa04

Browse files
WIP: Write a Flow operator that runs a Workflow.
Closes #263.
1 parent 6fea2ea commit ffefa04

File tree

2 files changed

+110
-53
lines changed

2 files changed

+110
-53
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2019 Square Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.squareup.workflow
17+
18+
import com.squareup.workflow.WorkflowHost.Update
19+
import com.squareup.workflow.internal.runWorkflowTree
20+
import kotlinx.coroutines.FlowPreview
21+
import kotlinx.coroutines.coroutineScope
22+
import kotlinx.coroutines.flow.Flow
23+
import kotlinx.coroutines.flow.flow
24+
import kotlinx.coroutines.flow.produceIn
25+
26+
/**
27+
* Runs a [Workflow] by passing upstream items to it as input and emitting [Update]s as output.
28+
* The returned [Flow] will not terminate until cancelled or the workflow throws an exception.
29+
*
30+
* Remember, [Flow]s are cold streams, so every subscriber to the resulting [Flow] will spin up
31+
* a new instance of the tree.
32+
*
33+
* @param snapshot If non-null, used to restore the workflow tree from a previous [Snapshot] emitted
34+
* in an [Update].
35+
*/
36+
@FlowPreview
37+
fun <InputT : Any, OutputT : Any, RenderingT : Any> Flow<InputT>.flatMapWorkflow(
38+
workflow: Workflow<InputT, OutputT, RenderingT>,
39+
snapshot: Snapshot? = null
40+
): Flow<Update<OutputT, RenderingT>> = flow {
41+
coroutineScope {
42+
val inputFlow = this@flatMapWorkflow
43+
val inputs = inputFlow.produceIn(this)
44+
runWorkflowTree(
45+
workflow = workflow.asStatefulWorkflow(),
46+
inputs = inputs,
47+
initialSnapshot = snapshot,
48+
onUpdate = ::emit
49+
)
50+
}
51+
}

kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/RealWorkflowHost.kt

+59-53
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ internal class RealWorkflowHost<InputT : Any, StateT : Any, OutputT : Any, Rende
2626
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
2727
context: CoroutineContext,
2828
initialInput: InputT,
29-
private val initialSnapshot: Snapshot?,
29+
initialSnapshot: Snapshot?,
3030
private val initialState: StateT? = null
3131
) : WorkflowHost<InputT, OutputT, RenderingT> {
3232

@@ -38,74 +38,80 @@ internal class RealWorkflowHost<InputT : Any, StateT : Any, OutputT : Any, Rende
3838
scope.produce(capacity = 0) {
3939
runWorkflowTree(
4040
workflow = workflow,
41+
inputs = inputs,
42+
initialSnapshot = initialSnapshot,
43+
initialState = initialState,
4144
onUpdate = ::send
4245
)
4346
}
4447

4548
override fun setInput(input: InputT) {
4649
inputs.offer(input)
4750
}
51+
}
4852

49-
/**
50-
* Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting
51-
* updates by calling [onUpdate].
52-
*/
53-
private suspend fun runWorkflowTree(
54-
workflow: StatefulWorkflow<InputT, StateT, OutputT, RenderingT>,
55-
onUpdate: suspend (Update<OutputT, RenderingT>) -> Unit
56-
): Nothing {
57-
var output: OutputT? = null
58-
var input: InputT = inputs.receive()
59-
var inputsClosed = false
60-
val workflowNode = WorkflowNode(
61-
id = workflow.id(),
62-
workflow = workflow,
63-
initialInput = input,
64-
snapshot = initialSnapshot,
65-
baseContext = coroutineContext,
66-
initialState = initialState
67-
)
53+
/**
54+
* Loops forever, or until the coroutine is cancelled, processing the workflow tree and emitting
55+
* updates by calling [onUpdate].
56+
*/
57+
internal suspend fun <I : Any, S : Any, O : Any, R : Any> runWorkflowTree(
58+
workflow: StatefulWorkflow<I, S, O, R>,
59+
inputs: ReceiveChannel<I>,
60+
initialSnapshot: Snapshot?,
61+
initialState: S?,
62+
onUpdate: suspend (Update<O, R>) -> Unit
63+
): Nothing {
64+
var output: O? = null
65+
var input: I = inputs.receive()
66+
var inputsClosed = false
67+
val workflowNode = WorkflowNode(
68+
id = workflow.id(),
69+
workflow = workflow,
70+
initialInput = input,
71+
snapshot = initialSnapshot,
72+
baseContext = coroutineContext,
73+
initialState = initialState
74+
)
6875

69-
try {
70-
while (true) {
71-
coroutineContext.ensureActive()
76+
try {
77+
while (true) {
78+
coroutineContext.ensureActive()
7279

73-
val rendering = workflowNode.compose(workflow, input)
74-
val snapshot = workflowNode.snapshot(workflow)
80+
val rendering = workflowNode.compose(workflow, input)
81+
val snapshot = workflowNode.snapshot(workflow)
7582

76-
onUpdate(Update(rendering, snapshot, output))
83+
onUpdate(Update(rendering, snapshot, output))
7784

78-
// Tick _might_ return an output, but if it returns null, it means the state or a child
79-
// probably changed, so we should re-compose/snapshot and emit again.
80-
output = select {
81-
// While the inputs channel is still open, select on it so we can detect new inputs.
82-
if (!inputsClosed) {
83-
@Suppress("EXPERIMENTAL_API_USAGE")
84-
inputs.onReceiveOrNull { newInput ->
85-
if (newInput == null) {
86-
inputsClosed = true
87-
} else {
88-
input = newInput
89-
}
90-
// No output.
91-
return@onReceiveOrNull null
85+
// Tick _might_ return an output, but if it returns null, it means the state or a child
86+
// probably changed, so we should re-compose/snapshot and emit again.
87+
output = select {
88+
// While the inputs channel is still open, select on it so we can detect new inputs.
89+
if (!inputsClosed) {
90+
@Suppress("EXPERIMENTAL_API_USAGE")
91+
inputs.onReceiveOrNull { newInput ->
92+
if (newInput == null) {
93+
inputsClosed = true
94+
} else {
95+
input = newInput
9296
}
97+
// No output.
98+
return@onReceiveOrNull null
9399
}
94-
95-
// Tick the workflow tree.
96-
workflowNode.tick(this) { it }
97100
}
101+
102+
// Tick the workflow tree.
103+
workflowNode.tick(this) { it }
98104
}
99-
} catch (e: Throwable) {
100-
// For some reason the exception gets masked if we don't explicitly pass it to cancel the
101-
// producer coroutine ourselves here.
102-
coroutineContext.cancel(if (e is CancellationException) e else CancellationException(null, e))
103-
throw e
104-
} finally {
105-
// There's a potential race condition if the producer coroutine is cancelled before it has a
106-
// chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we
107-
// actually see this cause problems, I'm not too worried about it.
108-
workflowNode.cancel()
109105
}
106+
} catch (e: Throwable) {
107+
// For some reason the exception gets masked if we don't explicitly pass it to cancel the
108+
// producer coroutine ourselves here.
109+
coroutineContext.cancel(if (e is CancellationException) e else CancellationException(null, e))
110+
throw e
111+
} finally {
112+
// There's a potential race condition if the producer coroutine is cancelled before it has a
113+
// chance to enter the try block, since we can't use CoroutineStart.ATOMIC. However, until we
114+
// actually see this cause problems, I'm not too worried about it.
115+
workflowNode.cancel()
110116
}
111117
}

0 commit comments

Comments
 (0)