Skip to content

Commit 7d89780

Browse files
authored
Merge pull request Azure#65 from annatisch/eh_scenarios
Issue fixes
2 parents c5be0bb + da94d06 commit 7d89780

24 files changed

+261
-89
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ pip-delete-this-directory.txt
3939
azure/storage/
4040
azure/common/
4141
azure/profiles/
42+
*.log.1
43+
*.log.2
44+
*.log.3
4245

4346
htmlcov/
4447
.tox/

.travis.yml

+10-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
language: python
22
cache: pip
3-
python:
4-
- "3.6"
5-
# command to install dependencies
3+
dist: xenial
4+
sudo: required
5+
matrix:
6+
include:
7+
- os: linux
8+
python: "3.5"
9+
- os: linux
10+
python: "3.6"
11+
- os: linux
12+
python: "3.7"
613
install:
714
- pip install -r dev_requirements.txt
815
- pip install -e .

HISTORY.rst

+15
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,21 @@
33
Release History
44
===============
55

6+
1.1.0 (2018-09-21)
7+
++++++++++++++++++
8+
9+
- Changes to `AzureStorageCheckpointLeaseManager` parameters to support other connection options (issue #61):
10+
11+
- The `storage_account_name`, `storage_account_key` and `lease_container_name` arguments are now optional keyword arguments.
12+
- Added a `sas_token` argument that must be specified with `storage_account_name` in place of `storage_account_key`.
13+
- Added an `endpoint_suffix` argument to support storage endpoints in National Clouds.
14+
- Added a `connection_string` argument that, if specified, overrides all other endpoint arguments.
15+
- The `lease_container_name` argument now defaults to `"eph-leases"` if not specified.
16+
17+
- Fix for clients failing to start if run called multipled times (issue #64).
18+
- Added convenience methods `body_as_str` and `body_as_json` to EventData object for easier processing of message data.
19+
20+
621
1.0.0 (2018-08-22)
722
++++++++++++++++++
823

azure/eventhub/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
55

6-
__version__ = "1.0.0"
6+
__version__ = "1.1.0"
77

88
from azure.eventhub.common import EventData, EventHubError, Offset
99
from azure.eventhub.client import EventHubClient

azure/eventhub/async_ops/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ async def _wait_for_client(self, client):
7575

7676
async def _start_client_async(self, client):
7777
try:
78-
await client.open_async()
78+
if not client.running:
79+
await client.open_async()
7980
except Exception as exp: # pylint: disable=broad-except
8081
log.info("Encountered error while starting handler: %r", exp)
8182
await client.close_async(exception=exp)

azure/eventhub/async_ops/receiver_async.py

+17-15
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__( # pylint: disable=super-init-not-called
4040
:param loop: An event loop.
4141
"""
4242
self.loop = loop or asyncio.get_event_loop()
43+
self.running = False
4344
self.client = client
4445
self.source = source
4546
self.offset = offset
@@ -81,6 +82,7 @@ async def open_async(self):
8182
:type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync
8283
"""
8384
# pylint: disable=protected-access
85+
self.running = True
8486
if self.redirected:
8587
self.source = self.redirected.address
8688
source = Source(self.source)
@@ -171,12 +173,11 @@ async def has_started(self):
171173
timeout, auth_in_progress = await self._handler._auth.handle_token_async()
172174
if timeout:
173175
raise EventHubError("Authorization timeout.")
174-
elif auth_in_progress:
176+
if auth_in_progress:
175177
return False
176-
elif not await self._handler._client_ready_async():
178+
if not await self._handler._client_ready_async():
177179
return False
178-
else:
179-
return True
180+
return True
180181

181182
async def close_async(self, exception=None):
182183
"""
@@ -188,9 +189,10 @@ async def close_async(self, exception=None):
188189
due to an error.
189190
:type exception: Exception
190191
"""
192+
self.running = False
191193
if self.error:
192194
return
193-
elif isinstance(exception, errors.LinkRedirect):
195+
if isinstance(exception, errors.LinkRedirect):
194196
self.redirected = exception
195197
elif isinstance(exception, EventHubError):
196198
self.error = exception
@@ -216,6 +218,8 @@ async def receive(self, max_batch_size=None, timeout=None):
216218
"""
217219
if self.error:
218220
raise self.error
221+
if not self.running:
222+
raise ValueError("Unable to receive until client has been started.")
219223
data_batch = []
220224
try:
221225
timeout_ms = 1000 * timeout if timeout else 0
@@ -232,21 +236,19 @@ async def receive(self, max_batch_size=None, timeout=None):
232236
log.info("AsyncReceiver detached. Attempting reconnect.")
233237
await self.reconnect_async()
234238
return data_batch
235-
else:
236-
log.info("AsyncReceiver detached. Shutting down.")
237-
error = EventHubError(str(shutdown), shutdown)
238-
await self.close_async(exception=error)
239-
raise error
239+
log.info("AsyncReceiver detached. Shutting down.")
240+
error = EventHubError(str(shutdown), shutdown)
241+
await self.close_async(exception=error)
242+
raise error
240243
except errors.MessageHandlerError as shutdown:
241244
if self.auto_reconnect:
242245
log.info("AsyncReceiver detached. Attempting reconnect.")
243246
await self.reconnect_async()
244247
return data_batch
245-
else:
246-
log.info("AsyncReceiver detached. Shutting down.")
247-
error = EventHubError(str(shutdown), shutdown)
248-
await self.close_async(exception=error)
249-
raise error
248+
log.info("AsyncReceiver detached. Shutting down.")
249+
error = EventHubError(str(shutdown), shutdown)
250+
await self.close_async(exception=error)
251+
raise error
250252
except Exception as e:
251253
log.info("Unexpected error occurred (%r). Shutting down.", e)
252254
error = EventHubError("Receive failed: {}".format(e))

azure/eventhub/async_ops/sender_async.py

+11-5
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def __init__( # pylint: disable=super-init-not-called
4747
:param loop: An event loop. If not specified the default event loop will be used.
4848
"""
4949
self.loop = loop or asyncio.get_event_loop()
50+
self.running = False
5051
self.client = client
5152
self.target = target
5253
self.partition = partition
@@ -82,6 +83,7 @@ async def open_async(self):
8283
:param connection: The underlying client shared connection.
8384
:type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync
8485
"""
86+
self.running = True
8587
if self.redirected:
8688
self.target = self.redirected.address
8789
self._handler = SendClientAsync(
@@ -156,12 +158,11 @@ async def has_started(self):
156158
timeout, auth_in_progress = await self._handler._auth.handle_token_async()
157159
if timeout:
158160
raise EventHubError("Authorization timeout.")
159-
elif auth_in_progress:
161+
if auth_in_progress:
160162
return False
161-
elif not await self._handler._client_ready_async():
163+
if not await self._handler._client_ready_async():
162164
return False
163-
else:
164-
return True
165+
return True
165166

166167
async def close_async(self, exception=None):
167168
"""
@@ -173,9 +174,10 @@ async def close_async(self, exception=None):
173174
due to an error.
174175
:type exception: Exception
175176
"""
177+
self.running = False
176178
if self.error:
177179
return
178-
elif isinstance(exception, errors.LinkRedirect):
180+
if isinstance(exception, errors.LinkRedirect):
179181
self.redirected = exception
180182
elif isinstance(exception, EventHubError):
181183
self.error = exception
@@ -199,6 +201,8 @@ async def send(self, event_data):
199201
"""
200202
if self.error:
201203
raise self.error
204+
if not self.running:
205+
raise ValueError("Unable to send until client has been started.")
202206
if event_data.partition_key and self.partition:
203207
raise ValueError("EventData partition key cannot be used with a partition sender.")
204208
event_data.message.on_send_complete = self._on_outcome
@@ -238,6 +242,8 @@ async def wait_async(self):
238242
"""
239243
if self.error:
240244
raise self.error
245+
if not self.running:
246+
raise ValueError("Unable to send until client has been started.")
241247
try:
242248
await self._handler.wait_async()
243249
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:

azure/eventhub/client.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,8 @@ def _close_clients(self):
233233
def _start_clients(self):
234234
for client in self.clients:
235235
try:
236-
client.open()
236+
if not client.running:
237+
client.open()
237238
except Exception as exp: # pylint: disable=broad-except
238239
client.close(exception=exp)
239240

@@ -329,8 +330,6 @@ def get_eventhub_info(self):
329330
output['partition_count'] = eh_info[b'partition_count']
330331
output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']]
331332
return output
332-
except:
333-
raise
334333
finally:
335334
mgmt_client.close()
336335

azure/eventhub/common.py

+45-7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import datetime
77
import time
8+
import json
89

910
from uamqp import Message, BatchMessage
1011
from uamqp import types, constants, errors
@@ -31,13 +32,13 @@ def _error_handler(error):
3132
"""
3233
if error.condition == b'com.microsoft:server-busy':
3334
return errors.ErrorAction(retry=True, backoff=4)
34-
elif error.condition == b'com.microsoft:timeout':
35+
if error.condition == b'com.microsoft:timeout':
3536
return errors.ErrorAction(retry=True, backoff=2)
36-
elif error.condition == b'com.microsoft:operation-cancelled':
37+
if error.condition == b'com.microsoft:operation-cancelled':
3738
return errors.ErrorAction(retry=True)
38-
elif error.condition == b"com.microsoft:container-close":
39+
if error.condition == b"com.microsoft:container-close":
3940
return errors.ErrorAction(retry=True, backoff=4)
40-
elif error.condition in _NO_RETRY_ERRORS:
41+
if error.condition in _NO_RETRY_ERRORS:
4142
return errors.ErrorAction(retry=False)
4243
return errors.ErrorAction(retry=True)
4344

@@ -88,7 +89,6 @@ def __init__(self, body=None, batch=None, to_device=None, message=None):
8889
else:
8990
self.message = Message(body, properties=self.msg_properties)
9091

91-
9292
@property
9393
def sequence_number(self):
9494
"""
@@ -188,7 +188,45 @@ def body(self):
188188
189189
:rtype: bytes or Generator[bytes]
190190
"""
191-
return self.message.get_data()
191+
try:
192+
return self.message.get_data()
193+
except TypeError:
194+
raise ValueError("Message data empty.")
195+
196+
def body_as_str(self, encoding='UTF-8'):
197+
"""
198+
The body of the event data as a string if the data is of a
199+
compatible type.
200+
201+
:param encoding: The encoding to use for decoding message data.
202+
Default is 'UTF-8'
203+
:rtype: str
204+
"""
205+
data = self.body
206+
try:
207+
return "".join(b.decode(encoding) for b in data)
208+
except TypeError:
209+
return str(data)
210+
except: # pylint: disable=bare-except
211+
pass
212+
try:
213+
return data.decode(encoding)
214+
except Exception as e:
215+
raise TypeError("Message data is not compatible with string type: {}".format(e))
216+
217+
def body_as_json(self, encoding='UTF-8'):
218+
"""
219+
The body of the event loaded as a JSON object is the data is compatible.
220+
221+
:param encoding: The encoding to use for decoding message data.
222+
Default is 'UTF-8'
223+
:rtype: dict
224+
"""
225+
data_str = self.body_as_str(encoding=encoding)
226+
try:
227+
return json.loads(data_str)
228+
except Exception as e:
229+
raise TypeError("Event data is not compatible with JSON type: {}".format(e))
192230

193231

194232
class Offset(object):
@@ -231,7 +269,7 @@ def selector(self):
231269
if isinstance(self.value, datetime.datetime):
232270
timestamp = (time.mktime(self.value.timetuple()) * 1000) + (self.value.microsecond/1000)
233271
return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
234-
elif isinstance(self.value, int):
272+
if isinstance(self.value, int):
235273
return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
236274
return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8')
237275

0 commit comments

Comments
 (0)