diff --git a/docs/transport/dbal.md b/docs/transport/dbal.md index 6e72d7953..d14cad922 100644 --- a/docs/transport/dbal.md +++ b/docs/transport/dbal.md @@ -2,14 +2,14 @@ The transport uses [Doctrine DBAL](http://docs.doctrine-project.org/projects/doctrine-dbal/en/latest/) library and SQL like server as a broker. It creates a table there. Pushes and pops messages to\from that table. - -**Limitations** It works only in auto ack mode hence If consumer crashes the message is lost. * [Installation](#installation) * [Init database](#init-database) * [Create context](#create-context) * [Send message to topic](#send-message-to-topic) * [Send message to queue](#send-message-to-queue) +* [Send expiration message](#send-expiration-message) +* [Send delayed message](#send-delayed-message) * [Consume message](#consume-message) * [Subscription consumer](#subscription-consumer) @@ -90,6 +90,38 @@ $message = $psrContext->createMessage('Hello world!'); $psrContext->createProducer()->send($fooQueue, $message); ``` +## Send expiration message + +```php +createMessage('Hello world!'); + +$psrContext->createProducer() + ->setTimeToLive(60000) // 60 sec + // + ->send($fooQueue, $message) +; +``` + +## Send delayed message + +```php +createMessage('Hello world!'); + +$psrContext->createProducer() + ->setDeliveryDelay(5000) // 5 sec + // + ->send($fooQueue, $message) +; +```` + ## Consume message: ```php @@ -102,6 +134,9 @@ $consumer = $psrContext->createConsumer($fooQueue); $message = $consumer->receive(); // process a message + +$consumer->acknowledge($message); +//$consumer->reject($message); ``` ## Subscription consumer diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index c8e1b02d4..ede0cfa40 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -8,11 +8,15 @@ use Doctrine\DBAL\Types\Type; use Interop\Queue\Consumer; use Interop\Queue\Exception\InvalidMessageException; +use Interop\Queue\Impl\ConsumerPollingTrait; use Interop\Queue\Message; use Interop\Queue\Queue; class DbalConsumer implements Consumer { + use ConsumerPollingTrait, + DbalConsumerHelperTrait; + /** * @var DbalContext */ @@ -29,9 +33,11 @@ class DbalConsumer implements Consumer private $queue; /** + * Default 20 minutes in milliseconds. + * * @var int */ - private $pollingInterval; + private $redeliveryDelay; public function __construct(DbalContext $context, DbalDestination $queue) { @@ -39,23 +45,25 @@ public function __construct(DbalContext $context, DbalDestination $queue) $this->queue = $queue; $this->dbal = $this->context->getDbalConnection(); - $this->pollingInterval = 1000; + $this->redeliveryDelay = 1200000; } /** - * Polling interval is in milliseconds. + * Get interval between retry failed messages in milliseconds. */ - public function setPollingInterval(int $interval): void + public function getRedeliveryDelay(): int { - $this->pollingInterval = $interval; + return $this->redeliveryDelay; } /** - * Get polling interval in milliseconds. + * Get interval between retrying failed messages in milliseconds. */ - public function getPollingInterval(): int + public function setRedeliveryDelay(int $redeliveryDelay): self { - return $this->pollingInterval; + $this->redeliveryDelay = $redeliveryDelay; + + return $this; } /** @@ -66,33 +74,21 @@ public function getQueue(): Queue return $this->queue; } - public function receive(int $timeout = 0): ?Message + public function receiveNoWait(): ?Message { - $timeout /= 1000; - $startAt = microtime(true); - - while (true) { - $message = $this->receiveMessage(); - - if ($message) { - return $message; - } - - if ($timeout && (microtime(true) - $startAt) >= $timeout) { - return null; - } + $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds - usleep($this->pollingInterval * 1000); + $this->removeExpiredMessages(); + $this->redeliverMessages(); - if ($timeout && (microtime(true) - $startAt) >= $timeout) { - return null; + // get top message from the queue + if ($message = $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay)) { + if ($message['redelivered'] || empty($message['time_to_live']) || $message['time_to_live'] > time()) { + return $this->getContext()->convertMessage($message); } } - } - public function receiveNoWait(): ?Message - { - return $this->receiveMessage(); + return null; } /** @@ -100,7 +96,9 @@ public function receiveNoWait(): ?Message */ public function acknowledge(Message $message): void { - // does nothing + InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class); + + $this->deleteMessage($message->getDeliveryId()); } /** @@ -111,106 +109,37 @@ public function reject(Message $message, bool $requeue = false): void InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class); if ($requeue) { - $this->context->createProducer()->send($this->queue, $message); + $message = clone $message; + $message->setRedelivered(false); + + $this->getContext()->createProducer()->send($this->queue, $message); return; } + + $this->deleteMessage($message->getDeliveryId()); } - protected function receiveMessage(): ?DbalMessage + protected function getContext(): DbalContext { - $this->dbal->beginTransaction(); - try { - $now = time(); - - $dbalMessage = $this->fetchPrioritizedMessage($now) ?: $dbalMessage = $this->fetchMessage($now); - if (false == $dbalMessage) { - $this->dbal->commit(); - - return null; - } - - // remove message - $affectedRows = $this->dbal->delete($this->context->getTableName(), ['id' => $dbalMessage['id']], [ - 'id' => Type::GUID, - ]); - - if (1 !== $affectedRows) { - throw new \LogicException(sprintf('Expected record was removed but it is not. id: "%s"', $dbalMessage['id'])); - } - - $this->dbal->commit(); - - if (empty($dbalMessage['time_to_live']) || ($dbalMessage['time_to_live'] / 1000) > microtime(true)) { - return $this->context->convertMessage($dbalMessage); - } - - return null; - } catch (\Exception $e) { - $this->dbal->rollBack(); - - throw $e; - } + return $this->context; } - private function fetchPrioritizedMessage(int $now): ?array + protected function getConnection(): Connection { - $query = $this->dbal->createQueryBuilder(); - $query - ->select('*') - ->from($this->context->getTableName()) - ->andWhere('queue = :queue') - ->andWhere('priority IS NOT NULL') - ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') - ->addOrderBy('published_at', 'asc') - ->addOrderBy('priority', 'desc') - ->setMaxResults(1) - ; - - $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); - - $result = $this->dbal->executeQuery( - $sql, - [ - 'queue' => $this->queue->getQueueName(), - 'delayedUntil' => $now, - ], - [ - 'queue' => Type::STRING, - 'delayedUntil' => Type::INTEGER, - ] - )->fetch(); - - return $result ?: null; + return $this->dbal; } - private function fetchMessage(int $now): ?array + private function deleteMessage(string $deliveryId): void { - $query = $this->dbal->createQueryBuilder(); - $query - ->select('*') - ->from($this->context->getTableName()) - ->andWhere('queue = :queue') - ->andWhere('priority IS NULL') - ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') - ->addOrderBy('published_at', 'asc') - ->setMaxResults(1) - ; - - $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); - - $result = $this->dbal->executeQuery( - $sql, - [ - 'queue' => $this->queue->getQueueName(), - 'delayedUntil' => $now, - ], - [ - 'queue' => Type::STRING, - 'delayedUntil' => Type::INTEGER, - ] - )->fetch(); - - return $result ?: null; + if (empty($deliveryId)) { + throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId)); + } + + $this->getConnection()->delete( + $this->getContext()->getTableName(), + ['delivery_id' => $deliveryId], + ['delivery_id' => Type::STRING] + ); } } diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php new file mode 100644 index 000000000..2dc29db26 --- /dev/null +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -0,0 +1,106 @@ +getConnection()->beginTransaction(); + + try { + $query = $this->getConnection()->createQueryBuilder() + ->select('*') + ->from($this->getContext()->getTableName()) + ->andWhere('delivery_id IS NULL') + ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') + ->andWhere('queue IN (:queues)') + ->addOrderBy('priority', 'desc') + ->addOrderBy('published_at', 'asc') + ->setMaxResults(1); + + // select for update + $message = $this->getConnection()->executeQuery( + $query->getSQL().' '.$this->getConnection()->getDatabasePlatform()->getWriteLockSQL(), + ['delayedUntil' => $now, 'queues' => array_values($queues)], + ['delayedUntil' => ParameterType::INTEGER, 'queues' => Connection::PARAM_STR_ARRAY] + )->fetch(); + + if (!$message) { + $this->getConnection()->commit(); + + return null; + } + + // mark message as delivered to consumer + $this->getConnection()->createQueryBuilder() + ->andWhere('id = :id') + ->update($this->getContext()->getTableName()) + ->set('delivery_id', ':deliveryId') + ->set('redeliver_after', ':redeliverAfter') + ->setParameter('id', $message['id'], Type::GUID) + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) + ->execute() + ; + + $this->getConnection()->commit(); + + $deliveredMessage = $this->getConnection()->createQueryBuilder() + ->select('*') + ->from($this->getContext()->getTableName()) + ->andWhere('delivery_id = :deliveryId') + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setMaxResults(1) + ->execute() + ->fetch() + ; + + return $deliveredMessage ?: null; + } catch (\Exception $e) { + $this->getConnection()->rollBack(); + + throw $e; + } + } + + protected function redeliverMessages(): void + { + $this->getConnection()->createQueryBuilder() + ->update($this->getContext()->getTableName()) + ->set('delivery_id', ':deliveryId') + ->set('redelivered', ':redelivered') + ->andWhere('delivery_id IS NOT NULL') + ->andWhere('redeliver_after < :now') + ->setParameter(':now', (int) time(), Type::BIGINT) + ->setParameter('deliveryId', null, Type::STRING) + ->setParameter('redelivered', true, Type::BOOLEAN) + ->execute() + ; + } + + protected function removeExpiredMessages(): void + { + $this->getConnection()->createQueryBuilder() + ->delete($this->getContext()->getTableName()) + ->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)') + ->setParameter(':now', (int) time(), Type::BIGINT) + ->setParameter('redelivered', false, Type::BOOLEAN) + ->execute() + ; + } +} diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 5a715b288..0cac7ac45 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -116,6 +116,10 @@ public function createConsumer(Destination $destination): Consumer $consumer->setPollingInterval($this->config['polling_interval']); } + if (isset($this->config['redelivery_delay'])) { + $consumer->setRedeliveryDelay($this->config['redelivery_delay']); + } + return $consumer; } @@ -125,7 +129,13 @@ public function close(): void public function createSubscriptionConsumer(): SubscriptionConsumer { - return new DbalSubscriptionConsumer($this); + $consumer = new DbalSubscriptionConsumer($this); + + if (isset($this->config['redelivery_delay'])) { + $consumer->setRedeliveryDelay($this->config['redelivery_delay']); + } + + return $consumer; } /** @@ -133,6 +143,7 @@ public function createSubscriptionConsumer(): SubscriptionConsumer */ public function convertMessage(array $dbalMessage): DbalMessage { + /** @var DbalMessage $dbalMessageObj */ $dbalMessageObj = $this->createMessage( $dbalMessage['body'], $dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [], @@ -148,6 +159,12 @@ public function convertMessage(array $dbalMessage): DbalMessage if (isset($dbalMessage['published_at'])) { $dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']); } + if (isset($dbalMessage['delivery_id'])) { + $dbalMessageObj->setDeliveryId((string) $dbalMessage['delivery_id']); + } + if (isset($dbalMessage['redeliver_after'])) { + $dbalMessageObj->setRedeliverAfter((int) $dbalMessage['redeliver_after']); + } return $dbalMessageObj; } @@ -212,6 +229,8 @@ public function createDataBaseTable(): void $table->addColumn('priority', Type::SMALLINT, ['notnull' => false]); $table->addColumn('delayed_until', Type::BIGINT, ['notnull' => false]); $table->addColumn('time_to_live', Type::BIGINT, ['notnull' => false]); + $table->addColumn('delivery_id', Type::STRING, ['notnull' => false]); + $table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]); $table->setPrimaryKey(['id']); $table->addIndex(['published_at']); @@ -219,6 +238,8 @@ public function createDataBaseTable(): void $table->addIndex(['priority']); $table->addIndex(['delayed_until']); $table->addIndex(['priority', 'published_at']); + $table->addIndex(['redeliver_after']); + $table->addUniqueIndex(['delivery_id']); $sm->createTable($table); } diff --git a/pkg/dbal/DbalMessage.php b/pkg/dbal/DbalMessage.php index 88b49c588..8464d56e3 100644 --- a/pkg/dbal/DbalMessage.php +++ b/pkg/dbal/DbalMessage.php @@ -38,11 +38,21 @@ class DbalMessage implements Message */ private $deliveryDelay; + /** + * @var int seconds + */ + private $redeliverAfter; + /** * @var int milliseconds */ private $timeToLive; + /** + * @var null|string + */ + private $deliveryId; + /** * Milliseconds, for example 15186054527288. * @@ -65,6 +75,8 @@ public function __construct(string $body = '', array $properties = [], array $he $this->redelivered = false; $this->priority = null; $this->deliveryDelay = null; + $this->deliveryId = null; + $this->redeliverAfter = null; } public function setBody(string $body): void @@ -208,6 +220,26 @@ public function setTimestamp(int $timestamp = null): void $this->setHeader('timestamp', $timestamp); } + public function getDeliveryId(): ?string + { + return $this->deliveryId; + } + + public function setDeliveryId(?string $deliveryId = null): void + { + $this->deliveryId = $deliveryId; + } + + public function getRedeliverAfter(): int + { + return $this->redeliverAfter; + } + + public function setRedeliverAfter(int $redeliverAfter = null): void + { + $this->redeliverAfter = $redeliverAfter; + } + public function getPublishedAt(): ?int { return $this->publishedAt; diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index ddf567a8a..38ad33414 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -87,6 +87,9 @@ public function send(Destination $destination, Message $message): void 'properties' => JSON::encode($message->getProperties()), 'priority' => $message->getPriority(), 'queue' => $destination->getQueueName(), + 'redelivered' => false, + 'delivery_id' => null, + 'redeliver_after' => null, ]; $delay = $message->getDeliveryDelay(); @@ -132,6 +135,9 @@ public function send(Destination $destination, Message $message): void 'queue' => Type::STRING, 'time_to_live' => Type::INTEGER, 'delayed_until' => Type::INTEGER, + 'redelivered' => Type::BOOLEAN, + 'delivery_id' => Type::STRING, + 'redeliver_after' => Type::BIGINT, ]); } catch (\Exception $e) { throw new Exception('The transport fails to send the message due to some internal error.', null, $e); diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index a822cee04..12c1cef37 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -4,12 +4,14 @@ namespace Enqueue\Dbal; -use Doctrine\DBAL\Types\Type; +use Doctrine\DBAL\Connection; use Interop\Queue\Consumer; use Interop\Queue\SubscriptionConsumer; class DbalSubscriptionConsumer implements SubscriptionConsumer { + use DbalConsumerHelperTrait; + /** * @var DbalContext */ @@ -27,6 +29,13 @@ class DbalSubscriptionConsumer implements SubscriptionConsumer */ private $dbal; + /** + * Default 20 minutes in milliseconds. + * + * @var int + */ + private $redeliveryDelay; + /** * @param DbalContext $context */ @@ -35,6 +44,23 @@ public function __construct(DbalContext $context) $this->context = $context; $this->dbal = $this->context->getDbalConnection(); $this->subscribers = []; + + $this->redeliveryDelay = 1200000; + } + + /** + * Get interval between retrying failed messages in milliseconds. + */ + public function getRedeliveryDelay(): int + { + return $this->redeliveryDelay; + } + + public function setRedeliveryDelay(int $redeliveryDelay): self + { + $this->redeliveryDelay = $redeliveryDelay; + + return $this; } public function consume(int $timeout = 0): void @@ -43,26 +69,26 @@ public function consume(int $timeout = 0): void throw new \LogicException('No subscribers'); } - $timeout = (int) ceil($timeout / 1000); - $endAt = time() + $timeout; - $queueNames = []; foreach (array_keys($this->subscribers) as $queueName) { $queueNames[$queueName] = $queueName; } + $timeout /= 1000; + $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds + $currentQueueNames = []; while (true) { if (empty($currentQueueNames)) { $currentQueueNames = $queueNames; } - $message = $this->fetchPrioritizedMessage($currentQueueNames) ?: $this->fetchMessage($currentQueueNames); - - if ($message) { - $this->dbal->delete($this->context->getTableName(), ['id' => $message['id']], ['id' => Type::GUID]); + $now = time(); + $this->removeExpiredMessages(); + $this->redeliverMessages(); - $dbalMessage = $this->context->convertMessage($message); + if ($message = $this->fetchMessage($currentQueueNames, $redeliveryDelay)) { + $dbalMessage = $this->getContext()->convertMessage($message); /** * @var DbalConsumer @@ -81,7 +107,7 @@ public function consume(int $timeout = 0): void usleep(200000); // 200ms } - if ($timeout && microtime(true) >= $endAt) { + if ($timeout && microtime(true) >= $now + $timeout) { return; } } @@ -135,64 +161,13 @@ public function unsubscribeAll(): void $this->subscribers = []; } - private function fetchMessage(array $queues): ?array + protected function getContext(): DbalContext { - $query = $this->dbal->createQueryBuilder(); - $query - ->select('*') - ->from($this->context->getTableName()) - ->andWhere('queue IN (:queues)') - ->andWhere('priority IS NULL') - ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') - ->addOrderBy('published_at', 'asc') - ->setMaxResults(1) - ; - - $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); - - $result = $this->dbal->executeQuery( - $sql, - [ - 'queues' => array_keys($queues), - 'delayedUntil' => time(), - ], - [ - 'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY, - 'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER, - ] - )->fetch(); - - return $result ?: null; + return $this->context; } - private function fetchPrioritizedMessage(array $queues): ?array + protected function getConnection(): Connection { - $query = $this->dbal->createQueryBuilder(); - $query - ->select('*') - ->from($this->context->getTableName()) - ->andWhere('queue IN (:queues)') - ->andWhere('priority IS NOT NULL') - ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') - ->addOrderBy('published_at', 'asc') - ->addOrderBy('priority', 'desc') - ->setMaxResults(1) - ; - - $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); - - $result = $this->dbal->executeQuery( - $sql, - [ - 'queues' => array_keys($queues), - 'delayedUntil' => time(), - ], - [ - 'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY, - 'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER, - ] - )->fetch(); - - return $result ?: null; + return $this->dbal; } } diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index a7000d9f6..4eeb70a75 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -1,7 +1,11 @@ assertSame($destination, $consumer->getQueue()); } - public function testCouldCallAcknowledgedMethod() + public function testAcknowledgeShouldThrowIfInstanceOfMessageIsInvalid() { + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage( + 'The message must be an instance of '. + 'Enqueue\Dbal\DbalMessage '. + 'but it is Enqueue\Dbal\Tests\InvalidMessage.' + ); + $consumer = new DbalConsumer($this->createContextMock(), new DbalDestination('queue')); - $consumer->acknowledge(new DbalMessage()); + $consumer->acknowledge(new InvalidMessage()); + } + + public function testShouldDeleteMessageOnAcknowledge() + { + $queue = new DbalDestination('queue'); + + $message = new DbalMessage(); + $message->setBody('theBody'); + $message->setDeliveryId('foo-delivery-id'); + + $dbal = $this->createConectionMock(); + $dbal + ->expects($this->once()) + ->method('delete') + ->with( + 'some-table-name', + ['delivery_id' => $message->getDeliveryId()], + ['delivery_id' => Type::STRING] + ) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getDbalConnection') + ->will($this->returnValue($dbal)) + ; + $context + ->expects($this->once()) + ->method('getTableName') + ->will($this->returnValue('some-table-name')) + ; + + $consumer = new DbalConsumer($context, $queue); + + $consumer->acknowledge($message); } public function testCouldSetAndGetPollingInterval() @@ -52,6 +99,16 @@ public function testCouldSetAndGetPollingInterval() $this->assertEquals(123456, $consumer->getPollingInterval()); } + public function testCouldSetAndGetRedeliveryDelay() + { + $destination = new DbalDestination('queue'); + + $consumer = new DbalConsumer($this->createContextMock(), $destination); + $consumer->setRedeliveryDelay(123456); + + $this->assertEquals(123456, $consumer->getRedeliveryDelay()); + } + public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() { $this->expectException(InvalidMessageException::class); @@ -65,17 +122,35 @@ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() $consumer->reject(new InvalidMessage()); } - public function testShouldDoNothingOnReject() + public function testShouldDeleteMessageFromQueueOnReject() { $queue = new DbalDestination('queue'); $message = new DbalMessage(); $message->setBody('theBody'); + $message->setDeliveryId('foo-delivery-id'); + + $dbal = $this->createConectionMock(); + $dbal + ->expects($this->once()) + ->method('delete') + ->with( + 'some-table-name', + ['delivery_id' => $message->getDeliveryId()], + ['delivery_id' => Type::STRING] + ) + ; $context = $this->createContextMock(); $context - ->expects($this->never()) - ->method('createProducer') + ->expects($this->once()) + ->method('getDbalConnection') + ->will($this->returnValue($dbal)) + ; + $context + ->expects($this->once()) + ->method('getTableName') + ->will($this->returnValue('some-table-name')) ; $consumer = new DbalConsumer($context, $queue); @@ -94,7 +169,7 @@ public function testRejectShouldReSendMessageToSameQueueOnRequeue() $producerMock ->expects($this->once()) ->method('send') - ->with($this->identicalTo($queue), $this->identicalTo($message)) + ->with($this->identicalTo($queue), $this->isInstanceOf($message)) ; $context = $this->createContextMock(); @@ -124,6 +199,14 @@ private function createContextMock() { return $this->createMock(DbalContext::class); } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|DbalContext + */ + private function createConectionMock() + { + return $this->createMock(Connection::class); + } } class InvalidMessage implements Message diff --git a/pkg/dbal/Tests/Functional/DbalConsumerTest.php b/pkg/dbal/Tests/Functional/DbalConsumerTest.php index 92060500f..ecee1bc71 100644 --- a/pkg/dbal/Tests/Functional/DbalConsumerTest.php +++ b/pkg/dbal/Tests/Functional/DbalConsumerTest.php @@ -1,5 +1,7 @@ acknowledge($message); $this->assertSame($expectedPriority5Body, $message->getBody()); } + + public function testShouldDeleteExpiredMessage() + { + $context = $this->context; + $queue = $context->createQueue(__METHOD__); + + $consumer = $context->createConsumer($queue); + + // guard + $this->assertNull($consumer->receiveNoWait()); + + $producer = $context->createProducer(); + + $this->context->getDbalConnection()->insert( + $this->context->getTableName(), [ + 'id' => 'id', + 'human_id' => 'id', + 'published_at' => '123', + 'body' => 'expiredMessage', + 'headers' => json_encode([]), + 'properties' => json_encode([]), + 'queue' => __METHOD__, + 'redelivered' => 0, + 'time_to_live' => time() - 10000, + ]); + + $message = $context->createMessage('notExpiredMessage'); + $message->setRedelivered(false); + $producer->send($queue, $message); + + $this->assertSame('2', $this->getQuerySize()); + + $message = $consumer->receive(8000); + + $this->assertSame('1', $this->getQuerySize()); + + $consumer->acknowledge($message); + + $this->assertSame('0', $this->getQuerySize()); + } + + private function getQuerySize(): string + { + return $this->context->getDbalConnection() + ->executeQuery('SELECT count(*) FROM '.$this->context->getTableName()) + ->fetchColumn(0) + ; + } }