Skip to content

Commit 8a6c6f9

Browse files
authored
Merge pull request #703 from php-enqueue/client-generic-driver-should-able-send-to-router-processor
[client] sendToProcessor should able to send message to router processor.
2 parents bbd367b + 9e281ed commit 8a6c6f9

File tree

2 files changed

+53
-3
lines changed

2 files changed

+53
-3
lines changed

pkg/enqueue/Client/Driver/GenericDriver.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,18 @@ public function sendToProcessor(Message $message): DriverSendResult
7171

7272
/** @var InteropQueue $queue */
7373
$queue = null;
74-
if ($topic && $processor = $message->getProperty(Config::PROCESSOR)) {
74+
$routerProcessor = $this->config->getRouterProcessor();
75+
$processor = $message->getProperty(Config::PROCESSOR);
76+
if ($topic && $processor && $processor !== $routerProcessor) {
7577
$route = $this->routeCollection->topicAndProcessor($topic, $processor);
7678
if (false == $route) {
7779
throw new \LogicException(sprintf('There is no route for topic "%s" and processor "%s"', $topic, $processor));
7880
}
7981

8082
$message->setProperty(Config::PROCESSOR, $route->getProcessor());
8183
$queue = $this->createRouteQueue($route);
82-
} elseif ($topic && false == $message->getProperty(Config::PROCESSOR)) {
83-
$message->setProperty(Config::PROCESSOR, $this->config->getRouterProcessor());
84+
} elseif ($topic && (false == $processor || $processor === $routerProcessor)) {
85+
$message->setProperty(Config::PROCESSOR, $routerProcessor);
8486

8587
$queue = $this->createQueue($this->config->getRouterQueue());
8688
} elseif ($command) {

pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,54 @@ public function testThrowIfCommandSetOnSendToRouter()
445445
$driver->sendToRouter($message);
446446
}
447447

448+
public function testShouldSendMessageToRouterProcessor()
449+
{
450+
$queue = $this->createQueue('');
451+
$transportMessage = $this->createMessage();
452+
453+
$producer = $this->createProducerMock();
454+
$producer
455+
->expects($this->once())
456+
->method('send')
457+
->with($this->identicalTo($queue), $this->identicalTo($transportMessage))
458+
;
459+
$context = $this->createContextMock();
460+
$context
461+
->expects($this->once())
462+
->method('createQueue')
463+
->with($this->getDefaultQueueTransportName())
464+
->willReturn($queue)
465+
;
466+
$context
467+
->expects($this->once())
468+
->method('createProducer')
469+
->willReturn($producer)
470+
;
471+
$context
472+
->expects($this->once())
473+
->method('createMessage')
474+
->willReturn($transportMessage)
475+
;
476+
477+
$config = $this->createDummyConfig();
478+
479+
$driver = $this->createDriver(
480+
$context,
481+
$config,
482+
new RouteCollection([
483+
new Route('topic', Route::TOPIC, 'processor', [
484+
'queue' => 'custom',
485+
]),
486+
])
487+
);
488+
489+
$message = new Message();
490+
$message->setProperty(Config::TOPIC, 'topic');
491+
$message->setProperty(Config::PROCESSOR, $config->getRouterProcessor());
492+
493+
$driver->sendToProcessor($message);
494+
}
495+
448496
public function testShouldSendTopicMessageToProcessorToDefaultQueue()
449497
{
450498
$queue = $this->createQueue('');

0 commit comments

Comments
 (0)