-
Notifications
You must be signed in to change notification settings - Fork 440
/
Copy pathBuildCommandSubscriberRoutesPass.php
104 lines (85 loc) · 4.71 KB
/
BuildCommandSubscriberRoutesPass.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
<?php
namespace Enqueue\Symfony\Client\DependencyInjection;
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Client\Route;
use Enqueue\Client\RouteCollection;
use Enqueue\Symfony\DiUtils;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
final class BuildCommandSubscriberRoutesPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container): void
{
if (false == $container->hasParameter('enqueue.clients')) {
throw new \LogicException('The "enqueue.clients" parameter must be set.');
}
$names = $container->getParameter('enqueue.clients');
$defaultName = $container->getParameter('enqueue.default_client');
foreach ($names as $name) {
$diUtils = DiUtils::create(ClientFactory::MODULE, $name);
$routeCollectionId = $diUtils->format('route_collection');
if (false == $container->hasDefinition($routeCollectionId)) {
throw new \LogicException(sprintf('Service "%s" not found', $routeCollectionId));
}
$tag = 'enqueue.command_subscriber';
$routeCollection = new RouteCollection([]);
foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) {
$processorDefinition = $container->getDefinition($serviceId);
if ($processorDefinition->getFactory()) {
throw new \LogicException('The command subscriber tag could not be applied to a service created by factory.');
}
$processorClass = $processorDefinition->getClass();
if (false == class_exists($processorClass)) {
throw new \LogicException(sprintf('The processor class "%s" could not be found.', $processorClass));
}
if (false == is_subclass_of($processorClass, CommandSubscriberInterface::class)) {
throw new \LogicException(sprintf('The processor must implement "%s" interface to be used with the tag "%s"', CommandSubscriberInterface::class, $tag));
}
foreach ($tagAttributes as $tagAttribute) {
$client = $tagAttribute['client'] ?? $defaultName;
if ($client !== $name && 'all' !== $client) {
continue;
}
/** @var CommandSubscriberInterface $processorClass */
$commands = $processorClass::getSubscribedCommand();
if (empty($commands)) {
throw new \LogicException('Command subscriber must return something.');
}
if (is_string($commands)) {
$commands = [$commands];
}
if (!is_array($commands)) {
throw new \LogicException('Command subscriber configuration is invalid. Should be an array or string.');
}
if (isset($commands['command'])) {
$commands = [$commands];
}
foreach ($commands as $key => $params) {
if (is_string($params)) {
$routeCollection->add(new Route($params, Route::COMMAND, $serviceId, ['processor_service_id' => $serviceId]));
} elseif (is_array($params)) {
$source = $params['command'] ?? null;
$processor = $params['processor'] ?? $serviceId;
unset($params['command'], $params['source'], $params['source_type'], $params['processor'], $params['options']);
$options = $params;
$options['processor_service_id'] = $serviceId;
$routeCollection->add(new Route($source, Route::COMMAND, $processor, $options));
} else {
throw new \LogicException(sprintf(
'Command subscriber configuration is invalid for "%s::getSubscribedCommand()". "%s"',
$processorClass,
json_encode($processorClass::getSubscribedCommand())
));
}
}
}
}
$rawRoutes = $routeCollection->toArray();
$routeCollectionService = $container->getDefinition($routeCollectionId);
$routeCollectionService->replaceArgument(0, array_merge(
$routeCollectionService->getArgument(0),
$rawRoutes
));
}
}
}