Skip to content

Redis New Implementation #585

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 30, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions pkg/redis/LuaScripts.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Enqueue\Redis;

class LuaScripts
{
/**
* Get the Lua script to migrate expired messages back onto the queue.
*
* KEYS[1] - The queue we are removing messages from, for example: queues:foo:reserved
* KEYS[2] - The queue we are moving messages to, for example: queues:foo
* ARGV[1] - The current UNIX timestamp
*
* @return string
*/
public static function migrateExpired()
{
return <<<'LUA'
-- Get all of the messages with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate messages onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

for i = 1, #val, 100 do
redis.call('lpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end

return val
LUA;
}
}
28 changes: 28 additions & 0 deletions pkg/redis/PRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,34 @@ public function __construct(array $config)
}
}

public function eval(string $script, array $keys = [], array $args = [])
{
try {
// mixed eval($script, $numkeys, $keyOrArg1 = null, $keyOrArgN = null)
return call_user_func_array([$this->redis, 'eval'], array_merge([$script, count($keys)], $keys, $args));
} catch (PRedisServerException $e) {
throw new ServerException('eval command has failed', null, $e);
}
}

public function zadd(string $key, string $value, float $score): int
{
try {
return $this->redis->zadd($key, [$value => $score]);
} catch (PRedisServerException $e) {
throw new ServerException('zadd command has failed', null, $e);
}
}

public function zrem(string $key, string $value): int
{
try {
return $this->redis->zrem($key, [$value]);
} catch (PRedisServerException $e) {
throw new ServerException('zrem command has failed', null, $e);
}
}

public function lpush(string $key, string $value): int
{
try {
Expand Down
27 changes: 27 additions & 0 deletions pkg/redis/PhpRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,33 @@ public function __construct(array $config)
$this->config = $config;
}

public function eval(string $script, array $keys = [], array $args = [])
{
try {
return $this->redis->eval($script, array_merge($keys, $args), count($keys));
} catch (\RedisException $e) {
throw new ServerException('eval command has failed', null, $e);
}
}

public function zadd(string $key, string $value, float $score): int
{
try {
return $this->redis->zAdd($key, $score, $value);
} catch (\RedisException $e) {
throw new ServerException('zadd command has failed', null, $e);
}
}

public function zrem(string $key, string $value): int
{
try {
return $this->redis->zRem($key, $value);
} catch (\RedisException $e) {
throw new ServerException('zrem command has failed', null, $e);
}
}

public function lpush(string $key, string $value): int
{
try {
Expand Down
32 changes: 32 additions & 0 deletions pkg/redis/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,38 @@

interface Redis
{
/**
* @param string $script
* @param array $keys
* @param array $args
*
* @throws ServerException
*
* @return mixed
*/
public function eval(string $script, array $keys = [], array $args = []);

/**
* @param string $key
* @param string $value
* @param float $score
*
* @throws ServerException
*
* @return int
*/
public function zadd(string $key, string $value, float $score): int;

/**
* @param string $key
* @param string $value
*
* @throws ServerException
*
* @return int
*/
public function zrem(string $key, string $value): int;

/**
* @param string $key
* @param string $value
Expand Down
71 changes: 58 additions & 13 deletions pkg/redis/RedisConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,42 @@ class RedisConsumer implements Consumer
*/
private $context;

/**
* @var int
*/
private $retryDelay;

/**
* @var RedisQueueConsumer
*/
private $queueConsumer;

public function __construct(RedisContext $context, RedisDestination $queue)
{
$this->context = $context;
$this->queue = $queue;
}

/**
* @return int
*/
public function getRetryDelay(): ?int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redeliveryDelay

{
return $this->retryDelay;
}

/**
* @param int $retryDelay
*/
public function setRetryDelay(int $retryDelay): void
{
$this->retryDelay = $retryDelay;

if ($this->queueConsumer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think setting retryDelay property is enough. No need to set same value on queue consumer too. (It will be passed as argument to receiveXXX methods).

$this->queueConsumer->setRetryDelay($this->retryDelay);
}
}

/**
* @return RedisDestination
*/
Expand All @@ -40,40 +70,37 @@ public function getQueue(): Queue
*/
public function receive(int $timeout = 0): ?Message
{
$timeout = (int) ($timeout / 1000);
if (empty($timeout)) {
$timeout = (int) ceil($timeout / 1000);

if ($timeout <= 0) {
while (true) {
if ($message = $this->receive(5000)) {
return $message;
}
}
}

if ($result = $this->getRedis()->brpop([$this->queue->getName()], $timeout)) {
return RedisMessage::jsonUnserialize($result->getMessage());
}
$this->initQueueConsumer();

return null;
return $this->queueConsumer->receiveMessage($timeout);
}

/**
* @return RedisMessage
*/
public function receiveNoWait(): ?Message
{
if ($result = $this->getRedis()->rpop($this->queue->getName())) {
return RedisMessage::jsonUnserialize($result->getMessage());
}
$this->initQueueConsumer();

return null;
return $this->queueConsumer->receiveMessageNoWait($this->queue);
}

/**
* @param RedisMessage $message
*/
public function acknowledge(Message $message): void
{
// do nothing. redis transport always works in auto ack mode
$this->getRedis()->zrem($this->queue->getName().':reserved', $message->getReservedKey());
}

/**
Expand All @@ -83,15 +110,33 @@ public function reject(Message $message, bool $requeue = false): void
{
InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class);

// do nothing on reject. redis transport always works in auto ack mode
$this->acknowledge($message);

if ($requeue) {
$this->context->createProducer()->send($this->queue, $message);
$message = RedisMessage::jsonUnserialize($message->getReservedKey());
$message->setHeader('attempts', 0);

if ($message->getTimeToLive()) {
$message->setHeader('expires_at', time() + $message->getTimeToLive());
}

$this->getRedis()->lpush($this->queue->getName(), json_encode($message));
}
}

private function getRedis(): Redis
{
return $this->context->getRedis();
}

private function initQueueConsumer(): void
{
if (null === $this->queueConsumer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we make a RedisQueueConsumer stateless we could move it to the context and pass to real consumers as a dependency.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The init would not be needed.

$this->queueConsumer = new RedisQueueConsumer($this->getRedis(), [$this->queue]);

if ($this->retryDelay) {
$this->queueConsumer->setRetryDelay($this->retryDelay);
}
}
}
}
13 changes: 10 additions & 3 deletions pkg/redis/RedisContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public function deleteQueue(Queue $queue): void
{
InvalidDestinationException::assertDestinationInstanceOf($queue, RedisDestination::class);

$this->getRedis()->del($queue->getName());
$this->deleteDestination($queue);
}

/**
Expand All @@ -88,7 +88,7 @@ public function deleteTopic(Topic $topic): void
{
InvalidDestinationException::assertDestinationInstanceOf($topic, RedisDestination::class);

$this->getRedis()->del($topic->getName());
$this->deleteDestination($topic);
}

public function createTemporaryQueue(): Queue
Expand Down Expand Up @@ -129,7 +129,7 @@ public function createSubscriptionConsumer(): SubscriptionConsumer
*/
public function purgeQueue(Queue $queue): void
{
$this->getRedis()->del($queue->getName());
$this->deleteDestination($queue);
}

public function close(): void
Expand All @@ -154,4 +154,11 @@ public function getRedis(): Redis

return $this->redis;
}

private function deleteDestination(RedisDestination $destination): void
{
$this->getRedis()->del($destination->getName());
$this->getRedis()->del($destination->getName().':delayed');
$this->getRedis()->del($destination->getName().':reserved');
}
}
Loading