From b703db1713df8f1481d1ae3575c33b6a90505384 Mon Sep 17 00:00:00 2001 From: antisch Date: Tue, 12 Nov 2019 20:28:23 -0800 Subject: [PATCH 1/2] Consolidated credentials --- .../azure/eventhub/__init__.py | 3 +- .../azure/eventhub/_client_base.py | 83 ++++++------------- .../azure/eventhub/_consumer.py | 4 +- .../azure/eventhub/_consumer_client.py | 8 +- .../eventhub/_consumer_producer_mixin.py | 15 ++-- .../azure/eventhub/_producer.py | 4 +- .../azure/eventhub/_producer_client.py | 9 +- .../azure/eventhub/aio/__init__.py | 2 + .../azure/eventhub/aio/_client_base_async.py | 83 ++++++++++--------- .../azure/eventhub/aio/_consumer_async.py | 4 +- .../eventhub/aio/_consumer_client_async.py | 8 +- .../aio/_consumer_producer_mixin_async.py | 14 ++-- .../azure/eventhub/aio/_producer_async.py | 4 +- .../eventhub/aio/_producer_client_async.py | 8 +- .../azure-eventhubs/azure/eventhub/common.py | 76 ++++++++--------- 15 files changed, 150 insertions(+), 175 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py index 42a3b4d12dca..044dd38c945b 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py @@ -11,7 +11,7 @@ AuthenticationError, EventDataSendError, ConnectionLostError from ._producer_client import EventHubProducerClient from ._consumer_client import EventHubConsumerClient -from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential +from .common import EventHubSharedKeyCredential from ._eventprocessor.partition_manager import PartitionManager from ._eventprocessor.common import CloseReason, OwnershipLostError from ._eventprocessor.partition_context import PartitionContext @@ -32,7 +32,6 @@ "EventHubConsumerClient", "TransportType", "EventHubSharedKeyCredential", - "EventHubSASTokenCredential", "PartitionManager", "CloseReason", "OwnershipLostError", diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_client_base.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_client_base.py index 00cb0a7dd4af..3800845561a5 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_client_base.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_client_base.py @@ -22,7 +22,7 @@ from uamqp import types # type: ignore from azure.eventhub import __version__ from .configuration import Configuration -from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential, _Address, parse_sas_token +from .common import EventHubSharedKeyCredential, _Address, parse_sas_token from .error import _handle_exception from ._connection_manager import get_connection_manager @@ -61,29 +61,6 @@ def _parse_conn_str(conn_str): return endpoint, shared_access_key_name, shared_access_key, entity_path -def _generate_sas_token(uri, policy, key, expiry=None): - """Create a shared access signiture token as a string literal. - :returns: SAS token as string literal. - :rtype: str - """ - from base64 import b64encode, b64decode - from hashlib import sha256 - from hmac import HMAC - if not expiry: - expiry = time.time() + 3600 # Default to 1 hour. - encoded_uri = quote_plus(uri) - ttl = int(expiry) - sign_key = '%s\n%d' % (encoded_uri, ttl) - signature = b64encode(HMAC(b64decode(key), sign_key.encode('utf-8'), sha256).digest()) - result = { - 'sr': uri, - 'sig': signature, - 'se': str(ttl)} - if policy: - result['skn'] = policy - return 'SharedAccessSignature ' + urlencode(result) - - def _build_uri(address, entity): parsed = urlparse(address) if parsed.path: @@ -124,40 +101,30 @@ def _create_auth(self): Create an ~uamqp.authentication.SASTokenAuth instance to authenticate the session. """ - http_proxy = self._config.http_proxy - transport_type = self._config.transport_type - auth_timeout = self._config.auth_timeout - - # TODO: the following code can be refactored to create auth from classes directly instead of using if-else - if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return - username = self._credential.policy - password = self._credential.key - if "@sas.root" in username: - return authentication.SASLPlain( - self._host, username, password, http_proxy=http_proxy, transport_type=transport_type) - return authentication.SASTokenAuth.from_shared_access_key( - self._auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy, - transport_type=transport_type) - - elif isinstance(self._credential, EventHubSASTokenCredential): - token = self._credential.get_sas_token() - try: - expiry = int(parse_sas_token(token)['se']) - except (KeyError, TypeError, IndexError): - raise ValueError("Supplied SAS token has no valid expiry value.") - return authentication.SASTokenAuth( - self._auth_uri, self._auth_uri, token, - expires_at=expiry, - timeout=auth_timeout, - http_proxy=http_proxy, - transport_type=transport_type) - - else: # Azure credential - get_jwt_token = functools.partial(self._credential.get_token, - 'https://eventhubs.azure.net//.default') - return authentication.JWTTokenAuth(self._auth_uri, self._auth_uri, - get_jwt_token, http_proxy=http_proxy, - transport_type=transport_type) + try: + token_type = self._credential.token_type + except AttributeError: + token_type = b'jwt' + if token_type == b"servicebus.windows.net:sastoken": + auth = authentication.JWTTokenAuth( + self._auth_uri, + self._auth_uri, + functools.partial(self._credential.get_token, self._auth_uri), + token_type=token_type, + timeout=self._config.auth_timeout, + http_proxy=self._config.http_proxy, + transport_type=self._config.transport_type) + auth.update_token() + return auth + token_scope = 'https://eventhubs.azure.net//.default' + return authentication.JWTTokenAuth( + self._auth_uri, + self._auth_uri, + functools.partial(self._credential.get_token, token_scope), + token_type=token_type, + timeout=self._config.auth_timeout, + http_proxy=self._config.http_proxy, + transport_type=self._config.transport_type) @classmethod def _create_properties(cls, user_agent=None): # pylint: disable=no-self-use diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer.py index a6b88daa8dbc..fd9a30a69ded 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer.py @@ -131,7 +131,7 @@ def __next__(self): log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception) raise last_exception - def _create_handler(self): + def _create_handler(self, auth): source = Source(self._source) if self._offset is not None: source.set_filter(self._offset._selector()) # pylint:disable=protected-access @@ -146,7 +146,7 @@ def _create_handler(self): self._handler = ReceiveClient( source, - auth=self._client._create_auth(), # pylint:disable=protected-access + auth=auth, debug=self._client._config.network_tracing, # pylint:disable=protected-access prefetch=self._prefetch, link_properties=self._link_properties, diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_client.py index e50929743eb1..d0868618d148 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_client.py @@ -5,7 +5,7 @@ import logging from typing import Any, Union, Dict, Tuple, TYPE_CHECKING, Callable, List -from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential, EventData +from .common import EventData from ._client_base import ClientBase from ._consumer import EventHubConsumer from ._eventprocessor.event_processor import EventProcessor @@ -38,8 +38,8 @@ class EventHubConsumerClient(ClientBase): :param str event_hub_path: The path of the specific Event Hub to connect the client to. :param credential: The credential object used for authentication which implements particular interface of getting tokens. It accepts :class:`EventHubSharedKeyCredential`, - :class:`EventHubSASTokenCredential`, or credential objects generated by - the azure-identity library and objects that implement `get_token(self, *scopes)` method. + or credential objects generated by the azure-identity library and objects that + implement `get_token(self, *scopes)` method. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. @@ -70,7 +70,7 @@ class EventHubConsumerClient(ClientBase): """ def __init__(self, host, event_hub_path, credential, **kwargs): - # type:(str, str, Union[EventHubSharedKeyCredential, EventHubSASTokenCredential, TokenCredential], Any) -> None + # type:(str, str, TokenCredential, Any) -> None """""" receive_timeout = kwargs.get("receive_timeout", 3) if receive_timeout <= 0: diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py index c7dad6d3846c..62b32a8d0a1b 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py @@ -31,9 +31,6 @@ def _check_closed(self): if self._closed: raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name)) - def _create_handler(self): - pass - def _open(self): """Open the EventHubConsumer/EventHubProducer using the supplied connection. @@ -42,15 +39,15 @@ def _open(self): if not self._running: if self._handler: self._handler.close() - self._create_handler() - self._handler.open(connection=self._client._conn_manager.get_connection( # pylint: disable=protected-access - self._client._address.hostname, - self._client._create_auth() - )) + auth = self._client._create_auth() + self._create_handler(auth) + self._handler.open( + connection=self._client._conn_manager.get_connection(self._client._address.hostname, auth) + ) while not self._handler.client_ready(): time.sleep(0.05) self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ - or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access + or constants.MAX_MESSAGE_LENGTH_BYTES self._running = True def _close_handler(self): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer.py index 8b50a5e26748..61b033213211 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer.py @@ -101,10 +101,10 @@ def __init__(self, client, target, **kwargs): self._condition = None self._link_properties = {types.AMQPSymbol(self._timeout_symbol): types.AMQPLong(int(self._timeout * 1000))} - def _create_handler(self): + def _create_handler(self, auth): self._handler = SendClient( self._target, - auth=self._client._create_auth(), # pylint:disable=protected-access + auth=auth, debug=self._client._config.network_tracing, # pylint:disable=protected-access msg_timeout=self._timeout, error_policy=self._retry_policy, diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer_client.py index b99364e80660..5dcd0f6364c0 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_producer_client.py @@ -9,8 +9,7 @@ from uamqp import constants # type:ignore from ._client_base import ClientBase from ._producer import EventHubProducer -from .common import EventData, \ - EventHubSharedKeyCredential, EventHubSASTokenCredential, EventDataBatch +from .common import EventData, EventDataBatch if TYPE_CHECKING: from azure.core.credentials import TokenCredential # type: ignore @@ -27,8 +26,8 @@ class EventHubProducerClient(ClientBase): :param str event_hub_path: The path of the specific Event Hub to connect the client to. :param credential: The credential object used for authentication which implements particular interface of getting tokens. It accepts :class:`EventHubSharedKeyCredential`, - :class:`EventHubSASTokenCredential`, or credential objects generated by - the azure-identity library and objects that implement `get_token(self, *scopes)` method. + or credential objects generated by the azure-identity library and objects that + implement `get_token(self, *scopes)` method. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. @@ -53,7 +52,7 @@ class EventHubProducerClient(ClientBase): """ def __init__(self, host, event_hub_path, credential, **kwargs): - # type:(str, str, Union[EventHubSharedKeyCredential, EventHubSASTokenCredential, TokenCredential], Any) -> None + # type:(str, str, TokenCredential, Any) -> None """""" super(EventHubProducerClient, self).__init__( host=host, event_hub_path=event_hub_path, credential=credential, diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py index 2f4abf2e3be3..fe5f64a71ee9 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py @@ -2,12 +2,14 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- +from ._client_base_async import EventHubSharedKeyCredential from ._consumer_client_async import EventHubConsumerClient from ._producer_client_async import EventHubProducerClient from .eventprocessor.partition_manager import PartitionManager from .eventprocessor.partition_context import PartitionContext __all__ = [ + "EventHubSharedKeyCredential", "EventHubConsumerClient", "EventHubProducerClient", "PartitionManager", diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_client_base_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_client_base_async.py index 6932953f3c1f..b34b6148aabc 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_client_base_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_client_base_async.py @@ -14,7 +14,7 @@ from uamqp import authentication, constants # type: ignore from uamqp import Message, AMQPClientAsync # type: ignore -from ..common import parse_sas_token, EventHubSharedKeyCredential, EventHubSASTokenCredential +from ..common import _generate_sas_token from .error_async import _handle_exception from .._client_base import ClientBase from ._connection_manager_async import get_connection_manager @@ -26,6 +26,24 @@ MAX_USER_AGENT_LENGTH = 512 +class EventHubSharedKeyCredential(object): + """ + The shared access key credential used for authentication. + + :param str policy: The name of the shared access policy. + :param str key: The shared access key. + """ + def __init__(self, policy, key): + self.policy = policy + self.key = key + self.token_type = b"servicebus.windows.net:sastoken" + + async def get_token(self, *scopes, **kwargs): + if not scopes: + raise ValueError("No token scope provided.") + return _generate_sas_token(scopes[0], self.policy, self.key) + + class ClientBaseAsync(ClientBase): def __init__(self, host, event_hub_path, credential, **kwargs): super(ClientBaseAsync, self).__init__(host=host, event_hub_path=event_hub_path, @@ -39,44 +57,35 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() - def _create_auth(self): + async def _create_auth(self): """ - Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate + Create an ~uamqp.authentication.SASTokenAuth instance to authenticate the session. - """ - http_proxy = self._config.http_proxy - transport_type = self._config.transport_type - auth_timeout = self._config.auth_timeout - - if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return - username = self._credential.policy - password = self._credential.key - if "@sas.root" in username: - return authentication.SASLPlain( - self._host, username, password, http_proxy=http_proxy, transport_type=transport_type) - return authentication.SASTokenAsync.from_shared_access_key( - self._auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy, - transport_type=transport_type) - - elif isinstance(self._credential, EventHubSASTokenCredential): - token = self._credential.get_sas_token() - try: - expiry = int(parse_sas_token(token)['se']) - except (KeyError, TypeError, IndexError): - raise ValueError("Supplied SAS token has no valid expiry value.") - return authentication.SASTokenAsync( - self._auth_uri, self._auth_uri, token, - expires_at=expiry, - timeout=auth_timeout, - http_proxy=http_proxy, - transport_type=transport_type) - - else: - get_jwt_token = functools.partial(self._credential.get_token, 'https://eventhubs.azure.net//.default') - return authentication.JWTTokenAsync(self._auth_uri, self._auth_uri, - get_jwt_token, http_proxy=http_proxy, - transport_type=transport_type) + try: + token_type = self._credential.token_type + except AttributeError: + token_type = b'jwt' + if token_type == b"servicebus.windows.net:sastoken": + auth = authentication.JWTTokenAsync( + self._auth_uri, + self._auth_uri, + functools.partial(self._credential.get_token, self._auth_uri), + token_type=token_type, + timeout=self._config.auth_timeout, + http_proxy=self._config.http_proxy, + transport_type=self._config.transport_type) + await auth.update_token() + return auth + token_scope = 'https://eventhubs.azure.net//.default' + return authentication.JWTTokenAsync( + self._auth_uri, + self._auth_uri, + functools.partial(self._credential.get_token, token_scope), + token_type=token_type, + timeout=self._config.auth_timeout, + http_proxy=self._config.http_proxy, + transport_type=self._config.transport_type) async def _close_connection(self): await self._conn_manager.reset_connection_if_broken() @@ -97,7 +106,7 @@ async def _management_request(self, mgmt_msg, op_type): retried_times = 0 last_exception = None while retried_times <= self._config.max_retries: - mgmt_auth = self._create_auth() + mgmt_auth = await self._create_auth() mgmt_client = AMQPClientAsync(self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing) try: conn = await self._conn_manager.get_connection(self._host, mgmt_auth) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_async.py index 26fdc6e90d70..61bf10922fa3 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_async.py @@ -133,7 +133,7 @@ async def __anext__(self): log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception) raise last_exception - def _create_handler(self): + def _create_handler(self, auth): source = Source(self._source) if self._offset is not None: source.set_filter(self._offset._selector()) # pylint:disable=protected-access @@ -148,7 +148,7 @@ def _create_handler(self): self._handler = ReceiveClientAsync( source, - auth=self._client._create_auth(), # pylint:disable=protected-access + auth=auth, debug=self._client._config.network_tracing, # pylint:disable=protected-access prefetch=self._prefetch, link_properties=self._link_properties, diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_client_async.py index f2c5b850801c..1280f566ae01 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_client_async.py @@ -4,7 +4,7 @@ # -------------------------------------------------------------------------------------------- import logging from typing import Any, Union, TYPE_CHECKING, Dict, Tuple -from azure.eventhub import EventPosition, EventHubSharedKeyCredential, EventHubSASTokenCredential +from azure.eventhub import EventPosition from .eventprocessor.event_processor import EventProcessor from ._consumer_async import EventHubConsumer from ._client_base_async import ClientBaseAsync @@ -35,8 +35,8 @@ class EventHubConsumerClient(ClientBaseAsync): :param str event_hub_path: The path of the specific Event Hub to connect the client to. :param credential: The credential object used for authentication which implements particular interface of getting tokens. It accepts :class:`EventHubSharedKeyCredential`, - :class:`EventHubSASTokenCredential`, or credential objects generated by - the azure-identity library and objects that implement `get_token(self, *scopes)` method. + or credential objects generated by the azure-identity library and objects that + implement `get_token(self, *scopes)` method. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. @@ -67,7 +67,7 @@ class EventHubConsumerClient(ClientBaseAsync): """ def __init__(self, host, event_hub_path, credential, **kwargs) -> None: - # type:(str, str, Union[EventHubSharedKeyCredential, EventHubSASTokenCredential, TokenCredential], Any) -> None + # type:(str, str, TokenCredential, Any) -> None """""" self._partition_manager = kwargs.pop("partition_manager", None) self._load_balancing_interval = kwargs.pop("load_balancing_interval", 10) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index 5c7b98c26720..8f5101d8723f 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -32,7 +32,7 @@ def _check_closed(self): if self._closed: raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name)) - def _create_handler(self): + def _create_handler(self, auth): pass async def _open(self): @@ -44,15 +44,15 @@ async def _open(self): if not self._running: if self._handler: await self._handler.close_async() - self._create_handler() - await self._handler.open_async(connection=await self._client._conn_manager.get_connection( - self._client._address.hostname, - self._client._create_auth() - )) + auth = await self._client._create_auth() + self._create_handler(auth) + await self._handler.open_async( + connection=await self._client._conn_manager.get_connection(self._client._address.hostname, auth) + ) while not await self._handler.client_ready_async(): await asyncio.sleep(0.05) self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ - or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access + or constants.MAX_MESSAGE_LENGTH_BYTES self._running = True async def _close_handler(self): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_async.py index b338dd06bfd4..ce25cd42abb7 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_async.py @@ -86,10 +86,10 @@ def __init__( # pylint: disable=super-init-not-called self._condition = None self._link_properties = {types.AMQPSymbol(self._timeout_symbol): types.AMQPLong(int(self._timeout * 1000))} - def _create_handler(self): + def _create_handler(self, auth): self._handler = SendClientAsync( self._target, - auth=self._client._create_auth(), # pylint:disable=protected-access + auth=auth, debug=self._client._config.network_tracing, # pylint:disable=protected-access msg_timeout=self._timeout, error_policy=self._retry_policy, diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py index 13578a93061b..811d038d0b57 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_producer_client_async.py @@ -7,7 +7,7 @@ from typing import Any, Union, TYPE_CHECKING, Iterable, List from uamqp import constants # type: ignore -from azure.eventhub import EventData, EventHubSharedKeyCredential, EventHubSASTokenCredential, EventDataBatch +from azure.eventhub import EventData, EventDataBatch from ._client_base_async import ClientBaseAsync from ._producer_async import EventHubProducer @@ -26,8 +26,8 @@ class EventHubProducerClient(ClientBaseAsync): :param str event_hub_path: The path of the specific Event Hub to connect the client to. :param credential: The credential object used for authentication which implements particular interface of getting tokens. It accepts :class:`EventHubSharedKeyCredential`, - :class:`EventHubSASTokenCredential`, or credential objects generated by - the azure-identity library and objects that implement `get_token(self, *scopes)` method. + or credential objects generated by the azure-identity library and objects that + implement `get_token(self, *scopes)` method. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword float auth_timeout: The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. @@ -52,7 +52,7 @@ class EventHubProducerClient(ClientBaseAsync): """ def __init__(self, host, event_hub_path, credential, **kwargs) -> None: - # type:(str, str, Union[EventHubSharedKeyCredential, EventHubSASTokenCredential, TokenCredential], Any) -> None + # type:(str, str, TokenCredential, Any) -> None """""" super(EventHubProducerClient, self).__init__( host=host, event_hub_path=event_hub_path, credential=credential, diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 3a255f2d9312..817642600cef 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -23,20 +23,41 @@ _BATCH_MESSAGE_OVERHEAD_COST = [5, 8] -def parse_sas_token(sas_token): - """Parse a SAS token into its components. - - :param sas_token: The SAS token. - :type sas_token: str - :rtype: dict[str, str] +def _generate_sas_token(uri, policy, key, expiry=None): + """Create a shared access signiture token as a string literal. + :returns: SAS token as string literal. + :rtype: str """ - sas_data = {} - token = sas_token.partition(' ')[2] - fields = token.split('&') - for field in fields: - key, value = field.split('=', 1) - sas_data[key.lower()] = value - return sas_data + from base64 import b64encode, b64decode + from hashlib import sha256 + from hmac import HMAC + if not expiry: + expiry = time.time() + 3600 # Default to 1 hour. + encoded_uri = quote_plus(uri) + ttl = int(expiry) + sign_key = '%s\n%d' % (encoded_uri, ttl) + signature = b64encode(HMAC(b64decode(key), sign_key.encode('utf-8'), sha256).digest()) + result = { + 'sr': uri, + 'sig': signature, + 'se': str(ttl)} + if policy: + result['skn'] = policy + token = 'SharedAccessSignature ' + urlencode(result) + return _AccessToken(token=token, expiry=ttl) + + +class _AccessToken(object): + + def __init__(self, token=None, expiry=None): + self.token = token + self.expires_on = expiry + + +class _Address(object): + def __init__(self, hostname=None, path=None): + self.hostname = hostname + self.path = path class EventData(object): @@ -455,25 +476,6 @@ def _selector(self): return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8') -# TODO: move some behaviors to these two classes. -class EventHubSASTokenCredential(object): - """ - SAS token used for authentication. - - :param token: A SAS token or function that returns a SAS token. If a function is supplied, - it will be used to retrieve subsequent tokens in the case of token expiry. The function should - take no arguments. The token can be type of str or Callable object. - """ - def __init__(self, token): - self.token = token - - def get_sas_token(self): - if callable(self.token): # pylint:disable=no-else-return - return self.token() - else: - return self.token - - class EventHubSharedKeyCredential(object): """ The shared access key credential used for authentication. @@ -484,9 +486,9 @@ class EventHubSharedKeyCredential(object): def __init__(self, policy, key): self.policy = policy self.key = key + self.token_type = b"servicebus.windows.net:sastoken" - -class _Address(object): - def __init__(self, hostname=None, path=None): - self.hostname = hostname - self.path = path + def get_token(self, *scopes, **kwargs): + if not scopes: + raise ValueError("No token scope provided.") + return _generate_sas_token(scopes[0], self.policy, self.key) From 81e6b0d8b031c890032919562293771e251add3d Mon Sep 17 00:00:00 2001 From: antisch Date: Mon, 18 Nov 2019 13:56:08 -0800 Subject: [PATCH 2/2] Updated tests credentials --- sdk/eventhub/azure-eventhubs/conftest.py | 1 - .../samples/async_samples/sample_code_eventhub_async.py | 6 ++---- .../tests/livetest/asynctests/test_properties_async.py | 3 +-- .../tests/livetest/asynctests/test_reconnect_async.py | 4 ++-- 4 files changed, 5 insertions(+), 9 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/conftest.py b/sdk/eventhub/azure-eventhubs/conftest.py index 39ff979bd0b7..6139b459ede2 100644 --- a/sdk/eventhub/azure-eventhubs/conftest.py +++ b/sdk/eventhub/azure-eventhubs/conftest.py @@ -21,7 +21,6 @@ collect_ignore.append("samples/async_samples") collect_ignore.append("examples/async_examples") -# from azure.eventhub.client import EventHubClient from azure.eventhub import EventHubConsumerClient from azure.eventhub import EventHubProducerClient from azure.eventhub import EventPosition diff --git a/sdk/eventhub/azure-eventhubs/samples/async_samples/sample_code_eventhub_async.py b/sdk/eventhub/azure-eventhubs/samples/async_samples/sample_code_eventhub_async.py index 6fb29ab47015..52a39b815454 100644 --- a/sdk/eventhub/azure-eventhubs/samples/async_samples/sample_code_eventhub_async.py +++ b/sdk/eventhub/azure-eventhubs/samples/async_samples/sample_code_eventhub_async.py @@ -20,8 +20,7 @@ def create_async_eventhub_producer_client(): # [START create_eventhub_producer_client_async] import os - from azure.eventhub import EventHubSharedKeyCredential - from azure.eventhub.aio import EventHubProducerClient + from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential hostname = os.environ['EVENT_HUB_HOSTNAME'] event_hub = os.environ['EVENT_HUB_NAME'] @@ -47,8 +46,7 @@ def create_async_eventhub_consumer_client(): # [START create_eventhub_consumer_client_async] import os - from azure.eventhub import EventHubSharedKeyCredential - from azure.eventhub.aio import EventHubConsumerClient + from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential hostname = os.environ['EVENT_HUB_HOSTNAME'] event_hub = os.environ['EVENT_HUB_NAME'] diff --git a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_properties_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_properties_async.py index d40613be1c7c..feb6ee82f6eb 100644 --- a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_properties_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_properties_async.py @@ -5,8 +5,7 @@ #-------------------------------------------------------------------------- import pytest -from azure.eventhub import EventHubSharedKeyCredential -from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient +from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient, EventHubSharedKeyCredential @pytest.mark.liveTest diff --git a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_reconnect_async.py b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_reconnect_async.py index 9f850f2080c9..6773b9b27c3e 100644 --- a/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_reconnect_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/livetest/asynctests/test_reconnect_async.py @@ -8,8 +8,8 @@ import asyncio import pytest -from azure.eventhub import EventData, EventHubSharedKeyCredential -from azure.eventhub.aio import EventHubProducerClient +from azure.eventhub import EventData +from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential import uamqp from uamqp import authentication