From b617dea433a031bd33345ed54e06ff6939e71e1d Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 16 Nov 2018 17:40:03 +0200 Subject: [PATCH 1/2] [bundle] Add BC for topic\command subscribers. --- .../Client/TopicSubscriberInterface.php | 18 ++++++-- .../BuildCommandSubscriberRoutesPass.php | 35 ++++++++++++++ .../BuildTopicSubscriberRoutesPass.php | 31 +++++++++++++ .../BuildCommandSubscriberRoutesPassTest.php | 42 +++++++++++++++++ .../BuildTopicSubscriberRoutesPassTest.php | 46 +++++++++++++++++++ 5 files changed, 169 insertions(+), 3 deletions(-) diff --git a/pkg/enqueue/Client/TopicSubscriberInterface.php b/pkg/enqueue/Client/TopicSubscriberInterface.php index 634a7ace2..849a7827f 100644 --- a/pkg/enqueue/Client/TopicSubscriberInterface.php +++ b/pkg/enqueue/Client/TopicSubscriberInterface.php @@ -15,10 +15,22 @@ interface TopicSubscriberInterface * * or * - * ['aTopicName' => [ - * 'processor' => 'processor', + * [ + * [ + * 'topic' => 'aTopicName', + * 'processor' => 'fooProcessor', * 'queue' => 'a_client_queue_name', - * ]] + * + * 'aCustomOption' => 'aVal', + * ], + * [ + * 'topic' => 'anotherTopicName', + * 'processor' => 'barProcessor', + * 'queue' => 'a_client_queue_name', + * + * 'aCustomOption' => 'aVal', + * ], + * ] * * Note: If you set prefix_queue to true then the queue is used as is and therefor the driver is not used to prepare a transport queue name. * It is possible to pass other options, they could be accessible on a route instance through options. diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPass.php b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPass.php index cee1343f5..1527e51d8 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPass.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPass.php @@ -66,6 +66,41 @@ public function process(ContainerBuilder $container): void throw new \LogicException('Command subscriber configuration is invalid. Should be an array or string.'); } + // 0.8 command subscriber + if (isset($commands['processorName'])) { + @trigger_error('The command subscriber 0.8 syntax is deprecated since Enqueue 0.9.', E_USER_DEPRECATED); + + $source = $commands['processorName']; + $processor = $params['processorName'] ?? $serviceId; + + $options = $commands; + unset( + $options['processorName'], + $options['queueName'], + $options['queueNameHardcoded'], + $options['exclusive'], + $options['topic'], + $options['source'], + $options['source_type'], + $options['processor'], + $options['options'] + ); + + $options['processor_service_id'] = $serviceId; + + if (isset($commands['queueName'])) { + $options['queue'] = $commands['queueName']; + } + + if (isset($commands['queueNameHardcoded']) && $commands['queueNameHardcoded']) { + $options['prefix_queue'] = false; + } + + $routeCollection->add(new Route($source, Route::COMMAND, $processor, $options)); + + continue; + } + if (isset($commands['command'])) { $commands = [$commands]; } diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPass.php b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPass.php index 3414d4918..518851150 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPass.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPass.php @@ -69,6 +69,37 @@ public function process(ContainerBuilder $container): void foreach ($topics as $key => $params) { if (is_string($params)) { $routeCollection->add(new Route($params, Route::TOPIC, $serviceId, ['processor_service_id' => $serviceId])); + + // 0.8 topic subscriber + } elseif (is_array($params) && is_string($key)) { + @trigger_error('The topic subscriber 0.8 syntax is deprecated since Enqueue 0.9.', E_USER_DEPRECATED); + + $source = $key; + $processor = $params['processorName'] ?? $serviceId; + + $options = $params; + unset( + $options['processorName'], + $options['queueName'], + $options['queueNameHardcoded'], + $options['topic'], + $options['source'], + $options['source_type'], + $options['processor'], + $options['options'] + ); + + $options['processor_service_id'] = $serviceId; + + if (isset($params['queueName'])) { + $options['queue'] = $params['queueName']; + } + + if (isset($params['queueNameHardcoded']) && $params['queueNameHardcoded']) { + $options['prefix_queue'] = false; + } + + $routeCollection->add(new Route($source, Route::TOPIC, $processor, $options)); } elseif (is_array($params)) { $source = $params['topic'] ?? null; $processor = $params['processor'] ?? $serviceId; diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php index 603f0fb0d..5d53c7bad 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php @@ -398,6 +398,48 @@ public function testShouldMergeExtractedRoutesWithAlreadySetInCollection() ); } + public function testShouldRegister08CommandProcessor() + { + $routeCollection = new Definition(RouteCollection::class); + $routeCollection->addArgument([]); + + $processor = $this->createCommandSubscriberProcessor([ + 'processorName' => 'fooCommand', + 'queueName' => 'a_client_queue_name', + 'queueNameHardcoded' => true, + 'exclusive' => true, + 'anOption' => 'aFooVal', + ]); + + $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); + $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); + $container->register('aFooProcessor', get_class($processor)) + ->addTag('enqueue.command_subscriber') + ; + + $pass = new BuildCommandSubscriberRoutesPass(); + $pass->process($container); + + $this->assertInternalType('array', $routeCollection->getArgument(0)); + $this->assertCount(1, $routeCollection->getArgument(0)); + + $this->assertEquals( + [ + [ + 'source' => 'fooCommand', + 'source_type' => 'enqueue.client.command_route', + 'processor' => 'aFooProcessor', + 'processor_service_id' => 'aFooProcessor', + 'anOption' => 'aFooVal', + 'queue' => 'a_client_queue_name', + 'prefix_queue' => false, + ], + ], + $routeCollection->getArgument(0) + ); + } + private function createCommandSubscriberProcessor($commandSubscriberReturns = ['aCommand']) { $processor = new class() implements Processor, CommandSubscriberInterface { diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php index 72f5b2152..ecdab5fd7 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php @@ -358,6 +358,52 @@ public function testShouldMergeExtractedRoutesWithAlreadySetInCollection() ); } + public function testShouldRegister08TopicSubscriber() + { + $routeCollection = new Definition(RouteCollection::class); + $routeCollection->addArgument([]); + + $processor = $this->createTopicSubscriberProcessor([ + 'fooTopic' => ['processorName' => 'aCustomFooProcessorName', 'queueName' => 'fooQueue', 'queueNameHardcoded' => true, 'anOption' => 'aFooVal'], + 'barTopic' => ['processorName' => 'aCustomBarProcessorName', 'anOption' => 'aBarVal'], + ]); + + $container = new ContainerBuilder(); + $container->setParameter('enqueue.clients', ['default']); + $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); + $container->register('aFooProcessor', get_class($processor)) + ->addTag('enqueue.topic_subscriber') + ; + + $pass = new BuildTopicSubscriberRoutesPass(); + $pass->process($container); + + $this->assertInternalType('array', $routeCollection->getArgument(0)); + $this->assertCount(2, $routeCollection->getArgument(0)); + + $this->assertEquals( + [ + [ + 'source' => 'fooTopic', + 'source_type' => 'enqueue.client.topic_route', + 'processor' => 'aCustomFooProcessorName', + 'processor_service_id' => 'aFooProcessor', + 'anOption' => 'aFooVal', + 'queue' => 'fooQueue', + 'prefix_queue' => false, + ], + [ + 'source' => 'barTopic', + 'source_type' => 'enqueue.client.topic_route', + 'processor' => 'aCustomBarProcessorName', + 'processor_service_id' => 'aFooProcessor', + 'anOption' => 'aBarVal', + ], + ], + $routeCollection->getArgument(0) + ); + } + private function createTopicSubscriberProcessor($topicSubscriberReturns = ['aTopic']) { $processor = new class() implements Processor, TopicSubscriberInterface { From 53962b936e80dfadc01c7f606167e6da25a084f5 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 16 Nov 2018 20:00:31 +0200 Subject: [PATCH 2/2] fix tests. --- .../DependencyInjection/BuildCommandSubscriberRoutesPassTest.php | 1 + .../DependencyInjection/BuildTopicSubscriberRoutesPassTest.php | 1 + 2 files changed, 2 insertions(+) diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php index 5d53c7bad..346c0cbdb 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php @@ -413,6 +413,7 @@ public function testShouldRegister08CommandProcessor() $container = new ContainerBuilder(); $container->setParameter('enqueue.clients', ['default']); + $container->setParameter('enqueue.default_client', 'default'); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.command_subscriber') diff --git a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php index ecdab5fd7..65b64dcc8 100644 --- a/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php @@ -370,6 +370,7 @@ public function testShouldRegister08TopicSubscriber() $container = new ContainerBuilder(); $container->setParameter('enqueue.clients', ['default']); + $container->setParameter('enqueue.default_client', 'default'); $container->setDefinition('enqueue.client.default.route_collection', $routeCollection); $container->register('aFooProcessor', get_class($processor)) ->addTag('enqueue.topic_subscriber')