diff --git a/pkg/enqueue-bundle/Tests/Unit/Mocks/FooTransportFactory.php b/pkg/enqueue-bundle/Tests/Unit/Mocks/FooTransportFactory.php deleted file mode 100644 index 5507d193d..000000000 --- a/pkg/enqueue-bundle/Tests/Unit/Mocks/FooTransportFactory.php +++ /dev/null @@ -1,83 +0,0 @@ -name = $name; - } - - /** - * {@inheritdoc} - */ - public function addConfiguration(ArrayNodeDefinition $builder) - { - $builder - ->children() - ->scalarNode('foo_param')->isRequired()->cannotBeEmpty()->end() - ; - } - - /** - * {@inheritdoc} - */ - public function createConnectionFactory(ContainerBuilder $container, array $config) - { - $factoryId = 'foo.connection_factory'; - - $container->setDefinition($factoryId, new Definition(\stdClass::class, [$config])); - - return $factoryId; - } - - /** - * {@inheritdoc} - */ - public function createContext(ContainerBuilder $container, array $config) - { - $contextId = 'foo.context'; - - $context = new Definition(\stdClass::class, [$config]); - $context->setPublic(true); - - $container->setDefinition($contextId, $context); - - return $contextId; - } - - public function createDriver(ContainerBuilder $container, array $config) - { - $driverId = 'foo.driver'; - - $driver = new Definition(\stdClass::class, [$config]); - $driver->setPublic(true); - - $container->setDefinition($driverId, $driver); - - return $driverId; - } - - /** - * {@inheritdoc} - */ - public function getName() - { - return $this->name; - } -} diff --git a/pkg/enqueue-bundle/Tests/Unit/Mocks/TransportFactoryWithoutDriverFactory.php b/pkg/enqueue-bundle/Tests/Unit/Mocks/TransportFactoryWithoutDriverFactory.php deleted file mode 100644 index f3a003201..000000000 --- a/pkg/enqueue-bundle/Tests/Unit/Mocks/TransportFactoryWithoutDriverFactory.php +++ /dev/null @@ -1,71 +0,0 @@ -name = $name; - } - - /** - * {@inheritdoc} - */ - public function addConfiguration(ArrayNodeDefinition $builder) - { - } - - /** - * {@inheritdoc} - */ - public function createConnectionFactory(ContainerBuilder $container, array $config) - { - $factoryId = 'without_driver.connection_factory'; - - $container->setDefinition($factoryId, new Definition(\stdClass::class, [$config])); - - return $factoryId; - } - - /** - * {@inheritdoc} - */ - public function createContext(ContainerBuilder $container, array $config) - { - $contextId = 'without_driver.context'; - - $context = new Definition(\stdClass::class, [$config]); - $context->setPublic(true); - - $container->setDefinition($contextId, $context); - - return $contextId; - } - - public function createDriver(ContainerBuilder $container, array $config) - { - throw new \LogicException('It should not be called. The method will be removed'); - } - - /** - * {@inheritdoc} - */ - public function getName() - { - return $this->name; - } -} diff --git a/pkg/enqueue/Symfony/AmqpTransportFactory.php b/pkg/enqueue/Symfony/AmqpTransportFactory.php deleted file mode 100644 index 1be59c17c..000000000 --- a/pkg/enqueue/Symfony/AmqpTransportFactory.php +++ /dev/null @@ -1,253 +0,0 @@ -name = $name; - } - - /** - * {@inheritdoc} - */ - public function addConfiguration(ArrayNodeDefinition $builder) - { - $transportsMap = static::getAvailableTransportsMap(); - - $builder - ->beforeNormalization() - ->ifTrue(function ($v) { - return empty($v); - }) - ->then(function ($v) { - return ['dsn' => 'amqp:']; - }) - ->ifString() - ->then(function ($v) { - return ['dsn' => $v]; - }) - ->end() - ->children() - ->scalarNode('driver') - ->validate() - ->always(function ($v) use ($transportsMap) { - $drivers = array_keys($transportsMap); - if (empty($transportsMap)) { - throw new \InvalidArgumentException('There is no amqp driver available. Please consider installing one of the packages: enqueue/amqp-ext, enqueue/amqp-lib, enqueue/amqp-bunny.'); - } - - if ($v && false == in_array($v, $drivers, true)) { - throw new \InvalidArgumentException(sprintf('Unexpected driver given "%s". Available are "%s"', $v, implode('", "', $drivers))); - } - - return $v; - }) - ->end() - ->end() - ->scalarNode('dsn') - ->info('The connection to AMQP broker set as a string. Other parameters could be used as defaults') - ->end() - ->scalarNode('host') - ->info('The host to connect too. Note: Max 1024 characters') - ->end() - ->scalarNode('port') - ->info('Port on the host.') - ->end() - ->scalarNode('user') - ->info('The user name to use. Note: Max 128 characters.') - ->end() - ->scalarNode('pass') - ->info('Password. Note: Max 128 characters.') - ->end() - ->scalarNode('vhost') - ->info('The virtual host on the host. Note: Max 128 characters.') - ->end() - ->floatNode('connection_timeout') - ->min(0) - ->info('Connection timeout. Note: 0 or greater seconds. May be fractional.') - ->end() - ->floatNode('read_timeout') - ->min(0) - ->info('Timeout in for income activity. Note: 0 or greater seconds. May be fractional.') - ->end() - ->floatNode('write_timeout') - ->min(0) - ->info('Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.') - ->end() - ->floatNode('heartbeat') - ->min(0) - ->info('How often to send heartbeat. 0 means off.') - ->end() - ->booleanNode('persisted')->end() - ->booleanNode('lazy')->end() - ->enumNode('receive_method') - ->values(['basic_get', 'basic_consume']) - ->info('The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher') - ->end() - ->floatNode('qos_prefetch_size') - ->min(0) - ->info('The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"') - ->end() - ->floatNode('qos_prefetch_count') - ->min(0) - ->info('Specifies a prefetch window in terms of whole messages') - ->end() - ->booleanNode('qos_global') - ->info('If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.') - ->end() - ->variableNode('driver_options') - ->info('The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.') - ->end() - ->booleanNode('ssl_on') - ->info('Should be true if you want to use secure connections. False by default') - ->end() - ->booleanNode('ssl_verify') - ->info('This option determines whether ssl client verifies that the server cert is for the server it is known as. True by default.') - ->end() - ->scalarNode('ssl_cacert') - ->info('Location of Certificate Authority file on local filesystem which should be used with the verify_peer context option to authenticate the identity of the remote peer. A string.') - ->end() - ->scalarNode('ssl_cert') - ->info('Path to local certificate file on filesystem. It must be a PEM encoded file which contains your certificate and private key. A string') - ->end() - ->scalarNode('ssl_key') - ->info('Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. A string.') - ->end() - ; - } - - /** - * {@inheritdoc} - */ - public function createConnectionFactory(ContainerBuilder $container, array $config) - { - if (array_key_exists('driver_options', $config) && is_array($config['driver_options'])) { - $driverOptions = $config['driver_options']; - unset($config['driver_options']); - - $config = array_replace($driverOptions, $config); - } - - $factory = new Definition(AmqpConnectionFactory::class); - $factory->setFactory([self::class, 'createConnectionFactoryFactory']); - $factory->setArguments([$config]); - - $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); - $container->setDefinition($factoryId, $factory); - - return $factoryId; - } - - /** - * {@inheritdoc} - */ - public function createContext(ContainerBuilder $container, array $config) - { - $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); - - $context = new Definition(AmqpContext::class); - $context->setPublic(true); - $context->setFactory([new Reference($factoryId), 'createContext']); - - $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); - $container->setDefinition($contextId, $context); - - return $contextId; - } - - /** - * {@inheritdoc} - */ - public function createDriver(ContainerBuilder $container, array $config) - { - $driver = new Definition(AmqpDriver::class); - $driver->setPublic(true); - $driver->setArguments([ - new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), - new Reference('enqueue.client.config'), - new Reference('enqueue.client.meta.queue_meta_registry'), - ]); - - $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); - $container->setDefinition($driverId, $driver); - - return $driverId; - } - - /** - * {@inheritdoc} - */ - public function getName() - { - return $this->name; - } - - public static function createConnectionFactoryFactory(array $config) - { - if (false == empty($config['driver'])) { - $transportsMap = static::getAvailableTransportsMap(); - - if (false == array_key_exists($config['driver'], $transportsMap)) { - throw new \InvalidArgumentException(sprintf('Unexpected driver given "invalidDriver". Available are "%s"', implode('", "', array_keys($transportsMap)))); - } - - $connectionFactoryClass = $transportsMap[$config['driver']]; - - unset($config['driver']); - - return new $connectionFactoryClass($config); - } - - $dsn = array_key_exists('dsn', $config) ? $config['dsn'] : 'amqp:'; - $factory = (new ConnectionFactoryFactory())->create($dsn); - - if (false == $factory instanceof AmqpConnectionFactory) { - throw new \LogicException(sprintf('Factory must be instance of "%s" but got "%s"', AmqpConnectionFactory::class, get_class($factory))); - } - - $factoryClass = get_class($factory); - - return new $factoryClass($config); - } - - /** - * @return string[] - */ - private static function getAvailableTransportsMap() - { - $map = []; - if (class_exists(AmqpExtConnectionFactory::class)) { - $map['ext'] = AmqpExtConnectionFactory::class; - } - if (class_exists(AmqpLibConnectionFactory::class)) { - $map['lib'] = AmqpLibConnectionFactory::class; - } - if (class_exists(AmqpBunnyConnectionFactory::class)) { - $map['bunny'] = AmqpBunnyConnectionFactory::class; - } - - return $map; - } -} diff --git a/pkg/enqueue/Symfony/DefaultTransportFactory.php b/pkg/enqueue/Symfony/DefaultTransportFactory.php deleted file mode 100644 index eda667eb6..000000000 --- a/pkg/enqueue/Symfony/DefaultTransportFactory.php +++ /dev/null @@ -1,93 +0,0 @@ -beforeNormalization() - ->always(function ($v) { - if (empty($v)) { - return ['dsn' => 'null:']; - } - - if (is_array($v)) { - return $v; - } - - if (is_string($v)) { - return ['dsn' => $v]; - } - - throw new \LogicException(sprintf('The value must be array, null or string. Got "%s"', gettype($v))); - }) - ->end() - ->pro - ->children() - ->scalarNode('dsn')->cannotBeEmpty()->end() - ->variableNode('config')->end() - ->end() - ->end() - ; - } - - public function createConnectionFactory(ContainerBuilder $container, array $config): string - { - $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); - - $container->register($factoryId, ConnectionFactory::class) - ->setFactory([new Reference('enqueue.connection_factory_factory'), 'create']) - ->addArgument($config['dsn']) - ; - - $container->setAlias('enqueue.transport.connection_factory', new Alias($factoryId, true)); - - return $factoryId; - } - - public function createContext(ContainerBuilder $container, array $config): string - { - $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); - $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); - - $container->register($contextId, Context::class) - ->setFactory([new Reference($factoryId), 'createContext']) - ; - - $container->setAlias('enqueue.transport.context', new Alias($contextId, true)); - - return $contextId; - } - - public function createDriver(ContainerBuilder $container, array $config): string - { - $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); - $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); - - $container->register($driverId, DriverInterface::class) - ->setFactory([new Reference('enqueue.client.driver_factory'), 'create']) - ->addArgument(new Reference($factoryId)) - ->addArgument($config['dsn']) - ->addArgument($config) - ; - - $container->setAlias('enqueue.client.driver', new Alias($driverId, true)); - - return $driverId; - } - - public function getName(): string - { - return 'default'; - } -} diff --git a/pkg/enqueue/Symfony/DriverFactoryInterface.php b/pkg/enqueue/Symfony/DriverFactoryInterface.php deleted file mode 100644 index 0b1ab027d..000000000 --- a/pkg/enqueue/Symfony/DriverFactoryInterface.php +++ /dev/null @@ -1,21 +0,0 @@ -name = $name; - $this->packages = $packages; - } - - /** - * {@inheritdoc} - */ - public function addConfiguration(ArrayNodeDefinition $builder) - { - if (1 == count($this->packages)) { - $message = sprintf( - 'In order to use the transport "%s" install a package "%s"', - $this->getName(), - implode('", "', $this->packages) - ); - } else { - $message = sprintf( - 'In order to use the transport "%s" install one of the packages "%s"', - $this->getName(), - implode('", "', $this->packages) - ); - } - - $builder - ->info($message) - ->beforeNormalization() - ->always(function () { - return []; - }) - ->end() - ->validate() - ->always(function () use ($message) { - throw new \InvalidArgumentException($message); - }) - ->end() - ; - } - - /** - * {@inheritdoc} - */ - public function createConnectionFactory(ContainerBuilder $container, array $config) - { - throw new \LogicException('Should not be called'); - } - - /** - * {@inheritdoc} - */ - public function createContext(ContainerBuilder $container, array $config) - { - throw new \LogicException('Should not be called'); - } - - /** - * {@inheritdoc} - */ - public function createDriver(ContainerBuilder $container, array $config) - { - throw new \LogicException('Should not be called'); - } - - /** - * @return string - */ - public function getName() - { - return $this->name; - } -} diff --git a/pkg/enqueue/Symfony/RabbitMqAmqpTransportFactory.php b/pkg/enqueue/Symfony/RabbitMqAmqpTransportFactory.php deleted file mode 100644 index f02aa1575..000000000 --- a/pkg/enqueue/Symfony/RabbitMqAmqpTransportFactory.php +++ /dev/null @@ -1,69 +0,0 @@ -children() - ->scalarNode('delay_strategy') - ->defaultValue('dlx') - ->info('The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id') - ->end() - ; - } - - /** - * {@inheritdoc} - */ - public function createConnectionFactory(ContainerBuilder $container, array $config) - { - $factoryId = parent::createConnectionFactory($container, $config); - - $this->registerDelayStrategy($container, $config, $factoryId, $this->getName()); - - return $factoryId; - } - - /** - * {@inheritdoc} - */ - public function createDriver(ContainerBuilder $container, array $config) - { - $driver = new Definition(RabbitMqDriver::class); - $driver->setPublic(true); - $driver->setArguments([ - new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), - new Reference('enqueue.client.config'), - new Reference('enqueue.client.meta.queue_meta_registry'), - ]); - $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); - $container->setDefinition($driverId, $driver); - - return $driverId; - } -} diff --git a/pkg/enqueue/Symfony/TransportFactoryInterface.php b/pkg/enqueue/Symfony/TransportFactoryInterface.php deleted file mode 100644 index d2be801ca..000000000 --- a/pkg/enqueue/Symfony/TransportFactoryInterface.php +++ /dev/null @@ -1,35 +0,0 @@ -