diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index c775efc02..ce164ee27 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -46,6 +46,7 @@ public function __construct($connection, array $config = []) $this->config = array_replace([ 'table_name' => 'enqueue', 'polling_interval' => null, + 'subscription_interval' => null, ], $config); if ($connection instanceof Connection) { @@ -135,6 +136,10 @@ public function createSubscriptionConsumer(): SubscriptionConsumer $consumer->setRedeliveryDelay($this->config['redelivery_delay']); } + if (isset($this->config['subscription_interval'])) { + $consumer->setSubscriptionInterval($this->config['subscription_interval']); + } + return $consumer; } diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 60d30cc7e..999679fc6 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -36,6 +36,13 @@ class DbalSubscriptionConsumer implements SubscriptionConsumer */ private $redeliveryDelay; + /** + * Time to wait between subscription requests in milliseconds. + * + * @var int + */ + private $subscriptionInterval = 200; + /** * @param DbalContext $context */ @@ -63,6 +70,13 @@ public function setRedeliveryDelay(int $redeliveryDelay): self return $this; } + public function setSubscriptionInterval(int $subscriptionInterval): self + { + $this->subscriptionInterval = $subscriptionInterval; + + return $this; + } + public function consume(int $timeout = 0): void { if (empty($this->subscribers)) { @@ -92,7 +106,7 @@ public function consume(int $timeout = 0): void * @var DbalConsumer * @var callable $callback */ - list($consumer, $callback) = $this->subscribers[$message->getQueue()]; + [$consumer, $callback] = $this->subscribers[$message->getQueue()]; if (false === call_user_func($callback, $message, $consumer)) { return; @@ -102,7 +116,7 @@ public function consume(int $timeout = 0): void } else { $currentQueueNames = []; - usleep(200000); // 200ms + usleep($this->subscriptionInterval * 1000); // 200ms } if ($timeout && microtime(true) >= $now + $timeout) {