Skip to content

Commit 784bd7a

Browse files
Introduce helpers for working with Sinks of WorkflowActions from suspend functions.
- `Sink.sendAndAwaitApplication()` - `Flow.collectToSink()` These helpers will be used to implement Workers using side effects (#12). GUWT workers are described in square/workflow#1021.
1 parent 6365b25 commit 784bd7a

File tree

5 files changed

+221
-6
lines changed

5 files changed

+221
-6
lines changed

workflow-core/api/workflow-core.api

+2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ public abstract interface class com/squareup/workflow/Sink {
5555
}
5656

5757
public final class com/squareup/workflow/SinkKt {
58+
public static final fun collectToSink (Lkotlinx/coroutines/flow/Flow;Lcom/squareup/workflow/Sink;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
5859
public static final fun contraMap (Lcom/squareup/workflow/Sink;Lkotlin/jvm/functions/Function1;)Lcom/squareup/workflow/Sink;
60+
public static final fun sendAndAwaitApplication (Lcom/squareup/workflow/Sink;Lcom/squareup/workflow/WorkflowAction;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
5961
}
6062

6163
public final class com/squareup/workflow/Snapshot {

workflow-core/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ dependencies {
3434
// For Snapshot.
3535
api(Dependencies.okio)
3636

37+
testImplementation(Dependencies.Kotlin.Coroutines.test)
3738
testImplementation(Dependencies.Kotlin.Test.jdk)
3839
}

workflow-core/src/main/java/com/squareup/workflow/Sink.kt

+59
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
*/
1616
package com.squareup.workflow
1717

18+
import kotlinx.coroutines.flow.Flow
19+
import kotlinx.coroutines.flow.collect
20+
import kotlinx.coroutines.suspendCancellableCoroutine
21+
import kotlin.coroutines.resume
22+
1823
/**
1924
* An object that receives values (commonly events or [WorkflowAction]).
2025
* [RenderContext.actionSink] implements this interface.
@@ -47,3 +52,57 @@ fun <T1, T2> Sink<T1>.contraMap(transform: (T2) -> T1): Sink<T2> {
4752
}
4853
}
4954
}
55+
56+
/**
57+
* Collects from a [Flow] by converting each item into a [WorkflowAction] and then sending them
58+
* to the [actionSink]. This operator propagates backpressure from the workflow runtime, so if there
59+
* is a lot of contention on the workflow runtime the flow will be suspended while the action is
60+
* queued.
61+
*
62+
* Example:
63+
* ```
64+
* context.runningSideEffect("collector") {
65+
* myFlow.collectToSink(context.actionSink) { value ->
66+
* action { setOutput(value) }
67+
* }
68+
* }
69+
* ```
70+
*/
71+
@ExperimentalWorkflow
72+
suspend fun <T, StateT, OutputT : Any> Flow<T>.collectToSink(
73+
actionSink: Sink<WorkflowAction<StateT, OutputT>>,
74+
handler: (T) -> WorkflowAction<StateT, OutputT>
75+
) {
76+
collect {
77+
// Don't process the emission until the last emission has had its action executed by the
78+
// workflow runtime.
79+
actionSink.sendAndAwaitApplication(handler(it))
80+
}
81+
}
82+
83+
/**
84+
* Sends [action] to this [Sink] and suspends until after [action]'s [WorkflowAction.apply] method
85+
* has been invoked. Since a [Sink] may be backed by an unbounded buffer, this method can be used
86+
* to apply backpressure to the caller when there are a lot events being sent to the workflow
87+
* runtime.
88+
*
89+
* If this coroutine is cancelled before the action gets applied, the action will not be applied.
90+
*/
91+
@ExperimentalWorkflow
92+
suspend fun <StateT, OutputT : Any> Sink<WorkflowAction<StateT, OutputT>>.sendAndAwaitApplication(
93+
action: WorkflowAction<StateT, OutputT>
94+
) {
95+
suspendCancellableCoroutine<Unit> { continuation ->
96+
val resumingAction = action<StateT, OutputT>({ "sendAndAwaitExecution($action)" }) {
97+
// Don't execute anything if the caller was cancelled while we were in the queue.
98+
if (!continuation.isActive) return@action
99+
100+
with(action) {
101+
// Forward our Updater to the real action.
102+
apply()
103+
}
104+
continuation.resume(Unit)
105+
}
106+
send(resumingAction)
107+
}
108+
}

workflow-core/src/main/java/com/squareup/workflow/WorkflowAction.kt

+10-6
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,10 @@ interface WorkflowAction<StateT, out OutputT : Any> {
4343
}
4444
}
4545

46-
@Suppress("DEPRECATION")
47-
@Deprecated("Implement Updater.apply")
48-
fun Mutator<StateT>.apply(): OutputT? {
49-
throw UnsupportedOperationException()
50-
}
51-
46+
/**
47+
* Executes the logic for this action, including any side effects, updating [state][StateT], and
48+
* setting the [OutputT] to emit.
49+
*/
5250
@Suppress("DEPRECATION")
5351
fun Updater<StateT, OutputT>.apply() {
5452
val mutator = Mutator(nextState)
@@ -57,6 +55,12 @@ interface WorkflowAction<StateT, out OutputT : Any> {
5755
nextState = mutator.state
5856
}
5957

58+
@Suppress("DEPRECATION")
59+
@Deprecated("Implement Updater.apply")
60+
fun Mutator<StateT>.apply(): OutputT? {
61+
throw UnsupportedOperationException()
62+
}
63+
6064
companion object {
6165
/**
6266
* Returns a [WorkflowAction] that does nothing: no output will be emitted, and
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright 2020 Square Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.squareup.workflow
17+
18+
import kotlinx.coroutines.ExperimentalCoroutinesApi
19+
import kotlinx.coroutines.flow.MutableStateFlow
20+
import kotlinx.coroutines.launch
21+
import kotlinx.coroutines.test.runBlockingTest
22+
import kotlin.test.Test
23+
import kotlin.test.assertEquals
24+
import kotlin.test.assertFalse
25+
import kotlin.test.assertNull
26+
import kotlin.test.assertTrue
27+
import kotlin.test.fail
28+
29+
@OptIn(
30+
ExperimentalCoroutinesApi::class,
31+
ExperimentalStdlibApi::class,
32+
ExperimentalWorkflow::class
33+
)
34+
class SinkTest {
35+
36+
private val sink = RecordingSink()
37+
38+
@Test fun `collectToActionSink sends action`() {
39+
runBlockingTest {
40+
val flow = MutableStateFlow(1)
41+
val collector = launch {
42+
flow.collectToSink(sink) {
43+
action {
44+
nextState = "$nextState $it"
45+
setOutput("output: $it")
46+
}
47+
}
48+
}
49+
50+
advanceUntilIdle()
51+
assertEquals(1, sink.actions.size)
52+
sink.actions.removeFirst()
53+
.let { action ->
54+
val (newState, output) = action.applyTo("state")
55+
assertEquals("state 1", newState)
56+
assertEquals("output: 1", output)
57+
}
58+
assertTrue(sink.actions.isEmpty())
59+
60+
flow.value = 2
61+
advanceUntilIdle()
62+
assertEquals(1, sink.actions.size)
63+
sink.actions.removeFirst()
64+
.let { action ->
65+
val (newState, output) = action.applyTo("state")
66+
assertEquals("state 2", newState)
67+
assertEquals("output: 2", output)
68+
}
69+
70+
collector.cancel()
71+
}
72+
}
73+
74+
@Test fun `sendAndAwaitApplication applies action`() {
75+
var applications = 0
76+
val action = action<String, String> {
77+
applications++
78+
nextState = "$nextState applied"
79+
setOutput("output")
80+
}
81+
82+
runBlockingTest {
83+
launch { sink.sendAndAwaitApplication(action) }
84+
advanceUntilIdle()
85+
86+
val enqueuedAction = sink.actions.removeFirst()
87+
val result = enqueuedAction.applyTo("state")
88+
assertEquals(1, applications)
89+
assertEquals("state applied", result.first)
90+
assertEquals("output", result.second)
91+
}
92+
}
93+
94+
@Test fun `sendAndAwaitApplication suspends until after applied`() {
95+
runBlockingTest {
96+
var resumed = false
97+
val action = action<String, String> {
98+
assertFalse(resumed)
99+
}
100+
launch {
101+
sink.sendAndAwaitApplication(action)
102+
resumed = true
103+
}
104+
advanceUntilIdle()
105+
assertFalse(resumed)
106+
assertEquals(1, sink.actions.size)
107+
108+
val enqueuedAction = sink.actions.removeFirst()
109+
pauseDispatcher()
110+
enqueuedAction.applyTo("state")
111+
112+
assertFalse(resumed)
113+
resumeDispatcher()
114+
advanceUntilIdle()
115+
assertTrue(resumed)
116+
}
117+
}
118+
119+
@Test fun `sendAndAwaitApplication doesn't apply action when cancelled while suspended`() {
120+
runBlockingTest {
121+
var applied = false
122+
val action = action<String, String> {
123+
applied = true
124+
fail()
125+
}
126+
val sendJob = launch { sink.sendAndAwaitApplication(action) }
127+
advanceUntilIdle()
128+
assertFalse(applied)
129+
assertEquals(1, sink.actions.size)
130+
131+
val enqueuedAction = sink.actions.removeFirst()
132+
sendJob.cancel()
133+
advanceUntilIdle()
134+
val result = enqueuedAction.applyTo("ignored")
135+
136+
assertFalse(applied)
137+
assertEquals("ignored", result.first)
138+
assertNull(result.second)
139+
}
140+
}
141+
142+
private class RecordingSink : Sink<WorkflowAction<String, String>> {
143+
val actions = mutableListOf<WorkflowAction<String, String>>()
144+
145+
override fun send(value: WorkflowAction<String, String>) {
146+
actions += value
147+
}
148+
}
149+
}

0 commit comments

Comments
 (0)