Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e4ffb6c

Browse files
authoredMay 22, 2017
Merge pull request #95 from php-enqueue/async-event-subscribers
Symfony. Async event subscriber.
2 parents 2baa704 + 26ecb89 commit e4ffb6c

File tree

6 files changed

+207
-4
lines changed

6 files changed

+207
-4
lines changed
 

‎docs/bundle/async_events.md

+12
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,18 @@ services:
4343
- { name: 'kernel.event_listener', async: true, event: 'foo', method: 'onEvent' }
4444
```
4545
46+
or to `kernel.event_subscriber`:
47+
48+
```yaml
49+
# app/config/config.yml
50+
51+
services:
52+
test_async_subscriber:
53+
class: 'AcmeBundle\Listener\TestAsyncSubscriber'
54+
tags:
55+
- { name: 'kernel.event_subscriber', async: true }
56+
```
57+
4658
That's basically it. The rest of the doc describes advanced features.
4759

4860
## Advanced Usage.

‎pkg/enqueue-bundle/Events/DependencyInjection/AsyncEventsPass.php

+41-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
66
use Symfony\Component\DependencyInjection\ContainerBuilder;
77
use Symfony\Component\EventDispatcher\DependencyInjection\RegisterListenersPass;
8+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
89

910
class AsyncEventsPass implements CompilerPassInterface
1011
{
@@ -28,26 +29,62 @@ public function process(ContainerBuilder $container)
2829
continue;
2930
}
3031

32+
$event = $tagAttribute['event'];
33+
3134
$service = $container->getDefinition($serviceId);
3235

3336
$service->clearTag('kernel.event_listener');
3437
$service->addTag('enqueue.async_event_listener', $tagAttribute);
3538

36-
if (false == isset($registeredToEvent[$tagAttribute['event']])) {
39+
if (false == isset($registeredToEvent[$event])) {
3740
$container->getDefinition('enqueue.events.async_listener')
3841
->addTag('kernel.event_listener', [
39-
'event' => $tagAttribute['event'],
42+
'event' => $event,
4043
'method' => 'onEvent',
4144
])
4245
;
4346

4447
$container->getDefinition('enqueue.events.async_processor')
4548
->addTag('enqueue.client.processor', [
46-
'topicName' => 'event.'.$tagAttribute['event'],
49+
'topicName' => 'event.'.$event,
4750
])
4851
;
4952

50-
$registeredToEvent[$tagAttribute['event']] = true;
53+
$registeredToEvent[$event] = true;
54+
}
55+
}
56+
}
57+
58+
foreach ($container->findTaggedServiceIds('kernel.event_subscriber') as $serviceId => $tagAttributes) {
59+
foreach ($tagAttributes as $tagAttribute) {
60+
if (false == isset($tagAttribute['async'])) {
61+
continue;
62+
}
63+
64+
$service = $container->getDefinition($serviceId);
65+
$service->clearTag('kernel.event_subscriber');
66+
$service->addTag('enqueue.async_event_subscriber', $tagAttribute);
67+
68+
/** @var EventSubscriberInterface $serviceClass */
69+
$serviceClass = $service->getClass();
70+
71+
foreach ($serviceClass::getSubscribedEvents() as $event => $data) {
72+
if (false == isset($registeredToEvent[$event])) {
73+
$container->getDefinition('enqueue.events.async_listener')
74+
->addTag('kernel.event_listener', [
75+
'event' => $event,
76+
'method' => 'onEvent',
77+
])
78+
;
79+
80+
$container->getDefinition('enqueue.events.async_processor')
81+
->addTag('enqueue.client.processor', [
82+
'topicName' => 'event.'.$event,
83+
])
84+
;
85+
86+
$registeredToEvent[$event] = true;
87+
}
5188
}
5289
}
5390
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Functional\App;
4+
5+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
6+
7+
class TestAsyncSubscriber implements EventSubscriberInterface
8+
{
9+
public $calls = [];
10+
11+
public function onEvent()
12+
{
13+
$this->calls[] = func_get_args();
14+
}
15+
16+
public static function getSubscribedEvents()
17+
{
18+
return ['test_async_subscriber' => 'onEvent'];
19+
}
20+
}

‎pkg/enqueue-bundle/Tests/Functional/App/config/config.yml

+6
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,13 @@ services:
4747
tags:
4848
- { name: 'kernel.event_listener', async: true, event: 'test_async', method: 'onEvent' }
4949

50+
test_async_subscriber:
51+
class: 'Enqueue\Bundle\Tests\Functional\App\TestAsyncSubscriber'
52+
tags:
53+
- { name: 'kernel.event_subscriber', async: true }
54+
5055
test_async_event_transformer:
5156
class: 'Enqueue\Bundle\Tests\Functional\App\TestAsyncEventTransformer'
5257
tags:
5358
- {name: 'enqueue.event_transformer', eventName: 'test_async', transformerName: 'test_async' }
59+
- {name: 'enqueue.event_transformer', eventName: 'test_async_subscriber', transformerName: 'test_async' }

‎pkg/enqueue-bundle/Tests/Functional/Events/AsyncProcessorTest.php

+32
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Enqueue\Bundle\Events\AsyncListener;
66
use Enqueue\Bundle\Events\AsyncProcessor;
77
use Enqueue\Bundle\Tests\Functional\App\TestAsyncListener;
8+
use Enqueue\Bundle\Tests\Functional\App\TestAsyncSubscriber;
89
use Enqueue\Bundle\Tests\Functional\WebTestCase;
910
use Enqueue\Null\NullContext;
1011
use Enqueue\Null\NullMessage;
@@ -86,4 +87,35 @@ public function testShouldCallRealListener()
8687
$listener->calls[0][2]
8788
);
8889
}
90+
91+
public function testShouldCallRealSubscriber()
92+
{
93+
/** @var AsyncProcessor $processor */
94+
$processor = $this->container->get('enqueue.events.async_processor');
95+
96+
$message = new NullMessage();
97+
$message->setProperty('event_name', 'test_async_subscriber');
98+
$message->setProperty('transformer_name', 'test_async');
99+
$message->setBody(JSON::encode([
100+
'subject' => 'theSubject',
101+
'arguments' => ['fooArg' => 'fooVal'],
102+
]));
103+
104+
$this->assertEquals(PsrProcessor::ACK, $processor->process($message, new NullContext()));
105+
106+
/** @var TestAsyncSubscriber $subscriber */
107+
$subscriber = $this->container->get('test_async_subscriber');
108+
109+
$this->assertNotEmpty($subscriber->calls);
110+
111+
$this->assertInstanceOf(GenericEvent::class, $subscriber->calls[0][0]);
112+
$this->assertEquals('theSubject', $subscriber->calls[0][0]->getSubject());
113+
$this->assertEquals(['fooArg' => 'fooVal'], $subscriber->calls[0][0]->getArguments());
114+
$this->assertEquals('test_async_subscriber', $subscriber->calls[0][1]);
115+
116+
$this->assertSame(
117+
$this->container->get('enqueue.events.event_dispatcher'),
118+
$subscriber->calls[0][2]
119+
);
120+
}
89121
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Functional\Events;
4+
5+
use Enqueue\Bundle\Events\AsyncListener;
6+
use Enqueue\Bundle\Tests\Functional\App\TestAsyncListener;
7+
use Enqueue\Bundle\Tests\Functional\WebTestCase;
8+
use Enqueue\Client\TraceableProducer;
9+
use Symfony\Component\EventDispatcher\Event;
10+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
11+
use Symfony\Component\EventDispatcher\GenericEvent;
12+
13+
/**
14+
* @group functional
15+
*/
16+
class AsyncSubscriberTest extends WebTestCase
17+
{
18+
public function setUp()
19+
{
20+
parent::setUp();
21+
22+
/** @var AsyncListener $asyncListener */
23+
$asyncListener = $this->container->get('enqueue.events.async_listener');
24+
25+
$asyncListener->resetSyncMode();
26+
}
27+
28+
public function testShouldNotCallRealSubscriberIfMarkedAsAsync()
29+
{
30+
/** @var EventDispatcherInterface $dispatcher */
31+
$dispatcher = $this->container->get('event_dispatcher');
32+
33+
$dispatcher->dispatch('test_async_subscriber', new GenericEvent('aSubject'));
34+
35+
/** @var TestAsyncListener $listener */
36+
$listener = $this->container->get('test_async_subscriber');
37+
38+
$this->assertEmpty($listener->calls);
39+
}
40+
41+
public function testShouldSendMessageToExpectedTopicInsteadOfCallingRealSubscriber()
42+
{
43+
/** @var EventDispatcherInterface $dispatcher */
44+
$dispatcher = $this->container->get('event_dispatcher');
45+
46+
$event = new GenericEvent('theSubject', ['fooArg' => 'fooVal']);
47+
48+
$dispatcher->dispatch('test_async_subscriber', $event);
49+
50+
/** @var TraceableProducer $producer */
51+
$producer = $this->container->get('enqueue.producer');
52+
53+
$traces = $producer->getTopicTraces('event.test_async_subscriber');
54+
55+
$this->assertCount(1, $traces);
56+
57+
$this->assertEquals('event.test_async_subscriber', $traces[0]['topic']);
58+
$this->assertEquals('{"subject":"theSubject","arguments":{"fooArg":"fooVal"}}', $traces[0]['body']);
59+
}
60+
61+
public function testShouldSendMessageForEveryDispatchCall()
62+
{
63+
/** @var EventDispatcherInterface $dispatcher */
64+
$dispatcher = $this->container->get('event_dispatcher');
65+
66+
$dispatcher->dispatch('test_async_subscriber', new GenericEvent('theSubject', ['fooArg' => 'fooVal']));
67+
$dispatcher->dispatch('test_async_subscriber', new GenericEvent('theSubject', ['fooArg' => 'fooVal']));
68+
$dispatcher->dispatch('test_async_subscriber', new GenericEvent('theSubject', ['fooArg' => 'fooVal']));
69+
70+
/** @var TraceableProducer $producer */
71+
$producer = $this->container->get('enqueue.producer');
72+
73+
$traces = $producer->getTopicTraces('event.test_async_subscriber');
74+
75+
$this->assertCount(3, $traces);
76+
}
77+
78+
public function testShouldSendMessageIfDispatchedFromInsideListener()
79+
{
80+
/** @var EventDispatcherInterface $dispatcher */
81+
$dispatcher = $this->container->get('event_dispatcher');
82+
83+
$dispatcher->addListener('foo', function (Event $event, $eventName, EventDispatcherInterface $dispatcher) {
84+
$dispatcher->dispatch('test_async_subscriber', new GenericEvent('theSubject', ['fooArg' => 'fooVal']));
85+
});
86+
87+
$dispatcher->dispatch('foo');
88+
89+
/** @var TraceableProducer $producer */
90+
$producer = $this->container->get('enqueue.producer');
91+
92+
$traces = $producer->getTopicTraces('event.test_async_subscriber');
93+
94+
$this->assertCount(1, $traces);
95+
}
96+
}

0 commit comments

Comments
 (0)
Please sign in to comment.