Skip to content

Commit ff197f3

Browse files
authored
Updates for v1.3.0 (Azure#91)
* Added support for storing the state of the Event Processor along the Checkpoint. Both Checkpoint and the EP state are stored as pickled objects. * Fixing pylint complaints. * Switched from pickle back to JSON for lease persistence. * Fixes bug when accessing leases that don't contain EP context. Also, minor renaming. * Better SAS token support * Fixed pylint * Improved auth error handling * Test stabilization * Improved stored EPH context * Updated EPH context storing * Skip test on OSX * Skip tests on OSX Fail due to large message body bug. * Some cleanup * Fixed error handling * Improved SAS token parsing
1 parent dbae147 commit ff197f3

35 files changed

+824
-183
lines changed

HISTORY.rst

+17-3
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,28 @@
33
Release History
44
===============
55

6-
1.2.0
7-
+++++
6+
1.3.0 (2019-01-29)
7+
++++++++++++++++++
8+
9+
**Bugfixes**
10+
11+
- Added support for auto reconnect on token expiration and other auth errors (issue #89).
12+
13+
**Features**
14+
15+
- Added ability to create ServiceBusClient from an existing SAS auth token, including
16+
provding a function to auto-renew that token on expiry.
17+
- Added support for storing a custom EPH context value in checkpoint (PR #84, thanks @konstantinmiller)
18+
19+
20+
1.2.0 (2018-11-29)
21+
++++++++++++++++++
822

923
- Support for Python 2.7 in azure.eventhub module (azure.eventprocessorhost will not support Python 2.7).
1024
- Parse EventData.enqueued_time as a UTC timestamp (issue #72, thanks @vjrantal)
1125

1226

13-
1.1.1 (2019-10-03)
27+
1.1.1 (2018-10-03)
1428
++++++++++++++++++
1529

1630
- Fixed bug in Azure namespace package.

azure/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11

2-
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
2+
__path__ = __import__('pkgutil').extend_path(__path__, __name__)

azure/eventhub/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
55

6-
__version__ = "1.2.0"
6+
__version__ = "1.3.0"
77

88
from azure.eventhub.common import EventData, EventHubError, Offset
99
from azure.eventhub.client import EventHubClient

azure/eventhub/async_ops/__init__.py

+14-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
SendClientAsync,
1818
ReceiveClientAsync)
1919

20+
from azure.eventhub.common import parse_sas_token
2021
from azure.eventhub import (
2122
Sender,
2223
Receiver,
@@ -37,18 +38,28 @@ class EventHubClientAsync(EventHubClient):
3738
sending events to and receiving events from the Azure Event Hubs service.
3839
"""
3940

40-
def _create_auth(self, username=None, password=None): # pylint: disable=no-self-use
41+
def _create_auth(self, username=None, password=None):
4142
"""
4243
Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate
4344
the session.
4445
45-
:param auth_uri: The URI to authenticate against.
46-
:type auth_uri: str
4746
:param username: The name of the shared access policy.
4847
:type username: str
4948
:param password: The shared access key.
5049
:type password: str
5150
"""
51+
if self.sas_token:
52+
token = self.sas_token() if callable(self.sas_token) else self.sas_token
53+
try:
54+
expiry = int(parse_sas_token(token)['se'])
55+
except (KeyError, TypeError, IndexError):
56+
raise ValueError("Supplied SAS token has no valid expiry value.")
57+
return authentication.SASTokenAsync(
58+
self.auth_uri, self.auth_uri, token,
59+
expires_at=expiry,
60+
timeout=self.auth_timeout,
61+
http_proxy=self.http_proxy)
62+
5263
username = username or self._auth_config['username']
5364
password = password or self._auth_config['password']
5465
if "@sas.root" in username:

azure/eventhub/async_ops/receiver_async.py

+19-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ async def open_async(self):
107107
while not await self._handler.client_ready_async():
108108
await asyncio.sleep(0.05)
109109

110-
async def reconnect_async(self):
110+
async def reconnect_async(self): # pylint: disable=too-many-statements
111111
"""If the Receiver was disconnected from the service with
112112
a retryable error - attempt to reconnect."""
113113
# pylint: disable=protected-access
@@ -134,6 +134,11 @@ async def reconnect_async(self):
134134
await self._handler.open_async()
135135
while not await self._handler.client_ready_async():
136136
await asyncio.sleep(0.05)
137+
except errors.TokenExpired as shutdown:
138+
log.info("AsyncReceiver disconnected due to token expiry. Shutting down.")
139+
error = EventHubError(str(shutdown), shutdown)
140+
await self.close_async(exception=error)
141+
raise error
137142
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
138143
if shutdown.action.retry and self.auto_reconnect:
139144
log.info("AsyncReceiver detached. Attempting reconnect.")
@@ -152,6 +157,15 @@ async def reconnect_async(self):
152157
error = EventHubError(str(shutdown), shutdown)
153158
await self.close_async(exception=error)
154159
raise error
160+
except errors.AMQPConnectionError as shutdown:
161+
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
162+
log.info("AsyncReceiver couldn't authenticate. Attempting reconnect.")
163+
await self.reconnect_async()
164+
else:
165+
log.info("AsyncReceiver connection error (%r). Shutting down.", e)
166+
error = EventHubError(str(shutdown), shutdown)
167+
await self.close_async(exception=error)
168+
raise error
155169
except Exception as e:
156170
log.info("Unexpected error occurred (%r). Shutting down.", e)
157171
error = EventHubError("Receiver reconnect failed: {}".format(e))
@@ -232,6 +246,10 @@ async def receive(self, max_batch_size=None, timeout=None):
232246
self.offset = event_data.offset
233247
data_batch.append(event_data)
234248
return data_batch
249+
except (errors.TokenExpired, errors.AuthenticationException):
250+
log.info("AsyncReceiver disconnected due to token error. Attempting reconnect.")
251+
await self.reconnect_async()
252+
return data_batch
235253
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
236254
if shutdown.action.retry and self.auto_reconnect:
237255
log.info("AsyncReceiver detached. Attempting reconnect.")

azure/eventhub/async_ops/sender_async.py

+20
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ async def reconnect_async(self):
119119
await self._handler.open_async()
120120
self._handler.queue_message(*unsent_events)
121121
await self._handler.wait_async()
122+
except errors.TokenExpired as shutdown:
123+
log.info("AsyncSender disconnected due to token expiry. Shutting down.")
124+
error = EventHubError(str(shutdown), shutdown)
125+
await self.close_async(exception=error)
126+
raise error
122127
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
123128
if shutdown.action.retry and self.auto_reconnect:
124129
log.info("AsyncSender detached. Attempting reconnect.")
@@ -137,6 +142,15 @@ async def reconnect_async(self):
137142
error = EventHubError(str(shutdown), shutdown)
138143
await self.close_async(exception=error)
139144
raise error
145+
except errors.AMQPConnectionError as shutdown:
146+
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
147+
log.info("AsyncSender couldn't authenticate. Attempting reconnect.")
148+
await self.reconnect_async()
149+
else:
150+
log.info("AsyncSender connection error (%r). Shutting down.", e)
151+
error = EventHubError(str(shutdown), shutdown)
152+
await self.close_async(exception=error)
153+
raise error
140154
except Exception as e:
141155
log.info("Unexpected error occurred (%r). Shutting down.", e)
142156
error = EventHubError("Sender reconnect failed: {}".format(e))
@@ -211,6 +225,9 @@ async def send(self, event_data):
211225
await self._handler.send_message_async(event_data.message)
212226
if self._outcome != constants.MessageSendResult.Ok:
213227
raise Sender._error(self._outcome, self._condition)
228+
except (errors.TokenExpired, errors.AuthenticationException):
229+
log.info("AsyncSender disconnected due to token error. Attempting reconnect.")
230+
await self.reconnect_async()
214231
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
215232
if shutdown.action.retry and self.auto_reconnect:
216233
log.info("AsyncSender detached. Attempting reconnect.")
@@ -247,6 +264,9 @@ async def wait_async(self):
247264
raise ValueError("Unable to send until client has been started.")
248265
try:
249266
await self._handler.wait_async()
267+
except (errors.TokenExpired, errors.AuthenticationException):
268+
log.info("AsyncSender disconnected due to token error. Attempting reconnect.")
269+
await self.reconnect_async()
250270
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
251271
if shutdown.action.retry and self.auto_reconnect:
252272
log.info("AsyncSender detached. Attempting reconnect.")

azure/eventhub/client.py

+51-8
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
from azure.eventhub import __version__
2525
from azure.eventhub.sender import Sender
2626
from azure.eventhub.receiver import Receiver
27-
from azure.eventhub.common import EventHubError
27+
from azure.eventhub.common import EventHubError, parse_sas_token
28+
2829

2930
log = logging.getLogger(__name__)
3031

@@ -90,7 +91,9 @@ class EventHubClient(object):
9091
events to and receiving events from the Azure Event Hubs service.
9192
"""
9293

93-
def __init__(self, address, username=None, password=None, debug=False, http_proxy=None, auth_timeout=60):
94+
def __init__(
95+
self, address, username=None, password=None, debug=False,
96+
http_proxy=None, auth_timeout=60, sas_token=None):
9497
"""
9598
Constructs a new EventHubClient with the given address URL.
9699
@@ -113,8 +116,13 @@ def __init__(self, address, username=None, password=None, debug=False, http_prox
113116
:param auth_timeout: The time in seconds to wait for a token to be authorized by the service.
114117
The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
115118
:type auth_timeout: int
119+
:param sas_token: A SAS token or function that returns a SAS token. If a function is supplied,
120+
it will be used to retrieve subsequent tokens in the case of token expiry. The function should
121+
take no arguments.
122+
:type sas_token: str or callable
116123
"""
117124
self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8]
125+
self.sas_token = sas_token
118126
self.address = urlparse(address)
119127
self.eh_name = self.address.path.lstrip('/')
120128
self.http_proxy = http_proxy
@@ -123,8 +131,8 @@ def __init__(self, address, username=None, password=None, debug=False, http_prox
123131
username = username or url_username
124132
url_password = unquote_plus(self.address.password) if self.address.password else None
125133
password = password or url_password
126-
if not username or not password:
127-
raise ValueError("Missing username and/or password.")
134+
if (not username or not password) and not sas_token:
135+
raise ValueError("Please supply either username and password, or a SAS token")
128136
self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path)
129137
self._auth_config = {'username': username, 'password': password}
130138
self.get_auth = functools.partial(self._create_auth)
@@ -136,9 +144,34 @@ def __init__(self, address, username=None, password=None, debug=False, http_prox
136144
log.info("%r: Created the Event Hub client", self.container_id)
137145

138146
@classmethod
139-
def from_connection_string(cls, conn_str, eventhub=None, **kwargs):
147+
def from_sas_token(cls, address, sas_token, eventhub=None, **kwargs):
148+
"""Create an EventHubClient from an existing auth token or token generator.
149+
150+
:param address: The Event Hub address URL
151+
:type address: str
152+
:param sas_token: A SAS token or function that returns a SAS token. If a function is supplied,
153+
it will be used to retrieve subsequent tokens in the case of token expiry. The function should
154+
take no arguments.
155+
:type sas_token: str or callable
156+
:param eventhub: The name of the EventHub, if not already included in the address URL.
157+
:type eventhub: str
158+
:param debug: Whether to output network trace logs to the logger. Default
159+
is `False`.
160+
:type debug: bool
161+
:param http_proxy: HTTP proxy settings. This must be a dictionary with the following
162+
keys: 'proxy_hostname' (str value) and 'proxy_port' (int value).
163+
Additionally the following keys may also be present: 'username', 'password'.
164+
:type http_proxy: dict[str, Any]
165+
:param auth_timeout: The time in seconds to wait for a token to be authorized by the service.
166+
The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
167+
:type auth_timeout: int
140168
"""
141-
Create an EventHubClient from a connection string.
169+
address = _build_uri(address, eventhub)
170+
return cls(address, sas_token=sas_token, **kwargs)
171+
172+
@classmethod
173+
def from_connection_string(cls, conn_str, eventhub=None, **kwargs):
174+
"""Create an EventHubClient from a connection string.
142175
143176
:param conn_str: The connection string.
144177
:type conn_str: str
@@ -196,13 +229,23 @@ def _create_auth(self, username=None, password=None):
196229
Create an ~uamqp.authentication.SASTokenAuth instance to authenticate
197230
the session.
198231
199-
:param auth_uri: The URI to authenticate against.
200-
:type auth_uri: str
201232
:param username: The name of the shared access policy.
202233
:type username: str
203234
:param password: The shared access key.
204235
:type password: str
205236
"""
237+
if self.sas_token:
238+
token = self.sas_token() if callable(self.sas_token) else self.sas_token
239+
try:
240+
expiry = int(parse_sas_token(token)['se'])
241+
except (KeyError, TypeError, IndexError):
242+
raise ValueError("Supplied SAS token has no valid expiry value.")
243+
return authentication.SASTokenAuth(
244+
self.auth_uri, self.auth_uri, token,
245+
expires_at=expiry,
246+
timeout=self.auth_timeout,
247+
http_proxy=self.http_proxy)
248+
206249
username = username or self._auth_config['username']
207250
password = password or self._auth_config['password']
208251
if "@sas.root" in username:

azure/eventhub/common.py

+16
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,22 @@ def _error_handler(error):
4646
return errors.ErrorAction(retry=True)
4747

4848

49+
def parse_sas_token(sas_token):
50+
"""Parse a SAS token into its components.
51+
52+
:param sas_token: The SAS token.
53+
:type sas_token: str
54+
:rtype: dict[str, str]
55+
"""
56+
sas_data = {}
57+
token = sas_token.partition(' ')[2]
58+
fields = token.split('&')
59+
for field in fields:
60+
key, value = field.split('=', 1)
61+
sas_data[key.lower()] = value
62+
return sas_data
63+
64+
4965
class EventData(object):
5066
"""
5167
The EventData class is a holder of event content.

0 commit comments

Comments
 (0)