Skip to content

[bundle] Multi Client Configuration #628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Nov 15, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/bundle/async_commands.md
Original file line number Diff line number Diff line change
@@ -21,7 +21,8 @@ $ composer require enqueue/async-command:0.9.x-dev
# config/packages/enqueue_async_commands.yaml

enqueue:
async_commands: true
default:
async_commands: true
```

## Usage
9 changes: 5 additions & 4 deletions docs/bundle/async_events.md
Original file line number Diff line number Diff line change
@@ -31,10 +31,11 @@ If you already [installed the bundle](quick_tour.md#install), then enable `async
# app/config/config.yml

enqueue:
async_events:
enabled: true
# if you'd like to send send messages onTerminate use spool_producer (it further reduces response time):
# spool_producer: true
default:
async_events:
enabled: true
# if you'd like to send send messages onTerminate use spool_producer (it further reduces response time):
# spool_producer: true
```

## Usage
74 changes: 36 additions & 38 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
@@ -14,44 +14,42 @@ You can get this info by running `./bin/console config:dump-reference enqueue` c
```yaml
# Default configuration for extension with alias: "enqueue"
enqueue:

# The transport option could accept a string DSN, an array with DSN key, or null. It accept extra options. To find out what option you can set, look at connection factory constructor docblock.
transport:

# The broker DSN. These schemes are supported: "file", "amqp", "amqps", "db2", "ibm-db2", "mssql", "sqlsrv", "mysql", "mysql2", "pgsql", "postgres", "sqlite", "sqlite3", "null", "gearman", "beanstalk", "kafka", "rdkafka", "redis", "stomp", "sqs", "gps", "mongodb", to use these "file", "amqp", "amqps", "db2", "ibm-db2", "mssql", "sqlsrv", "mysql", "mysql2", "pgsql", "postgres", "sqlite", "sqlite3", "null", "gearman", "beanstalk", "kafka", "rdkafka", "redis", "stomp", "sqs", "gps", "mongodb" you have to install a package.
dsn: ~ # Required

# The connection factory class should implement "Interop\Queue\ConnectionFactory" interface
connection_factory_class: ~

# The factory class should implement "Enqueue\ConnectionFactoryFactoryInterface" interface
factory_service: ~

# The factory service should be a class that implements "Enqueue\ConnectionFactoryFactoryInterface" interface
factory_class: ~
client:
traceable_producer: true
prefix: enqueue
app_name: app
router_topic: default
router_queue: default
router_processor: Enqueue\Client\RouterProcessor
default_processor_queue: default
redelivered_delay_time: 0
consumption:

# the time in milliseconds queue consumer waits for a message (100 ms by default)
receive_timeout: 100
job: false
async_events:
enabled: false
async_commands:
enabled: false
extensions:
doctrine_ping_connection_extension: false
doctrine_clear_identity_map_extension: false
signal_extension: true
reply_extension: true
# Configuration name
default:
# The transport option could accept a string DSN, an array with DSN key, or null. It accept extra options. To find out what option you can set, look at connection factory constructor docblock.
transport:

# The broker DSN. These schemes are supported: "file", "amqp", "amqps", "db2", "ibm-db2", "mssql", "sqlsrv", "mysql", "mysql2", "pgsql", "postgres", "sqlite", "sqlite3", "null", "gearman", "beanstalk", "kafka", "rdkafka", "redis", "stomp", "sqs", "gps", "mongodb", to use these "file", "amqp", "amqps", "db2", "ibm-db2", "mssql", "sqlsrv", "mysql", "mysql2", "pgsql", "postgres", "sqlite", "sqlite3", "null", "gearman", "beanstalk", "kafka", "rdkafka", "redis", "stomp", "sqs", "gps", "mongodb" you have to install a package.
dsn: ~ # Required

# The connection factory class should implement "Interop\Queue\ConnectionFactory" interface
connection_factory_class: ~

# The factory class should implement "Enqueue\ConnectionFactoryFactoryInterface" interface
factory_service: ~

# The factory service should be a class that implements "Enqueue\ConnectionFactoryFactoryInterface" interface
factory_class: ~
client:
traceable_producer: true
prefix: enqueue
app_name: app
router_topic: default
router_queue: default
router_processor: Enqueue\Client\RouterProcessor
default_processor_queue: default
redelivered_delay_time: 0
consumption:

# the time in milliseconds queue consumer waits for a message (100 ms by default)
receive_timeout: 100
async_commands:
enabled: false
extensions:
doctrine_ping_connection_extension: false
doctrine_clear_identity_map_extension: false
signal_extension: true
reply_extension: true
```

[back to index](../index.md)
5 changes: 3 additions & 2 deletions docs/bundle/debugging.md
Original file line number Diff line number Diff line change
@@ -21,8 +21,9 @@ To enable profiler
# app/config/config_dev.yml

enqueue:
client:
traceable_producer: true
default:
client:
traceable_producer: true
```

Now suppose you have this code in an action:
10 changes: 6 additions & 4 deletions docs/bundle/functional_testing.md
Original file line number Diff line number Diff line change
@@ -27,8 +27,9 @@ Here's how you can configure it.
# app/config/config_test.yml

enqueue:
transport: 'null:'
client: ~
default:
transport: 'null:'
client: ~
```

## Traceable message producer
@@ -40,8 +41,9 @@ There is a solution for that. You have to enable traceable message producer in t
# app/config/config_test.yml

enqueue:
client:
traceable_producer: true
default:
client:
traceable_producer: true
```

If you did so, you can use its methods `getTraces`, `getTopicTraces` or `clearTraces`. Here's an example:
7 changes: 4 additions & 3 deletions docs/bundle/job_queue.md
Original file line number Diff line number Diff line change
@@ -57,9 +57,10 @@ class AppKernel extends Kernel
# app/config/config.yml

enqueue:
# plus basic bundle configuration

job: true
default:
# plus basic bundle configuration

job: true

doctrine:
# plus basic bundle configuration
5 changes: 3 additions & 2 deletions docs/bundle/quick_tour.md
Original file line number Diff line number Diff line change
@@ -57,8 +57,9 @@ First, you have to configure a transport layer and set one to be default.
# app/config/config.yml

enqueue:
transport: "amqp:"
client: ~
default:
transport: "amqp:"
client: ~
```

Once you configured everything you can start producing messages:
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@

namespace Enqueue\AsyncCommand\DependencyInjection;

use Enqueue\AsyncCommand\RunCommandProcessor;
use Symfony\Component\Config\FileLocator;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Extension\Extension;
@@ -16,5 +17,13 @@ public function load(array $configs, ContainerBuilder $container)
{
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
$loader->load('services.yml');

$service = $container->register('enqueue.async_command.run_command_processor', RunCommandProcessor::class)
->addArgument('%kernel.project_dir%')
;

foreach ($configs['clients'] as $client) {
$service->addTag('enqueue.command_subscriber', ['client' => $client]);
}
}
}
117 changes: 45 additions & 72 deletions pkg/enqueue-bundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
@@ -2,8 +2,11 @@

namespace Enqueue\Bundle\DependencyInjection;

use Enqueue\AsyncCommand\RunCommandProcessor;
use Enqueue\Monitoring\Symfony\DependencyInjection\MonitoringFactory;
use Enqueue\Symfony\Client\DependencyInjection\ClientFactory;
use Enqueue\Symfony\DependencyInjection\TransportFactory;
use Enqueue\Symfony\MissingComponentFactory;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;
@@ -21,86 +24,56 @@ public function getConfigTreeBuilder(): TreeBuilder
{
$tb = new TreeBuilder();
$rootNode = $tb->root('enqueue');
$rootNode
->beforeNormalization()
->always(function ($value) {
if (empty($value)) {
return [
'transport' => [
'default' => [
'dsn' => 'null:',
],
],
];
}

if (is_string($value)) {
return [
'transport' => [
'default' => [
'dsn' => $value,
],
],
];
}

return $value;
})
;

$transportFactory = new TransportFactory('default');

/** @var ArrayNodeDefinition $transportNode */
$transportNode = $rootNode->children()->arrayNode('transport');
$transportNode
->beforeNormalization()
->always(function ($value) {
if (empty($value)) {
return ['default' => ['dsn' => 'null:']];
}
if (is_string($value)) {
return ['default' => ['dsn' => $value]];
}

if (is_array($value) && array_key_exists('dsn', $value)) {
return ['default' => $value];
}

return $value;
});
$transportPrototypeNode = $transportNode
$rootNode
->requiresAtLeastOneElement()
->useAttributeAsKey('key')
->prototype('array')
->arrayPrototype()
->children()
->append(TransportFactory::getConfiguration())
->append(TransportFactory::getQueueConsumerConfiguration())
->append(ClientFactory::getConfiguration($this->debug))
->append($this->getMonitoringConfiguration())
->append($this->getAsyncCommandsConfiguration())
->arrayNode('extensions')->addDefaultsIfNotSet()->children()
->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()
->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end()
->booleanNode('reply_extension')->defaultTrue()->end()
->end()->end()
->end()
->end()
;

$transportFactory->addTransportConfiguration($transportPrototypeNode);
// $rootNode->children()
// ->booleanNode('job')->defaultFalse()->end()
// ->arrayNode('async_events')
// ->addDefaultsIfNotSet()
// ->canBeEnabled()
// ->end()
// ;

return $tb;
}

$consumptionNode = $rootNode->children()->arrayNode('consumption');
$transportFactory->addQueueConsumerConfiguration($consumptionNode);
private function getMonitoringConfiguration(): ArrayNodeDefinition
{
if (false === class_exists(MonitoringFactory::class)) {
return MissingComponentFactory::getConfiguration('monitoring', ['enqueue/monitoring']);
}

$clientFactory = new ClientFactory('default');
$clientNode = $rootNode->children()->arrayNode('client');
$clientFactory->addClientConfiguration($clientNode, $this->debug);
return MonitoringFactory::getConfiguration();
}

$rootNode->children()
->booleanNode('job')->defaultFalse()->end()
->arrayNode('async_events')
->addDefaultsIfNotSet()
->canBeEnabled()
->end()
->arrayNode('async_commands')
->addDefaultsIfNotSet()
->canBeEnabled()
->end()
->arrayNode('extensions')->addDefaultsIfNotSet()->children()
->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()
->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end()
->booleanNode('reply_extension')->defaultTrue()->end()
->end()->end()
;
private function getAsyncCommandsConfiguration(): ArrayNodeDefinition
{
if (false === class_exists(RunCommandProcessor::class)) {
return MissingComponentFactory::getConfiguration('async_commands', ['enqueue/async-command']);
}

return $tb;
return (new ArrayNodeDefinition('async_commands'))
->addDefaultsIfNotSet()
->canBeEnabled()
;
}
}
Loading