Skip to content

Commit 9bebe8f

Browse files
Upgrade kotlin and coroutines dependencies to latest versions.
Closes #188 because the deprecated `cancel` method that took `Throwable` has been removed. The issue where workflow subscriptions were disposed multiple times also seems to have been fixed. The only real thing this "broke" was how legacy workflow's `switchMapState` handles channel cancellation, now we have to be a bit more explicit. This is why channels shouldn't be used as streams…
1 parent f52816b commit 9bebe8f

File tree

14 files changed

+113
-68
lines changed

14 files changed

+113
-68
lines changed

kotlin/build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ buildscript {
3535
'hamcrest': '1.3',
3636
'intellijAnnotations': '13.0',
3737
'junit': '4.12',
38-
'kotlin': '1.3.21',
39-
'kotlinCoroutines': '1.1.1',
38+
'kotlin': '1.3.30',
39+
'kotlinCoroutines': '1.2.0',
4040
'ktlintPlugin': '5.1.0',
4141
'mavenPublishPlugin': '0.6.0',
4242
'mockito': '2.7.5',

kotlin/legacy/legacy-workflow-core/src/main/java/com/squareup/workflow/legacy/WorkflowOperators.kt

+21-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.squareup.workflow.legacy
1919

20+
import kotlinx.coroutines.CancellationException
2021
import kotlinx.coroutines.CoroutineScope
2122
import kotlinx.coroutines.Deferred
2223
import kotlinx.coroutines.Dispatchers.Unconfined
@@ -93,7 +94,20 @@ fun <S1 : Any, S2 : Any, E : Any, O : Any> Workflow<S1, E, O>.switchMapState(
9394
// leave the consumeEach loop but the produce coroutine will wait for this child coroutine
9495
// to complete (structured concurrency) before closing the downstream channel.
9596
transformerJob = launch {
96-
transform(upstreamState).toChannel(downstreamChannel)
97+
try {
98+
transform(upstreamState).toChannel(downstreamChannel)
99+
} catch (e: Throwable) {
100+
// Actual errors should be propagated, cancellation should interrupt forwarding
101+
// the current transformed channel but allow the next one to continue.
102+
// However, there may be multiple nested CancellationExceptions wrapping the actual
103+
// exception, so we have to go digging.
104+
val causeChain = generateSequence(e) { it.cause }
105+
causeChain.firstOrNull { it !is CancellationException }
106+
?.let { realException ->
107+
downstreamChannel.close(realException)
108+
throw realException
109+
}
110+
}
97111
}
98112
}
99113
}
@@ -114,10 +128,13 @@ fun <S : Any, E : Any, O1 : Any, O2 : Any> Workflow<S, E, O1>.mapResult(
114128

115129
// Propagate cancellation upstream.
116130
transformedResult.invokeOnCompletion { cause ->
117-
if (cause != null) {
118-
// TODO https://github.com/square/workflow/issues/188 Stop using parameterized cancel.
119-
@Suppress("DEPRECATION")
131+
this@mapResult.cancel(
132+
if (cause is CancellationException) cause else CancellationException(null, cause)
133+
)
134+
if (cause is CancellationException) {
120135
this@mapResult.cancel(cause)
136+
} else if (cause != null) {
137+
this@mapResult.cancel(CancellationException(null, cause))
121138
}
122139
}
123140

kotlin/legacy/legacy-workflow-core/src/test/java/com/squareup/workflow/legacy/CoroutineWorkflowTest.kt

+11-4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import kotlinx.coroutines.CancellationException
2121
import kotlinx.coroutines.CoroutineName
2222
import kotlinx.coroutines.CoroutineScope
2323
import kotlinx.coroutines.Dispatchers.Unconfined
24+
import kotlinx.coroutines.cancel
2425
import kotlinx.coroutines.channels.ReceiveChannel
2526
import kotlinx.coroutines.channels.consume
2627
import kotlinx.coroutines.suspendCancellableCoroutine
@@ -173,7 +174,7 @@ class CoroutineWorkflowTest : CoroutineScope {
173174
workflow.sendEvent(Unit)
174175
}
175176

176-
@Test fun `block gets original cancellation reason`() {
177+
@Test fun `block gets original cancellation reason - null cause`() {
177178
lateinit var cancelReason: Throwable
178179
val workflow = workflow<Nothing, Nothing, Nothing> { _, _ ->
179180
suspendCancellableCoroutine<Nothing> { continuation ->
@@ -190,7 +191,7 @@ class CoroutineWorkflowTest : CoroutineScope {
190191
}
191192

192193
@Suppress("DEPRECATION")
193-
@Test fun `block gets original cancellation reason - deprecated cancel`() {
194+
@Test fun `block gets original cancellation reason - non-null cause`() {
194195
lateinit var cancelReason: Throwable
195196
val workflow = workflow<Nothing, Nothing, Nothing> { _, _ ->
196197
suspendCancellableCoroutine<Nothing> { continuation ->
@@ -203,10 +204,16 @@ class CoroutineWorkflowTest : CoroutineScope {
203204
workflow.cancel(ExpectedException)
204205

205206
assertTrue(cancelReason is CancellationException)
206-
assertEquals(ExpectedException, cancelReason.cause)
207+
// Search up the cause chain for the expected exception, since multiple CancellationExceptions
208+
// may be chained together first.
209+
val causeChain = generateSequence<Throwable>(cancelReason) { it.cause }
210+
assertEquals(
211+
1, causeChain.count { it === ExpectedException },
212+
"Expected cancellation exception cause chain to include ExpectedException."
213+
)
207214
}
208215

209216
private companion object {
210-
object ExpectedException : RuntimeException()
217+
object ExpectedException : CancellationException()
211218
}
212219
}

kotlin/legacy/legacy-workflow-core/src/test/java/com/squareup/workflow/legacy/WorkflowOperatorsTest.kt

+28-20
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ import kotlinx.coroutines.channels.Channel
2424
import kotlinx.coroutines.channels.consume
2525
import kotlinx.coroutines.channels.produce
2626
import kotlinx.coroutines.isActive
27+
import kotlinx.coroutines.runBlocking
2728
import kotlinx.coroutines.suspendCancellableCoroutine
2829
import kotlin.test.BeforeTest
2930
import kotlin.test.Test
3031
import kotlin.test.assertEquals
32+
import kotlin.test.assertFails
3133
import kotlin.test.assertFailsWith
3234
import kotlin.test.assertFalse
3335
import kotlin.test.assertNull
@@ -81,9 +83,7 @@ class WorkflowOperatorsTest {
8183
it.toString()
8284
}
8385

84-
// TODO https://github.com/square/workflow/issues/188 Stop using parameterized cancel.
85-
@Suppress("DEPRECATION")
86-
withAdaptedEvents.cancel(ExpectedException())
86+
withAdaptedEvents.cancel(CancellationException(null, ExpectedException()))
8787

8888
assertTrue(sourceCancellation is CancellationException)
8989
}
@@ -161,9 +161,7 @@ class WorkflowOperatorsTest {
161161
val withMappedStates = source.mapState { it }
162162

163163
assertNull(sourceCancellation)
164-
// TODO https://github.com/square/workflow/issues/188 Stop using parameterized cancel.
165-
@Suppress("DEPRECATION")
166-
withMappedStates.cancel(ExpectedException())
164+
withMappedStates.cancel(CancellationException(null, ExpectedException()))
167165
assertTrue(sourceCancellation is CancellationException)
168166
}
169167

@@ -308,7 +306,16 @@ class WorkflowOperatorsTest {
308306
.consume {
309307
assertFalse(withMappedStates.isCompleted)
310308
withMappedStates.sendEvent(Unit)
311-
assertFailsWith<ExpectedException> { poll() }
309+
assertFails { runBlocking { receive() } }
310+
.also { error ->
311+
// Search up the cause chain for the expected exception, since multiple CancellationExceptions
312+
// may be chained together first.
313+
val causeChain = generateSequence(error) { it.cause }
314+
assertEquals(
315+
1, causeChain.count { it is ExpectedException },
316+
"Expected cancellation exception cause chain to include ExpectedException."
317+
)
318+
}
312319
// Exception is not sent through the result.
313320
assertEquals(Unit, withMappedStates.getCompleted())
314321
}
@@ -321,17 +328,24 @@ class WorkflowOperatorsTest {
321328
}
322329
val withMappedStates: Workflow<Int, Unit, Unit> = source.switchMapState {
323330
Channel<Int>().apply {
324-
// TODO https://github.com/square/workflow/issues/188 Stop using parameterized cancel.
325-
@Suppress("DEPRECATION")
326-
cancel(ExpectedException())
331+
cancel(CancellationException(null, ExpectedException()))
327332
}
328333
}
329334

330335
withMappedStates.openSubscriptionToState()
331336
.consume {
332337
assertFalse(withMappedStates.isCompleted)
333338
withMappedStates.sendEvent(Unit)
334-
assertFailsWith<ExpectedException> { poll() }
339+
assertFails { runBlocking { receive() } }
340+
.also { error ->
341+
// Search up the cause chain for the expected exception, since multiple CancellationExceptions
342+
// may be chained together first.
343+
val causeChain = generateSequence(error) { it.cause }
344+
assertEquals(
345+
1, causeChain.count { it is ExpectedException },
346+
"Expected cancellation exception cause chain to include ExpectedException."
347+
)
348+
}
335349
// Exception is not sent through the result.
336350
assertEquals(Unit, withMappedStates.getCompleted())
337351
}
@@ -371,9 +385,7 @@ class WorkflowOperatorsTest {
371385
withMappedStates.sendEvent(Unit)
372386

373387
assertFalse(transformedChannel.isClosedForSend)
374-
// TODO https://github.com/square/workflow/issues/188 Stop using parameterized cancel.
375-
@Suppress("DEPRECATION")
376-
this.cancel(ExpectedException())
388+
this.cancel(CancellationException(null, ExpectedException()))
377389
assertTrue(transformedChannel.isClosedForSend)
378390
}
379391
}
@@ -390,9 +402,7 @@ class WorkflowOperatorsTest {
390402
val withMappedStates = source.switchMapState { produce { send(it) } }
391403

392404
assertNull(sourceCancellation)
393-
// TODO https://github.com/square/workflow/issues/188 Stop using parameterized cancel.
394-
@Suppress("DEPRECATION")
395-
withMappedStates.cancel(ExpectedException())
405+
withMappedStates.cancel(CancellationException(null, ExpectedException()))
396406
assertTrue(sourceCancellation is CancellationException)
397407
}
398408

@@ -450,9 +460,7 @@ class WorkflowOperatorsTest {
450460
val withMappedResult = source.mapResult { it }
451461

452462
assertNull(sourceCancellation)
453-
// TODO https://github.com/square/workflow/issues/188 Stop using parameterized cancel.
454-
@Suppress("DEPRECATION")
455-
withMappedResult.cancel(ExpectedException())
463+
withMappedResult.cancel(CancellationException(null, ExpectedException()))
456464
assertTrue(sourceCancellation is CancellationException)
457465
}
458466
}

kotlin/legacy/legacy-workflow-rx2/src/main/java/com/squareup/workflow/legacy/rx2/EventChannel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ fun <E : Any> ReceiveChannel<E>.asEventChannel() = object : EventChannel<E> {
123123
.also { selectionJob.cancel() }
124124
} catch (cancellation: CancellationException) {
125125
val cause = cancellation.cause
126-
if (cause == null) {
126+
if (cause == null || cause is CancellationException) {
127127
// The select was cancelled normally, which means the workflow was abandoned and we're
128128
// about to get unsubscribed from. Don't propagate the error, just never emit/return.
129129
suspendCoroutine<Nothing> { }

kotlin/legacy/legacy-workflow-rx2/src/test/java/com/squareup/workflow/legacy/rx2/WorkflowOperatorsTest.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,6 @@ class WorkflowOperatorsTest {
7878
states.cancel()
7979

8080
assertThat(subscribeCount).isEqualTo(1)
81-
// For some reason the observable is actually disposed twice. Seems like a coroutines bug, but
82-
// Disposable.dispose() is an idempotent operation so it should be fine.
83-
assertThat(disposeCount).isEqualTo(2)
81+
assertThat(disposeCount).isEqualTo(1)
8482
}
8583
}

kotlin/samples/tictactoe/android/build.gradle

+5
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ android {
2727
versionCode 1
2828
versionName "1.0.0"
2929
}
30+
31+
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1064#issuecomment-479412940
32+
packagingOptions {
33+
exclude 'META-INF/atomicfu.kotlin_module'
34+
}
3035
}
3136

3237
dependencies {

kotlin/workflow-runtime/src/main/java/com/squareup/workflow/WorkflowHost.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import com.squareup.workflow.WorkflowHost.Update
2222
import com.squareup.workflow.internal.WorkflowId
2323
import com.squareup.workflow.internal.WorkflowNode
2424
import com.squareup.workflow.internal.id
25+
import kotlinx.coroutines.CancellationException
2526
import kotlinx.coroutines.cancel
2627
import kotlinx.coroutines.channels.ReceiveChannel
2728
import kotlinx.coroutines.channels.produce
@@ -149,9 +150,7 @@ internal fun <I : Any, O : Any, R : Any> WorkflowNode<I, *, O, R>.start(
149150
} catch (e: Throwable) {
150151
// For some reason the exception gets masked if we don't explicitly pass it to cancel the
151152
// producer coroutine ourselves here.
152-
// TODO https://github.com/square/workflow/issues/188 Stop using parameterized cancel.
153-
@Suppress("DEPRECATION")
154-
coroutineContext.cancel(e)
153+
coroutineContext.cancel(if (e is CancellationException) e else CancellationException(null, e))
155154
throw e
156155
} finally {
157156
// There's a potential race condition if the producer coroutine is cancelled before it has a chance

kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/ChannelUpdatesTest.kt

+11-4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package com.squareup.workflow.internal
2020
import com.squareup.workflow.util.ChannelUpdate
2121
import com.squareup.workflow.util.ChannelUpdate.Closed
2222
import com.squareup.workflow.util.ChannelUpdate.Value
23+
import kotlinx.coroutines.CancellationException
2324
import kotlinx.coroutines.Dispatchers
2425
import kotlinx.coroutines.GlobalScope
2526
import kotlinx.coroutines.async
@@ -82,16 +83,22 @@ class ChannelUpdatesTest {
8283

8384
@Test fun `handles error`() {
8485
val channel = Channel<String>()
85-
// TODO https://github.com/square/workflow/issues/188 Stop using parameterized cancel.
86-
@Suppress("DEPRECATION")
87-
channel.cancel(ExpectedException())
86+
channel.cancel(CancellationException(null, ExpectedException()))
8887

89-
assertFailsWith<ExpectedException> {
88+
assertFailsWith<CancellationException> {
9089
runBlocking {
9190
select<ChannelUpdate<String>> {
9291
onChannelUpdate(channel) { it }
9392
}
9493
}
94+
}.also { error ->
95+
// Search up the cause chain for the expected exception, since multiple CancellationExceptions
96+
// may be chained together first.
97+
val causeChain = generateSequence<Throwable>(error) { it.cause }
98+
assertEquals(
99+
1, causeChain.count { it is ExpectedException },
100+
"Expected cancellation exception cause chain to include original cause."
101+
)
95102
}
96103
}
97104

kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/RealWorkflowContextTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import com.squareup.workflow.Workflow
2323
import com.squareup.workflow.WorkflowAction.Companion.emitOutput
2424
import com.squareup.workflow.WorkflowAction.Companion.noop
2525
import com.squareup.workflow.WorkflowContext
26-
import com.squareup.workflow.compose
26+
import com.squareup.workflow.composeChild
2727
import com.squareup.workflow.internal.Behavior.WorkflowOutputCase
2828
import com.squareup.workflow.internal.RealWorkflowContext.Composer
2929
import com.squareup.workflow.internal.RealWorkflowContextTest.TestComposer.Rendering
@@ -149,7 +149,7 @@ class RealWorkflowContextTest {
149149
context.onReceive<Unit>({ fail() }, Unit::class.starProjectedType) { fail() }
150150
}
151151
val child = Workflow.stateless<Nothing, Unit> { fail() }
152-
assertFailsWith<IllegalStateException> { context.compose(child) }
152+
assertFailsWith<IllegalStateException> { context.composeChild(child) }
153153
assertFailsWith<IllegalStateException> { context.buildBehavior() }
154154
}
155155
}

kotlin/workflow-rx2/src/test/java/com/squareup/workflow/rx2/SubscriptionsTest.kt

+4-10
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,7 @@ class SubscriptionsTest {
101101
}
102102

103103
assertEquals(1, workflow.subscriptions)
104-
// For some reason the observable is actually disposed twice. Seems like a coroutines bug, but
105-
// Disposable.dispose() is an idempotent operation so it should be fine.
106-
assertEquals(2, workflow.disposals)
104+
assertEquals(1, workflow.disposals)
107105
}
108106
}
109107

@@ -116,9 +114,7 @@ class SubscriptionsTest {
116114
}
117115

118116
assertEquals(1, workflow.subscriptions)
119-
// For some reason the observable is actually disposed twice. Seems like a coroutines bug, but
120-
// Disposable.dispose() is an idempotent operation so it should be fine.
121-
assertEquals(2, workflow.disposals)
117+
assertEquals(1, workflow.disposals)
122118
}
123119
}
124120

@@ -148,15 +144,13 @@ class SubscriptionsTest {
148144

149145
host.withNextRendering { setSubscribed ->
150146
assertEquals(1, workflow.subscriptions)
151-
// For some reason the observable is actually disposed twice. Seems like a coroutines bug,
152-
// but Disposable.dispose() is an idempotent operation so it should be fine.
153-
assertEquals(2, workflow.disposals)
147+
assertEquals(1, workflow.disposals)
154148
setSubscribed(true)
155149
}
156150

157151
host.withNextRendering {
158152
assertEquals(2, workflow.subscriptions)
159-
assertEquals(2, workflow.disposals)
153+
assertEquals(1, workflow.disposals)
160154
}
161155
}
162156
}

0 commit comments

Comments
 (0)