Skip to content

Commit 08939e1

Browse files
committed
[client] Introduce GenericDriver.
1 parent ed4c6d8 commit 08939e1

File tree

2 files changed

+913
-0
lines changed

2 files changed

+913
-0
lines changed
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Client\Driver;
6+
7+
use Enqueue\Client\Config;
8+
use Enqueue\Client\DriverInterface;
9+
use Enqueue\Client\Message;
10+
use Enqueue\Client\MessagePriority;
11+
use Enqueue\Client\Route;
12+
use Enqueue\Client\RouteCollection;
13+
use Interop\Queue\PsrContext;
14+
use Interop\Queue\PsrMessage;
15+
use Interop\Queue\PsrQueue;
16+
use Interop\Queue\PsrTopic;
17+
use Psr\Log\LoggerInterface;
18+
19+
class GenericDriver implements DriverInterface
20+
{
21+
/**
22+
* @var PsrContext
23+
*/
24+
private $context;
25+
26+
/**
27+
* @var Config
28+
*/
29+
private $config;
30+
31+
/**
32+
* @var RouteCollection
33+
*/
34+
private $routeCollection;
35+
36+
public function __construct(
37+
PsrContext $context,
38+
Config $config,
39+
RouteCollection $routeCollection
40+
) {
41+
$this->context = $context;
42+
$this->config = $config;
43+
$this->routeCollection = $routeCollection;
44+
}
45+
46+
public function sendToRouter(Message $message): void
47+
{
48+
if ($message->getProperty(Config::PARAMETER_COMMAND_NAME)) {
49+
throw new \LogicException('Command must not be send to router but go directly to its processor.');
50+
}
51+
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
52+
throw new \LogicException('Topic name parameter is required but is not set');
53+
}
54+
55+
$topic = $this->createRouterTopic();
56+
$transportMessage = $this->createTransportMessage($message);
57+
58+
$this->doSendToRouter($topic, $transportMessage);
59+
}
60+
61+
public function sendToProcessor(Message $message): void
62+
{
63+
$processor = $message->getProperty(Config::PARAMETER_PROCESSOR_NAME);
64+
if (false == $processor) {
65+
throw new \LogicException('Processor name parameter is required but is not set');
66+
}
67+
68+
$topic = $message->getProperty(Config::PARAMETER_TOPIC_NAME);
69+
$command = $message->getProperty(Config::PARAMETER_COMMAND_NAME);
70+
71+
/** @var Route $route */
72+
$route = null;
73+
if ($topic) {
74+
$route = $this->routeCollection->topicAndProcessor($topic, $processor);
75+
if (false == $route) {
76+
throw new \LogicException(sprintf('There is no route for topic "%s" and processor "%s"', $topic, $processor));
77+
}
78+
} elseif ($command) {
79+
$route = $this->routeCollection->command($command);
80+
if (false == $route) {
81+
throw new \LogicException(sprintf('There is no route for command "%s" and processor "%s"', $command, $processor));
82+
}
83+
84+
if ($processor !== $route->getProcessor()) {
85+
throw new \LogicException(sprintf('The command "%s" route was found but processors do not match. Given "%s", route "%s"', $command, $processor, $route->getProcessor()));
86+
}
87+
} else {
88+
throw new \LogicException('Either topic or command parameter must be set.');
89+
}
90+
91+
$transportMessage = $this->createTransportMessage($message);
92+
$queue = $this->createRouteQueue($route);
93+
94+
$this->doSendToProcessor($queue, $transportMessage);
95+
}
96+
97+
public function setupBroker(LoggerInterface $logger = null): void
98+
{
99+
}
100+
101+
public function createQueue(string $clientQueueName): PsrQueue
102+
{
103+
$transportName = $this->createTransportQueueName($clientQueueName, true);
104+
105+
return $this->context->createQueue($transportName);
106+
}
107+
108+
public function createTransportMessage(Message $clientMessage): PsrMessage
109+
{
110+
$headers = $clientMessage->getHeaders();
111+
$properties = $clientMessage->getProperties();
112+
113+
$transportMessage = $this->context->createMessage();
114+
$transportMessage->setBody($clientMessage->getBody());
115+
$transportMessage->setHeaders($headers);
116+
$transportMessage->setProperties($properties);
117+
$transportMessage->setMessageId($clientMessage->getMessageId());
118+
$transportMessage->setTimestamp($clientMessage->getTimestamp());
119+
$transportMessage->setReplyTo($clientMessage->getReplyTo());
120+
$transportMessage->setCorrelationId($clientMessage->getCorrelationId());
121+
122+
if ($contentType = $clientMessage->getContentType()) {
123+
$transportMessage->setProperty('X-Enqueue-Content-Type', $contentType);
124+
}
125+
126+
if ($priority = $clientMessage->getPriority()) {
127+
$transportMessage->setProperty('X-Enqueue-Priority', $priority);
128+
}
129+
130+
if ($expire = $clientMessage->getExpire()) {
131+
$transportMessage->setProperty('X-Enqueue-Expire', $expire);
132+
}
133+
134+
if ($delay = $clientMessage->getDelay()) {
135+
$transportMessage->setProperty('X-Enqueue-Delay', $delay);
136+
}
137+
138+
return $transportMessage;
139+
}
140+
141+
public function createClientMessage(PsrMessage $transportMessage): Message
142+
{
143+
$clientMessage = new Message();
144+
145+
$clientMessage->setBody($transportMessage->getBody());
146+
$clientMessage->setHeaders($transportMessage->getHeaders());
147+
$clientMessage->setProperties($transportMessage->getProperties());
148+
149+
$clientMessage->setMessageId($transportMessage->getMessageId());
150+
$clientMessage->setTimestamp($transportMessage->getTimestamp());
151+
$clientMessage->setPriority(MessagePriority::NORMAL);
152+
$clientMessage->setReplyTo($transportMessage->getReplyTo());
153+
$clientMessage->setCorrelationId($transportMessage->getCorrelationId());
154+
155+
if ($contentType = $transportMessage->getProperty('X-Enqueue-Content-Type')) {
156+
$clientMessage->setContentType($contentType);
157+
}
158+
159+
if ($priority = $transportMessage->getProperty('X-Enqueue-Priority')) {
160+
$clientMessage->setPriority($priority);
161+
}
162+
163+
if ($delay = $transportMessage->getProperty('X-Enqueue-Delay')) {
164+
$clientMessage->setDelay((int) $delay);
165+
}
166+
167+
if ($expire = $transportMessage->getProperty('X-Enqueue-Expire')) {
168+
$clientMessage->setExpire((int) $expire);
169+
}
170+
171+
return $clientMessage;
172+
}
173+
174+
public function getConfig(): Config
175+
{
176+
return $this->config;
177+
}
178+
179+
public function getContext(): PsrContext
180+
{
181+
return $this->context;
182+
}
183+
184+
public function getRouteCollection(): RouteCollection
185+
{
186+
return $this->routeCollection;
187+
}
188+
189+
protected function doSendToRouter(PsrTopic $topic, PsrMessage $transportMessage): void
190+
{
191+
$this->context->createProducer()->send($topic, $transportMessage);
192+
}
193+
194+
protected function doSendToProcessor(PsrQueue $queue, PsrMessage $transportMessage): void
195+
{
196+
$this->context->createProducer()->send($queue, $transportMessage);
197+
}
198+
199+
protected function createRouterTopic(): PsrTopic
200+
{
201+
return $this->context->createTopic(
202+
$this->createTransportRouterTopicName($this->config->getRouterTopicName(), true)
203+
);
204+
}
205+
206+
protected function createRouteQueue(Route $route): PsrQueue
207+
{
208+
$transportName = $this->createTransportQueueName(
209+
$route->getQueue() ?: $this->config->getDefaultProcessorQueueName(),
210+
$route->isPrefixQueue()
211+
);
212+
213+
return $this->context->createQueue($transportName);
214+
}
215+
216+
protected function createTransportRouterTopicName(string $name, bool $prefix): string
217+
{
218+
$clientPrefix = $prefix ? $this->config->getPrefix() : '';
219+
220+
return strtolower(implode($this->config->getSeparator(), array_filter([$clientPrefix, $name])));
221+
}
222+
223+
protected function createTransportQueueName(string $name, bool $prefix): string
224+
{
225+
$clientPrefix = $prefix ? $this->config->getPrefix() : '';
226+
$clientAppName = $prefix ? $this->config->getAppName() : '';
227+
228+
return strtolower(implode($this->config->getSeparator(), array_filter([$clientPrefix, $clientAppName, $name])));
229+
}
230+
}

0 commit comments

Comments
 (0)