Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: php-enqueue/enqueue-dev
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 0.4.4
Choose a base ref
...
head repository: php-enqueue/enqueue-dev
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 0.4.5
Choose a head ref
  • 5 commits
  • 7 files changed
  • 1 contributor

Commits on May 22, 2017

  1. Copy the full SHA
    8d0c38b View commit details
  2. Copy the full SHA
    d10e6fb View commit details
  3. Copy the full SHA
    b8a3ce4 View commit details
  4. Merge pull request #95 from php-enqueue/async-event-subscribers

    Symfony. Async event subscriber.
    makasim authored May 22, 2017
    Copy the full SHA
    8ca3351 View commit details
  5. Release 0.4.5

    makasim committed May 22, 2017
    Copy the full SHA
    14e3e44 View commit details
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Change Log

## [0.4.5](https://github.com/php-enqueue/enqueue-dev/tree/0.4.5) (2017-05-22)
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.4.4...0.4.5)

- Symfony async events. Support event subscribers. [\#94](https://github.com/php-enqueue/enqueue-dev/issues/94)

- Symfony. Async event subscriber. [\#95](https://github.com/php-enqueue/enqueue-dev/pull/95) ([makasim](https://github.com/makasim))

## [0.4.4](https://github.com/php-enqueue/enqueue-dev/tree/0.4.4) (2017-05-20)
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.4.3...0.4.4)

12 changes: 12 additions & 0 deletions docs/bundle/async_events.md
Original file line number Diff line number Diff line change
@@ -43,6 +43,18 @@ services:
- { name: 'kernel.event_listener', async: true, event: 'foo', method: 'onEvent' }
```
or to `kernel.event_subscriber`:

```yaml
# app/config/config.yml
services:
test_async_subscriber:
class: 'AcmeBundle\Listener\TestAsyncSubscriber'
tags:
- { name: 'kernel.event_subscriber', async: true }
```

That's basically it. The rest of the doc describes advanced features.

## Advanced Usage.
45 changes: 41 additions & 4 deletions pkg/enqueue-bundle/Events/DependencyInjection/AsyncEventsPass.php
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\EventDispatcher\DependencyInjection\RegisterListenersPass;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

class AsyncEventsPass implements CompilerPassInterface
{
@@ -28,26 +29,62 @@ public function process(ContainerBuilder $container)
continue;
}

$event = $tagAttribute['event'];

$service = $container->getDefinition($serviceId);

$service->clearTag('kernel.event_listener');
$service->addTag('enqueue.async_event_listener', $tagAttribute);

if (false == isset($registeredToEvent[$tagAttribute['event']])) {
if (false == isset($registeredToEvent[$event])) {
$container->getDefinition('enqueue.events.async_listener')
->addTag('kernel.event_listener', [
'event' => $tagAttribute['event'],
'event' => $event,
'method' => 'onEvent',
])
;

$container->getDefinition('enqueue.events.async_processor')
->addTag('enqueue.client.processor', [
'topicName' => 'event.'.$tagAttribute['event'],
'topicName' => 'event.'.$event,
])
;

$registeredToEvent[$tagAttribute['event']] = true;
$registeredToEvent[$event] = true;
}
}
}

foreach ($container->findTaggedServiceIds('kernel.event_subscriber') as $serviceId => $tagAttributes) {
foreach ($tagAttributes as $tagAttribute) {
if (false == isset($tagAttribute['async'])) {
continue;
}

$service = $container->getDefinition($serviceId);
$service->clearTag('kernel.event_subscriber');
$service->addTag('enqueue.async_event_subscriber', $tagAttribute);

/** @var EventSubscriberInterface $serviceClass */
$serviceClass = $service->getClass();

foreach ($serviceClass::getSubscribedEvents() as $event => $data) {
if (false == isset($registeredToEvent[$event])) {
$container->getDefinition('enqueue.events.async_listener')
->addTag('kernel.event_listener', [
'event' => $event,
'method' => 'onEvent',
])
;

$container->getDefinition('enqueue.events.async_processor')
->addTag('enqueue.client.processor', [
'topicName' => 'event.'.$event,
])
;

$registeredToEvent[$event] = true;
}
}
}
}
20 changes: 20 additions & 0 deletions pkg/enqueue-bundle/Tests/Functional/App/TestAsyncSubscriber.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Enqueue\Bundle\Tests\Functional\App;

use Symfony\Component\EventDispatcher\EventSubscriberInterface;

class TestAsyncSubscriber implements EventSubscriberInterface
{
public $calls = [];

public function onEvent()
{
$this->calls[] = func_get_args();
}

public static function getSubscribedEvents()
{
return ['test_async_subscriber' => 'onEvent'];
}
}
6 changes: 6 additions & 0 deletions pkg/enqueue-bundle/Tests/Functional/App/config/config.yml
Original file line number Diff line number Diff line change
@@ -47,7 +47,13 @@ services:
tags:
- { name: 'kernel.event_listener', async: true, event: 'test_async', method: 'onEvent' }

test_async_subscriber:
class: 'Enqueue\Bundle\Tests\Functional\App\TestAsyncSubscriber'
tags:
- { name: 'kernel.event_subscriber', async: true }

test_async_event_transformer:
class: 'Enqueue\Bundle\Tests\Functional\App\TestAsyncEventTransformer'
tags:
- {name: 'enqueue.event_transformer', eventName: 'test_async', transformerName: 'test_async' }
- {name: 'enqueue.event_transformer', eventName: 'test_async_subscriber', transformerName: 'test_async' }
32 changes: 32 additions & 0 deletions pkg/enqueue-bundle/Tests/Functional/Events/AsyncProcessorTest.php
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
use Enqueue\Bundle\Events\AsyncListener;
use Enqueue\Bundle\Events\AsyncProcessor;
use Enqueue\Bundle\Tests\Functional\App\TestAsyncListener;
use Enqueue\Bundle\Tests\Functional\App\TestAsyncSubscriber;
use Enqueue\Bundle\Tests\Functional\WebTestCase;
use Enqueue\Null\NullContext;
use Enqueue\Null\NullMessage;
@@ -86,4 +87,35 @@ public function testShouldCallRealListener()
$listener->calls[0][2]
);
}

public function testShouldCallRealSubscriber()
{
/** @var AsyncProcessor $processor */
$processor = $this->container->get('enqueue.events.async_processor');

$message = new NullMessage();
$message->setProperty('event_name', 'test_async_subscriber');
$message->setProperty('transformer_name', 'test_async');
$message->setBody(JSON::encode([
'subject' => 'theSubject',
'arguments' => ['fooArg' => 'fooVal'],
]));

$this->assertEquals(PsrProcessor::ACK, $processor->process($message, new NullContext()));

/** @var TestAsyncSubscriber $subscriber */
$subscriber = $this->container->get('test_async_subscriber');

$this->assertNotEmpty($subscriber->calls);

$this->assertInstanceOf(GenericEvent::class, $subscriber->calls[0][0]);
$this->assertEquals('theSubject', $subscriber->calls[0][0]->getSubject());
$this->assertEquals(['fooArg' => 'fooVal'], $subscriber->calls[0][0]->getArguments());
$this->assertEquals('test_async_subscriber', $subscriber->calls[0][1]);

$this->assertSame(
$this->container->get('enqueue.events.event_dispatcher'),
$subscriber->calls[0][2]
);
}
}
96 changes: 96 additions & 0 deletions pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?php

namespace Enqueue\Bundle\Tests\Functional\Events;

use Enqueue\Bundle\Events\AsyncListener;
use Enqueue\Bundle\Tests\Functional\App\TestAsyncListener;
use Enqueue\Bundle\Tests\Functional\WebTestCase;
use Enqueue\Client\TraceableProducer;
use Symfony\Component\EventDispatcher\Event;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\EventDispatcher\GenericEvent;

/**
* @group functional
*/
class AsyncSubscriberTest extends WebTestCase
{
public function setUp()
{
parent::setUp();

/** @var AsyncListener $asyncListener */
$asyncListener = $this->container->get('enqueue.events.async_listener');

$asyncListener->resetSyncMode();
}

public function testShouldNotCallRealSubscriberIfMarkedAsAsync()
{
/** @var EventDispatcherInterface $dispatcher */
$dispatcher = $this->container->get('event_dispatcher');

$dispatcher->dispatch('test_async_subscriber', new GenericEvent('aSubject'));

/** @var TestAsyncListener $listener */
$listener = $this->container->get('test_async_subscriber');

$this->assertEmpty($listener->calls);
}

public function testShouldSendMessageToExpectedTopicInsteadOfCallingRealSubscriber()
{
/** @var EventDispatcherInterface $dispatcher */
$dispatcher = $this->container->get('event_dispatcher');

$event = new GenericEvent('theSubject', ['fooArg' => 'fooVal']);

$dispatcher->dispatch('test_async_subscriber', $event);

/** @var TraceableProducer $producer */
$producer = $this->container->get('enqueue.producer');

$traces = $producer->getTopicTraces('event.test_async_subscriber');

$this->assertCount(1, $traces);

$this->assertEquals('event.test_async_subscriber', $traces[0]['topic']);
$this->assertEquals('{"subject":"theSubject","arguments":{"fooArg":"fooVal"}}', $traces[0]['body']);
}

public function testShouldSendMessageForEveryDispatchCall()
{
/** @var EventDispatcherInterface $dispatcher */
$dispatcher = $this->container->get('event_dispatcher');

$dispatcher->dispatch('test_async_subscriber', new GenericEvent('theSubject', ['fooArg' => 'fooVal']));
$dispatcher->dispatch('test_async_subscriber', new GenericEvent('theSubject', ['fooArg' => 'fooVal']));
$dispatcher->dispatch('test_async_subscriber', new GenericEvent('theSubject', ['fooArg' => 'fooVal']));

/** @var TraceableProducer $producer */
$producer = $this->container->get('enqueue.producer');

$traces = $producer->getTopicTraces('event.test_async_subscriber');

$this->assertCount(3, $traces);
}

public function testShouldSendMessageIfDispatchedFromInsideListener()
{
/** @var EventDispatcherInterface $dispatcher */
$dispatcher = $this->container->get('event_dispatcher');

$dispatcher->addListener('foo', function (Event $event, $eventName, EventDispatcherInterface $dispatcher) {
$dispatcher->dispatch('test_async_subscriber', new GenericEvent('theSubject', ['fooArg' => 'fooVal']));
});

$dispatcher->dispatch('foo');

/** @var TraceableProducer $producer */
$producer = $this->container->get('enqueue.producer');

$traces = $producer->getTopicTraces('event.test_async_subscriber');

$this->assertCount(1, $traces);
}
}