diff --git a/README.md b/README.md
index c982e1653..7e0769bfa 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@ Features:
* [Feature rich](docs/quick_tour.md).
* [JMS](https://docs.oracle.com/javaee/7/api/javax/jms/package-summary.html) like transport [abstraction](https://github.com/php-enqueue/psr-queue).
-* Supports transports [AMQP (RabbitMQ, ActiveMQ)](docs/transport/amqp.md), [STOMP](docs/transport/stomp.md), [Amazon SQS](docs/transport/sqs.md), [Redis](docs/transport/redis.md), [Doctrine DBAL](docs/transport/dbal.md), [Filesystem](docs/transport/filesystem.md), [Null](docs/transport/null.md).
+* Supports transports [AMQP (RabbitMQ, ActiveMQ)](docs/transport/amqp.md), [Beanstalk (Pheanstalk)](docs/transport/pheanstalk.md) [STOMP](docs/transport/stomp.md), [Amazon SQS](docs/transport/sqs.md), [Redis](docs/transport/redis.md), [Doctrine DBAL](docs/transport/dbal.md), [Filesystem](docs/transport/filesystem.md), [Null](docs/transport/null.md).
* [Symfony bundle](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/quick_tour.md)
* [Magento1 extension](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/magento/quick_tour.md)
* [Message bus](http://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html) support.
diff --git a/bin/test b/bin/test
index 1bd264a4c..c3d2930bb 100755
--- a/bin/test
+++ b/bin/test
@@ -23,6 +23,7 @@ function waitForService()
waitForService rabbitmq 5672 50
waitForService mysql 3306 50
waitForService redis 6379 50
+waitForService beanstalkd 11300
php pkg/job-queue/Tests/Functional/app/console doctrine:database:create
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
diff --git a/composer.json b/composer.json
index 3aa91097d..5cc11dfd3 100644
--- a/composer.json
+++ b/composer.json
@@ -13,6 +13,7 @@
"enqueue/null": "*@dev",
"enqueue/dbal": "*@dev",
"enqueue/sqs": "*@dev",
+ "enqueue/pheanstalk": "*@dev",
"enqueue/enqueue-bundle": "*@dev",
"enqueue/job-queue": "*@dev",
"enqueue/simple-client": "*@dev",
@@ -85,6 +86,10 @@
"type": "path",
"url": "pkg/sqs"
},
+ {
+ "type": "path",
+ "url": "pkg/pheanstalk"
+ },
{
"type": "path",
"url": "pkg/simple-client"
diff --git a/docker-compose.yml b/docker-compose.yml
index 16c6d946c..fba4ac60f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,4 +1,5 @@
version: '2'
+
services:
dev:
image: enqueue/dev:latest
@@ -7,6 +8,7 @@ services:
- rabbitmq
- mysql
- redis
+ - beanstalkd
volumes:
- './:/mqdev'
environment:
@@ -29,6 +31,9 @@ services:
- AWS__SQS__KEY=$ENQUEUE_AWS__SQS__KEY
- AWS__SQS__SECRET=$ENQUEUE_AWS__SQS__SECRET
- AWS__SQS__REGION=$ENQUEUE_AWS__SQS__REGION
+ - BEANSTALKD_HOST=beanstalkd
+ - BEANSTALKD_PORT=11300
+ - BEANSTALKD_DSN=beanstalk://beanstalkd:11300
rabbitmq:
image: enqueue/rabbitmq:latest
@@ -37,6 +42,12 @@ services:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
- RABBITMQ_DEFAULT_VHOST=mqdev
+ ports:
+ - "15677:15672"
+
+ beanstalkd:
+ image: 'schickling/beanstalkd'
+
redis:
image: 'redis:3'
ports:
diff --git a/docs/index.md b/docs/index.md
index 0795e1ca0..875c0e3c0 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -4,6 +4,7 @@
* Transports
- [Amqp (RabbitMQ, ActiveMQ)](transport/amqp.md)
- [Amazon SQS](transport/sqs.md)
+ - [Beanstalk (Pheanstalk)](transport/pheanstalk.md)
- [Stomp](transport/stomp.md)
- [Redis](transport/redis.md)
- [Doctrine DBAL](transport/dbal.md)
diff --git a/docs/transport/pheanstalk.md b/docs/transport/pheanstalk.md
new file mode 100644
index 000000000..559517c00
--- /dev/null
+++ b/docs/transport/pheanstalk.md
@@ -0,0 +1,84 @@
+# Beanstalk (Pheanstalk) transport
+
+The transport uses [Beanstalkd](http://kr.github.io/beanstalkd/) job manager.
+The transport uses [Pheanstalk](https://github.com/pda/pheanstalk) library internally.
+
+* [Installation](#installation)
+* [Create context](#create-context)
+* [Send message to topic](#send-message-to-topic)
+* [Send message to queue](#send-message-to-queue)
+* [Consume message](#consume-message)
+
+## Installation
+
+```bash
+$ composer require enqueue/pheanstalk
+```
+
+
+## Create context
+
+```php
+ 'example',
+ 'port' => 5555
+]);
+```
+
+## Send message to topic
+
+```php
+createTopic('aTopic');
+$message = $psrContext->createMessage('Hello world!');
+
+$psrContext->createProducer()->send($fooTopic, $message);
+```
+
+## Send message to queue
+
+```php
+createQueue('aQueue');
+$message = $psrContext->createMessage('Hello world!');
+
+$psrContext->createProducer()->send($fooQueue, $message);
+```
+
+## Consume message:
+
+```php
+createQueue('aQueue');
+$consumer = $psrContext->createConsumer($fooQueue);
+
+$message = $consumer->receive(2000); // wait for 2 seconds
+
+$message = $consumer->receiveNoWait(); // fetch message or return null immediately
+
+// process a message
+
+$consumer->acknowledge($message);
+// $consumer->reject($message);
+```
+
+[back to index](../index.md)
\ No newline at end of file
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index 429ead9d2..420daae7f 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -49,6 +49,10 @@
pkg/sqs/Tests
+
+ pkg/pheanstalk/Tests
+
+
pkg/enqueue-bundle/Tests
diff --git a/pkg/amqp-ext/AmqpConnectionFactory.php b/pkg/amqp-ext/AmqpConnectionFactory.php
index ed220fbf6..90e8f64fc 100644
--- a/pkg/amqp-ext/AmqpConnectionFactory.php
+++ b/pkg/amqp-ext/AmqpConnectionFactory.php
@@ -150,7 +150,7 @@ private function parseDsn($dsn)
], $dsnConfig);
if ('amqp' !== $dsnConfig['scheme']) {
- throw new \LogicException('The given DSN scheme "%s" is not supported. Could be "amqp" only.');
+ throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "amqp" only.', $dsnConfig['scheme']));
}
if ($dsnConfig['query']) {
diff --git a/pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php b/pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php
index 5e67ee529..b2132f704 100644
--- a/pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php
+++ b/pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php
@@ -24,7 +24,7 @@ public function testThrowNeitherArrayStringNorNullGivenAsConfig()
public function testThrowIfSchemeIsNotAmqp()
{
$this->expectException(\LogicException::class);
- $this->expectExceptionMessage('The given DSN scheme "%s" is not supported. Could be "amqp" only.');
+ $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be "amqp" only.');
new AmqpConnectionFactory('http://example.com');
}
diff --git a/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveFromQueueTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveFromQueueTest.php
new file mode 100644
index 000000000..e98bb9078
--- /dev/null
+++ b/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveFromQueueTest.php
@@ -0,0 +1,38 @@
+createContext();
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param AmqpContext $context
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ $queue = $context->createQueue($queueName);
+ $context->declareQueue($queue);
+ $context->purge($queue);
+
+ return $queue;
+ }
+}
diff --git a/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php
new file mode 100644
index 000000000..72ebe9221
--- /dev/null
+++ b/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php
@@ -0,0 +1,39 @@
+createContext();
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param AmqpContext $context
+ */
+ protected function createTopic(PsrContext $context, $topicName)
+ {
+ $topic = $context->createTopic($topicName);
+ $topic->setType(\AMQP_EX_TYPE_FANOUT);
+ $topic->addFlag(\AMQP_DURABLE);
+ $context->declareTopic($topic);
+
+ return $topic;
+ }
+}
diff --git a/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php
new file mode 100644
index 000000000..ca78e82f9
--- /dev/null
+++ b/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php
@@ -0,0 +1,38 @@
+createContext();
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param AmqpContext $context
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ $queue = $context->createQueue($queueName);
+ $context->declareQueue($queue);
+ $context->purge($queue);
+
+ return $queue;
+ }
+}
diff --git a/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php
new file mode 100644
index 000000000..d0721d34b
--- /dev/null
+++ b/pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php
@@ -0,0 +1,39 @@
+createContext();
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param AmqpContext $context
+ */
+ protected function createTopic(PsrContext $context, $topicName)
+ {
+ $topic = $context->createTopic($topicName);
+ $topic->setType(\AMQP_EX_TYPE_FANOUT);
+ $topic->addFlag(\AMQP_DURABLE);
+ $context->declareTopic($topic);
+
+ return $topic;
+ }
+}
diff --git a/pkg/amqp-ext/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php
new file mode 100644
index 000000000..fff7d4f81
--- /dev/null
+++ b/pkg/amqp-ext/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueTest.php
@@ -0,0 +1,55 @@
+createContext();
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param AmqpContext $context
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ $queue = $context->createQueue($queueName);
+ $context->declareQueue($queue);
+ $context->purge($queue);
+
+ $context->bind($context->createTopic($queueName), $queue);
+
+ return $queue;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param AmqpContext $context
+ */
+ protected function createTopic(PsrContext $context, $topicName)
+ {
+ $topic = $context->createTopic($topicName);
+ $topic->setType(\AMQP_EX_TYPE_FANOUT);
+ $topic->addFlag(\AMQP_DURABLE);
+ $context->declareTopic($topic);
+
+ return $topic;
+ }
+}
diff --git a/pkg/amqp-ext/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php
new file mode 100644
index 000000000..18dae26c0
--- /dev/null
+++ b/pkg/amqp-ext/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php
@@ -0,0 +1,55 @@
+createContext();
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param AmqpContext $context
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ $queue = $context->createQueue($queueName);
+ $context->declareQueue($queue);
+ $context->purge($queue);
+
+ $context->bind($context->createTopic($queueName), $queue);
+
+ return $queue;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param AmqpContext $context
+ */
+ protected function createTopic(PsrContext $context, $topicName)
+ {
+ $topic = $context->createTopic($topicName);
+ $topic->setType(\AMQP_EX_TYPE_FANOUT);
+ $topic->addFlag(\AMQP_DURABLE);
+ $context->declareTopic($topic);
+
+ return $topic;
+ }
+}
diff --git a/pkg/pheanstalk/LICENSE b/pkg/pheanstalk/LICENSE
new file mode 100644
index 000000000..d9736f8bf
--- /dev/null
+++ b/pkg/pheanstalk/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+Copyright (c) 2017 Kotliar Maksym
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/pkg/pheanstalk/PheanstalkConnectionFactory.php b/pkg/pheanstalk/PheanstalkConnectionFactory.php
new file mode 100644
index 000000000..f1967bd61
--- /dev/null
+++ b/pkg/pheanstalk/PheanstalkConnectionFactory.php
@@ -0,0 +1,126 @@
+ 'localhost',
+ * 'port' => 11300,
+ * 'timeout' => null,
+ * 'persisted' => true,
+ * ]
+ *
+ * or
+ *
+ * beanstalk://host:port
+ *
+ * @param array|string $config
+ */
+ public function __construct($config = 'beanstalk://')
+ {
+ if (empty($config) || 'beanstalk://' === $config) {
+ $config = [];
+ } elseif (is_string($config)) {
+ $config = $this->parseDsn($config);
+ } elseif (is_array($config)) {
+ } else {
+ throw new \LogicException('The config must be either an array of options, a DSN string or null');
+ }
+
+ $this->config = array_replace($this->defaultConfig(), $config);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return PheanstalkContext
+ */
+ public function createContext()
+ {
+ return new PheanstalkContext($this->establishConnection());
+ }
+
+ /**
+ * @return Pheanstalk
+ */
+ private function establishConnection()
+ {
+ if (false == $this->connection) {
+ $this->connection = new Pheanstalk(
+ $this->config['host'],
+ $this->config['port'],
+ $this->config['timeout'],
+ $this->config['persisted']
+ );
+ }
+
+ return $this->connection;
+ }
+
+ /**
+ * @param string $dsn
+ *
+ * @return array
+ */
+ private function parseDsn($dsn)
+ {
+ $dsnConfig = parse_url($dsn);
+ if (false === $dsnConfig) {
+ throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
+ }
+
+ $dsnConfig = array_replace([
+ 'scheme' => null,
+ 'host' => null,
+ 'port' => null,
+ 'user' => null,
+ 'pass' => null,
+ 'path' => null,
+ 'query' => null,
+ ], $dsnConfig);
+
+ if ('beanstalk' !== $dsnConfig['scheme']) {
+ throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "beanstalk" only.', $dsnConfig['scheme']));
+ }
+
+ $query = [];
+ if ($dsnConfig['query']) {
+ parse_str($dsnConfig['query'], $query);
+ }
+
+ return array_replace($query, [
+ 'port' => $dsnConfig['port'],
+ 'host' => $dsnConfig['host'],
+ ]);
+ }
+
+ /**
+ * @return array
+ */
+ private function defaultConfig()
+ {
+ return [
+ 'host' => 'localhost',
+ 'port' => Pheanstalk::DEFAULT_PORT,
+ 'timeout' => null,
+ 'persisted' => true,
+ ];
+ }
+}
diff --git a/pkg/pheanstalk/PheanstalkConsumer.php b/pkg/pheanstalk/PheanstalkConsumer.php
new file mode 100644
index 000000000..3a143c103
--- /dev/null
+++ b/pkg/pheanstalk/PheanstalkConsumer.php
@@ -0,0 +1,109 @@
+destination = $destination;
+ $this->pheanstalk = $pheanstalk;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return PheanstalkDestination
+ */
+ public function getQueue()
+ {
+ return $this->destination;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return PheanstalkMessage|null
+ */
+ public function receive($timeout = 0)
+ {
+ if ($job = $this->pheanstalk->reserveFromTube($this->destination->getName(), $timeout / 1000)) {
+ return $this->convertJobToMessage($job);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return PheanstalkMessage|null
+ */
+ public function receiveNoWait()
+ {
+ if ($job = $this->pheanstalk->reserveFromTube($this->destination->getName(), 0)) {
+ return $this->convertJobToMessage($job);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param PheanstalkMessage $message
+ */
+ public function acknowledge(PsrMessage $message)
+ {
+ InvalidMessageException::assertMessageInstanceOf($message, PheanstalkMessage::class);
+
+ if (false == $message->getJob()) {
+ throw new \LogicException('The message could not be acknowledged because it does not have job set.');
+ }
+
+ $this->pheanstalk->delete($message->getJob());
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param PheanstalkMessage $message
+ */
+ public function reject(PsrMessage $message, $requeue = false)
+ {
+ $this->acknowledge($message);
+
+ if ($requeue) {
+ $this->pheanstalk->release($message->getJob(), $message->getPriority(), $message->getDelay());
+ }
+ }
+
+ /**
+ * @param Job $job
+ *
+ * @return PheanstalkMessage
+ */
+ private function convertJobToMessage(Job $job)
+ {
+ $message = PheanstalkMessage::jsonUnserialize($job->getData());
+ $message->setJob($job);
+
+ return $message;
+ }
+}
diff --git a/pkg/pheanstalk/PheanstalkContext.php b/pkg/pheanstalk/PheanstalkContext.php
new file mode 100644
index 000000000..b40a72ede
--- /dev/null
+++ b/pkg/pheanstalk/PheanstalkContext.php
@@ -0,0 +1,93 @@
+pheanstalk = $pheanstalk;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createMessage($body = '', array $properties = [], array $headers = [])
+ {
+ return new PheanstalkMessage($body, $properties, $headers);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createTopic($topicName)
+ {
+ return new PheanstalkDestination($topicName);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createQueue($queueName)
+ {
+ return new PheanstalkDestination($queueName);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function createTemporaryQueue()
+ {
+ throw new \LogicException('Not implemented');
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @return PheanstalkProducer
+ */
+ public function createProducer()
+ {
+ return new PheanstalkProducer($this->pheanstalk);
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param PheanstalkDestination $destination
+ *
+ * @return PheanstalkConsumer
+ */
+ public function createConsumer(PsrDestination $destination)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($destination, PheanstalkDestination::class);
+
+ return new PheanstalkConsumer($destination, $this->pheanstalk);
+ }
+
+ public function close()
+ {
+ $this->pheanstalk->getConnection()->disconnect();
+ }
+
+ /**
+ * @return Pheanstalk
+ */
+ public function getPheanstalk()
+ {
+ return $this->pheanstalk;
+ }
+}
diff --git a/pkg/pheanstalk/PheanstalkDestination.php b/pkg/pheanstalk/PheanstalkDestination.php
new file mode 100644
index 000000000..1ed29508f
--- /dev/null
+++ b/pkg/pheanstalk/PheanstalkDestination.php
@@ -0,0 +1,46 @@
+destinationName = $destinationName;
+ }
+
+ /**
+ * @return string
+ */
+ public function getName()
+ {
+ return $this->destinationName;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getQueueName()
+ {
+ return $this->getName();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getTopicName()
+ {
+ return $this->getName();
+ }
+}
diff --git a/pkg/pheanstalk/PheanstalkMessage.php b/pkg/pheanstalk/PheanstalkMessage.php
new file mode 100644
index 000000000..2ce6d0685
--- /dev/null
+++ b/pkg/pheanstalk/PheanstalkMessage.php
@@ -0,0 +1,305 @@
+body = $body;
+ $this->properties = $properties;
+ $this->headers = $headers;
+ $this->redelivered = false;
+ }
+
+ /**
+ * @param string $body
+ */
+ public function setBody($body)
+ {
+ $this->body = $body;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getBody()
+ {
+ return $this->body;
+ }
+
+ /**
+ * @param array $properties
+ */
+ public function setProperties(array $properties)
+ {
+ $this->properties = $properties;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getProperties()
+ {
+ return $this->properties;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setProperty($name, $value)
+ {
+ $this->properties[$name] = $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getProperty($name, $default = null)
+ {
+ return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default;
+ }
+
+ /**
+ * @param array $headers
+ */
+ public function setHeaders(array $headers)
+ {
+ $this->headers = $headers;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getHeaders()
+ {
+ return $this->headers;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setHeader($name, $value)
+ {
+ $this->headers[$name] = $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getHeader($name, $default = null)
+ {
+ return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default;
+ }
+
+ /**
+ * @return bool
+ */
+ public function isRedelivered()
+ {
+ return $this->redelivered;
+ }
+
+ /**
+ * @param bool $redelivered
+ */
+ public function setRedelivered($redelivered)
+ {
+ $this->redelivered = $redelivered;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setCorrelationId($correlationId)
+ {
+ $this->setHeader('correlation_id', (string) $correlationId);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getCorrelationId()
+ {
+ return $this->getHeader('correlation_id');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setMessageId($messageId)
+ {
+ $this->setHeader('message_id', (string) $messageId);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getMessageId()
+ {
+ return $this->getHeader('message_id');
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getTimestamp()
+ {
+ $value = $this->getHeader('timestamp');
+
+ return $value === null ? null : (int) $value;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setTimestamp($timestamp)
+ {
+ $this->setHeader('timestamp', $timestamp);
+ }
+
+ /**
+ * @param string|null $replyTo
+ */
+ public function setReplyTo($replyTo)
+ {
+ $this->setHeader('reply_to', $replyTo);
+ }
+
+ /**
+ * @return string|null
+ */
+ public function getReplyTo()
+ {
+ return $this->getHeader('reply_to');
+ }
+
+ /**
+ * @param int $time
+ */
+ public function setTimeToRun($time)
+ {
+ $this->setHeader('ttr', $time);
+ }
+
+ /**
+ * @return int
+ */
+ public function getTimeToRun()
+ {
+ return $this->getHeader('ttr', Pheanstalk::DEFAULT_TTR);
+ }
+
+ /**
+ * @param int $priority
+ */
+ public function setPriority($priority)
+ {
+ $this->setHeader('priority', $priority);
+ }
+
+ /**
+ * @return int
+ */
+ public function getPriority()
+ {
+ return $this->getHeader('priority', Pheanstalk::DEFAULT_PRIORITY);
+ }
+
+ /**
+ * @param int $delay
+ */
+ public function setDelay($delay)
+ {
+ $this->setHeader('delay', $delay);
+ }
+
+ /**
+ * @return int
+ */
+ public function getDelay()
+ {
+ return $this->getHeader('delay', Pheanstalk::DEFAULT_DELAY);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function jsonSerialize()
+ {
+ return [
+ 'body' => $this->getBody(),
+ 'properties' => $this->getProperties(),
+ 'headers' => $this->getHeaders(),
+ ];
+ }
+
+ /**
+ * @param string $json
+ *
+ * @return PheanstalkMessage
+ */
+ public static function jsonUnserialize($json)
+ {
+ $data = json_decode($json, true);
+ if (JSON_ERROR_NONE !== json_last_error()) {
+ throw new \InvalidArgumentException(sprintf(
+ 'The malformed json given. Error %s and message %s',
+ json_last_error(),
+ json_last_error_msg()
+ ));
+ }
+
+ return new self($data['body'], $data['properties'], $data['headers']);
+ }
+
+ /**
+ * @return Job
+ */
+ public function getJob()
+ {
+ return $this->job;
+ }
+
+ /**
+ * @param Job $job
+ */
+ public function setJob(Job $job)
+ {
+ $this->job = $job;
+ }
+}
diff --git a/pkg/pheanstalk/PheanstalkProducer.php b/pkg/pheanstalk/PheanstalkProducer.php
new file mode 100644
index 000000000..7920ae9c2
--- /dev/null
+++ b/pkg/pheanstalk/PheanstalkProducer.php
@@ -0,0 +1,54 @@
+pheanstalk = $pheanstalk;
+ }
+
+ /**
+ * {@inheritdoc}
+ *
+ * @param PheanstalkDestination $destination
+ * @param PheanstalkMessage $message
+ */
+ public function send(PsrDestination $destination, PsrMessage $message)
+ {
+ InvalidDestinationException::assertDestinationInstanceOf($destination, PheanstalkDestination::class);
+ InvalidMessageException::assertMessageInstanceOf($message, PheanstalkMessage::class);
+
+ $rawMessage = json_encode($message);
+ if (JSON_ERROR_NONE !== json_last_error()) {
+ throw new \InvalidArgumentException(sprintf(
+ 'Could not encode value into json. Error %s and message %s',
+ json_last_error(),
+ json_last_error_msg()
+ ));
+ }
+
+ $this->pheanstalk->useTube($destination->getName())->put(
+ $rawMessage,
+ $message->getPriority(),
+ $message->getDelay(),
+ $message->getTimeToRun()
+ );
+ }
+}
diff --git a/pkg/pheanstalk/README.md b/pkg/pheanstalk/README.md
new file mode 100644
index 000000000..48a83505f
--- /dev/null
+++ b/pkg/pheanstalk/README.md
@@ -0,0 +1,26 @@
+# Beanstalk Transport
+
+[](https://gitter.im/php-enqueue/Lobby)
+[](https://travis-ci.org/php-enqueue/pheanstalk)
+[](https://packagist.org/packages/enqueue/pheanstalk)
+[](https://packagist.org/packages/enqueue/pheanstalk)
+
+This is an implementation of the queue specification. It allows you to send and consume message from Beanstalkd broker.
+
+## Resources
+
+* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md)
+* [Questions](https://gitter.im/php-enqueue/Lobby)
+* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues)
+
+## Developed by Forma-Pro
+
+Forma-Pro is a full stack development company which interests also spread to open source development.
+Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience.
+Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability.
+
+If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com
+
+## License
+
+It is released under the [MIT License](LICENSE).
\ No newline at end of file
diff --git a/pkg/pheanstalk/Tests/PheanstalkConnectionFactoryConfigTest.php b/pkg/pheanstalk/Tests/PheanstalkConnectionFactoryConfigTest.php
new file mode 100644
index 000000000..0859a2386
--- /dev/null
+++ b/pkg/pheanstalk/Tests/PheanstalkConnectionFactoryConfigTest.php
@@ -0,0 +1,137 @@
+expectException(\LogicException::class);
+ $this->expectExceptionMessage('The config must be either an array of options, a DSN string or null');
+
+ new PheanstalkConnectionFactory(new \stdClass());
+ }
+
+ public function testThrowIfSchemeIsNotBeanstalkAmqp()
+ {
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be "beanstalk" only.');
+
+ new PheanstalkConnectionFactory('http://example.com');
+ }
+
+ public function testThrowIfDsnCouldNotBeParsed()
+ {
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Failed to parse DSN "beanstalk://:@/"');
+
+ new PheanstalkConnectionFactory('beanstalk://:@/');
+ }
+
+ /**
+ * @dataProvider provideConfigs
+ *
+ * @param mixed $config
+ * @param mixed $expectedConfig
+ */
+ public function testShouldParseConfigurationAsExpected($config, $expectedConfig)
+ {
+ $factory = new PheanstalkConnectionFactory($config);
+
+ $this->assertAttributeEquals($expectedConfig, 'config', $factory);
+ }
+
+ public static function provideConfigs()
+ {
+ yield [
+ null,
+ [
+ 'host' => 'localhost',
+ 'port' => 11300,
+ 'timeout' => null,
+ 'persisted' => true,
+ ],
+ ];
+
+ // some examples from Appendix A: Examples (https://www.rabbitmq.com/uri-spec.html)
+
+ yield [
+ 'beanstalk://',
+ [
+ 'host' => 'localhost',
+ 'port' => 11300,
+ 'timeout' => null,
+ 'persisted' => true,
+ ],
+ ];
+
+ yield [
+ [],
+ [
+ 'host' => 'localhost',
+ 'port' => 11300,
+ 'timeout' => null,
+ 'persisted' => true,
+ ],
+ ];
+
+ yield [
+ 'beanstalk://theHost:1234',
+ [
+ 'host' => 'theHost',
+ 'port' => 1234,
+ 'timeout' => null,
+ 'persisted' => true,
+ ],
+ ];
+
+ yield [
+ ['host' => 'theHost', 'port' => 1234],
+ [
+ 'host' => 'theHost',
+ 'port' => 1234,
+ 'timeout' => null,
+ 'persisted' => true,
+ ],
+ ];
+
+ yield [
+ ['host' => 'theHost'],
+ [
+ 'host' => 'theHost',
+ 'port' => 11300,
+ 'timeout' => null,
+ 'persisted' => true,
+ ],
+ ];
+
+ yield [
+ ['host' => 'theHost', 'timeout' => 123],
+ [
+ 'host' => 'theHost',
+ 'port' => 11300,
+ 'timeout' => 123,
+ 'persisted' => true,
+ ],
+ ];
+
+ yield [
+ 'beanstalk://theHost:1234?timeout=123&persisted=1',
+ [
+ 'host' => 'theHost',
+ 'port' => 1234,
+ 'timeout' => 123,
+ 'persisted' => 1,
+ ],
+ ];
+ }
+}
diff --git a/pkg/pheanstalk/Tests/PheanstalkConsumerTest.php b/pkg/pheanstalk/Tests/PheanstalkConsumerTest.php
new file mode 100644
index 000000000..60015dc99
--- /dev/null
+++ b/pkg/pheanstalk/Tests/PheanstalkConsumerTest.php
@@ -0,0 +1,128 @@
+createPheanstalkMock()
+ );
+ }
+
+ public function testShouldReturnQueueSetInConstructor()
+ {
+ $destination = new PheanstalkDestination('aQueueName');
+
+ $consumer = new PheanstalkConsumer(
+ $destination,
+ $this->createPheanstalkMock()
+ );
+
+ $this->assertSame($destination, $consumer->getQueue());
+ }
+
+ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
+ {
+ $destination = new PheanstalkDestination('theQueueName');
+
+ $pheanstalk = $this->createPheanstalkMock();
+ $pheanstalk
+ ->expects($this->once())
+ ->method('reserveFromTube')
+ ->with('theQueueName', 1)
+ ->willReturn(null)
+ ;
+
+ $consumer = new PheanstalkConsumer($destination, $pheanstalk);
+
+ $this->assertNull($consumer->receive(1000));
+ }
+
+ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
+ {
+ $destination = new PheanstalkDestination('theQueueName');
+ $message = new PheanstalkMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
+
+ $job = new Job('theJobId', json_encode($message));
+
+ $pheanstalk = $this->createPheanstalkMock();
+ $pheanstalk
+ ->expects($this->once())
+ ->method('reserveFromTube')
+ ->with('theQueueName', 1)
+ ->willReturn($job)
+ ;
+
+ $consumer = new PheanstalkConsumer($destination, $pheanstalk);
+
+ $actualMessage = $consumer->receive(1000);
+
+ $this->assertSame('theBody', $actualMessage->getBody());
+ $this->assertSame(['foo' => 'fooVal'], $actualMessage->getProperties());
+ $this->assertSame(['bar' => 'barVal'], $actualMessage->getHeaders());
+ $this->assertSame($job, $actualMessage->getJob());
+ }
+
+ public function testShouldReceiveNoWaitFromQueueAndReturnNullIfNoMessageInQueue()
+ {
+ $destination = new PheanstalkDestination('theQueueName');
+
+ $pheanstalk = $this->createPheanstalkMock();
+ $pheanstalk
+ ->expects($this->once())
+ ->method('reserveFromTube')
+ ->with('theQueueName', 0)
+ ->willReturn(null)
+ ;
+
+ $consumer = new PheanstalkConsumer($destination, $pheanstalk);
+
+ $this->assertNull($consumer->receiveNoWait());
+ }
+
+ public function testShouldReceiveNoWaitFromQueueAndReturnMessageIfMessageInQueue()
+ {
+ $destination = new PheanstalkDestination('theQueueName');
+ $message = new PheanstalkMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
+
+ $job = new Job('theJobId', json_encode($message));
+
+ $pheanstalk = $this->createPheanstalkMock();
+ $pheanstalk
+ ->expects($this->once())
+ ->method('reserveFromTube')
+ ->with('theQueueName', 0)
+ ->willReturn($job)
+ ;
+
+ $consumer = new PheanstalkConsumer($destination, $pheanstalk);
+
+ $actualMessage = $consumer->receiveNoWait();
+
+ $this->assertSame('theBody', $actualMessage->getBody());
+ $this->assertSame(['foo' => 'fooVal'], $actualMessage->getProperties());
+ $this->assertSame(['bar' => 'barVal'], $actualMessage->getHeaders());
+ $this->assertSame($job, $actualMessage->getJob());
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Pheanstalk
+ */
+ private function createPheanstalkMock()
+ {
+ return $this->createMock(Pheanstalk::class);
+ }
+}
diff --git a/pkg/pheanstalk/Tests/PheanstalkContextTest.php b/pkg/pheanstalk/Tests/PheanstalkContextTest.php
new file mode 100644
index 000000000..fa9ee7692
--- /dev/null
+++ b/pkg/pheanstalk/Tests/PheanstalkContextTest.php
@@ -0,0 +1,81 @@
+assertClassImplements(PsrContext::class, PheanstalkContext::class);
+ }
+
+ public function testCouldBeConstructedWithPheanstalkAsFirstArgument()
+ {
+ new PheanstalkContext($this->createPheanstalkMock());
+ }
+
+ public function testThrowNotImplementedOnCreateTemporaryQueue()
+ {
+ $context = new PheanstalkContext($this->createPheanstalkMock());
+
+ $this->expectException(\LogicException::class);
+ $this->expectExceptionMessage('Not implemented');
+ $context->createTemporaryQueue();
+ }
+
+ public function testThrowInvalidDestinationIfInvalidDestinationGivenOnCreateConsumer()
+ {
+ $context = new PheanstalkContext($this->createPheanstalkMock());
+
+ $this->expectException(InvalidDestinationException::class);
+ $context->createConsumer(new NullQueue('aQueue'));
+ }
+
+ public function testShouldAllowGetPheanstalkSetInConstructor()
+ {
+ $pheanstalk = $this->createPheanstalkMock();
+
+ $context = new PheanstalkContext($pheanstalk);
+
+ $this->assertSame($pheanstalk, $context->getPheanstalk());
+ }
+
+ public function testShouldDoConnectionDisconnectOnContextClose()
+ {
+ $connection = $this->createMock(Connection::class);
+ $connection
+ ->expects($this->once())
+ ->method('disconnect')
+ ;
+
+ $pheanstalk = $this->createPheanstalkMock();
+ $pheanstalk
+ ->expects($this->once())
+ ->method('getConnection')
+ ->willReturn($connection)
+ ;
+
+ $context = new PheanstalkContext($pheanstalk);
+
+ $context->close();
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Pheanstalk
+ */
+ private function createPheanstalkMock()
+ {
+ return $this->createMock(Pheanstalk::class);
+ }
+}
diff --git a/pkg/pheanstalk/Tests/PheanstalkDestinationTest.php b/pkg/pheanstalk/Tests/PheanstalkDestinationTest.php
new file mode 100644
index 000000000..bd52ef577
--- /dev/null
+++ b/pkg/pheanstalk/Tests/PheanstalkDestinationTest.php
@@ -0,0 +1,31 @@
+assertClassImplements(PsrQueue::class, PheanstalkDestination::class);
+ }
+
+ public function testShouldImplementPsrTopicInterface()
+ {
+ $this->assertClassImplements(PsrTopic::class, PheanstalkDestination::class);
+ }
+
+ public function testShouldAllowGetNameSetInConstructor()
+ {
+ $destionation = new PheanstalkDestination('theDestinationName');
+
+ $this->assertSame('theDestinationName', $destionation->getName());
+ }
+}
diff --git a/pkg/pheanstalk/Tests/PheanstalkMessageTest.php b/pkg/pheanstalk/Tests/PheanstalkMessageTest.php
new file mode 100644
index 000000000..183cbb435
--- /dev/null
+++ b/pkg/pheanstalk/Tests/PheanstalkMessageTest.php
@@ -0,0 +1,23 @@
+setJob($job);
+
+ $this->assertSame($job, $message->getJob());
+ }
+}
diff --git a/pkg/pheanstalk/Tests/PheanstalkProducerTest.php b/pkg/pheanstalk/Tests/PheanstalkProducerTest.php
new file mode 100644
index 000000000..d9f00c617
--- /dev/null
+++ b/pkg/pheanstalk/Tests/PheanstalkProducerTest.php
@@ -0,0 +1,75 @@
+createPheanstalkMock());
+ }
+
+ public function testThrowIfDestinationInvalid()
+ {
+ $producer = new PheanstalkProducer($this->createPheanstalkMock());
+
+ $this->expectException(InvalidDestinationException::class);
+ $this->expectExceptionMessage('The destination must be an instance of Enqueue\Pheanstalk\PheanstalkDestination but got Enqueue\Null\NullQueue.');
+ $producer->send(new NullQueue('aQueue'), new PheanstalkMessage());
+ }
+
+ public function testThrowIfMessageInvalid()
+ {
+ $producer = new PheanstalkProducer($this->createPheanstalkMock());
+
+ $this->expectException(InvalidMessageException::class);
+ $this->expectExceptionMessage('The message must be an instance of Enqueue\Pheanstalk\PheanstalkMessage but it is Enqueue\Null\NullMessage.');
+ $producer->send(new PheanstalkDestination('aQueue'), new NullMessage());
+ }
+
+ public function testShouldJsonEncodeMessageAndPutToExpectedTube()
+ {
+ $message = new PheanstalkMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
+
+ $pheanstalk = $this->createPheanstalkMock();
+ $pheanstalk
+ ->expects($this->once())
+ ->method('useTube')
+ ->with('theQueueName')
+ ->willReturnSelf()
+ ;
+ $pheanstalk
+ ->expects($this->once())
+ ->method('put')
+ ->with('{"body":"theBody","properties":{"foo":"fooVal"},"headers":{"bar":"barVal"}}')
+ ;
+
+ $producer = new PheanstalkProducer($pheanstalk);
+
+ $producer->send(
+ new PheanstalkDestination('theQueueName'),
+ $message
+ );
+ }
+
+ /**
+ * @return \PHPUnit_Framework_MockObject_MockObject|Pheanstalk
+ */
+ private function createPheanstalkMock()
+ {
+ return $this->createMock(Pheanstalk::class);
+ }
+}
diff --git a/pkg/pheanstalk/Tests/Spec/PheanstalkConnectionFactoryTest.php b/pkg/pheanstalk/Tests/Spec/PheanstalkConnectionFactoryTest.php
new file mode 100644
index 000000000..525ed38aa
--- /dev/null
+++ b/pkg/pheanstalk/Tests/Spec/PheanstalkConnectionFactoryTest.php
@@ -0,0 +1,17 @@
+createMock(Pheanstalk::class));
+ }
+}
diff --git a/pkg/pheanstalk/Tests/Spec/PheanstalkMessageTest.php b/pkg/pheanstalk/Tests/Spec/PheanstalkMessageTest.php
new file mode 100644
index 000000000..ac65e26c3
--- /dev/null
+++ b/pkg/pheanstalk/Tests/Spec/PheanstalkMessageTest.php
@@ -0,0 +1,17 @@
+createContext();
+ }
+
+ /**
+ * @param PsrContext $context
+ * @param string $queueName
+ *
+ * @return PsrQueue
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ return $context->createQueue($queueName.time());
+ }
+}
diff --git a/pkg/pheanstalk/Tests/Spec/PheanstalkSendToAndReceiveNoWaitFromQueueTest.php b/pkg/pheanstalk/Tests/Spec/PheanstalkSendToAndReceiveNoWaitFromQueueTest.php
new file mode 100644
index 000000000..072b74230
--- /dev/null
+++ b/pkg/pheanstalk/Tests/Spec/PheanstalkSendToAndReceiveNoWaitFromQueueTest.php
@@ -0,0 +1,30 @@
+createContext();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ return $context->createQueue($queueName.time());
+ }
+}
diff --git a/pkg/pheanstalk/Tests/Spec/PheanstalkSendToTopicAndReceiveFromQueueTest.php b/pkg/pheanstalk/Tests/Spec/PheanstalkSendToTopicAndReceiveFromQueueTest.php
new file mode 100644
index 000000000..a81aa4344
--- /dev/null
+++ b/pkg/pheanstalk/Tests/Spec/PheanstalkSendToTopicAndReceiveFromQueueTest.php
@@ -0,0 +1,45 @@
+time = time();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function createContext()
+ {
+ $factory = new PheanstalkConnectionFactory(getenv('BEANSTALKD_DSN'));
+
+ return $factory->createContext();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ return $context->createQueue($queueName.$this->time);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function createTopic(PsrContext $context, $topicName)
+ {
+ return $context->createTopic($topicName.$this->time);
+ }
+}
diff --git a/pkg/pheanstalk/Tests/Spec/PheanstalkSendToTopicAndReceiveNoWaitFromQueueTest.php b/pkg/pheanstalk/Tests/Spec/PheanstalkSendToTopicAndReceiveNoWaitFromQueueTest.php
new file mode 100644
index 000000000..efbe93e1d
--- /dev/null
+++ b/pkg/pheanstalk/Tests/Spec/PheanstalkSendToTopicAndReceiveNoWaitFromQueueTest.php
@@ -0,0 +1,45 @@
+time = time();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function createContext()
+ {
+ $factory = new PheanstalkConnectionFactory(getenv('BEANSTALKD_DSN'));
+
+ return $factory->createContext();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ return $context->createQueue($queueName.$this->time);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ protected function createTopic(PsrContext $context, $topicName)
+ {
+ return $context->createTopic($topicName.$this->time);
+ }
+}
diff --git a/pkg/pheanstalk/Tests/Spec/PheanstalkTopicTest.php b/pkg/pheanstalk/Tests/Spec/PheanstalkTopicTest.php
new file mode 100644
index 000000000..407e22f67
--- /dev/null
+++ b/pkg/pheanstalk/Tests/Spec/PheanstalkTopicTest.php
@@ -0,0 +1,17 @@
+=5.6",
+ "pda/pheanstalk": "^3",
+ "enqueue/psr-queue": "^0.5@dev"
+ },
+ "require-dev": {
+ "phpunit/phpunit": "~5.4.0",
+ "enqueue/test": "^0.5@dev",
+ "enqueue/enqueue": "^0.5@dev",
+ "enqueue/null": "^0.5@dev",
+ "symfony/dependency-injection": "^2.8|^3",
+ "symfony/config": "^2.8|^3"
+ },
+ "autoload": {
+ "psr-4": { "Enqueue\\Pheanstalk\\": "" },
+ "exclude-from-classmap": [
+ "/Tests/"
+ ]
+ },
+ "suggest": {
+ "enqueue/enqueue": "If you'd like to use advanced features like Client abstract layer or Symfony integration features"
+ },
+ "minimum-stability": "dev",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "0.5.x-dev"
+ }
+ }
+}
diff --git a/pkg/pheanstalk/phpunit.xml.dist b/pkg/pheanstalk/phpunit.xml.dist
new file mode 100644
index 000000000..4dca142e1
--- /dev/null
+++ b/pkg/pheanstalk/phpunit.xml.dist
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+ ./Tests
+
+
+
+
+
+ .
+
+ ./vendor
+ ./Tests
+
+
+
+
diff --git a/pkg/psr-queue/Spec/PsrConnectionFactorySpec.php b/pkg/psr-queue/Spec/PsrConnectionFactorySpec.php
new file mode 100644
index 000000000..db37389c8
--- /dev/null
+++ b/pkg/psr-queue/Spec/PsrConnectionFactorySpec.php
@@ -0,0 +1,27 @@
+assertInstanceOf(PsrConnectionFactory::class, $this->createConnectionFactory());
+ }
+
+ public function testShouldReturnContextOnCreateContextMethodCall()
+ {
+ $factory = $this->createConnectionFactory();
+
+ $this->assertInstanceOf(PsrContext::class, $factory->createContext());
+ }
+
+ /**
+ * @return PsrConnectionFactory
+ */
+ abstract protected function createConnectionFactory();
+}
diff --git a/pkg/psr-queue/Spec/PsrContextSpec.php b/pkg/psr-queue/Spec/PsrContextSpec.php
new file mode 100644
index 000000000..40a69553d
--- /dev/null
+++ b/pkg/psr-queue/Spec/PsrContextSpec.php
@@ -0,0 +1,86 @@
+assertInstanceOf(PsrContext::class, $this->createContext());
+ }
+
+ public function testShouldCreateEmptyMessageOnCreateMessageMethodCallWithoutArguments()
+ {
+ $context = $this->createContext();
+
+ $message = $context->createMessage();
+
+ $this->assertInstanceOf(PsrMessage::class, $message);
+ $this->assertSame('', $message->getBody());
+ $this->assertSame([], $message->getHeaders());
+ $this->assertSame([], $message->getProperties());
+ }
+
+ public function testShouldCreateMessageOnCreateMessageMethodCallWithArguments()
+ {
+ $context = $this->createContext();
+
+ $message = $context->createMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);
+
+ $this->assertInstanceOf(PsrMessage::class, $message);
+ $this->assertSame('theBody', $message->getBody());
+ $this->assertSame(['bar' => 'barVal'], $message->getHeaders());
+ $this->assertSame(['foo' => 'fooVal'], $message->getProperties());
+ }
+
+ public function testShouldCreateTopicWithGivenName()
+ {
+ $context = $this->createContext();
+
+ $topic = $context->createTopic('theName');
+
+ $this->assertInstanceOf(PsrTopic::class, $topic);
+ $this->assertSame('theName', $topic->getTopicName());
+ }
+
+ public function testShouldCreateQueueWithGivenName()
+ {
+ $context = $this->createContext();
+
+ $topic = $context->createTopic('theName');
+
+ $this->assertInstanceOf(PsrQueue::class, $topic);
+ $this->assertSame('theName', $topic->getTopicName());
+ }
+
+ public function testShouldCreateProducerOnCreateProducerMethodCall()
+ {
+ $context = $this->createContext();
+
+ $producer = $context->createProducer();
+
+ $this->assertInstanceOf(PsrProducer::class, $producer);
+ }
+
+ public function testShouldCreateConsumerOnCreateConsumerMethodCall()
+ {
+ $context = $this->createContext();
+
+ $consumer = $context->createConsumer($context->createQueue('aQueue'));
+
+ $this->assertInstanceOf(PsrConsumer::class, $consumer);
+ }
+
+ /**
+ * @return PsrContext
+ */
+ abstract protected function createContext();
+}
diff --git a/pkg/psr-queue/Spec/PsrQueueSpec.php b/pkg/psr-queue/Spec/PsrQueueSpec.php
new file mode 100644
index 000000000..ad98a986d
--- /dev/null
+++ b/pkg/psr-queue/Spec/PsrQueueSpec.php
@@ -0,0 +1,28 @@
+assertInstanceOf(PsrQueue::class, $this->createQueue());
+ }
+
+ public function testShouldReturnQueueName()
+ {
+ $queue = $this->createQueue();
+
+ $this->assertSame(self::EXPECTED_QUEUE_NAME, $queue->getQueueName());
+ }
+
+ /**
+ * @return PsrQueue
+ */
+ abstract protected function createQueue();
+}
diff --git a/pkg/psr-queue/Spec/PsrTopicSpec.php b/pkg/psr-queue/Spec/PsrTopicSpec.php
new file mode 100644
index 000000000..1ac1780c6
--- /dev/null
+++ b/pkg/psr-queue/Spec/PsrTopicSpec.php
@@ -0,0 +1,28 @@
+assertInstanceOf(PsrTopic::class, $this->createTopic());
+ }
+
+ public function testShouldReturnTopicName()
+ {
+ $topic = $this->createTopic();
+
+ $this->assertSame(self::EXPECTED_TOPIC_NAME, $topic->getTopicName());
+ }
+
+ /**
+ * @return PsrTopic
+ */
+ abstract protected function createTopic();
+}
diff --git a/pkg/psr-queue/Spec/SendToAndReceiveFromQueueSpec.php b/pkg/psr-queue/Spec/SendToAndReceiveFromQueueSpec.php
new file mode 100644
index 000000000..b6366de73
--- /dev/null
+++ b/pkg/psr-queue/Spec/SendToAndReceiveFromQueueSpec.php
@@ -0,0 +1,52 @@
+createContext();
+ $queue = $this->createQueue($context, 'send_to_and_receive_from_queue_spec');
+
+ $consumer = $context->createConsumer($queue);
+
+ // guard
+ $this->assertNull($consumer->receiveNoWait());
+
+ $expectedBody = __CLASS__.time();
+
+ $context->createProducer()->send($queue, $context->createMessage($expectedBody));
+
+ $message = $consumer->receive(2000); // 2 sec
+
+ $this->assertInstanceOf(PsrMessage::class, $message);
+ $consumer->acknowledge($message);
+
+ $this->assertSame($expectedBody, $message->getBody());
+ }
+
+ /**
+ * @return PsrContext
+ */
+ abstract protected function createContext();
+
+ /**
+ * @param PsrContext $context
+ * @param string $queueName
+ *
+ * @return PsrQueue
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ return $context->createQueue($queueName);
+ }
+}
diff --git a/pkg/psr-queue/Spec/SendToAndReceiveFromTopicSpec.php b/pkg/psr-queue/Spec/SendToAndReceiveFromTopicSpec.php
new file mode 100644
index 000000000..ccc4dfa45
--- /dev/null
+++ b/pkg/psr-queue/Spec/SendToAndReceiveFromTopicSpec.php
@@ -0,0 +1,52 @@
+createContext();
+ $topic = $this->createTopic($context, 'send_to_and_receive_from_topic_spec');
+
+ $consumer = $context->createConsumer($topic);
+
+ // guard
+ $this->assertNull($consumer->receiveNoWait());
+
+ $expectedBody = __CLASS__.time();
+
+ $context->createProducer()->send($topic, $context->createMessage($expectedBody));
+
+ $message = $consumer->receive(2000); // 2 sec
+
+ $this->assertInstanceOf(PsrMessage::class, $message);
+ $consumer->acknowledge($message);
+
+ $this->assertSame($expectedBody, $message->getBody());
+ }
+
+ /**
+ * @return PsrContext
+ */
+ abstract protected function createContext();
+
+ /**
+ * @param PsrContext $context
+ * @param string $topicName
+ *
+ * @return PsrTopic
+ */
+ protected function createTopic(PsrContext $context, $topicName)
+ {
+ return $context->createTopic($topicName);
+ }
+}
diff --git a/pkg/psr-queue/Spec/SendToAndReceiveNoWaitFromQueueSpec.php b/pkg/psr-queue/Spec/SendToAndReceiveNoWaitFromQueueSpec.php
new file mode 100644
index 000000000..ee27b3df7
--- /dev/null
+++ b/pkg/psr-queue/Spec/SendToAndReceiveNoWaitFromQueueSpec.php
@@ -0,0 +1,55 @@
+createContext();
+ $queue = $this->createQueue($context, 'send_to_and_receive_no_wait_from_queue_spec');
+
+ $consumer = $context->createConsumer($queue);
+
+ // guard
+ $this->assertNull($consumer->receiveNoWait());
+
+ $expectedBody = __CLASS__.time();
+
+ $context->createProducer()->send($queue, $context->createMessage($expectedBody));
+
+ $startTime = microtime(true);
+ $message = $consumer->receiveNoWait();
+
+ $this->assertLessThan(2, microtime(true) - $startTime);
+
+ $this->assertInstanceOf(PsrMessage::class, $message);
+ $consumer->acknowledge($message);
+
+ $this->assertSame($expectedBody, $message->getBody());
+ }
+
+ /**
+ * @return PsrContext
+ */
+ abstract protected function createContext();
+
+ /**
+ * @param PsrContext $context
+ * @param string $queueName
+ *
+ * @return PsrQueue
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ return $context->createQueue($queueName);
+ }
+}
diff --git a/pkg/psr-queue/Spec/SendToAndReceiveNoWaitFromTopicSpec.php b/pkg/psr-queue/Spec/SendToAndReceiveNoWaitFromTopicSpec.php
new file mode 100644
index 000000000..10af61596
--- /dev/null
+++ b/pkg/psr-queue/Spec/SendToAndReceiveNoWaitFromTopicSpec.php
@@ -0,0 +1,55 @@
+createContext();
+ $topic = $this->createTopic($context, 'send_to_and_receive_no_wait_from_topic_spec');
+
+ $consumer = $context->createConsumer($topic);
+
+ // guard
+ $this->assertNull($consumer->receiveNoWait());
+
+ $expectedBody = __CLASS__.time();
+
+ $context->createProducer()->send($topic, $context->createMessage($expectedBody));
+
+ $startTime = microtime(true);
+ $message = $consumer->receiveNoWait();
+
+ $this->assertLessThan(2, microtime(true) - $startTime);
+
+ $this->assertInstanceOf(PsrMessage::class, $message);
+ $consumer->acknowledge($message);
+
+ $this->assertSame($expectedBody, $message->getBody());
+ }
+
+ /**
+ * @return PsrContext
+ */
+ abstract protected function createContext();
+
+ /**
+ * @param PsrContext $context
+ * @param string $topicName
+ *
+ * @return PsrTopic
+ */
+ protected function createTopic(PsrContext $context, $topicName)
+ {
+ return $context->createTopic($topicName);
+ }
+}
diff --git a/pkg/psr-queue/Spec/SendToTopicAndReceiveFromQueueSpec.php b/pkg/psr-queue/Spec/SendToTopicAndReceiveFromQueueSpec.php
new file mode 100644
index 000000000..851857693
--- /dev/null
+++ b/pkg/psr-queue/Spec/SendToTopicAndReceiveFromQueueSpec.php
@@ -0,0 +1,65 @@
+createContext();
+ $topic = $this->createTopic($context, 'send_to_topic_and_receive_from_queue_spec');
+ $queue = $this->createQueue($context, 'send_to_topic_and_receive_from_queue_spec');
+
+ $consumer = $context->createConsumer($queue);
+
+ // guard
+ $this->assertNull($consumer->receiveNoWait());
+
+ $expectedBody = __CLASS__.time();
+
+ $context->createProducer()->send($topic, $context->createMessage($expectedBody));
+
+ $message = $consumer->receive(2000); // 2 sec
+
+ $this->assertInstanceOf(PsrMessage::class, $message);
+ $consumer->acknowledge($message);
+
+ $this->assertSame($expectedBody, $message->getBody());
+ }
+
+ /**
+ * @return PsrContext
+ */
+ abstract protected function createContext();
+
+ /**
+ * @param PsrContext $context
+ * @param string $queueName
+ *
+ * @return PsrQueue
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ return $context->createQueue($queueName);
+ }
+
+ /**
+ * @param PsrContext $context
+ * @param string $topicName
+ *
+ * @return PsrTopic
+ */
+ protected function createTopic(PsrContext $context, $topicName)
+ {
+ return $context->createTopic($topicName);
+ }
+}
diff --git a/pkg/psr-queue/Spec/SendToTopicAndReceiveNoWaitFromQueueSpec.php b/pkg/psr-queue/Spec/SendToTopicAndReceiveNoWaitFromQueueSpec.php
new file mode 100644
index 000000000..e0670c77c
--- /dev/null
+++ b/pkg/psr-queue/Spec/SendToTopicAndReceiveNoWaitFromQueueSpec.php
@@ -0,0 +1,68 @@
+createContext();
+ $topic = $this->createTopic($context, 'send_to_topic_and_receive_from_queue_spec');
+ $queue = $this->createQueue($context, 'send_to_topic_and_receive_from_queue_spec');
+
+ $consumer = $context->createConsumer($queue);
+
+ // guard
+ $this->assertNull($consumer->receiveNoWait());
+
+ $expectedBody = __CLASS__.time();
+
+ $context->createProducer()->send($topic, $context->createMessage($expectedBody));
+
+ $startTime = microtime(true);
+ $message = $consumer->receiveNoWait();
+
+ $this->assertLessThan(2, microtime(true) - $startTime);
+
+ $this->assertInstanceOf(PsrMessage::class, $message);
+ $consumer->acknowledge($message);
+
+ $this->assertSame($expectedBody, $message->getBody());
+ }
+
+ /**
+ * @return PsrContext
+ */
+ abstract protected function createContext();
+
+ /**
+ * @param PsrContext $context
+ * @param string $queueName
+ *
+ * @return PsrQueue
+ */
+ protected function createQueue(PsrContext $context, $queueName)
+ {
+ return $context->createQueue($queueName);
+ }
+
+ /**
+ * @param PsrContext $context
+ * @param string $topicName
+ *
+ * @return PsrTopic
+ */
+ protected function createTopic(PsrContext $context, $topicName)
+ {
+ return $context->createTopic($topicName);
+ }
+}