diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php new file mode 100644 index 000000000..5f83e83e2 --- /dev/null +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php @@ -0,0 +1,40 @@ +hasDefinition('enqueue.client.extensions')) { + return; + } + + $tags = $container->findTaggedServiceIds('enqueue.client.extension'); + + $groupByPriority = []; + foreach ($tags as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $priority = isset($tagAttribute['priority']) ? (int) $tagAttribute['priority'] : 0; + + $groupByPriority[$priority][] = new Reference($serviceId); + } + } + + krsort($groupByPriority, SORT_NUMERIC); + + $flatExtensions = []; + foreach ($groupByPriority as $extension) { + $flatExtensions = array_merge($flatExtensions, $extension); + } + + $container->getDefinition('enqueue.client.extensions')->replaceArgument(0, $flatExtensions); + } +} diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php similarity index 94% rename from pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php rename to pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php index 31dcee799..20f2a3817 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildExtensionsPass.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildConsumptionExtensionsPass.php @@ -6,7 +6,7 @@ use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Reference; -class BuildExtensionsPass implements CompilerPassInterface +class BuildConsumptionExtensionsPass implements CompilerPassInterface { /** * {@inheritdoc} diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index 4c8e5806d..5b4039104 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -5,8 +5,9 @@ use Enqueue\AmqpExt\AmqpContext; use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; @@ -34,11 +35,12 @@ class EnqueueBundle extends Bundle */ public function build(ContainerBuilder $container) { - $container->addCompilerPass(new BuildExtensionsPass()); + $container->addCompilerPass(new BuildConsumptionExtensionsPass()); $container->addCompilerPass(new BuildClientRoutingPass()); $container->addCompilerPass(new BuildProcessorRegistryPass()); $container->addCompilerPass(new BuildTopicMetaSubscribersPass()); $container->addCompilerPass(new BuildQueueMetaRegistryPass()); + $container->addCompilerPass(new BuildClientExtensionsPass()); /** @var EnqueueExtension $extension */ $extension = $container->getExtension('enqueue'); diff --git a/pkg/enqueue-bundle/Resources/config/client.yml b/pkg/enqueue-bundle/Resources/config/client.yml index 0c1dab387..c83db9055 100644 --- a/pkg/enqueue-bundle/Resources/config/client.yml +++ b/pkg/enqueue-bundle/Resources/config/client.yml @@ -5,7 +5,15 @@ services: enqueue.client.producer: class: 'Enqueue\Client\Producer' - arguments: ['@enqueue.client.driver'] + arguments: + - '@enqueue.client.driver' + - '@enqueue.client.extensions' + + enqueue.client.extensions: + class: 'Enqueue\Client\ChainExtension' + public: false + arguments: + - [] enqueue.producer: alias: 'enqueue.client.producer' diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php new file mode 100644 index 000000000..5b98ecda6 --- /dev/null +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientExtensionsPassTest.php @@ -0,0 +1,129 @@ +assertClassImplements(CompilerPassInterface::class, BuildClientExtensionsPass::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new BuildClientExtensionsPass(); + } + + public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWithTaggsExtensions() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('bar_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $this->assertEquals( + [new Reference('foo_extension'), new Reference('bar_extension')], + $extensions->getArgument(0) + ); + } + + public function testShouldOrderExtensionsByPriority() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 6]); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => -5]); + $container->setDefinition('bar_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 2]); + $container->setDefinition('baz_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $orderedExtensions = $extensions->getArgument(0); + + $this->assertEquals(new Reference('foo_extension'), $orderedExtensions[0]); + $this->assertEquals(new Reference('baz_extension'), $orderedExtensions[1]); + $this->assertEquals(new Reference('bar_extension'), $orderedExtensions[2]); + } + + public function testShouldAssumePriorityZeroIfPriorityIsNotSet() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.client.extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension'); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => 1]); + $container->setDefinition('bar_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.client.extension', ['priority' => -1]); + $container->setDefinition('baz_extension', $extension); + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + + $orderedExtensions = $extensions->getArgument(0); + + $this->assertEquals(new Reference('bar_extension'), $orderedExtensions[0]); + $this->assertEquals(new Reference('foo_extension'), $orderedExtensions[1]); + $this->assertEquals(new Reference('baz_extension'), $orderedExtensions[2]); + } + + public function testShouldDoesNothingIfClientExtensionServiceIsNotDefined() + { + $container = $this->createMock(ContainerBuilder::class); + $container + ->expects($this->once()) + ->method('hasDefinition') + ->with('enqueue.client.extensions') + ->willReturn(false) + ; + $container + ->expects($this->never()) + ->method('findTaggedServiceIds') + ; + + $pass = new BuildClientExtensionsPass(); + $pass->process($container); + } +} diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php similarity index 90% rename from pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php rename to pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php index 8f02a365b..048e0c467 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildExtensionsPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildConsumptionExtensionsPassTest.php @@ -2,7 +2,7 @@ namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Test\ClassExtensionTrait; use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -10,18 +10,18 @@ use Symfony\Component\DependencyInjection\Reference; use PHPUnit\Framework\TestCase; -class BuildExtensionsPassTest extends TestCase +class BuildConsumptionExtensionsPassTest extends TestCase { use ClassExtensionTrait; public function testShouldImplementCompilerPass() { - $this->assertClassImplements(CompilerPassInterface::class, BuildExtensionsPass::class); + $this->assertClassImplements(CompilerPassInterface::class, BuildConsumptionExtensionsPass::class); } public function testCouldBeConstructedWithoutAnyArguments() { - new BuildExtensionsPass(); + new BuildConsumptionExtensionsPass(); } public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWithTaggsExtensions() @@ -40,7 +40,7 @@ public function testShouldReplaceFirstArgumentOfExtensionsServiceConstructorWith $extension->addTag('enqueue.consumption.extension'); $container->setDefinition('bar_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertEquals( @@ -69,7 +69,7 @@ public function testShouldOrderExtensionsByPriority() $extension->addTag('enqueue.consumption.extension', ['priority' => 2]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); @@ -99,7 +99,7 @@ public function testShouldAssumePriorityZeroIfPriorityIsNotSet() $extension->addTag('enqueue.consumption.extension', ['priority' => -1]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildExtensionsPass(); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); diff --git a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php index ee10168f6..e5bc2f0a0 100644 --- a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php @@ -4,8 +4,9 @@ use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; @@ -46,7 +47,7 @@ public function testShouldRegisterExpectedCompilerPasses() $container ->expects($this->at(0)) ->method('addCompilerPass') - ->with($this->isInstanceOf(BuildExtensionsPass::class)) + ->with($this->isInstanceOf(BuildConsumptionExtensionsPass::class)) ; $container ->expects($this->at(1)) @@ -70,6 +71,11 @@ public function testShouldRegisterExpectedCompilerPasses() ; $container ->expects($this->at(5)) + ->method('addCompilerPass') + ->with($this->isInstanceOf(BuildClientExtensionsPass::class)) + ; + $container + ->expects($this->at(6)) ->method('getExtension') ->willReturn($extensionMock) ; diff --git a/pkg/enqueue/Client/ChainExtension.php b/pkg/enqueue/Client/ChainExtension.php new file mode 100644 index 000000000..c202e98e0 --- /dev/null +++ b/pkg/enqueue/Client/ChainExtension.php @@ -0,0 +1,39 @@ +extensions = $extensions; + } + + /** + * {@inheritdoc} + */ + public function onPreSend($topic, Message $message) + { + foreach ($this->extensions as $extension) { + $extension->onPreSend($topic, $message); + } + } + + /** + * {@inheritdoc} + */ + public function onPostSend($topic, Message $message) + { + foreach ($this->extensions as $extension) { + $extension->onPostSend($topic, $message); + } + } +} diff --git a/pkg/enqueue/Client/ExtensionInterface.php b/pkg/enqueue/Client/ExtensionInterface.php new file mode 100644 index 000000000..3b0a028e8 --- /dev/null +++ b/pkg/enqueue/Client/ExtensionInterface.php @@ -0,0 +1,19 @@ +driver = $driver; + $this->extension = $extension ?: new ChainExtension([]); } /** @@ -55,7 +61,9 @@ public function send($topic, $message) throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_NAME)); } + $this->extension->onPreSend($topic, $message); $this->driver->sendToRouter($message); + $this->extension->onPostSend($topic, $message); } elseif (Message::SCOPE_APP == $message->getScope()) { if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->driver->getConfig()->getRouterProcessorName()); @@ -64,7 +72,9 @@ public function send($topic, $message) $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->driver->getConfig()->getRouterQueueName()); } + $this->extension->onPreSend($topic, $message); $this->driver->sendToProcessor($message); + $this->extension->onPostSend($topic, $message); } else { throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope())); } diff --git a/pkg/enqueue/Tests/Client/ChainExtensionTest.php b/pkg/enqueue/Tests/Client/ChainExtensionTest.php new file mode 100644 index 000000000..3b1d82f9a --- /dev/null +++ b/pkg/enqueue/Tests/Client/ChainExtensionTest.php @@ -0,0 +1,76 @@ +assertClassImplements(ExtensionInterface::class, ChainExtension::class); + } + + public function testCouldBeConstructedWithExtensionsArray() + { + new ChainExtension([$this->createExtension(), $this->createExtension()]); + } + + public function testShouldProxyOnPreSendToAllInternalExtensions() + { + $message = new Message(); + + $fooExtension = $this->createExtension(); + $fooExtension + ->expects($this->once()) + ->method('onPreSend') + ->with('topic', $this->identicalTo($message)) + ; + $barExtension = $this->createExtension(); + $barExtension + ->expects($this->once()) + ->method('onPreSend') + ->with('topic', $this->identicalTo($message)) + ; + + $extensions = new ChainExtension([$fooExtension, $barExtension]); + + $extensions->onPreSend('topic', $message); + } + + public function testShouldProxyOnPostSendToAllInternalExtensions() + { + $message = new Message(); + + $fooExtension = $this->createExtension(); + $fooExtension + ->expects($this->once()) + ->method('onPostSend') + ->with('topic', $this->identicalTo($message)) + ; + $barExtension = $this->createExtension(); + $barExtension + ->expects($this->once()) + ->method('onPostSend') + ->with('topic', $this->identicalTo($message)) + ; + + $extensions = new ChainExtension([$fooExtension, $barExtension]); + + $extensions->onPostSend('topic', $message); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|ExtensionInterface + */ + protected function createExtension() + { + return $this->createMock(ExtensionInterface::class); + } +} diff --git a/pkg/enqueue/Tests/Client/ProducerTest.php b/pkg/enqueue/Tests/Client/ProducerTest.php index 9741810cc..79c7a0cb2 100644 --- a/pkg/enqueue/Tests/Client/ProducerTest.php +++ b/pkg/enqueue/Tests/Client/ProducerTest.php @@ -4,6 +4,7 @@ use Enqueue\Client\Config; use Enqueue\Client\DriverInterface; +use Enqueue\Client\ExtensionInterface; use Enqueue\Client\Message; use Enqueue\Client\MessagePriority; use Enqueue\Client\Producer; @@ -541,6 +542,62 @@ public function testThrowIfUnSupportedScopeGivenOnSend() $producer->send('topic', $message); } + public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToRouter() + { + $message = new Message(); + $message->setBody('aBody'); + $message->setScope(Message::SCOPE_MESSAGE_BUS); + + $extension = $this->createMock(ExtensionInterface::class); + $extension + ->expects($this->at(0)) + ->method('onPreSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + $extension + ->expects($this->at(1)) + ->method('onPostSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + + $driver = $this->createDriverStub(); + $driver + ->expects($this->once()) + ->method('sendToRouter') + ; + + $producer = new Producer($driver, $extension); + $producer->send('topic', $message); + } + + public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToProcessor() + { + $message = new Message(); + $message->setBody('aBody'); + $message->setScope(Message::SCOPE_APP); + + $extension = $this->createMock(ExtensionInterface::class); + $extension + ->expects($this->at(0)) + ->method('onPreSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + $extension + ->expects($this->at(1)) + ->method('onPostSend') + ->with($this->identicalTo('topic'), $this->identicalTo($message)) + ; + + $driver = $this->createDriverStub(); + $driver + ->expects($this->once()) + ->method('sendToProcessor') + ; + + $producer = new Producer($driver, $extension); + $producer->send('topic', $message); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|DriverInterface */