diff --git a/composer.json b/composer.json index 3d5146194..66573eaf8 100644 --- a/composer.json +++ b/composer.json @@ -23,9 +23,9 @@ "enqueue/simple-client": "*@dev", "enqueue/test": "*@dev", "enqueue/async-event-dispatcher": "*@dev", - "queue-interop/queue-interop": "^0.5@dev", + "queue-interop/queue-interop": "^0.6@dev", + "queue-interop/amqp-interop": "^0.6@dev", "queue-interop/queue-spec": "^0.5@dev", - "queue-interop/amqp-interop": "^0.5@dev", "phpunit/phpunit": "^5", "doctrine/doctrine-bundle": "~1.2", diff --git a/pkg/amqp-ext/AmqpProducer.php b/pkg/amqp-ext/AmqpProducer.php index ea268de4a..684455ca1 100644 --- a/pkg/amqp-ext/AmqpProducer.php +++ b/pkg/amqp-ext/AmqpProducer.php @@ -6,6 +6,7 @@ use Interop\Amqp\AmqpProducer as InteropAmqpProducer; use Interop\Amqp\AmqpQueue; use Interop\Amqp\AmqpTopic; +use Interop\Queue\DeliveryDelayNotSupportedException; use Interop\Queue\InvalidDestinationException; use Interop\Queue\InvalidMessageException; use Interop\Queue\PsrDestination; @@ -14,6 +15,16 @@ class AmqpProducer implements InteropAmqpProducer { + /** + * @var int|null + */ + private $priority; + + /** + * @var int|float|null + */ + private $timeToLive; + /** * @var \AMQPChannel */ @@ -42,6 +53,14 @@ public function send(PsrDestination $destination, PsrMessage $message) InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class); + if (null !== $this->priority && null === $message->getPriority()) { + $message->setPriority($this->priority); + } + + if (null !== $this->timeToLive && null === $message->getExpiration()) { + $message->setExpiration($this->timeToLive); + } + $amqpAttributes = $message->getHeaders(); if ($message->getProperties()) { @@ -74,4 +93,52 @@ public function send(PsrDestination $destination, PsrMessage $message) ); } } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + $this->priority = $priority; + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return $this->priority; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + $this->timeToLive = $timeToLive; + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return $this->timeToLive; + } } diff --git a/pkg/amqp-ext/Tests/Spec/AmqpProducerTest.php b/pkg/amqp-ext/Tests/Spec/AmqpProducerTest.php new file mode 100644 index 000000000..5c988c101 --- /dev/null +++ b/pkg/amqp-ext/Tests/Spec/AmqpProducerTest.php @@ -0,0 +1,22 @@ +createContext()->createProducer(); + } +} diff --git a/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceivePriorityMessagesFromQueueTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceivePriorityMessagesFromQueueTest.php new file mode 100644 index 000000000..e8251eea0 --- /dev/null +++ b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceivePriorityMessagesFromQueueTest.php @@ -0,0 +1,40 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $queue->setArguments(['x-max-priority' => 10]); + + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveTimeToLiveMessagesFromQueueTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveTimeToLiveMessagesFromQueueTest.php new file mode 100644 index 000000000..16a67127c --- /dev/null +++ b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveTimeToLiveMessagesFromQueueTest.php @@ -0,0 +1,38 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-ext/composer.json b/pkg/amqp-ext/composer.json index 519b81cf5..5329c708d 100644 --- a/pkg/amqp-ext/composer.json +++ b/pkg/amqp-ext/composer.json @@ -13,8 +13,8 @@ "require": { "php": ">=5.6", "ext-amqp": "^1.6", - "queue-interop/queue-interop": "^0.5@dev", - "queue-interop/amqp-interop": "^0.5@dev", + + "queue-interop/amqp-interop": "^0.6@dev", "psr/log": "^1" }, "require-dev": { diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index e3f64776e..0ba8c899a 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -17,6 +17,7 @@ use Interop\Queue\PsrTopic; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; +use PhpAmqpLib\Wire\AMQPTable; class AmqpContext implements InteropAmqpContext { @@ -173,7 +174,7 @@ public function declareQueue(InteropAmqpQueue $queue) (bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_EXCLUSIVE), (bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_AUTODELETE), (bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_NOWAIT), - $queue->getArguments() + $queue->getArguments() ? new AMQPTable($queue->getArguments()) : null ); } diff --git a/pkg/amqp-lib/AmqpProducer.php b/pkg/amqp-lib/AmqpProducer.php index 68c216960..5c3ba2094 100644 --- a/pkg/amqp-lib/AmqpProducer.php +++ b/pkg/amqp-lib/AmqpProducer.php @@ -6,6 +6,7 @@ use Interop\Amqp\AmqpProducer as InteropAmqpProducer; use Interop\Amqp\AmqpQueue as InteropAmqpQueue; use Interop\Amqp\AmqpTopic as InteropAmqpTopic; +use Interop\Queue\DeliveryDelayNotSupportedException; use Interop\Queue\InvalidDestinationException; use Interop\Queue\InvalidMessageException; use Interop\Queue\PsrDestination; @@ -17,6 +18,16 @@ class AmqpProducer implements InteropAmqpProducer { + /** + * @var int|null + */ + private $priority; + + /** + * @var int|float|null + */ + private $timeToLive; + /** * @var AMQPChannel */ @@ -43,6 +54,14 @@ public function send(PsrDestination $destination, PsrMessage $message) InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class); + if (null !== $this->priority && null === $message->getPriority()) { + $message->setPriority($this->priority); + } + + if (null !== $this->timeToLive && null === $message->getExpiration()) { + $message->setExpiration($this->timeToLive); + } + $amqpProperties = $message->getHeaders(); if ($appProperties = $message->getProperties()) { @@ -69,4 +88,52 @@ public function send(PsrDestination $destination, PsrMessage $message) ); } } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + $this->priority = $priority; + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return $this->priority; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + $this->timeToLive = $timeToLive; + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return $this->timeToLive; + } } diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceivePriorityMessagesFromQueueTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceivePriorityMessagesFromQueueTest.php new file mode 100644 index 000000000..881c8dd8c --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceivePriorityMessagesFromQueueTest.php @@ -0,0 +1,40 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $queue->setArguments(['x-max-priority' => 10]); + + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveTimeToLiveMessagesFromQueueTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveTimeToLiveMessagesFromQueueTest.php new file mode 100644 index 000000000..b62882fad --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveTimeToLiveMessagesFromQueueTest.php @@ -0,0 +1,38 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-lib/composer.json b/pkg/amqp-lib/composer.json index be9cb64e4..1241cc233 100644 --- a/pkg/amqp-lib/composer.json +++ b/pkg/amqp-lib/composer.json @@ -13,8 +13,8 @@ "require": { "php": ">=5.6", "php-amqplib/php-amqplib": "^2.7@dev", - "queue-interop/queue-interop": "^0.5@dev", - "queue-interop/amqp-interop": "^0.5@dev", + "queue-interop/queue-interop": "^0.6@dev", + "queue-interop/amqp-interop": "^0.6@dev", "psr/log": "^1" }, "require-dev": { diff --git a/pkg/async-event-dispatcher/composer.json b/pkg/async-event-dispatcher/composer.json index 122500207..3ccb9f1d9 100644 --- a/pkg/async-event-dispatcher/composer.json +++ b/pkg/async-event-dispatcher/composer.json @@ -12,7 +12,7 @@ ], "require": { "php": ">=5.6", - "queue-interop/queue-interop": "^0.5@dev", + "queue-interop/queue-interop": "^0.6@dev", "symfony/event-dispatcher": "^2.8|^3" }, "require-dev": { diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index 8668a68ce..99657041c 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -85,4 +85,52 @@ public function send(PsrDestination $destination, PsrMessage $message) throw new Exception('The transport fails to send the message due to some internal error.', null, $e); } } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return null; + } } diff --git a/pkg/dbal/composer.json b/pkg/dbal/composer.json index 6d7241de5..579f59992 100644 --- a/pkg/dbal/composer.json +++ b/pkg/dbal/composer.json @@ -12,7 +12,7 @@ ], "require": { "php": ">=5.6", - "queue-interop/queue-interop": "^0.5@dev", + "queue-interop/queue-interop": "^0.6@dev", "doctrine/dbal": "~2.5", "psr/log": "^1" }, diff --git a/pkg/enqueue-bundle/composer.json b/pkg/enqueue-bundle/composer.json index c43285e47..f70ab6af8 100644 --- a/pkg/enqueue-bundle/composer.json +++ b/pkg/enqueue-bundle/composer.json @@ -21,6 +21,7 @@ "phpunit/phpunit": "~5.5", "enqueue/stomp": "^0.7@dev", "enqueue/amqp-ext": "^0.7@dev", + "php-amqplib/php-amqplib": "^2.7@dev", "enqueue/amqp-lib": "^0.7@dev", "enqueue/job-queue": "^0.7@dev", "enqueue/fs": "^0.7@dev", diff --git a/pkg/enqueue/composer.json b/pkg/enqueue/composer.json index 057979af9..02efee572 100644 --- a/pkg/enqueue/composer.json +++ b/pkg/enqueue/composer.json @@ -12,7 +12,7 @@ ], "require": { "php": ">=5.6", - "queue-interop/queue-interop": "^0.5@dev", + "queue-interop/queue-interop": "^0.6@dev", "enqueue/null": "^0.7@dev", "ramsey/uuid": "^2|^3.5" }, diff --git a/pkg/fs/FsConsumer.php b/pkg/fs/FsConsumer.php index d51f8e1e5..6dca254f2 100644 --- a/pkg/fs/FsConsumer.php +++ b/pkg/fs/FsConsumer.php @@ -89,7 +89,14 @@ public function receiveNoWait() if ($rawMessage) { try { - $this->preFetchedMessages[] = FsMessage::jsonUnserialize($rawMessage); + $fetchedMessage = FsMessage::jsonUnserialize($rawMessage); + $expireAt = $fetchedMessage->getHeader('x-expire-at'); + if ($expireAt && $expireAt - microtime(true) < 0) { + // message has expired, just drop it. + return; + } + + $this->preFetchedMessages[] = $fetchedMessage; } catch (\Exception $e) { throw new \LogicException(sprintf("Cannot decode json message '%s'", $rawMessage), null, $e); } diff --git a/pkg/fs/FsProducer.php b/pkg/fs/FsProducer.php index a61584321..16f8626c1 100644 --- a/pkg/fs/FsProducer.php +++ b/pkg/fs/FsProducer.php @@ -2,8 +2,10 @@ namespace Enqueue\Fs; +use Interop\Queue\DeliveryDelayNotSupportedException; use Interop\Queue\InvalidDestinationException; use Interop\Queue\InvalidMessageException; +use Interop\Queue\PriorityNotSupportedException; use Interop\Queue\PsrDestination; use Interop\Queue\PsrMessage; use Interop\Queue\PsrProducer; @@ -11,6 +13,11 @@ class FsProducer implements PsrProducer { + /** + * @var float|int|null + */ + private $timeToLive; + /** * @var FsContext */ @@ -41,6 +48,10 @@ public function send(PsrDestination $destination, PsrMessage $message) return; } + if (null !== $this->timeToLive) { + $message->setHeader('x-expire-at', microtime(true) + ($this->timeToLive / 1000)); + } + $rawMessage = '|'.json_encode($message); if (JSON_ERROR_NONE !== json_last_error()) { @@ -56,4 +67,52 @@ public function send(PsrDestination $destination, PsrMessage $message) fwrite($file, $rawMessage); }); } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + throw PriorityNotSupportedException::providerDoestNotSupportIt(); + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + $this->timeToLive = $timeToLive; + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return null; + } } diff --git a/pkg/fs/Tests/Spec/FsSendAndReceiveTimeToLiveMessagesFromQueueTest.php b/pkg/fs/Tests/Spec/FsSendAndReceiveTimeToLiveMessagesFromQueueTest.php new file mode 100644 index 000000000..3dd1697c7 --- /dev/null +++ b/pkg/fs/Tests/Spec/FsSendAndReceiveTimeToLiveMessagesFromQueueTest.php @@ -0,0 +1,20 @@ +createContext(); + } +} diff --git a/pkg/fs/composer.json b/pkg/fs/composer.json index edaeb297b..a80d9cde8 100644 --- a/pkg/fs/composer.json +++ b/pkg/fs/composer.json @@ -12,7 +12,7 @@ ], "require": { "php": ">=5.6", - "queue-interop/queue-interop": "^0.5@dev", + "queue-interop/queue-interop": "^0.6@dev", "symfony/filesystem": "^2.8|^3", "makasim/temp-file": "^0.2", "psr/log": "^1" diff --git a/pkg/gearman/GearmanProducer.php b/pkg/gearman/GearmanProducer.php index c7c072c76..a891ecd43 100644 --- a/pkg/gearman/GearmanProducer.php +++ b/pkg/gearman/GearmanProducer.php @@ -41,4 +41,52 @@ public function send(PsrDestination $destination, PsrMessage $message) throw new \GearmanException(sprintf('The return code is not %s (GEARMAN_SUCCESS) but %s', \GEARMAN_SUCCESS, $code)); } } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return null; + } } diff --git a/pkg/gearman/composer.json b/pkg/gearman/composer.json index 520a18cec..002a0c5ea 100644 --- a/pkg/gearman/composer.json +++ b/pkg/gearman/composer.json @@ -13,7 +13,7 @@ "require": { "php": ">=5.6", "ext-gearman": "^1.1", - "queue-interop/queue-interop": "^0.5@dev" + "queue-interop/queue-interop": "^0.6@dev" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/null/NullProducer.php b/pkg/null/NullProducer.php index 307d992e8..47169a635 100644 --- a/pkg/null/NullProducer.php +++ b/pkg/null/NullProducer.php @@ -8,10 +8,70 @@ class NullProducer implements PsrProducer { + private $priority; + + private $timeToLive; + + private $deliveryDelay; + /** * {@inheritdoc} */ public function send(PsrDestination $destination, PsrMessage $message) { } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + $this->deliveryDelay = $deliveryDelay; + + return $this; + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return $this->deliveryDelay; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + $this->priority = $priority; + + return $this; + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return $this->priority; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + $this->timeToLive = $timeToLive; + + return $this; + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return $this->timeToLive; + } } diff --git a/pkg/null/composer.json b/pkg/null/composer.json index 529deca4d..8635a4288 100644 --- a/pkg/null/composer.json +++ b/pkg/null/composer.json @@ -12,7 +12,7 @@ ], "require": { "php": ">=5.6", - "queue-interop/queue-interop": "^0.5@dev", + "queue-interop/queue-interop": "^0.6@dev", "psr/log": "^1" }, "require-dev": { diff --git a/pkg/pheanstalk/PheanstalkProducer.php b/pkg/pheanstalk/PheanstalkProducer.php index 92ea44bec..d9fe8912d 100644 --- a/pkg/pheanstalk/PheanstalkProducer.php +++ b/pkg/pheanstalk/PheanstalkProducer.php @@ -51,4 +51,52 @@ public function send(PsrDestination $destination, PsrMessage $message) $message->getTimeToRun() ); } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return null; + } } diff --git a/pkg/pheanstalk/composer.json b/pkg/pheanstalk/composer.json index 25c4f2225..16e2cda6b 100644 --- a/pkg/pheanstalk/composer.json +++ b/pkg/pheanstalk/composer.json @@ -13,7 +13,7 @@ "require": { "php": ">=5.6", "pda/pheanstalk": "^3", - "queue-interop/queue-interop": "^0.5@dev" + "queue-interop/queue-interop": "^0.6@dev" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index b99c3300b..780f72646 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -42,4 +42,52 @@ public function send(PsrDestination $destination, PsrMessage $message) $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); $topic->produce($partition, 0 /* must be 0 */, $payload, $key); } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return null; + } } diff --git a/pkg/rdkafka/composer.json b/pkg/rdkafka/composer.json index 9986ce546..1737d4340 100644 --- a/pkg/rdkafka/composer.json +++ b/pkg/rdkafka/composer.json @@ -13,7 +13,7 @@ "require": { "php": ">=5.6", "ext-rdkafka": "^3.0.3", - "queue-interop/queue-interop": "^0.5@dev", + "queue-interop/queue-interop": "^0.6@dev", "psr/log": "^1" }, "require-dev": { diff --git a/pkg/redis/RedisProducer.php b/pkg/redis/RedisProducer.php index 55f65b1c2..1860288d0 100644 --- a/pkg/redis/RedisProducer.php +++ b/pkg/redis/RedisProducer.php @@ -36,4 +36,52 @@ public function send(PsrDestination $destination, PsrMessage $message) $this->redis->lpush($destination->getName(), json_encode($message)); } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return null; + } } diff --git a/pkg/redis/composer.json b/pkg/redis/composer.json index 4ee0bf1a9..6af42f97d 100644 --- a/pkg/redis/composer.json +++ b/pkg/redis/composer.json @@ -12,7 +12,7 @@ ], "require": { "php": ">=5.6", - "queue-interop/queue-interop": "^0.5@dev", + "queue-interop/queue-interop": "^0.6@dev", "psr/log": "^1" }, "require-dev": { diff --git a/pkg/sqs/SqsProducer.php b/pkg/sqs/SqsProducer.php index 0ea55340d..8d69ef1cd 100644 --- a/pkg/sqs/SqsProducer.php +++ b/pkg/sqs/SqsProducer.php @@ -4,12 +4,19 @@ use Interop\Queue\InvalidDestinationException; use Interop\Queue\InvalidMessageException; +use Interop\Queue\PriorityNotSupportedException; use Interop\Queue\PsrDestination; use Interop\Queue\PsrMessage; use Interop\Queue\PsrProducer; +use Interop\Queue\TimeToLiveNotSupportedException; class SqsProducer implements PsrProducer { + /** + * @var int|float|null + */ + private $deliveryDelay; + /** * @var SqsContext */ @@ -55,6 +62,10 @@ public function send(PsrDestination $destination, PsrMessage $message) 'QueueUrl' => $this->context->getQueueUrl($destination), ]; + if (null !== $this->deliveryDelay) { + $arguments['DelaySeconds'] = (int) $this->deliveryDelay / 1000; + } + if ($message->getDelaySeconds()) { $arguments['DelaySeconds'] = $message->getDelaySeconds(); } @@ -73,4 +84,52 @@ public function send(PsrDestination $destination, PsrMessage $message) throw new \RuntimeException('Message was not sent'); } } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + $this->deliveryDelay = $deliveryDelay; + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return $this->deliveryDelay; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + throw PriorityNotSupportedException::providerDoestNotSupportIt(); + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + throw TimeToLiveNotSupportedException::providerDoestNotSupportIt(); + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return null; + } } diff --git a/pkg/sqs/Tests/Spec/SqsProducerTest.php b/pkg/sqs/Tests/Spec/SqsProducerTest.php new file mode 100644 index 000000000..cd61b9d2a --- /dev/null +++ b/pkg/sqs/Tests/Spec/SqsProducerTest.php @@ -0,0 +1,26 @@ + getenv('AWS__SQS__KEY'), + 'secret' => getenv('AWS__SQS__SECRET'), + 'region' => getenv('AWS__SQS__REGION'), + ]); + + return $factory->createContext()->createProducer(); + } +} diff --git a/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php new file mode 100644 index 000000000..7d7b30aae --- /dev/null +++ b/pkg/sqs/Tests/Spec/SqsSendAndReceiveDelayedMessageFromQueueTest.php @@ -0,0 +1,43 @@ + getenv('AWS__SQS__KEY'), + 'secret' => getenv('AWS__SQS__SECRET'), + 'region' => getenv('AWS__SQS__REGION'), + ]); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + * + * @param SqsContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queueName = $queueName.time(); + + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + + return $queue; + } +} diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php new file mode 100644 index 000000000..3e73cd489 --- /dev/null +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromQueueTest.php @@ -0,0 +1,43 @@ + getenv('AWS__SQS__KEY'), + 'secret' => getenv('AWS__SQS__SECRET'), + 'region' => getenv('AWS__SQS__REGION'), + ]); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + * + * @param SqsContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queueName = $queueName.time(); + + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + + return $queue; + } +} diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php new file mode 100644 index 000000000..5c4595e88 --- /dev/null +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveFromTopicTest.php @@ -0,0 +1,43 @@ + getenv('AWS__SQS__KEY'), + 'secret' => getenv('AWS__SQS__SECRET'), + 'region' => getenv('AWS__SQS__REGION'), + ]); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + * + * @param SqsContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topicName = $topicName.time(); + + $topic = $context->createTopic($topicName); + $context->declareQueue($topic); + + return $topic; + } +} diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 000000000..9301c647b --- /dev/null +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,24 @@ +markTestSkipped('The test is fragile. This is how SQS.'); + } + + /** + * {@inheritdoc} + */ + protected function createContext() + { + throw new \LogicException('Should not be ever called'); + } +} diff --git a/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromTopicTest.php b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromTopicTest.php new file mode 100644 index 000000000..3d9149b05 --- /dev/null +++ b/pkg/sqs/Tests/Spec/SqsSendToAndReceiveNoWaitFromTopicTest.php @@ -0,0 +1,24 @@ +markTestSkipped('The test is fragile. This is how SQS.'); + } + + /** + * {@inheritdoc} + */ + protected function createContext() + { + throw new \LogicException('Should not be ever called'); + } +} diff --git a/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveFromQueueTest.php new file mode 100644 index 000000000..a9db45362 --- /dev/null +++ b/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveFromQueueTest.php @@ -0,0 +1,24 @@ +markTestSkipped('The SQS does not support it'); + } + + /** + * {@inheritdoc} + */ + protected function createContext() + { + throw new \LogicException('Should not be ever called'); + } +} diff --git a/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveNoWaitFromQueueTest.php b/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 000000000..bbb9be63a --- /dev/null +++ b/pkg/sqs/Tests/Spec/SqsSendToTopicAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,24 @@ +markTestSkipped('The SQS does not support it'); + } + + /** + * {@inheritdoc} + */ + protected function createContext() + { + throw new \LogicException('Should not be ever called'); + } +} diff --git a/pkg/sqs/Tests/SqsProducerTest.php b/pkg/sqs/Tests/SqsProducerTest.php index 9be09ce78..445b9cef0 100644 --- a/pkg/sqs/Tests/SqsProducerTest.php +++ b/pkg/sqs/Tests/SqsProducerTest.php @@ -130,6 +130,56 @@ public function testShouldSendMessage() $producer->send($destination, $message); } + public function testShouldSendDelayedMessage() + { + $expectedArguments = [ + 'MessageAttributes' => [ + 'Headers' => [ + 'DataType' => 'String', + 'StringValue' => '[{"hkey":"hvaleu"},{"key":"value"}]', + ], + ], + 'MessageBody' => 'theBody', + 'QueueUrl' => 'theQueueUrl', + 'DelaySeconds' => 12345, + 'MessageDeduplicationId' => 'theDeduplicationId', + 'MessageGroupId' => 'groupId', + ]; + + $client = $this->createSqsClientMock(); + $client + ->expects($this->once()) + ->method('sendMessage') + ->with($this->identicalTo($expectedArguments)) + ->willReturn(new Result()) + ; + + $context = $this->createSqsContextMock(); + $context + ->expects($this->once()) + ->method('getQueueUrl') + ->willReturn('theQueueUrl') + ; + $context + ->expects($this->once()) + ->method('getClient') + ->will($this->returnValue($client)) + ; + + $destination = new SqsDestination('queue-name'); + $message = new SqsMessage('theBody', ['key' => 'value'], ['hkey' => 'hvaleu']); + $message->setDelaySeconds(12345); + $message->setMessageDeduplicationId('theDeduplicationId'); + $message->setMessageGroupId('groupId'); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Message was not sent'); + + $producer = new SqsProducer($context); + $producer->setDeliveryDelay(5000); + $producer->send($destination, $message); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|SqsContext */ diff --git a/pkg/sqs/composer.json b/pkg/sqs/composer.json index fd007abbc..e14063309 100644 --- a/pkg/sqs/composer.json +++ b/pkg/sqs/composer.json @@ -12,7 +12,7 @@ ], "require": { "php": ">=5.6", - "queue-interop/queue-interop": "^0.5@dev", + "queue-interop/queue-interop": "^0.6@dev", "aws/aws-sdk-php": "~3.26", "psr/log": "^1" }, diff --git a/pkg/stomp/StompProducer.php b/pkg/stomp/StompProducer.php index 17bafbebe..e5206419c 100644 --- a/pkg/stomp/StompProducer.php +++ b/pkg/stomp/StompProducer.php @@ -44,4 +44,52 @@ public function send(PsrDestination $destination, PsrMessage $message) $this->stomp->send($destination->getQueueName(), $stompMessage); } + + /** + * {@inheritdoc} + */ + public function setDeliveryDelay($deliveryDelay) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getDeliveryDelay() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setPriority($priority) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getPriority() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function setTimeToLive($timeToLive) + { + throw new \LogicException('Not implemented'); + } + + /** + * {@inheritdoc} + */ + public function getTimeToLive() + { + return null; + } } diff --git a/pkg/stomp/composer.json b/pkg/stomp/composer.json index 5fb04fea2..996dce855 100644 --- a/pkg/stomp/composer.json +++ b/pkg/stomp/composer.json @@ -13,7 +13,7 @@ "require": { "php": ">=5.6", "stomp-php/stomp-php": "^4", - "queue-interop/queue-interop": "^0.5@dev", + "queue-interop/queue-interop": "^0.6@dev", "php-http/guzzle6-adapter": "^1.1", "richardfullmer/rabbitmq-management-api": "^2.0", "psr/log": "^1"