From 2376d18429232db2a287243eb223b7d42bc0ffc1 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 18 May 2017 17:14:51 +0300 Subject: [PATCH 1/3] [client] SpoolProducer --- composer.json | 2 + .../DependencyInjection/EnqueueExtension.php | 5 +- .../Resources/config/client.yml | 15 +++ .../flush_spool_producer_extension.yml | 8 ++ .../Functional/Client/SpoolProducerTest.php | 27 ++++ .../EnqueueExtensionTest.php | 4 +- .../FlushSpoolProducerExtension.php | 39 ++++++ pkg/enqueue/Client/SpoolProducer.php | 46 +++++++ .../Client/FlushSpoolProducerListener.php | 47 +++++++ .../FlushSpoolProducerExtensionTest.php | 125 ++++++++++++++++++ .../Tests/Client/SpoolProducerTest.php | 78 +++++++++++ .../Client/FlushSpoolProducerListenerTest.php | 67 ++++++++++ pkg/enqueue/composer.json | 1 + 13 files changed, 460 insertions(+), 4 deletions(-) create mode 100644 pkg/enqueue-bundle/Resources/config/extensions/flush_spool_producer_extension.yml create mode 100644 pkg/enqueue-bundle/Tests/Functional/Client/SpoolProducerTest.php create mode 100644 pkg/enqueue/Client/ConsumptionExtension/FlushSpoolProducerExtension.php create mode 100644 pkg/enqueue/Client/SpoolProducer.php create mode 100644 pkg/enqueue/Symfony/Client/FlushSpoolProducerListener.php create mode 100644 pkg/enqueue/Tests/Client/ConsumptionExtension/FlushSpoolProducerExtensionTest.php create mode 100644 pkg/enqueue/Tests/Client/SpoolProducerTest.php create mode 100644 pkg/enqueue/Tests/Symfony/Client/FlushSpoolProducerListenerTest.php diff --git a/composer.json b/composer.json index 173f4b218..3aa91097d 100644 --- a/composer.json +++ b/composer.json @@ -24,6 +24,8 @@ "symfony/monolog-bundle": "^2.8|^3", "symfony/browser-kit": "^2.8|^3", "symfony/expression-language": "^2.8|^3", + "symfony/event-dispatcher": "^2.8|^3", + "symfony/console": "^2.8|^3", "friendsofphp/php-cs-fixer": "^2", "empi89/php-amqp-stubs": "*@dev", "phpstan/phpstan": "^0.7.0" diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index 478e564c2..789135253 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -64,6 +64,7 @@ public function load(array $configs, ContainerBuilder $container) if (isset($config['client'])) { $loader->load('client.yml'); + $loader->load('extensions/flush_spool_producer_extension.yml'); foreach ($config['transport'] as $name => $transportConfig) { $this->factories[$name]->createDriver($container, $transportConfig); @@ -88,10 +89,10 @@ public function load(array $configs, ContainerBuilder $container) $container->setParameter('enqueue.client.default_queue_name', $config['client']['default_processor_queue']); if (false == empty($config['client']['traceable_producer'])) { - $producerId = 'enqueue.client.traceable_message_producer'; + $producerId = 'enqueue.client.traceable_producer'; $container->register($producerId, TraceableProducer::class) ->setDecoratedService('enqueue.client.producer') - ->addArgument(new Reference('enqueue.client.traceable_message_producer.inner')) + ->addArgument(new Reference('enqueue.client.traceable_producer.inner')) ; } diff --git a/pkg/enqueue-bundle/Resources/config/client.yml b/pkg/enqueue-bundle/Resources/config/client.yml index c83db9055..83694931e 100644 --- a/pkg/enqueue-bundle/Resources/config/client.yml +++ b/pkg/enqueue-bundle/Resources/config/client.yml @@ -9,6 +9,11 @@ services: - '@enqueue.client.driver' - '@enqueue.client.extensions' + enqueue.client.spool_producer: + class: 'Enqueue\Client\SpoolProducer' + arguments: + - '@enqueue.client.producer' + enqueue.client.extensions: class: 'Enqueue\Client\ChainExtension' public: false @@ -18,6 +23,9 @@ services: enqueue.producer: alias: 'enqueue.client.producer' + enqueue.spool_producer: + alias: 'enqueue.client.spool_producer' + enqueue.client.rpc_client: class: 'Enqueue\Client\RpcClient' arguments: @@ -123,3 +131,10 @@ services: name: 'data_collector' template: 'EnqueueBundle:Profiler:panel.html.twig' id: 'enqueue.message_queue' + + enqueue.flush_spool_producer_listener: + class: 'Enqueue\Symfony\Client\FlushSpoolProducerListener' + arguments: + - '@enqueue.client.spool_producer' + tags: + - { name: 'kernel.event_subscriber' } diff --git a/pkg/enqueue-bundle/Resources/config/extensions/flush_spool_producer_extension.yml b/pkg/enqueue-bundle/Resources/config/extensions/flush_spool_producer_extension.yml new file mode 100644 index 000000000..b665cf66d --- /dev/null +++ b/pkg/enqueue-bundle/Resources/config/extensions/flush_spool_producer_extension.yml @@ -0,0 +1,8 @@ +services: + enqueue.client.flush_spool_producer_extension: + class: 'Enqueue\Client\ConsumptionExtension\FlushSpoolProducerExtension' + public: false + arguments: + - '@enqueue.client.spool_producer' + tags: + - { name: 'enqueue.consumption.extension', priority: -100 } \ No newline at end of file diff --git a/pkg/enqueue-bundle/Tests/Functional/Client/SpoolProducerTest.php b/pkg/enqueue-bundle/Tests/Functional/Client/SpoolProducerTest.php new file mode 100644 index 000000000..ee09fa75e --- /dev/null +++ b/pkg/enqueue-bundle/Tests/Functional/Client/SpoolProducerTest.php @@ -0,0 +1,27 @@ +container->get('enqueue.client.spool_producer'); + + $this->assertInstanceOf(SpoolProducer::class, $producer); + } + + public function testCouldBeGetFromContainerAsShortenAlias() + { + $producer = $this->container->get('enqueue.client.spool_producer'); + $aliasProducer = $this->container->get('enqueue.spool_producer'); + + $this->assertSame($producer, $aliasProducer); + } +} diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index 58edb7a30..d5c55ff7a 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -246,7 +246,7 @@ public function testShouldUseTraceableMessageProducerIfTraceableProducerOptionSe ], ]], $container); - $producer = $container->getDefinition('enqueue.client.traceable_message_producer'); + $producer = $container->getDefinition('enqueue.client.traceable_producer'); self::assertEquals(TraceableProducer::class, $producer->getClass()); self::assertEquals( ['enqueue.client.producer', null, 0], @@ -255,7 +255,7 @@ public function testShouldUseTraceableMessageProducerIfTraceableProducerOptionSe self::assertInstanceOf(Reference::class, $producer->getArgument(0)); self::assertEquals( - 'enqueue.client.traceable_message_producer.inner', + 'enqueue.client.traceable_producer.inner', (string) $producer->getArgument(0) ); } diff --git a/pkg/enqueue/Client/ConsumptionExtension/FlushSpoolProducerExtension.php b/pkg/enqueue/Client/ConsumptionExtension/FlushSpoolProducerExtension.php new file mode 100644 index 000000000..efb6848b8 --- /dev/null +++ b/pkg/enqueue/Client/ConsumptionExtension/FlushSpoolProducerExtension.php @@ -0,0 +1,39 @@ +producer = $producer; + } + + /** + * {@inheritdoc} + */ + public function onPostReceived(Context $context) + { + $this->producer->flush(); + } + + public function onInterrupted(Context $context) + { + $this->producer->flush(); + } +} diff --git a/pkg/enqueue/Client/SpoolProducer.php b/pkg/enqueue/Client/SpoolProducer.php new file mode 100644 index 000000000..43e9a4e4d --- /dev/null +++ b/pkg/enqueue/Client/SpoolProducer.php @@ -0,0 +1,46 @@ +realProducer = $realProducer; + + $this->queue = new \SplQueue(); + } + + /** + * {@inheritdoc} + */ + public function send($topic, $message) + { + $this->queue->enqueue([$topic, $message]); + } + + /** + * When it is called it sends all previously queued messages. + */ + public function flush() + { + while (false == $this->queue->isEmpty()) { + list($topic, $message) = $this->queue->dequeue(); + + $this->realProducer->send($topic, $message); + } + } +} diff --git a/pkg/enqueue/Symfony/Client/FlushSpoolProducerListener.php b/pkg/enqueue/Symfony/Client/FlushSpoolProducerListener.php new file mode 100644 index 000000000..1f5fdcba7 --- /dev/null +++ b/pkg/enqueue/Symfony/Client/FlushSpoolProducerListener.php @@ -0,0 +1,47 @@ +producer = $producer; + } + + public function flushMessages() + { + $this->producer->flush(); + } + + /** + * {@inheritdoc} + */ + public static function getSubscribedEvents() + { + $events = []; + + if (class_exists(KernelEvents::class)) { + $events[KernelEvents::TERMINATE] = 'flushMessages'; + } + + if (class_exists(ConsoleEvents::class)) { + $events[ConsoleEvents::TERMINATE] = 'flushMessages'; + } + + return $events; + } +} diff --git a/pkg/enqueue/Tests/Client/ConsumptionExtension/FlushSpoolProducerExtensionTest.php b/pkg/enqueue/Tests/Client/ConsumptionExtension/FlushSpoolProducerExtensionTest.php new file mode 100644 index 000000000..1469c99bb --- /dev/null +++ b/pkg/enqueue/Tests/Client/ConsumptionExtension/FlushSpoolProducerExtensionTest.php @@ -0,0 +1,125 @@ +assertClassImplements(ExtensionInterface::class, FlushSpoolProducerExtension::class); + } + + public function testCouldBeConstructedWithSpoolProducerAsFirstArgument() + { + new FlushSpoolProducerExtension($this->createSpoolProducerMock()); + } + + public function testShouldDoNothingOnStart() + { + $producer = $this->createSpoolProducerMock(); + $producer + ->expects(self::never()) + ->method('flush') + ; + + $extension = new FlushSpoolProducerExtension($producer); + $extension->onStart($this->createContextMock()); + } + + public function testShouldDoNothingOnBeforeReceive() + { + $producer = $this->createSpoolProducerMock(); + $producer + ->expects(self::never()) + ->method('flush') + ; + + $extension = new FlushSpoolProducerExtension($producer); + $extension->onBeforeReceive($this->createContextMock()); + } + + public function testShouldDoNothingOnPreReceived() + { + $producer = $this->createSpoolProducerMock(); + $producer + ->expects(self::never()) + ->method('flush') + ; + + $extension = new FlushSpoolProducerExtension($producer); + $extension->onPreReceived($this->createContextMock()); + } + + public function testShouldDoNothingOnResult() + { + $producer = $this->createSpoolProducerMock(); + $producer + ->expects(self::never()) + ->method('flush') + ; + + $extension = new FlushSpoolProducerExtension($producer); + $extension->onResult($this->createContextMock()); + } + + public function testShouldDoNothingOnIdle() + { + $producer = $this->createSpoolProducerMock(); + $producer + ->expects(self::never()) + ->method('flush') + ; + + $extension = new FlushSpoolProducerExtension($producer); + $extension->onIdle($this->createContextMock()); + } + + public function testShouldFlushSpoolProducerOnInterrupted() + { + $producer = $this->createSpoolProducerMock(); + $producer + ->expects(self::once()) + ->method('flush') + ; + + $extension = new FlushSpoolProducerExtension($producer); + $extension->onInterrupted($this->createContextMock()); + } + + public function testShouldFlushSpoolProducerOnPostReceived() + { + $producer = $this->createSpoolProducerMock(); + $producer + ->expects(self::once()) + ->method('flush') + ; + + $extension = new FlushSpoolProducerExtension($producer); + $extension->onPostReceived($this->createContextMock()); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|Context + */ + private function createContextMock() + { + return $this->createMock(Context::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|SpoolProducer + */ + private function createSpoolProducerMock() + { + return $this->createMock(SpoolProducer::class); + } +} diff --git a/pkg/enqueue/Tests/Client/SpoolProducerTest.php b/pkg/enqueue/Tests/Client/SpoolProducerTest.php new file mode 100644 index 000000000..c9d76efd6 --- /dev/null +++ b/pkg/enqueue/Tests/Client/SpoolProducerTest.php @@ -0,0 +1,78 @@ +createProducerMock()); + } + + public function testShouldQueueMessageOnSend() + { + $message = new Message(); + + $realProducer = $this->createProducerMock(); + $realProducer + ->expects($this->never()) + ->method('send') + ; + + $producer = new SpoolProducer($realProducer); + $producer->send('foo_topic', $message); + $producer->send('bar_topic', $message); + } + + public function testShouldSendQueuedMessagesOnFlush() + { + $message = new Message(); + $message->setScope('third'); + + $realProducer = $this->createProducerMock(); + $realProducer + ->expects($this->at(0)) + ->method('send') + ->with('foo_topic', 'first') + ; + $realProducer + ->expects($this->at(1)) + ->method('send') + ->with('bar_topic', ['second']) + ; + $realProducer + ->expects($this->at(2)) + ->method('send') + ->with('baz_topic', $this->identicalTo($message)) + ; + + $producer = new SpoolProducer($realProducer); + + $producer->send('foo_topic', 'first'); + $producer->send('bar_topic', ['second']); + $producer->send('baz_topic', $message); + + $producer->flush(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|ProducerInterface + */ + protected function createProducerMock() + { + return $this->createMock(ProducerInterface::class); + } +} diff --git a/pkg/enqueue/Tests/Symfony/Client/FlushSpoolProducerListenerTest.php b/pkg/enqueue/Tests/Symfony/Client/FlushSpoolProducerListenerTest.php new file mode 100644 index 000000000..c06bf220d --- /dev/null +++ b/pkg/enqueue/Tests/Symfony/Client/FlushSpoolProducerListenerTest.php @@ -0,0 +1,67 @@ +assertClassImplements(EventSubscriberInterface::class, FlushSpoolProducerListener::class); + } + + public function testShouldSubscribeOnKernelTerminateEvent() + { + $events = FlushSpoolProducerListener::getSubscribedEvents(); + + $this->assertInternalType('array', $events); + $this->assertArrayHasKey(KernelEvents::TERMINATE, $events); + + $this->assertEquals('flushMessages', $events[KernelEvents::TERMINATE]); + } + + public function testShouldSubscribeOnConsoleTerminateEvent() + { + $events = FlushSpoolProducerListener::getSubscribedEvents(); + + $this->assertInternalType('array', $events); + $this->assertArrayHasKey(ConsoleEvents::TERMINATE, $events); + + $this->assertEquals('flushMessages', $events[ConsoleEvents::TERMINATE]); + } + + public function testCouldBeConstructedWithSpoolProducerAsFirstArgument() + { + new FlushSpoolProducerListener($this->createSpoolProducerMock()); + } + + public function testShouldFlushSpoolProducerOnFlushMessagesCall() + { + $producerMock = $this->createSpoolProducerMock(); + $producerMock + ->expects($this->once()) + ->method('flush') + ; + + $listener = new FlushSpoolProducerListener($producerMock); + + $listener->flushMessages(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|SpoolProducer + */ + private function createSpoolProducerMock() + { + return $this->createMock(SpoolProducer::class); + } +} diff --git a/pkg/enqueue/composer.json b/pkg/enqueue/composer.json index a47952a47..e09f39af3 100644 --- a/pkg/enqueue/composer.json +++ b/pkg/enqueue/composer.json @@ -21,6 +21,7 @@ "symfony/console": "^2.8|^3", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3", + "symfony/event-dispatcher": "^2.8|^3", "enqueue/amqp-ext": "^0.4", "enqueue/fs": "^0.4", "enqueue/test": "^0.4", From 92ec5714839171bcfb52e79646310abb314945ab Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 18 May 2017 17:49:37 +0300 Subject: [PATCH 2/3] add docs. --- docs/bundle/message_producer.md | 51 +++++++++++++++++++++++++++++++++ docs/index.md | 1 + 2 files changed, 52 insertions(+) create mode 100644 docs/bundle/message_producer.md diff --git a/docs/bundle/message_producer.md b/docs/bundle/message_producer.md new file mode 100644 index 000000000..6efad8296 --- /dev/null +++ b/docs/bundle/message_producer.md @@ -0,0 +1,51 @@ +# Message producer + +You can choose how to send messages either using a transport directly or with the client. +Transport gives you the access to all transport specific features so you can tune things where the client provides you with easy to use abstraction. + +## Transport + +```php +get('enqueue.transport.context'); + +$context->createProducer()->send( + $context->createQueue('a_queue'), + $context->createMessage('Hello there!') +); +``` + +## Client + +The client is shipped with two types of producers. The first one sends messages immediately +where another one (it is called spool producer) collects them in memory and sends them `onTerminate` event (the response is already sent). + + + +```php +get('enqueue.producer'); + +// message is being sent right now +$producer->send('a_topic', 'Hello there!'); + + +/** @var \Enqueue\Client\SpoolProducer $spoolProducer */ +$spoolProducer = $container->get('enqueue.spool_producer'); + +// message is being sent on console.terminate or kernel.terminate event +$spoolProducer->send('a_topic', 'Hello there!'); + +// you could send queued messages manually by calling flush method +$spoolProducer->flush(); +``` + +[back to index](../index.md) diff --git a/docs/index.md b/docs/index.md index 40f7f95bf..76632d373 100644 --- a/docs/index.md +++ b/docs/index.md @@ -25,6 +25,7 @@ - [Quick tour](bundle/quick_tour.md) - [Config reference](bundle/config_reference.md) - [Cli commands](bundle/cli_commands.md) + - [Message producer](bundle/message_producer.md) - [Message processor](bundle/message_processor.md) - [Job queue](bundle/job_queue.md) - [Consumption extension](bundle/consumption_extension.md) From ce54144be3a97102a74fe43bdc759707b0f96643 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 18 May 2017 17:50:28 +0300 Subject: [PATCH 3/3] fix phpstan issue. --- .../Tests/Symfony/Client/FlushSpoolProducerListenerTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/enqueue/Tests/Symfony/Client/FlushSpoolProducerListenerTest.php b/pkg/enqueue/Tests/Symfony/Client/FlushSpoolProducerListenerTest.php index c06bf220d..a1fe06e7a 100644 --- a/pkg/enqueue/Tests/Symfony/Client/FlushSpoolProducerListenerTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/FlushSpoolProducerListenerTest.php @@ -1,6 +1,6 @@