Skip to content

Commit 2c2860d

Browse files
chayimdvora-h
andauthored
Change cluster docker to edge and enable debug command (redis#2853)
* debug in cluster docker, replace for master * sleep time for cluster to settle... * fix test_cluster_delslotsrange * fix tests --------- Co-authored-by: dvora-h <[email protected]>
1 parent 2732a85 commit 2c2860d

File tree

9 files changed

+177
-105
lines changed

9 files changed

+177
-105
lines changed

.github/workflows/integration.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ jobs:
7878
pip install hiredis
7979
fi
8080
invoke devenv
81-
sleep 5 # time to settle
81+
sleep 10 # time to settle
8282
invoke ${{matrix.test-type}}-tests
8383
8484
- uses: actions/upload-artifact@v2

dockers/Dockerfile.cluster

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM redis/redis-stack-server:latest as rss
1+
FROM redis/redis-stack-server:edge as rss
22

33
COPY dockers/create_cluster.sh /create_cluster.sh
44
RUN ls -R /opt/redis-stack

dockers/cluster.redis.conf

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
protected-mode no
2+
enable-debug-command yes
23
loadmodule /opt/redis-stack/lib/redisearch.so
34
loadmodule /opt/redis-stack/lib/redisgraph.so
45
loadmodule /opt/redis-stack/lib/redistimeseries.so

redis/_parsers/commands.py

+98-73
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,53 @@
77
from redis.asyncio.cluster import ClusterNode
88

99

10-
class CommandsParser:
10+
class AbstractCommandsParser:
11+
def _get_pubsub_keys(self, *args):
12+
"""
13+
Get the keys from pubsub command.
14+
Although PubSub commands have predetermined key locations, they are not
15+
supported in the 'COMMAND's output, so the key positions are hardcoded
16+
in this method
17+
"""
18+
if len(args) < 2:
19+
# The command has no keys in it
20+
return None
21+
args = [str_if_bytes(arg) for arg in args]
22+
command = args[0].upper()
23+
keys = None
24+
if command == "PUBSUB":
25+
# the second argument is a part of the command name, e.g.
26+
# ['PUBSUB', 'NUMSUB', 'foo'].
27+
pubsub_type = args[1].upper()
28+
if pubsub_type in ["CHANNELS", "NUMSUB", "SHARDCHANNELS", "SHARDNUMSUB"]:
29+
keys = args[2:]
30+
elif command in ["SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE"]:
31+
# format example:
32+
# SUBSCRIBE channel [channel ...]
33+
keys = list(args[1:])
34+
elif command in ["PUBLISH", "SPUBLISH"]:
35+
# format example:
36+
# PUBLISH channel message
37+
keys = [args[1]]
38+
return keys
39+
40+
def parse_subcommand(self, command, **options):
41+
cmd_dict = {}
42+
cmd_name = str_if_bytes(command[0])
43+
cmd_dict["name"] = cmd_name
44+
cmd_dict["arity"] = int(command[1])
45+
cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]]
46+
cmd_dict["first_key_pos"] = command[3]
47+
cmd_dict["last_key_pos"] = command[4]
48+
cmd_dict["step_count"] = command[5]
49+
if len(command) > 7:
50+
cmd_dict["tips"] = command[7]
51+
cmd_dict["key_specifications"] = command[8]
52+
cmd_dict["subcommands"] = command[9]
53+
return cmd_dict
54+
55+
56+
class CommandsParser(AbstractCommandsParser):
1157
"""
1258
Parses Redis commands to get command keys.
1359
COMMAND output is used to determine key locations.
@@ -30,21 +76,6 @@ def initialize(self, r):
3076
commands[cmd.lower()] = commands.pop(cmd)
3177
self.commands = commands
3278

33-
def parse_subcommand(self, command, **options):
34-
cmd_dict = {}
35-
cmd_name = str_if_bytes(command[0])
36-
cmd_dict["name"] = cmd_name
37-
cmd_dict["arity"] = int(command[1])
38-
cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]]
39-
cmd_dict["first_key_pos"] = command[3]
40-
cmd_dict["last_key_pos"] = command[4]
41-
cmd_dict["step_count"] = command[5]
42-
if len(command) > 7:
43-
cmd_dict["tips"] = command[7]
44-
cmd_dict["key_specifications"] = command[8]
45-
cmd_dict["subcommands"] = command[9]
46-
return cmd_dict
47-
4879
# As soon as this PR is merged into Redis, we should reimplement
4980
# our logic to use COMMAND INFO changes to determine the key positions
5081
# https://github.com/redis/redis/pull/8324
@@ -138,37 +169,8 @@ def _get_moveable_keys(self, redis_conn, *args):
138169
raise e
139170
return keys
140171

141-
def _get_pubsub_keys(self, *args):
142-
"""
143-
Get the keys from pubsub command.
144-
Although PubSub commands have predetermined key locations, they are not
145-
supported in the 'COMMAND's output, so the key positions are hardcoded
146-
in this method
147-
"""
148-
if len(args) < 2:
149-
# The command has no keys in it
150-
return None
151-
args = [str_if_bytes(arg) for arg in args]
152-
command = args[0].upper()
153-
keys = None
154-
if command == "PUBSUB":
155-
# the second argument is a part of the command name, e.g.
156-
# ['PUBSUB', 'NUMSUB', 'foo'].
157-
pubsub_type = args[1].upper()
158-
if pubsub_type in ["CHANNELS", "NUMSUB", "SHARDCHANNELS", "SHARDNUMSUB"]:
159-
keys = args[2:]
160-
elif command in ["SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE"]:
161-
# format example:
162-
# SUBSCRIBE channel [channel ...]
163-
keys = list(args[1:])
164-
elif command in ["PUBLISH", "SPUBLISH"]:
165-
# format example:
166-
# PUBLISH channel message
167-
keys = [args[1]]
168-
return keys
169172

170-
171-
class AsyncCommandsParser:
173+
class AsyncCommandsParser(AbstractCommandsParser):
172174
"""
173175
Parses Redis commands to get command keys.
174176
@@ -194,52 +196,75 @@ async def initialize(self, node: Optional["ClusterNode"] = None) -> None:
194196
self.node = node
195197

196198
commands = await self.node.execute_command("COMMAND")
197-
for cmd, command in commands.items():
198-
if "movablekeys" in command["flags"]:
199-
commands[cmd] = -1
200-
elif command["first_key_pos"] == 0 and command["last_key_pos"] == 0:
201-
commands[cmd] = 0
202-
elif command["first_key_pos"] == 1 and command["last_key_pos"] == 1:
203-
commands[cmd] = 1
204-
self.commands = {cmd.upper(): command for cmd, command in commands.items()}
199+
self.commands = {cmd.lower(): command for cmd, command in commands.items()}
205200

206201
# As soon as this PR is merged into Redis, we should reimplement
207202
# our logic to use COMMAND INFO changes to determine the key positions
208203
# https://github.com/redis/redis/pull/8324
209204
async def get_keys(self, *args: Any) -> Optional[Tuple[str, ...]]:
205+
"""
206+
Get the keys from the passed command.
207+
208+
NOTE: Due to a bug in redis<7.0, this function does not work properly
209+
for EVAL or EVALSHA when the `numkeys` arg is 0.
210+
- issue: https://github.com/redis/redis/issues/9493
211+
- fix: https://github.com/redis/redis/pull/9733
212+
213+
So, don't use this function with EVAL or EVALSHA.
214+
"""
210215
if len(args) < 2:
211216
# The command has no keys in it
212217
return None
213218

214-
try:
215-
command = self.commands[args[0]]
216-
except KeyError:
217-
# try to split the command name and to take only the main command
219+
cmd_name = args[0].lower()
220+
if cmd_name not in self.commands:
221+
# try to split the command name and to take only the main command,
218222
# e.g. 'memory' for 'memory usage'
219-
args = args[0].split() + list(args[1:])
220-
cmd_name = args[0].upper()
221-
if cmd_name not in self.commands:
223+
cmd_name_split = cmd_name.split()
224+
cmd_name = cmd_name_split[0]
225+
if cmd_name in self.commands:
226+
# save the splitted command to args
227+
args = cmd_name_split + list(args[1:])
228+
else:
222229
# We'll try to reinitialize the commands cache, if the engine
223230
# version has changed, the commands may not be current
224231
await self.initialize()
225232
if cmd_name not in self.commands:
226233
raise RedisError(
227-
f"{cmd_name} command doesn't exist in Redis commands"
234+
f"{cmd_name.upper()} command doesn't exist in Redis commands"
228235
)
229236

230-
command = self.commands[cmd_name]
237+
command = self.commands.get(cmd_name)
238+
if "movablekeys" in command["flags"]:
239+
keys = await self._get_moveable_keys(*args)
240+
elif "pubsub" in command["flags"] or command["name"] == "pubsub":
241+
keys = self._get_pubsub_keys(*args)
242+
else:
243+
if (
244+
command["step_count"] == 0
245+
and command["first_key_pos"] == 0
246+
and command["last_key_pos"] == 0
247+
):
248+
is_subcmd = False
249+
if "subcommands" in command:
250+
subcmd_name = f"{cmd_name}|{args[1].lower()}"
251+
for subcmd in command["subcommands"]:
252+
if str_if_bytes(subcmd[0]) == subcmd_name:
253+
command = self.parse_subcommand(subcmd)
254+
is_subcmd = True
231255

232-
if command == 1:
233-
return (args[1],)
234-
if command == 0:
235-
return None
236-
if command == -1:
237-
return await self._get_moveable_keys(*args)
256+
# The command doesn't have keys in it
257+
if not is_subcmd:
258+
return None
259+
last_key_pos = command["last_key_pos"]
260+
if last_key_pos < 0:
261+
last_key_pos = len(args) - abs(last_key_pos)
262+
keys_pos = list(
263+
range(command["first_key_pos"], last_key_pos + 1, command["step_count"])
264+
)
265+
keys = [args[pos] for pos in keys_pos]
238266

239-
last_key_pos = command["last_key_pos"]
240-
if last_key_pos < 0:
241-
last_key_pos = len(args) + last_key_pos
242-
return args[command["first_key_pos"] : last_key_pos + 1 : command["step_count"]]
267+
return keys
243268

244269
async def _get_moveable_keys(self, *args: Any) -> Optional[Tuple[str, ...]]:
245270
try:

redis/_parsers/helpers.py

+1
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,7 @@ def string_keys_to_dict(key_string, callback):
733733
"MODULE UNLOAD": bool,
734734
"PING": lambda r: str_if_bytes(r) == "PONG",
735735
"PUBSUB NUMSUB": parse_pubsub_numsub,
736+
"PUBSUB SHARDNUMSUB": parse_pubsub_numsub,
736737
"QUIT": bool_ok,
737738
"SET": parse_set_result,
738739
"SCAN": parse_scan,

redis/cluster.py

+2
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ def parse_cluster_shards(resp, **options):
101101
"""
102102
Parse CLUSTER SHARDS response.
103103
"""
104+
if isinstance(resp[0], dict):
105+
return resp
104106
shards = []
105107
for x in resp:
106108
shard = {"slots": [], "nodes": []}

tests/test_asyncio/test_cluster.py

+28-13
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
from redis.utils import str_if_bytes
3131
from tests.conftest import (
3232
assert_resp_response,
33+
is_resp2_connection,
3334
skip_if_redis_enterprise,
35+
skip_if_server_version_gte,
3436
skip_if_server_version_lt,
3537
skip_unless_arch_bits,
3638
)
@@ -157,7 +159,7 @@ async def execute_command(*_args, **_kwargs):
157159

158160
def cmd_init_mock(self, r: ClusterNode) -> None:
159161
self.commands = {
160-
"GET": {
162+
"get": {
161163
"name": "get",
162164
"arity": 2,
163165
"flags": ["readonly", "fast"],
@@ -607,7 +609,7 @@ def map_7007(self):
607609

608610
def cmd_init_mock(self, r: ClusterNode) -> None:
609611
self.commands = {
610-
"GET": {
612+
"get": {
611613
"name": "get",
612614
"arity": 2,
613615
"flags": ["readonly", "fast"],
@@ -818,6 +820,8 @@ async def test_not_require_full_coverage_cluster_down_error(
818820
assert all(await r.cluster_delslots(missing_slot))
819821
with pytest.raises(ClusterDownError):
820822
await r.exists("foo")
823+
except ResponseError as e:
824+
assert "CLUSTERDOWN" in str(e)
821825
finally:
822826
try:
823827
# Add back the missing slot
@@ -1065,11 +1069,14 @@ async def test_cluster_delslots(self) -> None:
10651069

10661070
@skip_if_server_version_lt("7.0.0")
10671071
@skip_if_redis_enterprise()
1068-
async def test_cluster_delslotsrange(self, r: RedisCluster):
1072+
async def test_cluster_delslotsrange(self):
1073+
r = await get_mocked_redis_client(host=default_host, port=default_port)
1074+
mock_all_nodes_resp(r, "OK")
10691075
node = r.get_random_node()
1070-
mock_node_resp(node, "OK")
10711076
await r.cluster_addslots(node, 1, 2, 3, 4, 5)
10721077
assert await r.cluster_delslotsrange(1, 5)
1078+
assert node._free.pop().read_response.called
1079+
await r.close()
10731080

10741081
@skip_if_redis_enterprise()
10751082
async def test_cluster_failover(self, r: RedisCluster) -> None:
@@ -1255,11 +1262,18 @@ async def test_cluster_replicas(self, r: RedisCluster) -> None:
12551262
async def test_cluster_links(self, r: RedisCluster):
12561263
node = r.get_random_node()
12571264
res = await r.cluster_links(node)
1258-
links_to = sum(x.count("to") for x in res)
1259-
links_for = sum(x.count("from") for x in res)
1260-
assert links_to == links_for
1261-
for i in range(0, len(res) - 1, 2):
1262-
assert res[i][3] == res[i + 1][3]
1265+
if is_resp2_connection(r):
1266+
links_to = sum(x.count(b"to") for x in res)
1267+
links_for = sum(x.count(b"from") for x in res)
1268+
assert links_to == links_for
1269+
for i in range(0, len(res) - 1, 2):
1270+
assert res[i][3] == res[i + 1][3]
1271+
else:
1272+
links_to = len(list(filter(lambda x: x[b"direction"] == b"to", res)))
1273+
links_for = len(list(filter(lambda x: x[b"direction"] == b"from", res)))
1274+
assert links_to == links_for
1275+
for i in range(0, len(res) - 1, 2):
1276+
assert res[i][b"node"] == res[i + 1][b"node"]
12631277

12641278
@skip_if_redis_enterprise()
12651279
async def test_readonly(self) -> None:
@@ -1896,25 +1910,25 @@ async def test_cluster_bzpopmin(self, r: RedisCluster) -> None:
18961910
r,
18971911
await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
18981912
(b"{foo}b", b"b1", 10),
1899-
[b"b", b"b1", 10],
1913+
[b"{foo}b", b"b1", 10],
19001914
)
19011915
assert_resp_response(
19021916
r,
19031917
await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
19041918
(b"{foo}b", b"b2", 20),
1905-
[b"b", b"b2", 20],
1919+
[b"{foo}b", b"b2", 20],
19061920
)
19071921
assert_resp_response(
19081922
r,
19091923
await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
19101924
(b"{foo}a", b"a1", 1),
1911-
[b"a", b"a1", 1],
1925+
[b"{foo}a", b"a1", 1],
19121926
)
19131927
assert_resp_response(
19141928
r,
19151929
await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1),
19161930
(b"{foo}a", b"a2", 2),
1917-
[b"a", b"a2", 2],
1931+
[b"{foo}a", b"a2", 2],
19181932
)
19191933
assert await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1) is None
19201934
await r.zadd("{foo}c", {"c1": 100})
@@ -2744,6 +2758,7 @@ async def test_asking_error(self, r: RedisCluster) -> None:
27442758
assert ask_node._free.pop().read_response.await_count
27452759
assert res == ["MOCK_OK"]
27462760

2761+
@skip_if_server_version_gte("7.0.0")
27472762
async def test_moved_redirection_on_slave_with_default(
27482763
self, r: RedisCluster
27492764
) -> None:

0 commit comments

Comments
 (0)