Skip to content

Commit 5c11db9

Browse files
committed
[client] Rework AmqpDriver
fixes php-enqueue/enqueue-dev#523 (for amqp)
1 parent 1e7c955 commit 5c11db9

File tree

2 files changed

+262
-318
lines changed

2 files changed

+262
-318
lines changed

Client/Driver/AmqpDriver.php

Lines changed: 77 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,25 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace Enqueue\Client\Driver;
46

57
use Enqueue\Client\Config;
6-
use Enqueue\Client\DriverInterface;
78
use Enqueue\Client\Message;
8-
use Enqueue\Client\Meta\QueueMetaRegistry;
9+
use Enqueue\Client\MessagePriority;
10+
use Enqueue\Client\RouteCollection;
911
use Interop\Amqp\AmqpContext;
1012
use Interop\Amqp\AmqpMessage;
1113
use Interop\Amqp\AmqpQueue;
1214
use Interop\Amqp\AmqpTopic;
1315
use Interop\Amqp\Impl\AmqpBind;
1416
use Interop\Queue\PsrMessage;
1517
use Interop\Queue\PsrQueue;
18+
use Interop\Queue\PsrTopic;
1619
use Psr\Log\LoggerInterface;
1720
use Psr\Log\NullLogger;
1821

19-
class AmqpDriver implements DriverInterface
22+
class AmqpDriver extends GenericDriver
2023
{
2124
/**
2225
* @var AmqpContext
@@ -29,43 +32,59 @@ class AmqpDriver implements DriverInterface
2932
private $config;
3033

3134
/**
32-
* @var QueueMetaRegistry
35+
* @var array
36+
*/
37+
private $priorityMap;
38+
39+
/**
40+
* @var RouteCollection
3341
*/
34-
private $queueMetaRegistry;
42+
private $routeCollection;
3543

36-
public function __construct(AmqpContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
44+
public function __construct(AmqpContext $context, Config $config, RouteCollection $routeCollection)
3745
{
3846
$this->context = $context;
3947
$this->config = $config;
40-
$this->queueMetaRegistry = $queueMetaRegistry;
41-
}
48+
$this->routeCollection = $routeCollection;
4249

43-
public function sendToRouter(Message $message): void
44-
{
45-
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
46-
throw new \LogicException('Topic name parameter is required but is not set');
47-
}
48-
49-
$topic = $this->createRouterTopic();
50-
$transportMessage = $this->createTransportMessage($message);
50+
$this->priorityMap = [
51+
MessagePriority::VERY_LOW => 0,
52+
MessagePriority::LOW => 1,
53+
MessagePriority::NORMAL => 2,
54+
MessagePriority::HIGH => 3,
55+
MessagePriority::VERY_HIGH => 4,
56+
];
5157

52-
$this->context->createProducer()->send($topic, $transportMessage);
58+
parent::__construct($context, $config, $routeCollection);
5359
}
5460

55-
public function sendToProcessor(Message $message): void
61+
/**
62+
* @return AmqpMessage
63+
*/
64+
public function createTransportMessage(Message $clientMessage): PsrMessage
5665
{
57-
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
58-
throw new \LogicException('Processor name parameter is required but is not set');
59-
}
66+
/** @var AmqpMessage $transportMessage */
67+
$transportMessage = parent::createTransportMessage($clientMessage);
68+
$transportMessage->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);
69+
$transportMessage->setContentType($clientMessage->getContentType());
6070

61-
if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
62-
throw new \LogicException('Queue name parameter is required but is not set');
71+
if ($clientMessage->getExpire()) {
72+
$transportMessage->setExpiration($clientMessage->getExpire() * 1000);
6373
}
6474

65-
$transportMessage = $this->createTransportMessage($message);
66-
$destination = $this->createQueue($queueName);
75+
if ($priority = $clientMessage->getPriority()) {
76+
if (false == array_key_exists($priority, $this->getPriorityMap())) {
77+
throw new \InvalidArgumentException(sprintf(
78+
'Cant convert client priority "%s" to transport one. Could be one of "%s"',
79+
$priority,
80+
implode('", "', array_keys($this->getPriorityMap()))
81+
));
82+
}
6783

68-
$this->context->createProducer()->send($destination, $transportMessage);
84+
$transportMessage->setPriority($this->priorityMap[$priority]);
85+
}
86+
87+
return $transportMessage;
6988
}
7089

7190
public function setupBroker(LoggerInterface $logger = null): void
@@ -77,100 +96,73 @@ public function setupBroker(LoggerInterface $logger = null): void
7796

7897
// setup router
7998
$routerTopic = $this->createRouterTopic();
80-
$routerQueue = $this->createQueue($this->config->getRouterQueueName());
81-
8299
$log('Declare router exchange: %s', $routerTopic->getTopicName());
83100
$this->context->declareTopic($routerTopic);
101+
102+
$routerQueue = $this->createQueue($this->config->getRouterQueueName());
84103
$log('Declare router queue: %s', $routerQueue->getQueueName());
85104
$this->context->declareQueue($routerQueue);
105+
86106
$log('Bind router queue to exchange: %s -> %s', $routerQueue->getQueueName(), $routerTopic->getTopicName());
87107
$this->context->bind(new AmqpBind($routerTopic, $routerQueue, $routerQueue->getQueueName()));
88108

89109
// setup queues
90-
foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) {
91-
$queue = $this->createQueue($meta->getClientName());
110+
$declaredQueues = [];
111+
foreach ($this->routeCollection->all() as $route) {
112+
/** @var AmqpQueue $queue */
113+
$queue = $this->createRouteQueue($route);
114+
if (array_key_exists($queue->getQueueName(), $declaredQueues)) {
115+
continue;
116+
}
92117

93118
$log('Declare processor queue: %s', $queue->getQueueName());
94119
$this->context->declareQueue($queue);
120+
121+
$declaredQueues[$queue->getQueueName()] = true;
95122
}
96123
}
97124

98125
/**
99126
* @return AmqpQueue
100127
*/
101-
public function createQueue(string $queueName): PsrQueue
128+
public function createQueue(string $clientQueuName): PsrQueue
102129
{
103-
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName();
104-
105-
$queue = $this->context->createQueue($transportName);
130+
/** @var AmqpQueue $queue */
131+
$queue = parent::createQueue($clientQueuName);
106132
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
107133

108134
return $queue;
109135
}
110136

111137
/**
112-
* @return AmqpMessage
138+
* @param AmqpTopic $topic
139+
* @param AmqpMessage $transportMessage
113140
*/
114-
public function createTransportMessage(Message $message): PsrMessage
141+
protected function doSendToRouter(PsrTopic $topic, PsrMessage $transportMessage): void
115142
{
116-
$headers = $message->getHeaders();
117-
$properties = $message->getProperties();
118-
119-
$transportMessage = $this->context->createMessage();
120-
$transportMessage->setBody($message->getBody());
121-
$transportMessage->setHeaders($headers);
122-
$transportMessage->setProperties($properties);
123-
$transportMessage->setMessageId($message->getMessageId());
124-
$transportMessage->setTimestamp($message->getTimestamp());
125-
$transportMessage->setReplyTo($message->getReplyTo());
126-
$transportMessage->setCorrelationId($message->getCorrelationId());
127-
$transportMessage->setContentType($message->getContentType());
128-
$transportMessage->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);
129-
130-
if ($message->getExpire()) {
131-
$transportMessage->setExpiration($message->getExpire() * 1000);
132-
}
143+
// We should not handle priority, expiration, and delay at this stage.
144+
// The router will take care of it while re-sending the message to the final destinations.
145+
$transportMessage->setPriority(null);
146+
$transportMessage->setExpiration(null);
133147

134-
return $transportMessage;
148+
$this->context->createProducer()->send($topic, $transportMessage);
135149
}
136150

137151
/**
138-
* @param AmqpMessage $message
152+
* @return AmqpTopic
139153
*/
140-
public function createClientMessage(PsrMessage $message): Message
141-
{
142-
$clientMessage = new Message();
143-
144-
$clientMessage->setBody($message->getBody());
145-
$clientMessage->setHeaders($message->getHeaders());
146-
$clientMessage->setProperties($message->getProperties());
147-
$clientMessage->setContentType($message->getContentType());
148-
149-
if ($expiration = $message->getExpiration()) {
150-
$clientMessage->setExpire((int) ($expiration / 1000));
151-
}
152-
153-
$clientMessage->setMessageId($message->getMessageId());
154-
$clientMessage->setTimestamp($message->getTimestamp());
155-
$clientMessage->setReplyTo($message->getReplyTo());
156-
$clientMessage->setCorrelationId($message->getCorrelationId());
157-
158-
return $clientMessage;
159-
}
160-
161-
public function getConfig(): Config
154+
protected function createRouterTopic(): PsrTopic
162155
{
163-
return $this->config;
164-
}
165-
166-
private function createRouterTopic(): AmqpTopic
167-
{
168-
$topic = $this->context->createTopic(
169-
$this->config->createTransportRouterTopicName($this->config->getRouterTopicName())
170-
);
156+
/** @var AmqpTopic $topic */
157+
$topic = parent::createRouterTopic();
171158
$topic->setType(AmqpTopic::TYPE_FANOUT);
172159
$topic->addFlag(AmqpTopic::FLAG_DURABLE);
173160

174161
return $topic;
175162
}
163+
164+
protected function getPriorityMap(): array
165+
{
166+
return $this->priorityMap;
167+
}
176168
}

0 commit comments

Comments
 (0)