-
Notifications
You must be signed in to change notification settings - Fork 2.6k
RESP3 tests #2780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RESP3 tests #2780
Changes from 15 commits
e9adcb3
32e46a7
c4d1baa
0360c13
c36fb27
fb8e461
bc44be5
89eb576
3c511e0
89ef178
0d592ff
7b87e20
dc7fa20
8ac26e9
a3476c6
6965b41
599bdc0
4e08e63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -333,16 +333,35 @@ def _error_message(self, exception): | |
async def on_connect(self) -> None: | ||
"""Initialize the connection, authenticate and select a database""" | ||
self._parser.on_connect(self) | ||
parser = self._parser | ||
|
||
auth_args = None | ||
# if credential provider or username and/or password are set, authenticate | ||
if self.credential_provider or (self.username or self.password): | ||
cred_provider = ( | ||
self.credential_provider | ||
or UsernamePasswordCredentialProvider(self.username, self.password) | ||
) | ||
auth_args = cred_provider.get_credentials() | ||
# avoid checking health here -- PING will fail if we try | ||
# to check the health prior to the AUTH | ||
# if resp version is specified and we have auth args, | ||
# we need to send them via HELLO | ||
if auth_args and self.protocol != 2: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same issue? self.protocol not in ['2', 2]? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe, but in terms of performance it is less good |
||
if isinstance(self._parser, _AsyncRESP2Parser): | ||
self.set_parser(_AsyncRESP3Parser) | ||
# update cluster exception classes | ||
self._parser.EXCEPTION_CLASSES = parser.EXCEPTION_CLASSES | ||
self._parser.on_connect(self) | ||
if len(auth_args) == 1: | ||
auth_args = ["default", auth_args[0]] | ||
await self.send_command("HELLO", self.protocol, "AUTH", *auth_args) | ||
response = await self.read_response() | ||
if response.get(b"proto") != int(self.protocol) and response.get( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will have to be changed to accomodate that |
||
"proto" | ||
) != int(self.protocol): | ||
raise ConnectionError("Invalid RESP version") | ||
# avoid checking health here -- PING will fail if we try | ||
# to check the health prior to the AUTH | ||
elif auth_args: | ||
await self.send_command("AUTH", *auth_args, check_health=False) | ||
|
||
try: | ||
|
@@ -359,9 +378,11 @@ async def on_connect(self) -> None: | |
raise AuthenticationError("Invalid Username or Password") | ||
|
||
# if resp version is specified, switch to it | ||
if self.protocol != 2: | ||
elif self.protocol != 2: | ||
if isinstance(self._parser, _AsyncRESP2Parser): | ||
self.set_parser(_AsyncRESP3Parser) | ||
# update cluster exception classes | ||
self._parser.EXCEPTION_CLASSES = parser.EXCEPTION_CLASSES | ||
self._parser.on_connect(self) | ||
await self.send_command("HELLO", self.protocol) | ||
response = await self.read_response() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -331,9 +331,15 @@ def parse_xinfo_stream(response, **options): | |
data["last-entry"] = (last[0], pairs_to_dict(last[1])) | ||
else: | ||
data["entries"] = {_id: pairs_to_dict(entry) for _id, entry in data["entries"]} | ||
data["groups"] = [ | ||
pairs_to_dict(group, decode_keys=True) for group in data["groups"] | ||
] | ||
if isinstance(data["groups"][0], list): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. try/except instead == cheaper There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It won't work here because of the content of data["groups"] |
||
data["groups"] = [ | ||
pairs_to_dict(group, decode_keys=True) for group in data["groups"] | ||
] | ||
else: | ||
data["groups"] = [ | ||
{str_if_bytes(k): v for k, v in group.items()} | ||
for group in data["groups"] | ||
] | ||
return data | ||
|
||
|
||
|
@@ -581,14 +587,15 @@ def parse_command_resp3(response, **options): | |
cmd_name = str_if_bytes(command[0]) | ||
cmd_dict["name"] = cmd_name | ||
cmd_dict["arity"] = command[1] | ||
cmd_dict["flags"] = command[2] | ||
cmd_dict["flags"] = {str_if_bytes(flag) for flag in command[2]} | ||
cmd_dict["first_key_pos"] = command[3] | ||
cmd_dict["last_key_pos"] = command[4] | ||
cmd_dict["step_count"] = command[5] | ||
cmd_dict["acl_categories"] = command[6] | ||
cmd_dict["tips"] = command[7] | ||
cmd_dict["key_specifications"] = command[8] | ||
cmd_dict["subcommands"] = command[9] | ||
if len(command) > 7: | ||
cmd_dict["tips"] = command[7] | ||
cmd_dict["key_specifications"] = command[8] | ||
cmd_dict["subcommands"] = command[9] | ||
|
||
commands[cmd_name] = cmd_dict | ||
return commands | ||
|
@@ -626,17 +633,20 @@ def parse_acl_getuser(response, **options): | |
if data["channels"] == [""]: | ||
data["channels"] = [] | ||
if "selectors" in data: | ||
data["selectors"] = [ | ||
list(map(str_if_bytes, selector)) for selector in data["selectors"] | ||
] | ||
if data["selectors"] != [] and isinstance(data["selectors"][0], list): | ||
data["selectors"] = [ | ||
list(map(str_if_bytes, selector)) for selector in data["selectors"] | ||
] | ||
elif data["selectors"] != []: | ||
data["selectors"] = [ | ||
{str_if_bytes(k): str_if_bytes(v) for k, v in selector.items()} | ||
for selector in data["selectors"] | ||
] | ||
|
||
# split 'commands' into separate 'categories' and 'commands' lists | ||
commands, categories = [], [] | ||
for command in data["commands"].split(" "): | ||
if "@" in command: | ||
categories.append(command) | ||
else: | ||
commands.append(command) | ||
categories.append(command) if "@" in command else commands.append(command) | ||
|
||
data["commands"] = commands | ||
data["categories"] = categories | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,7 @@ | |
from redis.parsers import CommandsParser, Encoder | ||
from redis.retry import Retry | ||
from redis.utils import ( | ||
HIREDIS_AVAILABLE, | ||
dict_merge, | ||
list_keys_to_dict, | ||
merge_result, | ||
|
@@ -1608,7 +1609,15 @@ class ClusterPubSub(PubSub): | |
https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html | ||
""" | ||
|
||
def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs): | ||
def __init__( | ||
self, | ||
redis_cluster, | ||
node=None, | ||
host=None, | ||
port=None, | ||
push_handler_func=None, | ||
**kwargs, | ||
): | ||
""" | ||
When a pubsub instance is created without specifying a node, a single | ||
node will be transparently chosen for the pubsub connection on the | ||
|
@@ -1633,7 +1642,10 @@ def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs): | |
self.node_pubsub_mapping = {} | ||
self._pubsubs_generator = self._pubsubs_generator() | ||
super().__init__( | ||
**kwargs, connection_pool=connection_pool, encoder=redis_cluster.encoder | ||
**kwargs, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pass the kwargs at the end of the init...? |
||
connection_pool=connection_pool, | ||
encoder=redis_cluster.encoder, | ||
push_handler_func=push_handler_func, | ||
) | ||
|
||
def set_pubsub_node(self, cluster, node=None, host=None, port=None): | ||
|
@@ -1717,14 +1729,18 @@ def execute_command(self, *args): | |
# register a callback that re-subscribes to any channels we | ||
# were listening to when we were disconnected | ||
self.connection.register_connect_callback(self.on_connect) | ||
if self.push_handler_func is not None and not HIREDIS_AVAILABLE: | ||
self.connection._parser.set_push_handler(self.push_handler_func) | ||
connection = self.connection | ||
self._execute(connection, connection.send_command, *args) | ||
|
||
def _get_node_pubsub(self, node): | ||
try: | ||
return self.node_pubsub_mapping[node.name] | ||
except KeyError: | ||
pubsub = node.redis_connection.pubsub() | ||
pubsub = node.redis_connection.pubsub( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good! |
||
push_handler_func=self.push_handler_func | ||
) | ||
self.node_pubsub_mapping[node.name] = pubsub | ||
return pubsub | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO you can remove the Iterable piece in 674. I know you didn't add it, but it's in function, so has no value.