Skip to content

Commit d58622b

Browse files
committed
multi client configuration
1 parent c465116 commit d58622b

File tree

10 files changed

+366
-98
lines changed

10 files changed

+366
-98
lines changed

pkg/enqueue-bundle/DependencyInjection/Configuration.php

+74-48
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Bundle\DependencyInjection;
44

5+
use Enqueue\Monitoring\Symfony\DependencyInjection\MonitoringFactory;
56
use Enqueue\Symfony\Client\DependencyInjection\ClientFactory;
67
use Enqueue\Symfony\DependencyInjection\TransportFactory;
78
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
@@ -26,8 +27,8 @@ public function getConfigTreeBuilder(): TreeBuilder
2627
->always(function ($value) {
2728
if (empty($value)) {
2829
return [
29-
'transport' => [
30-
'default' => [
30+
'default' => [
31+
'transport' => [
3132
'dsn' => 'null:',
3233
],
3334
],
@@ -36,8 +37,8 @@ public function getConfigTreeBuilder(): TreeBuilder
3637

3738
if (is_string($value)) {
3839
return [
39-
'transport' => [
40-
'default' => [
40+
'default' => [
41+
'transport' => [
4142
'dsn' => $value,
4243
],
4344
],
@@ -50,57 +51,82 @@ public function getConfigTreeBuilder(): TreeBuilder
5051

5152
$transportFactory = new TransportFactory('default');
5253

53-
/** @var ArrayNodeDefinition $transportNode */
54-
$transportNode = $rootNode->children()->arrayNode('transport');
55-
$transportNode
56-
->beforeNormalization()
57-
->always(function ($value) {
58-
if (empty($value)) {
59-
return ['default' => ['dsn' => 'null:']];
60-
}
61-
if (is_string($value)) {
62-
return ['default' => ['dsn' => $value]];
63-
}
64-
65-
if (is_array($value) && array_key_exists('dsn', $value)) {
66-
return ['default' => $value];
67-
}
68-
69-
return $value;
70-
});
71-
$transportPrototypeNode = $transportNode
72-
->requiresAtLeastOneElement()
73-
->useAttributeAsKey('key')
74-
->prototype('array')
75-
;
54+
$transportConfig = $transportFactory->getConfiguration('transport');
55+
$transportConfig->isRequired();
7656

77-
$transportFactory->addTransportConfiguration($transportPrototypeNode);
57+
$consumerConfig = $transportFactory->getQueueConsumerConfiguration('consumption');
7858

79-
$consumptionNode = $rootNode->children()->arrayNode('consumption');
80-
$transportFactory->addQueueConsumerConfiguration($consumptionNode);
59+
$clientConfig = (new ClientFactory('default'))->getConfiguration('client', $this->debug);
8160

82-
$clientFactory = new ClientFactory('default');
83-
$clientNode = $rootNode->children()->arrayNode('client');
84-
$clientFactory->addClientConfiguration($clientNode, $this->debug);
61+
$monitoringConfig = (new MonitoringFactory('default'))->getConfiguration('monitoring');
8562

86-
$rootNode->children()
87-
->booleanNode('job')->defaultFalse()->end()
88-
->arrayNode('async_events')
89-
->addDefaultsIfNotSet()
90-
->canBeEnabled()
91-
->end()
92-
->arrayNode('async_commands')
93-
->addDefaultsIfNotSet()
94-
->canBeEnabled()
63+
$rootNode
64+
->requiresAtLeastOneElement()
65+
->useAttributeAsKey('key')
66+
->arrayPrototype()
67+
->children()
68+
->append($transportConfig)
69+
->append($consumerConfig)
70+
->append($clientConfig)
71+
->append($monitoringConfig)
72+
->end()
9573
->end()
96-
->arrayNode('extensions')->addDefaultsIfNotSet()->children()
97-
->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
98-
->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()
99-
->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end()
100-
->booleanNode('reply_extension')->defaultTrue()->end()
101-
->end()->end()
10274
;
10375

76+
77+
// $transportFactory = new TransportFactory('default');
78+
//
79+
// /** @var ArrayNodeDefinition $transportNode */
80+
// $transportNode = $rootNode->children()->arrayNode('transport');
81+
// $transportNode
82+
// ->beforeNormalization()
83+
// ->always(function ($value) {
84+
// if (empty($value)) {
85+
// return ['default' => ['dsn' => 'null:']];
86+
// }
87+
// if (is_string($value)) {
88+
// return ['default' => ['dsn' => $value]];
89+
// }
90+
//
91+
// if (is_array($value) && array_key_exists('dsn', $value)) {
92+
// return ['default' => $value];
93+
// }
94+
//
95+
// return $value;
96+
// });
97+
// $transportPrototypeNode = $transportNode
98+
// ->requiresAtLeastOneElement()
99+
// ->useAttributeAsKey('key')
100+
// ->prototype('array')
101+
// ;
102+
//
103+
// $transportFactory->addTransportConfiguration($transportPrototypeNode);
104+
//
105+
// $consumptionNode = $rootNode->children()->arrayNode('consumption');
106+
// $transportFactory->addQueueConsumerConfiguration($consumptionNode);
107+
//
108+
// $clientFactory = new ClientFactory('default');
109+
// $clientNode = $rootNode->children()->arrayNode('client');
110+
// $clientFactory->addClientConfiguration($clientNode, $this->debug);
111+
//
112+
// $rootNode->children()
113+
// ->booleanNode('job')->defaultFalse()->end()
114+
// ->arrayNode('async_events')
115+
// ->addDefaultsIfNotSet()
116+
// ->canBeEnabled()
117+
// ->end()
118+
// ->arrayNode('async_commands')
119+
// ->addDefaultsIfNotSet()
120+
// ->canBeEnabled()
121+
// ->end()
122+
// ->arrayNode('extensions')->addDefaultsIfNotSet()->children()
123+
// ->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
124+
// ->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()
125+
// ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end()
126+
// ->booleanNode('reply_extension')->defaultTrue()->end()
127+
// ->end()->end()
128+
// ;
129+
//
104130
return $tb;
105131
}
106132
}

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

+53-21
Original file line numberDiff line numberDiff line change
@@ -25,33 +25,60 @@ public function load(array $configs, ContainerBuilder $container): void
2525
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
2626
$loader->load('services.yml');
2727

28-
foreach ($config['transport'] as $name => $transportConfig) {
28+
// find default configuration
29+
$defaultName = null;
30+
foreach ($config as $name => $configs) {
31+
// set first as default
32+
if (null === $defaultName) {
33+
$defaultName = $name;
34+
}
35+
36+
// or with name 'default'
37+
if ('default' === $name) {
38+
$defaultName = $name;
39+
}
40+
}
41+
42+
$transportNames = [];
43+
$clientNames = [];
44+
foreach ($config as $name => $configs) {
45+
// transport & consumption
46+
$transportNames[] = $name;
47+
2948
$transportFactory = (new TransportFactory($name));
30-
$transportFactory->buildConnectionFactory($container, $transportConfig);
49+
$transportFactory->buildConnectionFactory($container, $config['transport']);
3150
$transportFactory->buildContext($container, []);
3251
$transportFactory->buildQueueConsumer($container, $config['consumption']);
3352
$transportFactory->buildRpcClient($container, []);
34-
}
3553

36-
$container->setParameter('enqueue.transports', array_keys($config['transport']));
54+
// client
55+
if (isset($configs['client'])) {
56+
$clientNames[] = $name;
3757

38-
if (isset($config['client'])) {
39-
$container->setParameter('enqueue.clients', ['default']);
58+
$clientConfig = $config['client'];
59+
// todo
60+
$clientConfig['transport'] = $config['transport'];
61+
$clientConfig['consumption'] = $config['consumption'];
4062

41-
$this->setupAutowiringForProcessors($container);
42-
43-
$loader->load('client.yml');
63+
$clientFactory = new ClientFactory($name);
64+
$clientFactory->build($container, $clientConfig, $defaultName === $name);
65+
$clientFactory->createDriver($container, $config['transport']);
66+
$clientFactory->createFlushSpoolProducerListener($container);
67+
}
68+
}
4469

45-
$clientConfig = $config['client'];
46-
// todo
47-
$clientConfig['transport'] = $config['transport']['default'];
48-
$clientConfig['consumption'] = $config['consumption'];
70+
$container->setParameter('enqueue.transports', $transportNames);
71+
$container->setParameter('enqueue.clients', $clientNames);
4972

50-
$clientFactory = new ClientFactory('default');
51-
$clientFactory->build($container, $clientConfig);
52-
$clientFactory->createDriver($container, $config['transport']['default']);
73+
if ($clientNames) {
74+
$this->setupAutowiringForProcessors($container, $clientNames);
5375
}
5476

77+
// @todo register MessageQueueCollector
78+
79+
return;
80+
81+
//
5582
if ($config['job']) {
5683
if (!class_exists(Job::class)) {
5784
throw new \LogicException('Seems "enqueue/job-queue" is not installed. Please fix this issue.');
@@ -145,14 +172,19 @@ private function registerJobQueueDoctrineEntityMapping(ContainerBuilder $contain
145172
}
146173
}
147174

148-
private function setupAutowiringForProcessors(ContainerBuilder $container)
175+
private function setupAutowiringForProcessors(ContainerBuilder $container, array $clientNames)
149176
{
150-
$container->registerForAutoconfiguration(TopicSubscriberInterface::class)
177+
$topicSubscriber = $container->registerForAutoconfiguration(TopicSubscriberInterface::class)
151178
->setPublic(true)
152-
->addTag('enqueue.topic_subscriber', ['client' => 'default']);
179+
;
153180

154-
$container->registerForAutoconfiguration(CommandSubscriberInterface::class)
181+
$commandSubscriber = $container->registerForAutoconfiguration(CommandSubscriberInterface::class)
155182
->setPublic(true)
156-
->addTag('enqueue.command_subscriber', ['client' => 'default']);
183+
;
184+
185+
foreach ($clientNames as $clientName) {
186+
$topicSubscriber->addTag('enqueue.topic_subscriber', ['client' => $clientName]);
187+
$commandSubscriber->addTag('enqueue.command_subscriber', ['client' => $clientName]);
188+
}
157189
}
158190
}

pkg/enqueue-bundle/Resources/config/client.yml

-8
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,3 @@ services:
66
- '@enqueue.client.default.producer'
77
tags:
88
- { name: 'data_collector', template: '@Enqueue/Profiler/panel.html.twig', id: 'enqueue.message_queue' }
9-
10-
# todo
11-
enqueue.client.default.flush_spool_producer_listener:
12-
class: 'Enqueue\Symfony\Client\FlushSpoolProducerListener'
13-
arguments:
14-
- '@enqueue.client.default.spool_producer'
15-
tags:
16-
- { name: 'kernel.event_subscriber' }

pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php

+19-6
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension;
2121
use Enqueue\Consumption\QueueConsumer;
2222
use Enqueue\Rpc\RpcFactory;
23+
use Enqueue\Symfony\Client\FlushSpoolProducerListener;
2324
use Enqueue\Symfony\ContainerProcessorRegistry;
2425
use Interop\Queue\Context;
2526
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
27+
use Symfony\Component\Config\Definition\Builder\NodeDefinition;
2628
use Symfony\Component\DependencyInjection\ContainerBuilder;
2729
use Symfony\Component\DependencyInjection\ContainerInterface;
2830
use Symfony\Component\DependencyInjection\Reference;
@@ -48,22 +50,26 @@ public function __construct(string $name)
4850
$this->name = $name;
4951
}
5052

51-
public function addClientConfiguration(ArrayNodeDefinition $builder, bool $debug): void
53+
public function getConfiguration(string $name, bool $debug): NodeDefinition
5254
{
55+
$builder = new ArrayNodeDefinition($name);
56+
5357
$builder->children()
5458
->booleanNode('traceable_producer')->defaultValue($debug)->end()
5559
->scalarNode('prefix')->defaultValue('enqueue')->end()
5660
->scalarNode('app_name')->defaultValue('app')->end()
5761
->scalarNode('router_topic')->defaultValue('default')->cannotBeEmpty()->end()
5862
->scalarNode('router_queue')->defaultValue('default')->cannotBeEmpty()->end()
59-
->scalarNode('router_processor')->defaultValue($this->format('router_processor'))->end()
63+
->scalarNode('router_processor')->defaultNull()->end()
6064
->scalarNode('default_processor_queue')->defaultValue('default')->cannotBeEmpty()->end()
6165
->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end()
6266
->end()->end()
6367
;
68+
69+
return $builder;
6470
}
6571

66-
public function build(ContainerBuilder $container, array $config): void
72+
public function build(ContainerBuilder $container, array $config, bool $default = false): void
6773
{
6874
$container->register($this->format('context'), Context::class)
6975
->setFactory([$this->reference('driver'), 'getContext'])
@@ -81,7 +87,7 @@ public function build(ContainerBuilder $container, array $config): void
8187
$config['router_topic'],
8288
$config['router_queue'],
8389
$config['default_processor_queue'],
84-
$config['router_processor'],
90+
isset($config['router_processor']) ? $config['router_processor'] : $this->format('router_processor'),
8591
// @todo should be driver options.
8692
$config['transport'],
8793
]);
@@ -180,9 +186,8 @@ public function build(ContainerBuilder $container, array $config): void
180186
]));
181187
}
182188

183-
if ('default' === $this->name) {
189+
if ($default) {
184190
$container->setAlias(ProducerInterface::class, $this->format('producer'));
185-
186191
$container->setAlias(SpoolProducer::class, $this->format('spool_producer'));
187192
}
188193
}
@@ -203,6 +208,14 @@ public function createDriver(ContainerBuilder $container, array $config): string
203208
return $driverId;
204209
}
205210

211+
public function createFlushSpoolProducerListener(ContainerBuilder $container): void
212+
{
213+
$container->register($this->format('flush_spool_producer_listener'), FlushSpoolProducerListener::class)
214+
->addArgument($this->reference('spool_producer'))
215+
->addTag('kernel.event_subscriber')
216+
;
217+
}
218+
206219
public function getName(): string
207220
{
208221
return $this->name;

pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php

+10-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Interop\Queue\ConnectionFactory;
1616
use Interop\Queue\Context;
1717
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
18+
use Symfony\Component\Config\Definition\Builder\NodeDefinition;
1819
use Symfony\Component\DependencyInjection\ContainerBuilder;
1920
use Symfony\Component\DependencyInjection\ContainerInterface;
2021
use Symfony\Component\DependencyInjection\Reference;
@@ -40,11 +41,12 @@ public function __construct(string $name)
4041
$this->name = $name;
4142
}
4243

43-
public function addTransportConfiguration(ArrayNodeDefinition $builder): void
44+
public function getConfiguration(string $name): NodeDefinition
4445
{
4546
$knownSchemes = array_keys(Resources::getKnownSchemes());
4647
$availableSchemes = array_keys(Resources::getAvailableSchemes());
4748

49+
$builder = new ArrayNodeDefinition($name);
4850
$builder
4951
->info('The transport option could accept a string DSN, an array with DSN key, or null. It accept extra options. To find out what option you can set, look at connection factory constructor docblock.')
5052
->beforeNormalization()
@@ -94,10 +96,14 @@ public function addTransportConfiguration(ArrayNodeDefinition $builder): void
9496
->end()
9597
->end()
9698
;
99+
100+
return $builder;
97101
}
98102

99-
public function addQueueConsumerConfiguration(ArrayNodeDefinition $builder): void
103+
public function getQueueConsumerConfiguration(string $name): ArrayNodeDefinition
100104
{
105+
$builder = new ArrayNodeDefinition($name);
106+
101107
$builder
102108
->addDefaultsIfNotSet()->children()
103109
->integerNode('receive_timeout')
@@ -106,6 +112,8 @@ public function addQueueConsumerConfiguration(ArrayNodeDefinition $builder): voi
106112
->info('the time in milliseconds queue consumer waits for a message (100 ms by default)')
107113
->end()
108114
;
115+
116+
return $builder;
109117
}
110118

111119
public function getName(): string

0 commit comments

Comments
 (0)