Skip to content

Use Promise-based APIs instead of callbacks (continuation-passing style) #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 75 additions & 78 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
# NOTE: This package is no longer maintained. Use [react/promise](https://github.com/reactphp/promise) instead!

# Async

Async utilities for [ReactPHP](https://reactphp.org/).

It is heavily influenced by [async.js](https://github.com/caolan/async).

[![CI status](https://github.com/reactphp/async/workflows/CI/badge.svg)](https://github.com/reactphp/async/actions)

This library allows you to manage async control flow. It provides a number of
combinators for continuation-passing style (aka callbacks). Instead of nesting
those callbacks, you can declare them as a list, which is resolved
sequentially in an async manner.
Async utilities for [ReactPHP](https://reactphp.org/).

This library allows you to manage async control flow. It provides a number of
combinators for [Promise](https://github.com/reactphp/promise)-based APIs.
Instead of nesting or chaining promise callbacks, you can declare them as a
list, which is resolved sequentially in an async manner.
React/Async will not automagically change blocking code to be async. You need
to have an actual event loop and non-blocking libraries interacting with that
event loop for it to work. You can use `react/event-loop` for this, but you
don't have to. As long as you have a callback-based API that runs in an event
loop, it can be used with this library.

*You must be running inside an event loop for react/async to make any sense
whatsoever!*
event loop for it to work. As long as you have a Promise-based API that runs in
an event loop, it can be used with this library.

**Table of Contents**

Expand Down Expand Up @@ -62,112 +53,116 @@ Async\parallel(…);

### parallel()

The `parallel(array<callable> $tasks, ?callable $callback = null, ?callable $errback = null): void` function can be used
The `parallel(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
like this:

```php
<?php

use React\EventLoop\Loop;
use React\Promise\Promise;

React\Async\parallel(
array(
function ($callback, $errback) {
Loop::addTimer(1, function () use ($callback) {
$callback('Slept for a whole second');
React\Async\parallel([
function () {
return new Promise(function ($resolve) {
Loop::addTimer(1, function () use ($resolve) {
$resolve('Slept for a whole second');
});
},
function ($callback, $errback) {
Loop::addTimer(1, function () use ($callback) {
$callback('Slept for another whole second');
});
},
function () {
return new Promise(function ($resolve) {
Loop::addTimer(1, function () use ($resolve) {
$resolve('Slept for another whole second');
});
},
function ($callback, $errback) {
Loop::addTimer(1, function () use ($callback) {
$callback('Slept for yet another whole second');
});
},
function () {
return new Promise(function ($resolve) {
Loop::addTimer(1, function () use ($resolve) {
$resolve('Slept for yet another whole second');
});
},
),
function (array $results) {
foreach ($results as $result) {
var_dump($result);
}
});
},
function (Exception $e) {
throw $e;
])->then(function (array $results) {
foreach ($results as $result) {
var_dump($result);
}
);
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

### series()

The `series(array<callable> $tasks, ?callable $callback = null, ?callable $errback = null): void` function can be used
The `series(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
like this:

```php
<?php

use React\EventLoop\Loop;
use React\Promise\Promise;

React\Async\series(
array(
function ($callback, $errback) {
Loop::addTimer(1, function () use ($callback) {
$callback('Slept for a whole second');
React\Async\series([
function () {
return new Promise(function ($resolve) {
Loop::addTimer(1, function () use ($resolve) {
$resolve('Slept for a whole second');
});
},
function ($callback, $errback) {
Loop::addTimer(1, function () use ($callback) {
$callback('Slept for another whole second');
});
},
function () {
return new Promise(function ($resolve) {
Loop::addTimer(1, function () use ($resolve) {
$resolve('Slept for another whole second');
});
},
function ($callback, $errback) {
Loop::addTimer(1, function () use ($callback) {
$callback('Slept for yet another whole second');
});
},
function () {
return new Promise(function ($resolve) {
Loop::addTimer(1, function () use ($resolve) {
$resolve('Slept for yet another whole second');
});
},
),
function (array $results) {
foreach ($results as $result) {
var_dump($result);
}
});
},
function (Exception $e) {
throw $e;
])->then(function (array $results) {
foreach ($results as $result) {
var_dump($result);
}
);
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

### waterfall()

The `waterfall(array<callable> $tasks, ?callable $callback = null, ?callable $errback = null): void` function can be used
The `waterfall(array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
like this:

```php
<?php

use React\EventLoop\Loop;
use React\Promise\Promise;

$addOne = function ($prev, $callback = null) {
if (!$callback) {
$callback = $prev;
$prev = 0;
}

Loop::addTimer(1, function () use ($prev, $callback) {
$callback($prev + 1);
$addOne = function ($prev = 0) {
return new Promise(function ($resolve) use ($prev) {
Loop::addTimer(1, function () use ($prev, $resolve) {
$resolve($prev + 1);
});
});
};

React\Async\waterfall(array(
$addOne,
React\Async\waterfall([
$addOne,
$addOne,
function ($prev, $callback) use ($loop) {
echo "Final result is $prev\n";
$callback();
},
));
$addOne
])->then(function ($prev) {
echo "Final result is $prev\n";
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

## Todo
Expand Down Expand Up @@ -210,3 +205,5 @@ $ php vendor/bin/phpunit
## License

MIT, see [LICENSE file](LICENSE).

This project is heavily influenced by [async.js](https://github.com/caolan/async).
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
}
],
"require": {
"php": ">=5.3.2"
"php": ">=5.3.2",
"react/promise": "^2.8 || ^1.2.1"
},
"require-dev": {
"phpunit/phpunit": "^9.3 || ^5.7 || ^4.8.35",
Expand Down
87 changes: 41 additions & 46 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,32 @@

namespace React\Async;

use React\Promise\Deferred;
use React\Promise\PromiseInterface;

/**
* @param array<callable> $tasks
* @param ?callable $callback
* @param ?callable $errback
* @return void
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
* @return PromiseInterface<array<mixed>,Exception>
*/
function parallel(array $tasks, $callback = null, $errback = null)
function parallel(array $tasks)
{
$deferred = new Deferred();
$results = array();
$errors = array();

$done = function () use (&$results, &$errors, $callback, $errback) {
if (!$callback) {
return;
}

$done = function () use (&$results, &$errors, $deferred) {
if (count($errors)) {
$errback(array_shift($errors));
$deferred->reject(array_shift($errors));
return;
}

$callback($results);
$deferred->resolve($results);
};

$numTasks = count($tasks);

if (0 === $numTasks) {
$done();
return;
}

$checkDone = function () use (&$results, &$errors, $numTasks, $done) {
Expand All @@ -50,18 +47,22 @@ function parallel(array $tasks, $callback = null, $errback = null)
$checkDone();
};

call_user_func($task, $taskCallback, $taskErrback);
$promise = call_user_func($task);
assert($promise instanceof PromiseInterface);

$promise->then($taskCallback, $taskErrback);
}

return $deferred->promise();
}

/**
* @param array<callable> $tasks
* @param ?callable $callback
* @param ?callable $errback
* @return void
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
* @return PromiseInterface<array<mixed>,Exception>
*/
function series(array $tasks, $callback = null, $errback = null)
function series(array $tasks)
{
$deferred = new Deferred();
$results = array();

/** @var callable():void $next */
Expand All @@ -70,53 +71,47 @@ function series(array $tasks, $callback = null, $errback = null)
$next();
};

$done = function () use (&$results, $callback) {
if ($callback) {
call_user_func($callback, $results);
}
};

$next = function () use (&$tasks, $taskCallback, $errback, $done) {
$next = function () use (&$tasks, $taskCallback, $deferred, &$results) {
if (0 === count($tasks)) {
$done();
$deferred->resolve($results);
return;
}

$task = array_shift($tasks);
call_user_func($task, $taskCallback, $errback);
$promise = call_user_func($task);
assert($promise instanceof PromiseInterface);

$promise->then($taskCallback, array($deferred, 'reject'));
};

$next();

return $deferred->promise();
}

/**
* @param array<callable> $tasks
* @param ?callable $callback
* @param ?callable $errback
* @return void
* @param array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
* @return PromiseInterface<mixed,Exception>
*/
function waterfall(array $tasks, $callback = null, $errback = null)
function waterfall(array $tasks)
{
$taskCallback = function () use (&$next) {
call_user_func_array($next, func_get_args());
};
$deferred = new Deferred();

$done = function () use ($callback) {
if ($callback) {
call_user_func_array($callback, func_get_args());
}
};

$next = function () use (&$tasks, $taskCallback, $errback, $done) {
/** @var callable $next */
$next = function ($value = null) use (&$tasks, &$next, $deferred) {
if (0 === count($tasks)) {
call_user_func_array($done, func_get_args());
$deferred->resolve($value);
return;
}

$task = array_shift($tasks);
$args = array_merge(func_get_args(), array($taskCallback, $errback));
call_user_func_array($task, $args);
$promise = call_user_func_array($task, func_get_args());
assert($promise instanceof PromiseInterface);

$promise->then($next, array($deferred, 'reject'));
};

$next();

return $deferred->promise();
}
Loading