diff --git a/composer.json b/composer.json index d92658557..a26a38740 100644 --- a/composer.json +++ b/composer.json @@ -29,7 +29,7 @@ "enqueue/async-event-dispatcher": "*@dev", "queue-interop/queue-interop": "^0.6@dev", "queue-interop/amqp-interop": "^0.7@dev", - "queue-interop/queue-spec": "^0.5.1@dev", + "queue-interop/queue-spec": "^0.5.4@dev", "phpunit/phpunit": "^5", "doctrine/doctrine-bundle": "~1.2", diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index f898e567f..69b11dbb0 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -130,33 +130,10 @@ public function reject(PsrMessage $message, $requeue = false) { InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class); - if (false == $requeue) { - return; - } + if ($requeue) { + $this->context->createProducer()->send($this->queue, $message); - $dbalMessage = [ - 'body' => $message->getBody(), - 'headers' => JSON::encode($message->getHeaders()), - 'properties' => JSON::encode($message->getProperties()), - 'priority' => $message->getPriority(), - 'queue' => $this->queue->getQueueName(), - 'redelivered' => true, - ]; - - $affectedRows = $this->dbal->insert($this->context->getTableName(), $dbalMessage, [ - 'body' => Type::TEXT, - 'headers' => Type::TEXT, - 'properties' => Type::TEXT, - 'priority' => Type::SMALLINT, - 'queue' => Type::STRING, - 'redelivered' => Type::BOOLEAN, - ]); - - if (1 !== $affectedRows) { - throw new \LogicException(sprintf( - 'Expected record was inserted but it is not. message: "%s"', - JSON::encode($dbalMessage) - )); + return; } } diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index 87c4233d3..f6e4e3358 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -10,6 +10,7 @@ use Enqueue\Dbal\DbalContext; use Enqueue\Dbal\DbalDestination; use Enqueue\Dbal\DbalMessage; +use Enqueue\Dbal\DbalProducer; use Enqueue\Test\ClassExtensionTrait; use Interop\Queue\InvalidMessageException; use Interop\Queue\PsrConsumer; @@ -67,70 +68,58 @@ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() $consumer->reject(new InvalidMessage()); } - public function testRejectShouldInsertNewMessageOnRequeue() + public function testShouldDoNothingOnReject() { - $expectedMessage = [ - 'body' => 'theBody', - 'headers' => '[]', - 'properties' => '[]', - 'priority' => 0, - 'queue' => 'queue', - 'redelivered' => true, - ]; + $queue = new DbalDestination('queue'); - $dbal = $this->createConnectionMock(); - $dbal - ->expects($this->once()) - ->method('insert') - ->with('tableName', $this->equalTo($expectedMessage)) - ->will($this->returnValue(1)) - ; + $message = new DbalMessage(); + $message->setBody('theBody'); $context = $this->createContextMock(); $context - ->expects($this->once()) - ->method('getDbalConnection') - ->will($this->returnValue($dbal)) - ; - $context - ->expects($this->once()) - ->method('getTableName') - ->will($this->returnValue('tableName')) + ->expects($this->never()) + ->method('createProducer') ; - $message = new DbalMessage(); - $message->setBody('theBody'); + $consumer = new DbalConsumer($context, $queue); - $consumer = new DbalConsumer($context, new DbalDestination('queue')); - $consumer->reject($message, true); + $consumer->reject($message); } - public function testRejectShouldThrowIfMessageWasNotInserted() + public function testRejectShouldReSendMessageToSameQueueOnRequeue() { - $dbal = $this->createConnectionMock(); - $dbal + $queue = new DbalDestination('queue'); + + $message = new DbalMessage(); + $message->setBody('theBody'); + + $producerMock = $this->createProducerMock(); + $producerMock ->expects($this->once()) - ->method('insert') - ->willReturn(0) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($message)) ; $context = $this->createContextMock(); $context ->expects($this->once()) - ->method('getDbalConnection') - ->will($this->returnValue($dbal)) + ->method('createProducer') + ->will($this->returnValue($producerMock)) ; - $message = new DbalMessage(); - $message->setBody('theBody'); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('Expected record was inserted but it is not. message:'); + $consumer = new DbalConsumer($context, $queue); - $consumer = new DbalConsumer($context, new DbalDestination('queue')); $consumer->reject($message, true); } + /** + * @return DbalProducer|\PHPUnit_Framework_MockObject_MockObject + */ + private function createProducerMock() + { + return $this->createMock(DbalProducer::class); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|Connection */ diff --git a/pkg/dbal/Tests/Spec/DbalRequeueMessageTest.php b/pkg/dbal/Tests/Spec/DbalRequeueMessageTest.php new file mode 100644 index 000000000..ec22326ad --- /dev/null +++ b/pkg/dbal/Tests/Spec/DbalRequeueMessageTest.php @@ -0,0 +1,21 @@ +createDbalContext(); + } +} diff --git a/pkg/dbal/composer.json b/pkg/dbal/composer.json index 80c8858cd..1547e015e 100644 --- a/pkg/dbal/composer.json +++ b/pkg/dbal/composer.json @@ -15,7 +15,7 @@ "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", - "queue-interop/queue-spec": "^0.5.3@dev", + "queue-interop/queue-spec": "^0.5.4@dev", "symfony/dependency-injection": "^2.8|^3|^4", "symfony/config": "^2.8|^3|^4" },