13
13
import functools
14
14
import threading
15
15
from typing import Any , TYPE_CHECKING
16
-
17
- import uamqp # type: ignore
18
- from uamqp import Message # type: ignore
19
- from uamqp import authentication # type: ignore
20
- from uamqp import constants # type: ignore
21
-
22
- from uamqp import types # type: ignore
23
- from azure .eventhub import __version__
24
- from .configuration import Configuration
25
- from .common import EventHubSharedKeyCredential , EventHubSASTokenCredential , _Address , parse_sas_token
26
- from .error import _handle_exception
27
- from ._connection_manager import get_connection_manager
28
-
29
16
try :
30
17
from urlparse import urlparse # type: ignore
31
18
from urllib import urlencode , quote_plus # type: ignore
32
19
except ImportError :
33
20
from urllib .parse import urlparse , urlencode , quote_plus
34
21
22
+ from uamqp import (
23
+ AMQPClient ,
24
+ Message ,
25
+ authentication ,
26
+ constants ,
27
+ errors ,
28
+ compat
29
+ )
30
+
31
+ from azure .eventhub import __version__
32
+ from .exceptions import _handle_exception , EventHubError
33
+ from ._configuration import Configuration
34
+ from ._utils import parse_sas_token
35
+ from ._common import EventHubSharedKeyCredential , EventHubSASTokenCredential
36
+ from ._connection_manager import get_connection_manager
37
+ from ._constants import (
38
+ CONTAINER_PREFIX ,
39
+ JWT_TOKEN_SCOPE ,
40
+ MGMT_OPERATION ,
41
+ MGMT_PARTITION_OPERATION
42
+ )
43
+
35
44
if TYPE_CHECKING :
36
45
from azure .core .credentials import TokenCredential # type: ignore
37
46
@@ -94,11 +103,17 @@ def _build_uri(address, entity):
94
103
return address
95
104
96
105
106
+ class _Address (object ):
107
+ def __init__ (self , hostname = None , path = None ):
108
+ self .hostname = hostname
109
+ self .path = path
110
+
111
+
97
112
class ClientBase (object ): # pylint:disable=too-many-instance-attributes
98
113
def __init__ (self , host , event_hub_path , credential , ** kwargs ):
99
114
self .eh_name = event_hub_path
100
115
self ._host = host
101
- self ._container_id = "eventhub.pysdk-" + str (uuid .uuid4 ())[:8 ]
116
+ self ._container_id = CONTAINER_PREFIX + str (uuid .uuid4 ())[:8 ]
102
117
self ._address = _Address ()
103
118
self ._address .hostname = host
104
119
self ._address .path = "/" + event_hub_path if event_hub_path else ""
@@ -110,15 +125,26 @@ def __init__(self, host, event_hub_path, credential, **kwargs):
110
125
self ._config = Configuration (** kwargs )
111
126
self ._debug = self ._config .network_tracing
112
127
self ._conn_manager = get_connection_manager (** kwargs )
113
- self ._lock = threading .RLock ()
114
128
log .info ("%r: Created the Event Hub client" , self ._container_id )
115
129
116
130
def __enter__ (self ):
117
131
return self
118
132
119
- def __exit__ (self , exc_type , exc_val , exc_tb ):
133
+ def __exit__ (self , * args ):
120
134
self .close ()
121
135
136
+ @classmethod
137
+ def from_connection_string (cls , conn_str , ** kwargs ):
138
+ event_hub_path = kwargs .pop ("event_hub_path" , None )
139
+ address , policy , key , entity = _parse_conn_str (conn_str )
140
+ entity = event_hub_path or entity
141
+ left_slash_pos = address .find ("//" )
142
+ if left_slash_pos != - 1 :
143
+ host = address [left_slash_pos + 2 :]
144
+ else :
145
+ host = address
146
+ return cls (host , entity , EventHubSharedKeyCredential (policy , key ), ** kwargs )
147
+
122
148
def _create_auth (self ):
123
149
"""
124
150
Create an ~uamqp.authentication.SASTokenAuth instance to authenticate
@@ -153,42 +179,11 @@ def _create_auth(self):
153
179
transport_type = transport_type )
154
180
155
181
else : # Azure credential
156
- get_jwt_token = functools .partial (self ._credential .get_token ,
157
- 'https://eventhubs.azure.net//.default' )
182
+ get_jwt_token = functools .partial (self ._credential .get_token , JWT_TOKEN_SCOPE )
158
183
return authentication .JWTTokenAuth (self ._auth_uri , self ._auth_uri ,
159
184
get_jwt_token , http_proxy = http_proxy ,
160
185
transport_type = transport_type )
161
186
162
- @classmethod
163
- def _create_properties (cls , user_agent = None ): # pylint: disable=no-self-use
164
- """
165
- Format the properties with which to instantiate the connection.
166
- This acts like a user agent over HTTP.
167
-
168
- :rtype: dict
169
- """
170
- properties = {}
171
- product = "azsdk-python-eventhubs"
172
- properties [types .AMQPSymbol ("product" )] = product
173
- properties [types .AMQPSymbol ("version" )] = __version__
174
- framework = "Python {}.{}.{}, {}" .format (
175
- sys .version_info [0 ], sys .version_info [1 ], sys .version_info [2 ], platform .python_implementation ()
176
- )
177
- properties [types .AMQPSymbol ("framework" )] = framework
178
- platform_str = platform .platform ()
179
- properties [types .AMQPSymbol ("platform" )] = platform_str
180
-
181
- final_user_agent = '{}/{} ({}, {})' .format (product , __version__ , framework , platform_str )
182
- if user_agent :
183
- final_user_agent = '{}, {}' .format (final_user_agent , user_agent )
184
-
185
- if len (final_user_agent ) > MAX_USER_AGENT_LENGTH :
186
- raise ValueError ("The user-agent string cannot be more than {} in length."
187
- "Current user_agent string is: {} with length: {}" .format (
188
- MAX_USER_AGENT_LENGTH , final_user_agent , len (final_user_agent )))
189
- properties [types .AMQPSymbol ("user-agent" )] = final_user_agent
190
- return properties
191
-
192
187
def _close_connection (self ):
193
188
self ._conn_manager .reset_connection_if_broken ()
194
189
@@ -234,18 +229,6 @@ def _add_span_request_attributes(self, span):
234
229
span .add_attribute ("message_bus.destination" , self ._address .path )
235
230
span .add_attribute ("peer.address" , self ._address .hostname )
236
231
237
- @classmethod
238
- def from_connection_string (cls , conn_str , ** kwargs ):
239
- event_hub_path = kwargs .pop ("event_hub_path" , None )
240
- address , policy , key , entity = _parse_conn_str (conn_str )
241
- entity = event_hub_path or entity
242
- left_slash_pos = address .find ("//" )
243
- if left_slash_pos != - 1 :
244
- host = address [left_slash_pos + 2 :]
245
- else :
246
- host = address
247
- return cls (host , entity , EventHubSharedKeyCredential (policy , key ), ** kwargs )
248
-
249
232
def get_properties (self ):
250
233
# type:() -> Dict[str, Any]
251
234
"""
@@ -260,7 +243,7 @@ def get_properties(self):
260
243
:raises: :class:`EventHubError<azure.eventhub.EventHubError>`
261
244
"""
262
245
mgmt_msg = Message (application_properties = {'name' : self .eh_name })
263
- response = self ._management_request (mgmt_msg , op_type = b'com.microsoft:eventhub' )
246
+ response = self ._management_request (mgmt_msg , op_type = MGMT_OPERATION )
264
247
output = {}
265
248
eh_info = response .get_data ()
266
249
if eh_info :
@@ -300,7 +283,7 @@ def get_partition_properties(self, partition):
300
283
"""
301
284
mgmt_msg = Message (application_properties = {'name' : self .eh_name ,
302
285
'partition' : partition })
303
- response = self ._management_request (mgmt_msg , op_type = b'com.microsoft:partition' )
286
+ response = self ._management_request (mgmt_msg , op_type = MGMT_PARTITION_OPERATION )
304
287
partition_info = response .get_data ()
305
288
output = {}
306
289
if partition_info :
@@ -317,3 +300,82 @@ def get_partition_properties(self, partition):
317
300
def close (self ):
318
301
# type:() -> None
319
302
self ._conn_manager .close_connection ()
303
+
304
+
305
+ class ConsumerProducerMixin (object ):
306
+
307
+ def __enter__ (self ):
308
+ return self
309
+
310
+ def __exit__ (self , exc_type , exc_val , exc_tb ):
311
+ self .close ()
312
+
313
+ def _check_closed (self ):
314
+ if self .closed :
315
+ raise EventHubError ("{} has been closed. Please create a new one to handle event data." .format (self ._name ))
316
+
317
+ def _open (self ):
318
+ """Open the EventHubConsumer/EventHubProducer using the supplied connection.
319
+
320
+ """
321
+ # pylint: disable=protected-access
322
+ if not self .running :
323
+ if self ._handler :
324
+ self ._handler .close ()
325
+ self ._create_handler ()
326
+ self ._handler .open (connection = self ._client ._conn_manager .get_connection ( # pylint: disable=protected-access
327
+ self ._client ._address .hostname ,
328
+ self ._client ._create_auth ()
329
+ ))
330
+ while not self ._handler .client_ready ():
331
+ time .sleep (0.05 )
332
+ self ._max_message_size_on_link = self ._handler .message_handler ._link .peer_max_message_size \
333
+ or constants .MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
334
+ self .running = True
335
+
336
+ def _close_handler (self ):
337
+ if self ._handler :
338
+ self ._handler .close () # close the link (sharing connection) or connection (not sharing)
339
+ self .running = False
340
+
341
+ def _close_connection (self ):
342
+ self ._close_handler ()
343
+ self ._client ._conn_manager .reset_connection_if_broken () # pylint: disable=protected-access
344
+
345
+ def _handle_exception (self , exception ):
346
+ if not self .running and isinstance (exception , compat .TimeoutException ):
347
+ exception = errors .AuthenticationException ("Authorization timeout." )
348
+ return _handle_exception (exception , self )
349
+
350
+ def _do_retryable_operation (self , operation , timeout = 100000 , ** kwargs ):
351
+ # pylint:disable=protected-access
352
+ # timeout equals to 0 means no timeout, set the value to be a large number.
353
+ timeout_time = time .time () + (timeout if timeout else 100000 )
354
+ retried_times = 0
355
+ last_exception = kwargs .pop ('last_exception' , None )
356
+ operation_need_param = kwargs .pop ('operation_need_param' , True )
357
+
358
+ while retried_times <= self ._client ._config .max_retries : # pylint: disable=protected-access
359
+ try :
360
+ if operation_need_param :
361
+ return operation (timeout_time = timeout_time , last_exception = last_exception , ** kwargs )
362
+ return operation ()
363
+ except Exception as exception : # pylint:disable=broad-except
364
+ last_exception = self ._handle_exception (exception )
365
+ self ._client ._try_delay (retried_times = retried_times , last_exception = last_exception ,
366
+ timeout_time = timeout_time , entity_name = self ._name )
367
+ retried_times += 1
368
+
369
+ log .info ("%r operation has exhausted retry. Last exception: %r." , self ._name , last_exception )
370
+ raise last_exception
371
+
372
+ def close (self ):
373
+ # type:() -> None
374
+ """
375
+ Close down the handler. If the handler has already closed,
376
+ this will be a no op.
377
+ """
378
+ self .running = False
379
+ if self ._handler :
380
+ self ._handler .close () # this will close link if sharing connection. Otherwise close connection
381
+ self .closed = True
0 commit comments