diff --git a/pkg/sqs/SqsConsumer.php b/pkg/sqs/SqsConsumer.php index 3c8714e3d..4b89eb7ff 100644 --- a/pkg/sqs/SqsConsumer.php +++ b/pkg/sqs/SqsConsumer.php @@ -138,7 +138,7 @@ public function reject(Message $message, bool $requeue = false): void '@region' => $this->queue->getRegion(), 'QueueUrl' => $this->context->getQueueUrl($this->queue), 'ReceiptHandle' => $message->getReceiptHandle(), - 'VisibilityTimeout' => 0, + 'VisibilityTimeout' => $message->getRequeueVisibilityTimeout(), ]); } else { $this->context->getSqsClient()->deleteMessage([ diff --git a/pkg/sqs/SqsMessage.php b/pkg/sqs/SqsMessage.php index 1d9d26bec..3b0f46bd9 100644 --- a/pkg/sqs/SqsMessage.php +++ b/pkg/sqs/SqsMessage.php @@ -48,6 +48,11 @@ class SqsMessage implements Message */ private $receiptHandle; + /** + * @var int + */ + private $requeueVisibilityTimeout; + public function __construct(string $body = '', array $properties = [], array $headers = []) { $this->body = $body; @@ -55,6 +60,7 @@ public function __construct(string $body = '', array $properties = [], array $he $this->headers = $headers; $this->redelivered = false; $this->delaySeconds = 0; + $this->requeueVisibilityTimeout = 0; } public function setBody(string $body): void @@ -229,4 +235,19 @@ public function getReceiptHandle(): ?string { return $this->receiptHandle; } + + /** + * The number of seconds before the message can be visible again when requeuing. Valid values: 0 to 43200. Maximum: 12 hours. + * + * Set requeue visibility timeout + */ + public function setRequeueVisibilityTimeout(int $seconds): void + { + $this->requeueVisibilityTimeout = $seconds; + } + + public function getRequeueVisibilityTimeout(): int + { + return $this->requeueVisibilityTimeout; + } } diff --git a/pkg/sqs/Tests/SqsConsumerTest.php b/pkg/sqs/Tests/SqsConsumerTest.php index 67e6f8fc4..ea5c557b5 100644 --- a/pkg/sqs/Tests/SqsConsumerTest.php +++ b/pkg/sqs/Tests/SqsConsumerTest.php @@ -238,6 +238,47 @@ public function testShouldRejectMessageAndRequeue() $consumer->reject($message, true); } + public function testShouldRejectMessageAndRequeueWithVisibilityTimeout() + { + $client = $this->createSqsClientMock(); + $client + ->expects($this->once()) + ->method('changeMessageVisibility') + ->with($this->identicalTo([ + '@region' => 'theRegion', + 'QueueUrl' => 'theQueueUrl', + 'ReceiptHandle' => 'theReceipt', + 'VisibilityTimeout' => 30, + ])) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getSqsClient') + ->willReturn($client) + ; + $context + ->expects($this->once()) + ->method('getQueueUrl') + ->willReturn('theQueueUrl') + ; + $context + ->expects($this->never()) + ->method('createProducer') + ; + + $message = new SqsMessage(); + $message->setReceiptHandle('theReceipt'); + $message->setRequeueVisibilityTimeout(30); + + $destination = new SqsDestination('queue'); + $destination->setRegion('theRegion'); + + $consumer = new SqsConsumer($context, $destination); + $consumer->reject($message, true); + } + public function testShouldReceiveMessage() { $expectedAttributes = [