Skip to content

Commit f08bcde

Browse files
committed
prepare redis interface and impl to consume from multiple queues.
1 parent 764ec22 commit f08bcde

8 files changed

+169
-73
lines changed

Diff for: pkg/redis/PRedis.php

+20-10
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ public function __construct(array $config)
3939
/**
4040
* {@inheritdoc}
4141
*/
42-
public function lpush($key, $value)
42+
public function lpush(string $key, string $value): int
4343
{
4444
try {
45-
$this->redis->lpush($key, [$value]);
45+
return $this->redis->lpush($key, [$value]);
4646
} catch (PRedisServerException $e) {
4747
throw new ServerException('lpush command has failed', null, $e);
4848
}
@@ -51,12 +51,14 @@ public function lpush($key, $value)
5151
/**
5252
* {@inheritdoc}
5353
*/
54-
public function brpop($key, $timeout)
54+
public function brpop(array $keys, int $timeout): ?RedisResult
5555
{
5656
try {
57-
if ($result = $this->redis->brpop([$key], $timeout)) {
58-
return $result[1];
57+
if ($result = $this->redis->brpop($keys, $timeout)) {
58+
return new RedisResult($result[0], $result[1]);
5959
}
60+
61+
return null;
6062
} catch (PRedisServerException $e) {
6163
throw new ServerException('brpop command has failed', null, $e);
6264
}
@@ -65,10 +67,14 @@ public function brpop($key, $timeout)
6567
/**
6668
* {@inheritdoc}
6769
*/
68-
public function rpop($key)
70+
public function rpop(string $key): ?RedisResult
6971
{
7072
try {
71-
return $this->redis->rpop($key);
73+
if ($message = $this->redis->rpop($key)) {
74+
return new RedisResult($key, $message);
75+
}
76+
77+
return null;
7278
} catch (PRedisServerException $e) {
7379
throw new ServerException('rpop command has failed', null, $e);
7480
}
@@ -77,8 +83,12 @@ public function rpop($key)
7783
/**
7884
* {@inheritdoc}
7985
*/
80-
public function connect()
86+
public function connect(): void
8187
{
88+
if ($this->redis) {
89+
return;
90+
}
91+
8292
$this->redis = new Client($this->config, ['exceptions' => true]);
8393

8494
if ($this->config['pass']) {
@@ -91,15 +101,15 @@ public function connect()
91101
/**
92102
* {@inheritdoc}
93103
*/
94-
public function disconnect()
104+
public function disconnect(): void
95105
{
96106
$this->redis->disconnect();
97107
}
98108

99109
/**
100110
* {@inheritdoc}
101111
*/
102-
public function del($key)
112+
public function del(string $key): void
103113
{
104114
$this->redis->del([$key]);
105115
}

Diff for: pkg/redis/PhpRedis.php

+44-38
Original file line numberDiff line numberDiff line change
@@ -24,80 +24,86 @@ public function __construct(array $config)
2424
'port' => null,
2525
'pass' => null,
2626
'user' => null,
27-
'timeout' => null,
27+
'timeout' => .0,
2828
'reserved' => null,
2929
'retry_interval' => null,
3030
'persisted' => false,
3131
'database' => 0,
3232
], $config);
33+
34+
var_dump($this->config);
3335
}
3436

3537
/**
3638
* {@inheritdoc}
3739
*/
38-
public function lpush($key, $value)
40+
public function lpush(string $key, string $value): int
3941
{
40-
if (false == $this->redis->lPush($key, $value)) {
41-
throw new ServerException($this->redis->getLastError());
42-
}
42+
return $this->redis->lPush($key, $value);
4343
}
4444

4545
/**
4646
* {@inheritdoc}
4747
*/
48-
public function brpop($key, $timeout)
48+
public function brpop(array $keys, int $timeout): ?RedisResult
4949
{
50-
if ($result = $this->redis->brPop([$key], $timeout)) {
51-
return $result[1];
50+
if ($result = $this->redis->brPop($keys, $timeout)) {
51+
return new RedisResult($result[0], $result[1]);
5252
}
53+
54+
return null;
5355
}
5456

5557
/**
5658
* {@inheritdoc}
5759
*/
58-
public function rpop($key)
60+
public function rpop(string $key): ?RedisResult
5961
{
60-
return $this->redis->rPop($key);
62+
if ($message = $this->redis->rPop($key)) {
63+
return new RedisResult($key, $message);
64+
}
65+
66+
return null;
6167
}
6268

6369
/**
6470
* {@inheritdoc}
6571
*/
66-
public function connect()
72+
public function connect(): void
6773
{
68-
if (false == $this->redis) {
69-
$this->redis = new \Redis();
70-
71-
if ($this->config['persisted']) {
72-
$this->redis->pconnect(
73-
$this->config['host'],
74-
$this->config['port'],
75-
$this->config['timeout']
76-
);
77-
} else {
78-
$this->redis->connect(
79-
$this->config['host'],
80-
$this->config['port'],
81-
$this->config['timeout'],
82-
$this->config['reserved'],
83-
$this->config['retry_interval']
84-
);
85-
}
86-
87-
if ($this->config['pass']) {
88-
$this->redis->auth($this->config['pass']);
89-
}
90-
91-
$this->redis->select($this->config['database']);
74+
if ($this->redis) {
75+
return;
76+
}
77+
78+
$this->redis = new \Redis();
79+
80+
if ($this->config['persisted']) {
81+
$this->redis->pconnect(
82+
$this->config['host'],
83+
$this->config['port'],
84+
$this->config['timeout']
85+
);
86+
} else {
87+
$this->redis->connect(
88+
$this->config['host'],
89+
$this->config['port'],
90+
$this->config['timeout'],
91+
$this->config['reserved'],
92+
$this->config['retry_interval']
93+
);
94+
}
95+
96+
if ($this->config['pass']) {
97+
$this->redis->auth($this->config['pass']);
9298
}
9399

94-
return $this->redis;
100+
$this->redis->select($this->config['database']);
95101
}
96102

97103
/**
98104
* {@inheritdoc}
99105
*/
100-
public function disconnect()
106+
public function disconnect(): void
101107
{
102108
if ($this->redis) {
103109
$this->redis->close();
@@ -107,7 +113,7 @@ public function disconnect()
107113
/**
108114
* {@inheritdoc}
109115
*/
110-
public function del($key)
116+
public function del(string $key): void
111117
{
112118
$this->redis->del($key);
113119
}

Diff for: pkg/redis/Redis.php

+12-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace Enqueue\Redis;
46

57
interface Redis
@@ -10,29 +12,29 @@ interface Redis
1012
*
1113
* @return int length of the list
1214
*/
13-
public function lpush($key, $value);
15+
public function lpush(string $key, string $value): int;
1416

1517
/**
16-
* @param string $key
17-
* @param int $timeout in seconds
18+
* @param string[] $keys
19+
* @param int $timeout in seconds
1820
*
19-
* @return string|null
21+
* @return RedisResult|null
2022
*/
21-
public function brpop($key, $timeout);
23+
public function brpop(array $keys, int $timeout): ?RedisResult;
2224

2325
/**
2426
* @param string $key
2527
*
26-
* @return string|null
28+
* @return RedisResult|null
2729
*/
28-
public function rpop($key);
30+
public function rpop(string $key): ?RedisResult;
2931

30-
public function connect();
32+
public function connect(): void;
3133

32-
public function disconnect();
34+
public function disconnect(): void;
3335

3436
/**
3537
* @param string $key
3638
*/
37-
public function del($key);
39+
public function del(string $key): void;
3840
}

Diff for: pkg/redis/RedisConnectionFactory.php

+5-1
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ private function parseDsn($dsn)
136136
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
137137
}
138138

139+
if (array_key_exists('port', $config)) {
140+
$config['port'] = (int) $config['port'];
141+
}
142+
139143
if ($query = parse_url($dsn, PHP_URL_QUERY)) {
140144
$queryConfig = [];
141145
parse_str($query, $queryConfig);
@@ -159,7 +163,7 @@ private function defaultConfig()
159163
return [
160164
'host' => 'localhost',
161165
'port' => 6379,
162-
'timeout' => null,
166+
'timeout' => .0,
163167
'reserved' => null,
164168
'retry_interval' => null,
165169
'vendor' => 'phpredis',

Diff for: pkg/redis/RedisConsumer.php

+9-9
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,15 @@ public function receive($timeout = 0)
4747
{
4848
$timeout = (int) ($timeout / 1000);
4949
if (empty($timeout)) {
50-
// Caused by
51-
// Predis\Response\ServerException: ERR timeout is not an integer or out of range
52-
// /mqdev/vendor/predis/predis/src/Client.php:370
53-
54-
return $this->receiveNoWait();
50+
while (true) {
51+
if ($message = $this->receive(5000)) {
52+
return $message;
53+
}
54+
}
5555
}
5656

57-
if ($message = $this->getRedis()->brpop($this->queue->getName(), $timeout)) {
58-
return RedisMessage::jsonUnserialize($message);
57+
if ($result = $this->getRedis()->brpop([$this->queue->getName()], $timeout)) {
58+
return RedisMessage::jsonUnserialize($result->getMessage());
5959
}
6060
}
6161

@@ -66,8 +66,8 @@ public function receive($timeout = 0)
6666
*/
6767
public function receiveNoWait()
6868
{
69-
if ($message = $this->getRedis()->rpop($this->queue->getName())) {
70-
return RedisMessage::jsonUnserialize($message);
69+
if ($result = $this->getRedis()->rpop($this->queue->getName())) {
70+
return RedisMessage::jsonUnserialize($result->getMessage());
7171
}
7272
}
7373

Diff for: pkg/redis/RedisResult.php

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
namespace Enqueue\Redis;
4+
5+
class RedisResult
6+
{
7+
/**
8+
* @var string
9+
*/
10+
private $key;
11+
12+
/**
13+
* @var string
14+
*/
15+
private $message;
16+
17+
public function __construct(string $key, string $message)
18+
{
19+
$this->key = $key;
20+
$this->message = $message;
21+
}
22+
23+
public function getKey(): string
24+
{
25+
return $this->key;
26+
}
27+
28+
public function getMessage(): string
29+
{
30+
return $this->message;
31+
}
32+
}

0 commit comments

Comments
 (0)