Skip to content

Commit cbcfd10

Browse files
authored
Merge pull request #59 from php-enqueue/redis-client
[client] Redis driver
2 parents 79ba06c + c892c61 commit cbcfd10

25 files changed

+1324
-176
lines changed

Diff for: pkg/enqueue-bundle/EnqueueBundle.php

+6
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
1414
use Enqueue\Fs\FsContext;
1515
use Enqueue\Fs\Symfony\FsTransportFactory;
16+
use Enqueue\Redis\RedisContext;
17+
use Enqueue\Redis\Symfony\RedisTransportFactory;
1618
use Enqueue\Stomp\StompContext;
1719
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
1820
use Enqueue\Stomp\Symfony\StompTransportFactory;
@@ -52,5 +54,9 @@ public function build(ContainerBuilder $container)
5254
if (class_exists(FsContext::class)) {
5355
$extension->addTransportFactory(new FsTransportFactory());
5456
}
57+
58+
if (class_exists(RedisContext::class)) {
59+
$extension->addTransportFactory(new RedisTransportFactory());
60+
}
5561
}
5662
}

Diff for: pkg/enqueue-bundle/Tests/Functional/ConsumeMessagesCommandTest.php

-91
Original file line numberDiff line numberDiff line change
@@ -2,108 +2,17 @@
22

33
namespace Enqueue\Bundle\Tests\Functional;
44

5-
use Enqueue\AmqpExt\AmqpMessage;
6-
use Enqueue\Bundle\Tests\Functional\App\AmqpAppKernel;
7-
use Enqueue\Client\ProducerInterface;
85
use Enqueue\Symfony\Client\ConsumeMessagesCommand;
9-
use Enqueue\Test\RabbitmqManagmentExtensionTrait;
10-
use Symfony\Component\Console\Tester\CommandTester;
116

127
/**
138
* @group functional
149
*/
1510
class ConsumeMessagesCommandTest extends WebTestCase
1611
{
17-
use RabbitmqManagmentExtensionTrait;
18-
19-
public function setUp()
20-
{
21-
parent::setUp();
22-
23-
$this->removeExchange('amqp.test');
24-
$this->removeQueue('amqp.app.test');
25-
26-
$driver = $this->container->get('enqueue.client.driver');
27-
$driver->setupBroker();
28-
}
29-
3012
public function testCouldBeGetFromContainerAsService()
3113
{
3214
$command = $this->container->get('enqueue.client.consume_messages_command');
3315

3416
$this->assertInstanceOf(ConsumeMessagesCommand::class, $command);
3517
}
36-
37-
public function testClientConsumeMessagesCommandShouldConsumeMessage()
38-
{
39-
$command = $this->container->get('enqueue.client.consume_messages_command');
40-
$processor = $this->container->get('test.message.processor');
41-
42-
$this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body');
43-
44-
$tester = new CommandTester($command);
45-
$tester->execute([
46-
'--message-limit' => 2,
47-
'--time-limit' => 'now +10 seconds',
48-
]);
49-
50-
$this->assertInstanceOf(AmqpMessage::class, $processor->message);
51-
$this->assertEquals('test message body', $processor->message->getBody());
52-
}
53-
54-
public function testClientConsumeMessagesFromExplicitlySetQueue()
55-
{
56-
$command = $this->container->get('enqueue.client.consume_messages_command');
57-
$processor = $this->container->get('test.message.processor');
58-
59-
$this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body');
60-
61-
$tester = new CommandTester($command);
62-
$tester->execute([
63-
'--message-limit' => 2,
64-
'--time-limit' => 'now +10 seconds',
65-
'client-queue-names' => ['test'],
66-
]);
67-
68-
$this->assertInstanceOf(AmqpMessage::class, $processor->message);
69-
$this->assertEquals('test message body', $processor->message->getBody());
70-
}
71-
72-
public function testTransportConsumeMessagesCommandShouldConsumeMessage()
73-
{
74-
$command = $this->container->get('enqueue.command.consume_messages');
75-
$command->setContainer($this->container);
76-
$processor = $this->container->get('test.message.processor');
77-
78-
$this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body');
79-
80-
$tester = new CommandTester($command);
81-
$tester->execute([
82-
'--message-limit' => 1,
83-
'--time-limit' => '+10sec',
84-
'queue' => 'amqp.app.test',
85-
'processor-service' => 'test.message.processor',
86-
]);
87-
88-
$this->assertInstanceOf(AmqpMessage::class, $processor->message);
89-
$this->assertEquals('test message body', $processor->message->getBody());
90-
}
91-
92-
/**
93-
* @return string
94-
*/
95-
public static function getKernelClass()
96-
{
97-
include_once __DIR__.'/app/AmqpAppKernel.php';
98-
99-
return AmqpAppKernel::class;
100-
}
101-
102-
/**
103-
* @return ProducerInterface|object
104-
*/
105-
private function getMessageProducer()
106-
{
107-
return $this->container->get('enqueue.client.producer');
108-
}
10918
}

Diff for: pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php

+214
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Functional;
4+
5+
use Enqueue\Bundle\Tests\Functional\App\CustomAppKernel;
6+
use Enqueue\Client\DriverInterface;
7+
use Enqueue\Client\ProducerInterface;
8+
use Enqueue\Psr\PsrContext;
9+
use Enqueue\Psr\PsrMessage;
10+
use Symfony\Component\Console\Tester\CommandTester;
11+
12+
/**
13+
* @group functional
14+
*/
15+
class UseCasesTest extends WebTestCase
16+
{
17+
public function provideEnqueueConfigs()
18+
{
19+
return [
20+
['amqp' => [
21+
'transport' => [
22+
'default' => 'amqp',
23+
'amqp' => [
24+
'host' => getenv('SYMFONY__RABBITMQ__HOST'),
25+
'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'),
26+
'login' => getenv('SYMFONY__RABBITMQ__USER'),
27+
'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'),
28+
'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'),
29+
'lazy' => false,
30+
]
31+
]
32+
]],
33+
['stomp' => [
34+
'transport' => [
35+
'default' => 'stomp',
36+
'stomp' => [
37+
'host' => getenv('SYMFONY__RABBITMQ__HOST'),
38+
'port' => getenv('SYMFONY__RABBITMQ__STOMP__PORT'),
39+
'login' => getenv('SYMFONY__RABBITMQ__USER'),
40+
'password' => getenv('SYMFONY__RABBITMQ__PASSWORD'),
41+
'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'),
42+
'lazy' => false,
43+
]
44+
]
45+
]],
46+
['predis' => [
47+
'transport' => [
48+
'default' => 'redis',
49+
'redis' => [
50+
'host' => getenv('SYMFONY__REDIS__HOST'),
51+
'port' => (int) getenv('SYMFONY__REDIS__PORT'),
52+
'vendor' => 'predis',
53+
'lazy' => false,
54+
]
55+
]
56+
]],
57+
['phpredis' => [
58+
'transport' => [
59+
'default' => 'redis',
60+
'redis' => [
61+
'host' => getenv('SYMFONY__REDIS__HOST'),
62+
'port' => (int) getenv('SYMFONY__REDIS__PORT'),
63+
'vendor' => 'phpredis',
64+
'lazy' => false,
65+
]
66+
]
67+
]],
68+
['fs' => [
69+
'transport' => [
70+
'default' => 'fs',
71+
'fs' => [
72+
'store_dir' => sys_get_temp_dir(),
73+
]
74+
]
75+
]]
76+
];
77+
}
78+
79+
/**
80+
* @dataProvider provideEnqueueConfigs
81+
*/
82+
public function testProducerSendsMessage(array $enqueueConfig)
83+
{
84+
$this->customSetUp($enqueueConfig);
85+
86+
$this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body');
87+
88+
$queue = $this->getPsrContext()->createQueue('enqueue.test');
89+
90+
$consumer = $this->getPsrContext()->createConsumer($queue);
91+
92+
$message = $consumer->receive(100);
93+
94+
$this->assertInstanceOf(PsrMessage::class, $message);
95+
$this->assertSame('test message body', $message->getBody());
96+
}
97+
98+
/**
99+
* @dataProvider provideEnqueueConfigs
100+
*/
101+
public function testClientConsumeMessagesFromExplicitlySetQueue(array $enqueueConfig)
102+
{
103+
$this->customSetUp($enqueueConfig);
104+
105+
$command = $this->container->get('enqueue.client.consume_messages_command');
106+
$processor = $this->container->get('test.message.processor');
107+
108+
$this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body');
109+
110+
$tester = new CommandTester($command);
111+
$tester->execute([
112+
'--message-limit' => 2,
113+
'--time-limit' => 'now +10 seconds',
114+
'client-queue-names' => ['test'],
115+
]);
116+
117+
$this->assertInstanceOf(PsrMessage::class, $processor->message);
118+
$this->assertEquals('test message body', $processor->message->getBody());
119+
}
120+
121+
/**
122+
* @dataProvider provideEnqueueConfigs
123+
*/
124+
public function testTransportConsumeMessagesCommandShouldConsumeMessage(array $enqueueConfig)
125+
{
126+
$this->customSetUp($enqueueConfig);
127+
128+
$command = $this->container->get('enqueue.command.consume_messages');
129+
$command->setContainer($this->container);
130+
$processor = $this->container->get('test.message.processor');
131+
132+
$this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body');
133+
134+
$tester = new CommandTester($command);
135+
$tester->execute([
136+
'--message-limit' => 1,
137+
'--time-limit' => '+10sec',
138+
'queue' => 'enqueue.test',
139+
'processor-service' => 'test.message.processor',
140+
]);
141+
142+
$this->assertInstanceOf(PsrMessage::class, $processor->message);
143+
$this->assertEquals('test message body', $processor->message->getBody());
144+
}
145+
146+
/**
147+
* @return ProducerInterface|object
148+
*/
149+
private function getMessageProducer()
150+
{
151+
return $this->container->get('enqueue.client.producer');
152+
}
153+
154+
/**
155+
* @return PsrContext|object
156+
*/
157+
private function getPsrContext()
158+
{
159+
return $this->container->get('enqueue.transport.context');
160+
}
161+
162+
protected function customSetUp(array $enqueueConfig)
163+
{
164+
static::$class = null;
165+
166+
$this->client = static::createClient(['enqueue_config' => $enqueueConfig]);
167+
$this->client->getKernel()->boot();
168+
$this->container = static::$kernel->getContainer();
169+
170+
/** @var DriverInterface $driver */
171+
$driver = $this->container->get('enqueue.client.driver');
172+
$context = $this->getPsrContext();
173+
174+
$queue = $driver->createQueue('test');
175+
176+
//guard
177+
$this->assertEquals('enqueue.test', $queue->getQueueName());
178+
179+
if (method_exists($context, 'deleteQueue')) {
180+
$context->deleteQueue($queue);
181+
}
182+
183+
$driver->setupBroker();
184+
}
185+
186+
/**
187+
* {@inheritdoc}
188+
*/
189+
protected static function createKernel(array $options = array())
190+
{
191+
/** @var CustomAppKernel $kernel */
192+
$kernel = parent::createKernel($options);
193+
194+
$kernel->setEnqueueConfig(isset($options['enqueue_config']) ? $options['enqueue_config'] : []);
195+
196+
return $kernel;
197+
}
198+
199+
/**
200+
* @return string
201+
*/
202+
public static function getKernelClass()
203+
{
204+
include_once __DIR__.'/app/CustomAppKernel.php';
205+
206+
return CustomAppKernel::class;
207+
}
208+
209+
public function setUp()
210+
{
211+
// do not call parent::setUp.
212+
// parent::setUp();
213+
}
214+
}

Diff for: pkg/enqueue-bundle/Tests/Functional/app/AmqpAppKernel.php

-53
This file was deleted.

0 commit comments

Comments
 (0)