Skip to content

Commit 87eabc0

Browse files
authored
Merge pull request #45 from clue-labs/iterable-v3
Support iterable type for `parallel()` + `series()` + `waterfall()`
2 parents 843ebc2 + 9660313 commit 87eabc0

File tree

5 files changed

+296
-27
lines changed

5 files changed

+296
-27
lines changed

README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ $promise->then(function (int $bytes) {
208208

209209
### parallel()
210210

211-
The `parallel(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
211+
The `parallel(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
212212
like this:
213213

214214
```php
@@ -250,7 +250,7 @@ React\Async\parallel([
250250

251251
### series()
252252

253-
The `series(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
253+
The `series(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
254254
like this:
255255

256256
```php
@@ -292,7 +292,7 @@ React\Async\series([
292292

293293
### waterfall()
294294

295-
The `waterfall(array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
295+
The `waterfall(iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
296296
like this:
297297

298298
```php

src/functions.php

+47-24
Original file line numberDiff line numberDiff line change
@@ -281,10 +281,10 @@ function coroutine(callable $function, ...$args): PromiseInterface
281281
}
282282

283283
/**
284-
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
284+
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
285285
* @return PromiseInterface<array<mixed>,Exception>
286286
*/
287-
function parallel(array $tasks): PromiseInterface
287+
function parallel(iterable $tasks): PromiseInterface
288288
{
289289
$pending = [];
290290
$deferred = new Deferred(function () use (&$pending) {
@@ -296,15 +296,10 @@ function parallel(array $tasks): PromiseInterface
296296
$pending = [];
297297
});
298298
$results = [];
299-
$errored = false;
299+
$continue = true;
300300

301-
$numTasks = count($tasks);
302-
if (0 === $numTasks) {
303-
$deferred->resolve($results);
304-
}
305-
306-
$taskErrback = function ($error) use (&$pending, $deferred, &$errored) {
307-
$errored = true;
301+
$taskErrback = function ($error) use (&$pending, $deferred, &$continue) {
302+
$continue = false;
308303
$deferred->reject($error);
309304

310305
foreach ($pending as $promise) {
@@ -316,33 +311,39 @@ function parallel(array $tasks): PromiseInterface
316311
};
317312

318313
foreach ($tasks as $i => $task) {
319-
$taskCallback = function ($result) use (&$results, &$pending, $numTasks, $i, $deferred) {
314+
$taskCallback = function ($result) use (&$results, &$pending, &$continue, $i, $deferred) {
320315
$results[$i] = $result;
316+
unset($pending[$i]);
321317

322-
if (count($results) === $numTasks) {
318+
if (!$pending && !$continue) {
323319
$deferred->resolve($results);
324320
}
325321
};
326322

327-
$promise = call_user_func($task);
323+
$promise = \call_user_func($task);
328324
assert($promise instanceof PromiseInterface);
329325
$pending[$i] = $promise;
330326

331327
$promise->then($taskCallback, $taskErrback);
332328

333-
if ($errored) {
329+
if (!$continue) {
334330
break;
335331
}
336332
}
337333

334+
$continue = false;
335+
if (!$pending) {
336+
$deferred->resolve($results);
337+
}
338+
338339
return $deferred->promise();
339340
}
340341

341342
/**
342-
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
343+
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
343344
* @return PromiseInterface<array<mixed>,Exception>
344345
*/
345-
function series(array $tasks): PromiseInterface
346+
function series(iterable $tasks): PromiseInterface
346347
{
347348
$pending = null;
348349
$deferred = new Deferred(function () use (&$pending) {
@@ -353,20 +354,31 @@ function series(array $tasks): PromiseInterface
353354
});
354355
$results = [];
355356

357+
if ($tasks instanceof \IteratorAggregate) {
358+
$tasks = $tasks->getIterator();
359+
assert($tasks instanceof \Iterator);
360+
}
361+
356362
/** @var callable():void $next */
357363
$taskCallback = function ($result) use (&$results, &$next) {
358364
$results[] = $result;
359365
$next();
360366
};
361367

362368
$next = function () use (&$tasks, $taskCallback, $deferred, &$results, &$pending) {
363-
if (0 === count($tasks)) {
369+
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
364370
$deferred->resolve($results);
365371
return;
366372
}
367373

368-
$task = array_shift($tasks);
369-
$promise = call_user_func($task);
374+
if ($tasks instanceof \Iterator) {
375+
$task = $tasks->current();
376+
$tasks->next();
377+
} else {
378+
$task = \array_shift($tasks);
379+
}
380+
381+
$promise = \call_user_func($task);
370382
assert($promise instanceof PromiseInterface);
371383
$pending = $promise;
372384

@@ -379,10 +391,10 @@ function series(array $tasks): PromiseInterface
379391
}
380392

381393
/**
382-
* @param array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
394+
* @param iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
383395
* @return PromiseInterface<mixed,Exception>
384396
*/
385-
function waterfall(array $tasks): PromiseInterface
397+
function waterfall(iterable $tasks): PromiseInterface
386398
{
387399
$pending = null;
388400
$deferred = new Deferred(function () use (&$pending) {
@@ -392,15 +404,26 @@ function waterfall(array $tasks): PromiseInterface
392404
$pending = null;
393405
});
394406

407+
if ($tasks instanceof \IteratorAggregate) {
408+
$tasks = $tasks->getIterator();
409+
assert($tasks instanceof \Iterator);
410+
}
411+
395412
/** @var callable $next */
396413
$next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) {
397-
if (0 === count($tasks)) {
414+
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
398415
$deferred->resolve($value);
399416
return;
400417
}
401418

402-
$task = array_shift($tasks);
403-
$promise = call_user_func_array($task, func_get_args());
419+
if ($tasks instanceof \Iterator) {
420+
$task = $tasks->current();
421+
$tasks->next();
422+
} else {
423+
$task = \array_shift($tasks);
424+
}
425+
426+
$promise = \call_user_func_array($task, func_get_args());
404427
assert($promise instanceof PromiseInterface);
405428
$pending = $promise;
406429

tests/ParallelTest.php

+65
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React;
66
use React\EventLoop\Loop;
77
use React\Promise\Promise;
8+
use function React\Promise\reject;
89

910
class ParallelTest extends TestCase
1011
{
@@ -17,6 +18,19 @@ public function testParallelWithoutTasks()
1718
$promise->then($this->expectCallableOnceWith(array()));
1819
}
1920

21+
public function testParallelWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
22+
{
23+
$tasks = (function () {
24+
if (false) {
25+
yield;
26+
}
27+
})();
28+
29+
$promise = React\Async\parallel($tasks);
30+
31+
$promise->then($this->expectCallableOnceWith([]));
32+
}
33+
2034
public function testParallelWithTasks()
2135
{
2236
$tasks = array(
@@ -49,6 +63,38 @@ function () {
4963
$timer->assertInRange(0.1, 0.2);
5064
}
5165

66+
public function testParallelWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
67+
{
68+
$tasks = (function () {
69+
yield function () {
70+
return new Promise(function ($resolve) {
71+
Loop::addTimer(0.1, function () use ($resolve) {
72+
$resolve('foo');
73+
});
74+
});
75+
};
76+
yield function () {
77+
return new Promise(function ($resolve) {
78+
Loop::addTimer(0.11, function () use ($resolve) {
79+
$resolve('bar');
80+
});
81+
});
82+
};
83+
})();
84+
85+
$promise = React\Async\parallel($tasks);
86+
87+
$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));
88+
89+
$timer = new Timer($this);
90+
$timer->start();
91+
92+
Loop::run();
93+
94+
$timer->stop();
95+
$timer->assertInRange(0.1, 0.2);
96+
}
97+
5298
public function testParallelWithErrorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
5399
{
54100
$called = 0;
@@ -81,6 +127,25 @@ function () use (&$called) {
81127
$this->assertSame(2, $called);
82128
}
83129

130+
public function testParallelWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
131+
{
132+
$called = 0;
133+
134+
$tasks = (function () use (&$called) {
135+
while (true) {
136+
yield function () use (&$called) {
137+
return reject(new \RuntimeException('Rejected ' . ++$called));
138+
};
139+
}
140+
})();
141+
142+
$promise = React\Async\parallel($tasks);
143+
144+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
145+
146+
$this->assertSame(1, $called);
147+
}
148+
84149
public function testParallelWithErrorWillCancelPendingPromises()
85150
{
86151
$cancelled = 0;

tests/SeriesTest.php

+87
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React;
66
use React\EventLoop\Loop;
77
use React\Promise\Promise;
8+
use function React\Promise\reject;
89

910
class SeriesTest extends TestCase
1011
{
@@ -17,6 +18,19 @@ public function testSeriesWithoutTasks()
1718
$promise->then($this->expectCallableOnceWith(array()));
1819
}
1920

21+
public function testSeriesWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
22+
{
23+
$tasks = (function () {
24+
if (false) {
25+
yield;
26+
}
27+
})();
28+
29+
$promise = React\Async\series($tasks);
30+
31+
$promise->then($this->expectCallableOnceWith([]));
32+
}
33+
2034
public function testSeriesWithTasks()
2135
{
2236
$tasks = array(
@@ -49,6 +63,38 @@ function () {
4963
$timer->assertInRange(0.10, 0.20);
5064
}
5165

66+
public function testSeriesWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
67+
{
68+
$tasks = (function () {
69+
yield function () {
70+
return new Promise(function ($resolve) {
71+
Loop::addTimer(0.051, function () use ($resolve) {
72+
$resolve('foo');
73+
});
74+
});
75+
};
76+
yield function () {
77+
return new Promise(function ($resolve) {
78+
Loop::addTimer(0.051, function () use ($resolve) {
79+
$resolve('bar');
80+
});
81+
});
82+
};
83+
})();
84+
85+
$promise = React\Async\series($tasks);
86+
87+
$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));
88+
89+
$timer = new Timer($this);
90+
$timer->start();
91+
92+
Loop::run();
93+
94+
$timer->stop();
95+
$timer->assertInRange(0.10, 0.20);
96+
}
97+
5298
public function testSeriesWithError()
5399
{
54100
$called = 0;
@@ -80,6 +126,47 @@ function () use (&$called) {
80126
$this->assertSame(1, $called);
81127
}
82128

129+
public function testSeriesWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
130+
{
131+
$called = 0;
132+
133+
$tasks = (function () use (&$called) {
134+
while (true) {
135+
yield function () use (&$called) {
136+
return reject(new \RuntimeException('Rejected ' . ++$called));
137+
};
138+
}
139+
})();
140+
141+
$promise = React\Async\series($tasks);
142+
143+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
144+
145+
$this->assertSame(1, $called);
146+
}
147+
148+
public function testSeriesWithErrorFromInfiniteIteratorAggregateReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
149+
{
150+
$tasks = new class() implements \IteratorAggregate {
151+
public $called = 0;
152+
153+
public function getIterator(): \Iterator
154+
{
155+
while (true) {
156+
yield function () {
157+
return reject(new \RuntimeException('Rejected ' . ++$this->called));
158+
};
159+
}
160+
}
161+
};
162+
163+
$promise = React\Async\series($tasks);
164+
165+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
166+
167+
$this->assertSame(1, $tasks->called);
168+
}
169+
83170
public function testSeriesWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
84171
{
85172
$cancelled = 0;

0 commit comments

Comments
 (0)