diff --git a/README.md b/README.md index 07fb1a3..b53c2ff 100644 --- a/README.md +++ b/README.md @@ -61,8 +61,8 @@ local Redis server and send some requests: ```php $factory = new Clue\React\Redis\Factory(); +$client = $factory->createLazyClient('localhost:6379'); -$client = $factory->createLazyClient('localhost'); $client->set('greeting', 'Hello world'); $client->append('greeting', '!'); @@ -118,14 +118,14 @@ $factory = new Clue\React\Redis\Factory(null, $connector); #### createClient() -The `createClient(string $redisUri): PromiseInterface` method can be used to +The `createClient(string $uri): PromiseInterface` method can be used to create a new [`Client`](#client). It helps with establishing a plain TCP/IP or secure TLS connection to Redis and optionally authenticating (AUTH) and selecting the right database (SELECT). ```php -$factory->createClient('redis://localhost:6379')->then( +$factory->createClient('localhost:6379')->then( function (Client $client) { // client connected (and authenticated) }, @@ -146,7 +146,7 @@ reject its value with an Exception and will cancel the underlying TCP/IP connection attempt and/or Redis authentication. ```php -$promise = $factory->createClient($redisUri); +$promise = $factory->createClient($uri); Loop::addTimer(3.0, function () use ($promise) { $promise->cancel(); @@ -215,14 +215,14 @@ $factory->createClient('localhost?timeout=0.5'); #### createLazyClient() -The `createLazyClient(string $redisUri): Client` method can be used to +The `createLazyClient(string $uri): Client` method can be used to create a new [`Client`](#client). It helps with establishing a plain TCP/IP or secure TLS connection to Redis and optionally authenticating (AUTH) and selecting the right database (SELECT). ```php -$client = $factory->createLazyClient('redis://localhost:6379'); +$client = $factory->createLazyClient('localhost:6379'); $client->incr('hello'); $client->end(); diff --git a/examples/cli.php b/examples/cli.php index 8f2baed..8b7fa03 100644 --- a/examples/cli.php +++ b/examples/cli.php @@ -1,5 +1,8 @@ createClient('localhost')->then(function (Client $client) { +$factory->createClient(getenv('REDIS_URI') ?: 'localhost:6379')->then(function (Client $client) { echo '# connected! Entering interactive mode, hit CTRL-D to quit' . PHP_EOL; Loop::addReadStream(STDIN, function () use ($client) { @@ -38,7 +41,7 @@ $promise->then(function ($data) { echo '# reply: ' . json_encode($data) . PHP_EOL; - }, function ($e) { + }, function (Exception $e) { echo '# error reply: ' . $e->getMessage() . PHP_EOL; }); }); @@ -48,10 +51,7 @@ Loop::removeReadStream(STDIN); }); -}, function (Exception $error) { - echo 'CONNECTION ERROR: ' . $error->getMessage() . PHP_EOL; - if ($error->getPrevious()) { - echo $error->getPrevious()->getMessage() . PHP_EOL; - } +}, function (Exception $e) { + echo 'Error: ' . $e->getMessage() . PHP_EOL; exit(1); }); diff --git a/examples/incr.php b/examples/incr.php index 8eb34e9..36a24d2 100644 --- a/examples/incr.php +++ b/examples/incr.php @@ -1,22 +1,22 @@ createLazyClient(getenv('REDIS_URI') ?: 'localhost:6379'); -$client = $factory->createLazyClient('localhost'); $client->incr('test'); $client->get('test')->then(function ($result) { var_dump($result); }, function (Exception $e) { echo 'Error: ' . $e->getMessage() . PHP_EOL; - if ($e->getPrevious()) { - echo $e->getPrevious()->getMessage() . PHP_EOL; - } exit(1); }); -$client->end(); +//$client->end(); diff --git a/examples/publish.php b/examples/publish.php index 8f371e0..5353301 100644 --- a/examples/publish.php +++ b/examples/publish.php @@ -1,22 +1,22 @@ createLazyClient(getenv('REDIS_URI') ?: 'localhost:6379'); $channel = isset($argv[1]) ? $argv[1] : 'channel'; $message = isset($argv[2]) ? $argv[2] : 'message'; -$client = $factory->createLazyClient('localhost'); $client->publish($channel, $message)->then(function ($received) { echo 'Successfully published. Received by ' . $received . PHP_EOL; }, function (Exception $e) { echo 'Unable to publish: ' . $e->getMessage() . PHP_EOL; - if ($e->getPrevious()) { - echo $e->getPrevious()->getMessage() . PHP_EOL; - } exit(1); }); diff --git a/examples/subscribe.php b/examples/subscribe.php index bb22a67..4e8c6fe 100644 --- a/examples/subscribe.php +++ b/examples/subscribe.php @@ -1,15 +1,18 @@ createLazyClient(getenv('REDIS_URI') ?: 'localhost:6379'); $channel = isset($argv[1]) ? $argv[1] : 'channel'; -$client = $factory->createLazyClient('localhost'); $client->subscribe($channel)->then(function () { echo 'Now subscribed to channel ' . PHP_EOL; }, function (Exception $e) use ($client) { diff --git a/src/Factory.php b/src/Factory.php index 90706d3..171523c 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -10,7 +10,6 @@ use React\Socket\ConnectionInterface; use React\Socket\Connector; use React\Socket\ConnectorInterface; -use InvalidArgumentException; class Factory { @@ -38,21 +37,49 @@ public function __construct(LoopInterface $loop = null, ConnectorInterface $conn /** * Create Redis client connected to address of given redis instance * - * @param string $target Redis server URI to connect to - * @return \React\Promise\PromiseInterface resolves with Client or rejects with \Exception + * @param string $uri Redis server URI to connect to + * @return \React\Promise\PromiseInterface Promise that will + * be fulfilled with `Client` on success or rejects with `\Exception` on error. */ - public function createClient($target) + public function createClient($uri) { - try { - $parts = $this->parseUrl($target); - } catch (InvalidArgumentException $e) { - return \React\Promise\reject($e); + // support `redis+unix://` scheme for Unix domain socket (UDS) paths + if (preg_match('/^(redis\+unix:\/\/(?:[^:]*:[^@]*@)?)(.+?)?$/', $uri, $match)) { + $parts = parse_url($match[1] . 'localhost/' . $match[2]); + } else { + if (strpos($uri, '://') === false) { + $uri = 'redis://' . $uri; + } + + $parts = parse_url($uri); + } + + $uri = preg_replace(array('/(:)[^:\/]*(@)/', '/([?&]password=).*?($|&)/'), '$1***$2', $uri); + if ($parts === false || !isset($parts['scheme'], $parts['host']) || !in_array($parts['scheme'], array('redis', 'rediss', 'redis+unix'))) { + return \React\Promise\reject(new \InvalidArgumentException( + 'Invalid Redis URI given (EINVAL)', + defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22 + )); } - $connecting = $this->connector->connect($parts['authority']); - $deferred = new Deferred(function ($_, $reject) use ($connecting) { + $args = array(); + parse_str(isset($parts['query']) ? $parts['query'] : '', $args); + + $authority = $parts['host'] . ':' . (isset($parts['port']) ? $parts['port'] : 6379); + if ($parts['scheme'] === 'rediss') { + $authority = 'tls://' . $authority; + } elseif ($parts['scheme'] === 'redis+unix') { + $authority = 'unix://' . substr($parts['path'], 1); + unset($parts['path']); + } + $connecting = $this->connector->connect($authority); + + $deferred = new Deferred(function ($_, $reject) use ($connecting, $uri) { // connection cancelled, start with rejecting attempt, then clean up - $reject(new \RuntimeException('Connection to Redis server cancelled')); + $reject(new \RuntimeException( + 'Connection to ' . $uri . ' cancelled (ECONNABORTED)', + defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103 + )); // either close successful connection or cancel pending connection attempt $connecting->then(function (ConnectionInterface $connection) { @@ -64,46 +91,68 @@ public function createClient($target) $protocol = $this->protocol; $promise = $connecting->then(function (ConnectionInterface $stream) use ($protocol) { return new StreamingClient($stream, $protocol->createResponseParser(), $protocol->createSerializer()); - }, function (\Exception $e) { + }, function (\Exception $e) use ($uri) { throw new \RuntimeException( - 'Connection to Redis server failed because underlying transport connection failed', - 0, + 'Connection to ' . $uri . ' failed: ' . $e->getMessage(), + $e->getCode(), $e ); }); - if (isset($parts['auth'])) { - $promise = $promise->then(function (StreamingClient $client) use ($parts) { - return $client->auth($parts['auth'])->then( + // use `?password=secret` query or `user:secret@host` password form URL + $pass = isset($args['password']) ? $args['password'] : (isset($parts['pass']) ? rawurldecode($parts['pass']) : null); + if (isset($args['password']) || isset($parts['pass'])) { + $pass = isset($args['password']) ? $args['password'] : rawurldecode($parts['pass']); + $promise = $promise->then(function (StreamingClient $client) use ($pass, $uri) { + return $client->auth($pass)->then( function () use ($client) { return $client; }, - function ($error) use ($client) { + function (\Exception $e) use ($client, $uri) { $client->close(); + $const = ''; + $errno = $e->getCode(); + if ($errno === 0) { + $const = ' (EACCES)'; + $errno = $e->getCode() ?: (defined('SOCKET_EACCES') ? SOCKET_EACCES : 13); + } + throw new \RuntimeException( - 'Connection to Redis server failed because AUTH command failed', - 0, - $error + 'Connection to ' . $uri . ' failed during AUTH command: ' . $e->getMessage() . $const, + $errno, + $e ); } ); }); } - if (isset($parts['db'])) { - $promise = $promise->then(function (StreamingClient $client) use ($parts) { - return $client->select($parts['db'])->then( + // use `?db=1` query or `/1` path (skip first slash) + if (isset($args['db']) || (isset($parts['path']) && $parts['path'] !== '/')) { + $db = isset($args['db']) ? $args['db'] : substr($parts['path'], 1); + $promise = $promise->then(function (StreamingClient $client) use ($db, $uri) { + return $client->select($db)->then( function () use ($client) { return $client; }, - function ($error) use ($client) { + function (\Exception $e) use ($client, $uri) { $client->close(); + $const = ''; + $errno = $e->getCode(); + if ($errno === 0 && strpos($e->getMessage(), 'NOAUTH ') === 0) { + $const = ' (EACCES)'; + $errno = defined('SOCKET_EACCES') ? SOCKET_EACCES : 13; + } elseif ($errno === 0) { + $const = ' (ENOENT)'; + $errno = defined('SOCKET_ENOENT') ? SOCKET_ENOENT : 2; + } + throw new \RuntimeException( - 'Connection to Redis server failed because SELECT command failed', - 0, - $error + 'Connection to ' . $uri . ' failed during SELECT command: ' . $e->getMessage() . $const, + $errno, + $e ); } ); @@ -113,15 +162,16 @@ function ($error) use ($client) { $promise->then(array($deferred, 'resolve'), array($deferred, 'reject')); // use timeout from explicit ?timeout=x parameter or default to PHP's default_socket_timeout (60) - $timeout = isset($parts['timeout']) ? $parts['timeout'] : (int) ini_get("default_socket_timeout"); + $timeout = isset($args['timeout']) ? (float) $args['timeout'] : (int) ini_get("default_socket_timeout"); if ($timeout < 0) { return $deferred->promise(); } - return \React\Promise\Timer\timeout($deferred->promise(), $timeout, $this->loop)->then(null, function ($e) { + return \React\Promise\Timer\timeout($deferred->promise(), $timeout, $this->loop)->then(null, function ($e) use ($uri) { if ($e instanceof TimeoutException) { throw new \RuntimeException( - 'Connection to Redis server timed out after ' . $e->getTimeout() . ' seconds' + 'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)', + defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110 ); } throw $e; @@ -138,63 +188,4 @@ public function createLazyClient($target) { return new LazyClient($target, $this, $this->loop); } - - /** - * @param string $target - * @return array with keys authority, auth and db - * @throws InvalidArgumentException - */ - private function parseUrl($target) - { - $ret = array(); - // support `redis+unix://` scheme for Unix domain socket (UDS) paths - if (preg_match('/^redis\+unix:\/\/([^:]*:[^@]*@)?(.+?)(\?.*)?$/', $target, $match)) { - $ret['authority'] = 'unix://' . $match[2]; - $target = 'redis://' . (isset($match[1]) ? $match[1] : '') . 'localhost' . (isset($match[3]) ? $match[3] : ''); - } - - if (strpos($target, '://') === false) { - $target = 'redis://' . $target; - } - - $parts = parse_url($target); - if ($parts === false || !isset($parts['scheme'], $parts['host']) || !in_array($parts['scheme'], array('redis', 'rediss'))) { - throw new InvalidArgumentException('Given URL can not be parsed'); - } - - if (isset($parts['pass'])) { - $ret['auth'] = rawurldecode($parts['pass']); - } - - if (isset($parts['path']) && $parts['path'] !== '') { - // skip first slash - $ret['db'] = substr($parts['path'], 1); - } - - if (!isset($ret['authority'])) { - $ret['authority'] = - ($parts['scheme'] === 'rediss' ? 'tls://' : '') . - $parts['host'] . ':' . - (isset($parts['port']) ? $parts['port'] : 6379); - } - - if (isset($parts['query'])) { - $args = array(); - parse_str($parts['query'], $args); - - if (isset($args['password'])) { - $ret['auth'] = $args['password']; - } - - if (isset($args['db'])) { - $ret['db'] = $args['db']; - } - - if (isset($args['timeout'])) { - $ret['timeout'] = (float) $args['timeout']; - } - } - - return $ret; - } } diff --git a/src/LazyClient.php b/src/LazyClient.php index bfb2fef..c542f6b 100644 --- a/src/LazyClient.php +++ b/src/LazyClient.php @@ -115,7 +115,10 @@ private function client() public function __call($name, $args) { if ($this->closed) { - return \React\Promise\reject(new \RuntimeException('Connection closed')); + return \React\Promise\reject(new \RuntimeException( + 'Connection closed (ENOTCONN)', + defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107 + )); } $that = $this; diff --git a/src/StreamingClient.php b/src/StreamingClient.php index 0d5f9bb..8afd84d 100644 --- a/src/StreamingClient.php +++ b/src/StreamingClient.php @@ -2,18 +2,15 @@ namespace Clue\React\Redis; -use Evenement\EventEmitter; -use Clue\Redis\Protocol\Parser\ParserInterface; -use Clue\Redis\Protocol\Parser\ParserException; -use Clue\Redis\Protocol\Serializer\SerializerInterface; use Clue\Redis\Protocol\Factory as ProtocolFactory; -use UnderflowException; -use RuntimeException; -use InvalidArgumentException; -use React\Promise\Deferred; use Clue\Redis\Protocol\Model\ErrorReply; use Clue\Redis\Protocol\Model\ModelInterface; use Clue\Redis\Protocol\Model\MultiBulkReply; +use Clue\Redis\Protocol\Parser\ParserException; +use Clue\Redis\Protocol\Parser\ParserInterface; +use Clue\Redis\Protocol\Serializer\SerializerInterface; +use Evenement\EventEmitter; +use React\Promise\Deferred; use React\Stream\DuplexStreamInterface; /** @@ -47,9 +44,12 @@ public function __construct(DuplexStreamInterface $stream, ParserInterface $pars $stream->on('data', function($chunk) use ($parser, $that) { try { $models = $parser->pushIncoming($chunk); - } - catch (ParserException $error) { - $that->emit('error', array($error)); + } catch (ParserException $error) { + $that->emit('error', array(new \UnexpectedValueException( + 'Invalid data received: ' . $error->getMessage() . ' (EBADMSG)', + defined('SOCKET_EBADMSG') ? SOCKET_EBADMSG : 77, + $error + ))); $that->close(); return; } @@ -57,8 +57,7 @@ public function __construct(DuplexStreamInterface $stream, ParserInterface $pars foreach ($models as $data) { try { $that->handleMessage($data); - } - catch (UnderflowException $error) { + } catch (\UnderflowException $error) { $that->emit('error', array($error)); $that->close(); return; @@ -84,11 +83,20 @@ public function __call($name, $args) static $pubsubs = array('subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'); if ($this->ending) { - $request->reject(new RuntimeException('Connection closed')); + $request->reject(new \RuntimeException( + 'Connection ' . ($this->closed ? 'closed' : 'closing'). ' (ENOTCONN)', + defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107 + )); } elseif (count($args) !== 1 && in_array($name, $pubsubs)) { - $request->reject(new InvalidArgumentException('PubSub commands limited to single argument')); + $request->reject(new \InvalidArgumentException( + 'PubSub commands limited to single argument (EINVAL)', + defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22 + )); } elseif ($name === 'monitor') { - $request->reject(new \BadMethodCallException('MONITOR command explicitly not supported')); + $request->reject(new \BadMethodCallException( + 'MONITOR command explicitly not supported (ENOTSUP)', + defined('SOCKET_ENOTSUP') ? SOCKET_ENOTSUP : (defined('SOCKET_EOPNOTSUPP') ? SOCKET_EOPNOTSUPP : 95) + )); } else { $this->stream->write($this->serializer->getRequestMessage($name, $args)); $this->requests []= $request; @@ -131,11 +139,14 @@ public function handleMessage(ModelInterface $message) } if (!$this->requests) { - throw new UnderflowException('Unexpected reply received, no matching request found'); + throw new \UnderflowException( + 'Unexpected reply received, no matching request found (ENOMSG)', + defined('SOCKET_ENOMSG') ? SOCKET_ENOMSG : 42 + ); } $request = array_shift($this->requests); - /* @var $request Deferred */ + assert($request instanceof Deferred); if ($message instanceof ErrorReply) { $request->reject($message); @@ -166,15 +177,27 @@ public function close() $this->ending = true; $this->closed = true; + $remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false; $this->stream->close(); $this->emit('close'); // reject all remaining requests in the queue - while($this->requests) { + while ($this->requests) { $request = array_shift($this->requests); - /* @var $request Request */ - $request->reject(new RuntimeException('Connection closing')); + assert($request instanceof Deferred); + + if ($remoteClosed) { + $request->reject(new \RuntimeException( + 'Connection closed by peer (ECONNRESET)', + defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104 + )); + } else { + $request->reject(new \RuntimeException( + 'Connection closing (ECONNABORTED)', + defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103 + )); + } } } } diff --git a/tests/FactoryStreamingClientTest.php b/tests/FactoryStreamingClientTest.php index bb43f66..882af76 100644 --- a/tests/FactoryStreamingClientTest.php +++ b/tests/FactoryStreamingClientTest.php @@ -136,6 +136,15 @@ public function testWillWriteAuthCommandIfRedisUnixUriContainsPasswordQueryParam $this->factory->createClient('redis+unix:///tmp/redis.sock?password=world'); } + public function testWillNotWriteAnyCommandIfRedisUnixUriContainsNoPasswordOrDb() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->never())->method('write'); + + $this->connector->expects($this->once())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream)); + $this->factory->createClient('redis+unix:///tmp/redis.sock'); + } + public function testWillWriteAuthCommandIfRedisUnixUriContainsUserInfo() { $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); @@ -187,14 +196,55 @@ public function testWillRejectAndCloseAutomaticallyWhenAuthCommandReceivesErrorR $this->assertTrue(is_callable($dataHandler)); $dataHandler("-ERR invalid password\r\n"); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection to redis://:***@localhost failed during AUTH command: ERR invalid password (EACCES)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_EACCES') ? SOCKET_EACCES : 13); + }), + $this->callback(function (\RuntimeException $e) { + return $e->getPrevious()->getMessage() === 'ERR invalid password'; + }) + ) + )); + } + + public function testWillRejectAndCloseAutomaticallyWhenConnectionIsClosedWhileWaitingForAuthCommand() + { + $closeHandler = null; + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->once())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n"); + $stream->expects($this->once())->method('close'); + $stream->expects($this->exactly(2))->method('on')->withConsecutive( + array('data', $this->anything()), + array('close', $this->callback(function ($arg) use (&$closeHandler) { + $closeHandler = $arg; + return true; + })) + ); + + $this->connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($stream)); + $promise = $this->factory->createClient('redis://:world@localhost'); + + $this->assertTrue(is_callable($closeHandler)); + $stream->expects($this->once())->method('isReadable')->willReturn(false); + $stream->expects($this->once())->method('isWritable')->willReturn(false); + call_user_func($closeHandler); + $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( $this->isInstanceOf('RuntimeException'), $this->callback(function (\Exception $e) { - return $e->getMessage() === 'Connection to Redis server failed because AUTH command failed'; + return $e->getMessage() === 'Connection to redis://:***@localhost failed during AUTH command: Connection closed by peer (ECONNRESET)'; }), $this->callback(function (\Exception $e) { - return $e->getPrevious()->getMessage() === 'ERR invalid password'; + return $e->getCode() === (defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104); + }), + $this->callback(function (\Exception $e) { + return $e->getPrevious()->getMessage() === 'Connection closed by peer (ECONNRESET)'; }) ) )); @@ -251,14 +301,91 @@ public function testWillRejectAndCloseAutomaticallyWhenSelectCommandReceivesErro $this->assertTrue(is_callable($dataHandler)); $dataHandler("-ERR DB index is out of range\r\n"); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection to redis://localhost/123 failed during SELECT command: ERR DB index is out of range (ENOENT)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOENT') ? SOCKET_ENOENT : 2); + }), + $this->callback(function (\RuntimeException $e) { + return $e->getPrevious()->getMessage() === 'ERR DB index is out of range'; + }) + ) + )); + } + + public function testWillRejectAndCloseAutomaticallyWhenSelectCommandReceivesAuthErrorResponseIfRedisUriContainsPath() + { + $dataHandler = null; + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->once())->method('write')->with("*2\r\n$6\r\nselect\r\n$3\r\n123\r\n"); + $stream->expects($this->once())->method('close'); + $stream->expects($this->exactly(2))->method('on')->withConsecutive( + array('data', $this->callback(function ($arg) use (&$dataHandler) { + $dataHandler = $arg; + return true; + })), + array('close', $this->anything()) + ); + + $this->connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($stream)); + $promise = $this->factory->createClient('redis://localhost/123'); + + $this->assertTrue(is_callable($dataHandler)); + $dataHandler("-NOAUTH Authentication required.\r\n"); + $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( $this->isInstanceOf('RuntimeException'), $this->callback(function (\Exception $e) { - return $e->getMessage() === 'Connection to Redis server failed because SELECT command failed'; + return $e->getMessage() === 'Connection to redis://localhost/123 failed during SELECT command: NOAUTH Authentication required. (EACCES)'; }), $this->callback(function (\Exception $e) { - return $e->getPrevious()->getMessage() === 'ERR DB index is out of range'; + return $e->getCode() === (defined('SOCKET_EACCES') ? SOCKET_EACCES : 13); + }), + $this->callback(function (\Exception $e) { + return $e->getPrevious()->getMessage() === 'NOAUTH Authentication required.'; + }) + ) + )); + } + + public function testWillRejectAndCloseAutomaticallyWhenConnectionIsClosedWhileWaitingForSelectCommand() + { + $closeHandler = null; + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->once())->method('write')->with("*2\r\n$6\r\nselect\r\n$3\r\n123\r\n"); + $stream->expects($this->once())->method('close'); + $stream->expects($this->exactly(2))->method('on')->withConsecutive( + array('data', $this->anything()), + array('close', $this->callback(function ($arg) use (&$closeHandler) { + $closeHandler = $arg; + return true; + })) + ); + + $this->connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($stream)); + $promise = $this->factory->createClient('redis://localhost/123'); + + $this->assertTrue(is_callable($closeHandler)); + $stream->expects($this->once())->method('isReadable')->willReturn(false); + $stream->expects($this->once())->method('isWritable')->willReturn(false); + call_user_func($closeHandler); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\Exception $e) { + return $e->getMessage() === 'Connection to redis://localhost/123 failed during SELECT command: Connection closed by peer (ECONNRESET)'; + }), + $this->callback(function (\Exception $e) { + return $e->getCode() === (defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104); + }), + $this->callback(function (\Exception $e) { + return $e->getPrevious()->getMessage() === 'Connection closed by peer (ECONNRESET)'; }) ) )); @@ -266,14 +393,20 @@ public function testWillRejectAndCloseAutomaticallyWhenSelectCommandReceivesErro public function testWillRejectIfConnectorRejects() { - $this->connector->expects($this->once())->method('connect')->with('127.0.0.1:2')->willReturn(Promise\reject(new \RuntimeException())); + $this->connector->expects($this->once())->method('connect')->with('127.0.0.1:2')->willReturn(Promise\reject(new \RuntimeException('Foo', 42))); $promise = $this->factory->createClient('redis://127.0.0.1:2'); $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( $this->isInstanceOf('RuntimeException'), - $this->callback(function (\Exception $e) { - return $e->getMessage() === 'Connection to Redis server failed because underlying transport connection failed'; + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection to redis://127.0.0.1:2 failed: Foo'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === 42; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getPrevious()->getMessage() === 'Foo'; }) ) )); @@ -283,7 +416,17 @@ public function testWillRejectIfTargetIsInvalid() { $promise = $this->factory->createClient('http://invalid target'); - $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('InvalidArgumentException'))); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('InvalidArgumentException'), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getMessage() === 'Invalid Redis URI given (EINVAL)'; + }), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getCode() === (defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22); + }) + ) + )); } public function testCancelWillRejectPromise() @@ -297,19 +440,93 @@ public function testCancelWillRejectPromise() $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); } - public function testCancelWillCancelConnectorWhenConnectionIsPending() + public function provideUris() + { + return array( + array( + 'localhost', + 'redis://localhost' + ), + array( + 'redis://localhost', + 'redis://localhost' + ), + array( + 'redis://localhost:6379', + 'redis://localhost:6379' + ), + array( + 'redis://localhost/0', + 'redis://localhost/0' + ), + array( + 'redis://user@localhost', + 'redis://user@localhost' + ), + array( + 'redis://:secret@localhost', + 'redis://:***@localhost' + ), + array( + 'redis://user:secret@localhost', + 'redis://user:***@localhost' + ), + array( + 'redis://:@localhost', + 'redis://:***@localhost' + ), + array( + 'redis://localhost?password=secret', + 'redis://localhost?password=***' + ), + array( + 'redis://localhost/0?password=secret', + 'redis://localhost/0?password=***' + ), + array( + 'redis://localhost?password=', + 'redis://localhost?password=***' + ), + array( + 'redis://localhost?foo=1&password=secret&bar=2', + 'redis://localhost?foo=1&password=***&bar=2' + ), + array( + 'rediss://localhost', + 'rediss://localhost' + ), + array( + 'redis+unix://:secret@/tmp/redis.sock', + 'redis+unix://:***@/tmp/redis.sock' + ), + array( + 'redis+unix:///tmp/redis.sock?password=secret', + 'redis+unix:///tmp/redis.sock?password=***' + ) + ); + } + + /** + * @dataProvider provideUris + * @param string $uri + * @param string $safe + */ + public function testCancelWillRejectWithUriInMessageAndCancelConnectorWhenConnectionIsPending($uri, $safe) { $deferred = new Deferred($this->expectCallableOnce()); - $this->connector->expects($this->once())->method('connect')->with('127.0.0.1:2')->willReturn($deferred->promise()); + $this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise()); - $promise = $this->factory->createClient('redis://127.0.0.1:2'); + $promise = $this->factory->createClient($uri); $promise->cancel(); $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( $this->isInstanceOf('RuntimeException'), - $this->callback(function (\Exception $e) { - return $e->getMessage() === 'Connection to Redis server cancelled'; + $this->callback(function (\RuntimeException $e) use ($safe) { + return $e->getMessage() === 'Connection to ' . $safe . ' cancelled (ECONNABORTED)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103); }) ) )); @@ -329,8 +546,11 @@ public function testCancelWillCloseConnectionWhenConnectionWaitsForSelect() $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( $this->isInstanceOf('RuntimeException'), - $this->callback(function (\Exception $e) { - return $e->getMessage() === 'Connection to Redis server cancelled'; + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection to redis://127.0.0.1:2/123 cancelled (ECONNABORTED)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103); }) ) )); @@ -356,7 +576,10 @@ public function testCreateClientWithTimeoutParameterWillStartTimerAndRejectOnExp $this->logicalAnd( $this->isInstanceOf('RuntimeException'), $this->callback(function (\Exception $e) { - return $e->getMessage() === 'Connection to Redis server timed out after 0 seconds'; + return $e->getMessage() === 'Connection to redis://127.0.0.1:2?timeout=0 timed out after 0 seconds (ETIMEDOUT)'; + }), + $this->callback(function (\Exception $e) { + return $e->getCode() === (defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110); }) ) )); diff --git a/tests/LazyClientTest.php b/tests/LazyClientTest.php index 1faf779..5510ec9 100644 --- a/tests/LazyClientTest.php +++ b/tests/LazyClientTest.php @@ -185,7 +185,17 @@ public function testPingAfterCloseWillRejectWithoutCreatingUnderlyingConnection( $this->client->close(); $promise = $this->client->ping(); - $promise->then(null, $this->expectCallableOnce()); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closed (ENOTCONN)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107); + }) + ) + )); } public function testPingAfterPingWillNotStartIdleTimerWhenFirstPingResolves() diff --git a/tests/StreamingClientTest.php b/tests/StreamingClientTest.php index 67c5c17..af54942 100644 --- a/tests/StreamingClientTest.php +++ b/tests/StreamingClientTest.php @@ -2,13 +2,13 @@ namespace Clue\Tests\React\Redis; -use Clue\React\Redis\StreamingClient; -use Clue\Redis\Protocol\Parser\ParserException; -use Clue\Redis\Protocol\Model\IntegerReply; use Clue\Redis\Protocol\Model\BulkReply; use Clue\Redis\Protocol\Model\ErrorReply; +use Clue\Redis\Protocol\Model\IntegerReply; use Clue\Redis\Protocol\Model\MultiBulkReply; +use Clue\Redis\Protocol\Parser\ParserException; use Clue\React\Redis\Client; +use Clue\React\Redis\StreamingClient; use React\Stream\ThroughStream; class StreamingClientTest extends TestCase @@ -30,6 +30,28 @@ public function setUpClient() $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); } + public function testConstructWithoutParserAssignsParserAutomatically() + { + $this->client = new StreamingClient($this->stream, null, $this->serializer); + + $ref = new \ReflectionProperty($this->client, 'parser'); + $ref->setAccessible(true); + $parser = $ref->getValue($this->client); + + $this->assertInstanceOf('Clue\Redis\Protocol\Parser\ParserInterface', $parser); + } + + public function testConstructWithoutParserAndSerializerAssignsParserAndSerializerAutomatically() + { + $this->client = new StreamingClient($this->stream, $this->parser); + + $ref = new \ReflectionProperty($this->client, 'serializer'); + $ref->setAccessible(true); + $serializer = $ref->getValue($this->client); + + $this->assertInstanceOf('Clue\Redis\Protocol\Serializer\SerializerInterface', $serializer); + } + public function testSending() { $this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message')); @@ -60,21 +82,43 @@ public function testReceiveParseErrorEmitsErrorEvent() $this->stream = new ThroughStream(); $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); - $this->client->on('error', $this->expectCallableOnce()); + $this->client->on('error', $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('UnexpectedValueException'), + $this->callback(function (\UnexpectedValueException $e) { + return $e->getMessage() === 'Invalid data received: Foo (EBADMSG)'; + }), + $this->callback(function (\UnexpectedValueException $e) { + return $e->getCode() === (defined('SOCKET_EBADMSG') ? SOCKET_EBADMSG : 77); + }) + ) + )); $this->client->on('close', $this->expectCallableOnce()); - $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->throwException(new ParserException())); + $this->parser->expects($this->once())->method('pushIncoming')->with('message')->willThrowException(new ParserException('Foo')); $this->stream->emit('data', array('message')); } - public function testReceiveThrowMessageEmitsErrorEvent() + public function testReceiveUnexpectedReplyEmitsErrorEvent() { $this->stream = new ThroughStream(); $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); $this->client->on('error', $this->expectCallableOnce()); - - $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2)))); + $this->client->on('error', $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('UnderflowException'), + $this->callback(function (\UnderflowException $e) { + return $e->getMessage() === 'Unexpected reply received, no matching request found (ENOMSG)'; + }), + $this->callback(function (\UnderflowException $e) { + return $e->getCode() === (defined('SOCKET_ENOMSG') ? SOCKET_ENOMSG : 42); + }) + ) + )); + + + $this->parser->expects($this->once())->method('pushIncoming')->with('message')->willReturn(array(new IntegerReply(2))); $this->stream->emit('data', array('message')); } @@ -102,10 +146,19 @@ public function testMonitorCommandIsNotSupported() { $promise = $this->client->monitor(); - $this->expectPromiseReject($promise); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('BadMethodCallException'), + $this->callback(function (\BadMethodCallException $e) { + return $e->getMessage() === 'MONITOR command explicitly not supported (ENOTSUP)'; + }), + $this->callback(function (\BadMethodCallException $e) { + return $e->getCode() === (defined('SOCKET_ENOTSUP') ? SOCKET_ENOTSUP : (defined('SOCKET_EOPNOTSUPP') ? SOCKET_EOPNOTSUPP : 95)); + }) + ) + )); } - public function testErrorReply() { $promise = $this->client->invalid(); @@ -113,7 +166,6 @@ public function testErrorReply() $err = new ErrorReply("ERR unknown command 'invalid'"); $this->client->handleMessage($err); - $this->expectPromiseReject($promise); $promise->then(null, $this->expectCallableOnceWith($err)); } @@ -122,7 +174,58 @@ public function testClosingClientRejectsAllRemainingRequests() $promise = $this->client->ping(); $this->client->close(); - $this->expectPromiseReject($promise); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closing (ECONNABORTED)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103); + }) + ) + )); + } + + public function testClosingStreamRejectsAllRemainingRequests() + { + $this->stream = new ThroughStream(); + $this->parser->expects($this->once())->method('pushIncoming')->willReturn(array()); + $this->client = new StreamingClient($this->stream, $this->parser, $this->serializer); + + $promise = $this->client->ping(); + $this->stream->close(); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closed by peer (ECONNRESET)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104); + }) + ) + )); + } + + public function testEndingClientRejectsAllNewRequests() + { + $this->client->ping(); + $this->client->end(); + $promise = $this->client->ping(); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closing (ENOTCONN)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107); + }) + ) + )); } public function testClosedClientRejectsAllNewRequests() @@ -130,7 +233,17 @@ public function testClosedClientRejectsAllNewRequests() $this->client->close(); $promise = $this->client->ping(); - $this->expectPromiseReject($promise); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closed (ENOTCONN)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107); + }) + ) + )); } public function testEndingNonBusyClosesClient() @@ -212,10 +325,37 @@ public function testPubsubMessage(Client $client) $client->handleMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('test'), new BulkReply('payload')))); } - public function testPubsubSubscribeSingleOnly() + public function testSubscribeWithMultipleArgumentsRejects() + { + $promise = $this->client->subscribe('a', 'b'); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('InvalidArgumentException'), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getMessage() === 'PubSub commands limited to single argument (EINVAL)'; + }), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getCode() === (defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22); + }) + ) + )); + } + + public function testUnsubscribeWithoutArgumentsRejects() { - $this->expectPromiseReject($this->client->subscribe('a', 'b')); - $this->expectPromiseReject($this->client->unsubscribe('a', 'b')); - $this->expectPromiseReject($this->client->unsubscribe()); + $promise = $this->client->unsubscribe(); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('InvalidArgumentException'), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getMessage() === 'PubSub commands limited to single argument (EINVAL)'; + }), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getCode() === (defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22); + }) + ) + )); } }