-
Notifications
You must be signed in to change notification settings - Fork 440
/
Copy pathRouterProcessor.php
64 lines (50 loc) · 1.88 KB
/
RouterProcessor.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
<?php
namespace Enqueue\Client;
use Enqueue\Consumption\Result;
use Interop\Queue\Context;
use Interop\Queue\Message as InteropMessage;
use Interop\Queue\Processor;
final class RouterProcessor implements Processor
{
/**
* compatibility with 0.8x.
*/
private const COMMAND_TOPIC_08X = '__command__';
/**
* @var DriverInterface
*/
private $driver;
public function __construct(DriverInterface $driver)
{
$this->driver = $driver;
}
public function process(InteropMessage $message, Context $context): Result
{
// compatibility with 0.8x
if (self::COMMAND_TOPIC_08X === $message->getProperty(Config::TOPIC)) {
$clientMessage = $this->driver->createClientMessage($message);
$clientMessage->setProperty(Config::TOPIC, null);
$this->driver->sendToProcessor($clientMessage);
return Result::ack('Legacy 0.8x message routed to processor');
}
// compatibility with 0.8x
if ($message->getProperty(Config::COMMAND)) {
return Result::reject(sprintf(
'Unexpected command "%s" got. Command must not go to the router.',
$message->getProperty(Config::COMMAND)
));
}
$topic = $message->getProperty(Config::TOPIC);
if (false == $topic) {
return Result::reject(sprintf('Topic property "%s" is required but not set or empty.', Config::TOPIC));
}
$count = 0;
foreach ($this->driver->getRouteCollection()->topic($topic) as $route) {
$clientMessage = $this->driver->createClientMessage($message);
$clientMessage->setProperty(Config::PROCESSOR, $route->getProcessor());
$this->driver->sendToProcessor($clientMessage);
++$count;
}
return Result::ack(sprintf('Routed to "%d" event subscribers', $count));
}
}