Skip to content

[ServiceBus] Improve resend test stability #12721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1234,7 +1234,10 @@ async def test_async_queue_receive_batch_without_setting_prefetch(self, serviceb

def message_content():
for i in range(20):
yield Message("Message no. {}".format(i))
yield Message(
body="Message no. {}".format(i),
label='1st'
)

sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name)
Expand All @@ -1246,36 +1249,24 @@ def message_content():
await sender.send_messages(message)

receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
message_1st_received_cnt = 0
message_2nd_received_cnt = 0
while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20:
messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
if not messages:
break
receive_counter += 1
message_received_cnt += len(messages)
for m in messages:
print_message(_logger, m)
await sender.send_messages(message)
await m.complete()

assert message_received_cnt == 20
# Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration
assert receive_counter < 10 # Dynamic link credit issuing come info effect

# received resent messages

receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
if not messages:
break
receive_counter += 1
message_received_cnt += len(messages)
for m in messages:
print_message(_logger, m)
await m.complete()
for message in messages:
print_message(_logger, message)
if message.label == '1st':
message_1st_received_cnt += 1
await message.complete()
message.label = '2nd'
await sender.send_messages(message) # resending received message
elif message.label == '2nd':
message_2nd_received_cnt += 1
await message.complete()

assert message_received_cnt == 20
assert message_1st_received_cnt == 20 and message_2nd_received_cnt == 20
# Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration
assert receive_counter < 10 # Dynamic link credit issuing come info effect
47 changes: 13 additions & 34 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -1593,7 +1593,7 @@ def message_content():
yield Message(
body="Test message",
properties={'key': 'value'},
label='label',
label='1st',
content_type='application/text',
correlation_id='cid',
message_id='mid',
Expand All @@ -1614,18 +1614,17 @@ def message_content():
sender.send_messages(message)

receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
message_1st_received_cnt = 0
message_2nd_received_cnt = 0
while message_1st_received_cnt < 20 or message_2nd_received_cnt < 20:
messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5)
if not messages:
break
receive_counter += 1
message_received_cnt += len(messages)
for message in messages:
print_message(_logger, message)
assert b''.join(message.body) == b'Test message'
assert message.properties[b'key'] == b'value'
assert message.label == 'label'
assert message.content_type == 'application/text'
assert message.correlation_id == 'cid'
assert message.message_id == 'mid'
Expand All @@ -1634,36 +1633,16 @@ def message_content():
assert message.to == 'to'
assert message.reply_to == 'reply_to'
assert message.time_to_live == timedelta(seconds=60)
message.complete()
sender.send_messages(message) # resending received message

assert message_received_cnt == 20
# Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration
assert receive_counter < 10 # Dynamic link credit issuing come info effect

receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5)
if not messages:
break
receive_counter += 1
message_received_cnt += len(messages)
for message in messages:
print_message(_logger, message)
assert b''.join(message.body) == b'Test message'
assert message.properties[b'key'] == b'value'
assert message.label == 'label'
assert message.content_type == 'application/text'
assert message.correlation_id == 'cid'
assert message.message_id == 'mid'
assert message.partition_key == 'pk'
assert message.via_partition_key == 'via_pk'
assert message.to == 'to'
assert message.reply_to == 'reply_to'
assert message.time_to_live == timedelta(seconds=60)
message.complete()
if message.label == '1st':
message_1st_received_cnt += 1
message.complete()
message.label = '2nd'
sender.send_messages(message) # resending received message
elif message.label == '2nd':
message_2nd_received_cnt += 1
message.complete()

assert message_received_cnt == 20
assert message_1st_received_cnt == 20 and message_2nd_received_cnt == 20
# Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration
assert receive_counter < 10 # Dynamic link credit issuing come info effect