Skip to content

[3.x] Support iterable type for parallel() + series() + waterfall() #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ $promise->then(function (int $bytes) {

### parallel()

The `parallel(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
The `parallel(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
like this:

```php
Expand Down Expand Up @@ -250,7 +250,7 @@ React\Async\parallel([

### series()

The `series(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
The `series(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
like this:

```php
Expand Down Expand Up @@ -292,7 +292,7 @@ React\Async\series([

### waterfall()

The `waterfall(array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
The `waterfall(iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
like this:

```php
Expand Down
71 changes: 47 additions & 24 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,10 @@ function coroutine(callable $function, ...$args): PromiseInterface
}

/**
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
* @return PromiseInterface<array<mixed>,Exception>
*/
function parallel(array $tasks): PromiseInterface
function parallel(iterable $tasks): PromiseInterface
{
$pending = [];
$deferred = new Deferred(function () use (&$pending) {
Expand All @@ -297,15 +297,10 @@ function parallel(array $tasks): PromiseInterface
$pending = [];
});
$results = [];
$errored = false;
$continue = true;

$numTasks = count($tasks);
if (0 === $numTasks) {
$deferred->resolve($results);
}

$taskErrback = function ($error) use (&$pending, $deferred, &$errored) {
$errored = true;
$taskErrback = function ($error) use (&$pending, $deferred, &$continue) {
$continue = false;
$deferred->reject($error);

foreach ($pending as $promise) {
Expand All @@ -317,33 +312,39 @@ function parallel(array $tasks): PromiseInterface
};

foreach ($tasks as $i => $task) {
$taskCallback = function ($result) use (&$results, &$pending, $numTasks, $i, $deferred) {
$taskCallback = function ($result) use (&$results, &$pending, &$continue, $i, $deferred) {
$results[$i] = $result;
unset($pending[$i]);

if (count($results) === $numTasks) {
if (!$pending && !$continue) {
$deferred->resolve($results);
}
};

$promise = call_user_func($task);
$promise = \call_user_func($task);
assert($promise instanceof PromiseInterface);
$pending[$i] = $promise;

$promise->then($taskCallback, $taskErrback);

if ($errored) {
if (!$continue) {
break;
}
}

$continue = false;
if (!$pending) {
$deferred->resolve($results);
}

return $deferred->promise();
}

/**
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
* @return PromiseInterface<array<mixed>,Exception>
*/
function series(array $tasks): PromiseInterface
function series(iterable $tasks): PromiseInterface
{
$pending = null;
$deferred = new Deferred(function () use (&$pending) {
Expand All @@ -354,20 +355,31 @@ function series(array $tasks): PromiseInterface
});
$results = [];

if ($tasks instanceof \IteratorAggregate) {
$tasks = $tasks->getIterator();
assert($tasks instanceof \Iterator);
}

/** @var callable():void $next */
$taskCallback = function ($result) use (&$results, &$next) {
$results[] = $result;
$next();
};

$next = function () use (&$tasks, $taskCallback, $deferred, &$results, &$pending) {
if (0 === count($tasks)) {
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
$deferred->resolve($results);
return;
}

$task = array_shift($tasks);
$promise = call_user_func($task);
if ($tasks instanceof \Iterator) {
$task = $tasks->current();
$tasks->next();
} else {
$task = \array_shift($tasks);
}

$promise = \call_user_func($task);
assert($promise instanceof PromiseInterface);
$pending = $promise;

Expand All @@ -380,10 +392,10 @@ function series(array $tasks): PromiseInterface
}

/**
* @param array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
* @param iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
* @return PromiseInterface<mixed,Exception>
*/
function waterfall(array $tasks): PromiseInterface
function waterfall(iterable $tasks): PromiseInterface
{
$pending = null;
$deferred = new Deferred(function () use (&$pending) {
Expand All @@ -393,15 +405,26 @@ function waterfall(array $tasks): PromiseInterface
$pending = null;
});

if ($tasks instanceof \IteratorAggregate) {
$tasks = $tasks->getIterator();
assert($tasks instanceof \Iterator);
}

/** @var callable $next */
$next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) {
if (0 === count($tasks)) {
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
$deferred->resolve($value);
return;
}

$task = array_shift($tasks);
$promise = call_user_func_array($task, func_get_args());
if ($tasks instanceof \Iterator) {
$task = $tasks->current();
$tasks->next();
} else {
$task = \array_shift($tasks);
}

$promise = \call_user_func_array($task, func_get_args());
assert($promise instanceof PromiseInterface);
$pending = $promise;

Expand Down
65 changes: 65 additions & 0 deletions tests/ParallelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use React;
use React\EventLoop\Loop;
use React\Promise\Promise;
use function React\Promise\reject;

class ParallelTest extends TestCase
{
Expand All @@ -17,6 +18,19 @@ public function testParallelWithoutTasks()
$promise->then($this->expectCallableOnceWith(array()));
}

public function testParallelWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
{
$tasks = (function () {
if (false) {
yield;
}
})();

$promise = React\Async\parallel($tasks);

$promise->then($this->expectCallableOnceWith([]));
}

public function testParallelWithTasks()
{
$tasks = array(
Expand Down Expand Up @@ -49,6 +63,38 @@ function () {
$timer->assertInRange(0.1, 0.2);
}

public function testParallelWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
{
$tasks = (function () {
yield function () {
return new Promise(function ($resolve) {
Loop::addTimer(0.1, function () use ($resolve) {
$resolve('foo');
});
});
};
yield function () {
return new Promise(function ($resolve) {
Loop::addTimer(0.11, function () use ($resolve) {
$resolve('bar');
});
});
};
})();

$promise = React\Async\parallel($tasks);

$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));

$timer = new Timer($this);
$timer->start();

Loop::run();

$timer->stop();
$timer->assertInRange(0.1, 0.2);
}

public function testParallelWithErrorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
{
$called = 0;
Expand Down Expand Up @@ -81,6 +127,25 @@ function () use (&$called) {
$this->assertSame(2, $called);
}

public function testParallelWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
{
$called = 0;

$tasks = (function () use (&$called) {
while (true) {
yield function () use (&$called) {
return reject(new \RuntimeException('Rejected ' . ++$called));
};
}
})();

$promise = React\Async\parallel($tasks);

$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));

$this->assertSame(1, $called);
}

public function testParallelWithErrorWillCancelPendingPromises()
{
$cancelled = 0;
Expand Down
87 changes: 87 additions & 0 deletions tests/SeriesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use React;
use React\EventLoop\Loop;
use React\Promise\Promise;
use function React\Promise\reject;

class SeriesTest extends TestCase
{
Expand All @@ -17,6 +18,19 @@ public function testSeriesWithoutTasks()
$promise->then($this->expectCallableOnceWith(array()));
}

public function testSeriesWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
{
$tasks = (function () {
if (false) {
yield;
}
})();

$promise = React\Async\series($tasks);

$promise->then($this->expectCallableOnceWith([]));
}

public function testSeriesWithTasks()
{
$tasks = array(
Expand Down Expand Up @@ -49,6 +63,38 @@ function () {
$timer->assertInRange(0.10, 0.20);
}

public function testSeriesWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
{
$tasks = (function () {
yield function () {
return new Promise(function ($resolve) {
Loop::addTimer(0.051, function () use ($resolve) {
$resolve('foo');
});
});
};
yield function () {
return new Promise(function ($resolve) {
Loop::addTimer(0.051, function () use ($resolve) {
$resolve('bar');
});
});
};
})();

$promise = React\Async\series($tasks);

$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));

$timer = new Timer($this);
$timer->start();

Loop::run();

$timer->stop();
$timer->assertInRange(0.10, 0.20);
}

public function testSeriesWithError()
{
$called = 0;
Expand Down Expand Up @@ -80,6 +126,47 @@ function () use (&$called) {
$this->assertSame(1, $called);
}

public function testSeriesWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
{
$called = 0;

$tasks = (function () use (&$called) {
while (true) {
yield function () use (&$called) {
return reject(new \RuntimeException('Rejected ' . ++$called));
};
}
})();

$promise = React\Async\series($tasks);

$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));

$this->assertSame(1, $called);
}

public function testSeriesWithErrorFromInfiniteIteratorAggregateReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
{
$tasks = new class() implements \IteratorAggregate {
public $called = 0;

public function getIterator(): \Iterator
{
while (true) {
yield function () {
return reject(new \RuntimeException('Rejected ' . ++$this->called));
};
}
}
};

$promise = React\Async\series($tasks);

$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));

$this->assertSame(1, $tasks->called);
}

public function testSeriesWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
{
$cancelled = 0;
Expand Down
Loading