diff --git a/pkg/amqp-ext/AmqpConnectionFactory.php b/pkg/amqp-ext/AmqpConnectionFactory.php index 9aceb9fe3..815c8c432 100644 --- a/pkg/amqp-ext/AmqpConnectionFactory.php +++ b/pkg/amqp-ext/AmqpConnectionFactory.php @@ -61,11 +61,30 @@ public function createContext() { if ($this->config['lazy']) { return new AmqpContext(function () { - return new \AMQPChannel($this->establishConnection()); + return $this->createExtContext($this->establishConnection()); }); } - return new AmqpContext(new \AMQPChannel($this->establishConnection())); + return new AmqpContext($this->createExtContext($this->establishConnection())); + } + + /** + * @param \AMQPConnection $extConnection + * + * @return \AMQPChannel + */ + private function createExtContext(\AMQPConnection $extConnection) + { + $channel = new \AMQPChannel($extConnection); + if (false == empty($this->config['pre_fetch_count'])) { + $channel->setPrefetchCount((int) $this->config['pre_fetch_count']); + } + + if (false == empty($this->config['pre_fetch_size'])) { + $channel->setPrefetchSize((int) $this->config['pre_fetch_size']); + } + + return $channel; } /** @@ -118,6 +137,7 @@ private function parseDsn($dsn) if ($dsnConfig['query']) { $query = []; parse_str($dsnConfig['query'], $query); + $dsnConfig = array_replace($query, $dsnConfig); } @@ -149,6 +169,8 @@ private function defaultConfig() 'connect_timeout' => null, 'persisted' => false, 'lazy' => true, + 'pre_fetch_count' => null, + 'pre_fetch_size' => null, ]; } } diff --git a/pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php b/pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php index 767784b7a..c9aa67493 100644 --- a/pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php +++ b/pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php @@ -65,6 +65,8 @@ public static function provideConfigs() 'connect_timeout' => null, 'persisted' => false, 'lazy' => true, + 'pre_fetch_count' => null, + 'pre_fetch_size' => null, ], ]; @@ -83,6 +85,8 @@ public static function provideConfigs() 'connect_timeout' => null, 'persisted' => false, 'lazy' => true, + 'pre_fetch_count' => null, + 'pre_fetch_size' => null, ], ]; @@ -99,6 +103,8 @@ public static function provideConfigs() 'connect_timeout' => null, 'persisted' => false, 'lazy' => true, + 'pre_fetch_count' => null, + 'pre_fetch_size' => null, ], ]; @@ -115,6 +121,8 @@ public static function provideConfigs() 'connect_timeout' => null, 'persisted' => false, 'lazy' => true, + 'pre_fetch_count' => null, + 'pre_fetch_size' => null, ], ]; @@ -131,6 +139,8 @@ public static function provideConfigs() 'connect_timeout' => '2', 'persisted' => false, 'lazy' => '', + 'pre_fetch_count' => null, + 'pre_fetch_size' => null, ], ]; @@ -147,6 +157,8 @@ public static function provideConfigs() 'connect_timeout' => null, 'persisted' => false, 'lazy' => true, + 'pre_fetch_count' => null, + 'pre_fetch_size' => null, ], ]; @@ -163,6 +175,44 @@ public static function provideConfigs() 'connect_timeout' => null, 'persisted' => false, 'lazy' => false, + 'pre_fetch_count' => null, + 'pre_fetch_size' => null, + ], + ]; + + yield [ + ['pre_fetch_count' => 123, 'pre_fetch_size' => 321], + [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => null, + 'write_timeout' => null, + 'connect_timeout' => null, + 'persisted' => false, + 'lazy' => true, + 'pre_fetch_count' => 123, + 'pre_fetch_size' => 321, + ], + ]; + + yield [ + 'amqp://user:pass@host:10000/vhost?pre_fetch_count=123&pre_fetch_size=321', + [ + 'host' => 'host', + 'port' => '10000', + 'vhost' => 'vhost', + 'user' => 'user', + 'pass' => 'pass', + 'read_timeout' => null, + 'write_timeout' => null, + 'connect_timeout' => null, + 'persisted' => false, + 'lazy' => true, + 'pre_fetch_count' => 123, + 'pre_fetch_size' => 321, ], ]; }