Skip to content

Commit 3d503b3

Browse files
committed
Rework exclusive command extension.
1 parent 9a77e1b commit 3d503b3

File tree

7 files changed

+149
-285
lines changed

7 files changed

+149
-285
lines changed

pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExclusiveCommandsExtensionPass.php

Lines changed: 0 additions & 49 deletions
This file was deleted.

pkg/enqueue-bundle/EnqueueBundle.php

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventsPass;
77
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncTransformersPass;
88
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
9-
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
109
use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass;
11-
use Enqueue\Bundle\DependencyInjection\Compiler\BuildExclusiveCommandsExtensionPass;
12-
use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass;
1310
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
1411
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
12+
use Enqueue\Symfony\DependencyInjection\AnalyzeRouteCollectionPass;
13+
use Enqueue\Symfony\DependencyInjection\BuildCommandSubscriberRoutesPass;
14+
use Enqueue\Symfony\DependencyInjection\BuildProcessorRegistryPass;
15+
use Enqueue\Symfony\DependencyInjection\BuildProcessorRoutesPass;
16+
use Enqueue\Symfony\DependencyInjection\BuildTopicSubscriberRoutesPass;
1517
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
1618
use Symfony\Component\DependencyInjection\ContainerBuilder;
1719
use Symfony\Component\HttpKernel\Bundle\Bundle;
@@ -21,12 +23,15 @@ class EnqueueBundle extends Bundle
2123
public function build(ContainerBuilder $container): void
2224
{
2325
$container->addCompilerPass(new BuildConsumptionExtensionsPass());
24-
$container->addCompilerPass(new BuildClientRoutingPass());
25-
$container->addCompilerPass(new BuildProcessorRegistryPass());
2626
$container->addCompilerPass(new BuildTopicMetaSubscribersPass());
2727
$container->addCompilerPass(new BuildQueueMetaRegistryPass());
2828
$container->addCompilerPass(new BuildClientExtensionsPass());
29-
$container->addCompilerPass(new BuildExclusiveCommandsExtensionPass());
29+
30+
$container->addCompilerPass(new BuildTopicSubscriberRoutesPass('default'), 100);
31+
$container->addCompilerPass(new BuildCommandSubscriberRoutesPass('default'), 100);
32+
$container->addCompilerPass(new BuildProcessorRoutesPass('default'), 100);
33+
$container->addCompilerPass(new AnalyzeRouteCollectionPass('default'), 30);
34+
$container->addCompilerPass(new BuildProcessorRegistryPass('default'));
3035

3136
if (class_exists(AsyncEventDispatcherExtension::class)) {
3237
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);

pkg/enqueue-bundle/Resources/config/client.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,9 @@ services:
219219
enqueue.flush_spool_producer_listener:
220220
public: true
221221
alias: 'Enqueue\Symfony\Client\FlushSpoolProducerListener'
222+
223+
enqueue.client.default.exclusive_command_extension:
224+
class: 'Enqueue\Client\ConsumptionExtension\ExclusiveCommandExtension'
225+
arguments: '@enqueue.client.default.driver'
226+
tags:
227+
- { name: 'enqueue.client.extension' }

pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExclusiveCommandsExtensionPassTest.php

Lines changed: 0 additions & 105 deletions
This file was deleted.

pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
namespace Enqueue\Bundle\Tests\Unit;
44

55
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
6-
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
76
use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass;
8-
use Enqueue\Bundle\DependencyInjection\Compiler\BuildExclusiveCommandsExtensionPass;
97
use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass;
108
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
119
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
@@ -37,11 +35,6 @@ public function testShouldRegisterExpectedCompilerPasses()
3735
->method('addCompilerPass')
3836
->with($this->isInstanceOf(BuildConsumptionExtensionsPass::class))
3937
;
40-
$container
41-
->expects($this->at(1))
42-
->method('addCompilerPass')
43-
->with($this->isInstanceOf(BuildClientRoutingPass::class))
44-
;
4538
$container
4639
->expects($this->at(2))
4740
->method('addCompilerPass')
@@ -62,11 +55,6 @@ public function testShouldRegisterExpectedCompilerPasses()
6255
->method('addCompilerPass')
6356
->with($this->isInstanceOf(BuildClientExtensionsPass::class))
6457
;
65-
$container
66-
->expects($this->at(6))
67-
->method('addCompilerPass')
68-
->with($this->isInstanceOf(BuildExclusiveCommandsExtensionPass::class))
69-
;
7058

7159
$bundle = new EnqueueBundle();
7260
$bundle->build($container);

pkg/enqueue/Client/ConsumptionExtension/ExclusiveCommandExtension.php

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,29 @@
33
namespace Enqueue\Client\ConsumptionExtension;
44

55
use Enqueue\Client\Config;
6-
use Enqueue\Client\EmptyExtensionTrait as ClientEmptyExtensionTrait;
7-
use Enqueue\Client\ExtensionInterface as ClientExtensionInterface;
8-
use Enqueue\Client\PreSend;
6+
use Enqueue\Client\DriverInterface;
7+
use Enqueue\Client\Route;
98
use Enqueue\Consumption\Context;
109
use Enqueue\Consumption\EmptyExtensionTrait as ConsumptionEmptyExtensionTrait;
1110
use Enqueue\Consumption\ExtensionInterface as ConsumptionExtensionInterface;
1211

13-
final class ExclusiveCommandExtension implements ConsumptionExtensionInterface, ClientExtensionInterface
12+
final class ExclusiveCommandExtension implements ConsumptionExtensionInterface
1413
{
15-
use ConsumptionEmptyExtensionTrait, ClientEmptyExtensionTrait;
14+
use ConsumptionEmptyExtensionTrait;
1615

1716
/**
18-
* @var string[]
17+
* @var DriverInterface
1918
*/
20-
private $queueNameToProcessorNameMap;
19+
private $driver;
2120

2221
/**
23-
* @var string[]
22+
* @var Route[]
2423
*/
25-
private $processorNameToQueueNameMap;
24+
private $queueToRouteMap;
2625

27-
/**
28-
* @param string[] $queueNameToProcessorNameMap
29-
*/
30-
public function __construct(array $queueNameToProcessorNameMap)
26+
public function __construct(DriverInterface $driver)
3127
{
32-
$this->queueNameToProcessorNameMap = $queueNameToProcessorNameMap;
33-
$this->processorNameToQueueNameMap = array_flip($queueNameToProcessorNameMap);
28+
$this->driver = $driver;
3429
}
3530

3631
public function onPreReceived(Context $context)
@@ -41,34 +36,46 @@ public function onPreReceived(Context $context)
4136
if ($message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
4237
return;
4338
}
44-
if ($message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
39+
if ($message->getProperty(Config::PARAMETER_COMMAND_NAME)) {
4540
return;
4641
}
4742
if ($message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
4843
return;
4944
}
50-
if ($message->getProperty(Config::PARAMETER_COMMAND_NAME)) {
51-
return;
45+
46+
if (null === $this->queueToRouteMap) {
47+
$this->queueToRouteMap = $this->buildMap();
5248
}
5349

54-
if (array_key_exists($queue->getQueueName(), $this->queueNameToProcessorNameMap)) {
50+
if (array_key_exists($queue->getQueueName(), $this->queueToRouteMap)) {
5551
$context->getLogger()->debug('[ExclusiveCommandExtension] This is a exclusive command queue and client\'s properties are not set. Setting them');
5652

57-
$message->setProperty(Config::PARAMETER_TOPIC_NAME, Config::COMMAND_TOPIC);
58-
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $queue->getQueueName());
59-
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->queueNameToProcessorNameMap[$queue->getQueueName()]);
60-
$message->setProperty(Config::PARAMETER_COMMAND_NAME, $this->queueNameToProcessorNameMap[$queue->getQueueName()]);
53+
$route = $this->queueToRouteMap[$queue->getQueueName()];
54+
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $route->getProcessor());
55+
$message->setProperty(Config::PARAMETER_COMMAND_NAME, $route->getSource());
6156
}
6257
}
6358

64-
public function onPreSendCommand(PreSend $context): void
59+
private function buildMap(): array
6560
{
66-
$message = $context->getMessage();
67-
$command = $context->getCommand();
61+
$map = [];
62+
foreach ($this->driver->getRouteCollection()->all() as $route) {
63+
if (false == $route->isCommand()) {
64+
continue;
65+
}
66+
67+
if (false == $route->isProcessorExclusive()) {
68+
continue;
69+
}
6870

69-
if (array_key_exists($command, $this->processorNameToQueueNameMap)) {
70-
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $command);
71-
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->processorNameToQueueNameMap[$command]);
71+
$queueName = $this->driver->createQueue($route->getQueue())->getQueueName();
72+
if (array_key_exists($queueName, $map)) {
73+
throw new \LogicException('The queue name has been already bound by another exclusive command processor');
74+
}
75+
76+
$map[$queueName] = $route;
7277
}
78+
79+
return $map;
7380
}
7481
}

0 commit comments

Comments
 (0)