Skip to content

Queue Consumer Options #193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 33 additions & 29 deletions docs/bundle/cli_commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@ Usage:
enq:c

Arguments:
client-queue-names Queues to consume messages from
client-queue-names Queues to consume messages from

Options:
--message-limit=MESSAGE-LIMIT Consume n messages and exit
--time-limit=TIME-LIMIT Consume messages during this time
--memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB
--setup-broker Creates queues, topics, exchanges, binding etc on broker side.
-h, --help Display this help message
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi Force ANSI output
--no-ansi Disable ANSI output
-n, --no-interaction Do not ask any interactive question
-e, --env=ENV The environment name [default: "dev"]
--no-debug Switches off debug mode
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
--message-limit=MESSAGE-LIMIT Consume n messages and exit
--time-limit=TIME-LIMIT Consume messages during this time
--memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB
--setup-broker Creates queues, topics, exchanges, binding etc on broker side.
--idle-timeout=IDLE-TIMEOUT The time in milliseconds queue consumer idle if no message has been received.
--receive-timeout=RECEIVE-TIMEOUT The time in milliseconds queue consumer waits for a message.
-h, --help Display this help message
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi Force ANSI output
--no-ansi Disable ANSI output
-n, --no-interaction Do not ask any interactive question
-e, --env=ENV The environment name [default: "test"]
--no-debug Switches off debug mode
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Help:
A client's worker that processes messages. By default it connects to default queue. It select an appropriate message processor based on a message headers
Expand Down Expand Up @@ -143,26 +145,28 @@ Help:

```
./bin/console enqueue:transport:consume --help
Usage:
Usage:ng mqdev_gearmand_1 ... done
enqueue:transport:consume [options] [--] <processor-service>

Arguments:
processor-service A message processor service
processor-service A message processor service

Options:
--message-limit=MESSAGE-LIMIT Consume n messages and exit
--time-limit=TIME-LIMIT Consume messages during this time
--memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB
--queue[=QUEUE] Queues to consume from (multiple values allowed)
-h, --help Display this help message
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi Force ANSI output
--no-ansi Disable ANSI output
-n, --no-interaction Do not ask any interactive question
-e, --env=ENV The environment name [default: "dev"]
--no-debug Switches off debug mode
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
--message-limit=MESSAGE-LIMIT Consume n messages and exit
--time-limit=TIME-LIMIT Consume messages during this time
--memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB
--idle-timeout=IDLE-TIMEOUT The time in milliseconds queue consumer idle if no message has been received.
--receive-timeout=RECEIVE-TIMEOUT The time in milliseconds queue consumer waits for a message.
--queue[=QUEUE] Queues to consume from (multiple values allowed)
-h, --help Display this help message
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi Force ANSI output
--no-ansi Disable ANSI output
-n, --no-interaction Do not ask any interactive question
-e, --env=ENV The environment name [default: "test"]
--no-debug Switches off debug mode
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Help:
A worker that consumes message from a broker. To use this broker you have to explicitly set a queue to consume from and a message processor service
Expand Down
9 changes: 9 additions & 0 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,13 @@ enqueue:
router_processor: enqueue.client.router_processor
default_processor_queue: default
redelivered_delay_time: 0
consumption:

# the time in milliseconds queue consumer waits if no message received
idle_timeout: 0

# the time in milliseconds queue consumer waits for a message (100 ms by default)
receive_timeout: 100
job: false
async_events:
enabled: false
Expand All @@ -306,6 +313,8 @@ enqueue:
doctrine_clear_identity_map_extension: false
signal_extension: true
reply_extension: true


```

[back to index](../index.md)
12 changes: 12 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ public function getConfigTreeBuilder()
->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end()
->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end()
->end()->end()
->arrayNode('consumption')->addDefaultsIfNotSet()->children()
->integerNode('idle_timeout')
->min(0)
->defaultValue(0)
->info('the time in milliseconds queue consumer waits if no message received')
->end()
->integerNode('receive_timeout')
->min(0)
->defaultValue(100)
->info('the time in milliseconds queue consumer waits for a message (100 ms by default)')
->end()
->end()->end()
->booleanNode('job')->defaultFalse()->end()
->arrayNode('async_events')
->addDefaultsIfNotSet()
Expand Down
13 changes: 13 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ public function load(array $configs, ContainerBuilder $container)
}
}

// configure queue consumer
$container->getDefinition('enqueue.consumption.queue_consumer')
->replaceArgument(2, $config['consumption']['idle_timeout'])
->replaceArgument(3, $config['consumption']['receive_timeout'])
;

if ($container->hasDefinition('enqueue.client.queue_consumer')) {
$container->getDefinition('enqueue.client.queue_consumer')
->replaceArgument(2, $config['consumption']['idle_timeout'])
->replaceArgument(3, $config['consumption']['receive_timeout'])
;
}

if ($config['job']) {
if (false == class_exists(Job::class)) {
throw new \LogicException('Seems "enqueue/job-queue" is not installed. Please fix this issue.');
Expand Down
2 changes: 2 additions & 0 deletions pkg/enqueue-bundle/Resources/config/client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ services:
arguments:
- '@enqueue.transport.context'
- '@enqueue.consumption.extensions'
- ~
- ~

enqueue.client.consume_messages_command:
class: 'Enqueue\Symfony\Client\ConsumeMessagesCommand'
Expand Down
2 changes: 2 additions & 0 deletions pkg/enqueue-bundle/Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ services:
arguments:
- '@enqueue.transport.context'
- '@enqueue.consumption.extensions'
- ~
- ~

enqueue.command.consume_messages:
class: 'Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,4 +423,42 @@ public function testShouldAllowEnableAsyncEvents()
],
], $config);
}

public function testShouldSetDefaultConfigurationForConsumption()
{
$configuration = new Configuration([]);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, [[
'transport' => [],
]]);

$this->assertArraySubset([
'consumption' => [
'idle_timeout' => 0,
'receive_timeout' => 100,
],
], $config);
}

public function testShouldAllowConfigureConsumption()
{
$configuration = new Configuration([]);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, [[
'transport' => [],
'consumption' => [
'idle_timeout' => 123,
'receive_timeout' => 456,
],
]]);

$this->assertArraySubset([
'consumption' => [
'idle_timeout' => 123,
'receive_timeout' => 456,
],
], $config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -535,4 +535,28 @@ public function testShouldNotAddJobQueueEntityMappingIfDoctrineBundleIsNotRegist

$this->assertSame([], $container->getExtensionConfig('doctrine'));
}

public function testShouldConfigureQueueConsumer()
{
$container = new ContainerBuilder();

$extension = new EnqueueExtension();
$extension->load([[
'client' => [],
'transport' => [
],
'consumption' => [
'idle_timeout' => 123,
'receive_timeout' => 456,
],
]], $container);

$def = $container->getDefinition('enqueue.consumption.queue_consumer');
$this->assertSame(123, $def->getArgument(2));
$this->assertSame(456, $def->getArgument(3));

$def = $container->getDefinition('enqueue.client.queue_consumer');
$this->assertSame(123, $def->getArgument(2));
$this->assertSame(456, $def->getArgument(3));
}
}
32 changes: 32 additions & 0 deletions pkg/enqueue/Consumption/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,38 @@ public function __construct(
$this->boundProcessors = [];
}

/**
* @param int $timeout
*/
public function setIdleTimeout($timeout)
{
$this->idleTimeout = (int) $timeout;
}

/**
* @return int
*/
public function getIdleTimeout()
{
return $this->idleTimeout;
}

/**
* @param int $timeout
*/
public function setReceiveTimeout($timeout)
{
$this->receiveTimeout = (int) $timeout;
}

/**
* @return int
*/
public function getReceiveTimeout()
{
return $this->receiveTimeout;
}

/**
* @return PsrContext
*/
Expand Down
5 changes: 5 additions & 0 deletions pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Enqueue\Consumption\Extension\LoggerExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Symfony\Consumption\LimitsExtensionsCommandTrait;
use Enqueue\Symfony\Consumption\QueueConsumerOptionsCommandTrait;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
Expand All @@ -19,6 +20,7 @@ class ConsumeMessagesCommand extends Command
{
use LimitsExtensionsCommandTrait;
use SetupBrokerExtensionCommandTrait;
use QueueConsumerOptionsCommandTrait;

/**
* @var QueueConsumer
Expand Down Expand Up @@ -67,6 +69,7 @@ protected function configure()
{
$this->configureLimitsExtensions();
$this->configureSetupBrokerExtension();
$this->configureQueueConsumerOptions();

$this
->setName('enqueue:consume')
Expand All @@ -83,6 +86,8 @@ protected function configure()
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->setQueueConsumerOptions($this->consumer, $input);

$queueMetas = [];
if ($clientQueueNames = $input->getArgument('client-queue-names')) {
foreach ($clientQueueNames as $clientQueueName) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class ConsumeMessagesCommand extends Command implements ContainerAwareInterface
{
use ContainerAwareTrait;
use LimitsExtensionsCommandTrait;
use QueueConsumerOptionsCommandTrait;

/**
* @var QueueConsumer
Expand All @@ -38,6 +39,7 @@ public function __construct(QueueConsumer $consumer)
protected function configure()
{
$this->configureLimitsExtensions();
$this->configureQueueConsumerOptions();

$this
->setName('enqueue:transport:consume')
Expand All @@ -51,6 +53,8 @@ protected function configure()
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->setQueueConsumerOptions($this->consumer, $input);

$extensions = $this->getLimitsExtensions($input, $output);
array_unshift($extensions, new LoggerExtension(new ConsoleLogger($output)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class ContainerAwareConsumeMessagesCommand extends Command implements ContainerA
{
use ContainerAwareTrait;
use LimitsExtensionsCommandTrait;
use QueueConsumerOptionsCommandTrait;

/**
* @var QueueConsumer
Expand All @@ -44,6 +45,7 @@ public function __construct(QueueConsumer $consumer)
protected function configure()
{
$this->configureLimitsExtensions();
$this->configureQueueConsumerOptions();

$this
->setName('enqueue:transport:consume')
Expand All @@ -60,6 +62,8 @@ protected function configure()
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->setQueueConsumerOptions($this->consumer, $input);

/** @var PsrProcessor $processor */
$processor = $this->container->get($input->getArgument('processor-service'));
if (false == $processor instanceof PsrProcessor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

namespace Enqueue\Symfony\Consumption;

use Enqueue\Consumption\QueueConsumer;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;

trait QueueConsumerOptionsCommandTrait
{
/**
* {@inheritdoc}
*/
protected function configureQueueConsumerOptions()
{
$this
->addOption('idle-timeout', null, InputOption::VALUE_REQUIRED, 'The time in milliseconds queue consumer idle if no message has been received.')
->addOption('receive-timeout', null, InputOption::VALUE_REQUIRED, 'The time in milliseconds queue consumer waits for a message.')
;
}

/**
* @param QueueConsumer $consumer
* @param InputInterface $input
*/
protected function setQueueConsumerOptions(QueueConsumer $consumer, InputInterface $input)
{
if (null !== $idleTimeout = $input->getOption('idle-timeout')) {
$consumer->setIdleTimeout((int) $idleTimeout);
}

if (null !== $receiveTimeout = $input->getOption('receive-timeout')) {
$consumer->setReceiveTimeout((int) $receiveTimeout);
}
}
}
Loading