Skip to content

Commit bc28f1e

Browse files
committed
[redis] introduce Subscription Consumer.
1 parent f08bcde commit bc28f1e

File tree

1 file changed

+123
-0
lines changed

1 file changed

+123
-0
lines changed

Diff for: pkg/redis/RedisSubscriptionConsumer.php

+123
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
<?php
2+
3+
namespace Enqueue\Redis;
4+
5+
use Interop\Queue\PsrConsumer;
6+
use Interop\Queue\PsrSubscriptionConsumer;
7+
8+
class RedisSubscriptionConsumer implements PsrSubscriptionConsumer
9+
{
10+
/**
11+
* @var RedisContext
12+
*/
13+
private $context;
14+
15+
/**
16+
* an item contains an array: [RedisConsumer $consumer, callable $callback];.
17+
*
18+
* @var array
19+
*/
20+
private $subscribers;
21+
22+
/**
23+
* @param RedisContext $context
24+
*/
25+
public function __construct(RedisContext $context)
26+
{
27+
$this->context = $context;
28+
}
29+
30+
/**
31+
* {@inheritdoc}
32+
*/
33+
public function consume($timeout = 0)
34+
{
35+
if (empty($this->subscribers)) {
36+
throw new \LogicException('No subscribers');
37+
}
38+
39+
$timeout /= 1000;
40+
$endAt = microtime(true) + $timeout;
41+
42+
$queueNames = [];
43+
foreach (array_keys($this->subscribers) as $queueName) {
44+
$queueNames[$queueName] = $queueName;
45+
}
46+
47+
$currentQueueNames = [];
48+
while (true) {
49+
if (empty($currentQueueNames)) {
50+
$currentQueueNames = $queueNames;
51+
}
52+
53+
/**
54+
* @var string
55+
* @var PsrConsumer $consumer
56+
* @var callable $processor
57+
*/
58+
$result = $this->context->getRedis()->brpop($currentQueueNames, $timeout || 5000);
59+
if ($result) {
60+
$message = RedisMessage::jsonUnserialize($result->getMessage());
61+
$callback = $this->subscribers[$result->getKey()];
62+
if (false === call_user_func($callback, $message, $consumer)) {
63+
return;
64+
}
65+
66+
unset($currentQueueNames[$result->getKey()]);
67+
} else {
68+
$currentQueueNames = [];
69+
70+
if ($timeout && microtime(true) >= $endAt) {
71+
return;
72+
}
73+
}
74+
75+
if ($timeout && microtime(true) >= $endAt) {
76+
return;
77+
}
78+
}
79+
}
80+
81+
/**
82+
* {@inheritdoc}
83+
*
84+
* @param RedisConsumer $consumer
85+
*/
86+
public function subscribe(PsrConsumer $consumer, callable $callback)
87+
{
88+
if (false == $consumer instanceof RedisConsumer) {
89+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', RedisConsumer::class, get_class($consumer)));
90+
}
91+
92+
$queueName = $consumer->getQueue()->getQueueName();
93+
if (array_key_exists($queueName, $this->subscribers)) {
94+
return;
95+
}
96+
97+
$this->subscribers[$queueName] = [$consumer, $callback];
98+
}
99+
100+
/**
101+
* {@inheritdoc}
102+
*
103+
* @param RedisConsumer $consumer
104+
*/
105+
public function unsubscribe(PsrConsumer $consumer)
106+
{
107+
if (false == $consumer instanceof RedisConsumer) {
108+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', RedisConsumer::class, get_class($consumer)));
109+
}
110+
111+
$queueName = $consumer->getQueue()->getQueueName();
112+
113+
unset($this->subscribers[$queueName]);
114+
}
115+
116+
/**
117+
* {@inheritdoc}
118+
*/
119+
public function unsubscribeAll()
120+
{
121+
$this->subscribers = [];
122+
}
123+
}

0 commit comments

Comments
 (0)