diff --git a/eng/tox/allowed_pylint_failures.py b/eng/tox/allowed_pylint_failures.py index 3982eba05561..2433975c377e 100644 --- a/eng/tox/allowed_pylint_failures.py +++ b/eng/tox/allowed_pylint_failures.py @@ -39,7 +39,6 @@ "azure-eventgrid", "azure-graphrbac", "azure-loganalytics", - "azure-servicebus", "azure-servicefabric", "azure-template", "azure-keyvault", diff --git a/eng/tox/mypy_hard_failure_packages.py b/eng/tox/mypy_hard_failure_packages.py index 4d9932a98916..4f458daf44aa 100644 --- a/eng/tox/mypy_hard_failure_packages.py +++ b/eng/tox/mypy_hard_failure_packages.py @@ -8,6 +8,7 @@ MYPY_HARD_FAILURE_OPTED = [ "azure-core", "azure-eventhub", + "azure-servicebus", "azure-ai-textanalytics", "azure-ai-formrecognizer" ] diff --git a/sdk/servicebus/azure-servicebus/azure/__init__.py b/sdk/servicebus/azure-servicebus/azure/__init__.py index 625f4623bd06..e604558a8bed 100644 --- a/sdk/servicebus/azure-servicebus/azure/__init__.py +++ b/sdk/servicebus/azure-servicebus/azure/__init__.py @@ -1 +1 @@ -__path__ = __import__('pkgutil').extend_path(__path__, __name__) +__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py index 24869d901162..7ff53c728df8 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py @@ -7,7 +7,7 @@ import uuid import time from datetime import timedelta -from typing import cast, Optional, Tuple, TYPE_CHECKING, Dict, Any +from typing import cast, Optional, Tuple, TYPE_CHECKING, Dict, Any, Callable, Type try: from urllib import quote_plus # type: ignore @@ -90,6 +90,31 @@ def _generate_sas_token(uri, policy, key, expiry=None): return _AccessToken(token=token, expires_on=abs_expiry) +def _convert_connection_string_to_kwargs(conn_str, shared_key_credential_type, **kwargs): + # type: (str, Type, Any) -> Dict[str, Any] + host, policy, key, entity_in_conn_str = _parse_conn_str(conn_str) + queue_name = kwargs.get("queue_name") + topic_name = kwargs.get("topic_name") + if not (queue_name or topic_name or entity_in_conn_str): + raise ValueError("Entity name is missing. Please specify `queue_name` or `topic_name`" + " or use a connection string including the entity information.") + + if queue_name and topic_name: + raise ValueError("`queue_name` and `topic_name` can not be specified simultaneously.") + + entity_in_kwargs = queue_name or topic_name + if entity_in_conn_str and entity_in_kwargs and (entity_in_conn_str != entity_in_kwargs): + raise ServiceBusAuthorizationError( + "Entity names do not match, the entity name in connection string is {};" + " the entity name in parameter is {}.".format(entity_in_conn_str, entity_in_kwargs) + ) + + kwargs["fully_qualified_namespace"] = host + kwargs["entity_name"] = entity_in_conn_str or entity_in_kwargs + kwargs["credential"] = shared_key_credential_type(policy, key) + return kwargs + + class ServiceBusSharedKeyCredential(object): """The shared access key credential used for authentication. @@ -110,7 +135,7 @@ def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument return _generate_sas_token(scopes[0], self.policy, self.key) -class BaseHandler(object): # pylint:disable=too-many-instance-attributes +class BaseHandler: # pylint:disable=too-many-instance-attributes def __init__( self, fully_qualified_namespace, @@ -118,6 +143,7 @@ def __init__( credential, **kwargs ): + # type: (str, str, TokenCredential, Any) -> None self.fully_qualified_namespace = fully_qualified_namespace self._entity_name = entity_name @@ -128,7 +154,7 @@ def __init__( self._container_id = CONTAINER_PREFIX + str(uuid.uuid4())[:8] self._config = Configuration(**kwargs) self._running = False - self._handler = None + self._handler = None # type: uamqp.AMQPClient self._auth_uri = None self._properties = create_properties() @@ -140,6 +166,7 @@ def __exit__(self, *args): self.close() def _handle_exception(self, exception): + # type: (BaseException) -> ServiceBusError error, error_need_close_handler, error_need_raise = _create_servicebus_exception(_LOGGER, exception, self) if error_need_close_handler: self._close_handler() @@ -148,31 +175,6 @@ def _handle_exception(self, exception): return error - @staticmethod - def _from_connection_string(conn_str, **kwargs): - # type: (str, Any) -> Dict[str, Any] - host, policy, key, entity_in_conn_str = _parse_conn_str(conn_str) - queue_name = kwargs.get("queue_name") - topic_name = kwargs.get("topic_name") - if not (queue_name or topic_name or entity_in_conn_str): - raise ValueError("Entity name is missing. Please specify `queue_name` or `topic_name`" - " or use a connection string including the entity information.") - - if queue_name and topic_name: - raise ValueError("`queue_name` and `topic_name` can not be specified simultaneously.") - - entity_in_kwargs = queue_name or topic_name - if entity_in_conn_str and entity_in_kwargs and (entity_in_conn_str != entity_in_kwargs): - raise ServiceBusAuthorizationError( - "Entity names do not match, the entity name in connection string is {};" - " the entity name in parameter is {}.".format(entity_in_conn_str, entity_in_kwargs) - ) - - kwargs["fully_qualified_namespace"] = host - kwargs["entity_name"] = entity_in_conn_str or entity_in_kwargs - kwargs["credential"] = ServiceBusSharedKeyCredential(policy, key) - return kwargs - def _backoff( self, retried_times, @@ -180,6 +182,7 @@ def _backoff( timeout=None, entity_name=None ): + # type: (int, Exception, Optional[float], str) -> None entity_name = entity_name or self._container_id backoff = self._config.retry_backoff_factor * 2 ** retried_times if backoff <= self._config.retry_backoff_max and ( @@ -200,16 +203,14 @@ def _backoff( raise last_exception def _do_retryable_operation(self, operation, timeout=None, **kwargs): + # type: (Callable, Optional[float], Any) -> Any require_last_exception = kwargs.pop("require_last_exception", False) require_timeout = kwargs.pop("require_timeout", False) retried_times = 0 - last_exception = None max_retries = self._config.retry_total while retried_times <= max_retries: try: - if require_last_exception: - kwargs["last_exception"] = last_exception if require_timeout: kwargs["timeout"] = timeout return operation(**kwargs) @@ -217,23 +218,24 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs): raise except Exception as exception: # pylint: disable=broad-except last_exception = self._handle_exception(exception) + if require_last_exception: + kwargs["last_exception"] = last_exception retried_times += 1 if retried_times > max_retries: - break + _LOGGER.info( + "%r operation has exhausted retry. Last exception: %r.", + self._container_id, + last_exception, + ) + raise last_exception self._backoff( retried_times=retried_times, last_exception=last_exception, timeout=timeout ) - _LOGGER.info( - "%r operation has exhausted retry. Last exception: %r.", - self._container_id, - last_exception, - ) - raise last_exception - def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_associated_link=True, **kwargs): + # type: (str, uamqp.Message, Callable, bool, Any) -> uamqp.Message self._open() application_properties = {} # Some mgmt calls do not support an associated link name (such as list_sessions). Most do, so on by default. @@ -265,6 +267,7 @@ def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_a raise ServiceBusError("Management request failed: {}".format(exp), exp) def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, **kwargs): + # type: (bytes, Dict[str, Any], Callable, Any) -> Any return self._do_retryable_operation( self._mgmt_request_response, mgmt_operation=mgmt_operation, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/client_mixins.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/client_mixins.py index f492cc74b69d..4056f10388c5 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/client_mixins.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/client_mixins.py @@ -8,8 +8,8 @@ import uuid import requests try: - from urlparse import urlparse - from urllib import unquote_plus + from urlparse import urlparse # type: ignore + from urllib import unquote_plus # type: ignore except ImportError: from urllib.parse import urlparse from urllib.parse import unquote_plus diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 16c52bfd7fa9..22e47d847534 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -8,10 +8,10 @@ import uuid import functools import logging -from typing import Optional, List, Union, Generator +from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable, Dict, Any -import uamqp -from uamqp import types, errors +import uamqp.message +from uamqp import types from .constants import ( _BATCH_MESSAGE_OVERHEAD_COST, @@ -46,6 +46,9 @@ MessageSettleFailed, MessageContentTooLarge) from .utils import utc_from_timestamp, utc_now +if TYPE_CHECKING: + from .._servicebus_receiver import ServiceBusReceiver + from .._servicebus_session_receiver import ServiceBusSessionReceiver _LOGGER = logging.getLogger(__name__) @@ -86,8 +89,6 @@ def __init__(self, body, **kwargs): self._annotations = {} self._app_properties = {} - self._expiry = None - self._receiver = None self.session_id = kwargs.get("session_id", None) if 'message' in kwargs: self.message = kwargs['message'] @@ -268,10 +269,10 @@ def scheduled_enqueue_time_utc(self, value): @property def body(self): - # type: () -> Union[bytes, Generator[bytes]] + # type: () -> Union[bytes, Iterable[bytes]] """The body of the Message. - :rtype: bytes or generator[bytes] + :rtype: bytes or Iterable[bytes] """ return self.message.get_data() @@ -317,7 +318,8 @@ def __len__(self): def _from_list(self, messages): for each in messages: if not isinstance(each, Message): - raise ValueError("Populating a message batch only supports iterables containing Message Objects. Received instead: {}".format(each.__class__.__name__)) + raise ValueError("Populating a message batch only supports iterables containing Message Objects. " + "Received instead: {}".format(each.__class__.__name__)) self.add(each) @property @@ -446,91 +448,14 @@ class ReceivedMessage(PeekMessage): :dedent: 4 :caption: Checking the properties on a received message. """ + def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs): super(ReceivedMessage, self).__init__(message=message) self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete) self._is_deferred_message = kwargs.get("is_deferred_message", False) self.auto_renew_error = None - - def _check_live(self, action): - # pylint: disable=no-member - if not self._receiver or not self._receiver._running: # pylint: disable=protected-access - raise MessageSettleFailed(action, "Orphan message had no open connection.") - if self.settled: - raise MessageAlreadySettled(action) - try: - if self.expired: - raise MessageLockExpired(inner_exception=self.auto_renew_error) - except TypeError: - pass - try: - if self._receiver.session.expired: - raise SessionLockExpired(inner_exception=self._receiver.session.auto_renew_error) - except AttributeError: - pass - - def _settle_message( - self, - settle_operation, - dead_letter_details=None - ): - try: - if not self._is_deferred_message: - try: - self._settle_via_receiver_link(settle_operation, dead_letter_details)() - return - except RuntimeError as exception: - _LOGGER.info( - "Message settling: %r has encountered an exception (%r)." - "Trying to settle through management link", - settle_operation, - exception - ) - self._settle_via_mgmt_link(settle_operation, dead_letter_details)() - except Exception as e: - raise MessageSettleFailed(settle_operation, e) - - def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None): - # pylint: disable=protected-access - if settle_operation == MESSAGE_COMPLETE: - return functools.partial( - self._receiver._settle_message, - SETTLEMENT_COMPLETE, - [self.lock_token], - ) - if settle_operation == MESSAGE_ABANDON: - return functools.partial( - self._receiver._settle_message, - SETTLEMENT_ABANDON, - [self.lock_token], - ) - if settle_operation == MESSAGE_DEAD_LETTER: - return functools.partial( - self._receiver._settle_message, - SETTLEMENT_DEADLETTER, - [self.lock_token], - dead_letter_details=dead_letter_details - ) - if settle_operation == MESSAGE_DEFER: - return functools.partial( - self._receiver._settle_message, - SETTLEMENT_DEFER, - [self.lock_token], - ) - raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) - - def _settle_via_receiver_link(self, settle_operation, dead_letter_details=None): - if settle_operation == MESSAGE_COMPLETE: - return functools.partial(self.message.accept) - if settle_operation == MESSAGE_ABANDON: - return functools.partial(self.message.modify, True, False) - if settle_operation == MESSAGE_DEAD_LETTER: - # note: message.reject() can not set reason and description properly due to the issue - # https://github.com/Azure/azure-uamqp-python/issues/155 - return functools.partial(self.message.reject, condition=DEADLETTERNAME) - if settle_operation == MESSAGE_DEFER: - return functools.partial(self.message.modify, True, True) - raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) + self._receiver = None # type: ignore + self._expiry = None @property def settled(self): @@ -597,6 +522,93 @@ def lock_token(self): return delivery_annotations.get(_X_OPT_LOCK_TOKEN) return None + def _check_live(self, action): + # pylint: disable=no-member + if not self._receiver or not self._receiver._running: # pylint: disable=protected-access + raise MessageSettleFailed(action, "Orphan message had no open connection.") + if self.settled: + raise MessageAlreadySettled(action) + try: + if self.expired: + raise MessageLockExpired(inner_exception=self.auto_renew_error) + except TypeError: + pass + try: + if self._receiver.session.expired: + raise SessionLockExpired(inner_exception=self._receiver.session.auto_renew_error) + except AttributeError: + pass + + def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None): + # type: (str, Dict[str, Any]) -> Callable + # pylint: disable=protected-access + if settle_operation == MESSAGE_COMPLETE: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_COMPLETE, + [self.lock_token], + ) + if settle_operation == MESSAGE_ABANDON: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_ABANDON, + [self.lock_token], + ) + if settle_operation == MESSAGE_DEAD_LETTER: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_DEADLETTER, + [self.lock_token], + dead_letter_details=dead_letter_details + ) + if settle_operation == MESSAGE_DEFER: + return functools.partial( + self._receiver._settle_message, + SETTLEMENT_DEFER, + [self.lock_token], + ) + raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) + + def _settle_via_receiver_link(self, settle_operation, dead_letter_details=None): # pylint: disable=unused-argument + # type: (str, Dict[str, Any]) -> Callable + # dead_letter_detail is not used because of uamqp receiver link doesn't accept it while it + # should be accepted. Will revisit this later. + # uamqp management link accepts dead_letter_details. Refer to method _settle_via_mgmt_link + # TODO: to make dead_letter_details useful + if settle_operation == MESSAGE_COMPLETE: + return functools.partial(self.message.accept) + if settle_operation == MESSAGE_ABANDON: + return functools.partial(self.message.modify, True, False) + if settle_operation == MESSAGE_DEAD_LETTER: + # note: message.reject() can not set reason and description properly due to the issue + # https://github.com/Azure/azure-uamqp-python/issues/155 + return functools.partial(self.message.reject, condition=DEADLETTERNAME) + if settle_operation == MESSAGE_DEFER: + return functools.partial(self.message.modify, True, True) + raise ValueError("Unsupported settle operation type: {}".format(settle_operation)) + + def _settle_message( + self, + settle_operation, + dead_letter_details=None + ): + # type: (str, Dict[str, Any]) -> None + try: + if not self._is_deferred_message: + try: + self._settle_via_receiver_link(settle_operation, dead_letter_details)() + return + except RuntimeError as exception: + _LOGGER.info( + "Message settling: %r has encountered an exception (%r)." + "Trying to settle through management link", + settle_operation, + exception + ) + self._settle_via_mgmt_link(settle_operation, dead_letter_details)() + except Exception as e: + raise MessageSettleFailed(settle_operation, e) + def complete(self): # type: () -> None """Complete the message. @@ -700,5 +712,5 @@ def renew_lock(self): if not token: raise ValueError("Unable to renew lock - no lock token found.") - expiry = self._receiver._renew_locks(token) # pylint: disable=protected-access + expiry = self._receiver._renew_locks(token) # pylint: disable=protected-access,no-member self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py index 15b43563d8d1..ea6f7d1e08db 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/receiver_mixins.py @@ -4,12 +4,13 @@ # license information. # ------------------------------------------------------------------------- import uuid +from uamqp import Source from .message import ReceivedMessage from .constants import ( - NEXT_AVAILABLE, - SESSION_FILTER, - SESSION_LOCKED_UNTIL, - DATETIMEOFFSET_EPOCH, + NEXT_AVAILABLE, + SESSION_FILTER, + SESSION_LOCKED_UNTIL, + DATETIMEOFFSET_EPOCH, MGMT_REQUEST_SESSION_ID, ReceiveSettleMode ) @@ -18,7 +19,6 @@ SessionLockExpired ) from .utils import utc_from_timestamp, utc_now -from uamqp import Source class ReceiverMixin(object): # pylint: disable=too-many-instance-attributes @@ -52,10 +52,10 @@ def _get_source(self): return self._entity_uri def _on_attach(self, source, target, properties, error): - return + pass def _populate_message_properties(self, message): - return + pass class SessionReceiverMixin(ReceiverMixin): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index 549e14a91e8c..802626c0acb0 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -61,26 +61,6 @@ def utc_now(): return datetime.datetime.now(tz=TZ_UTC) -def get_running_loop(): - try: - import asyncio # pylint: disable=import-error - return asyncio.get_running_loop() - except AttributeError: # 3.5 / 3.6 - loop = None - try: - loop = asyncio._get_running_loop() # pylint: disable=protected-access - except AttributeError: - _log.warning('This version of Python is deprecated, please upgrade to >= v3.5.3') - if loop is None: - _log.warning('No running event loop') - loop = asyncio.get_event_loop() - return loop - except RuntimeError: - # For backwards compatibility, create new event loop - _log.warning('No running event loop') - return asyncio.get_event_loop() - - def parse_conn_str(conn_str): endpoint = None shared_access_key_name = None diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/_common_models.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/_common_models.py index 624052524ca4..a1f3ede58da7 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/_common_models.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/_common_models.py @@ -79,8 +79,8 @@ def __init__(self, xml_element_name): try: - _unicode_type = unicode - _strtype = basestring + _unicode_type = unicode # type: ignore + _strtype = basestring # type: ignore except NameError: _unicode_type = str _strtype = str diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/_common_serialization.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/_common_serialization.py index 33e72795e8e4..f2daaf3cf3ef 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/_common_serialization.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/_common_serialization.py @@ -9,7 +9,7 @@ try: from xml.etree import cElementTree as ETree except ImportError: - from xml.etree import ElementTree as ETree + from xml.etree import ElementTree as ETree # type: ignore try: from cStringIO import StringIO except ImportError: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/models.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/models.py index 23819fc9fc63..9f0ea92812b9 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/models.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/models.py @@ -9,6 +9,7 @@ import sys import json from datetime import datetime +import warnings from azure.common import AzureException from ._common_models import WindowsAzureData, _unicode_type @@ -73,7 +74,6 @@ def __init__(self, default_message_time_to_live=None, @property def max_size_in_mega_bytes(self): - import warnings warnings.warn( 'This attribute has been changed to max_size_in_megabytes.') return self.max_size_in_megabytes diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/servicebusservice.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/servicebusservice.py index 6adedb647ac9..1b635c602fd4 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/servicebusservice.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_control_client/servicebusservice.py @@ -9,6 +9,8 @@ import os import time import json +from typing import Dict + try: from urllib2 import quote as url_quote from urllib2 import unquote as url_unquote @@ -1252,7 +1254,7 @@ def _update_service_bus_header(self, request): # Token cache for Authentication # Shared by the different instances of ServiceBusWrapTokenAuthentication -_tokens = {} +_tokens = {} # type: Dict[str, str] class ServiceBusWrapTokenAuthentication: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index a8adff9da7e8..d8d79494d0e9 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -131,7 +131,7 @@ def from_connection_string( return cls( fully_qualified_namespace=host, entity_name=entity_in_conn_str or kwargs.pop("entity_name", None), - credential=ServiceBusSharedKeyCredential(policy, key), + credential=ServiceBusSharedKeyCredential(policy, key), # type: ignore **kwargs ) @@ -298,7 +298,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): ) def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs): - # type: (str, str, Any) -> ServiceBusReceiver + # type: (str, str, str, Any) -> ServiceBusReceiver """Get ServiceBusReceiver for the specific subscription under the topic. :param str topic_name: The name of specific Service Bus Topic the client connects to. @@ -402,4 +402,3 @@ def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs): http_proxy=self._config.http_proxy, **kwargs ) - diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index 61fa703daceb..65c4f79b9a35 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -5,12 +5,13 @@ import time import logging import functools -from typing import Any, List, TYPE_CHECKING, Optional, Union +from typing import Any, List, TYPE_CHECKING, Optional, Dict -from uamqp import ReceiveClient, Source, types +from uamqp import ReceiveClient, types from uamqp.constants import SenderSettleMode +from uamqp.authentication.common import AMQPAuth -from ._base_handler import BaseHandler +from ._base_handler import BaseHandler, ServiceBusSharedKeyCredential, _convert_connection_string_to_kwargs from ._common.utils import create_authentication from ._common.message import PeekMessage, ReceivedMessage from ._common.constants import ( @@ -103,17 +104,16 @@ def __init__( **kwargs ) else: - queue_name = kwargs.get("queue_name") - topic_name = kwargs.get("topic_name") + queue_name = kwargs.get("queue_name") # type: Optional[str] + topic_name = kwargs.get("topic_name") # type: Optional[str] subscription_name = kwargs.get("subscription_name") if queue_name and topic_name: raise ValueError("Queue/Topic name can not be specified simultaneously.") - if not (queue_name or topic_name): - raise ValueError("Queue/Topic name is missing. Please specify queue_name/topic_name.") if topic_name and not subscription_name: raise ValueError("Subscription name is missing for the topic. Please specify subscription_name.") - entity_name = queue_name or topic_name + if not entity_name: + raise ValueError("Queue/Topic name is missing. Please specify queue_name/topic_name.") super(ServiceBusReceiver, self).__init__( fully_qualified_namespace=fully_qualified_namespace, @@ -147,6 +147,7 @@ def _iter_next(self): return message def _create_handler(self, auth): + # type: (AMQPAuth) -> None self._handler = ReceiveClient( self._get_source(), auth=auth, @@ -182,6 +183,7 @@ def _open(self): raise def _receive(self, max_batch_size=None, timeout=None): + # type: (Optional[int], Optional[float]) -> List[ReceivedMessage] self._open() max_batch_size = max_batch_size or self._handler._prefetch # pylint: disable=protected-access @@ -194,6 +196,7 @@ def _receive(self, max_batch_size=None, timeout=None): return [self._build_message(message) for message in batch] def _settle_message(self, settlement, lock_tokens, dead_letter_details=None): + # type: (bytes, List[str], Optional[Dict[str, Any]]) -> Any message = { MGMT_REQUEST_DISPOSITION_STATUS: settlement, MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens) @@ -210,6 +213,7 @@ def _settle_message(self, settlement, lock_tokens, dead_letter_details=None): ) def _renew_locks(self, *lock_tokens): + # type: (*str) -> Any message = {MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens)} return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_RENEWLOCK_OPERATION, @@ -265,8 +269,9 @@ def from_connection_string( :caption: Create a new instance of the ServiceBusReceiver from connection string. """ - constructor_args = cls._from_connection_string( + constructor_args = _convert_connection_string_to_kwargs( conn_str, + ServiceBusSharedKeyCredential, **kwargs ) if kwargs.get("queue_name") and kwargs.get("subscription_name"): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 96464f088616..39ea04d16434 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -5,12 +5,13 @@ import logging import time import uuid -from typing import Any, TYPE_CHECKING, Union, List +from typing import Any, TYPE_CHECKING, Union, List, Optional import uamqp from uamqp import SendClient, types +from uamqp.authentication.common import AMQPAuth -from ._base_handler import BaseHandler +from ._base_handler import BaseHandler, ServiceBusSharedKeyCredential, _convert_connection_string_to_kwargs from ._common import mgmt_handlers from ._common.message import Message, BatchMessage from .exceptions import ( @@ -137,9 +138,9 @@ def __init__( topic_name = kwargs.get("topic_name") if queue_name and topic_name: raise ValueError("Queue/Topic name can not be specified simultaneously.") - if not (queue_name or topic_name): - raise ValueError("Queue/Topic name is missing. Please specify queue_name/topic_name.") entity_name = queue_name or topic_name + if not entity_name: + raise ValueError("Queue/Topic name is missing. Please specify queue_name/topic_name.") super(ServiceBusSender, self).__init__( fully_qualified_namespace=fully_qualified_namespace, credential=credential, @@ -152,6 +153,7 @@ def __init__( self._connection = kwargs.get("connection") def _create_handler(self, auth): + # type: (AMQPAuth) -> None self._handler = SendClient( self._entity_uri, auth=auth, @@ -183,6 +185,7 @@ def _open(self): raise def _send(self, message, timeout=None, last_exception=None): + # type: (Message, Optional[float], Exception) -> None self._open() self._set_msg_timeout(timeout, last_exception) self._handler.send_message(message.message) @@ -285,8 +288,9 @@ def from_connection_string( :caption: Create a new instance of the ServiceBusSender from connection string. """ - constructor_args = cls._from_connection_string( + constructor_args = _convert_connection_string_to_kwargs( conn_str, + ServiceBusSharedKeyCredential, **kwargs ) return cls(**constructor_args) @@ -322,11 +326,11 @@ def send(self, message): """ try: batch = self.create_batch() - batch._from_list(message) + batch._from_list(message) # pylint: disable=protected-access message = batch except TypeError: # Message was not a list or generator. pass - if isinstance(message, BatchMessage) and len(message) == 0: + if isinstance(message, BatchMessage) and len(message) == 0: # pylint: disable=len-as-condition raise ValueError("A BatchMessage or list of Message must have at least one Message") self._do_retryable_operation( diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py index 5de466887c9e..ae3478169dc4 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py @@ -2,9 +2,8 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -import time import logging -from typing import Any, List, TYPE_CHECKING, Optional, Union +from typing import TYPE_CHECKING, Union, Optional import six from ._common.utils import utc_from_timestamp, utc_now @@ -22,11 +21,56 @@ if TYPE_CHECKING: import datetime + from ._servicebus_session_receiver import ServiceBusSessionReceiver + from .aio._servicebus_session_receiver_async import ServiceBusSessionReceiver as ServiceBusSessionReceiverAsync _LOGGER = logging.getLogger(__name__) -class ServiceBusSession(object): +class BaseSession(object): + def __init__(self, session_id, receiver, encoding="UTF-8"): + # type: (str, Union[ServiceBusSessionReceiver, ServiceBusSessionReceiverAsync], str) -> None + self._session_id = session_id + self._receiver = receiver + self._encoding = encoding + self._session_start = None + self._locked_until_utc = None # type: Optional[datetime.datetime] + self.auto_renew_error = None + + @property + def session_id(self): + # type: () -> str + """ + Session id of the current session. + + :rtype: str + """ + return self._session_id + + @property + def expired(self): + # type: () -> bool + """Whether the receivers lock on a particular session has expired. + + :rtype: bool + """ + return bool(self._locked_until_utc and self._locked_until_utc <= utc_now()) + + @property + def locked_until_utc(self): + # type: () -> Optional[datetime.datetime] + """The time at which this session's lock will expire. + + :rtype: datetime.datetime + """ + return self._locked_until_utc + + def _check_live(self): + if self.expired: + raise SessionLockExpired(inner_exception=self.auto_renew_error) + + +class ServiceBusSession(BaseSession): """ The ServiceBusSession is used for manage session states and lock renewal. @@ -45,17 +89,6 @@ class ServiceBusSession(object): :dedent: 4 :caption: Get session from a receiver """ - def __init__(self, session_id, receiver, encoding="UTF-8"): - self._session_id = session_id - self._receiver = receiver - self._encoding = encoding - self._session_start = None - self._locked_until_utc = None - self.auto_renew_error = None - - def _check_live(self): - if self.expired: - raise SessionLockExpired(inner_exception=self.auto_renew_error) def get_session_state(self): # type: () -> str @@ -135,31 +168,3 @@ def renew_lock(self): mgmt_handlers.default ) self._locked_until_utc = utc_from_timestamp(expiry[MGMT_RESPONSE_RECEIVER_EXPIRATION]/1000.0) - - @property - def session_id(self): - # type: () -> str - """ - Session id of the current session. - - :rtype: str - """ - return self._session_id - - @property - def expired(self): - # type: () -> bool - """Whether the receivers lock on a particular session has expired. - - :rtype: bool - """ - return bool(self._locked_until_utc and self._locked_until_utc <= utc_now()) - - @property - def locked_until_utc(self): - # type: () -> datetime.datetime - """The time at which this session's lock will expire. - - :rtype: datetime.datetime - """ - return self._locked_until_utc diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py index 4a4d2bd702e9..cb9dd586ee71 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py @@ -2,14 +2,9 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -import time import logging -from typing import Any, TYPE_CHECKING, Optional, Union -import six +from typing import Any, TYPE_CHECKING -from ._base_handler import BaseHandler -from ._common.utils import utc_from_timestamp, utc_now -from ._common.constants import ReceiveSettleMode from ._common.receiver_mixins import SessionReceiverMixin from ._servicebus_receiver import ServiceBusReceiver from ._servicebus_session import ServiceBusSession @@ -155,4 +150,4 @@ def from_connection_string( :caption: Create a new instance of the ServiceBusReceiver from connection string. """ - return super(ServiceBusSessionReceiver, cls).from_connection_string(conn_str, **kwargs) + return super(ServiceBusSessionReceiver, cls).from_connection_string(conn_str, **kwargs) # type: ignore diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py index e154e91a8bb5..bf916a391bb4 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py @@ -8,7 +8,6 @@ from .._common import message as sync_message from .._common.constants import ( - ReceiveSettleMode, MGMT_RESPONSE_MESSAGE_EXPIRATION, MESSAGE_COMPLETE, MESSAGE_DEAD_LETTER, @@ -16,7 +15,8 @@ MESSAGE_DEFER, MESSAGE_RENEW_LOCK ) -from .._common.utils import get_running_loop, utc_from_timestamp +from .._common.utils import utc_from_timestamp +from ._async_utils import get_running_loop from ..exceptions import MessageSettleFailed _LOGGER = logging.getLogger(__name__) @@ -26,12 +26,7 @@ class ReceivedMessage(sync_message.ReceivedMessage): """A Service Bus Message received from service side. """ - - def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None, **kwargs): - self._loop = loop or get_running_loop() - super(ReceivedMessage, self).__init__(message=message, mode=mode, **kwargs) - - async def _settle_message( + async def _settle_message( # type: ignore self, settle_operation, dead_letter_details=None @@ -39,7 +34,7 @@ async def _settle_message( try: if not self._is_deferred_message: try: - await self._loop.run_in_executor( + await get_running_loop().run_in_executor( None, self._settle_via_receiver_link(settle_operation, dead_letter_details) ) @@ -55,8 +50,7 @@ async def _settle_message( except Exception as e: raise MessageSettleFailed(settle_operation, e) - async def complete(self): - # type: () -> None + async def complete(self) -> None: # type: ignore """Complete the message. This removes the message from the queue. @@ -72,8 +66,9 @@ async def complete(self): await self._settle_message(MESSAGE_COMPLETE) self._settled = True - async def dead_letter(self, reason=None, description=None): - # type: (Optional[str], Optional[str]) -> None + async def dead_letter( # type: ignore + self, reason: Optional[str] = None, description: Optional[str] = None + ) -> None: # pylint: disable=unused-argument """Move the message to the Dead Letter queue. The Dead Letter queue is a sub-queue that can be @@ -92,10 +87,9 @@ async def dead_letter(self, reason=None, description=None): await self._settle_message(MESSAGE_DEAD_LETTER) self._settled = True - async def abandon(self): - # type: () -> None - """Abandon the message. - + async def abandon(self) -> None: # type: ignore + """Abandon the message. + This message will be returned to the queue and made available to be received again. :rtype: None @@ -108,10 +102,9 @@ async def abandon(self): await self._settle_message(MESSAGE_ABANDON) self._settled = True - async def defer(self): - # type: () -> None + async def defer(self) -> None: # type: ignore """Defers the message. - + This message will remain in the queue but must be requested specifically by its sequence number in order to be received. @@ -125,8 +118,7 @@ async def defer(self): await self._settle_message(MESSAGE_DEFER) self._settled = True - async def renew_lock(self): - # type: () -> None + async def renew_lock(self) -> None: # type: ignore """Renew the message lock. This will maintain the lock on the message to ensure diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py index 59ca15e83014..cfb9a7cdcdf5 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py @@ -11,7 +11,7 @@ from uamqp import authentication -from .._common.utils import renewable_start_time, get_running_loop, utc_now +from .._common.utils import renewable_start_time, utc_now from ..exceptions import AutoLockRenewTimeout, AutoLockRenewFailed from .._common.constants import ( JWT_TOKEN_SCOPE, @@ -23,6 +23,25 @@ _log = logging.getLogger(__name__) +def get_running_loop(): + try: + return asyncio.get_running_loop() + except AttributeError: # 3.5 / 3.6 + loop = None + try: + loop = asyncio._get_running_loop() # pylint: disable=protected-access + except AttributeError: + _log.warning('This version of Python is deprecated, please upgrade to >= v3.5.3') + if loop is None: + _log.warning('No running event loop') + loop = asyncio.get_event_loop() + return loop + except RuntimeError: + # For backwards compatibility, create new event loop + _log.warning('No running event loop') + return asyncio.get_event_loop() + + async def create_authentication(client): # pylint: disable=protected-access try: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py index a2ae3957cc98..2ad4335e1333 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py @@ -4,17 +4,19 @@ # -------------------------------------------------------------------------------------------- import logging import asyncio +import uuid from typing import TYPE_CHECKING, Any import uamqp from uamqp.message import MessageProperties - -from .._base_handler import BaseHandler, _generate_sas_token +from .._base_handler import _generate_sas_token +from .._common._configuration import Configuration +from .._common.utils import create_properties from .._common.constants import ( TOKEN_TYPE_SASTOKEN, MGMT_REQUEST_OP_TYPE_ENTITY_MGMT, - ASSOCIATEDLINKPROPERTYNAME -) + ASSOCIATEDLINKPROPERTYNAME, + CONTAINER_PREFIX, MANAGEMENT_PATH_SUFFIX) from ..exceptions import ( ServiceBusError, _create_servicebus_exception @@ -44,21 +46,28 @@ async def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument return _generate_sas_token(scopes[0], self.policy, self.key) -class BaseHandlerAsync(BaseHandler): +class BaseHandler: def __init__( self, - fully_qualified_namespace: str, - entity_name: str, - credential: "TokenCredential", - **kwargs: Any - ) -> None: - self._loop = kwargs.pop("loop", None) - super(BaseHandlerAsync, self).__init__( - fully_qualified_namespace=fully_qualified_namespace, - entity_name=entity_name, - credential=credential, - **kwargs - ) + fully_qualified_namespace, + entity_name, + credential, + **kwargs + ): + # type: (str, str, TokenCredential, Any) -> None + self.fully_qualified_namespace = fully_qualified_namespace + self._entity_name = entity_name + + subscription_name = kwargs.get("subscription_name") + self._mgmt_target = self._entity_name + (("/Subscriptions/" + subscription_name) if subscription_name else '') + self._mgmt_target = "{}{}".format(self._mgmt_target, MANAGEMENT_PATH_SUFFIX) + self._credential = credential + self._container_id = CONTAINER_PREFIX + str(uuid.uuid4())[:8] + self._config = Configuration(**kwargs) + self._running = False + self._handler = None # type: uamqp.AMQPClient + self._auth_uri = None + self._properties = create_properties() async def __aenter__(self): await self._open_with_retry() @@ -136,7 +145,9 @@ async def _do_retryable_operation(self, operation, timeout=None, **kwargs): ) raise last_exception - async def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_associated_link=True, **kwargs): + async def _mgmt_request_response( + self, mgmt_operation, message, callback, keep_alive_associated_link=True, **kwargs + ): await self._open() application_properties = {} @@ -174,12 +185,6 @@ async def _mgmt_request_response_with_retry(self, mgmt_operation, message, callb **kwargs ) - @staticmethod - def _from_connection_string(conn_str, **kwargs): - kwargs = BaseHandler._from_connection_string(conn_str, **kwargs) - kwargs["credential"] = ServiceBusSharedKeyCredential(kwargs["credential"].policy, kwargs["credential"].key) - return kwargs - async def _open(self): # pylint: disable=no-self-use raise ValueError("Subclass should override the method.") diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index b5e5f4029454..fc2b40c7db0a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -298,7 +298,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): ) def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs): - # type: (str, str, Any) -> ServiceBusReceiver + # type: (str, str, str, Any) -> ServiceBusReceiver """Get ServiceBusReceiver for the specific subscription under the topic. :param str topic_name: The name of specific Service Bus Topic the client connects to. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index ece5fa697f0e..f8870cfc035d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -6,13 +6,14 @@ import collections import functools import logging -from typing import Any, TYPE_CHECKING, List, Union +from typing import Any, TYPE_CHECKING, List from uamqp import ReceiveClientAsync, types from uamqp.constants import SenderSettleMode -from ._base_handler_async import BaseHandlerAsync +from ._base_handler_async import BaseHandler, ServiceBusSharedKeyCredential from ._async_message import ReceivedMessage +from .._base_handler import _convert_connection_string_to_kwargs from .._common.receiver_mixins import ReceiverMixin from .._common.constants import ( REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION, @@ -36,7 +37,7 @@ _LOGGER = logging.getLogger(__name__) -class ServiceBusReceiver(collections.abc.AsyncIterator, BaseHandlerAsync, ReceiverMixin): +class ServiceBusReceiver(collections.abc.AsyncIterator, BaseHandler, ReceiverMixin): """The ServiceBusReceiver class defines a high level interface for receiving messages from the Azure Service Bus Queue or Topic Subscription. @@ -255,8 +256,9 @@ def from_connection_string( :caption: Create a new instance of the ServiceBusReceiver from connection string. """ - constructor_args = cls._from_connection_string( + constructor_args = _convert_connection_string_to_kwargs( conn_str, + ServiceBusSharedKeyCredential, **kwargs ) if kwargs.get("queue_name") and kwargs.get("subscription_name"): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index e672dc868ad4..8a78979c5373 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -10,8 +10,9 @@ from uamqp import SendClientAsync, types from .._common.message import Message, BatchMessage +from .._base_handler import _convert_connection_string_to_kwargs from .._servicebus_sender import SenderMixin -from ._base_handler_async import BaseHandlerAsync +from ._base_handler_async import BaseHandler, ServiceBusSharedKeyCredential from .._common.constants import ( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION, @@ -27,7 +28,7 @@ _LOGGER = logging.getLogger(__name__) -class ServiceBusSender(BaseHandlerAsync, SenderMixin): +class ServiceBusSender(BaseHandler, SenderMixin): """The ServiceBusSender class defines a high level interface for sending messages to the Azure Service Bus Queue or Topic. @@ -228,8 +229,9 @@ def from_connection_string( :caption: Create a new instance of the ServiceBusSender from connection string. """ - constructor_args = cls._from_connection_string( + constructor_args = _convert_connection_string_to_kwargs( conn_str, + ServiceBusSharedKeyCredential, **kwargs ) return cls(**constructor_args) @@ -265,11 +267,11 @@ async def send(self, message): """ try: batch = await self.create_batch() - batch._from_list(message) + batch._from_list(message) # pylint: disable=protected-access message = batch except TypeError: # Message was not a list or generator. pass - if isinstance(message, BatchMessage) and len(message) == 0: + if isinstance(message, BatchMessage) and len(message) == 0: # pylint: disable=len-as-condition raise ValueError("A BatchMessage or list of Message must have at least one Message") await self._do_retryable_operation( diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py index 3f424e8078d2..b0704782fbe6 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py @@ -3,10 +3,10 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- import logging -from typing import Any, TYPE_CHECKING, List, Union +from typing import Union import six -from .._servicebus_session import ServiceBusSession as BaseSession +from .._servicebus_session import BaseSession from .._common.constants import ( REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION, REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py index 7e532a68ab8e..6d8f496b975d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py @@ -5,8 +5,7 @@ import logging from typing import Any, TYPE_CHECKING, List, Union -from .._common.receiver_mixins import ReceiverMixin, SessionReceiverMixin -from .._common.constants import ReceiveSettleMode +from .._common.receiver_mixins import SessionReceiverMixin from ._servicebus_receiver_async import ServiceBusReceiver from ._servicebus_session_async import ServiceBusSession @@ -88,7 +87,7 @@ def from_connection_string( cls, conn_str: str, **kwargs: Any - ) -> "ServiceBusReceiver": + ) -> "ServiceBusSessionReceiver": """Create a ServiceBusSessionReceiver from a connection string. :param conn_str: The connection string of a Service Bus. @@ -134,8 +133,7 @@ def from_connection_string( :caption: Create a new instance of the ServiceBusReceiver from connection string. """ - return super(ServiceBusSessionReceiver, cls).from_connection_string(conn_str, **kwargs) - + return super(ServiceBusSessionReceiver, cls).from_connection_string(conn_str, **kwargs) # type: ignore @property def session(self): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py b/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py index 7270154a2db9..4b41cf982d1e 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py @@ -62,7 +62,7 @@ def _error_handler(error): return errors.ErrorAction(retry=True) -def _create_servicebus_exception(logger, exception, handler): +def _create_servicebus_exception(logger, exception, handler): # pylint: disable=too-many-statements error_need_close_handler = True error_need_raise = False if isinstance(exception, errors.MessageAlreadySettled): diff --git a/sdk/servicebus/azure-servicebus/dev_requirements.txt b/sdk/servicebus/azure-servicebus/dev_requirements.txt index f13040f4405c..6ea28753f4c8 100644 --- a/sdk/servicebus/azure-servicebus/dev_requirements.txt +++ b/sdk/servicebus/azure-servicebus/dev_requirements.txt @@ -1,3 +1,4 @@ +-e ../../core/azure-core -e ../../../tools/azure-devtools -e ../../../tools/azure-sdk-tools -e ../azure-mgmt-servicebus \ No newline at end of file