From 5fc74d63ae7f2f8291074b4db9c516c7431ddf87 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 27 Apr 2023 17:47:09 -0400 Subject: [PATCH] Implement auto fork joining --- .../toolkit/src/listenerMiddleware/index.ts | 20 +++++- .../src/listenerMiddleware/tests/fork.test.ts | 71 ++++++++++++++----- .../toolkit/src/listenerMiddleware/types.ts | 12 +++- 3 files changed, 80 insertions(+), 23 deletions(-) diff --git a/packages/toolkit/src/listenerMiddleware/index.ts b/packages/toolkit/src/listenerMiddleware/index.ts index 56205295ee..5a2603edea 100644 --- a/packages/toolkit/src/listenerMiddleware/index.ts +++ b/packages/toolkit/src/listenerMiddleware/index.ts @@ -23,6 +23,7 @@ import type { TaskResult, AbortSignalWithReason, UnsubscribeListenerOptions, + ForkOptions, } from './types' import { abortControllerWithReason, @@ -78,13 +79,19 @@ const INTERNAL_NIL_TOKEN = {} as const const alm = 'listenerMiddleware' as const -const createFork = (parentAbortSignal: AbortSignalWithReason) => { +const createFork = ( + parentAbortSignal: AbortSignalWithReason, + parentBlockingPromises: Promise[] +) => { const linkControllers = (controller: AbortController) => addAbortSignalListener(parentAbortSignal, () => abortControllerWithReason(controller, parentAbortSignal.reason) ) - return (taskExecutor: ForkedTaskExecutor): ForkedTask => { + return ( + taskExecutor: ForkedTaskExecutor, + opts?: ForkOptions + ): ForkedTask => { assertFunction(taskExecutor, 'taskExecutor') const childAbortController = new AbortController() @@ -105,6 +112,10 @@ const createFork = (parentAbortSignal: AbortSignalWithReason) => { () => abortControllerWithReason(childAbortController, taskCompleted) ) + if (opts?.autoJoin) { + parentBlockingPromises.push(result) + } + return { result: createPause>(parentAbortSignal)(result), cancel() { @@ -376,6 +387,7 @@ export function createListenerMiddleware< startListening, internalTaskController.signal ) + const autoJoinPromises: Promise[] = [] try { entry.pending.add(internalTaskController) @@ -394,7 +406,7 @@ export function createListenerMiddleware< pause: createPause(internalTaskController.signal), extra, signal: internalTaskController.signal, - fork: createFork(internalTaskController.signal), + fork: createFork(internalTaskController.signal, autoJoinPromises), unsubscribe: entry.unsubscribe, subscribe: () => { listenerMap.set(entry.id, entry) @@ -417,6 +429,8 @@ export function createListenerMiddleware< }) } } finally { + await Promise.allSettled(autoJoinPromises) + abortControllerWithReason(internalTaskController, listenerCompleted) // Notify that the task has completed entry.pending.delete(internalTaskController) } diff --git a/packages/toolkit/src/listenerMiddleware/tests/fork.test.ts b/packages/toolkit/src/listenerMiddleware/tests/fork.test.ts index 6d9e6abb56..41a5df123f 100644 --- a/packages/toolkit/src/listenerMiddleware/tests/fork.test.ts +++ b/packages/toolkit/src/listenerMiddleware/tests/fork.test.ts @@ -12,6 +12,7 @@ import { listenerCancelled, listenerCompleted, taskCancelled, + taskCompleted, } from '../exceptions' function delay(ms: number) { @@ -349,28 +350,60 @@ describe('fork', () => { ) }) - test('forkApi.signal listener is invoked as soon as the parent listener is cancelled or completed', async () => { - let deferredResult = deferred() + it.each([ + { + autoJoin: true, + expectedAbortReason: taskCompleted, + cancelListener: false, + }, + { + autoJoin: false, + expectedAbortReason: listenerCompleted, + cancelListener: false, + }, + { + autoJoin: true, + expectedAbortReason: listenerCancelled, + cancelListener: true, + }, + { + autoJoin: false, + expectedAbortReason: listenerCancelled, + cancelListener: true, + }, + ])( + 'signal is $expectedAbortReason when autoJoin: $autoJoin, cancelListener: $cancelListener', + async ({ autoJoin, cancelListener, expectedAbortReason }) => { + let deferredResult = deferred() + + const unsubscribe = startListening({ + actionCreator: increment, + async effect(_, listenerApi) { + listenerApi.fork( + async (forkApi) => { + forkApi.signal.addEventListener('abort', () => { + deferredResult.resolve( + (forkApi.signal as AbortSignalWithReason).reason + ) + }) + + await forkApi.delay(10) + }, + { autoJoin } + ) + }, + }) - startListening({ - actionCreator: increment, - async effect(_, listenerApi) { - const wronglyDoNotAwaitResultOfTask = listenerApi.fork( - async (forkApi) => { - forkApi.signal.addEventListener('abort', () => { - deferredResult.resolve( - (forkApi.signal as AbortSignalWithReason).reason - ) - }) - } - ) - }, - }) + store.dispatch(increment()) - store.dispatch(increment()) + // let task start + await Promise.resolve() - expect(await deferredResult).toBe(listenerCompleted) - }) + if (cancelListener) unsubscribe({ cancelActive: true }) + + expect(await deferredResult).toBe(expectedAbortReason) + } + ) test('fork.delay does not trigger unhandledRejections for completed or cancelled tasks', async () => { let deferredCompletedEvt = deferred() diff --git a/packages/toolkit/src/listenerMiddleware/types.ts b/packages/toolkit/src/listenerMiddleware/types.ts index 6b75d80036..650c78c6fe 100644 --- a/packages/toolkit/src/listenerMiddleware/types.ts +++ b/packages/toolkit/src/listenerMiddleware/types.ts @@ -131,6 +131,15 @@ export interface ForkedTask { cancel(): void } +/** @public */ +export interface ForkOptions { + /** + * If true, causes the parent task to not be marked as complete until + * all autoJoined forks have completed or failed. + */ + autoJoin: boolean; +} + /** @public */ export interface ListenerEffectAPI< State, @@ -238,8 +247,9 @@ export interface ListenerEffectAPI< /** * Queues in the next microtask the execution of a task. * @param executor + * @param options */ - fork(executor: ForkedTaskExecutor): ForkedTask + fork(executor: ForkedTaskExecutor, options?: ForkOptions): ForkedTask /** * Returns a promise that resolves when `waitFor` resolves or * rejects if the listener has been cancelled or is completed.