Skip to content

Commit 25810ab

Browse files
committed
Improve async() by making its promises cancelable
Since `async()` returns a promise and those are normally cancelable, implementing this puts them in line with the rest of our ecosystem. As such the following example will throw a timeout exception from the canceled `sleep()` call. ```php $promise = async(static function (): int { echo 'a'; await(sleep(2)); echo 'b'; return time(); })(); $promise->cancel(); await($promise); ```` This builds on top of reactphp#15, reactphp#18, reactphp#19, reactphp#26, reactphp#28, reactphp#30, and reactphp#32.
1 parent 4cadacc commit 25810ab

File tree

5 files changed

+265
-9
lines changed

5 files changed

+265
-9
lines changed

README.md

+23
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,29 @@ $promise->then(function (int $bytes) {
204204
});
205205
```
206206

207+
Promises returned by `async()` can be cancelled, and when done any currently and future awaited promise inside that and
208+
any nested fibers with their awaited promises will also be cancelled. As such the following example will only output
209+
`ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep) between `a` and `b` is cancelled throwing a timeout
210+
exception that bubbles up through the fibers ultimately to the end user through the [`await()`](#await) on the last line
211+
of the example.
212+
213+
```php
214+
$promise = async(static function (): int {
215+
echo 'a';
216+
await(async(static function(): void {
217+
echo 'b';
218+
await(sleep(2));
219+
echo 'c';
220+
})());
221+
echo 'd';
222+
223+
return time();
224+
})();
225+
226+
$promise->cancel();
227+
await($promise);
228+
```
229+
207230
### await()
208231

209232
The `await(PromiseInterface $promise): mixed` function can be used to

composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
"react/promise": "^2.8 || ^1.2.1"
3232
},
3333
"require-dev": {
34-
"phpunit/phpunit": "^9.3"
34+
"phpunit/phpunit": "^9.3",
35+
"react/promise-timer": "^1.8"
3536
},
3637
"autoload": {
3738
"psr-4": {

src/FiberMap.php

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
namespace React\Async;
4+
5+
use Fiber;
6+
use React\Promise\PromiseInterface;
7+
8+
/**
9+
* @internal
10+
*/
11+
final class FiberMap
12+
{
13+
private array $status = [];
14+
private array $map = [];
15+
16+
public function register(Fiber $fiber): void
17+
{
18+
$this->status[spl_object_hash($fiber)] = false;
19+
$this->map[spl_object_hash($fiber)] = [];
20+
}
21+
22+
public function cancel(Fiber $fiber): void
23+
{
24+
$this->status[spl_object_hash($fiber)] = true;
25+
}
26+
27+
public function isCanceled(Fiber $fiber): bool
28+
{
29+
return $this->status[spl_object_hash($fiber)];
30+
}
31+
32+
public function attachPromise(Fiber $fiber, PromiseInterface $promise): void
33+
{
34+
$this->map[spl_object_hash($fiber)][spl_object_hash($promise)] = $promise;
35+
}
36+
37+
public function has(Fiber $fiber): bool
38+
{
39+
return array_key_exists(spl_object_hash($fiber), $this->map);
40+
}
41+
42+
public function getPromises(Fiber $fiber): array
43+
{
44+
return $this->map[spl_object_hash($fiber)];
45+
}
46+
47+
public function unregister(Fiber $fiber): void
48+
{
49+
unset($this->status[spl_object_hash($fiber)], $this->map[spl_object_hash($fiber)]);
50+
}
51+
}

src/functions.php

+82-8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace React\Async;
44

5+
use Fiber;
56
use React\EventLoop\Loop;
67
use React\Promise\CancellablePromiseInterface;
78
use React\Promise\Deferred;
@@ -148,24 +149,72 @@
148149
* });
149150
* ```
150151
*
152+
* Promises returned by `async()` can be cancelled, and when done any currently
153+
* and future awaited promise inside that and any nested fibers with their
154+
* awaited promises will also be cancelled. As such the following example will
155+
* only output `ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep)
156+
* between `a` and `b` is cancelled throwing a timeout exception that bubbles up
157+
* through the fibers ultimately to the end user through the [`await()`](#await)
158+
* on the last line of the example.
159+
*
160+
* ```php
161+
* $promise = async(static function (): int {
162+
* echo 'a';
163+
* await(async(static function(): void {
164+
* echo 'b';
165+
* await(sleep(2));
166+
* echo 'c';
167+
* })());
168+
* echo 'd';
169+
*
170+
* return time();
171+
* })();
172+
*
173+
* $promise->cancel();
174+
* await($promise);
175+
* ```
176+
*
151177
* @param callable(mixed ...$args):mixed $function
152178
* @return callable(): PromiseInterface<mixed>
153179
* @since 4.0.0
154180
* @see coroutine()
155181
*/
156182
function async(callable $function): callable
157183
{
158-
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
159-
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
160-
try {
161-
$resolve($function(...$args));
162-
} catch (\Throwable $exception) {
163-
$reject($exception);
184+
return static function (mixed ...$args) use ($function): PromiseInterface {
185+
$fiber = null;
186+
$promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
187+
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
188+
try {
189+
$resolve($function(...$args));
190+
} catch (\Throwable $exception) {
191+
$reject($exception);
192+
} finally {
193+
fiberMap()->unregister($fiber);
194+
}
195+
});
196+
197+
fiberMap()->register($fiber);
198+
199+
$fiber->start();
200+
}, function () use (&$fiber): void {
201+
if ($fiber instanceof Fiber) {
202+
fiberMap()->cancel($fiber);
203+
foreach (fiberMap()->getPromises($fiber) as $promise) {
204+
if (method_exists($promise, 'cancel')) {
205+
$promise->cancel();
206+
}
207+
}
164208
}
165209
});
166210

167-
$fiber->start();
168-
});
211+
$lowLevelFiber = \Fiber::getCurrent();
212+
if ($lowLevelFiber !== null) {
213+
fiberMap()->attachPromise($lowLevelFiber, $promise);
214+
}
215+
216+
return $promise;
217+
};
169218
}
170219

171220

@@ -230,6 +279,13 @@ function await(PromiseInterface $promise): mixed
230279
$rejected = false;
231280
$resolvedValue = null;
232281
$rejectedThrowable = null;
282+
$lowLevelFiber = \Fiber::getCurrent();
283+
284+
if ($lowLevelFiber !== null) {
285+
if (fiberMap()->isCanceled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
286+
$promise->cancel();
287+
}
288+
}
233289

234290
$promise->then(
235291
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
@@ -285,6 +341,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
285341
throw $rejectedThrowable;
286342
}
287343

344+
if ($lowLevelFiber !== null) {
345+
fiberMap()->attachPromise($lowLevelFiber, $promise);
346+
}
347+
288348
$fiber = FiberFactory::create();
289349

290350
return $fiber->suspend();
@@ -601,3 +661,17 @@ function waterfall(array $tasks): PromiseInterface
601661

602662
return $deferred->promise();
603663
}
664+
665+
/**
666+
* @internal
667+
*/
668+
function fiberMap(): FiberMap
669+
{
670+
static $wm = null;
671+
672+
if ($wm === null) {
673+
$wm = new FiberMap();
674+
}
675+
676+
return $wm;
677+
}

tests/AsyncTest.php

+107
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use function React\Promise\all;
1212
use function React\Promise\reject;
1313
use function React\Promise\resolve;
14+
use function React\Promise\Timer\sleep;
1415

1516
class AsyncTest extends TestCase
1617
{
@@ -185,4 +186,110 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
185186
$this->assertGreaterThan(0.1, $time);
186187
$this->assertLessThan(0.12, $time);
187188
}
189+
190+
public function testCancel()
191+
{
192+
self::expectOutputString('a');
193+
$this->expectException(\Exception::class);
194+
$this->expectExceptionMessage('Timer cancelled');
195+
196+
$promise = async(static function (): int {
197+
echo 'a';
198+
await(sleep(2));
199+
echo 'b';
200+
201+
return time();
202+
})();
203+
204+
$promise->cancel();
205+
await($promise);
206+
}
207+
208+
public function testCancelTryCatch()
209+
{
210+
self::expectOutputString('ab');
211+
// $this->expectException(\Exception::class);
212+
// $this->expectExceptionMessage('Timer cancelled');
213+
214+
$promise = async(static function (): int {
215+
echo 'a';
216+
try {
217+
await(sleep(2));
218+
} catch (\Throwable) {
219+
// No-Op
220+
}
221+
echo 'b';
222+
223+
return time();
224+
})();
225+
226+
$promise->cancel();
227+
await($promise);
228+
}
229+
230+
public function testNestedCancel()
231+
{
232+
self::expectOutputString('abc');
233+
$this->expectException(\Exception::class);
234+
$this->expectExceptionMessage('Timer cancelled');
235+
236+
$promise = async(static function (): int {
237+
echo 'a';
238+
await(async(static function(): void {
239+
echo 'b';
240+
await(async(static function(): void {
241+
echo 'c';
242+
await(sleep(2));
243+
echo 'd';
244+
})());
245+
echo 'e';
246+
})());
247+
echo 'f';
248+
249+
return time();
250+
})();
251+
252+
$promise->cancel();
253+
await($promise);
254+
}
255+
256+
public function testCancelFiberThatCatchesExceptions()
257+
{
258+
self::expectOutputString('ab');
259+
$this->expectException(\Exception::class);
260+
$this->expectExceptionMessage('Timer cancelled');
261+
262+
$promise = async(static function (): int {
263+
echo 'a';
264+
try {
265+
await(sleep(2));
266+
} catch (\Throwable) {
267+
// No-Op
268+
}
269+
echo 'b';
270+
await(sleep(0.1));
271+
echo 'c';
272+
273+
return time();
274+
})();
275+
276+
$promise->cancel();
277+
await($promise);
278+
}
279+
280+
public function testNotAwaitedPromiseWillNotBeCanceled()
281+
{
282+
self::expectOutputString('acb');
283+
284+
async(static function (): int {
285+
echo 'a';
286+
sleep(0.001)->then(static function (): void {
287+
echo 'b';
288+
});
289+
echo 'c';
290+
291+
return time();
292+
})()->cancel();
293+
Loop::run();
294+
}
188295
}

0 commit comments

Comments
 (0)