diff --git a/eng/.docsettings.yml b/eng/.docsettings.yml index 07f073a72430..66bb50636466 100644 --- a/eng/.docsettings.yml +++ b/eng/.docsettings.yml @@ -8,6 +8,7 @@ omitted_paths: - doc/* - sdk/**/samples/* - sdk/identity/azure-identity/tests/* + - sdk/**/tests/perfstress_tests/* language: python root_check_enabled: True @@ -87,6 +88,7 @@ known_content_issues: - ['sdk/schemaregistry/azure-schemaregistry/swagger/README.md', '#4554'] - ['sdk/servicebus/azure-servicebus/README.md', '#4554'] - ['sdk/servicebus/azure-servicebus/swagger/README.md', '#4554'] + - ['sdk/servicebus/azure-servicebus/tests/perf_tests/README.md', '#4554'] - ['sdk/servicefabric/azure-servicefabric/README.md', '#4554'] - ['sdk/storage/azure-storage-nspkg/README.md', '#4554'] - ['sdk/storage/azure-storage-blob/swagger/README.md', '#4554'] diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/README.md b/sdk/servicebus/azure-servicebus/tests/perf_tests/README.md new file mode 100644 index 000000000000..f9717f21a62f --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/README.md @@ -0,0 +1,69 @@ +# ServiceBus Performance Tests + +In order to run the performance tests, the `azure-devtools` package must be installed. This is done as part of the `dev_requirements`. +Start be creating a new virtual environment for your perf tests. This will need to be a Python 3 environment, preferably >=3.7. +Note that tests for T1 and T2 SDKs cannot be run from the same environment, and will need to be setup separately. + +### Setup for T2 perf test runs + +```cmd +(env) ~/azure-servicebus> pip install -r dev_requirements.txt +(env) ~/azure-servicebus> pip install . +``` + +### Setup for T1 perf test runs + +```cmd +(env) ~/azure-servicebus> pip install -r dev_requirements.txt +(env) ~/azure-servicebus> pip install tests/perf_tests/T1_legacy_tests/t1_test_requirements.txt +``` + +## Test commands + +When `azure-devtools` is installed, you will have access to the `perfstress` command line tool, which will scan the current module for runable perf tests. Only a specific test can be run at a time (i.e. there is no "run all" feature). + +```cmd +(env) ~/azure-servicebus> cd tests +(env) ~/azure-servicebus/tests> perfstress +``` +Using the `perfstress` command alone will list the available perf tests found. Note that the available tests discovered will vary depending on whether your environment is configured for the T1 or T2 SDK. + +### Common perf command line options +These options are available for all perf tests: +- `--duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10. +- `--iterations=1` Number of test iterations to run. Default is 1. +- `--parallel=1` Number of tests to run in parallel. Default is 1. +- `--no-client-share` Whether each parallel test instance should share a single client, or use their own. Default is False (sharing). +- `--warm-up=5` Number of seconds to spend warming up the connection before measuing begins. Default is 5. +- `--sync` Whether to run the tests in sync or async. Default is False (async). +- `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted). + +### Common Service Bus command line options +The options are available for all SB perf tests: +- `--message-size=100` Number of bytes each message contains. Default is 100. +- `--num-messages` Number of messages to send/receive as part of a single run. + +#### Receive command line options +The receiving tests have these additional command line options: +- `--peeklock` Whether to run the test using peeklock or receive and delete. If peeklock is used, messages will be completed. Default is False (receive and delete). +- `--max-wait-time=0` The max time to wait for the specified number of messages to be received. Default is 0 (indefinitely). +- `--preload=10000` The number of messages to preload into the queue before the receiving tests start. Default is 10000 messages. + +### T2 Tests +The tests currently written for the T2 SDK: +- `SendMessageTest` Sends a single message per run. +- `SendMessageBatchTest` Sends `num-messages` in a batch per run. +- `ReceiveMessageStreamTest` Receives `num-messages` using an iterator. Receive command options apply. +- `ReceiveMessageBatchTest` Receives `num-messages` using a single fetch call. Receive command options apply. + +### T1 Tests +The tests currently written for the T2 SDK: +- `LegacySendMessageTest` Sends a single message per run. +- `LegacySendMessageBatchTest` Sends `num-messages` in a batch per run. +- `LegacyReceiveMessageStreamTest` Receives `num-messages` using an iterator. Receive command options apply. +- `LegacyReceiveMessageBatchTest` Receives `num-messages` using a single fetch call. Receive command options apply. + +## Example command +```cmd +(env) ~/azure-servicebus/tests> perfstress ReceiveMessageBatchTest --parallel=2 --message-size=10240 --num-messages=100 --peeklock +``` \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/__init__.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/_test_base.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/_test_base.py new file mode 100644 index 000000000000..654b7b40b532 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/_test_base.py @@ -0,0 +1,153 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import uuid +from urllib.parse import urlparse + +from azure_devtools.perfstress_tests import PerfStressTest, get_random_bytes + +from azure.servicebus import ServiceBusClient, ReceiveSettleMode, Message +from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient +from azure.servicebus.control_client import ServiceBusService +from azure.servicebus.control_client.models import Queue + +MAX_QUEUE_SIZE = 40960 + + +def parse_connection_string(conn_str): + conn_settings = [s.split("=", 1) for s in conn_str.split(";")] + conn_settings = dict(conn_settings) + shared_access_key = conn_settings.get('SharedAccessKey') + shared_access_key_name = conn_settings.get('SharedAccessKeyName') + endpoint = conn_settings.get('Endpoint') + parsed = urlparse(endpoint.rstrip('/')) + namespace = parsed.netloc.strip().split('.')[0] + return { + 'namespace': namespace, + 'endpoint': endpoint, + 'entity_path': conn_settings.get('EntityPath'), + 'shared_access_key_name': shared_access_key_name, + 'shared_access_key': shared_access_key + } + + +class _ServiceTest(PerfStressTest): + service_client = None + async_service_client = None + + def __init__(self, arguments): + super().__init__(arguments) + + connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") + if self.args.no_client_share: + self.service_client = ServiceBusClient.from_connection_string(connection_string) + self.async_service_client = AsyncServiceBusClient.from_connection_string(connection_string) + else: + if not _ServiceTest.service_client: + _ServiceTest.service_client = ServiceBusClient.from_connection_string(connection_string) + _ServiceTest.async_service_client = AsyncServiceBusClient.from_connection_string(connection_string) + self.service_client = _ServiceTest.service_client + self.async_service_client =_ServiceTest.async_service_client + + @staticmethod + def add_arguments(parser): + super(_ServiceTest, _ServiceTest).add_arguments(parser) + parser.add_argument('--message-size', nargs='?', type=int, help='Size of a single message. Defaults to 100 bytes', default=100) + parser.add_argument('--no-client-share', action='store_true', help='Create one ServiceClient per test instance. Default is to share a single ServiceClient.', default=False) + parser.add_argument('--num-messages', nargs='?', type=int, help='Number of messages to send or receive. Defaults to 100', default=100) + + +class _QueueTest(_ServiceTest): + queue_name = "perfstress-" + str(uuid.uuid4()) + queue_client = None + async_queue_client = None + + def __init__(self, arguments): + super().__init__(arguments) + connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") + connection_props = parse_connection_string(connection_string) + self.mgmt_client = ServiceBusService( + service_namespace=connection_props['namespace'], + shared_access_key_name=connection_props['shared_access_key_name'], + shared_access_key_value=connection_props['shared_access_key']) + + async def global_setup(self): + await super().global_setup() + queue = Queue(max_size_in_megabytes=MAX_QUEUE_SIZE) + self.mgmt_client.create_queue(self.queue_name, queue=queue) + + async def setup(self): + await super().setup() + # In T1, these operations check for the existance of the queue + # so must be created during setup, rather than in the constructor. + self.queue_client = self.service_client.get_queue(self.queue_name) + self.async_queue_client = self.async_service_client.get_queue(self.queue_name) + + async def global_cleanup(self): + self.mgmt_client.delete_queue(self.queue_name) + await super().global_cleanup() + + +class _SendTest(_QueueTest): + sender = None + async_sender = None + + async def setup(self): + await super().setup() + self.sender = self.queue_client.get_sender() + self.async_sender = self.async_queue_client.get_sender() + self.sender.open() + await self.async_sender.open() + + async def close(self): + self.sender.close() + await self.async_sender.close() + await super().close() + + +class _ReceiveTest(_QueueTest): + receiver = None + async_receiver = None + + async def global_setup(self): + await super().global_setup() + await self._preload_queue() + + async def setup(self): + await super().setup() + mode = ReceiveSettleMode.PeekLock if self.args.peeklock else ReceiveSettleMode.ReceiveAndDelete + self.receiver = self.queue_client.get_receiver( + mode=mode, + prefetch=self.args.num_messages, + idle_timeout=self.args.max_wait_time) + self.async_receiver = self.async_queue_client.get_receiver( + mode=mode, + prefetch=self.args.num_messages, + idle_timeout=self.args.max_wait_time) + self.receiver.open() + await self.async_receiver.open() + + async def _preload_queue(self): + data = get_random_bytes(self.args.message_size) + async_queue_client = self.async_service_client.get_queue(self.queue_name) + async with async_queue_client.get_sender() as sender: + for i in range(self.args.preload): + sender.queue_message(Message(data)) + if i % 1000 == 0: + print("Loaded {} messages".format(i)) + await sender.send_pending_messages() + await sender.send_pending_messages() + + async def close(self): + self.receiver.close() + await self.async_receiver.close() + await super().close() + + @staticmethod + def add_arguments(parser): + super(_ReceiveTest, _ReceiveTest).add_arguments(parser) + parser.add_argument('--peeklock', action='store_true', help='Receive using PeekLock mode and message settlement.', default=False) + parser.add_argument('--max-wait-time', nargs='?', type=int, help='Max time to wait for messages before closing. Defaults to 0.', default=0) + parser.add_argument('--preload', nargs='?', type=int, help='Number of messages to preload. Default is 10000.', default=10000) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_batch.py new file mode 100644 index 000000000000..171786f81144 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_batch.py @@ -0,0 +1,27 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio + +from ._test_base import _ReceiveTest + + +class LegacyReceiveMessageBatchTest(_ReceiveTest): + def run_sync(self): + count = 0 + while count < self.args.num_messages: + batch = self.receiver.fetch_next(max_batch_size=self.args.num_messages - count) + if self.args.peeklock: + for msg in batch: + msg.complete() + count += len(batch) + + async def run_async(self): + count = 0 + while count < self.args.num_messages: + batch = await self.async_receiver.fetch_next(max_batch_size=self.args.num_messages - count) + if self.args.peeklock: + await asyncio.gather(*[m.complete() for m in batch]) + count += len(batch) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_stream.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_stream.py new file mode 100644 index 000000000000..f740302e3c91 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_stream.py @@ -0,0 +1,38 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio + +from ._test_base import _ReceiveTest + + +class LegacyReceiveMessageStreamTest(_ReceiveTest): + def run_sync(self): + count = 0 + if self.args.peeklock: + for msg in self.receiver: + if count >= self.args.num_messages: + break + count += 1 + msg.complete() + else: + for msg in self.receiver: + if count >= self.args.num_messages: + break + count += 1 + + async def run_async(self): + count = 0 + if self.args.peeklock: + async for msg in self.async_receiver: + if count >= self.args.num_messages: + break + count += 1 + await msg.complete() + else: + async for msg in self.async_receiver: + if count >= self.args.num_messages: + break + count += 1 diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message.py new file mode 100644 index 000000000000..0e4c9fb79642 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message.py @@ -0,0 +1,25 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from ._test_base import _SendTest + +from azure_devtools.perfstress_tests import get_random_bytes + +from azure.servicebus import Message +from azure.servicebus.aio import Message as AsyncMessage + + +class LegacySendMessageTest(_SendTest): + def __init__(self, arguments): + super().__init__(arguments) + self.data = get_random_bytes(self.args.message_size) + + def run_sync(self): + message = Message(self.data) + self.sender.send(message) + + async def run_async(self): + message = AsyncMessage(self.data) + await self.async_sender.send(message) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message_batch.py new file mode 100644 index 000000000000..fc73cf0bb2d7 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message_batch.py @@ -0,0 +1,26 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from ._test_base import _SendTest + +from azure_devtools.perfstress_tests import get_random_bytes + +from azure.servicebus import BatchMessage + + +class LegacySendMessageBatchTest(_SendTest): + def __init__(self, arguments): + super().__init__(arguments) + self.data = get_random_bytes(self.args.message_size) + + def run_sync(self): + messages = (self.data for _ in range(self.args.num_messages)) + batch = BatchMessage(messages) + self.sender.send(batch) + + async def run_async(self): + messages = (self.data for _ in range(self.args.num_messages)) + batch = BatchMessage(messages) + await self.async_sender.send(batch) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/t1_test_requirements.txt b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/t1_test_requirements.txt new file mode 100644 index 000000000000..e07a582d0f19 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/t1_test_requirements.txt @@ -0,0 +1 @@ +azure-servicebus>=0.5,<1.0.0 diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/__init__.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/_test_base.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/_test_base.py new file mode 100644 index 000000000000..f0ac79bf020e --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/_test_base.py @@ -0,0 +1,128 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import uuid + +from azure_devtools.perfstress_tests import PerfStressTest, get_random_bytes + +from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode, ServiceBusMessage +from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient +from azure.servicebus.aio.management import ServiceBusAdministrationClient + +MAX_QUEUE_SIZE = 40960 + + +class _ServiceTest(PerfStressTest): + service_client = None + async_service_client = None + + def __init__(self, arguments): + super().__init__(arguments) + + connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") + if self.args.no_client_share: + self.service_client = ServiceBusClient.from_connection_string(connection_string) + self.async_service_client = AsyncServiceBusClient.from_connection_string(connection_string) + else: + if not _ServiceTest.service_client: + _ServiceTest.service_client = ServiceBusClient.from_connection_string(connection_string) + _ServiceTest.async_service_client = AsyncServiceBusClient.from_connection_string(connection_string) + self.service_client = _ServiceTest.service_client + self.async_service_client =_ServiceTest.async_service_client + + + async def close(self): + self.service_client.close() + await self.async_service_client.close() + await super().close() + + @staticmethod + def add_arguments(parser): + super(_ServiceTest, _ServiceTest).add_arguments(parser) + parser.add_argument('--message-size', nargs='?', type=int, help='Size of a single message. Defaults to 100 bytes', default=100) + parser.add_argument('--no-client-share', action='store_true', help='Create one ServiceClient per test instance. Default is to share a single ServiceClient.', default=False) + parser.add_argument('--num-messages', nargs='?', type=int, help='Number of messages to send or receive. Defaults to 100', default=100) + + +class _QueueTest(_ServiceTest): + queue_name = "perfstress-" + str(uuid.uuid4()) + + def __init__(self, arguments): + super().__init__(arguments) + connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") + self.async_mgmt_client = ServiceBusAdministrationClient.from_connection_string(connection_string) + + async def global_setup(self): + await super().global_setup() + await self.async_mgmt_client.create_queue(self.queue_name, max_size_in_megabytes=MAX_QUEUE_SIZE) + + async def global_cleanup(self): + await self.async_mgmt_client.delete_queue(self.queue_name) + await super().global_cleanup() + + async def close(self): + await self.async_mgmt_client.close() + await super().close() + + +class _SendTest(_QueueTest): + def __init__(self, arguments): + super().__init__(arguments) + connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") + self.async_mgmt_client = ServiceBusAdministrationClient.from_connection_string(connection_string) + self.sender = self.service_client.get_queue_sender(self.queue_name) + self.async_sender = self.async_service_client.get_queue_sender(self.queue_name) + + async def close(self): + self.sender.close() + await self.async_sender.close() + await super().close() + + +class _ReceiveTest(_QueueTest): + def __init__(self, arguments): + super().__init__(arguments) + mode = ServiceBusReceiveMode.PEEK_LOCK if self.args.peeklock else ServiceBusReceiveMode.RECEIVE_AND_DELETE + self.receiver = self.service_client.get_queue_receiver( + queue_name=self.queue_name, + receive_mode=mode, + prefetch_count=self.args.num_messages, + max_wait_time=self.args.max_wait_time or None) + self.async_receiver = self.async_service_client.get_queue_receiver( + queue_name=self.queue_name, + receive_mode=mode, + prefetch_count=self.args.num_messages, + max_wait_time=self.args.max_wait_time or None) + + async def _preload_queue(self): + data = get_random_bytes(self.args.message_size) + async with self.async_service_client.get_queue_sender(self.queue_name) as sender: + batch = await sender.create_message_batch() + for i in range(self.args.preload): + try: + batch.add_message(ServiceBusMessage(data)) + except ValueError: + # Batch full + await sender.send_messages(batch) + print("Loaded {} messages".format(i)) + batch = await sender.create_message_batch() + batch.add_message(ServiceBusMessage(data)) + await sender.send_messages(batch) + + async def global_setup(self): + await super().global_setup() + await self._preload_queue() + + async def close(self): + self.receiver.close() + await self.async_receiver.close() + await super().close() + + @staticmethod + def add_arguments(parser): + super(_ReceiveTest, _ReceiveTest).add_arguments(parser) + parser.add_argument('--peeklock', action='store_true', help='Receive using PeekLock mode and message settlement.', default=False) + parser.add_argument('--max-wait-time', nargs='?', type=int, help='Max time to wait for messages before closing. Defaults to 0.', default=0) + parser.add_argument('--preload', nargs='?', type=int, help='Number of messages to preload. Default is 10000.', default=10000) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_batch.py new file mode 100644 index 000000000000..e46eb331bb16 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_batch.py @@ -0,0 +1,31 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio + +from ._test_base import _ReceiveTest + + +class ReceiveMessageBatchTest(_ReceiveTest): + def run_sync(self): + count = 0 + while count < self.args.num_messages: + batch = self.receiver.receive_messages( + max_message_count=self.args.num_messages - count, + max_wait_time=self.args.max_wait_time or None) + if self.args.peeklock: + for msg in batch: + self.receiver.complete_message(msg) + count += len(batch) + + async def run_async(self): + count = 0 + while count < self.args.num_messages: + batch = await self.async_receiver.receive_messages( + max_message_count=self.args.num_messages - count, + max_wait_time=self.args.max_wait_time or None) + if self.args.peeklock: + await asyncio.gather(*[self.async_receiver.complete_message(m) for m in batch]) + count += len(batch) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_stream.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_stream.py new file mode 100644 index 000000000000..f9ef6473481b --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_stream.py @@ -0,0 +1,38 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio + +from ._test_base import _ReceiveTest + + +class ReceiveMessageStreamTest(_ReceiveTest): + def run_sync(self): + count = 0 + if self.args.peeklock: + for msg in self.receiver: + if count >= self.args.num_messages: + break + count += 1 + self.receiver.complete_message(msg) + else: + for msg in self.receiver: + if count >= self.args.num_messages: + break + count += 1 + + async def run_async(self): + count = 0 + if self.args.peeklock: + async for msg in self.async_receiver: + if count >= self.args.num_messages: + break + count += 1 + await self.async_receiver.complete_message(msg) + else: + async for msg in self.async_receiver: + if count >= self.args.num_messages: + break + count += 1 \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message.py new file mode 100644 index 000000000000..03887562c5a2 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message.py @@ -0,0 +1,23 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from ._test_base import _SendTest + +from azure_devtools.perfstress_tests import get_random_bytes + +from azure.servicebus import ServiceBusMessage + +class SendMessageTest(_SendTest): + def __init__(self, arguments): + super().__init__(arguments) + self.data = get_random_bytes(self.args.message_size) + + def run_sync(self): + message = ServiceBusMessage(self.data) + self.sender.send_messages(message) + + async def run_async(self): + message = ServiceBusMessage(self.data) + await self.async_sender.send_messages(message) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message_batch.py new file mode 100644 index 000000000000..78bb0bf8f669 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message_batch.py @@ -0,0 +1,40 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from ._test_base import _SendTest + +from azure_devtools.perfstress_tests import get_random_bytes + +from azure.servicebus import ServiceBusMessage + + +class SendMessageBatchTest(_SendTest): + def __init__(self, arguments): + super().__init__(arguments) + self.data = get_random_bytes(self.args.message_size) + + def run_sync(self): + batch = self.sender.create_message_batch() + for i in range(self.args.num_messages): + try: + batch.add_message(ServiceBusMessage(self.data)) + except ValueError: + # Batch full + self.sender.send_messages(batch) + batch = self.sender.create_message_batch() + batch.add_message(ServiceBusMessage(self.data)) + self.sender.send_messages(batch) + + async def run_async(self): + batch = await self.async_sender.create_message_batch() + for i in range(self.args.num_messages): + try: + batch.add_message(ServiceBusMessage(self.data)) + except ValueError: + # Batch full + await self.async_sender.send_messages(batch) + batch = await self.async_sender.create_message_batch() + batch.add_message(ServiceBusMessage(self.data)) + await self.async_sender.send_messages(batch)