diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 1bab506c32..c4dd30812b 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -78,7 +78,7 @@ jobs: pip install hiredis fi invoke devenv - sleep 5 # time to settle + sleep 10 # time to settle invoke ${{matrix.test-type}}-tests - uses: actions/upload-artifact@v2 diff --git a/dockers/Dockerfile.cluster b/dockers/Dockerfile.cluster index 204232a665..3a0d73415e 100644 --- a/dockers/Dockerfile.cluster +++ b/dockers/Dockerfile.cluster @@ -1,4 +1,4 @@ -FROM redis/redis-stack-server:latest as rss +FROM redis/redis-stack-server:edge as rss COPY dockers/create_cluster.sh /create_cluster.sh RUN ls -R /opt/redis-stack diff --git a/dockers/cluster.redis.conf b/dockers/cluster.redis.conf index 26da33567a..cd5c08b7b8 100644 --- a/dockers/cluster.redis.conf +++ b/dockers/cluster.redis.conf @@ -1,4 +1,5 @@ protected-mode no +enable-debug-command yes loadmodule /opt/redis-stack/lib/redisearch.so loadmodule /opt/redis-stack/lib/redisgraph.so loadmodule /opt/redis-stack/lib/redistimeseries.so diff --git a/redis/_parsers/commands.py b/redis/_parsers/commands.py index d3b4a99ed3..b5109252ae 100644 --- a/redis/_parsers/commands.py +++ b/redis/_parsers/commands.py @@ -7,7 +7,53 @@ from redis.asyncio.cluster import ClusterNode -class CommandsParser: +class AbstractCommandsParser: + def _get_pubsub_keys(self, *args): + """ + Get the keys from pubsub command. + Although PubSub commands have predetermined key locations, they are not + supported in the 'COMMAND's output, so the key positions are hardcoded + in this method + """ + if len(args) < 2: + # The command has no keys in it + return None + args = [str_if_bytes(arg) for arg in args] + command = args[0].upper() + keys = None + if command == "PUBSUB": + # the second argument is a part of the command name, e.g. + # ['PUBSUB', 'NUMSUB', 'foo']. + pubsub_type = args[1].upper() + if pubsub_type in ["CHANNELS", "NUMSUB", "SHARDCHANNELS", "SHARDNUMSUB"]: + keys = args[2:] + elif command in ["SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE"]: + # format example: + # SUBSCRIBE channel [channel ...] + keys = list(args[1:]) + elif command in ["PUBLISH", "SPUBLISH"]: + # format example: + # PUBLISH channel message + keys = [args[1]] + return keys + + def parse_subcommand(self, command, **options): + cmd_dict = {} + cmd_name = str_if_bytes(command[0]) + cmd_dict["name"] = cmd_name + cmd_dict["arity"] = int(command[1]) + cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]] + cmd_dict["first_key_pos"] = command[3] + cmd_dict["last_key_pos"] = command[4] + cmd_dict["step_count"] = command[5] + if len(command) > 7: + cmd_dict["tips"] = command[7] + cmd_dict["key_specifications"] = command[8] + cmd_dict["subcommands"] = command[9] + return cmd_dict + + +class CommandsParser(AbstractCommandsParser): """ Parses Redis commands to get command keys. COMMAND output is used to determine key locations. @@ -30,21 +76,6 @@ def initialize(self, r): commands[cmd.lower()] = commands.pop(cmd) self.commands = commands - def parse_subcommand(self, command, **options): - cmd_dict = {} - cmd_name = str_if_bytes(command[0]) - cmd_dict["name"] = cmd_name - cmd_dict["arity"] = int(command[1]) - cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]] - cmd_dict["first_key_pos"] = command[3] - cmd_dict["last_key_pos"] = command[4] - cmd_dict["step_count"] = command[5] - if len(command) > 7: - cmd_dict["tips"] = command[7] - cmd_dict["key_specifications"] = command[8] - cmd_dict["subcommands"] = command[9] - return cmd_dict - # As soon as this PR is merged into Redis, we should reimplement # our logic to use COMMAND INFO changes to determine the key positions # https://github.com/redis/redis/pull/8324 @@ -138,37 +169,8 @@ def _get_moveable_keys(self, redis_conn, *args): raise e return keys - def _get_pubsub_keys(self, *args): - """ - Get the keys from pubsub command. - Although PubSub commands have predetermined key locations, they are not - supported in the 'COMMAND's output, so the key positions are hardcoded - in this method - """ - if len(args) < 2: - # The command has no keys in it - return None - args = [str_if_bytes(arg) for arg in args] - command = args[0].upper() - keys = None - if command == "PUBSUB": - # the second argument is a part of the command name, e.g. - # ['PUBSUB', 'NUMSUB', 'foo']. - pubsub_type = args[1].upper() - if pubsub_type in ["CHANNELS", "NUMSUB", "SHARDCHANNELS", "SHARDNUMSUB"]: - keys = args[2:] - elif command in ["SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE"]: - # format example: - # SUBSCRIBE channel [channel ...] - keys = list(args[1:]) - elif command in ["PUBLISH", "SPUBLISH"]: - # format example: - # PUBLISH channel message - keys = [args[1]] - return keys - -class AsyncCommandsParser: +class AsyncCommandsParser(AbstractCommandsParser): """ Parses Redis commands to get command keys. @@ -194,52 +196,75 @@ async def initialize(self, node: Optional["ClusterNode"] = None) -> None: self.node = node commands = await self.node.execute_command("COMMAND") - for cmd, command in commands.items(): - if "movablekeys" in command["flags"]: - commands[cmd] = -1 - elif command["first_key_pos"] == 0 and command["last_key_pos"] == 0: - commands[cmd] = 0 - elif command["first_key_pos"] == 1 and command["last_key_pos"] == 1: - commands[cmd] = 1 - self.commands = {cmd.upper(): command for cmd, command in commands.items()} + self.commands = {cmd.lower(): command for cmd, command in commands.items()} # As soon as this PR is merged into Redis, we should reimplement # our logic to use COMMAND INFO changes to determine the key positions # https://github.com/redis/redis/pull/8324 async def get_keys(self, *args: Any) -> Optional[Tuple[str, ...]]: + """ + Get the keys from the passed command. + + NOTE: Due to a bug in redis<7.0, this function does not work properly + for EVAL or EVALSHA when the `numkeys` arg is 0. + - issue: https://github.com/redis/redis/issues/9493 + - fix: https://github.com/redis/redis/pull/9733 + + So, don't use this function with EVAL or EVALSHA. + """ if len(args) < 2: # The command has no keys in it return None - try: - command = self.commands[args[0]] - except KeyError: - # try to split the command name and to take only the main command + cmd_name = args[0].lower() + if cmd_name not in self.commands: + # try to split the command name and to take only the main command, # e.g. 'memory' for 'memory usage' - args = args[0].split() + list(args[1:]) - cmd_name = args[0].upper() - if cmd_name not in self.commands: + cmd_name_split = cmd_name.split() + cmd_name = cmd_name_split[0] + if cmd_name in self.commands: + # save the splitted command to args + args = cmd_name_split + list(args[1:]) + else: # We'll try to reinitialize the commands cache, if the engine # version has changed, the commands may not be current await self.initialize() if cmd_name not in self.commands: raise RedisError( - f"{cmd_name} command doesn't exist in Redis commands" + f"{cmd_name.upper()} command doesn't exist in Redis commands" ) - command = self.commands[cmd_name] + command = self.commands.get(cmd_name) + if "movablekeys" in command["flags"]: + keys = await self._get_moveable_keys(*args) + elif "pubsub" in command["flags"] or command["name"] == "pubsub": + keys = self._get_pubsub_keys(*args) + else: + if ( + command["step_count"] == 0 + and command["first_key_pos"] == 0 + and command["last_key_pos"] == 0 + ): + is_subcmd = False + if "subcommands" in command: + subcmd_name = f"{cmd_name}|{args[1].lower()}" + for subcmd in command["subcommands"]: + if str_if_bytes(subcmd[0]) == subcmd_name: + command = self.parse_subcommand(subcmd) + is_subcmd = True - if command == 1: - return (args[1],) - if command == 0: - return None - if command == -1: - return await self._get_moveable_keys(*args) + # The command doesn't have keys in it + if not is_subcmd: + return None + last_key_pos = command["last_key_pos"] + if last_key_pos < 0: + last_key_pos = len(args) - abs(last_key_pos) + keys_pos = list( + range(command["first_key_pos"], last_key_pos + 1, command["step_count"]) + ) + keys = [args[pos] for pos in keys_pos] - last_key_pos = command["last_key_pos"] - if last_key_pos < 0: - last_key_pos = len(args) + last_key_pos - return args[command["first_key_pos"] : last_key_pos + 1 : command["step_count"]] + return keys async def _get_moveable_keys(self, *args: Any) -> Optional[Tuple[str, ...]]: try: diff --git a/redis/_parsers/helpers.py b/redis/_parsers/helpers.py index f27e3b12c0..ab4ede1fd0 100644 --- a/redis/_parsers/helpers.py +++ b/redis/_parsers/helpers.py @@ -733,6 +733,7 @@ def string_keys_to_dict(key_string, callback): "MODULE UNLOAD": bool, "PING": lambda r: str_if_bytes(r) == "PONG", "PUBSUB NUMSUB": parse_pubsub_numsub, + "PUBSUB SHARDNUMSUB": parse_pubsub_numsub, "QUIT": bool_ok, "SET": parse_set_result, "SCAN": parse_scan, diff --git a/redis/cluster.py b/redis/cluster.py index c179511b0c..3549ced35d 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -101,6 +101,8 @@ def parse_cluster_shards(resp, **options): """ Parse CLUSTER SHARDS response. """ + if isinstance(resp[0], dict): + return resp shards = [] for x in resp: shard = {"slots": [], "nodes": []} diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index ee498e71f7..eb7aafdf68 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -30,7 +30,9 @@ from redis.utils import str_if_bytes from tests.conftest import ( assert_resp_response, + is_resp2_connection, skip_if_redis_enterprise, + skip_if_server_version_gte, skip_if_server_version_lt, skip_unless_arch_bits, ) @@ -157,7 +159,7 @@ async def execute_command(*_args, **_kwargs): def cmd_init_mock(self, r: ClusterNode) -> None: self.commands = { - "GET": { + "get": { "name": "get", "arity": 2, "flags": ["readonly", "fast"], @@ -607,7 +609,7 @@ def map_7007(self): def cmd_init_mock(self, r: ClusterNode) -> None: self.commands = { - "GET": { + "get": { "name": "get", "arity": 2, "flags": ["readonly", "fast"], @@ -818,6 +820,8 @@ async def test_not_require_full_coverage_cluster_down_error( assert all(await r.cluster_delslots(missing_slot)) with pytest.raises(ClusterDownError): await r.exists("foo") + except ResponseError as e: + assert "CLUSTERDOWN" in str(e) finally: try: # Add back the missing slot @@ -1065,11 +1069,14 @@ async def test_cluster_delslots(self) -> None: @skip_if_server_version_lt("7.0.0") @skip_if_redis_enterprise() - async def test_cluster_delslotsrange(self, r: RedisCluster): + async def test_cluster_delslotsrange(self): + r = await get_mocked_redis_client(host=default_host, port=default_port) + mock_all_nodes_resp(r, "OK") node = r.get_random_node() - mock_node_resp(node, "OK") await r.cluster_addslots(node, 1, 2, 3, 4, 5) assert await r.cluster_delslotsrange(1, 5) + assert node._free.pop().read_response.called + await r.close() @skip_if_redis_enterprise() async def test_cluster_failover(self, r: RedisCluster) -> None: @@ -1255,11 +1262,18 @@ async def test_cluster_replicas(self, r: RedisCluster) -> None: async def test_cluster_links(self, r: RedisCluster): node = r.get_random_node() res = await r.cluster_links(node) - links_to = sum(x.count("to") for x in res) - links_for = sum(x.count("from") for x in res) - assert links_to == links_for - for i in range(0, len(res) - 1, 2): - assert res[i][3] == res[i + 1][3] + if is_resp2_connection(r): + links_to = sum(x.count(b"to") for x in res) + links_for = sum(x.count(b"from") for x in res) + assert links_to == links_for + for i in range(0, len(res) - 1, 2): + assert res[i][3] == res[i + 1][3] + else: + links_to = len(list(filter(lambda x: x[b"direction"] == b"to", res))) + links_for = len(list(filter(lambda x: x[b"direction"] == b"from", res))) + assert links_to == links_for + for i in range(0, len(res) - 1, 2): + assert res[i][b"node"] == res[i + 1][b"node"] @skip_if_redis_enterprise() async def test_readonly(self) -> None: @@ -1896,25 +1910,25 @@ async def test_cluster_bzpopmin(self, r: RedisCluster) -> None: r, await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1), (b"{foo}b", b"b1", 10), - [b"b", b"b1", 10], + [b"{foo}b", b"b1", 10], ) assert_resp_response( r, await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1), (b"{foo}b", b"b2", 20), - [b"b", b"b2", 20], + [b"{foo}b", b"b2", 20], ) assert_resp_response( r, await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1), (b"{foo}a", b"a1", 1), - [b"a", b"a1", 1], + [b"{foo}a", b"a1", 1], ) assert_resp_response( r, await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1), (b"{foo}a", b"a2", 2), - [b"a", b"a2", 2], + [b"{foo}a", b"a2", 2], ) assert await r.bzpopmin(["{foo}b", "{foo}a"], timeout=1) is None await r.zadd("{foo}c", {"c1": 100}) @@ -2744,6 +2758,7 @@ async def test_asking_error(self, r: RedisCluster) -> None: assert ask_node._free.pop().read_response.await_count assert res == ["MOCK_OK"] + @skip_if_server_version_gte("7.0.0") async def test_moved_redirection_on_slave_with_default( self, r: RedisCluster ) -> None: diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 31c31026be..84654e70c3 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -43,6 +43,7 @@ from .conftest import ( _get_client, assert_resp_response, + is_resp2_connection, skip_if_redis_enterprise, skip_if_server_version_lt, skip_unless_arch_bits, @@ -775,6 +776,8 @@ def test_not_require_full_coverage_cluster_down_error(self, r): assert all(r.cluster_delslots(missing_slot)) with pytest.raises(ClusterDownError): r.exists("foo") + except ResponseError as e: + assert "CLUSTERDOWN" in str(e) finally: try: # Add back the missing slot @@ -1157,8 +1160,15 @@ def test_cluster_shards(self, r): b"health", ] for x in cluster_shards: - assert list(x.keys()) == ["slots", "nodes"] - for node in x["nodes"]: + assert_resp_response( + r, list(x.keys()), ["slots", "nodes"], [b"slots", b"nodes"] + ) + try: + x["nodes"] + key = "nodes" + except KeyError: + key = b"nodes" + for node in x[key]: for attribute in node.keys(): assert attribute in attributes @@ -1415,11 +1425,18 @@ def test_cluster_replicas(self, r): def test_cluster_links(self, r): node = r.get_random_node() res = r.cluster_links(node) - links_to = sum(x.count("to") for x in res) - links_for = sum(x.count("from") for x in res) - assert links_to == links_for - for i in range(0, len(res) - 1, 2): - assert res[i][3] == res[i + 1][3] + if is_resp2_connection(r): + links_to = sum(x.count(b"to") for x in res) + links_for = sum(x.count(b"from") for x in res) + assert links_to == links_for + for i in range(0, len(res) - 1, 2): + assert res[i][3] == res[i + 1][3] + else: + links_to = len(list(filter(lambda x: x[b"direction"] == b"to", res))) + links_for = len(list(filter(lambda x: x[b"direction"] == b"from", res))) + assert links_to == links_for + for i in range(0, len(res) - 1, 2): + assert res[i][b"node"] == res[i + 1][b"node"] def test_cluster_flshslots_not_implemented(self, r): with pytest.raises(NotImplementedError): @@ -2041,25 +2058,25 @@ def test_cluster_bzpopmin(self, r): r, r.bzpopmin(["{foo}b", "{foo}a"], timeout=1), (b"{foo}b", b"b1", 10), - [b"b", b"b1", 10], + [b"{foo}b", b"b1", 10], ) assert_resp_response( r, r.bzpopmin(["{foo}b", "{foo}a"], timeout=1), (b"{foo}b", b"b2", 20), - [b"b", b"b2", 20], + [b"{foo}b", b"b2", 20], ) assert_resp_response( r, r.bzpopmin(["{foo}b", "{foo}a"], timeout=1), (b"{foo}a", b"a1", 1), - [b"a", b"a1", 1], + [b"{foo}a", b"a1", 1], ) assert_resp_response( r, r.bzpopmin(["{foo}b", "{foo}a"], timeout=1), (b"{foo}a", b"a2", 2), - [b"a", b"a2", 2], + [b"{foo}a", b"a2", 2], ) assert r.bzpopmin(["{foo}b", "{foo}a"], timeout=1) is None r.zadd("{foo}c", {"c1": 100}) diff --git a/tests/test_function.py b/tests/test_function.py index 22db904273..9d6712ecf7 100644 --- a/tests/test_function.py +++ b/tests/test_function.py @@ -93,17 +93,28 @@ def test_function_list_on_cluster(self, r): [[b"name", b"myfunc", b"description", None, b"flags", [b"no-writes"]]], ] ] + resp3_function_list = [ + { + b"library_name": b"mylib", + b"engine": b"LUA", + b"functions": [ + {b"name": b"myfunc", b"description": None, b"flags": {b"no-writes"}} + ], + } + ] primaries = r.get_primaries() res = {} + resp3_res = {} for node in primaries: res[node.name] = function_list - assert r.function_list() == res - assert r.function_list(library="*lib") == res + resp3_res[node.name] = resp3_function_list + assert_resp_response(r, r.function_list(), res, resp3_res) + assert_resp_response(r, r.function_list(library="*lib"), res, resp3_res) node = primaries[0].name - assert ( - r.function_list(withcode=True)[node][0][7] - == f"#!{engine} name={lib} \n {function}".encode() - ) + code = f"#!{engine} name={lib} \n {function}".encode() + res[node][0].extend([b"library_code", code]) + resp3_res[node][0][b"library_code"] = code + assert_resp_response(r, r.function_list(withcode=True), res, resp3_res) def test_fcall(self, r): r.function_load(f"#!{engine} name={lib} \n {set_function}")