Skip to content

Commit fee1cff

Browse files
joelwurtzkelunik
authored andcommitted
Add async interface (#11)
1 parent 78020ff commit fee1cff

File tree

4 files changed

+278
-29
lines changed

4 files changed

+278
-29
lines changed

src/Client.php

+51-26
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,20 @@
55
use Amp\Artax;
66
use Amp\CancellationTokenSource;
77
use Amp\Promise;
8-
use Http\Adapter\Artax\Internal\ResponseStream;
98
use Http\Client\Exception\RequestException;
9+
use Http\Client\Exception\TransferException;
10+
use Http\Client\HttpAsyncClient;
1011
use Http\Client\HttpClient;
1112
use Http\Discovery\MessageFactoryDiscovery;
1213
use Http\Message\ResponseFactory;
1314
use Http\Message\StreamFactory;
1415
use Psr\Http\Message\RequestInterface;
1516
use function Amp\call;
1617

17-
class Client implements HttpClient
18+
class Client implements HttpClient, HttpAsyncClient
1819
{
1920
private $client;
21+
2022
private $responseFactory;
2123

2224
/**
@@ -32,39 +34,62 @@ public function __construct(
3234
$this->client = $client ?? new Artax\DefaultClient();
3335
$this->responseFactory = $responseFactory ?? MessageFactoryDiscovery::find();
3436

35-
if ($streamFactory !== null || \func_num_args() === 3) {
37+
if (null === $streamFactory || 3 === \func_num_args()) {
3638
@\trigger_error('The $streamFactory parameter is deprecated and ignored.', \E_USER_DEPRECATED);
3739
}
3840
}
3941

4042
/** {@inheritdoc} */
4143
public function sendRequest(RequestInterface $request)
4244
{
43-
return Promise\wait(call(function () use ($request) {
44-
$cancellationTokenSource = new CancellationTokenSource();
45+
return $this->doRequest($request)->wait();
46+
}
47+
48+
/** {@inheritdoc} */
49+
public function sendAsyncRequest(RequestInterface $request)
50+
{
51+
return $this->doRequest($request, false);
52+
}
53+
54+
protected function doRequest(RequestInterface $request, $useInternalStream = true): Promise
55+
{
56+
return new Internal\Promise(
57+
call(function () use ($request, $useInternalStream) {
58+
$cancellationTokenSource = new CancellationTokenSource();
59+
60+
/** @var Artax\Request $req */
61+
$req = new Artax\Request($request->getUri(), $request->getMethod());
62+
$req = $req->withProtocolVersions([$request->getProtocolVersion()]);
63+
$req = $req->withHeaders($request->getHeaders());
64+
$req = $req->withBody((string) $request->getBody());
65+
66+
try {
67+
/** @var Artax\Response $resp */
68+
$resp = yield $this->client->request($req, [
69+
Artax\Client::OP_MAX_REDIRECTS => 0,
70+
], $cancellationTokenSource->getToken());
71+
} catch (Artax\HttpException $e) {
72+
throw new RequestException($e->getMessage(), $request, $e);
73+
} catch (\Throwable $e) {
74+
throw new TransferException($e->getMessage(), 0, $e);
75+
}
4576

46-
/** @var Artax\Request $req */
47-
$req = new Artax\Request($request->getUri(), $request->getMethod());
48-
$req = $req->withProtocolVersions([$request->getProtocolVersion()]);
49-
$req = $req->withHeaders($request->getHeaders());
50-
$req = $req->withBody((string) $request->getBody());
77+
if ($useInternalStream) {
78+
$body = new Internal\ResponseStream($resp->getBody()->getInputStream(), $cancellationTokenSource);
79+
} else {
80+
$body = yield $resp->getBody();
81+
}
5182

52-
try {
53-
/** @var Artax\Response $resp */
54-
$resp = yield $this->client->request($req, [
55-
Artax\Client::OP_MAX_REDIRECTS => 0,
56-
], $cancellationTokenSource->getToken());
57-
} catch (Artax\HttpException $e) {
58-
throw new RequestException($e->getMessage(), $request, $e);
59-
}
83+
$response = $this->responseFactory->createResponse(
84+
$resp->getStatus(),
85+
$resp->getReason(),
86+
$resp->getHeaders(),
87+
$body,
88+
$resp->getProtocolVersion()
89+
);
6090

61-
return $this->responseFactory->createResponse(
62-
$resp->getStatus(),
63-
$resp->getReason(),
64-
$resp->getHeaders(),
65-
new ResponseStream($resp->getBody()->getInputStream(), $cancellationTokenSource),
66-
$resp->getProtocolVersion()
67-
);
68-
}));
91+
return $response;
92+
})
93+
);
6994
}
7095
}

src/Internal/Promise.php

+165
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
<?php
2+
3+
namespace Http\Adapter\Artax\Internal;
4+
5+
use Amp\Promise as AmpPromise;
6+
use Http\Client\Exception;
7+
use Http\Promise\Promise as HttpPromise;
8+
use Psr\Http\Message\ResponseInterface;
9+
10+
/**
11+
* Promise adapter between artax and php-http, which allow to use the sendAsyncRequest and also the coroutine system by still respecting the Amp Promise.
12+
*
13+
* @internal
14+
*/
15+
class Promise implements HttpPromise, AmpPromise
16+
{
17+
/** @var string */
18+
private $state = HttpPromise::PENDING;
19+
20+
/** @var ResponseInterface */
21+
private $response;
22+
23+
/** @var Exception */
24+
private $exception;
25+
26+
/** @var AmpPromise */
27+
private $promise;
28+
29+
/** @var callable|null */
30+
private $onFulfilled;
31+
32+
/** @var callable|null */
33+
private $onRejected;
34+
35+
/**
36+
* @param AmpPromise $promise Underlying amp promise which MUST resolve with a ResponseInterface or MUST fail with a Http\Client\Exception
37+
*/
38+
public function __construct(AmpPromise $promise)
39+
{
40+
$this->promise = $promise;
41+
$this->promise->onResolve(function ($error, $result) {
42+
if (null !== $error) {
43+
if (!$error instanceof Exception) {
44+
$error = new Exception\TransferException($error->getMessage(), 0, $error);
45+
}
46+
47+
$this->reject($error);
48+
} else {
49+
if (!$result instanceof ResponseInterface) {
50+
$this->reject(new Exception\TransferException('Bad reponse returned'));
51+
} else {
52+
$this->resolve($result);
53+
}
54+
}
55+
});
56+
}
57+
58+
/**
59+
* {@inheritdoc}
60+
*/
61+
public function then(callable $onFulfilled = null, callable $onRejected = null)
62+
{
63+
$deferred = new \Amp\Deferred();
64+
$newPromise = new self($deferred->promise());
65+
66+
$onFulfilled = $onFulfilled ?? function (ResponseInterface $response) {
67+
return $response;
68+
};
69+
70+
$onRejected = $onRejected ?? function (\Throwable $exception) {
71+
if (!$exception instanceof Exception) {
72+
$exception = new Exception\TransferException($exception->getMessage(), 0, $exception);
73+
}
74+
75+
throw $exception;
76+
};
77+
78+
$this->onFulfilled = function (ResponseInterface $response) use ($onFulfilled, $deferred) {
79+
try {
80+
$deferred->resolve($onFulfilled($response));
81+
} catch (Exception $exception) {
82+
$deferred->fail($exception);
83+
} catch (\Throwable $error) {
84+
$deferred->fail(new Exception\TransferException($error->getMessage(), 0, $error));
85+
}
86+
};
87+
88+
$this->onRejected = function (Exception $exception) use ($onRejected, $deferred) {
89+
try {
90+
$deferred->resolve($onRejected($exception));
91+
} catch (Exception $exception) {
92+
$deferred->fail($exception);
93+
} catch (\Throwable $error) {
94+
$deferred->fail(new Exception\TransferException($error->getMessage(), 0, $error));
95+
}
96+
};
97+
98+
if (HttpPromise::FULFILLED === $this->state) {
99+
$this->resolve($this->response);
100+
}
101+
102+
if (HttpPromise::REJECTED === $this->state) {
103+
$this->reject($this->exception);
104+
}
105+
106+
return $newPromise;
107+
}
108+
109+
/**
110+
* {@inheritdoc}
111+
*/
112+
public function getState()
113+
{
114+
return $this->state;
115+
}
116+
117+
/**
118+
* {@inheritdoc}
119+
*/
120+
public function wait($unwrap = true)
121+
{
122+
try {
123+
AmpPromise\wait($this->promise);
124+
} catch (Exception $exception) {
125+
}
126+
127+
if ($unwrap) {
128+
if (HttpPromise::REJECTED === $this->getState()) {
129+
throw $this->exception;
130+
}
131+
132+
return $this->response;
133+
}
134+
}
135+
136+
/**
137+
* {@inheritdoc}
138+
*/
139+
public function onResolve(callable $onResolved)
140+
{
141+
$this->promise->onResolve($onResolved);
142+
}
143+
144+
private function resolve(ResponseInterface $response)
145+
{
146+
$this->state = HttpPromise::FULFILLED;
147+
$this->response = $response;
148+
$onFulfilled = $this->onFulfilled;
149+
150+
if (null !== $onFulfilled) {
151+
$onFulfilled($response);
152+
}
153+
}
154+
155+
private function reject(Exception $exception)
156+
{
157+
$this->state = HttpPromise::REJECTED;
158+
$this->exception = $exception;
159+
$onRejected = $this->onRejected;
160+
161+
if (null !== $onRejected) {
162+
$onRejected($exception);
163+
}
164+
}
165+
}

src/Internal/ResponseStream.php

+6-3
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
class ResponseStream implements StreamInterface
2020
{
2121
private $buffer = '';
22+
2223
private $position = 0;
24+
2325
private $eof = false;
2426

2527
private $body;
28+
2629
private $cancellationTokenSource;
2730

2831
/**
@@ -114,7 +117,7 @@ public function read($length)
114117
return '';
115118
}
116119

117-
if ($this->buffer === '') {
120+
if ('' === $this->buffer) {
118121
try {
119122
$this->buffer = Promise\wait($this->body->read());
120123
} catch (Artax\HttpException $e) {
@@ -123,7 +126,7 @@ public function read($length)
123126
throw new \RuntimeException('Reading from the stream failed', 0, $e);
124127
}
125128

126-
if ($this->buffer === null) {
129+
if (null === $this->buffer) {
127130
$this->eof = true;
128131

129132
return '';
@@ -150,6 +153,6 @@ public function getContents()
150153

151154
public function getMetadata($key = null)
152155
{
153-
return $key === null ? [] : null;
156+
return null === $key ? [] : null;
154157
}
155158
}

tests/AsyncClientTest.php

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
<?php
2+
3+
declare(ticks=1);
4+
5+
namespace Http\Adapter\Artax\Test;
6+
7+
use Amp\Artax;
8+
use Amp\Loop;
9+
use GuzzleHttp\Psr7\Request;
10+
use GuzzleHttp\Psr7\Response;
11+
use Http\Adapter\Artax\Client;
12+
use Http\Client\HttpAsyncClient;
13+
use Http\Client\Tests\HttpAsyncClientTest;
14+
15+
class AsyncClientTest extends HttpAsyncClientTest
16+
{
17+
/** @return HttpAsyncClient */
18+
protected function createHttpAsyncClient()
19+
{
20+
$client = new Artax\DefaultClient();
21+
$client->setOption(Artax\Client::OP_TRANSFER_TIMEOUT, 1000);
22+
23+
return new Client($client);
24+
}
25+
26+
/**
27+
* Test using the async method in an existing loop.
28+
*
29+
* As an example a stream implementation of PSR7 using the \Amp\wait function
30+
* would fail in an existing loop. This test prevent regression of this behavior.
31+
*/
32+
public function testInLoop()
33+
{
34+
Loop::run(function () use (&$content, &$response, &$exception) {
35+
$client = $this->createHttpAsyncClient();
36+
$request = new Request('GET', 'https://httpbin.org/get');
37+
38+
try {
39+
$response = yield $client->sendAsyncRequest($request);
40+
$content = (string) $response->getBody();
41+
} catch (\Throwable $e) {
42+
$exception = $e;
43+
}
44+
45+
Loop::stop();
46+
});
47+
48+
if (null !== $exception) {
49+
throw $exception;
50+
}
51+
52+
self::assertInstanceOf(Response::class, $response);
53+
self::assertNotNull($content);
54+
self::assertNotEmpty($content);
55+
}
56+
}

0 commit comments

Comments
 (0)