Skip to content

[Perfstress][ServiceBus] Added ServiceBus perf tests #16066

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import uuid
from urllib.parse import urlparse

from azure_devtools.perfstress_tests import PerfStressTest, get_random_bytes

from azure.servicebus import ServiceBusClient, ReceiveSettleMode, Message
from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient
from azure.servicebus.control_client import ServiceBusService
from azure.servicebus.control_client.models import Queue


def parse_connection_string(conn_str):
conn_settings = [s.split("=", 1) for s in conn_str.split(";")]
conn_settings = dict(conn_settings)
shared_access_key = conn_settings.get('SharedAccessKey')
shared_access_key_name = conn_settings.get('SharedAccessKeyName')
endpoint = conn_settings.get('Endpoint')
parsed = urlparse(endpoint.rstrip('/'))
namespace = parsed.netloc.strip().split('.')[0]
return {
'namespace': namespace,
'endpoint': endpoint,
'entity_path': conn_settings.get('EntityPath'),
'shared_access_key_name': shared_access_key_name,
'shared_access_key': shared_access_key
}


class _ServiceTest(PerfStressTest):
service_client = None
async_service_client = None

def __init__(self, arguments):
super().__init__(arguments)

connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING")
if not _ServiceTest.service_client or self.args.service_client_per_instance:
_ServiceTest.service_client = ServiceBusClient.from_connection_string(connection_string)
_ServiceTest.async_service_client = AsyncServiceBusClient.from_connection_string(connection_string)

self.service_client = _ServiceTest.service_client
self.async_service_client =_ServiceTest.async_service_client

@staticmethod
def add_arguments(parser):
super(_ServiceTest, _ServiceTest).add_arguments(parser)
parser.add_argument('--message-size', nargs='?', type=int, help='Size of a single message. Defaults to 100 bytes', default=100)
parser.add_argument('--service-client-per-instance', action='store_true', help='Create one ServiceClient per test instance. Default is to share a single ServiceClient.', default=False)
parser.add_argument('--num-messages', nargs='?', type=int, help='Number of messages to send or receive. Defaults to 100', default=100)


class _QueueTest(_ServiceTest):
queue_name = "perfstress-" + str(uuid.uuid4())
queue_client = None
async_queue_client = None

def __init__(self, arguments):
super().__init__(arguments)
connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING")
connection_props = parse_connection_string(connection_string)
self.mgmt_client = ServiceBusService(
service_namespace=connection_props['namespace'],
shared_access_key_name=connection_props['shared_access_key_name'],
shared_access_key_value=connection_props['shared_access_key'])

async def global_setup(self):
await super().global_setup()
queue = Queue(max_size_in_megabytes=40960)
self.mgmt_client.create_queue(self.queue_name, queue=queue)

async def setup(self):
await super().setup()
# In T1, these operations check for the existance of the queue
# so must be created during setup, rather than in the constructor.
self.queue_client = self.service_client.get_queue(self.queue_name)
self.async_queue_client = self.async_service_client.get_queue(self.queue_name)

async def global_cleanup(self):
self.mgmt_client.delete_queue(self.queue_name)
await super().global_cleanup()


class _SendTest(_QueueTest):
sender = None
async_sender = None

async def setup(self):
await super().setup()
self.sender = self.queue_client.get_sender()
self.async_sender = self.async_queue_client.get_sender()
self.sender.open()
await self.async_sender.open()

async def close(self):
self.sender.close()
await self.async_sender.close()
await super().close()


class _ReceiveTest(_QueueTest):
receiver = None
async_receiver = None

async def global_setup(self):
await super().global_setup()
await self._preload_queue()

async def setup(self):
await super().setup()
mode = ReceiveSettleMode.PeekLock if self.args.peeklock else ReceiveSettleMode.ReceiveAndDelete
self.receiver = self.queue_client.get_receiver(
mode=mode,
prefetch=self.args.num_messages,
idle_timeout=self.args.max_wait_time)
self.async_receiver = self.async_queue_client.get_receiver(
mode=mode,
prefetch=self.args.num_messages,
idle_timeout=self.args.max_wait_time)
self.receiver.open()
await self.async_receiver.open()

async def _preload_queue(self):
data = get_random_bytes(self.args.message_size)
async_queue_client = self.async_service_client.get_queue(self.queue_name)
async with async_queue_client.get_sender() as sender:
for i in range(self.args.preload):
sender.queue_message(Message(data))
if i % 1000 == 0:
print("Loaded {} messages".format(i))
await sender.send_pending_messages()
await sender.send_pending_messages()

async def close(self):
self.receiver.close()
await self.async_receiver.close()
await super().close()

@staticmethod
def add_arguments(parser):
super(_ReceiveTest, _ReceiveTest).add_arguments(parser)
parser.add_argument('--peeklock', action='store_true', help='Receive using PeekLock mode and message settlement.', default=False)
parser.add_argument('--max-wait-time', nargs='?', type=int, help='Max time to wait for messages before closing. Defaults to 0.', default=0)
parser.add_argument('--preload', nargs='?', type=int, help='Number of messages to preload. Default is 10000.', default=10000)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import asyncio

from ._test_base import _ReceiveTest


class LegacyReceiveMessageBatchTest(_ReceiveTest):
def run_sync(self):
batch = self.receiver.fetch_next(max_batch_size=self.args.num_messages)
if self.args.peeklock:
for msg in batch:
msg.complete()

async def run_async(self):
batch = await self.async_receiver.fetch_next(max_batch_size=self.args.num_messages)
if self.args.peeklock:
await asyncio.gather(*[m.complete() for m in batch])
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import asyncio

from ._test_base import _ReceiveTest


class LegacyReceiveMessageStreamTest(_ReceiveTest):
def run_sync(self):
count = 0
if self.args.peeklock:
for msg in self.receiver:
if count >= self.args.num_messages:
break
count += 1
msg.complete()
else:
for msg in self.receiver:
if count >= self.args.num_messages:
break
count += 1

async def run_async(self):
count = 0
if self.args.peeklock:
async for msg in self.async_receiver:
if count >= self.args.num_messages:
break
count += 1
await msg.complete()
else:
async for msg in self.async_receiver:
if count >= self.args.num_messages:
break
count += 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from ._test_base import _SendTest

from azure_devtools.perfstress_tests import get_random_bytes

from azure.servicebus import Message
from azure.servicebus.aio import Message as AsyncMessage


class LegacySendMessageTest(_SendTest):
def __init__(self, arguments):
super().__init__(arguments)
self.data = get_random_bytes(self.args.message_size)

def run_sync(self):
message = Message(self.data)
self.sender.send(message)

async def run_async(self):
message = AsyncMessage(self.data)
await self.async_sender.send(message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from ._test_base import _SendTest

from azure_devtools.perfstress_tests import get_random_bytes

from azure.servicebus import BatchMessage


class LegacySendMessageBatchTest(_SendTest):
def __init__(self, arguments):
super().__init__(arguments)
self.data = get_random_bytes(self.args.message_size)

def run_sync(self):
messages = (self.data for _ in range(self.args.num_messages))
batch = BatchMessage(messages)
self.sender.send(batch)

async def run_async(self):
messages = (self.data for _ in range(self.args.num_messages))
batch = BatchMessage(messages)
await self.async_sender.send(batch)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
azure-servicebus>=0.5,<1.0.0
Empty file.
121 changes: 121 additions & 0 deletions sdk/servicebus/azure-servicebus/tests/perf_tests/_test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import uuid

from azure_devtools.perfstress_tests import PerfStressTest, get_random_bytes

from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode, ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient
from azure.servicebus.aio.management import ServiceBusAdministrationClient


class _ServiceTest(PerfStressTest):
service_client = None
async_service_client = None

def __init__(self, arguments):
super().__init__(arguments)

connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING")
if not _ServiceTest.service_client or self.args.service_client_per_instance:
_ServiceTest.service_client = ServiceBusClient.from_connection_string(conn_str=connection_string)
_ServiceTest.async_service_client = AsyncServiceBusClient.from_connection_string(conn_str=connection_string)

self.service_client = _ServiceTest.service_client
self.async_service_client =_ServiceTest.async_service_client

async def close(self):
await self.async_service_client.close()
await super().close()

@staticmethod
def add_arguments(parser):
super(_ServiceTest, _ServiceTest).add_arguments(parser)
parser.add_argument('--message-size', nargs='?', type=int, help='Size of a single message. Defaults to 100 bytes', default=100)
parser.add_argument('--service-client-per-instance', action='store_true', help='Create one ServiceClient per test instance. Default is to share a single ServiceClient.', default=False)
parser.add_argument('--num-messages', nargs='?', type=int, help='Number of messages to send or receive. Defaults to 100', default=100)


class _QueueTest(_ServiceTest):
queue_name = "perfstress-" + str(uuid.uuid4())

def __init__(self, arguments):
super().__init__(arguments)
connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING")
self.async_mgmt_client = ServiceBusAdministrationClient.from_connection_string(connection_string)

async def global_setup(self):
await super().global_setup()
await self.async_mgmt_client.create_queue(self.queue_name, max_size_in_megabytes=40960)

async def global_cleanup(self):
await self.async_mgmt_client.delete_queue(self.queue_name)
await super().global_cleanup()

async def close(self):
await self.async_mgmt_client.close()
await super().close()


class _SendTest(_QueueTest):
def __init__(self, arguments):
super().__init__(arguments)
connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING")
self.async_mgmt_client = ServiceBusAdministrationClient.from_connection_string(connection_string)
self.sender = self.service_client.get_queue_sender(self.queue_name)
self.async_sender = self.async_service_client.get_queue_sender(self.queue_name)

async def close(self):
self.sender.close()
await self.async_sender.close()
await super().close()


class _ReceiveTest(_QueueTest):
def __init__(self, arguments):
super().__init__(arguments)
mode = ServiceBusReceiveMode.PEEK_LOCK if self.args.peeklock else ServiceBusReceiveMode.RECEIVE_AND_DELETE
self.receiver = self.service_client.get_queue_receiver(
queue_name=self.queue_name,
receive_mode=mode,
prefetch_count=self.args.num_messages,
max_wait_time=self.args.max_wait_time or None)
self.async_receiver = self.async_service_client.get_queue_receiver(
queue_name=self.queue_name,
receive_mode=mode,
prefetch_count=self.args.num_messages,
max_wait_time=self.args.max_wait_time or None)

async def _preload_queue(self):
data = get_random_bytes(self.args.message_size)
async with self.async_service_client.get_queue_sender(self.queue_name) as sender:
batch = await sender.create_message_batch()
for i in range(self.args.preload):
try:
batch.add_message(ServiceBusMessage(data))
except ValueError:
# Batch full
await sender.send_messages(batch)
print("Loaded {} messages".format(i))
batch = await sender.create_message_batch()
batch.add_message(ServiceBusMessage(data))
await sender.send_messages(batch)

async def global_setup(self):
await super().global_setup()
await self._preload_queue()

async def close(self):
self.receiver.close()
await self.async_receiver.close()
await super().close()

@staticmethod
def add_arguments(parser):
super(_ReceiveTest, _ReceiveTest).add_arguments(parser)
parser.add_argument('--peeklock', action='store_true', help='Receive using PeekLock mode and message settlement.', default=False)
parser.add_argument('--max-wait-time', nargs='?', type=int, help='Max time to wait for messages before closing. Defaults to 0.', default=0)
parser.add_argument('--preload', nargs='?', type=int, help='Number of messages to preload. Default is 10000.', default=10000)
Loading