7
7
import uuid
8
8
import time
9
9
from datetime import timedelta
10
- from typing import cast , Optional , Tuple , TYPE_CHECKING , Dict , Any
10
+ from typing import cast , Optional , Tuple , TYPE_CHECKING , Dict , Any , Callable , Type
11
11
12
12
try :
13
13
from urllib import quote_plus # type: ignore
@@ -90,6 +90,31 @@ def _generate_sas_token(uri, policy, key, expiry=None):
90
90
return _AccessToken (token = token , expires_on = abs_expiry )
91
91
92
92
93
+ def _convert_connection_string_to_kwargs (conn_str , shared_key_credential_type , ** kwargs ):
94
+ # type: (str, Type, Any) -> Dict[str, Any]
95
+ host , policy , key , entity_in_conn_str = _parse_conn_str (conn_str )
96
+ queue_name = kwargs .get ("queue_name" )
97
+ topic_name = kwargs .get ("topic_name" )
98
+ if not (queue_name or topic_name or entity_in_conn_str ):
99
+ raise ValueError ("Entity name is missing. Please specify `queue_name` or `topic_name`"
100
+ " or use a connection string including the entity information." )
101
+
102
+ if queue_name and topic_name :
103
+ raise ValueError ("`queue_name` and `topic_name` can not be specified simultaneously." )
104
+
105
+ entity_in_kwargs = queue_name or topic_name
106
+ if entity_in_conn_str and entity_in_kwargs and (entity_in_conn_str != entity_in_kwargs ):
107
+ raise ServiceBusAuthorizationError (
108
+ "Entity names do not match, the entity name in connection string is {};"
109
+ " the entity name in parameter is {}." .format (entity_in_conn_str , entity_in_kwargs )
110
+ )
111
+
112
+ kwargs ["fully_qualified_namespace" ] = host
113
+ kwargs ["entity_name" ] = entity_in_conn_str or entity_in_kwargs
114
+ kwargs ["credential" ] = shared_key_credential_type (policy , key )
115
+ return kwargs
116
+
117
+
93
118
class ServiceBusSharedKeyCredential (object ):
94
119
"""The shared access key credential used for authentication.
95
120
@@ -110,14 +135,15 @@ def get_token(self, *scopes, **kwargs): # pylint:disable=unused-argument
110
135
return _generate_sas_token (scopes [0 ], self .policy , self .key )
111
136
112
137
113
- class BaseHandler ( object ) : # pylint:disable=too-many-instance-attributes
138
+ class BaseHandler : # pylint:disable=too-many-instance-attributes
114
139
def __init__ (
115
140
self ,
116
141
fully_qualified_namespace ,
117
142
entity_name ,
118
143
credential ,
119
144
** kwargs
120
145
):
146
+ # type: (str, str, TokenCredential, Any) -> None
121
147
self .fully_qualified_namespace = fully_qualified_namespace
122
148
self ._entity_name = entity_name
123
149
@@ -128,7 +154,7 @@ def __init__(
128
154
self ._container_id = CONTAINER_PREFIX + str (uuid .uuid4 ())[:8 ]
129
155
self ._config = Configuration (** kwargs )
130
156
self ._running = False
131
- self ._handler = None
157
+ self ._handler = None # type: uamqp.AMQPClient
132
158
self ._auth_uri = None
133
159
self ._properties = create_properties ()
134
160
@@ -140,6 +166,7 @@ def __exit__(self, *args):
140
166
self .close ()
141
167
142
168
def _handle_exception (self , exception ):
169
+ # type: (BaseException) -> ServiceBusError
143
170
error , error_need_close_handler , error_need_raise = _create_servicebus_exception (_LOGGER , exception , self )
144
171
if error_need_close_handler :
145
172
self ._close_handler ()
@@ -148,38 +175,14 @@ def _handle_exception(self, exception):
148
175
149
176
return error
150
177
151
- @staticmethod
152
- def _from_connection_string (conn_str , ** kwargs ):
153
- # type: (str, Any) -> Dict[str, Any]
154
- host , policy , key , entity_in_conn_str = _parse_conn_str (conn_str )
155
- queue_name = kwargs .get ("queue_name" )
156
- topic_name = kwargs .get ("topic_name" )
157
- if not (queue_name or topic_name or entity_in_conn_str ):
158
- raise ValueError ("Entity name is missing. Please specify `queue_name` or `topic_name`"
159
- " or use a connection string including the entity information." )
160
-
161
- if queue_name and topic_name :
162
- raise ValueError ("`queue_name` and `topic_name` can not be specified simultaneously." )
163
-
164
- entity_in_kwargs = queue_name or topic_name
165
- if entity_in_conn_str and entity_in_kwargs and (entity_in_conn_str != entity_in_kwargs ):
166
- raise ServiceBusAuthorizationError (
167
- "Entity names do not match, the entity name in connection string is {};"
168
- " the entity name in parameter is {}." .format (entity_in_conn_str , entity_in_kwargs )
169
- )
170
-
171
- kwargs ["fully_qualified_namespace" ] = host
172
- kwargs ["entity_name" ] = entity_in_conn_str or entity_in_kwargs
173
- kwargs ["credential" ] = ServiceBusSharedKeyCredential (policy , key )
174
- return kwargs
175
-
176
178
def _backoff (
177
179
self ,
178
180
retried_times ,
179
181
last_exception ,
180
182
timeout = None ,
181
183
entity_name = None
182
184
):
185
+ # type: (int, Exception, Optional[float], str) -> None
183
186
entity_name = entity_name or self ._container_id
184
187
backoff = self ._config .retry_backoff_factor * 2 ** retried_times
185
188
if backoff <= self ._config .retry_backoff_max and (
@@ -200,40 +203,39 @@ def _backoff(
200
203
raise last_exception
201
204
202
205
def _do_retryable_operation (self , operation , timeout = None , ** kwargs ):
206
+ # type: (Callable, Optional[float], Any) -> Any
203
207
require_last_exception = kwargs .pop ("require_last_exception" , False )
204
208
require_timeout = kwargs .pop ("require_timeout" , False )
205
209
retried_times = 0
206
- last_exception = None
207
210
max_retries = self ._config .retry_total
208
211
209
212
while retried_times <= max_retries :
210
213
try :
211
- if require_last_exception :
212
- kwargs ["last_exception" ] = last_exception
213
214
if require_timeout :
214
215
kwargs ["timeout" ] = timeout
215
216
return operation (** kwargs )
216
217
except StopIteration :
217
218
raise
218
219
except Exception as exception : # pylint: disable=broad-except
219
220
last_exception = self ._handle_exception (exception )
221
+ if require_last_exception :
222
+ kwargs ["last_exception" ] = last_exception
220
223
retried_times += 1
221
224
if retried_times > max_retries :
222
- break
225
+ _LOGGER .info (
226
+ "%r operation has exhausted retry. Last exception: %r." ,
227
+ self ._container_id ,
228
+ last_exception ,
229
+ )
230
+ raise last_exception
223
231
self ._backoff (
224
232
retried_times = retried_times ,
225
233
last_exception = last_exception ,
226
234
timeout = timeout
227
235
)
228
236
229
- _LOGGER .info (
230
- "%r operation has exhausted retry. Last exception: %r." ,
231
- self ._container_id ,
232
- last_exception ,
233
- )
234
- raise last_exception
235
-
236
237
def _mgmt_request_response (self , mgmt_operation , message , callback , keep_alive_associated_link = True , ** kwargs ):
238
+ # type: (str, uamqp.Message, Callable, bool, Any) -> uamqp.Message
237
239
self ._open ()
238
240
application_properties = {}
239
241
# Some mgmt calls do not support an associated link name (such as list_sessions). Most do, so on by default.
@@ -265,6 +267,7 @@ def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_a
265
267
raise ServiceBusError ("Management request failed: {}" .format (exp ), exp )
266
268
267
269
def _mgmt_request_response_with_retry (self , mgmt_operation , message , callback , ** kwargs ):
270
+ # type: (bytes, Dict[str, Any], Callable, Any) -> Any
268
271
return self ._do_retryable_operation (
269
272
self ._mgmt_request_response ,
270
273
mgmt_operation = mgmt_operation ,
0 commit comments