diff --git a/src/functions.php b/src/functions.php index 350f914..0f2ef7e 100644 --- a/src/functions.php +++ b/src/functions.php @@ -3,6 +3,7 @@ namespace React\Async; use React\EventLoop\Loop; +use React\Promise\CancellablePromiseInterface; use React\Promise\Deferred; use React\Promise\PromiseInterface; @@ -96,46 +97,53 @@ function ($error) use (&$exception, &$rejected, &$wait) { */ function parallel(array $tasks) { - $deferred = new Deferred(); - $results = array(); - $errors = array(); - - $done = function () use (&$results, &$errors, $deferred) { - if (count($errors)) { - $deferred->reject(array_shift($errors)); - return; + $pending = array(); + $deferred = new Deferred(function () use (&$pending) { + foreach ($pending as $promise) { + if ($promise instanceof CancellablePromiseInterface) { + $promise->cancel(); + } } - - $deferred->resolve($results); - }; + $pending = array(); + }); + $results = array(); + $errored = false; $numTasks = count($tasks); - if (0 === $numTasks) { - $done(); + $deferred->resolve($results); } - $checkDone = function () use (&$results, &$errors, $numTasks, $done) { - if ($numTasks === count($results) + count($errors)) { - $done(); - } - }; + $taskErrback = function ($error) use (&$pending, $deferred, &$errored) { + $errored = true; + $deferred->reject($error); - $taskErrback = function ($error) use (&$errors, $checkDone) { - $errors[] = $error; - $checkDone(); + foreach ($pending as $promise) { + if ($promise instanceof CancellablePromiseInterface) { + $promise->cancel(); + } + } + $pending = array(); }; foreach ($tasks as $i => $task) { - $taskCallback = function ($result) use (&$results, $i, $checkDone) { + $taskCallback = function ($result) use (&$results, &$pending, $numTasks, $i, $deferred) { $results[$i] = $result; - $checkDone(); + + if (count($results) === $numTasks) { + $deferred->resolve($results); + } }; $promise = call_user_func($task); assert($promise instanceof PromiseInterface); + $pending[$i] = $promise; $promise->then($taskCallback, $taskErrback); + + if ($errored) { + break; + } } return $deferred->promise(); @@ -147,7 +155,13 @@ function parallel(array $tasks) */ function series(array $tasks) { - $deferred = new Deferred(); + $pending = null; + $deferred = new Deferred(function () use (&$pending) { + if ($pending instanceof CancellablePromiseInterface) { + $pending->cancel(); + } + $pending = null; + }); $results = array(); /** @var callable():void $next */ @@ -156,7 +170,7 @@ function series(array $tasks) $next(); }; - $next = function () use (&$tasks, $taskCallback, $deferred, &$results) { + $next = function () use (&$tasks, $taskCallback, $deferred, &$results, &$pending) { if (0 === count($tasks)) { $deferred->resolve($results); return; @@ -165,6 +179,7 @@ function series(array $tasks) $task = array_shift($tasks); $promise = call_user_func($task); assert($promise instanceof PromiseInterface); + $pending = $promise; $promise->then($taskCallback, array($deferred, 'reject')); }; @@ -180,10 +195,16 @@ function series(array $tasks) */ function waterfall(array $tasks) { - $deferred = new Deferred(); + $pending = null; + $deferred = new Deferred(function () use (&$pending) { + if ($pending instanceof CancellablePromiseInterface) { + $pending->cancel(); + } + $pending = null; + }); /** @var callable $next */ - $next = function ($value = null) use (&$tasks, &$next, $deferred) { + $next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) { if (0 === count($tasks)) { $deferred->resolve($value); return; @@ -192,6 +213,7 @@ function waterfall(array $tasks) $task = array_shift($tasks); $promise = call_user_func_array($task, func_get_args()); assert($promise instanceof PromiseInterface); + $pending = $promise; $promise->then($next, array($deferred, 'reject')); }; diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php index f27a73e..b77a3ca 100644 --- a/tests/ParallelTest.php +++ b/tests/ParallelTest.php @@ -29,7 +29,7 @@ function () { }, function () { return new Promise(function ($resolve) { - Loop::addTimer(0.1, function () use ($resolve) { + Loop::addTimer(0.11, function () use ($resolve) { $resolve('bar'); }); }); @@ -49,7 +49,7 @@ function () { $timer->assertInRange(0.1, 0.2); } - public function testParallelWithError() + public function testParallelWithErrorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks() { $called = 0; @@ -60,7 +60,8 @@ function () use (&$called) { $resolve('foo'); }); }, - function () { + function () use (&$called) { + $called++; return new Promise(function () { throw new \RuntimeException('whoops'); }); @@ -80,7 +81,59 @@ function () use (&$called) { $this->assertSame(2, $called); } - public function testParallelWithDelayedError() + public function testParallelWithErrorWillCancelPendingPromises() + { + $cancelled = 0; + + $tasks = array( + function () use (&$cancelled) { + return new Promise(function () { }, function () use (&$cancelled) { + $cancelled++; + }); + }, + function () { + return new Promise(function () { + throw new \RuntimeException('whoops'); + }); + }, + function () use (&$cancelled) { + return new Promise(function () { }, function () use (&$cancelled) { + $cancelled++; + }); + } + ); + + $promise = React\Async\parallel($tasks); + + $promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('whoops'))); + + $this->assertSame(1, $cancelled); + } + + public function testParallelWillCancelPendingPromisesWhenCallingCancelOnResultingPromise() + { + $cancelled = 0; + + $tasks = array( + function () use (&$cancelled) { + return new Promise(function () { }, function () use (&$cancelled) { + $cancelled++; + }); + }, + function () use (&$cancelled) { + return new Promise(function () { }, function () use (&$cancelled) { + $cancelled++; + }); + } + ); + + $promise = React\Async\parallel($tasks); + $promise->cancel(); + + $this->assertSame(2, $cancelled); + } + + public function testParallelWithDelayedErrorReturnsPromiseRejectedWithExceptionFromTask() { $called = 0; @@ -91,7 +144,8 @@ function () use (&$called) { $resolve('foo'); }); }, - function () { + function () use (&$called) { + $called++; return new Promise(function ($_, $reject) { Loop::addTimer(0.001, function () use ($reject) { $reject(new \RuntimeException('whoops')); @@ -112,6 +166,6 @@ function () use (&$called) { Loop::run(); - $this->assertSame(2, $called); + $this->assertSame(3, $called); } } diff --git a/tests/SeriesTest.php b/tests/SeriesTest.php index 2e168c2..71fc432 100644 --- a/tests/SeriesTest.php +++ b/tests/SeriesTest.php @@ -79,4 +79,27 @@ function () use (&$called) { $this->assertSame(1, $called); } + + public function testSeriesWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise() + { + $cancelled = 0; + + $tasks = array( + function () { + return new Promise(function ($resolve) { + $resolve(); + }); + }, + function () use (&$cancelled) { + return new Promise(function () { }, function () use (&$cancelled) { + $cancelled++; + }); + } + ); + + $promise = React\Async\series($tasks); + $promise->cancel(); + + $this->assertSame(1, $cancelled); + } } diff --git a/tests/WaterfallTest.php b/tests/WaterfallTest.php index a04ad5c..b0c5c3c 100644 --- a/tests/WaterfallTest.php +++ b/tests/WaterfallTest.php @@ -86,4 +86,27 @@ function () use (&$called) { $this->assertSame(1, $called); } + + public function testWaterfallWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise() + { + $cancelled = 0; + + $tasks = array( + function () { + return new Promise(function ($resolve) { + $resolve(); + }); + }, + function () use (&$cancelled) { + return new Promise(function () { }, function () use (&$cancelled) { + $cancelled++; + }); + } + ); + + $promise = React\Async\waterfall($tasks); + $promise->cancel(); + + $this->assertSame(1, $cancelled); + } }