Skip to content

Commit ef0edbb

Browse files
committed
close spawned children
1 parent a1039c8 commit ef0edbb

File tree

5 files changed

+164
-18
lines changed

5 files changed

+164
-18
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## 7.0.0b6 (Unreleased)
44

5+
**Breaking Changes**
6+
7+
* `ServiceBusClient.close()` now closes spawned senders and receivers.
58

69
## 7.0.0b5 (2020-08-10)
710

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ def __init__(
6969
self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name)
7070
# Internal flag for switching whether to apply connection sharing, pending fix in uamqp library
7171
self._connection_sharing = False
72+
self._handlers = []
7273

7374
def __enter__(self):
7475
if self._connection_sharing:
@@ -89,10 +90,15 @@ def _create_uamqp_connection(self):
8990
def close(self):
9091
# type: () -> None
9192
"""
92-
Close down the ServiceBus client and the underlying connection.
93+
Close down the ServiceBus client.
94+
All spawned senders, receivers and underlying connection will be shutdown.
9395
9496
:return: None
9597
"""
98+
for handler in self._handlers:
99+
handler.close()
100+
self._handlers.clear()
101+
96102
if self._connection_sharing and self._connection:
97103
self._connection.destroy()
98104

@@ -157,7 +163,7 @@ def get_queue_sender(self, queue_name, **kwargs):
157163
158164
"""
159165
# pylint: disable=protected-access
160-
return ServiceBusSender(
166+
handler = ServiceBusSender(
161167
fully_qualified_namespace=self.fully_qualified_namespace,
162168
queue_name=queue_name,
163169
credential=self._credential,
@@ -168,6 +174,8 @@ def get_queue_sender(self, queue_name, **kwargs):
168174
user_agent=self._config.user_agent,
169175
**kwargs
170176
)
177+
self._handlers.append(handler)
178+
return handler
171179

172180
def get_queue_receiver(self, queue_name, **kwargs):
173181
# type: (str, Any) -> ServiceBusReceiver
@@ -205,7 +213,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
205213
206214
"""
207215
# pylint: disable=protected-access
208-
return ServiceBusReceiver(
216+
handler = ServiceBusReceiver(
209217
fully_qualified_namespace=self.fully_qualified_namespace,
210218
queue_name=queue_name,
211219
credential=self._credential,
@@ -216,6 +224,8 @@ def get_queue_receiver(self, queue_name, **kwargs):
216224
user_agent=self._config.user_agent,
217225
**kwargs
218226
)
227+
self._handlers.append(handler)
228+
return handler
219229

220230
def get_queue_deadletter_receiver(self, queue_name, **kwargs):
221231
# type: (str, Any) -> ServiceBusReceiver
@@ -265,7 +275,7 @@ def get_queue_deadletter_receiver(self, queue_name, **kwargs):
265275
queue_name=queue_name,
266276
transfer_deadletter=kwargs.get('transfer_deadletter', False)
267277
)
268-
return ServiceBusReceiver(
278+
handler = ServiceBusReceiver(
269279
fully_qualified_namespace=self.fully_qualified_namespace,
270280
entity_name=entity_name,
271281
credential=self._credential,
@@ -277,6 +287,8 @@ def get_queue_deadletter_receiver(self, queue_name, **kwargs):
277287
user_agent=self._config.user_agent,
278288
**kwargs
279289
)
290+
self._handlers.append(handler)
291+
return handler
280292

281293
def get_topic_sender(self, topic_name, **kwargs):
282294
# type: (str, Any) -> ServiceBusSender
@@ -300,7 +312,7 @@ def get_topic_sender(self, topic_name, **kwargs):
300312
:caption: Create a new instance of the ServiceBusSender from ServiceBusClient.
301313
302314
"""
303-
return ServiceBusSender(
315+
handler = ServiceBusSender(
304316
fully_qualified_namespace=self.fully_qualified_namespace,
305317
topic_name=topic_name,
306318
credential=self._credential,
@@ -311,6 +323,8 @@ def get_topic_sender(self, topic_name, **kwargs):
311323
user_agent=self._config.user_agent,
312324
**kwargs
313325
)
326+
self._handlers.append(handler)
327+
return handler
314328

315329
def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
316330
# type: (str, str, Any) -> ServiceBusReceiver
@@ -353,7 +367,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
353367
354368
"""
355369
# pylint: disable=protected-access
356-
return ServiceBusReceiver(
370+
handler = ServiceBusReceiver(
357371
fully_qualified_namespace=self.fully_qualified_namespace,
358372
topic_name=topic_name,
359373
subscription_name=subscription_name,
@@ -365,6 +379,8 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
365379
user_agent=self._config.user_agent,
366380
**kwargs
367381
)
382+
self._handlers.append(handler)
383+
return handler
368384

369385
def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **kwargs):
370386
# type: (str, str, Any) -> ServiceBusReceiver
@@ -416,7 +432,7 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **
416432
subscription_name=subscription_name,
417433
transfer_deadletter=kwargs.get('transfer_deadletter', False)
418434
)
419-
return ServiceBusReceiver(
435+
handler = ServiceBusReceiver(
420436
fully_qualified_namespace=self.fully_qualified_namespace,
421437
entity_name=entity_name,
422438
credential=self._credential,
@@ -428,6 +444,8 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **
428444
user_agent=self._config.user_agent,
429445
**kwargs
430446
)
447+
self._handlers.append(handler)
448+
return handler
431449

432450
def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs):
433451
# type: (str, str, str, Any) -> ServiceBusReceiver
@@ -473,7 +491,7 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi
473491
474492
"""
475493
# pylint: disable=protected-access
476-
return ServiceBusSessionReceiver(
494+
handler = ServiceBusSessionReceiver(
477495
fully_qualified_namespace=self.fully_qualified_namespace,
478496
topic_name=topic_name,
479497
subscription_name=subscription_name,
@@ -486,6 +504,8 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi
486504
user_agent=self._config.user_agent,
487505
**kwargs
488506
)
507+
self._handlers.append(handler)
508+
return handler
489509

490510
def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs):
491511
# type: (str, str, Any) -> ServiceBusSessionReceiver
@@ -526,7 +546,7 @@ def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs):
526546
527547
"""
528548
# pylint: disable=protected-access
529-
return ServiceBusSessionReceiver(
549+
handler = ServiceBusSessionReceiver(
530550
fully_qualified_namespace=self.fully_qualified_namespace,
531551
queue_name=queue_name,
532552
credential=self._credential,
@@ -538,3 +558,5 @@ def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs):
538558
user_agent=self._config.user_agent,
539559
**kwargs
540560
)
561+
self._handlers.append(handler)
562+
return handler

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ def __init__(
7171
self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name)
7272
# Internal flag for switching whether to apply connection sharing, pending fix in uamqp library
7373
self._connection_sharing = False
74+
self._handlers = []
7475

7576
async def __aenter__(self):
7677
if self._connection_sharing:
@@ -133,9 +134,14 @@ async def close(self):
133134
# type: () -> None
134135
"""
135136
Close down the ServiceBus client.
137+
All spawned senders, receivers and underlying connection will be shutdown.
136138
137139
:return: None
138140
"""
141+
for handler in self._handlers:
142+
await handler.close()
143+
self._handlers.clear()
144+
139145
if self._connection_sharing and self._connection:
140146
await self._connection.destroy_async()
141147

@@ -159,7 +165,7 @@ def get_queue_sender(self, queue_name, **kwargs):
159165
160166
"""
161167
# pylint: disable=protected-access
162-
return ServiceBusSender(
168+
handler = ServiceBusSender(
163169
fully_qualified_namespace=self.fully_qualified_namespace,
164170
queue_name=queue_name,
165171
credential=self._credential,
@@ -170,6 +176,8 @@ def get_queue_sender(self, queue_name, **kwargs):
170176
user_agent=self._config.user_agent,
171177
**kwargs
172178
)
179+
self._handlers.append(handler)
180+
return handler
173181

174182
def get_queue_receiver(self, queue_name, **kwargs):
175183
# type: (str, Any) -> ServiceBusReceiver
@@ -206,7 +214,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
206214
207215
"""
208216
# pylint: disable=protected-access
209-
return ServiceBusReceiver(
217+
handler = ServiceBusReceiver(
210218
fully_qualified_namespace=self.fully_qualified_namespace,
211219
queue_name=queue_name,
212220
credential=self._credential,
@@ -217,6 +225,8 @@ def get_queue_receiver(self, queue_name, **kwargs):
217225
user_agent=self._config.user_agent,
218226
**kwargs
219227
)
228+
self._handlers.append(handler)
229+
return handler
220230

221231
def get_queue_deadletter_receiver(self, queue_name, **kwargs):
222232
# type: (str, Any) -> ServiceBusReceiver
@@ -266,7 +276,7 @@ def get_queue_deadletter_receiver(self, queue_name, **kwargs):
266276
queue_name=queue_name,
267277
transfer_deadletter=kwargs.get('transfer_deadletter', False)
268278
)
269-
return ServiceBusReceiver(
279+
handler = ServiceBusReceiver(
270280
fully_qualified_namespace=self.fully_qualified_namespace,
271281
entity_name=entity_name,
272282
credential=self._credential,
@@ -278,6 +288,8 @@ def get_queue_deadletter_receiver(self, queue_name, **kwargs):
278288
user_agent=self._config.user_agent,
279289
**kwargs
280290
)
291+
self._handlers.append(handler)
292+
return handler
281293

282294
def get_topic_sender(self, topic_name, **kwargs):
283295
# type: (str, Any) -> ServiceBusSender
@@ -301,7 +313,7 @@ def get_topic_sender(self, topic_name, **kwargs):
301313
:caption: Create a new instance of the ServiceBusSender from ServiceBusClient.
302314
303315
"""
304-
return ServiceBusSender(
316+
handler = ServiceBusSender(
305317
fully_qualified_namespace=self.fully_qualified_namespace,
306318
topic_name=topic_name,
307319
credential=self._credential,
@@ -312,6 +324,8 @@ def get_topic_sender(self, topic_name, **kwargs):
312324
user_agent=self._config.user_agent,
313325
**kwargs
314326
)
327+
self._handlers.append(handler)
328+
return handler
315329

316330
def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
317331
# type: (str, str, Any) -> ServiceBusReceiver
@@ -354,7 +368,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
354368
355369
"""
356370
# pylint: disable=protected-access
357-
return ServiceBusReceiver(
371+
handler = ServiceBusReceiver(
358372
fully_qualified_namespace=self.fully_qualified_namespace,
359373
topic_name=topic_name,
360374
subscription_name=subscription_name,
@@ -366,6 +380,8 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
366380
user_agent=self._config.user_agent,
367381
**kwargs
368382
)
383+
self._handlers.append(handler)
384+
return handler
369385

370386
def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **kwargs):
371387
# type: (str, str, Any) -> ServiceBusReceiver
@@ -417,7 +433,7 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **
417433
subscription_name=subscription_name,
418434
transfer_deadletter=kwargs.get('transfer_deadletter', False)
419435
)
420-
return ServiceBusReceiver(
436+
handler = ServiceBusReceiver(
421437
fully_qualified_namespace=self.fully_qualified_namespace,
422438
entity_name=entity_name,
423439
credential=self._credential,
@@ -429,6 +445,8 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **
429445
user_agent=self._config.user_agent,
430446
**kwargs
431447
)
448+
self._handlers.append(handler)
449+
return handler
432450

433451
def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs):
434452
# type: (str, str, str, Any) -> ServiceBusReceiver
@@ -474,7 +492,7 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi
474492
475493
"""
476494
# pylint: disable=protected-access
477-
return ServiceBusSessionReceiver(
495+
handler = ServiceBusSessionReceiver(
478496
fully_qualified_namespace=self.fully_qualified_namespace,
479497
topic_name=topic_name,
480498
subscription_name=subscription_name,
@@ -487,6 +505,8 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi
487505
user_agent=self._config.user_agent,
488506
**kwargs
489507
)
508+
self._handlers.append(handler)
509+
return handler
490510

491511
def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs):
492512
# type: (str, str, Any) -> ServiceBusSessionReceiver
@@ -526,7 +546,7 @@ def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs):
526546
527547
"""
528548
# pylint: disable=protected-access
529-
return ServiceBusSessionReceiver(
549+
handler = ServiceBusSessionReceiver(
530550
fully_qualified_namespace=self.fully_qualified_namespace,
531551
queue_name=queue_name,
532552
credential=self._credential,
@@ -538,3 +558,5 @@ def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs):
538558
user_agent=self._config.user_agent,
539559
**kwargs
540560
)
561+
self._handlers.append(handler)
562+
return handler
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#--------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for
4+
# license information.
5+
#--------------------------------------------------------------------------
6+
7+
8+
import logging
9+
import pytest
10+
11+
from azure.servicebus.aio import ServiceBusClient
12+
from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer
13+
from servicebus_preparer import CachedServiceBusNamespacePreparer, CachedServiceBusQueuePreparer
14+
from utilities import get_logger
15+
16+
_logger = get_logger(logging.DEBUG)
17+
18+
19+
class ServiceBusClientAsyncTests(AzureMgmtTestCase):
20+
@pytest.mark.liveTest
21+
@pytest.mark.live_test_only
22+
@CachedResourceGroupPreparer()
23+
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
24+
@CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
25+
async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
26+
client = ServiceBusClient.from_connection_string(servicebus_namespace_connection_string)
27+
28+
# context manager
29+
async with client:
30+
assert len(client._handlers) == 0
31+
sender = client.get_queue_sender(servicebus_queue.name)
32+
receiver = client.get_queue_receiver(servicebus_queue.name)
33+
await sender._open()
34+
await receiver._open()
35+
36+
assert sender._handler and sender._running
37+
assert receiver._handler and receiver._running
38+
assert len(client._handlers) == 2
39+
40+
assert not sender._handler and not sender._running
41+
assert not receiver._handler and not receiver._running
42+
assert len(client._handlers) == 0
43+
44+
# close operation
45+
sender = client.get_queue_sender(servicebus_queue.name)
46+
receiver = client.get_queue_receiver(servicebus_queue.name)
47+
await sender._open()
48+
await receiver._open()
49+
50+
assert sender._handler and sender._running
51+
assert receiver._handler and receiver._running
52+
assert len(client._handlers) == 2
53+
54+
await client.close()
55+
56+
assert not sender._handler and not sender._running
57+
assert not receiver._handler and not receiver._running
58+
assert len(client._handlers) == 0

0 commit comments

Comments
 (0)