Skip to content

Commit 69634d7

Browse files
committed
[amqp-lib] Add basic consume support.
1 parent 8dfda8f commit 69634d7

8 files changed

+214
-3
lines changed

pkg/amqp-lib/AmqpContext.php

+89-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
use Interop\Queue\PsrTopic;
2121
use PhpAmqpLib\Channel\AMQPChannel;
2222
use PhpAmqpLib\Connection\AbstractConnection;
23+
use PhpAmqpLib\Exception\AMQPTimeoutException;
24+
use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage;
2325
use PhpAmqpLib\Wire\AMQPTable;
2426

2527
class AmqpContext implements InteropAmqpContext, DelayStrategyAware
@@ -319,15 +321,55 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
319321
return;
320322
}
321323

322-
throw new \LogicException('Not implemented');
324+
$libCallback = function (LibAMQPMessage $message) {
325+
$receivedMessage = $this->convertMessage($message);
326+
$receivedMessage->setConsumerTag($message->delivery_info['consumer_tag']);
327+
328+
/**
329+
* @var AmqpConsumer
330+
* @var callable $callback
331+
*/
332+
list($consumer, $callback) = $this->subscribers[$message->delivery_info['consumer_tag']];
333+
334+
if (false === call_user_func($callback, $receivedMessage, $consumer)) {
335+
throw new StopBasicConsumptionException();
336+
}
337+
};
338+
339+
$consumerTag = $this->getChannel()->basic_consume(
340+
$consumer->getQueue()->getQueueName(),
341+
$consumer->getConsumerTag(),
342+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
343+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK),
344+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE),
345+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT),
346+
$libCallback
347+
);
348+
349+
if (empty($consumerTag)) {
350+
throw new Exception('Got empty consumer tag');
351+
}
352+
353+
$consumer->setConsumerTag($consumerTag);
354+
355+
$this->subscribers[$consumerTag] = [$consumer, $callback];
323356
}
324357

325358
/**
326359
* {@inheritdoc}
327360
*/
328361
public function unsubscribe(InteropAmqpConsumer $consumer)
329362
{
330-
throw new \LogicException('Not implemented');
363+
if (false == $consumer->getConsumerTag()) {
364+
return;
365+
}
366+
367+
$consumerTag = $consumer->getConsumerTag();
368+
369+
$this->getChannel()->basic_cancel($consumerTag);
370+
371+
$consumer->setConsumerTag(null);
372+
unset($this->subscribers[$consumerTag], $this->getChannel()->callbacks[$consumerTag]);
331373
}
332374

333375
/**
@@ -339,7 +381,27 @@ public function consume($timeout = 0)
339381
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
340382
}
341383

342-
throw new \LogicException('Not implemented');
384+
try {
385+
while (true) {
386+
$start = microtime(true);
387+
388+
$this->channel->wait(null, false, $timeout / 1000);
389+
390+
if ($timeout <= 0) {
391+
continue;
392+
}
393+
394+
// compute remaining timeout and continue until time is up
395+
$stop = microtime(true);
396+
$timeout -= ($stop - $start) * 1000;
397+
398+
if ($timeout <= 0) {
399+
break;
400+
}
401+
}
402+
} catch (AMQPTimeoutException $e) {
403+
} catch (StopBasicConsumptionException $e) {
404+
}
343405
}
344406

345407
/**
@@ -358,4 +420,28 @@ private function getChannel()
358420

359421
return $this->channel;
360422
}
423+
424+
/**
425+
* @param LibAMQPMessage $amqpMessage
426+
*
427+
* @return InteropAmqpMessage
428+
*/
429+
private function convertMessage(LibAMQPMessage $amqpMessage)
430+
{
431+
$headers = new AMQPTable($amqpMessage->get_properties());
432+
$headers = $headers->getNativeData();
433+
434+
$properties = [];
435+
if (isset($headers['application_headers'])) {
436+
$properties = $headers['application_headers'];
437+
}
438+
unset($headers['application_headers']);
439+
440+
$message = new AmqpMessage($amqpMessage->getBody(), $properties, $headers);
441+
$message->setDeliveryTag($amqpMessage->delivery_info['delivery_tag']);
442+
$message->setRedelivered($amqpMessage->delivery_info['redelivered']);
443+
$message->setRoutingKey($amqpMessage->delivery_info['routing_key']);
444+
445+
return $message;
446+
}
361447
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib;
4+
5+
class StopBasicConsumptionException extends \LogicException
6+
{
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeBreakOnFalseSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeBreakOnFalseTest extends BasicConsumeBreakOnFalseSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeFromAllSubscribedQueuesSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeFromAllSubscribedQueuesTest extends BasicConsumeFromAllSubscribedQueuesSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldAddConsumerTagOnSubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldAddConsumerTagOnSubscribeTest extends BasicConsumeShouldAddConsumerTagOnSubscribeSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldRemoveConsumerTagOnUnsubscribeTest extends BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeUntilUnsubscribedSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeUntilUnsubscribedTest extends BasicConsumeUntilUnsubscribedSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}

pkg/enqueue/Consumption/QueueConsumer.php

+8
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,14 @@ public function consume(ExtensionInterface $runtimeExtension = null)
221221
} catch (ConsumptionInterruptedException $e) {
222222
$logger->info(sprintf('Consuming interrupted'));
223223

224+
if ($this->psrContext instanceof AmqpContext) {
225+
foreach ($consumers as $consumer) {
226+
/* @var AmqpConsumer $consumer */
227+
228+
$this->psrContext->unsubscribe($consumer);
229+
}
230+
}
231+
224232
$context->setExecutionInterrupted(true);
225233

226234
$extension->onInterrupted($context);

0 commit comments

Comments
 (0)