Skip to content

Commit 9fb2f34

Browse files
authored
[Perfstress][ServiceBus] Added ServiceBus perf tests (#16066)
* First test pass * Updated legacy tests * Fixed SB tests * Improved cleanup * Fix batch send * Bump queue size * Memory management * Preload more messages * Custom msg preload * Memory management * Added logging * Fix setup * Share data * Fixed setup * Review feedback * Added readme * Ignore readme * Some flag fixes * Fix arg name
1 parent ab7d89e commit 9fb2f34

15 files changed

+601
-0
lines changed

eng/.docsettings.yml

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ omitted_paths:
88
- doc/*
99
- sdk/**/samples/*
1010
- sdk/identity/azure-identity/tests/*
11+
- sdk/**/tests/perfstress_tests/*
1112

1213
language: python
1314
root_check_enabled: True
@@ -90,6 +91,7 @@ known_content_issues:
9091
- ['sdk/schemaregistry/azure-schemaregistry/swagger/README.md', '#4554']
9192
- ['sdk/servicebus/azure-servicebus/README.md', '#4554']
9293
- ['sdk/servicebus/azure-servicebus/swagger/README.md', '#4554']
94+
- ['sdk/servicebus/azure-servicebus/tests/perf_tests/README.md', '#4554']
9395
- ['sdk/servicefabric/azure-servicefabric/README.md', '#4554']
9496
- ['sdk/storage/azure-storage-nspkg/README.md', '#4554']
9597
- ['sdk/storage/azure-storage-blob/swagger/README.md', '#4554']
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# ServiceBus Performance Tests
2+
3+
In order to run the performance tests, the `azure-devtools` package must be installed. This is done as part of the `dev_requirements`.
4+
Start be creating a new virtual environment for your perf tests. This will need to be a Python 3 environment, preferably >=3.7.
5+
Note that tests for T1 and T2 SDKs cannot be run from the same environment, and will need to be setup separately.
6+
7+
### Setup for T2 perf test runs
8+
9+
```cmd
10+
(env) ~/azure-servicebus> pip install -r dev_requirements.txt
11+
(env) ~/azure-servicebus> pip install .
12+
```
13+
14+
### Setup for T1 perf test runs
15+
16+
```cmd
17+
(env) ~/azure-servicebus> pip install -r dev_requirements.txt
18+
(env) ~/azure-servicebus> pip install tests/perf_tests/T1_legacy_tests/t1_test_requirements.txt
19+
```
20+
21+
## Test commands
22+
23+
When `azure-devtools` is installed, you will have access to the `perfstress` command line tool, which will scan the current module for runable perf tests. Only a specific test can be run at a time (i.e. there is no "run all" feature).
24+
25+
```cmd
26+
(env) ~/azure-servicebus> cd tests
27+
(env) ~/azure-servicebus/tests> perfstress
28+
```
29+
Using the `perfstress` command alone will list the available perf tests found. Note that the available tests discovered will vary depending on whether your environment is configured for the T1 or T2 SDK.
30+
31+
### Common perf command line options
32+
These options are available for all perf tests:
33+
- `--duration=10` Number of seconds to run as many operations (the "run" function) as possible. Default is 10.
34+
- `--iterations=1` Number of test iterations to run. Default is 1.
35+
- `--parallel=1` Number of tests to run in parallel. Default is 1.
36+
- `--no-client-share` Whether each parallel test instance should share a single client, or use their own. Default is False (sharing).
37+
- `--warm-up=5` Number of seconds to spend warming up the connection before measuing begins. Default is 5.
38+
- `--sync` Whether to run the tests in sync or async. Default is False (async).
39+
- `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted).
40+
41+
### Common Service Bus command line options
42+
The options are available for all SB perf tests:
43+
- `--message-size=100` Number of bytes each message contains. Default is 100.
44+
- `--num-messages` Number of messages to send/receive as part of a single run.
45+
46+
#### Receive command line options
47+
The receiving tests have these additional command line options:
48+
- `--peeklock` Whether to run the test using peeklock or receive and delete. If peeklock is used, messages will be completed. Default is False (receive and delete).
49+
- `--max-wait-time=0` The max time to wait for the specified number of messages to be received. Default is 0 (indefinitely).
50+
- `--preload=10000` The number of messages to preload into the queue before the receiving tests start. Default is 10000 messages.
51+
52+
### T2 Tests
53+
The tests currently written for the T2 SDK:
54+
- `SendMessageTest` Sends a single message per run.
55+
- `SendMessageBatchTest` Sends `num-messages` in a batch per run.
56+
- `ReceiveMessageStreamTest` Receives `num-messages` using an iterator. Receive command options apply.
57+
- `ReceiveMessageBatchTest` Receives `num-messages` using a single fetch call. Receive command options apply.
58+
59+
### T1 Tests
60+
The tests currently written for the T2 SDK:
61+
- `LegacySendMessageTest` Sends a single message per run.
62+
- `LegacySendMessageBatchTest` Sends `num-messages` in a batch per run.
63+
- `LegacyReceiveMessageStreamTest` Receives `num-messages` using an iterator. Receive command options apply.
64+
- `LegacyReceiveMessageBatchTest` Receives `num-messages` using a single fetch call. Receive command options apply.
65+
66+
## Example command
67+
```cmd
68+
(env) ~/azure-servicebus/tests> perfstress ReceiveMessageBatchTest --parallel=2 --message-size=10240 --num-messages=100 --peeklock
69+
```

sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
# --------------------------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for license information.
4+
# --------------------------------------------------------------------------------------------
5+
6+
import uuid
7+
from urllib.parse import urlparse
8+
9+
from azure_devtools.perfstress_tests import PerfStressTest, get_random_bytes
10+
11+
from azure.servicebus import ServiceBusClient, ReceiveSettleMode, Message
12+
from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient
13+
from azure.servicebus.control_client import ServiceBusService
14+
from azure.servicebus.control_client.models import Queue
15+
16+
MAX_QUEUE_SIZE = 40960
17+
18+
19+
def parse_connection_string(conn_str):
20+
conn_settings = [s.split("=", 1) for s in conn_str.split(";")]
21+
conn_settings = dict(conn_settings)
22+
shared_access_key = conn_settings.get('SharedAccessKey')
23+
shared_access_key_name = conn_settings.get('SharedAccessKeyName')
24+
endpoint = conn_settings.get('Endpoint')
25+
parsed = urlparse(endpoint.rstrip('/'))
26+
namespace = parsed.netloc.strip().split('.')[0]
27+
return {
28+
'namespace': namespace,
29+
'endpoint': endpoint,
30+
'entity_path': conn_settings.get('EntityPath'),
31+
'shared_access_key_name': shared_access_key_name,
32+
'shared_access_key': shared_access_key
33+
}
34+
35+
36+
class _ServiceTest(PerfStressTest):
37+
service_client = None
38+
async_service_client = None
39+
40+
def __init__(self, arguments):
41+
super().__init__(arguments)
42+
43+
connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING")
44+
if self.args.no_client_share:
45+
self.service_client = ServiceBusClient.from_connection_string(connection_string)
46+
self.async_service_client = AsyncServiceBusClient.from_connection_string(connection_string)
47+
else:
48+
if not _ServiceTest.service_client:
49+
_ServiceTest.service_client = ServiceBusClient.from_connection_string(connection_string)
50+
_ServiceTest.async_service_client = AsyncServiceBusClient.from_connection_string(connection_string)
51+
self.service_client = _ServiceTest.service_client
52+
self.async_service_client =_ServiceTest.async_service_client
53+
54+
@staticmethod
55+
def add_arguments(parser):
56+
super(_ServiceTest, _ServiceTest).add_arguments(parser)
57+
parser.add_argument('--message-size', nargs='?', type=int, help='Size of a single message. Defaults to 100 bytes', default=100)
58+
parser.add_argument('--no-client-share', action='store_true', help='Create one ServiceClient per test instance. Default is to share a single ServiceClient.', default=False)
59+
parser.add_argument('--num-messages', nargs='?', type=int, help='Number of messages to send or receive. Defaults to 100', default=100)
60+
61+
62+
class _QueueTest(_ServiceTest):
63+
queue_name = "perfstress-" + str(uuid.uuid4())
64+
queue_client = None
65+
async_queue_client = None
66+
67+
def __init__(self, arguments):
68+
super().__init__(arguments)
69+
connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING")
70+
connection_props = parse_connection_string(connection_string)
71+
self.mgmt_client = ServiceBusService(
72+
service_namespace=connection_props['namespace'],
73+
shared_access_key_name=connection_props['shared_access_key_name'],
74+
shared_access_key_value=connection_props['shared_access_key'])
75+
76+
async def global_setup(self):
77+
await super().global_setup()
78+
queue = Queue(max_size_in_megabytes=MAX_QUEUE_SIZE)
79+
self.mgmt_client.create_queue(self.queue_name, queue=queue)
80+
81+
async def setup(self):
82+
await super().setup()
83+
# In T1, these operations check for the existance of the queue
84+
# so must be created during setup, rather than in the constructor.
85+
self.queue_client = self.service_client.get_queue(self.queue_name)
86+
self.async_queue_client = self.async_service_client.get_queue(self.queue_name)
87+
88+
async def global_cleanup(self):
89+
self.mgmt_client.delete_queue(self.queue_name)
90+
await super().global_cleanup()
91+
92+
93+
class _SendTest(_QueueTest):
94+
sender = None
95+
async_sender = None
96+
97+
async def setup(self):
98+
await super().setup()
99+
self.sender = self.queue_client.get_sender()
100+
self.async_sender = self.async_queue_client.get_sender()
101+
self.sender.open()
102+
await self.async_sender.open()
103+
104+
async def close(self):
105+
self.sender.close()
106+
await self.async_sender.close()
107+
await super().close()
108+
109+
110+
class _ReceiveTest(_QueueTest):
111+
receiver = None
112+
async_receiver = None
113+
114+
async def global_setup(self):
115+
await super().global_setup()
116+
await self._preload_queue()
117+
118+
async def setup(self):
119+
await super().setup()
120+
mode = ReceiveSettleMode.PeekLock if self.args.peeklock else ReceiveSettleMode.ReceiveAndDelete
121+
self.receiver = self.queue_client.get_receiver(
122+
mode=mode,
123+
prefetch=self.args.num_messages,
124+
idle_timeout=self.args.max_wait_time)
125+
self.async_receiver = self.async_queue_client.get_receiver(
126+
mode=mode,
127+
prefetch=self.args.num_messages,
128+
idle_timeout=self.args.max_wait_time)
129+
self.receiver.open()
130+
await self.async_receiver.open()
131+
132+
async def _preload_queue(self):
133+
data = get_random_bytes(self.args.message_size)
134+
async_queue_client = self.async_service_client.get_queue(self.queue_name)
135+
async with async_queue_client.get_sender() as sender:
136+
for i in range(self.args.preload):
137+
sender.queue_message(Message(data))
138+
if i % 1000 == 0:
139+
print("Loaded {} messages".format(i))
140+
await sender.send_pending_messages()
141+
await sender.send_pending_messages()
142+
143+
async def close(self):
144+
self.receiver.close()
145+
await self.async_receiver.close()
146+
await super().close()
147+
148+
@staticmethod
149+
def add_arguments(parser):
150+
super(_ReceiveTest, _ReceiveTest).add_arguments(parser)
151+
parser.add_argument('--peeklock', action='store_true', help='Receive using PeekLock mode and message settlement.', default=False)
152+
parser.add_argument('--max-wait-time', nargs='?', type=int, help='Max time to wait for messages before closing. Defaults to 0.', default=0)
153+
parser.add_argument('--preload', nargs='?', type=int, help='Number of messages to preload. Default is 10000.', default=10000)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# --------------------------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for license information.
4+
# --------------------------------------------------------------------------------------------
5+
6+
import asyncio
7+
8+
from ._test_base import _ReceiveTest
9+
10+
11+
class LegacyReceiveMessageBatchTest(_ReceiveTest):
12+
def run_sync(self):
13+
count = 0
14+
while count < self.args.num_messages:
15+
batch = self.receiver.fetch_next(max_batch_size=self.args.num_messages - count)
16+
if self.args.peeklock:
17+
for msg in batch:
18+
msg.complete()
19+
count += len(batch)
20+
21+
async def run_async(self):
22+
count = 0
23+
while count < self.args.num_messages:
24+
batch = await self.async_receiver.fetch_next(max_batch_size=self.args.num_messages - count)
25+
if self.args.peeklock:
26+
await asyncio.gather(*[m.complete() for m in batch])
27+
count += len(batch)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# --------------------------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for license information.
4+
# --------------------------------------------------------------------------------------------
5+
6+
import asyncio
7+
8+
from ._test_base import _ReceiveTest
9+
10+
11+
class LegacyReceiveMessageStreamTest(_ReceiveTest):
12+
def run_sync(self):
13+
count = 0
14+
if self.args.peeklock:
15+
for msg in self.receiver:
16+
if count >= self.args.num_messages:
17+
break
18+
count += 1
19+
msg.complete()
20+
else:
21+
for msg in self.receiver:
22+
if count >= self.args.num_messages:
23+
break
24+
count += 1
25+
26+
async def run_async(self):
27+
count = 0
28+
if self.args.peeklock:
29+
async for msg in self.async_receiver:
30+
if count >= self.args.num_messages:
31+
break
32+
count += 1
33+
await msg.complete()
34+
else:
35+
async for msg in self.async_receiver:
36+
if count >= self.args.num_messages:
37+
break
38+
count += 1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# --------------------------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for license information.
4+
# --------------------------------------------------------------------------------------------
5+
6+
from ._test_base import _SendTest
7+
8+
from azure_devtools.perfstress_tests import get_random_bytes
9+
10+
from azure.servicebus import Message
11+
from azure.servicebus.aio import Message as AsyncMessage
12+
13+
14+
class LegacySendMessageTest(_SendTest):
15+
def __init__(self, arguments):
16+
super().__init__(arguments)
17+
self.data = get_random_bytes(self.args.message_size)
18+
19+
def run_sync(self):
20+
message = Message(self.data)
21+
self.sender.send(message)
22+
23+
async def run_async(self):
24+
message = AsyncMessage(self.data)
25+
await self.async_sender.send(message)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# --------------------------------------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation. All rights reserved.
3+
# Licensed under the MIT License. See License.txt in the project root for license information.
4+
# --------------------------------------------------------------------------------------------
5+
6+
from ._test_base import _SendTest
7+
8+
from azure_devtools.perfstress_tests import get_random_bytes
9+
10+
from azure.servicebus import BatchMessage
11+
12+
13+
class LegacySendMessageBatchTest(_SendTest):
14+
def __init__(self, arguments):
15+
super().__init__(arguments)
16+
self.data = get_random_bytes(self.args.message_size)
17+
18+
def run_sync(self):
19+
messages = (self.data for _ in range(self.args.num_messages))
20+
batch = BatchMessage(messages)
21+
self.sender.send(batch)
22+
23+
async def run_async(self):
24+
messages = (self.data for _ in range(self.args.num_messages))
25+
batch = BatchMessage(messages)
26+
await self.async_sender.send(batch)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
azure-servicebus>=0.5,<1.0.0

sdk/servicebus/azure-servicebus/tests/perf_tests/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)