Skip to content

Further refactoring #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Nov 15, 2019
14 changes: 10 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
__version__ = "5.0.0b6"
from uamqp import constants # type: ignore
from .common import EventData, EventDataBatch, EventPosition
from .error import EventHubError, EventDataError, ConnectError, \
AuthenticationError, EventDataSendError, ConnectionLostError
from ._common import EventData, EventDataBatch, EventPosition
from ._producer_client import EventHubProducerClient
from ._consumer_client import EventHubConsumerClient
from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential
from ._common import EventHubSharedKeyCredential, EventHubSASTokenCredential
from ._eventprocessor.partition_manager import PartitionManager
from ._eventprocessor.common import CloseReason, OwnershipLostError
from ._eventprocessor.partition_context import PartitionContext
from .exceptions import (
EventHubError,
EventDataError,
ConnectError,
AuthenticationError,
EventDataSendError,
ConnectionLostError
)

TransportType = constants.TransportType

Expand Down
186 changes: 124 additions & 62 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,34 @@
import functools
import threading
from typing import Any, TYPE_CHECKING

import uamqp # type: ignore
from uamqp import Message # type: ignore
from uamqp import authentication # type: ignore
from uamqp import constants # type: ignore

from uamqp import types # type: ignore
from azure.eventhub import __version__
from .configuration import Configuration
from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential, _Address, parse_sas_token
from .error import _handle_exception
from ._connection_manager import get_connection_manager

try:
from urlparse import urlparse # type: ignore
from urllib import urlencode, quote_plus # type: ignore
except ImportError:
from urllib.parse import urlparse, urlencode, quote_plus

from uamqp import (
AMQPClient,
Message,
authentication,
constants,
errors,
compat
)

from azure.eventhub import __version__
from .exceptions import _handle_exception, EventHubError
from ._configuration import Configuration
from ._utils import parse_sas_token
from ._common import EventHubSharedKeyCredential, EventHubSASTokenCredential
from ._connection_manager import get_connection_manager
from ._constants import (
CONTAINER_PREFIX,
JWT_TOKEN_SCOPE,
MGMT_OPERATION,
MGMT_PARTITION_OPERATION
)

if TYPE_CHECKING:
from azure.core.credentials import TokenCredential # type: ignore

Expand Down Expand Up @@ -94,11 +103,17 @@ def _build_uri(address, entity):
return address


class _Address(object):
def __init__(self, hostname=None, path=None):
self.hostname = hostname
self.path = path


class ClientBase(object): # pylint:disable=too-many-instance-attributes
def __init__(self, host, event_hub_path, credential, **kwargs):
self.eh_name = event_hub_path
self._host = host
self._container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8]
self._container_id = CONTAINER_PREFIX + str(uuid.uuid4())[:8]
self._address = _Address()
self._address.hostname = host
self._address.path = "/" + event_hub_path if event_hub_path else ""
Expand All @@ -110,15 +125,26 @@ def __init__(self, host, event_hub_path, credential, **kwargs):
self._config = Configuration(**kwargs)
self._debug = self._config.network_tracing
self._conn_manager = get_connection_manager(**kwargs)
self._lock = threading.RLock()
log.info("%r: Created the Event Hub client", self._container_id)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
def __exit__(self, *args):
self.close()

@classmethod
def from_connection_string(cls, conn_str, **kwargs):
event_hub_path = kwargs.pop("event_hub_path", None)
address, policy, key, entity = _parse_conn_str(conn_str)
entity = event_hub_path or entity
left_slash_pos = address.find("//")
if left_slash_pos != -1:
host = address[left_slash_pos + 2:]
else:
host = address
return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs)

def _create_auth(self):
"""
Create an ~uamqp.authentication.SASTokenAuth instance to authenticate
Expand Down Expand Up @@ -153,42 +179,11 @@ def _create_auth(self):
transport_type=transport_type)

else: # Azure credential
get_jwt_token = functools.partial(self._credential.get_token,
'https://eventhubs.azure.net//.default')
get_jwt_token = functools.partial(self._credential.get_token, JWT_TOKEN_SCOPE)
return authentication.JWTTokenAuth(self._auth_uri, self._auth_uri,
get_jwt_token, http_proxy=http_proxy,
transport_type=transport_type)

@classmethod
def _create_properties(cls, user_agent=None): # pylint: disable=no-self-use
"""
Format the properties with which to instantiate the connection.
This acts like a user agent over HTTP.

:rtype: dict
"""
properties = {}
product = "azsdk-python-eventhubs"
properties[types.AMQPSymbol("product")] = product
properties[types.AMQPSymbol("version")] = __version__
framework = "Python {}.{}.{}, {}".format(
sys.version_info[0], sys.version_info[1], sys.version_info[2], platform.python_implementation()
)
properties[types.AMQPSymbol("framework")] = framework
platform_str = platform.platform()
properties[types.AMQPSymbol("platform")] = platform_str

final_user_agent = '{}/{} ({}, {})'.format(product, __version__, framework, platform_str)
if user_agent:
final_user_agent = '{}, {}'.format(final_user_agent, user_agent)

if len(final_user_agent) > MAX_USER_AGENT_LENGTH:
raise ValueError("The user-agent string cannot be more than {} in length."
"Current user_agent string is: {} with length: {}".format(
MAX_USER_AGENT_LENGTH, final_user_agent, len(final_user_agent)))
properties[types.AMQPSymbol("user-agent")] = final_user_agent
return properties

def _close_connection(self):
self._conn_manager.reset_connection_if_broken()

Expand Down Expand Up @@ -234,18 +229,6 @@ def _add_span_request_attributes(self, span):
span.add_attribute("message_bus.destination", self._address.path)
span.add_attribute("peer.address", self._address.hostname)

@classmethod
def from_connection_string(cls, conn_str, **kwargs):
event_hub_path = kwargs.pop("event_hub_path", None)
address, policy, key, entity = _parse_conn_str(conn_str)
entity = event_hub_path or entity
left_slash_pos = address.find("//")
if left_slash_pos != -1:
host = address[left_slash_pos + 2:]
else:
host = address
return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs)

def get_properties(self):
# type:() -> Dict[str, Any]
"""
Expand All @@ -260,7 +243,7 @@ def get_properties(self):
:raises: :class:`EventHubError<azure.eventhub.EventHubError>`
"""
mgmt_msg = Message(application_properties={'name': self.eh_name})
response = self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub')
response = self._management_request(mgmt_msg, op_type=MGMT_OPERATION)
output = {}
eh_info = response.get_data()
if eh_info:
Expand Down Expand Up @@ -300,7 +283,7 @@ def get_partition_properties(self, partition):
"""
mgmt_msg = Message(application_properties={'name': self.eh_name,
'partition': partition})
response = self._management_request(mgmt_msg, op_type=b'com.microsoft:partition')
response = self._management_request(mgmt_msg, op_type=MGMT_PARTITION_OPERATION)
partition_info = response.get_data()
output = {}
if partition_info:
Expand All @@ -317,3 +300,82 @@ def get_partition_properties(self, partition):
def close(self):
# type:() -> None
self._conn_manager.close_connection()


class ConsumerProducerMixin(object):

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def _check_closed(self):
if self.closed:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))

def _open(self):
"""Open the EventHubConsumer/EventHubProducer using the supplied connection.

"""
# pylint: disable=protected-access
if not self.running:
if self._handler:
self._handler.close()
self._create_handler()
self._handler.open(connection=self._client._conn_manager.get_connection( # pylint: disable=protected-access
self._client._address.hostname,
self._client._create_auth()
))
while not self._handler.client_ready():
time.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True

def _close_handler(self):
if self._handler:
self._handler.close() # close the link (sharing connection) or connection (not sharing)
self.running = False

def _close_connection(self):
self._close_handler()
self._client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, self)

def _do_retryable_operation(self, operation, timeout=100000, **kwargs):
# pylint:disable=protected-access
# timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + (timeout if timeout else 100000)
retried_times = 0
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self._client._config.max_retries: # pylint: disable=protected-access
try:
if operation_need_param:
return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
return operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
self._client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self._name)
retried_times += 1

log.info("%r operation has exhausted retry. Last exception: %r.", self._name, last_exception)
raise last_exception

def close(self):
# type:() -> None
"""
Close down the handler. If the handler has already closed,
this will be a no op.
"""
self.running = False
if self._handler:
self._handler.close() # this will close link if sharing connection. Otherwise close connection
self.closed = True
Loading