Skip to content

Commit 3c55df5

Browse files
committed
[fs] Add tests for driver and transport factory.
1 parent acb309c commit 3c55df5

File tree

6 files changed

+537
-5
lines changed

6 files changed

+537
-5
lines changed

pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,5 +121,14 @@ public function testShouldCreateDriver()
121121

122122
$driver = $container->getDefinition($serviceId);
123123
$this->assertSame(AmqpDriver::class, $driver->getClass());
124+
125+
$this->assertInstanceOf(Reference::class, $driver->getArgument(0));
126+
$this->assertEquals('enqueue.transport.amqp.context', (string) $driver->getArgument(0));
127+
128+
$this->assertInstanceOf(Reference::class, $driver->getArgument(1));
129+
$this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1));
130+
131+
$this->assertInstanceOf(Reference::class, $driver->getArgument(2));
132+
$this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2));
124133
}
125134
}

pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
1212
use Enqueue\Bundle\DependencyInjection\EnqueueExtension;
1313
use Enqueue\Bundle\EnqueueBundle;
14+
use Enqueue\Fs\Symfony\FsTransportFactory;
1415
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
1516
use Enqueue\Stomp\Symfony\StompTransportFactory;
1617
use Enqueue\Symfony\DefaultTransportFactory;
@@ -139,6 +140,23 @@ public function testShouldRegisterAmqpAndRabbitMqAmqpTransportFactories()
139140
$bundle->build($container);
140141
}
141142

143+
public function testShouldRegisterFSTransportFactory()
144+
{
145+
$extensionMock = $this->createEnqueueExtensionMock();
146+
147+
$container = new ContainerBuilder();
148+
$container->registerExtension($extensionMock);
149+
150+
$extensionMock
151+
->expects($this->at(6))
152+
->method('addTransportFactory')
153+
->with($this->isInstanceOf(FsTransportFactory::class))
154+
;
155+
156+
$bundle = new EnqueueBundle();
157+
$bundle->build($container);
158+
}
159+
142160
/**
143161
* @return \PHPUnit_Framework_MockObject_MockObject|EnqueueExtension
144162
*/

pkg/fs/Client/FsDriver.php

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
use Enqueue\Client\Config;
66
use Enqueue\Client\DriverInterface;
77
use Enqueue\Client\Message;
8+
use Enqueue\Client\MessagePriority;
9+
use Enqueue\Client\Meta\QueueMetaRegistry;
810
use Enqueue\Fs\FsContext;
911
use Enqueue\Fs\FsDestination;
1012
use Enqueue\Fs\FsMessage;
@@ -25,13 +27,20 @@ class FsDriver implements DriverInterface
2527
private $config;
2628

2729
/**
28-
* @param FsContext $context
29-
* @param Config $config
30+
* @var QueueMetaRegistry
3031
*/
31-
public function __construct(FsContext $context, Config $config)
32+
private $queueMetaRegistry;
33+
34+
/**
35+
* @param FsContext $context
36+
* @param Config $config
37+
* @param QueueMetaRegistry $queueMetaRegistry
38+
*/
39+
public function __construct(FsContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
3240
{
3341
$this->context = $context;
3442
$this->config = $config;
43+
$this->queueMetaRegistry = $queueMetaRegistry;
3544
}
3645

3746
/**
@@ -74,8 +83,27 @@ public function sendToProcessor(Message $message)
7483
public function setupBroker(LoggerInterface $logger = null)
7584
{
7685
$logger = $logger ?: new NullLogger();
86+
$log = function ($text, ...$args) use ($logger) {
87+
$logger->debug(sprintf('[FsDriver] '.$text, ...$args));
88+
};
89+
90+
// setup router
91+
$routerTopic = $this->createRouterTopic();
92+
$routerQueue = $this->createQueue($this->config->getRouterQueueName());
7793

78-
// TODO: implement it;
94+
$log('Declare router exchange "%s" file: %s', $routerTopic->getTopicName(), $routerTopic->getFileInfo());
95+
$this->context->declareDestination($routerTopic);
96+
97+
$log('Declare router queue "%s" file: %s', $routerQueue->getQueueName(), $routerTopic->getFileInfo());
98+
$this->context->declareDestination($routerQueue);
99+
100+
// setup queues
101+
foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) {
102+
$queue = $this->createQueue($meta->getClientName());
103+
104+
$log('Declare processor queue "%s" file: %s', $queue->getQueueName(), $queue->getFileInfo());
105+
$this->context->declareDestination($queue);
106+
}
79107
}
80108

81109
/**
@@ -126,6 +154,7 @@ public function createClientMessage(TransportMessage $message)
126154
$clientMessage->setContentType($message->getHeader('content_type'));
127155
$clientMessage->setMessageId($message->getMessageId());
128156
$clientMessage->setTimestamp($message->getTimestamp());
157+
$clientMessage->setPriority(MessagePriority::NORMAL);
129158

130159
return $clientMessage;
131160
}

pkg/fs/Symfony/FsTransportFactory.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,15 @@ public function addConfiguration(ArrayNodeDefinition $builder)
3838
->cannotBeEmpty()
3939
->info('The store directory where all queue\topics files will be created and messages are stored')
4040
->end()
41-
->integerNode('pre_fetch_count')
41+
->integerNode('pre_fetch_count')
4242
->min(1)
43+
->defaultValue(1)
4344
->info('The option tells how many messages should be read from file at once. The feature save resources but could lead to bigger messages lose.')
4445
->end()
46+
->integerNode('chmod')
47+
->defaultValue(0600)
48+
->info('The queue files are created with this given permissions if not exist.')
49+
->end()
4550
;
4651
}
4752

@@ -84,6 +89,7 @@ public function createDriver(ContainerBuilder $container, array $config)
8489
$driver->setArguments([
8590
new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
8691
new Reference('enqueue.client.config'),
92+
new Reference('enqueue.client.meta.queue_meta_registry'),
8793
]);
8894

8995
$driverId = sprintf('enqueue.client.%s.driver', $this->getName());

0 commit comments

Comments
 (0)