Skip to content

Add stress tests for max batch size/prefetch, and for unsettled message receipt. #12344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ def __init__(self,
message_size = 10,
idle_timeout = 10,
send_delay = .01,
receive_delay = 0):
receive_delay = 0,
should_complete_messages = True,
max_batch_size = 1):
self.senders = senders
self.receivers = receivers
self.duration=duration
Expand All @@ -65,6 +67,8 @@ def __init__(self,
self.idle_timeout = idle_timeout
self.send_delay = send_delay
self.receive_delay = receive_delay
self.should_complete_messages = should_complete_messages
self.max_batch_size = max_batch_size

# Because of pickle we need to create a state object and not just pass around ourselves.
# If we ever require multiple runs of this one after another, just make Run() reset this.
Expand Down Expand Up @@ -144,14 +148,15 @@ def _Receive(self, receiver, end_time):
with receiver:
while end_time > datetime.utcnow():
if self.receive_type == ReceiveType.pull:
batch = receiver.receive_messages()
batch = receiver.receive_messages(max_batch_size=self.max_batch_size)
elif self.receive_type == ReceiveType.push:
batch = receiver

for message in batch:
self.OnReceive(self._state, message)
try:
message.complete()
if self.should_complete_messages:
message.complete()
except MessageAlreadySettled: # It may have been settled in the plugin callback.
pass
self._state.total_received += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,46 @@ def test_stress_queue_receive_and_delete(self, servicebus_namespace_connection_s
receivers = [sb_client.get_queue_receiver(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete)],
duration=timedelta(seconds=60))

result = stress_test.Run()
assert(result.total_sent > 0)
assert(result.total_received > 0)


@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
def test_stress_queue_unsettled_messages(self, servicebus_namespace_connection_string, servicebus_queue):
sb_client = ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, debug=False)

stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name)],
duration = timedelta(seconds=350),
should_complete_messages = False)

result = stress_test.Run()
# This test is prompted by reports of an issue where enough unsettled messages saturate a service-side cache
# and prevent further receipt.
assert(result.total_sent > 2500)
assert(result.total_received > 2500)


@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
@ServiceBusNamespacePreparer(name_prefix='servicebustest')
@ServiceBusQueuePreparer(name_prefix='servicebustest')
def test_stress_queue_receive_large_batch_size(self, servicebus_namespace_connection_string, servicebus_queue):
sb_client = ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, debug=False)

stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(servicebus_queue.name)],
receivers = [sb_client.get_queue_receiver(servicebus_queue.name, prefetch=50)],
duration = timedelta(seconds=60),
max_batch_size = 50)

result = stress_test.Run()
assert(result.total_sent > 0)
assert(result.total_received > 0)