Skip to content

[WIP][beanstalk] Add transport for beanstalkd #123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 26, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ Features:

* [Feature rich](docs/quick_tour.md).
* [JMS](https://docs.oracle.com/javaee/7/api/javax/jms/package-summary.html) like transport [abstraction](https://github.com/php-enqueue/psr-queue).
* Supports transports [AMQP (RabbitMQ, ActiveMQ)](docs/transport/amqp.md), [STOMP](docs/transport/stomp.md), [Amazon SQS](docs/transport/sqs.md), [Redis](docs/transport/redis.md), [Doctrine DBAL](docs/transport/dbal.md), [Filesystem](docs/transport/filesystem.md), [Null](docs/transport/null.md).
* Supports transports [AMQP (RabbitMQ, ActiveMQ)](docs/transport/amqp.md), [Beanstalk (Pheanstalk)](docs/transport/pheanstalk.md) [STOMP](docs/transport/stomp.md), [Amazon SQS](docs/transport/sqs.md), [Redis](docs/transport/redis.md), [Doctrine DBAL](docs/transport/dbal.md), [Filesystem](docs/transport/filesystem.md), [Null](docs/transport/null.md).
* [Symfony bundle](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/quick_tour.md)
* [Magento1 extension](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/magento/quick_tour.md)
* [Message bus](http://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html) support.
1 change: 1 addition & 0 deletions bin/test
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ function waitForService()
waitForService rabbitmq 5672 50
waitForService mysql 3306 50
waitForService redis 6379 50
waitForService beanstalkd 11300

php pkg/job-queue/Tests/Functional/app/console doctrine:database:create
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
"enqueue/null": "*@dev",
"enqueue/dbal": "*@dev",
"enqueue/sqs": "*@dev",
"enqueue/pheanstalk": "*@dev",
"enqueue/enqueue-bundle": "*@dev",
"enqueue/job-queue": "*@dev",
"enqueue/simple-client": "*@dev",
@@ -85,6 +86,10 @@
"type": "path",
"url": "pkg/sqs"
},
{
"type": "path",
"url": "pkg/pheanstalk"
},
{
"type": "path",
"url": "pkg/simple-client"
11 changes: 11 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
version: '2'

services:
dev:
image: enqueue/dev:latest
@@ -7,6 +8,7 @@ services:
- rabbitmq
- mysql
- redis
- beanstalkd
volumes:
- './:/mqdev'
environment:
@@ -29,6 +31,9 @@ services:
- AWS__SQS__KEY=$ENQUEUE_AWS__SQS__KEY
- AWS__SQS__SECRET=$ENQUEUE_AWS__SQS__SECRET
- AWS__SQS__REGION=$ENQUEUE_AWS__SQS__REGION
- BEANSTALKD_HOST=beanstalkd
- BEANSTALKD_PORT=11300
- BEANSTALKD_DSN=beanstalk://beanstalkd:11300

rabbitmq:
image: enqueue/rabbitmq:latest
@@ -37,6 +42,12 @@ services:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
- RABBITMQ_DEFAULT_VHOST=mqdev
ports:
- "15677:15672"

beanstalkd:
image: 'schickling/beanstalkd'

redis:
image: 'redis:3'
ports:
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
* Transports
- [Amqp (RabbitMQ, ActiveMQ)](transport/amqp.md)
- [Amazon SQS](transport/sqs.md)
- [Beanstalk (Pheanstalk)](transport/pheanstalk.md)
- [Stomp](transport/stomp.md)
- [Redis](transport/redis.md)
- [Doctrine DBAL](transport/dbal.md)
84 changes: 84 additions & 0 deletions docs/transport/pheanstalk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Beanstalk (Pheanstalk) transport

The transport uses [Beanstalkd](http://kr.github.io/beanstalkd/) job manager.
The transport uses [Pheanstalk](https://github.com/pda/pheanstalk) library internally.

* [Installation](#installation)
* [Create context](#create-context)
* [Send message to topic](#send-message-to-topic)
* [Send message to queue](#send-message-to-queue)
* [Consume message](#consume-message)

## Installation

```bash
$ composer require enqueue/pheanstalk
```


## Create context

```php
<?php
use Enqueue\Pheanstalk\PheanstalkConnectionFactory;

// connects to localhost:11300
$factory = new PheanstalkConnectionFactory();

// same as above
$factory = new PheanstalkConnectionFactory('beanstalk://');

// connects to example host and port 5555
$factory = new PheanstalkConnectionFactory('beanstalk://example:5555');

// same as above but configured by array
$factory = new PheanstalkConnectionFactory([
'host' => 'example',
'port' => 5555
]);
```

## Send message to topic

```php
<?php
/** @var \Enqueue\Pheanstalk\PheanstalkContext $psrContext */

$fooTopic = $psrContext->createTopic('aTopic');
$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooTopic, $message);
```

## Send message to queue

```php
<?php
/** @var \Enqueue\Pheanstalk\PheanstalkContext $psrContext */

$fooQueue = $psrContext->createQueue('aQueue');
$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()->send($fooQueue, $message);
```

## Consume message:

```php
<?php
/** @var \Enqueue\Pheanstalk\PheanstalkContext $psrContext */

$fooQueue = $psrContext->createQueue('aQueue');
$consumer = $psrContext->createConsumer($fooQueue);

$message = $consumer->receive(2000); // wait for 2 seconds

$message = $consumer->receiveNoWait(); // fetch message or return null immediately

// process a message

$consumer->acknowledge($message);
// $consumer->reject($message);
```

[back to index](../index.md)
4 changes: 4 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
@@ -49,6 +49,10 @@
<directory>pkg/sqs/Tests</directory>
</testsuite>

<testsuite name="pheanstalk transport">
<directory>pkg/pheanstalk/Tests</directory>
</testsuite>

<testsuite name="enqueue-bundle">
<directory>pkg/enqueue-bundle/Tests</directory>
</testsuite>
2 changes: 1 addition & 1 deletion pkg/amqp-ext/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
@@ -150,7 +150,7 @@ private function parseDsn($dsn)
], $dsnConfig);

if ('amqp' !== $dsnConfig['scheme']) {
throw new \LogicException('The given DSN scheme "%s" is not supported. Could be "amqp" only.');
throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "amqp" only.', $dsnConfig['scheme']));
}

if ($dsnConfig['query']) {
2 changes: 1 addition & 1 deletion pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ public function testThrowNeitherArrayStringNorNullGivenAsConfig()
public function testThrowIfSchemeIsNotAmqp()
{
$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The given DSN scheme "%s" is not supported. Could be "amqp" only.');
$this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be "amqp" only.');

new AmqpConnectionFactory('http://example.com');
}
38 changes: 38 additions & 0 deletions pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveFromQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace Enqueue\AmqpExt\Tests\Spec;

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\AmqpExt\AmqpContext;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\Spec\SendToAndReceiveFromQueueSpec;

/**
* @group functional
*/
class AmqpSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));

return $factory->createContext();
}

/**
* {@inheritdoc}
*
* @param AmqpContext $context
*/
protected function createQueue(PsrContext $context, $queueName)
{
$queue = $context->createQueue($queueName);
$context->declareQueue($queue);
$context->purge($queue);

return $queue;
}
}
39 changes: 39 additions & 0 deletions pkg/amqp-ext/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

namespace Enqueue\AmqpExt\Tests\Spec;

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\AmqpExt\AmqpContext;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\Spec\SendToAndReceiveFromTopicSpec;

/**
* @group functional
*/
class AmqpSendToAndReceiveFromTopicTest extends SendToAndReceiveFromTopicSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));

return $factory->createContext();
}

/**
* {@inheritdoc}
*
* @param AmqpContext $context
*/
protected function createTopic(PsrContext $context, $topicName)
{
$topic = $context->createTopic($topicName);
$topic->setType(\AMQP_EX_TYPE_FANOUT);
$topic->addFlag(\AMQP_DURABLE);
$context->declareTopic($topic);

return $topic;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace Enqueue\AmqpExt\Tests\Spec;

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\AmqpExt\AmqpContext;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\Spec\SendToAndReceiveNoWaitFromQueueSpec;

/**
* @group functional
*/
class AmqpSendToAndReceiveNoWaitFromQueueTest extends SendToAndReceiveNoWaitFromQueueSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));

return $factory->createContext();
}

/**
* {@inheritdoc}
*
* @param AmqpContext $context
*/
protected function createQueue(PsrContext $context, $queueName)
{
$queue = $context->createQueue($queueName);
$context->declareQueue($queue);
$context->purge($queue);

return $queue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

namespace Enqueue\AmqpExt\Tests\Spec;

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\AmqpExt\AmqpContext;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\Spec\SendToAndReceiveNoWaitFromTopicSpec;

/**
* @group functional
*/
class AmqpSendToAndReceiveNoWaitFromTopicTest extends SendToAndReceiveNoWaitFromTopicSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));

return $factory->createContext();
}

/**
* {@inheritdoc}
*
* @param AmqpContext $context
*/
protected function createTopic(PsrContext $context, $topicName)
{
$topic = $context->createTopic($topicName);
$topic->setType(\AMQP_EX_TYPE_FANOUT);
$topic->addFlag(\AMQP_DURABLE);
$context->declareTopic($topic);

return $topic;
}
}
Loading