Skip to content

Commit 7ae7fc3

Browse files
authored
Merge pull request php-enqueue#611 from php-enqueue/fix-client-consume-command
[client][bundle] Take queue prefix into account while queue binding.
2 parents da0b24d + 95d6b72 commit 7ae7fc3

File tree

4 files changed

+189
-30
lines changed

4 files changed

+189
-30
lines changed

pkg/enqueue/Symfony/Client/ConsumeCommand.php

+31-14
Original file line numberDiff line numberDiff line change
@@ -96,26 +96,43 @@ protected function execute(InputInterface $input, OutputInterface $output): ?int
9696

9797
$this->setQueueConsumerOptions($consumer, $input);
9898

99-
$clientQueueNames = $input->getArgument('client-queue-names');
100-
if (empty($clientQueueNames)) {
101-
$clientQueueNames[$driver->getConfig()->getDefaultQueue()] = true;
102-
$clientQueueNames[$driver->getConfig()->getRouterQueue()] = true;
103-
104-
foreach ($driver->getRouteCollection()->all() as $route) {
105-
if ($route->getQueue()) {
106-
$clientQueueNames[$route->getQueue()] = true;
107-
}
99+
$allQueues[$driver->getConfig()->getDefaultQueue()] = true;
100+
$allQueues[$driver->getConfig()->getRouterQueue()] = true;
101+
foreach ($driver->getRouteCollection()->all() as $route) {
102+
if (false == $route->getQueue()) {
103+
continue;
104+
}
105+
if ($route->isProcessorExternal()) {
106+
continue;
108107
}
109108

110-
foreach ($input->getOption('skip') as $skipClientQueueName) {
111-
unset($clientQueueNames[$skipClientQueueName]);
109+
$allQueues[$route->getQueue()] = $route->isPrefixQueue();
110+
}
111+
112+
$selectedQueues = $input->getArgument('client-queue-names');
113+
if (empty($selectedQueues)) {
114+
$queues = $allQueues;
115+
} else {
116+
$queues = [];
117+
foreach ($selectedQueues as $queue) {
118+
if (false == array_key_exists($queue, $allQueues)) {
119+
throw new \LogicException(sprintf(
120+
'There is no such queue "%s". Available are "%s"',
121+
$queue,
122+
implode('", "', array_keys($allQueues))
123+
));
124+
}
125+
126+
$queues[$queue] = $allQueues[$queue];
112127
}
128+
}
113129

114-
$clientQueueNames = array_keys($clientQueueNames);
130+
foreach ($input->getOption('skip') as $skipQueue) {
131+
unset($queues[$skipQueue]);
115132
}
116133

117-
foreach ($clientQueueNames as $clientQueueName) {
118-
$queue = $driver->createQueue($clientQueueName);
134+
foreach ($queues as $queue => $prefix) {
135+
$queue = $driver->createQueue($queue, $prefix);
119136
$consumer->bind($queue, $processor);
120137
}
121138

pkg/enqueue/Symfony/Client/DependencyInjection/AnalyzeRouteCollectionPass.php

+38-2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public function process(ContainerBuilder $container): void
3131

3232
$this->exclusiveCommandsCouldNotBeRunOnDefaultQueue($collection);
3333
$this->exclusiveCommandProcessorMustBeSingleOnGivenQueue($collection);
34+
$this->customQueueNamesUnique($collection);
35+
$this->defaultQueueMustBePrefixed($collection);
3436
}
3537
}
3638

@@ -39,7 +41,7 @@ protected function getName(): string
3941
return $this->name;
4042
}
4143

42-
private function exclusiveCommandsCouldNotBeRunOnDefaultQueue(RouteCollection $collection)
44+
private function exclusiveCommandsCouldNotBeRunOnDefaultQueue(RouteCollection $collection): void
4345
{
4446
foreach ($collection->all() as $route) {
4547
if ($route->isCommand() && $route->isProcessorExclusive() && false == $route->getQueue()) {
@@ -52,7 +54,7 @@ private function exclusiveCommandsCouldNotBeRunOnDefaultQueue(RouteCollection $c
5254
}
5355
}
5456

55-
private function exclusiveCommandProcessorMustBeSingleOnGivenQueue(RouteCollection $collection)
57+
private function exclusiveCommandProcessorMustBeSingleOnGivenQueue(RouteCollection $collection): void
5658
{
5759
$prefixedQueues = [];
5860
$queues = [];
@@ -91,4 +93,38 @@ private function exclusiveCommandProcessorMustBeSingleOnGivenQueue(RouteCollecti
9193
}
9294
}
9395
}
96+
97+
private function defaultQueueMustBePrefixed(RouteCollection $collection): void
98+
{
99+
foreach ($collection->all() as $route) {
100+
if (false == $route->getQueue() && false == $route->isPrefixQueue()) {
101+
throw new \LogicException('The default queue must be prefixed.');
102+
}
103+
}
104+
}
105+
106+
private function customQueueNamesUnique(RouteCollection $collection): void
107+
{
108+
$prefixedQueues = [];
109+
$notPrefixedQueues = [];
110+
111+
foreach ($collection->all() as $route) {
112+
//default queue
113+
$queueName = $route->getQueue();
114+
if (false == $queueName) {
115+
return;
116+
}
117+
118+
$route->isPrefixQueue() ?
119+
$prefixedQueues[$queueName] = $queueName :
120+
$notPrefixedQueues[$queueName] = $queueName
121+
;
122+
}
123+
124+
foreach ($notPrefixedQueues as $queueName) {
125+
if (array_key_exists($queueName, $prefixedQueues)) {
126+
throw new \LogicException(sprintf('There are prefixed and not prefixed queue with the same name "%s". This is not allowed.', $queueName));
127+
}
128+
}
129+
}
94130
}

pkg/enqueue/Tests/Symfony/Client/ConsumeCommandTest.php

+95-11
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public function testShouldBindDefaultQueueOnly()
9696
$driver
9797
->expects($this->once())
9898
->method('createQueue')
99-
->with('default')
99+
->with('default', true)
100100
->willReturn($queue)
101101
;
102102

@@ -153,7 +153,7 @@ public function testShouldUseRequestedClient()
153153
$fooDriver
154154
->expects($this->once())
155155
->method('createQueue')
156-
->with('default')
156+
->with('default', true)
157157
->willReturn($queue)
158158
;
159159

@@ -235,7 +235,7 @@ public function testShouldBindDefaultQueueIfRouteUseDifferentQueue()
235235
$driver
236236
->expects($this->once())
237237
->method('createQueue')
238-
->with('default')
238+
->with('default', true)
239239
->willReturn($queue)
240240
;
241241

@@ -264,13 +264,13 @@ public function testShouldBindCustomExecuteConsumptionAndUseCustomClientDestinat
264264
$driver
265265
->expects($this->at(3))
266266
->method('createQueue')
267-
->with('default')
267+
->with('default', true)
268268
->willReturn($defaultQueue)
269269
;
270270
$driver
271271
->expects($this->at(4))
272272
->method('createQueue')
273-
->with('custom')
273+
->with('custom', true)
274274
->willReturn($customQueue)
275275
;
276276

@@ -307,6 +307,7 @@ public function testShouldBindUserProvidedQueues()
307307

308308
$routeCollection = new RouteCollection([
309309
new Route('topic', Route::TOPIC, 'processor', ['queue' => 'custom']),
310+
new Route('topic', Route::TOPIC, 'processor', ['queue' => 'non-default-queue']),
310311
]);
311312

312313
$processor = $this->createDelegateProcessorMock();
@@ -315,7 +316,7 @@ public function testShouldBindUserProvidedQueues()
315316
$driver
316317
->expects($this->once())
317318
->method('createQueue')
318-
->with('non-default-queue')
319+
->with('non-default-queue', true)
319320
->willReturn($queue)
320321
;
321322

@@ -343,6 +344,48 @@ public function testShouldBindUserProvidedQueues()
343344
]);
344345
}
345346

347+
public function testShouldBindNotPrefixedQueue()
348+
{
349+
$queue = new NullQueue('');
350+
351+
$routeCollection = new RouteCollection([
352+
new Route('topic', Route::TOPIC, 'processor', ['queue' => 'non-prefixed-queue', 'prefix_queue' => false]),
353+
]);
354+
355+
$processor = $this->createDelegateProcessorMock();
356+
357+
$driver = $this->createDriverStub($routeCollection);
358+
$driver
359+
->expects($this->once())
360+
->method('createQueue')
361+
->with('non-prefixed-queue', false)
362+
->willReturn($queue)
363+
;
364+
365+
$consumer = $this->createQueueConsumerMock();
366+
$consumer
367+
->expects($this->once())
368+
->method('bind')
369+
->with($this->identicalTo($queue), $this->identicalTo($processor))
370+
;
371+
$consumer
372+
->expects($this->once())
373+
->method('consume')
374+
->with($this->isInstanceOf(ChainExtension::class))
375+
;
376+
377+
$command = new ConsumeCommand(new Container([
378+
'enqueue.client.default.queue_consumer' => $consumer,
379+
'enqueue.client.default.driver' => $driver,
380+
'enqueue.client.default.delegate_processor' => $processor,
381+
]));
382+
383+
$tester = new CommandTester($command);
384+
$tester->execute([
385+
'client-queue-names' => ['non-prefixed-queue'],
386+
]);
387+
}
388+
346389
public function testShouldBindQueuesOnlyOnce()
347390
{
348391
$defaultQueue = new NullQueue('');
@@ -360,12 +403,12 @@ public function testShouldBindQueuesOnlyOnce()
360403
$driver
361404
->expects($this->at(3))
362405
->method('createQueue')
363-
->with('default')
406+
->with('default', true)
364407
->willReturn($defaultQueue)
365408
;
366409
$driver
367410
->expects($this->at(4))
368-
->method('createQueue')
411+
->method('createQueue', true)
369412
->with('custom')
370413
->willReturn($customQueue)
371414
;
@@ -397,6 +440,47 @@ public function testShouldBindQueuesOnlyOnce()
397440
$tester->execute([]);
398441
}
399442

443+
public function testShouldNotBindExternalRoutes()
444+
{
445+
$defaultQueue = new NullQueue('');
446+
447+
$routeCollection = new RouteCollection([
448+
new Route('barTopic', Route::TOPIC, 'processor', ['queue' => null]),
449+
new Route('fooTopic', Route::TOPIC, 'processor', ['queue' => 'external_queue', 'external' => true]),
450+
]);
451+
452+
$processor = $this->createDelegateProcessorMock();
453+
454+
$driver = $this->createDriverStub($routeCollection);
455+
$driver
456+
->expects($this->exactly(1))
457+
->method('createQueue')
458+
->with('default', true)
459+
->willReturn($defaultQueue)
460+
;
461+
462+
$consumer = $this->createQueueConsumerMock();
463+
$consumer
464+
->expects($this->exactly(1))
465+
->method('bind')
466+
->with($this->identicalTo($defaultQueue), $this->identicalTo($processor))
467+
;
468+
$consumer
469+
->expects($this->at(1))
470+
->method('consume')
471+
->with($this->isInstanceOf(ChainExtension::class))
472+
;
473+
474+
$command = new ConsumeCommand(new Container([
475+
'enqueue.client.default.queue_consumer' => $consumer,
476+
'enqueue.client.default.driver' => $driver,
477+
'enqueue.client.default.delegate_processor' => $processor,
478+
]));
479+
480+
$tester = new CommandTester($command);
481+
$tester->execute([]);
482+
}
483+
400484
public function testShouldSkipQueueConsumptionAndUseCustomClientDestinationName()
401485
{
402486
$queue = new NullQueue('');
@@ -423,19 +507,19 @@ public function testShouldSkipQueueConsumptionAndUseCustomClientDestinationName(
423507
$driver = $this->createDriverStub($routeCollection);
424508
$driver
425509
->expects($this->at(3))
426-
->method('createQueue')
510+
->method('createQueue', true)
427511
->with('default')
428512
->willReturn($queue)
429513
;
430514
$driver
431515
->expects($this->at(4))
432-
->method('createQueue')
516+
->method('createQueue', true)
433517
->with('fooQueue')
434518
->willReturn($queue)
435519
;
436520
$driver
437521
->expects($this->at(5))
438-
->method('createQueue')
522+
->method('createQueue', true)
439523
->with('ololoQueue')
440524
->willReturn($queue)
441525
;

pkg/enqueue/Tests/Symfony/Client/DependencyInjection/AnalyzeRouteCollectionPassTest.php

+25-3
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public function testThrowIfTwoExclusiveCommandProcessorsWorkOnSameQueue()
123123
$pass->process($container);
124124
}
125125

126-
public function testShouldNotThrowIfTwoExclusiveCommandProcessorsWorkOnQueueWithSameNameButOnePrefixed()
126+
public function testThrowIfThereAreTwoQueuesWithSameNameAndOneNotPrefixed()
127127
{
128128
$container = new ContainerBuilder();
129129
$container->setParameter('enqueue.clients', ['aName']);
@@ -132,19 +132,41 @@ public function testShouldNotThrowIfTwoExclusiveCommandProcessorsWorkOnQueueWith
132132
'aFooCommand',
133133
Route::COMMAND,
134134
'aFooProcessor',
135-
['exclusive' => true, 'queue' => 'aQueue', 'prefix_queue' => false]
135+
['queue' => 'foo', 'prefix_queue' => false]
136136
))->toArray(),
137137

138138
(new Route(
139139
'aBarCommand',
140140
Route::COMMAND,
141141
'aBarProcessor',
142-
['exclusive' => true, 'queue' => 'aQueue', 'prefix_queue' => true]
142+
['queue' => 'foo', 'prefix_queue' => true]
143+
))->toArray(),
144+
]);
145+
146+
$pass = new AnalyzeRouteCollectionPass();
147+
148+
$this->expectException(\LogicException::class);
149+
$this->expectExceptionMessage('There are prefixed and not prefixed queue with the same name "foo". This is not allowed.');
150+
$pass->process($container);
151+
}
152+
153+
public function testThrowIfDefaultQueueNotPrefixed()
154+
{
155+
$container = new ContainerBuilder();
156+
$container->setParameter('enqueue.clients', ['aName']);
157+
$container->register('enqueue.client.aName.route_collection')->addArgument([
158+
(new Route(
159+
'aFooCommand',
160+
Route::COMMAND,
161+
'aFooProcessor',
162+
['queue' => null, 'prefix_queue' => false]
143163
))->toArray(),
144164
]);
145165

146166
$pass = new AnalyzeRouteCollectionPass();
147167

168+
$this->expectException(\LogicException::class);
169+
$this->expectExceptionMessage('The default queue must be prefixed.');
148170
$pass->process($container);
149171
}
150172
}

0 commit comments

Comments
 (0)