From bebe040427521ab1cf938697db17b615dca52c80 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 20 Dec 2018 14:20:43 +0200 Subject: [PATCH 1/2] [client] sendToProcessor should able to send message to router processor. --- pkg/enqueue/Client/Driver/GenericDriver.php | 8 ++-- .../Client/Driver/GenericDriverTestsTrait.php | 48 +++++++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/pkg/enqueue/Client/Driver/GenericDriver.php b/pkg/enqueue/Client/Driver/GenericDriver.php index 63a1f12f3..529f6322c 100644 --- a/pkg/enqueue/Client/Driver/GenericDriver.php +++ b/pkg/enqueue/Client/Driver/GenericDriver.php @@ -71,7 +71,9 @@ public function sendToProcessor(Message $message): DriverSendResult /** @var InteropQueue $queue */ $queue = null; - if ($topic && $processor = $message->getProperty(Config::PROCESSOR)) { + $routerProcessor = $this->config->getRouterProcessor(); + $processor = $message->getProperty(Config::PROCESSOR); + if ($topic && $processor && $processor !== $routerProcessor) { $route = $this->routeCollection->topicAndProcessor($topic, $processor); if (false == $route) { throw new \LogicException(sprintf('There is no route for topic "%s" and processor "%s"', $topic, $processor)); @@ -79,8 +81,8 @@ public function sendToProcessor(Message $message): DriverSendResult $message->setProperty(Config::PROCESSOR, $route->getProcessor()); $queue = $this->createRouteQueue($route); - } elseif ($topic && false == $message->getProperty(Config::PROCESSOR)) { - $message->setProperty(Config::PROCESSOR, $this->config->getRouterProcessor()); + } elseif ($topic && (false == $processor || $processor === $routerProcessor)) { + $message->setProperty(Config::PROCESSOR, $routerProcessor); $queue = $this->createQueue($this->config->getRouterQueue()); } elseif ($command) { diff --git a/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php b/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php index b4c23dd0d..ec38dfdd5 100644 --- a/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php +++ b/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php @@ -445,6 +445,54 @@ public function testThrowIfCommandSetOnSendToRouter() $driver->sendToRouter($message); } + public function testShouldSendMessageToRouterProcessor() + { + $queue = $this->createQueue(''); + $transportMessage = $this->createMessage(); + + $producer = $this->createProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($transportMessage)) + ; + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with($this->getDefaultQueueTransportName()) + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $config = $this->createDummyConfig(); + + $driver = $this->createDriver( + $context, + $config, + new RouteCollection([ + new Route('topic', Route::TOPIC, 'processor', [ + 'queue' => 'custom' + ]), + ]) + ); + + $message = new Message(); + $message->setProperty(Config::TOPIC, 'topic'); + $message->setProperty(Config::PROCESSOR, $config->getRouterProcessor()); + + $driver->sendToProcessor($message); + } + public function testShouldSendTopicMessageToProcessorToDefaultQueue() { $queue = $this->createQueue(''); From 9e281edb161f1a507fc6c1028e8dd61151518044 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 20 Dec 2018 15:04:59 +0200 Subject: [PATCH 2/2] fix cs --- pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php b/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php index ec38dfdd5..cb141372b 100644 --- a/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php +++ b/pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php @@ -481,7 +481,7 @@ public function testShouldSendMessageToRouterProcessor() $config, new RouteCollection([ new Route('topic', Route::TOPIC, 'processor', [ - 'queue' => 'custom' + 'queue' => 'custom', ]), ]) );