Skip to content

Commit c77746b

Browse files
[ServiceBus] Test and failure improvements (#13345)
A smattering of small test improvements. - ensure incorrect_queue_conn_str test validates success and failure cases and consistency of different entry points. - ensure specificity of authorization/authenication errors - Begin adding "raises" stanzas to docstrings. - normalize convert_connection_string_to_kwargs to throw AuthenticationError on misaligned entity_path - fix flaky scheduled tests: both moving away from the receive_messages calls in tests, and making scheduled vs enqueue time a <= relationshi rather than ==. - similarly, if we're using the "source of truth" internal time for max wait time valdiation, must be<= since the milliseconds can actually be EXACT. - Add missing ServiceBusAuthorizationError as import for more precise tests. - ensure batch fetching in tests is recurrent to make more robust.
1 parent 05868af commit c77746b

12 files changed

+101
-39
lines changed

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
**Breaking Changes**
66

77
* `ServiceBusClient.close()` now closes spawned senders and receivers.
8+
* Attempting to initialize a sender or receiver with a different connection string entity and specified entity (e.g. `queue_name`) will result in an AuthenticationError
89

910
## 7.0.0b5 (2020-08-10)
1011

sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from ._common._configuration import Configuration
2222
from .exceptions import (
2323
ServiceBusError,
24-
ServiceBusAuthorizationError,
24+
ServiceBusAuthenticationError,
2525
_create_servicebus_exception
2626
)
2727
from ._common.utils import create_properties
@@ -104,7 +104,7 @@ def _convert_connection_string_to_kwargs(conn_str, shared_key_credential_type, *
104104

105105
entity_in_kwargs = queue_name or topic_name
106106
if entity_in_conn_str and entity_in_kwargs and (entity_in_conn_str != entity_in_kwargs):
107-
raise ServiceBusAuthorizationError(
107+
raise ServiceBusAuthenticationError(
108108
"Entity names do not match, the entity name in connection string is {};"
109109
" the entity name in parameter is {}.".format(entity_in_conn_str, entity_in_kwargs)
110110
)

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,9 @@ def from_connection_string(
352352
within its request to the service.
353353
:rtype: ~azure.servicebus.ServiceBusReceiver
354354
355+
:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
356+
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.
357+
355358
.. admonition:: Example:
356359
357360
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,9 @@ def from_connection_string(
284284
285285
:rtype: ~azure.servicebus.ServiceBusSender
286286
287+
:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
288+
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.
289+
287290
.. admonition:: Example:
288291
289292
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ def from_connection_string(
154154
within its request to the service.
155155
:rtype: ~azure.servicebus.ServiceBusSessionReceiver
156156
157+
:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
158+
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.
159+
157160
.. admonition:: Example:
158161
159162
.. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py

sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
'ServiceBusSender',
1919
'ServiceBusReceiver',
2020
'ServiceBusSessionReceiver',
21+
'ServiceBusSession',
2122
'ServiceBusSharedKeyCredential',
22-
'AutoLockRenew',
23-
'ServiceBusSession'
23+
'AutoLockRenew'
2424
]

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,9 @@ def from_connection_string(
347347
within its request to the service.
348348
:rtype: ~azure.servicebus.aio.ServiceBusReceiver
349349
350+
:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
351+
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.
352+
350353
.. admonition:: Example:
351354
352355
.. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,9 @@ def from_connection_string(
223223
:keyword str user_agent: If specified, this will be added in front of the built-in user agent string.
224224
:rtype: ~azure.servicebus.aio.ServiceBusSender
225225
226+
:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
227+
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.
228+
226229
.. admonition:: Example:
227230
228231
.. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_receiver_async.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ def from_connection_string(
137137
within its request to the service.
138138
:rtype: ~azure.servicebus.aio.ServiceBusSessionReceiver
139139
140+
:raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity.
141+
:raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure.
142+
140143
.. admonition:: Example:
141144
142145
.. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py

sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -959,14 +959,14 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio
959959
async with ServiceBusClient.from_connection_string(
960960
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
961961

962-
enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
962+
scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
963963
async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
964964
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
965965
content = str(uuid.uuid4())
966966
message_id = uuid.uuid4()
967967
message = Message(content)
968968
message.message_id = message_id
969-
message.scheduled_enqueue_time_utc = enqueue_time
969+
message.scheduled_enqueue_time_utc = scheduled_enqueue_time
970970
await sender.send_messages(message)
971971

972972
messages = await receiver.receive_messages(max_wait_time=120)
@@ -975,8 +975,8 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio
975975
data = str(messages[0])
976976
assert data == content
977977
assert messages[0].message_id == message_id
978-
assert messages[0].scheduled_enqueue_time_utc == enqueue_time
979-
assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0)
978+
assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time
979+
assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0)
980980
assert len(messages) == 1
981981
finally:
982982
for m in messages:
@@ -992,7 +992,7 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio
992992
async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
993993
async with ServiceBusClient.from_connection_string(
994994
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
995-
enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
995+
scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
996996
messages = []
997997
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20)
998998
sender = sb_client.get_queue_sender(servicebus_queue.name)
@@ -1007,11 +1007,12 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace
10071007

10081008
await sender.send_messages([message_a, message_b])
10091009

1010-
received_messages = await receiver.receive_messages(max_batch_size=2, max_wait_time=5)
1011-
for message in received_messages:
1010+
received_messages = []
1011+
async for message in receiver.get_streaming_message_iter(max_wait_time=5):
1012+
received_messages.append(message)
10121013
await message.complete()
10131014

1014-
tokens = await sender.schedule_messages(received_messages, enqueue_time)
1015+
tokens = await sender.schedule_messages(received_messages, scheduled_enqueue_time)
10151016
assert len(tokens) == 2
10161017

10171018
messages = await receiver.receive_messages(max_wait_time=120)
@@ -1022,8 +1023,8 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace
10221023
data = str(messages[0])
10231024
assert data == content
10241025
assert messages[0].message_id in (message_id_a, message_id_b)
1025-
assert messages[0].scheduled_enqueue_time_utc == enqueue_time
1026-
assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0)
1026+
assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time
1027+
assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0)
10271028
assert len(messages) == 2
10281029
finally:
10291030
for m in messages:
@@ -1252,7 +1253,11 @@ def message_content():
12521253
message_1st_received_cnt = 0
12531254
message_2nd_received_cnt = 0
12541255
while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20:
1255-
messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
1256+
messages = []
1257+
batch = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
1258+
while batch:
1259+
messages += batch
1260+
batch = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
12561261
if not messages:
12571262
break
12581263
receive_counter += 1
@@ -1382,22 +1387,22 @@ async def test_async_queue_receiver_respects_max_wait_time_overrides(self, servi
13821387
async for message in receiver.get_streaming_message_iter(max_wait_time=1):
13831388
messages.append(message)
13841389
time_3 = receiver._handler._counter.get_current_ms()
1385-
assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) < timedelta(seconds=2)
1390+
assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) <= timedelta(seconds=2)
13861391
time_4 = receiver._handler._counter.get_current_ms()
1387-
assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) < timedelta(seconds=11)
1392+
assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) <= timedelta(seconds=11)
13881393

13891394
async for message in receiver.get_streaming_message_iter(max_wait_time=3):
13901395
messages.append(message)
13911396
time_5 = receiver._handler._counter.get_current_ms()
1392-
assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) < timedelta(seconds=4)
1397+
assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) <= timedelta(seconds=4)
13931398

13941399
async for message in receiver:
13951400
messages.append(message)
13961401
time_6 = receiver._handler._counter.get_current_ms()
1397-
assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) < timedelta(seconds=6)
1402+
assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) <= timedelta(seconds=6)
13981403

13991404
async for message in receiver.get_streaming_message_iter():
14001405
messages.append(message)
14011406
time_7 = receiver._handler._counter.get_current_ms()
1402-
assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) < timedelta(seconds=6)
1407+
assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) <= timedelta(seconds=6)
14031408
assert len(messages) == 1

sdk/servicebus/azure-servicebus/tests/test_queues.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,14 +1199,14 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se
11991199
with ServiceBusClient.from_connection_string(
12001200
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
12011201

1202-
enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
1202+
scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
12031203
with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
12041204
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
12051205
content = str(uuid.uuid4())
12061206
message_id = uuid.uuid4()
12071207
message = Message(content)
12081208
message.message_id = message_id
1209-
message.scheduled_enqueue_time_utc = enqueue_time
1209+
message.scheduled_enqueue_time_utc = scheduled_enqueue_time
12101210
sender.send_messages(message)
12111211

12121212
messages = receiver.receive_messages(max_wait_time=120)
@@ -1215,8 +1215,8 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se
12151215
data = str(messages[0])
12161216
assert data == content
12171217
assert messages[0].message_id == message_id
1218-
assert messages[0].scheduled_enqueue_time_utc == enqueue_time
1219-
assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0)
1218+
assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time
1219+
assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0)
12201220
assert len(messages) == 1
12211221
finally:
12221222
for m in messages:
@@ -1235,7 +1235,7 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_
12351235
with ServiceBusClient.from_connection_string(
12361236
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
12371237

1238-
enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
1238+
scheduled_enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
12391239
sender = sb_client.get_queue_sender(servicebus_queue.name)
12401240
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20)
12411241

@@ -1265,7 +1265,7 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_
12651265
received_messages.append(message)
12661266
message.complete()
12671267

1268-
tokens = sender.schedule_messages(received_messages, enqueue_time)
1268+
tokens = sender.schedule_messages(received_messages, scheduled_enqueue_time)
12691269
assert len(tokens) == 2
12701270

12711271
messages = receiver.receive_messages(max_wait_time=120)
@@ -1275,8 +1275,8 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_
12751275
data = str(messages[0])
12761276
assert data == content
12771277
assert messages[0].message_id in (message_id_a, message_id_b)
1278-
assert messages[0].scheduled_enqueue_time_utc == enqueue_time
1279-
assert messages[0].scheduled_enqueue_time_utc == messages[0].enqueued_time_utc.replace(microsecond=0)
1278+
assert messages[0].scheduled_enqueue_time_utc == scheduled_enqueue_time
1279+
assert messages[0].scheduled_enqueue_time_utc <= messages[0].enqueued_time_utc.replace(microsecond=0)
12801280
assert messages[0].delivery_count == 0
12811281
assert messages[0].properties
12821282
assert messages[0].properties[b'key'] == b'value'
@@ -1619,7 +1619,9 @@ def message_content():
16191619
message_1st_received_cnt = 0
16201620
message_2nd_received_cnt = 0
16211621
while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20:
1622-
messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5)
1622+
messages = []
1623+
for message in receiver.get_streaming_message_iter(max_wait_time=5):
1624+
messages.append(message)
16231625
if not messages:
16241626
break
16251627
receive_counter += 1
@@ -1788,22 +1790,38 @@ def test_queue_receiver_respects_max_wait_time_overrides(self, servicebus_namesp
17881790
for message in receiver.get_streaming_message_iter(max_wait_time=1):
17891791
messages.append(message)
17901792
time_3 = receiver._handler._counter.get_current_ms()
1791-
assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) < timedelta(seconds=2)
1793+
assert timedelta(seconds=.5) < timedelta(milliseconds=(time_3 - time_2)) <= timedelta(seconds=2)
17921794
time_4 = receiver._handler._counter.get_current_ms()
1793-
assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) < timedelta(seconds=11)
1795+
assert timedelta(seconds=8) < timedelta(milliseconds=(time_4 - time_3)) <= timedelta(seconds=11)
17941796

17951797
for message in receiver.get_streaming_message_iter(max_wait_time=3):
17961798
messages.append(message)
17971799
time_5 = receiver._handler._counter.get_current_ms()
1798-
assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) < timedelta(seconds=4)
1800+
assert timedelta(seconds=1) < timedelta(milliseconds=(time_5 - time_4)) <= timedelta(seconds=4)
17991801

18001802
for message in receiver:
18011803
messages.append(message)
18021804
time_6 = receiver._handler._counter.get_current_ms()
1803-
assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) < timedelta(seconds=6)
1805+
assert timedelta(seconds=3) < timedelta(milliseconds=(time_6 - time_5)) <= timedelta(seconds=6)
18041806

18051807
for message in receiver.get_streaming_message_iter():
18061808
messages.append(message)
18071809
time_7 = receiver._handler._counter.get_current_ms()
1808-
assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) < timedelta(seconds=6)
1810+
assert timedelta(seconds=3) < timedelta(milliseconds=(time_7 - time_6)) <= timedelta(seconds=6)
18091811
assert len(messages) == 1
1812+
1813+
1814+
@pytest.mark.liveTest
1815+
@pytest.mark.live_test_only
1816+
@CachedResourceGroupPreparer(name_prefix='servicebustest')
1817+
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
1818+
@CachedServiceBusQueuePreparer(name_prefix='servicebustest')
1819+
def test_queue_receiver_invalid_mode(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
1820+
1821+
with ServiceBusClient.from_connection_string(
1822+
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
1823+
# with pytest.raises(StopIteration):
1824+
with sb_client.get_queue_receiver(servicebus_queue.name,
1825+
max_wait_time="oij") as receiver:
1826+
1827+
assert receiver

0 commit comments

Comments
 (0)