Skip to content

Commit 8dfda8f

Browse files
committed
[amqp-bunny] implement basic consume related methods.
1 parent c5ee3a6 commit 8dfda8f

7 files changed

+241
-38
lines changed

pkg/amqp-bunny/AmqpContext.php

+104
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
namespace Enqueue\AmqpBunny;
44

55
use Bunny\Channel;
6+
use Bunny\Client;
7+
use Bunny\Message;
68
use Enqueue\AmqpTools\DelayStrategyAware;
79
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
810
use Interop\Amqp\AmqpBind as InteropAmqpBind;
11+
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
912
use Interop\Amqp\AmqpContext as InteropAmqpContext;
1013
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
1114
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
@@ -43,6 +46,13 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
4346
*/
4447
private $buffer;
4548

49+
/**
50+
* an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];.
51+
*
52+
* @var array
53+
*/
54+
private $subscribers;
55+
4656
/**
4757
* Callable must return instance of \Bunny\Channel once called.
4858
*
@@ -309,6 +319,77 @@ public function setQos($prefetchSize, $prefetchCount, $global)
309319
$this->getBunnyChannel()->qos($prefetchSize, $prefetchCount, $global);
310320
}
311321

322+
/**
323+
* {@inheritdoc}
324+
*/
325+
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
326+
{
327+
if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) {
328+
return;
329+
}
330+
331+
$bunnyCallback = function (Message $message, Channel $channel, Client $bunny) {
332+
$receivedMessage = $this->convertMessage($message);
333+
$receivedMessage->setConsumerTag($message->consumerTag);
334+
335+
/**
336+
* @var AmqpConsumer
337+
* @var callable $callback
338+
*/
339+
list($consumer, $callback) = $this->subscribers[$message->consumerTag];
340+
341+
if (false === call_user_func($callback, $receivedMessage, $consumer)) {
342+
$bunny->stop();
343+
}
344+
};
345+
346+
$frame = $this->getBunnyChannel()->consume(
347+
$bunnyCallback,
348+
$consumer->getQueue()->getQueueName(),
349+
$consumer->getConsumerTag(),
350+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
351+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK),
352+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE),
353+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT)
354+
);
355+
356+
if (empty($frame->consumerTag)) {
357+
throw new Exception('Got empty consumer tag');
358+
}
359+
360+
$consumer->setConsumerTag($frame->consumerTag);
361+
362+
$this->subscribers[$frame->consumerTag] = [$consumer, $callback];
363+
}
364+
365+
/**
366+
* {@inheritdoc}
367+
*/
368+
public function unsubscribe(InteropAmqpConsumer $consumer)
369+
{
370+
if (false == $consumer->getConsumerTag()) {
371+
return;
372+
}
373+
374+
$consumerTag = $consumer->getConsumerTag();
375+
376+
$this->getBunnyChannel()->cancel($consumerTag);
377+
$consumer->setConsumerTag(null);
378+
unset($this->subscribers[$consumerTag]);
379+
}
380+
381+
/**
382+
* {@inheritdoc}
383+
*/
384+
public function consume($timeout = 0)
385+
{
386+
if (empty($this->subscribers)) {
387+
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
388+
}
389+
390+
$this->getBunnyChannel()->getClient()->run($timeout / 1000);
391+
}
392+
312393
/**
313394
* @return Channel
314395
*/
@@ -328,4 +409,27 @@ public function getBunnyChannel()
328409

329410
return $this->bunnyChannel;
330411
}
412+
413+
/**
414+
* @param Message $bunnyMessage
415+
*
416+
* @return InteropAmqpMessage
417+
*/
418+
private function convertMessage(Message $bunnyMessage)
419+
{
420+
$headers = $bunnyMessage->headers;
421+
422+
$properties = [];
423+
if (isset($headers['application_headers'])) {
424+
$properties = $headers['application_headers'];
425+
}
426+
unset($headers['application_headers']);
427+
428+
$message = new AmqpMessage($bunnyMessage->content, $properties, $headers);
429+
$message->setDeliveryTag($bunnyMessage->deliveryTag);
430+
$message->setRedelivered($bunnyMessage->redelivered);
431+
$message->setRoutingKey($bunnyMessage->routingKey);
432+
433+
return $message;
434+
}
331435
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldAddConsumerTagOnSubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldAddConsumerTagOnSubscribeTest extends BasicConsumeShouldAddConsumerTagOnSubscribeSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldRemoveConsumerTagOnUnsubscribeTest extends BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeUntilUnsubscribedSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeUntilUnsubscribedTest extends BasicConsumeUntilUnsubscribedSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}

pkg/amqp-ext/AmqpContext.php

+19-26
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
4343
private $receiveMethod;
4444

4545
/**
46+
* an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];.
47+
*
4648
* @var array
4749
*/
48-
private $basicConsumeSubscribers;
50+
private $subscribers;
4951

5052
/**
5153
* Callable must return instance of \AMQPChannel once called.
@@ -66,7 +68,7 @@ public function __construct($extChannel, $receiveMethod)
6668
}
6769

6870
$this->buffer = new Buffer();
69-
$this->basicConsumeSubscribers = [];
71+
$this->subscribers = [];
7072
}
7173

7274
/**
@@ -298,23 +300,11 @@ public function getExtChannel()
298300
}
299301

300302
/**
301-
* Notify broker that the channel is interested in consuming messages from this queue.
302-
*
303-
* @param InteropAmqpConsumer $consumer
304-
* @param callable $callback A callback function to which the
305-
* consumed message will be passed. The
306-
* function must accept at a minimum
307-
* one parameter, an \Interop\Amqp\AmqpMessage object,
308-
* and an optional second parameter
309-
* the \Interop\Amqp\AmqpConsumer from which the message was
310-
* consumed. The \Interop\Amqp\AmqpContext::basicConsume() will
311-
* not return the processing thread back to
312-
* the PHP script until the callback
313-
* function returns FALSE.
303+
* {@inheritdoc}
314304
*/
315-
public function basicConsumeSubscribe(InteropAmqpConsumer $consumer, callable $callback)
305+
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
316306
{
317-
if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->basicConsumeSubscribers)) {
307+
if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) {
318308
return;
319309
}
320310

@@ -325,10 +315,13 @@ public function basicConsumeSubscribe(InteropAmqpConsumer $consumer, callable $c
325315

326316
$consumerTag = $extQueue->getConsumerTag();
327317
$consumer->setConsumerTag($consumerTag);
328-
$this->basicConsumeSubscribers[$consumerTag] = [$consumer, $callback];
318+
$this->subscribers[$consumerTag] = [$consumer, $callback];
329319
}
330320

331-
public function basicConsumeUnsubscribe(InteropAmqpConsumer $consumer)
321+
/**
322+
* {@inheritdoc}
323+
*/
324+
public function unsubscribe(InteropAmqpConsumer $consumer)
332325
{
333326
if (false == $consumer->getConsumerTag()) {
334327
return;
@@ -341,15 +334,15 @@ public function basicConsumeUnsubscribe(InteropAmqpConsumer $consumer)
341334
$extQueue->setName($consumer->getQueue()->getQueueName());
342335

343336
$extQueue->cancel($consumerTag);
344-
unset($this->basicConsumeSubscribers[$consumerTag]);
337+
unset($this->subscribers[$consumerTag]);
345338
}
346339

347340
/**
348-
* @param float|int $timeout milliseconds, consumes endlessly if zero set
341+
* {@inheritdoc}
349342
*/
350-
public function basicConsume($timeout = 0)
343+
public function consume($timeout = 0)
351344
{
352-
if (empty($this->basicConsumeSubscribers)) {
345+
if (empty($this->subscribers)) {
353346
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
354347
}
355348

@@ -360,9 +353,9 @@ public function basicConsume($timeout = 0)
360353
try {
361354
$extConnection->setReadTimeout($timeout / 1000);
362355

363-
reset($this->basicConsumeSubscribers);
356+
reset($this->subscribers);
364357
/** @var $consumer AmqpConsumer */
365-
list($consumer) = current($this->basicConsumeSubscribers);
358+
list($consumer) = current($this->subscribers);
366359

367360
$extQueue = new \AMQPQueue($this->getExtChannel());
368361
$extQueue->setName($consumer->getQueue()->getQueueName());
@@ -374,7 +367,7 @@ public function basicConsume($timeout = 0)
374367
* @var AmqpConsumer
375368
* @var callable $callback
376369
*/
377-
list($consumer, $callback) = $this->basicConsumeSubscribers[$q->getConsumerTag()];
370+
list($consumer, $callback) = $this->subscribers[$q->getConsumerTag()];
378371

379372
return call_user_func($callback, $message, $consumer);
380373
}, AMQP_JUST_CONSUME);

pkg/amqp-lib/AmqpContext.php

+40
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Enqueue\AmqpTools\DelayStrategyAware;
66
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
77
use Interop\Amqp\AmqpBind as InteropAmqpBind;
8+
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
89
use Interop\Amqp\AmqpContext as InteropAmqpContext;
910
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
1011
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
@@ -45,6 +46,13 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
4546
*/
4647
private $buffer;
4748

49+
/**
50+
* an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];.
51+
*
52+
* @var array
53+
*/
54+
private $subscribers;
55+
4856
/**
4957
* @param AbstractConnection $connection
5058
* @param array $config
@@ -302,6 +310,38 @@ public function setQos($prefetchSize, $prefetchCount, $global)
302310
$this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
303311
}
304312

313+
/**
314+
* {@inheritdoc}
315+
*/
316+
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
317+
{
318+
if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) {
319+
return;
320+
}
321+
322+
throw new \LogicException('Not implemented');
323+
}
324+
325+
/**
326+
* {@inheritdoc}
327+
*/
328+
public function unsubscribe(InteropAmqpConsumer $consumer)
329+
{
330+
throw new \LogicException('Not implemented');
331+
}
332+
333+
/**
334+
* {@inheritdoc}
335+
*/
336+
public function consume($timeout = 0)
337+
{
338+
if (empty($this->subscribers)) {
339+
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
340+
}
341+
342+
throw new \LogicException('Not implemented');
343+
}
344+
305345
/**
306346
* @return AMQPChannel
307347
*/

0 commit comments

Comments
 (0)