Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit bbec2c7

Browse files
authoredOct 11, 2017
Merge pull request #217 from php-enqueue/amqp-add-basic-consume-support
[BC break] Amqp add basic consume support
2 parents 48e17ee + 00ec39a commit bbec2c7

28 files changed

+783
-24
lines changed
 

‎composer.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
"enqueue/test": "*@dev",
2828
"enqueue/async-event-dispatcher": "*@dev",
2929
"queue-interop/queue-interop": "^0.6@dev",
30-
"queue-interop/amqp-interop": "^0.6@dev",
31-
"queue-interop/queue-spec": "^0.5@dev",
30+
"queue-interop/amqp-interop": "^0.7@dev",
31+
"queue-interop/queue-spec": "^0.5.1@dev",
3232

3333
"phpunit/phpunit": "^5",
3434
"doctrine/doctrine-bundle": "~1.2",

‎docker/Dockerfile

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ RUN echo "extension=rdkafka.so" > /etc/php/7.1/cli/conf.d/10-rdkafka.ini
1717
RUN echo "extension=rdkafka.so" > /etc/php/7.1/fpm/conf.d/10-rdkafka.ini
1818

1919
COPY ./php/cli.ini /etc/php/7.1/cli/conf.d/1-dev_cli.ini
20+
COPY ./php/amqp.so /usr/lib/php/20160303/amqp.so
2021
COPY ./bin/dev_entrypoiny.sh /usr/local/bin/entrypoint.sh
2122
RUN chmod u+x /usr/local/bin/entrypoint.sh
2223

‎docker/php/amqp.so

609 KB
Binary file not shown.

‎phpstan.neon

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ parameters:
77
- pkg/redis/PhpRedis.php
88
- pkg/redis/RedisConnectionFactory.php
99
- pkg/gearman
10-
- pkg/amqp-ext/AmqpConsumer.php
10+
- pkg/amqp-ext/AmqpConsumer.php
11+
- pkg/amqp-ext/AmqpContext.php

‎pkg/amqp-bunny/AmqpContext.php

+104
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
namespace Enqueue\AmqpBunny;
44

55
use Bunny\Channel;
6+
use Bunny\Client;
7+
use Bunny\Message;
68
use Enqueue\AmqpTools\DelayStrategyAware;
79
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
810
use Interop\Amqp\AmqpBind as InteropAmqpBind;
11+
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
912
use Interop\Amqp\AmqpContext as InteropAmqpContext;
1013
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
1114
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
@@ -43,6 +46,13 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
4346
*/
4447
private $buffer;
4548

49+
/**
50+
* an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];.
51+
*
52+
* @var array
53+
*/
54+
private $subscribers;
55+
4656
/**
4757
* Callable must return instance of \Bunny\Channel once called.
4858
*
@@ -309,6 +319,77 @@ public function setQos($prefetchSize, $prefetchCount, $global)
309319
$this->getBunnyChannel()->qos($prefetchSize, $prefetchCount, $global);
310320
}
311321

322+
/**
323+
* {@inheritdoc}
324+
*/
325+
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
326+
{
327+
if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) {
328+
return;
329+
}
330+
331+
$bunnyCallback = function (Message $message, Channel $channel, Client $bunny) {
332+
$receivedMessage = $this->convertMessage($message);
333+
$receivedMessage->setConsumerTag($message->consumerTag);
334+
335+
/**
336+
* @var AmqpConsumer
337+
* @var callable $callback
338+
*/
339+
list($consumer, $callback) = $this->subscribers[$message->consumerTag];
340+
341+
if (false === call_user_func($callback, $receivedMessage, $consumer)) {
342+
$bunny->stop();
343+
}
344+
};
345+
346+
$frame = $this->getBunnyChannel()->consume(
347+
$bunnyCallback,
348+
$consumer->getQueue()->getQueueName(),
349+
$consumer->getConsumerTag(),
350+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
351+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK),
352+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE),
353+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT)
354+
);
355+
356+
if (empty($frame->consumerTag)) {
357+
throw new Exception('Got empty consumer tag');
358+
}
359+
360+
$consumer->setConsumerTag($frame->consumerTag);
361+
362+
$this->subscribers[$frame->consumerTag] = [$consumer, $callback];
363+
}
364+
365+
/**
366+
* {@inheritdoc}
367+
*/
368+
public function unsubscribe(InteropAmqpConsumer $consumer)
369+
{
370+
if (false == $consumer->getConsumerTag()) {
371+
return;
372+
}
373+
374+
$consumerTag = $consumer->getConsumerTag();
375+
376+
$this->getBunnyChannel()->cancel($consumerTag);
377+
$consumer->setConsumerTag(null);
378+
unset($this->subscribers[$consumerTag]);
379+
}
380+
381+
/**
382+
* {@inheritdoc}
383+
*/
384+
public function consume($timeout = 0)
385+
{
386+
if (empty($this->subscribers)) {
387+
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
388+
}
389+
390+
$this->getBunnyChannel()->getClient()->run($timeout / 1000);
391+
}
392+
312393
/**
313394
* @return Channel
314395
*/
@@ -328,4 +409,27 @@ public function getBunnyChannel()
328409

329410
return $this->bunnyChannel;
330411
}
412+
413+
/**
414+
* @param Message $bunnyMessage
415+
*
416+
* @return InteropAmqpMessage
417+
*/
418+
private function convertMessage(Message $bunnyMessage)
419+
{
420+
$headers = $bunnyMessage->headers;
421+
422+
$properties = [];
423+
if (isset($headers['application_headers'])) {
424+
$properties = $headers['application_headers'];
425+
}
426+
unset($headers['application_headers']);
427+
428+
$message = new AmqpMessage($bunnyMessage->content, $properties, $headers);
429+
$message->setDeliveryTag($bunnyMessage->deliveryTag);
430+
$message->setRedelivered($bunnyMessage->redelivered);
431+
$message->setRoutingKey($bunnyMessage->routingKey);
432+
433+
return $message;
434+
}
331435
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeBreakOnFalseSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeBreakOnFalseTest extends BasicConsumeBreakOnFalseSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeFromAllSubscribedQueuesSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeFromAllSubscribedQueuesTest extends BasicConsumeFromAllSubscribedQueuesSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldAddConsumerTagOnSubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldAddConsumerTagOnSubscribeTest extends BasicConsumeShouldAddConsumerTagOnSubscribeSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldRemoveConsumerTagOnUnsubscribeTest extends BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeUntilUnsubscribedSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeUntilUnsubscribedTest extends BasicConsumeUntilUnsubscribedSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}

‎pkg/amqp-bunny/composer.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"require": {
88
"php": ">=5.6",
99

10-
"queue-interop/amqp-interop": "^0.6@dev",
10+
"queue-interop/amqp-interop": "^0.7@dev",
1111
"bunny/bunny": "^0.2.4",
1212
"enqueue/amqp-tools": "^0.8@dev"
1313
},
@@ -16,7 +16,7 @@
1616
"enqueue/test": "^0.8@dev",
1717
"enqueue/enqueue": "^0.8@dev",
1818
"enqueue/null": "^0.8@dev",
19-
"queue-interop/queue-spec": "^0.5@dev",
19+
"queue-interop/queue-spec": "^0.5.1@dev",
2020
"symfony/dependency-injection": "^2.8|^3",
2121
"symfony/config": "^2.8|^3"
2222
},

‎pkg/amqp-ext/AmqpContext.php

+124
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Enqueue\AmqpTools\DelayStrategyAware;
66
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
77
use Interop\Amqp\AmqpBind as InteropAmqpBind;
8+
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
89
use Interop\Amqp\AmqpContext as InteropAmqpContext;
910
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
1011
use Interop\Amqp\AmqpTopic as InteropAmqpTopic;
@@ -41,6 +42,13 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
4142
*/
4243
private $receiveMethod;
4344

45+
/**
46+
* an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];.
47+
*
48+
* @var array
49+
*/
50+
private $subscribers;
51+
4452
/**
4553
* Callable must return instance of \AMQPChannel once called.
4654
*
@@ -60,6 +68,7 @@ public function __construct($extChannel, $receiveMethod)
6068
}
6169

6270
$this->buffer = new Buffer();
71+
$this->subscribers = [];
6372
}
6473

6574
/**
@@ -289,4 +298,119 @@ public function getExtChannel()
289298

290299
return $this->extChannel;
291300
}
301+
302+
/**
303+
* {@inheritdoc}
304+
*/
305+
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
306+
{
307+
if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) {
308+
return;
309+
}
310+
311+
$extQueue = new \AMQPQueue($this->getExtChannel());
312+
$extQueue->setName($consumer->getQueue()->getQueueName());
313+
314+
$extQueue->consume(null, Flags::convertConsumerFlags($consumer->getFlags()), $consumer->getConsumerTag());
315+
316+
$consumerTag = $extQueue->getConsumerTag();
317+
$consumer->setConsumerTag($consumerTag);
318+
$this->subscribers[$consumerTag] = [$consumer, $callback];
319+
}
320+
321+
/**
322+
* {@inheritdoc}
323+
*/
324+
public function unsubscribe(InteropAmqpConsumer $consumer)
325+
{
326+
if (false == $consumer->getConsumerTag()) {
327+
return;
328+
}
329+
330+
// seg fault
331+
// $consumerTag = $consumer->getConsumerTag();
332+
// $consumer->setConsumerTag(null);
333+
//
334+
// $extQueue = new \AMQPQueue($this->getExtChannel());
335+
// $extQueue->setName($consumer->getQueue()->getQueueName());
336+
//
337+
// $extQueue->cancel($consumerTag);
338+
// unset($this->subscribers[$consumerTag]);
339+
}
340+
341+
/**
342+
* {@inheritdoc}
343+
*/
344+
public function consume($timeout = 0)
345+
{
346+
if (empty($this->subscribers)) {
347+
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
348+
}
349+
350+
/** @var \AMQPQueue $extQueue */
351+
$extConnection = $this->getExtChannel()->getConnection();
352+
353+
$originalTimeout = $extConnection->getReadTimeout();
354+
try {
355+
$extConnection->setReadTimeout($timeout / 1000);
356+
357+
reset($this->subscribers);
358+
/** @var $consumer AmqpConsumer */
359+
list($consumer) = current($this->subscribers);
360+
361+
$extQueue = new \AMQPQueue($this->getExtChannel());
362+
$extQueue->setName($consumer->getQueue()->getQueueName());
363+
$extQueue->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) {
364+
$message = $this->convertMessage($extEnvelope);
365+
$message->setConsumerTag($q->getConsumerTag());
366+
367+
/**
368+
* @var AmqpConsumer
369+
* @var callable $callback
370+
*/
371+
list($consumer, $callback) = $this->subscribers[$q->getConsumerTag()];
372+
373+
return call_user_func($callback, $message, $consumer);
374+
}, AMQP_JUST_CONSUME);
375+
} catch (\AMQPQueueException $e) {
376+
if ('Consumer timeout exceed' == $e->getMessage()) {
377+
return null;
378+
}
379+
380+
throw $e;
381+
} finally {
382+
$extConnection->setReadTimeout($originalTimeout);
383+
}
384+
}
385+
386+
/**
387+
* @param \AMQPEnvelope $extEnvelope
388+
*
389+
* @return AmqpMessage
390+
*/
391+
private function convertMessage(\AMQPEnvelope $extEnvelope)
392+
{
393+
$message = new AmqpMessage(
394+
$extEnvelope->getBody(),
395+
$extEnvelope->getHeaders(),
396+
[
397+
'message_id' => $extEnvelope->getMessageId(),
398+
'correlation_id' => $extEnvelope->getCorrelationId(),
399+
'app_id' => $extEnvelope->getAppId(),
400+
'type' => $extEnvelope->getType(),
401+
'content_encoding' => $extEnvelope->getContentEncoding(),
402+
'content_type' => $extEnvelope->getContentType(),
403+
'expiration' => $extEnvelope->getExpiration(),
404+
'priority' => $extEnvelope->getPriority(),
405+
'reply_to' => $extEnvelope->getReplyTo(),
406+
'timestamp' => $extEnvelope->getTimeStamp(),
407+
'user_id' => $extEnvelope->getUserId(),
408+
]
409+
);
410+
$message->setRedelivered($extEnvelope->isRedelivery());
411+
$message->setDeliveryTag($extEnvelope->getDeliveryTag());
412+
$message->setRoutingKey($extEnvelope->getRoutingKey());
413+
414+
return $message;
415+
}
292416
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeBreakOnFalseSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeBreakOnFalseTest extends BasicConsumeBreakOnFalseSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeFromAllSubscribedQueuesSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeFromAllSubscribedQueuesTest extends BasicConsumeFromAllSubscribedQueuesSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldAddConsumerTagOnSubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldAddConsumerTagOnSubscribeTest extends BasicConsumeShouldAddConsumerTagOnSubscribeSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldRemoveConsumerTagOnUnsubscribeTest extends BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec
12+
{
13+
public function test()
14+
{
15+
$this->markTestSkipped('Seg fault.');
16+
}
17+
18+
/**
19+
* {@inheritdoc}
20+
*/
21+
protected function createContext()
22+
{
23+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
24+
25+
return $factory->createContext();
26+
}
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeUntilUnsubscribedSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeUntilUnsubscribedTest extends BasicConsumeUntilUnsubscribedSpec
12+
{
13+
public function test()
14+
{
15+
$this->markTestSkipped('Sig fault');
16+
}
17+
18+
/**
19+
* {@inheritdoc}
20+
*/
21+
protected function createContext()
22+
{
23+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
24+
25+
return $factory->createContext();
26+
}
27+
}

‎pkg/amqp-ext/composer.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
"php": ">=5.6",
99
"ext-amqp": "^1.6",
1010

11-
"queue-interop/amqp-interop": "^0.6@dev",
11+
"queue-interop/amqp-interop": "^0.7@dev",
1212
"enqueue/amqp-tools": "^0.8@dev"
1313
},
1414
"require-dev": {
1515
"phpunit/phpunit": "~5.4.0",
1616
"enqueue/test": "^0.8@dev",
1717
"enqueue/enqueue": "^0.8@dev",
1818
"enqueue/null": "^0.8@dev",
19-
"queue-interop/queue-spec": "^0.5@dev",
19+
"queue-interop/queue-spec": "^0.5.1@dev",
2020
"empi89/php-amqp-stubs": "*@dev",
2121
"symfony/dependency-injection": "^2.8|^3",
2222
"symfony/config": "^2.8|^3"

‎pkg/amqp-lib/AmqpContext.php

+126
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Enqueue\AmqpTools\DelayStrategyAware;
66
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
77
use Interop\Amqp\AmqpBind as InteropAmqpBind;
8+
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
89
use Interop\Amqp\AmqpContext as InteropAmqpContext;
910
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
1011
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
@@ -19,6 +20,8 @@
1920
use Interop\Queue\PsrTopic;
2021
use PhpAmqpLib\Channel\AMQPChannel;
2122
use PhpAmqpLib\Connection\AbstractConnection;
23+
use PhpAmqpLib\Exception\AMQPTimeoutException;
24+
use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage;
2225
use PhpAmqpLib\Wire\AMQPTable;
2326

2427
class AmqpContext implements InteropAmqpContext, DelayStrategyAware
@@ -45,6 +48,13 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
4548
*/
4649
private $buffer;
4750

51+
/**
52+
* an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];.
53+
*
54+
* @var array
55+
*/
56+
private $subscribers;
57+
4858
/**
4959
* @param AbstractConnection $connection
5060
* @param array $config
@@ -302,6 +312,98 @@ public function setQos($prefetchSize, $prefetchCount, $global)
302312
$this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
303313
}
304314

315+
/**
316+
* {@inheritdoc}
317+
*/
318+
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
319+
{
320+
if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) {
321+
return;
322+
}
323+
324+
$libCallback = function (LibAMQPMessage $message) {
325+
$receivedMessage = $this->convertMessage($message);
326+
$receivedMessage->setConsumerTag($message->delivery_info['consumer_tag']);
327+
328+
/**
329+
* @var AmqpConsumer
330+
* @var callable $callback
331+
*/
332+
list($consumer, $callback) = $this->subscribers[$message->delivery_info['consumer_tag']];
333+
334+
if (false === call_user_func($callback, $receivedMessage, $consumer)) {
335+
throw new StopBasicConsumptionException();
336+
}
337+
};
338+
339+
$consumerTag = $this->getChannel()->basic_consume(
340+
$consumer->getQueue()->getQueueName(),
341+
$consumer->getConsumerTag(),
342+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
343+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK),
344+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE),
345+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT),
346+
$libCallback
347+
);
348+
349+
if (empty($consumerTag)) {
350+
throw new Exception('Got empty consumer tag');
351+
}
352+
353+
$consumer->setConsumerTag($consumerTag);
354+
355+
$this->subscribers[$consumerTag] = [$consumer, $callback];
356+
}
357+
358+
/**
359+
* {@inheritdoc}
360+
*/
361+
public function unsubscribe(InteropAmqpConsumer $consumer)
362+
{
363+
if (false == $consumer->getConsumerTag()) {
364+
return;
365+
}
366+
367+
$consumerTag = $consumer->getConsumerTag();
368+
369+
$this->getChannel()->basic_cancel($consumerTag);
370+
371+
$consumer->setConsumerTag(null);
372+
unset($this->subscribers[$consumerTag], $this->getChannel()->callbacks[$consumerTag]);
373+
}
374+
375+
/**
376+
* {@inheritdoc}
377+
*/
378+
public function consume($timeout = 0)
379+
{
380+
if (empty($this->subscribers)) {
381+
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
382+
}
383+
384+
try {
385+
while (true) {
386+
$start = microtime(true);
387+
388+
$this->channel->wait(null, false, $timeout / 1000);
389+
390+
if ($timeout <= 0) {
391+
continue;
392+
}
393+
394+
// compute remaining timeout and continue until time is up
395+
$stop = microtime(true);
396+
$timeout -= ($stop - $start) * 1000;
397+
398+
if ($timeout <= 0) {
399+
break;
400+
}
401+
}
402+
} catch (AMQPTimeoutException $e) {
403+
} catch (StopBasicConsumptionException $e) {
404+
}
405+
}
406+
305407
/**
306408
* @return AMQPChannel
307409
*/
@@ -318,4 +420,28 @@ private function getChannel()
318420

319421
return $this->channel;
320422
}
423+
424+
/**
425+
* @param LibAMQPMessage $amqpMessage
426+
*
427+
* @return InteropAmqpMessage
428+
*/
429+
private function convertMessage(LibAMQPMessage $amqpMessage)
430+
{
431+
$headers = new AMQPTable($amqpMessage->get_properties());
432+
$headers = $headers->getNativeData();
433+
434+
$properties = [];
435+
if (isset($headers['application_headers'])) {
436+
$properties = $headers['application_headers'];
437+
}
438+
unset($headers['application_headers']);
439+
440+
$message = new AmqpMessage($amqpMessage->getBody(), $properties, $headers);
441+
$message->setDeliveryTag($amqpMessage->delivery_info['delivery_tag']);
442+
$message->setRedelivered($amqpMessage->delivery_info['redelivered']);
443+
$message->setRoutingKey($amqpMessage->delivery_info['routing_key']);
444+
445+
return $message;
446+
}
321447
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib;
4+
5+
class StopBasicConsumptionException extends \LogicException
6+
{
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeBreakOnFalseSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeBreakOnFalseTest extends BasicConsumeBreakOnFalseSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeFromAllSubscribedQueuesSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeFromAllSubscribedQueuesTest extends BasicConsumeFromAllSubscribedQueuesSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldAddConsumerTagOnSubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldAddConsumerTagOnSubscribeTest extends BasicConsumeShouldAddConsumerTagOnSubscribeSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldRemoveConsumerTagOnUnsubscribeTest extends BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeUntilUnsubscribedSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeUntilUnsubscribedTest extends BasicConsumeUntilUnsubscribedSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}

‎pkg/amqp-lib/composer.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
"php": ">=5.6",
99
"php-amqplib/php-amqplib": "^2.7@dev",
1010
"queue-interop/queue-interop": "^0.6@dev",
11-
"queue-interop/amqp-interop": "^0.6@dev",
11+
"queue-interop/amqp-interop": "^0.7@dev",
1212
"enqueue/amqp-tools": "^0.8@dev"
1313
},
1414
"require-dev": {
1515
"phpunit/phpunit": "~5.4.0",
1616
"enqueue/test": "^0.8@dev",
1717
"enqueue/enqueue": "^0.8@dev",
1818
"enqueue/null": "^0.8@dev",
19-
"queue-interop/queue-spec": "^0.5@dev",
19+
"queue-interop/queue-spec": "^0.5.1@dev",
2020
"symfony/dependency-injection": "^2.8|^3",
2121
"symfony/config": "^2.8|^3"
2222
},

‎pkg/amqp-tools/composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"require": {
88
"php": ">=5.6",
99
"queue-interop/queue-interop": "^0.6@dev",
10-
"queue-interop/amqp-interop": "^0.6@dev"
10+
"queue-interop/amqp-interop": "^0.7@dev"
1111
},
1212
"require-dev": {
1313
"phpunit/phpunit": "~5.4.0",

‎pkg/enqueue/Consumption/QueueConsumer.php

+70-14
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22

33
namespace Enqueue\Consumption;
44

5+
use Enqueue\AmqpExt\AmqpConsumer;
56
use Enqueue\Consumption\Exception\ConsumptionInterruptedException;
67
use Enqueue\Consumption\Exception\InvalidArgumentException;
78
use Enqueue\Consumption\Exception\LogicException;
89
use Enqueue\Util\VarExport;
10+
use Interop\Amqp\AmqpContext;
11+
use Interop\Amqp\AmqpMessage;
912
use Interop\Queue\PsrConsumer;
1013
use Interop\Queue\PsrContext;
1114
use Interop\Queue\PsrProcessor;
@@ -143,6 +146,10 @@ public function bind($queue, $processor)
143146
*/
144147
public function consume(ExtensionInterface $runtimeExtension = null)
145148
{
149+
if (empty($this->boundProcessors)) {
150+
throw new \LogicException('There is nothing to consume. It is required to bind something before calling consume method.');
151+
}
152+
146153
/** @var PsrConsumer[] $consumers */
147154
$consumers = [];
148155
/** @var PsrQueue $queue */
@@ -163,21 +170,65 @@ public function consume(ExtensionInterface $runtimeExtension = null)
163170

164171
while (true) {
165172
try {
166-
/** @var PsrQueue $queue */
167-
foreach ($this->boundProcessors as list($queue, $processor)) {
168-
$consumer = $consumers[$queue->getQueueName()];
169-
170-
$context = new Context($this->psrContext);
171-
$context->setLogger($logger);
172-
$context->setPsrQueue($queue);
173-
$context->setPsrConsumer($consumer);
174-
$context->setPsrProcessor($processor);
175-
176-
$this->doConsume($extension, $context);
173+
if ($this->psrContext instanceof AmqpContext) {
174+
$callback = function (AmqpMessage $message, AmqpConsumer $consumer) use ($extension, $logger) {
175+
$currentProcessor = null;
176+
177+
/** @var PsrQueue $queue */
178+
foreach ($this->boundProcessors as list($queue, $processor)) {
179+
if ($queue->getQueueName() === $consumer->getQueue()->getQueueName()) {
180+
$currentProcessor = $processor;
181+
}
182+
}
183+
184+
if (false == $currentProcessor) {
185+
throw new \LogicException(sprintf('The processor for the queue "%s" could not be found.', $consumer->getQueue()->getQueueName()));
186+
}
187+
188+
$context = new Context($this->psrContext);
189+
$context->setLogger($logger);
190+
$context->setPsrQueue($consumer->getQueue());
191+
$context->setPsrConsumer($consumer);
192+
$context->setPsrProcessor($currentProcessor);
193+
$context->setPsrMessage($message);
194+
195+
$this->doConsume($extension, $context);
196+
197+
return true;
198+
};
199+
200+
foreach ($consumers as $consumer) {
201+
/* @var AmqpConsumer $consumer */
202+
203+
$this->psrContext->subscribe($consumer, $callback);
204+
}
205+
206+
$this->psrContext->consume($this->receiveTimeout);
207+
} else {
208+
/** @var PsrQueue $queue */
209+
foreach ($this->boundProcessors as list($queue, $processor)) {
210+
$consumer = $consumers[$queue->getQueueName()];
211+
212+
$context = new Context($this->psrContext);
213+
$context->setLogger($logger);
214+
$context->setPsrQueue($queue);
215+
$context->setPsrConsumer($consumer);
216+
$context->setPsrProcessor($processor);
217+
218+
$this->doConsume($extension, $context);
219+
}
177220
}
178221
} catch (ConsumptionInterruptedException $e) {
179222
$logger->info(sprintf('Consuming interrupted'));
180223

224+
if ($this->psrContext instanceof AmqpContext) {
225+
foreach ($consumers as $consumer) {
226+
/* @var AmqpConsumer $consumer */
227+
228+
$this->psrContext->unsubscribe($consumer);
229+
}
230+
}
231+
181232
$context->setExecutionInterrupted(true);
182233

183234
$extension->onInterrupted($context);
@@ -218,14 +269,19 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
218269
throw new ConsumptionInterruptedException();
219270
}
220271

221-
if ($message = $consumer->receive($this->receiveTimeout)) {
272+
$message = $context->getPsrMessage();
273+
if (false == $message) {
274+
if ($message = $consumer->receive($this->receiveTimeout)) {
275+
$context->setPsrMessage($message);
276+
}
277+
}
278+
279+
if ($message) {
222280
$logger->info('Message received from the queue: '.$context->getPsrQueue()->getQueueName());
223281
$logger->debug('Headers: {headers}', ['headers' => new VarExport($message->getHeaders())]);
224282
$logger->debug('Properties: {properties}', ['properties' => new VarExport($message->getProperties())]);
225283
$logger->debug('Payload: {payload}', ['payload' => new VarExport($message->getBody())]);
226284

227-
$context->setPsrMessage($message);
228-
229285
$extension->onPreReceived($context);
230286
if (!$context->getResult()) {
231287
$result = $processor->process($message, $this->psrContext);

0 commit comments

Comments
 (0)
Please sign in to comment.