Skip to content

Commit 9217139

Browse files
authored
Fixed datetime offset (Azure#99)
* Fixed datetime offset * Updated pylint * Removed 3.4 pylint pass
1 parent ff197f3 commit 9217139

16 files changed

+165
-113
lines changed

.travis.yml

-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ matrix:
1717
script:
1818
- pytest
1919
- python ./setup.py check -r -s
20-
- pylint --ignore=async_ops azure.eventhub
2120
- os: linux
2221
python: "3.5"
2322
script:

HISTORY.rst

+9
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,15 @@
33
Release History
44
===============
55

6+
1.3.1 (2019-02-28)
7+
++++++++++++++++++
8+
9+
**BugFixes**
10+
11+
- Fixed bug where datetime offset filter was using a local timestamp rather than UTC.
12+
- Fixed stackoverflow error in continuous connection reconnect attempts.
13+
14+
615
1.3.0 (2019-01-29)
716
++++++++++++++++++
817

azure/eventhub/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
55

6-
__version__ = "1.3.0"
6+
__version__ = "1.3.1"
77

88
from azure.eventhub.common import EventData, EventHubError, Offset
99
from azure.eventhub.client import EventHubClient

azure/eventhub/async_ops/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ async def run_async(self):
123123
if failed and len(failed) == len(self.clients):
124124
log.warning("%r: All clients failed to start.", self.container_id)
125125
raise failed[0]
126-
elif failed:
126+
if failed:
127127
log.warning("%r: %r clients failed to start.", self.container_id, len(failed))
128128
elif redirects:
129129
await self._handle_redirect(redirects)

azure/eventhub/async_ops/receiver_async.py

+24-21
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__( # pylint: disable=super-init-not-called
4949
self.keep_alive = keep_alive
5050
self.auto_reconnect = auto_reconnect
5151
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
52+
self.reconnect_backoff = 1
5253
self.redirected = None
5354
self.error = None
5455
self.properties = None
@@ -107,9 +108,7 @@ async def open_async(self):
107108
while not await self._handler.client_ready_async():
108109
await asyncio.sleep(0.05)
109110

110-
async def reconnect_async(self): # pylint: disable=too-many-statements
111-
"""If the Receiver was disconnected from the service with
112-
a retryable error - attempt to reconnect."""
111+
async def _reconnect_async(self): # pylint: disable=too-many-statements
113112
# pylint: disable=protected-access
114113
alt_creds = {
115114
"username": self.client._auth_config.get("iot_username"),
@@ -134,6 +133,7 @@ async def reconnect_async(self): # pylint: disable=too-many-statements
134133
await self._handler.open_async()
135134
while not await self._handler.client_ready_async():
136135
await asyncio.sleep(0.05)
136+
return True
137137
except errors.TokenExpired as shutdown:
138138
log.info("AsyncReceiver disconnected due to token expiry. Shutting down.")
139139
error = EventHubError(str(shutdown), shutdown)
@@ -142,36 +142,39 @@ async def reconnect_async(self): # pylint: disable=too-many-statements
142142
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
143143
if shutdown.action.retry and self.auto_reconnect:
144144
log.info("AsyncReceiver detached. Attempting reconnect.")
145-
await self.reconnect_async()
146-
else:
147-
log.info("AsyncReceiver detached. Shutting down.")
148-
error = EventHubError(str(shutdown), shutdown)
149-
await self.close_async(exception=error)
150-
raise error
145+
return False
146+
log.info("AsyncReceiver detached. Shutting down.")
147+
error = EventHubError(str(shutdown), shutdown)
148+
await self.close_async(exception=error)
149+
raise error
151150
except errors.MessageHandlerError as shutdown:
152151
if self.auto_reconnect:
153152
log.info("AsyncReceiver detached. Attempting reconnect.")
154-
await self.reconnect_async()
155-
else:
156-
log.info("AsyncReceiver detached. Shutting down.")
157-
error = EventHubError(str(shutdown), shutdown)
158-
await self.close_async(exception=error)
159-
raise error
153+
return False
154+
log.info("AsyncReceiver detached. Shutting down.")
155+
error = EventHubError(str(shutdown), shutdown)
156+
await self.close_async(exception=error)
157+
raise error
160158
except errors.AMQPConnectionError as shutdown:
161159
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
162160
log.info("AsyncReceiver couldn't authenticate. Attempting reconnect.")
163-
await self.reconnect_async()
164-
else:
165-
log.info("AsyncReceiver connection error (%r). Shutting down.", e)
166-
error = EventHubError(str(shutdown), shutdown)
167-
await self.close_async(exception=error)
168-
raise error
161+
return False
162+
log.info("AsyncReceiver connection error (%r). Shutting down.", e)
163+
error = EventHubError(str(shutdown), shutdown)
164+
await self.close_async(exception=error)
165+
raise error
169166
except Exception as e:
170167
log.info("Unexpected error occurred (%r). Shutting down.", e)
171168
error = EventHubError("Receiver reconnect failed: {}".format(e))
172169
await self.close_async(exception=error)
173170
raise error
174171

172+
async def reconnect_async(self):
173+
"""If the Receiver was disconnected from the service with
174+
a retryable error - attempt to reconnect."""
175+
while not await self._reconnect_async():
176+
await asyncio.sleep(self.reconnect_backoff)
177+
175178
async def has_started(self):
176179
"""
177180
Whether the handler has completed all start up processes such as

azure/eventhub/async_ops/sender_async.py

+24-21
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def __init__( # pylint: disable=super-init-not-called
5555
self.auto_reconnect = auto_reconnect
5656
self.timeout = send_timeout
5757
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
58+
self.reconnect_backoff = 1
5859
self.name = "EHSender-{}".format(uuid.uuid4())
5960
self.redirected = None
6061
self.error = None
@@ -100,9 +101,7 @@ async def open_async(self):
100101
while not await self._handler.client_ready_async():
101102
await asyncio.sleep(0.05)
102103

103-
async def reconnect_async(self):
104-
"""If the Receiver was disconnected from the service with
105-
a retryable error - attempt to reconnect."""
104+
async def _reconnect_async(self):
106105
await self._handler.close_async()
107106
unsent_events = self._handler.pending_messages
108107
self._handler = SendClientAsync(
@@ -119,6 +118,7 @@ async def reconnect_async(self):
119118
await self._handler.open_async()
120119
self._handler.queue_message(*unsent_events)
121120
await self._handler.wait_async()
121+
return True
122122
except errors.TokenExpired as shutdown:
123123
log.info("AsyncSender disconnected due to token expiry. Shutting down.")
124124
error = EventHubError(str(shutdown), shutdown)
@@ -127,36 +127,39 @@ async def reconnect_async(self):
127127
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
128128
if shutdown.action.retry and self.auto_reconnect:
129129
log.info("AsyncSender detached. Attempting reconnect.")
130-
await self.reconnect_async()
131-
else:
132-
log.info("AsyncSender reconnect failed. Shutting down.")
133-
error = EventHubError(str(shutdown), shutdown)
134-
await self.close_async(exception=error)
135-
raise error
130+
return False
131+
log.info("AsyncSender reconnect failed. Shutting down.")
132+
error = EventHubError(str(shutdown), shutdown)
133+
await self.close_async(exception=error)
134+
raise error
136135
except errors.MessageHandlerError as shutdown:
137136
if self.auto_reconnect:
138137
log.info("AsyncSender detached. Attempting reconnect.")
139-
await self.reconnect_async()
140-
else:
141-
log.info("AsyncSender reconnect failed. Shutting down.")
142-
error = EventHubError(str(shutdown), shutdown)
143-
await self.close_async(exception=error)
144-
raise error
138+
return False
139+
log.info("AsyncSender reconnect failed. Shutting down.")
140+
error = EventHubError(str(shutdown), shutdown)
141+
await self.close_async(exception=error)
142+
raise error
145143
except errors.AMQPConnectionError as shutdown:
146144
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
147145
log.info("AsyncSender couldn't authenticate. Attempting reconnect.")
148-
await self.reconnect_async()
149-
else:
150-
log.info("AsyncSender connection error (%r). Shutting down.", e)
151-
error = EventHubError(str(shutdown), shutdown)
152-
await self.close_async(exception=error)
153-
raise error
146+
return False
147+
log.info("AsyncSender connection error (%r). Shutting down.", e)
148+
error = EventHubError(str(shutdown), shutdown)
149+
await self.close_async(exception=error)
150+
raise error
154151
except Exception as e:
155152
log.info("Unexpected error occurred (%r). Shutting down.", e)
156153
error = EventHubError("Sender reconnect failed: {}".format(e))
157154
await self.close_async(exception=error)
158155
raise error
159156

157+
async def reconnect_async(self):
158+
"""If the Receiver was disconnected from the service with
159+
a retryable error - attempt to reconnect."""
160+
while not await self._reconnect_async():
161+
await asyncio.sleep(self.reconnect_backoff)
162+
160163
async def has_started(self):
161164
"""
162165
Whether the handler has completed all start up processes such as

azure/eventhub/client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ def run(self):
320320
if failed and len(failed) == len(self.clients):
321321
log.warning("%r: All clients failed to start.", self.container_id)
322322
raise failed[0]
323-
elif failed:
323+
if failed:
324324
log.warning("%r: %r clients failed to start.", self.container_id, len(failed))
325325
elif redirects:
326326
self._handle_redirect(redirects)

azure/eventhub/common.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from __future__ import unicode_literals
66

77
import datetime
8-
import time
8+
import calendar
99
import json
1010

1111
import six
@@ -288,7 +288,7 @@ def selector(self):
288288
"""
289289
operator = ">=" if self.inclusive else ">"
290290
if isinstance(self.value, datetime.datetime):
291-
timestamp = (time.mktime(self.value.timetuple()) * 1000) + (self.value.microsecond/1000)
291+
timestamp = (calendar.timegm(self.value.utctimetuple()) * 1000) + (self.value.microsecond/1000)
292292
return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
293293
if isinstance(self.value, six.integer_types):
294294
return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')

azure/eventhub/receiver.py

+24-21
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None, keep_a
4747
self.keep_alive = keep_alive
4848
self.auto_reconnect = auto_reconnect
4949
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
50+
self.reconnect_backoff = 1
5051
self.properties = None
5152
self.redirected = None
5253
self.error = None
@@ -103,9 +104,7 @@ def open(self):
103104
while not self._handler.client_ready():
104105
time.sleep(0.05)
105106

106-
def reconnect(self): # pylint: disable=too-many-statements
107-
"""If the Receiver was disconnected from the service with
108-
a retryable error - attempt to reconnect."""
107+
def _reconnect(self): # pylint: disable=too-many-statements
109108
# pylint: disable=protected-access
110109
alt_creds = {
111110
"username": self.client._auth_config.get("iot_username"),
@@ -129,6 +128,7 @@ def reconnect(self): # pylint: disable=too-many-statements
129128
self._handler.open()
130129
while not self._handler.client_ready():
131130
time.sleep(0.05)
131+
return True
132132
except errors.TokenExpired as shutdown:
133133
log.info("Receiver disconnected due to token expiry. Shutting down.")
134134
error = EventHubError(str(shutdown), shutdown)
@@ -137,36 +137,39 @@ def reconnect(self): # pylint: disable=too-many-statements
137137
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
138138
if shutdown.action.retry and self.auto_reconnect:
139139
log.info("Receiver detached. Attempting reconnect.")
140-
self.reconnect()
141-
else:
142-
log.info("Receiver detached. Shutting down.")
143-
error = EventHubError(str(shutdown), shutdown)
144-
self.close(exception=error)
145-
raise error
140+
return False
141+
log.info("Receiver detached. Shutting down.")
142+
error = EventHubError(str(shutdown), shutdown)
143+
self.close(exception=error)
144+
raise error
146145
except errors.MessageHandlerError as shutdown:
147146
if self.auto_reconnect:
148147
log.info("Receiver detached. Attempting reconnect.")
149-
self.reconnect()
150-
else:
151-
log.info("Receiver detached. Shutting down.")
152-
error = EventHubError(str(shutdown), shutdown)
153-
self.close(exception=error)
154-
raise error
148+
return False
149+
log.info("Receiver detached. Shutting down.")
150+
error = EventHubError(str(shutdown), shutdown)
151+
self.close(exception=error)
152+
raise error
155153
except errors.AMQPConnectionError as shutdown:
156154
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
157155
log.info("Receiver couldn't authenticate. Attempting reconnect.")
158-
self.reconnect()
159-
else:
160-
log.info("Receiver connection error (%r). Shutting down.", e)
161-
error = EventHubError(str(shutdown), shutdown)
162-
self.close(exception=error)
163-
raise error
156+
return False
157+
log.info("Receiver connection error (%r). Shutting down.", e)
158+
error = EventHubError(str(shutdown))
159+
self.close(exception=error)
160+
raise error
164161
except Exception as e:
165162
log.info("Unexpected error occurred (%r). Shutting down.", e)
166163
error = EventHubError("Receiver reconnect failed: {}".format(e))
167164
self.close(exception=error)
168165
raise error
169166

167+
def reconnect(self):
168+
"""If the Receiver was disconnected from the service with
169+
a retryable error - attempt to reconnect."""
170+
while not self._reconnect():
171+
time.sleep(self.reconnect_backoff)
172+
170173
def get_handler_state(self):
171174
"""
172175
Get the state of the underlying handler with regards to start

0 commit comments

Comments
 (0)