From d69f276da1d000c9cc4ce192f523151de1cc1de9 Mon Sep 17 00:00:00 2001 From: Henning Date: Wed, 21 Apr 2021 09:01:11 +0200 Subject: [PATCH 1/2] add subscription_interval as config for dbal subscription consumer --- pkg/dbal/DbalContext.php | 5 +++++ pkg/dbal/DbalSubscriptionConsumer.php | 18 ++++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index c775efc02..ce164ee27 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -46,6 +46,7 @@ public function __construct($connection, array $config = []) $this->config = array_replace([ 'table_name' => 'enqueue', 'polling_interval' => null, + 'subscription_interval' => null, ], $config); if ($connection instanceof Connection) { @@ -135,6 +136,10 @@ public function createSubscriptionConsumer(): SubscriptionConsumer $consumer->setRedeliveryDelay($this->config['redelivery_delay']); } + if (isset($this->config['subscription_interval'])) { + $consumer->setSubscriptionInterval($this->config['subscription_interval']); + } + return $consumer; } diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 60d30cc7e..999679fc6 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -36,6 +36,13 @@ class DbalSubscriptionConsumer implements SubscriptionConsumer */ private $redeliveryDelay; + /** + * Time to wait between subscription requests in milliseconds. + * + * @var int + */ + private $subscriptionInterval = 200; + /** * @param DbalContext $context */ @@ -63,6 +70,13 @@ public function setRedeliveryDelay(int $redeliveryDelay): self return $this; } + public function setSubscriptionInterval(int $subscriptionInterval): self + { + $this->subscriptionInterval = $subscriptionInterval; + + return $this; + } + public function consume(int $timeout = 0): void { if (empty($this->subscribers)) { @@ -92,7 +106,7 @@ public function consume(int $timeout = 0): void * @var DbalConsumer * @var callable $callback */ - list($consumer, $callback) = $this->subscribers[$message->getQueue()]; + [$consumer, $callback] = $this->subscribers[$message->getQueue()]; if (false === call_user_func($callback, $message, $consumer)) { return; @@ -102,7 +116,7 @@ public function consume(int $timeout = 0): void } else { $currentQueueNames = []; - usleep(200000); // 200ms + usleep($this->subscriptionInterval * 1000); // 200ms } if ($timeout && microtime(true) >= $now + $timeout) { From bc99baacbf7521420b04b62738cea06dd51d56a7 Mon Sep 17 00:00:00 2001 From: Henning Date: Wed, 28 Apr 2021 09:14:29 +0200 Subject: [PATCH 2/2] change option name --- pkg/dbal/DbalContext.php | 18 +++++------------- pkg/dbal/DbalSubscriptionConsumer.php | 16 +++++++++------- pkg/dbal/Tests/DbalContextTest.php | 3 +++ 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index ce164ee27..5fdf6b59e 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -39,14 +39,13 @@ class DbalContext implements Context * Callable must return instance of Doctrine\DBAL\Connection once called. * * @param Connection|callable $connection - * @param array $config */ public function __construct($connection, array $config = []) { $this->config = array_replace([ 'table_name' => 'enqueue', 'polling_interval' => null, - 'subscription_interval' => null, + 'subscription_polling_interval' => null, ], $config); if ($connection instanceof Connection) { @@ -54,11 +53,7 @@ public function __construct($connection, array $config = []) } elseif (is_callable($connection)) { $this->connectionFactory = $connection; } else { - throw new \InvalidArgumentException(sprintf( - 'The connection argument must be either %s or callable that returns %s.', - Connection::class, - Connection::class - )); + throw new \InvalidArgumentException(sprintf('The connection argument must be either %s or callable that returns %s.', Connection::class, Connection::class)); } } @@ -136,8 +131,8 @@ public function createSubscriptionConsumer(): SubscriptionConsumer $consumer->setRedeliveryDelay($this->config['redelivery_delay']); } - if (isset($this->config['subscription_interval'])) { - $consumer->setSubscriptionInterval($this->config['subscription_interval']); + if (isset($this->config['subscription_polling_interval'])) { + $consumer->setPollingInterval($this->config['subscription_polling_interval']); } return $consumer; @@ -207,10 +202,7 @@ public function getDbalConnection(): Connection if (false == $this->connection) { $connection = call_user_func($this->connectionFactory); if (false == $connection instanceof Connection) { - throw new \LogicException(sprintf( - 'The factory must return instance of Doctrine\DBAL\Connection. It returns %s', - is_object($connection) ? get_class($connection) : gettype($connection) - )); + throw new \LogicException(sprintf('The factory must return instance of Doctrine\DBAL\Connection. It returns %s', is_object($connection) ? get_class($connection) : gettype($connection))); } $this->connection = $connection; diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 999679fc6..2551043e3 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -41,11 +41,8 @@ class DbalSubscriptionConsumer implements SubscriptionConsumer * * @var int */ - private $subscriptionInterval = 200; + private $pollingInterval = 200; - /** - * @param DbalContext $context - */ public function __construct(DbalContext $context) { $this->context = $context; @@ -70,9 +67,14 @@ public function setRedeliveryDelay(int $redeliveryDelay): self return $this; } - public function setSubscriptionInterval(int $subscriptionInterval): self + public function getPollingInterval(): int + { + return $this->pollingInterval; + } + + public function setPollingInterval(int $msec): self { - $this->subscriptionInterval = $subscriptionInterval; + $this->pollingInterval = $msec; return $this; } @@ -116,7 +118,7 @@ public function consume(int $timeout = 0): void } else { $currentQueueNames = []; - usleep($this->subscriptionInterval * 1000); // 200ms + usleep($this->getPollingInterval() * 1000); } if ($timeout && microtime(true) >= $now + $timeout) { diff --git a/pkg/dbal/Tests/DbalContextTest.php b/pkg/dbal/Tests/DbalContextTest.php index 0b793e6e7..12a13b21a 100644 --- a/pkg/dbal/Tests/DbalContextTest.php +++ b/pkg/dbal/Tests/DbalContextTest.php @@ -39,6 +39,7 @@ public function testCouldBeConstructedWithEmptyConfiguration() $this->assertAttributeEquals([ 'table_name' => 'enqueue', 'polling_interval' => null, + 'subscription_polling_interval' => null, ], 'config', $factory); } @@ -47,11 +48,13 @@ public function testCouldBeConstructedWithCustomConfiguration() $factory = new DbalContext($this->createConnectionMock(), [ 'table_name' => 'theTableName', 'polling_interval' => 12345, + 'subscription_polling_interval' => 12345, ]); $this->assertAttributeEquals([ 'table_name' => 'theTableName', 'polling_interval' => 12345, + 'subscription_polling_interval' => 12345, ], 'config', $factory); }