|
1 | 1 | package kotlinx.coroutines
|
2 | 2 |
|
| 3 | +import kotlinx.coroutines.flow.* |
3 | 4 | import kotlinx.coroutines.testing.*
|
4 | 5 | import org.junit.Test
|
| 6 | +import java.util.concurrent.CopyOnWriteArrayList |
| 7 | +import java.util.concurrent.ExecutorService |
| 8 | +import java.util.concurrent.Executors |
5 | 9 | import kotlin.coroutines.*
|
6 | 10 | import kotlin.test.*
|
7 |
| -import kotlinx.coroutines.flow.* |
8 | 11 |
|
9 | 12 | class ThreadContextElementTest : TestBase() {
|
10 | 13 |
|
@@ -155,39 +158,81 @@ class ThreadContextElementTest : TestBase() {
|
155 | 158 | }
|
156 | 159 | }
|
157 | 160 |
|
158 |
| - class JobCaptor(val capturees: ArrayList<Job> = ArrayList()) : ThreadContextElement<Unit> { |
| 161 | + class JobCaptor(val capturees: MutableList<String> = CopyOnWriteArrayList()) : ThreadContextElement<Unit> { |
159 | 162 |
|
160 | 163 | companion object Key : CoroutineContext.Key<MyElement>
|
161 | 164 |
|
162 | 165 | override val key: CoroutineContext.Key<*> get() = Key
|
163 | 166 |
|
164 | 167 | override fun updateThreadContext(context: CoroutineContext) {
|
165 |
| - capturees.add(context.job) |
| 168 | + capturees.add("Update: ${context.job}") |
166 | 169 | }
|
167 | 170 |
|
168 | 171 | override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {
|
| 172 | + capturees.add("Restore: ${context.job}") |
169 | 173 | }
|
170 | 174 | }
|
171 | 175 |
|
| 176 | + /** |
| 177 | + * For stability of the test, it is important to make sure that |
| 178 | + * the parent job actually suspends when calling |
| 179 | + * `withContext(dispatcher2 + CoroutineName("dispatched"))`. |
| 180 | + * |
| 181 | + * Here this requirement is fulfilled by forcing execution on a single thread. |
| 182 | + * However, dispatching is performed with two non-equal dispatchers to force dispatching. |
| 183 | + * |
| 184 | + * Suspend of the parent coroutine [kotlinx.coroutines.DispatchedCoroutine.trySuspend] is out of the control of the test, |
| 185 | + * while being executed concurrently with resume of the child coroutine [kotlinx.coroutines.DispatchedCoroutine.tryResume]. |
| 186 | + */ |
172 | 187 | @Test
|
173 | 188 | fun testWithContextJobAccess() = runTest {
|
| 189 | + val executor = Executors.newSingleThreadExecutor() |
| 190 | + // Emulate non-equal dispatchers |
| 191 | + val executor1 = object : ExecutorService by executor {} |
| 192 | + val executor2 = object : ExecutorService by executor {} |
| 193 | + val dispatcher1 = executor1.asCoroutineDispatcher() |
| 194 | + val dispatcher2 = executor2.asCoroutineDispatcher() |
174 | 195 | val captor = JobCaptor()
|
175 |
| - val manuallyCaptured = ArrayList<Job>() |
176 |
| - runBlocking(captor) { |
177 |
| - manuallyCaptured += coroutineContext.job |
| 196 | + val manuallyCaptured = mutableListOf<String>() |
| 197 | + |
| 198 | + fun registerUpdate(job: Job?) = manuallyCaptured.add("Update: $job") |
| 199 | + fun registerRestore(job: Job?) = manuallyCaptured.add("Restore: $job") |
| 200 | + |
| 201 | + var rootJob: Job? = null |
| 202 | + runBlocking(captor + dispatcher1) { |
| 203 | + rootJob = coroutineContext.job |
| 204 | + registerUpdate(rootJob) |
| 205 | + var undispatchedJob: Job? = null |
178 | 206 | withContext(CoroutineName("undispatched")) {
|
179 |
| - manuallyCaptured += coroutineContext.job |
180 |
| - withContext(Dispatchers.IO) { |
181 |
| - manuallyCaptured += coroutineContext.job |
| 207 | + undispatchedJob = coroutineContext.job |
| 208 | + registerUpdate(undispatchedJob) |
| 209 | + // These 2 restores and the corresponding next 2 updates happen only if the following `withContext` |
| 210 | + // call actually suspends. |
| 211 | + registerRestore(undispatchedJob) |
| 212 | + registerRestore(rootJob) |
| 213 | + // Without forcing of single backing thread the code inside `withContext` |
| 214 | + // may already complete at the moment when the parent coroutine decides |
| 215 | + // whether it needs to suspend or not. |
| 216 | + var dispatchedJob: Job? = null |
| 217 | + withContext(dispatcher2 + CoroutineName("dispatched")) { |
| 218 | + dispatchedJob = coroutineContext.job |
| 219 | + registerUpdate(dispatchedJob) |
182 | 220 | }
|
| 221 | + registerRestore(dispatchedJob) |
183 | 222 | // Context restored, captured again
|
184 |
| - manuallyCaptured += coroutineContext.job |
| 223 | + registerUpdate(undispatchedJob) |
185 | 224 | }
|
| 225 | + registerRestore(undispatchedJob) |
186 | 226 | // Context restored, captured again
|
187 |
| - manuallyCaptured += coroutineContext.job |
| 227 | + registerUpdate(rootJob) |
188 | 228 | }
|
| 229 | + registerRestore(rootJob) |
189 | 230 |
|
190 |
| - assertEquals(manuallyCaptured, captor.capturees) |
| 231 | + // Restores may be called concurrently to the update calls in other threads, so their order is not checked. |
| 232 | + val expected = manuallyCaptured.filter { it.startsWith("Update: ") }.joinToString(separator = "\n") |
| 233 | + val actual = captor.capturees.filter { it.startsWith("Update: ") }.joinToString(separator = "\n") |
| 234 | + assertEquals(expected, actual) |
| 235 | + executor.shutdownNow() |
191 | 236 | }
|
192 | 237 |
|
193 | 238 | @Test
|
|
0 commit comments