Skip to content

Commit 83c90c2

Browse files
committed
Cancel fiber
1 parent ff11a7a commit 83c90c2

File tree

4 files changed

+190
-3
lines changed

4 files changed

+190
-3
lines changed

composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
"react/promise": "^2.8 || ^1.2.1"
3232
},
3333
"require-dev": {
34-
"phpunit/phpunit": "^9.3"
34+
"phpunit/phpunit": "^9.3",
35+
"react/promise-timer": "^1.8"
3536
},
3637
"autoload": {
3738
"psr-4": {

src/FiberMap.php

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
namespace React\Async;
4+
5+
use Fiber;
6+
use React\Promise\PromiseInterface;
7+
8+
/**
9+
* @internal
10+
*/
11+
final class FiberMap
12+
{
13+
private array $status = [];
14+
private array $map = [];
15+
16+
public function register(Fiber $fiber): void
17+
{
18+
$this->status[spl_object_hash($fiber)] = false;
19+
$this->map[spl_object_hash($fiber)] = [];
20+
}
21+
22+
public function cancel(Fiber $fiber): void
23+
{
24+
$this->status[spl_object_hash($fiber)] = true;
25+
}
26+
27+
public function isCanceled(Fiber $fiber): bool
28+
{
29+
return $this->status[spl_object_hash($fiber)];
30+
}
31+
32+
public function attachPromise(Fiber $fiber, PromiseInterface $promise): void
33+
{
34+
$this->map[spl_object_hash($fiber)][] = $promise;
35+
}
36+
37+
public function has(Fiber $fiber): bool
38+
{
39+
return array_key_exists(spl_object_hash($fiber), $this->map);
40+
}
41+
42+
public function getPromises(Fiber $fiber): array
43+
{
44+
return $this->map[spl_object_hash($fiber)];
45+
}
46+
47+
public function unregister(Fiber $fiber): void
48+
{
49+
unset($this->status[spl_object_hash($fiber)], $this->map[spl_object_hash($fiber)]);
50+
}
51+
}

src/functions.php

+49-2
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
namespace React\Async;
44

5+
use Fiber;
56
use React\EventLoop\Loop;
67
use React\Promise\CancellablePromiseInterface;
78
use React\Promise\Deferred;
9+
use React\Promise\ExtendedPromiseInterface;
810
use React\Promise\Promise;
911
use React\Promise\PromiseInterface;
1012
use function React\Promise\reject;
@@ -20,16 +22,36 @@
2022
*/
2123
function async(callable $function): callable
2224
{
23-
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
24-
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
25+
$fiber = null;
26+
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
27+
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
28+
if (fiberMap()->isCanceled($fiber)) {
29+
fiberMap()->unregister($fiber);
30+
$reject(new \Exception('Fiber has been cancelled before execution started'));
31+
return;
32+
}
33+
2534
try {
2635
$resolve($function(...$args));
2736
} catch (\Throwable $exception) {
2837
$reject($exception);
38+
} finally {
39+
fiberMap()->unregister($fiber);
2940
}
3041
});
3142

43+
fiberMap()->register($fiber);
44+
3245
Loop::futureTick(static fn() => $fiber->start());
46+
}, function () use (&$fiber): void {
47+
if ($fiber instanceof Fiber) {
48+
fiberMap()->cancel($fiber);
49+
foreach (fiberMap()->getPromises($fiber) as $promise) {
50+
if (method_exists($promise, 'cancel')) {
51+
$promise->cancel();
52+
}
53+
}
54+
}
3355
});
3456
}
3557

@@ -82,6 +104,13 @@ function await(PromiseInterface $promise): mixed
82104
$rejected = false;
83105
$resolvedValue = null;
84106
$rejectedThrowable = null;
107+
$lowLevelFiber = \Fiber::getCurrent();
108+
109+
if ($lowLevelFiber !== null) {
110+
if (fiberMap()->isCanceled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
111+
$promise->cancel();
112+
}
113+
}
85114

86115
$promise->then(
87116
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
@@ -118,6 +147,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
118147
throw $rejectedThrowable;
119148
}
120149

150+
if ($lowLevelFiber !== null) {
151+
fiberMap()->attachPromise($lowLevelFiber, $promise);
152+
}
153+
121154
$fiber = FiberFactory::create();
122155

123156
return $fiber->suspend();
@@ -433,3 +466,17 @@ function waterfall(array $tasks): PromiseInterface
433466

434467
return $deferred->promise();
435468
}
469+
470+
/**
471+
* @internal
472+
*/
473+
function fiberMap(): FiberMap
474+
{
475+
static $wm = null;
476+
477+
if ($wm === null) {
478+
$wm = new FiberMap();
479+
}
480+
481+
return $wm;
482+
}

tests/AsyncTest.php

+88
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use function React\Async\async;
99
use function React\Async\await;
1010
use function React\Promise\all;
11+
use function React\Promise\Timer\sleep;
1112

1213
class AsyncTest extends TestCase
1314
{
@@ -84,4 +85,91 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
8485
$this->assertGreaterThan(0.1, $time);
8586
$this->assertLessThan(0.12, $time);
8687
}
88+
89+
public function testCancel()
90+
{
91+
self::expectOutputString('a');
92+
93+
$promise = async(function (): int {
94+
echo 'a';
95+
await(sleep(2));
96+
echo 'b';
97+
98+
return time();
99+
})();
100+
Loop::addTimer(0.001, function () use ($promise) {
101+
$promise->cancel();
102+
});
103+
104+
Loop::run();
105+
}
106+
107+
public function testNestedCancel()
108+
{
109+
self::expectOutputString('abc');
110+
111+
$promise = async(function (): int {
112+
echo 'a';
113+
await(async(function(): void {
114+
echo 'b';
115+
await(async(function(): void {
116+
echo 'c';
117+
await(sleep(2));
118+
echo 'd';
119+
})());
120+
echo 'e';
121+
})());
122+
echo 'f';
123+
124+
return time();
125+
})();
126+
Loop::addTimer(0.001, function () use ($promise) {
127+
$promise->cancel();
128+
});
129+
130+
Loop::run();
131+
}
132+
133+
public function testCancelFiberThatCatchesExceptions()
134+
{
135+
self::expectOutputString('ab');
136+
$this->expectException(\Exception::class);
137+
$this->expectExceptionMessage('Timer cancelled');
138+
139+
$promise = async(function (): int {
140+
echo 'a';
141+
try {
142+
await(sleep(2));
143+
} catch (\Throwable) {
144+
// No-Op
145+
}
146+
echo 'b';
147+
await(sleep(0.1));
148+
echo 'c';
149+
150+
return time();
151+
})();
152+
153+
Loop::addTimer(0.001, function () use ($promise) {
154+
$promise->cancel();
155+
});
156+
await($promise);
157+
}
158+
159+
public function testCancelNeverStartedFiber()
160+
{
161+
self::expectOutputString('');
162+
$this->expectException(\Exception::class);
163+
$this->expectExceptionMessage('Fiber has been cancelled before execution started');
164+
165+
$promise = async(function (): int {
166+
echo 'a';
167+
await(sleep(2));
168+
echo 'b';
169+
170+
return time();
171+
})();
172+
$promise->cancel();
173+
await($promise);
174+
}
87175
}

0 commit comments

Comments
 (0)