From 26758677a9a2328c8007802eb90812d41d4b4478 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Wed, 8 Jul 2020 17:47:04 -0700 Subject: [PATCH 1/8] resend received message --- .../azure/servicebus/_common/message.py | 18 ++++++++++++++++++ .../azure/servicebus/_servicebus_sender.py | 7 ++++++- .../servicebus/aio/_servicebus_sender_async.py | 5 ++++- 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 69bd3bb7b5e2..36b80b92e40c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -114,6 +114,19 @@ def _build_message(self, body): else: self.message = uamqp.Message(body, properties=self.properties, header=self.header) + @classmethod + def _from_received_message(cls, received_message): + amqp_message = received_message.message + amqp_body = amqp_message._body + + if isinstance(amqp_body, uamqp.message.DataBody): + body = b''.join(amqp_body.data) + else: + # amqp_body is type of uamqp.message.ValueBody + body = amqp_body.data + # TODO: other properties to be copied from the received message + return cls(body=body) + @property def session_id(self): # type: () -> str @@ -322,6 +335,8 @@ def _from_list(self, messages): if not isinstance(each, Message): raise ValueError("Only Message or an iterable object containing Message objects are accepted." "Received instead: {}".format(each.__class__.__name__)) + if isinstance(each, (PeekMessage, ReceivedMessage)): + each = Message._from_received_message(each) # pylint: disable=protected-access self.add(each) @property @@ -346,6 +361,9 @@ def add(self, message): :rtype: None :raises: :class: ~azure.servicebus.exceptions.MessageContentTooLarge, when exceeding the size limit. """ + if isinstance(message, (PeekMessage, ReceivedMessage)): + message = Message._from_received_message(message) # pylint: disable=protected-access + message_size = message.message.get_message_encoded_size() # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 96d5442d741b..9e4bffafeffb 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -13,7 +13,7 @@ from ._base_handler import BaseHandler, ServiceBusSharedKeyCredential, _convert_connection_string_to_kwargs from ._common import mgmt_handlers -from ._common.message import Message, BatchMessage +from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage from .exceptions import ( OperationTimeoutError, _ServiceBusErrorPolicy, @@ -68,6 +68,8 @@ def _build_schedule_request(cls, schedule_time_utc, *messages): if not isinstance(message, Message): raise ValueError("Scheduling batch messages only supports iterables containing Message Objects." " Received instead: {}".format(message.__class__.__name__)) + if isinstance(message, (PeekMessage,ReceivedMessage)): + message = Message._from_received_message(message) message.scheduled_enqueue_time_utc = schedule_time_utc message_data = {} message_data[MGMT_REQUEST_MESSAGE_ID] = message.properties.message_id @@ -333,6 +335,9 @@ def send_messages(self, message): if isinstance(message, BatchMessage) and len(message) == 0: # pylint: disable=len-as-condition raise ValueError("A BatchMessage or list of Message must have at least one Message") + if isinstance(message, (PeekMessage, ReceivedMessage)): + message = Message._from_received_message(message) # pylint: disable=protected-access + self._do_retryable_operation( self._send, message=message, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index c59bc36a5037..8ad021ec5cec 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -9,7 +9,7 @@ import uamqp from uamqp import SendClientAsync, types -from .._common.message import Message, BatchMessage +from .._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage from .._base_handler import _convert_connection_string_to_kwargs from .._servicebus_sender import SenderMixin from ._base_handler_async import BaseHandler, ServiceBusSharedKeyCredential @@ -274,6 +274,9 @@ async def send_messages(self, message): if isinstance(message, BatchMessage) and len(message) == 0: # pylint: disable=len-as-condition raise ValueError("A BatchMessage or list of Message must have at least one Message") + if isinstance(message, (PeekMessage, ReceivedMessage)): + message = Message._from_received_message(message) # pylint: disable=protected-access + await self._do_retryable_operation( self._send, message=message, From c1b39bc71f4a71b5614957364f9e2e3af31f8ee7 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Sat, 18 Jul 2020 19:37:27 -0700 Subject: [PATCH 2/8] update implementation and add test --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 4 +- .../azure/servicebus/_common/message.py | 24 +++- .../azure/servicebus/_servicebus_sender.py | 6 +- .../aio/_servicebus_sender_async.py | 4 +- .../tests/async_tests/test_queues_async.py | 26 +++- .../azure-servicebus/tests/test_queues.py | 127 +++++++++++++++--- .../azure-servicebus/tests/test_sessions.py | 20 ++- 7 files changed, 176 insertions(+), 35 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index f424295345a0..70edd1e73362 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -6,9 +6,9 @@ * Added new properties to Message, PeekMessage and ReceivedMessage: `content_type`, `correlation_id`, `label`, `message_id`, `reply_to`, `reply_to_session_id` and `to`. Please refer to the docstring for further information. - -* Add new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`, +* Added new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`, `dead_letter_reason`, `dead_letter_source`, `delivery_count` and `expires_at_utc`. Please refer to the docstring for further information. +* Added support for sending received messages via `ServiceBusSender.send_messages`. **Breaking Changes** diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index af5e2cc0144b..0842a0cc2918 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -154,6 +154,7 @@ def _set_message_annotations(self, key, value): @classmethod def _from_received_message(cls, received_message): + # type: (Message) -> Message amqp_message = received_message.message amqp_body = amqp_message._body @@ -162,8 +163,23 @@ def _from_received_message(cls, received_message): else: # amqp_body is type of uamqp.message.ValueBody body = amqp_body.data - # TODO: other properties to be copied from the received message - return cls(body=body) + + return cls( + body=body, + content_type=received_message.content_type, + correlation_id=received_message.correlation_id, + label=received_message.label, + message_id=received_message.message_id, + partition_key=received_message.partition_key, + properties=received_message.properties, + reply_to=received_message.reply_to, + reply_to_session_id=received_message.reply_to_session_id, + session_id=received_message.session_id, + scheduled_enqueue_time_utc=received_message.scheduled_enqueue_time_utc, + time_to_live=received_message.time_to_live, + to=received_message.to, + via_partition_key=received_message.via_partition_key + ) @property def session_id(self): @@ -529,7 +545,7 @@ def _from_list(self, messages): if not isinstance(each, Message): raise ValueError("Only Message or an iterable object containing Message objects are accepted." "Received instead: {}".format(each.__class__.__name__)) - if isinstance(each, (PeekMessage, ReceivedMessage)): + if isinstance(each, (PeekMessage, ReceivedMessageBase)): each = Message._from_received_message(each) # pylint: disable=protected-access self.add(each) @@ -555,7 +571,7 @@ def add(self, message): :rtype: None :raises: :class: ~azure.servicebus.exceptions.MessageContentTooLarge, when exceeding the size limit. """ - if isinstance(message, (PeekMessage, ReceivedMessage)): + if isinstance(message, (PeekMessage, ReceivedMessageBase)): message = Message._from_received_message(message) # pylint: disable=protected-access message_size = message.message.get_message_encoded_size() diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 67d2f0d1a9bc..7b86eccfd73b 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -13,7 +13,7 @@ from ._base_handler import BaseHandler, ServiceBusSharedKeyCredential, _convert_connection_string_to_kwargs from ._common import mgmt_handlers -from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage +from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage, ReceivedMessageBase from .exceptions import ( OperationTimeoutError, _ServiceBusErrorPolicy, @@ -68,7 +68,7 @@ def _build_schedule_request(cls, schedule_time_utc, *messages): if not isinstance(message, Message): raise ValueError("Scheduling batch messages only supports iterables containing Message Objects." " Received instead: {}".format(message.__class__.__name__)) - if isinstance(message, (PeekMessage,ReceivedMessage)): + if isinstance(message, (PeekMessage, ReceivedMessageBase)): message = Message._from_received_message(message) message.scheduled_enqueue_time_utc = schedule_time_utc message_data = {} @@ -335,7 +335,7 @@ def send_messages(self, message): if isinstance(message, BatchMessage) and len(message) == 0: # pylint: disable=len-as-condition raise ValueError("A BatchMessage or list of Message must have at least one Message") - if isinstance(message, (PeekMessage, ReceivedMessage)): + if isinstance(message, (PeekMessage, ReceivedMessageBase)): message = Message._from_received_message(message) # pylint: disable=protected-access self._do_retryable_operation( diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 8ad021ec5cec..3c93c7ac59bf 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -9,7 +9,7 @@ import uamqp from uamqp import SendClientAsync, types -from .._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage +from .._common.message import Message, BatchMessage, PeekMessage, ReceivedMessageBase from .._base_handler import _convert_connection_string_to_kwargs from .._servicebus_sender import SenderMixin from ._base_handler_async import BaseHandler, ServiceBusSharedKeyCredential @@ -274,7 +274,7 @@ async def send_messages(self, message): if isinstance(message, BatchMessage) and len(message) == 0: # pylint: disable=len-as-condition raise ValueError("A BatchMessage or list of Message must have at least one Message") - if isinstance(message, (PeekMessage, ReceivedMessage)): + if isinstance(message, (PeekMessage, ReceivedMessageBase)): message = Message._from_received_message(message) # pylint: disable=protected-access await self._do_retryable_operation( diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 3a0968ab5368..f4b841af14ee 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -1156,13 +1156,15 @@ def message_content(): for i in range(20): yield Message("Message no. {}".format(i)) - async with sb_client.get_queue_sender(servicebus_queue.name) as sender: + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name) + + async with sender, receiver: message = BatchMessage() for each in message_content(): message.add(each) await sender.send_messages(message) - async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: receive_counter = 0 message_received_cnt = 0 while message_received_cnt < 20: @@ -1173,6 +1175,26 @@ def message_content(): message_received_cnt += len(messages) for m in messages: print_message(_logger, m) + await sender.send_messages(message) + await m.complete() + + assert message_received_cnt == 20 + # Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration + assert receive_counter < 10 # Dynamic link credit issuing come info effect + + # received resent messages + + receive_counter = 0 + message_received_cnt = 0 + while message_received_cnt < 20: + messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5) + if not messages: + break + receive_counter += 1 + message_received_cnt += len(messages) + for m in messages: + print_message(_logger, m) + await sender.send_messages(message) await m.complete() assert message_received_cnt == 20 diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 1185bfb5136c..a3c42ad27acf 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -744,23 +744,65 @@ def test_queue_by_servicebus_client_browse_messages_with_receiver(self, serviceb with ServiceBusClient.from_connection_string( servicebus_namespace_connection_string, logging_enable=False) as sb_client: - - with sb_client.get_queue_receiver(servicebus_queue.name, - idle_timeout=5, - mode=ReceiveSettleMode.PeekLock) as receiver: - with sb_client.get_queue_sender(servicebus_queue.name) as sender: - for i in range(5): - message = Message("Test message no. {}".format(i)) - sender.send_messages(message) + + receiver = sb_client.get_queue_receiver(servicebus_queue.name, + idle_timeout=5, + mode=ReceiveSettleMode.PeekLock) + sender = sb_client.get_queue_sender(servicebus_queue.name) + with receiver, sender: + for i in range(5): + message = Message( + body="Test message", + properties={'key': 'value'}, + label='label', + content_type='application/text', + correlation_id='cid', + message_id='mid', + partition_key='pk', + via_partition_key='via_pk', + to='to', + reply_to='reply_to', + time_to_live=timedelta(seconds=60) + ) + sender.send_messages(message) messages = receiver.peek_messages(5) assert len(messages) > 0 assert all(isinstance(m, PeekMessage) for m in messages) for message in messages: print_message(_logger, message) + assert b''.join(message.body) == b'Test message' + assert message.properties[b'key'] == b'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == 'mid' + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.time_to_live == timedelta(seconds=60) with pytest.raises(AttributeError): message.complete() - + + sender.send_messages(message) + + cnt = 0 + for message in receiver: + assert b''.join(message.body) == b'Test message' + assert message.properties[b'key'] == b'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == 'mid' + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.time_to_live == timedelta(seconds=60) + message.complete() + cnt += 1 + assert cnt == 10 @pytest.mark.liveTest @pytest.mark.live_test_only @@ -1469,15 +1511,29 @@ def test_queue_receive_batch_without_setting_prefetch(self, servicebus_namespace def message_content(): for i in range(20): - yield Message("Message no. {}".format(i)) - - with sb_client.get_queue_sender(servicebus_queue.name) as sender: + yield Message( + body="Test message", + properties={'key': 'value'}, + label='label', + content_type='application/text', + correlation_id='cid', + message_id='mid', + partition_key='pk', + via_partition_key='via_pk', + to='to', + reply_to='reply_to', + time_to_live=timedelta(seconds=60) + ) + + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name) + + with sender, receiver: message = BatchMessage() for each in message_content(): message.add(each) sender.send_messages(message) - with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: receive_counter = 0 message_received_cnt = 0 while message_received_cnt < 20: @@ -1486,9 +1542,48 @@ def message_content(): break receive_counter += 1 message_received_cnt += len(messages) - for m in messages: - print_message(_logger, m) - m.complete() + for message in messages: + print_message(_logger, message) + assert b''.join(message.body) == b'Test message' + assert message.properties[b'key'] == b'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == 'mid' + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.time_to_live == timedelta(seconds=60) + message.complete() + sender.send_messages(message) # resending received message + + assert message_received_cnt == 20 + # Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration + assert receive_counter < 10 # Dynamic link credit issuing come info effect + + receive_counter = 0 + message_received_cnt = 0 + while message_received_cnt < 20: + messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5) + if not messages: + break + receive_counter += 1 + message_received_cnt += len(messages) + for message in messages: + print_message(_logger, message) + assert b''.join(message.body) == b'Test message' + assert message.properties[b'key'] == b'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == 'mid' + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.time_to_live == timedelta(seconds=60) + message.complete() assert message_received_cnt == 20 # Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index d72721d8c182..01227c28746d 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -51,7 +51,10 @@ def test_session_by_session_client_conn_str_receive_handler_peeklock(self, servi servicebus_namespace_connection_string, logging_enable=False) as sb_client: session_id = str(uuid.uuid4()) - with sb_client.get_queue_sender(servicebus_queue.name) as sender: + sender = sb_client.get_queue_sender(servicebus_queue.name) + session = sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5) + + with sender, session: for i in range(3): message = Message("Handler message no. {}".format(i)) message.session_id = session_id @@ -67,11 +70,11 @@ def test_session_by_session_client_conn_str_receive_handler_peeklock(self, servi message.reply_to_session_id = 'reply_to_session_id' sender.send_messages(message) - with pytest.raises(ServiceBusConnectionError): - session = sb_client.get_queue_receiver(servicebus_queue.name, idle_timeout=5)._open_with_retry() + with pytest.raises(ServiceBusConnectionError): + session = sb_client.get_queue_receiver(servicebus_queue.name, idle_timeout=5)._open_with_retry() - with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5) as session: count = 0 + received_cnt_dic = {} for message in session: print_message(_logger, message) assert message.delivery_count == 0 @@ -80,7 +83,6 @@ def test_session_by_session_client_conn_str_receive_handler_peeklock(self, servi assert message.label == 'label' assert message.content_type == 'application/text' assert message.correlation_id == 'cid' - assert message.message_id == str(count) assert message.partition_key == 'pk' assert message.via_partition_key == 'via_pk' assert message.to == 'to' @@ -91,8 +93,14 @@ def test_session_by_session_client_conn_str_receive_handler_peeklock(self, servi assert message.reply_to_session_id == 'reply_to_session_id' count += 1 message.complete() + if message.message_id not in received_cnt_dic: + received_cnt_dic[message.message_id] = 1 + sender.send_messages(message) + else: + received_cnt_dic[message.message_id] += 1 - assert count == 3 + assert received_cnt_dic['0'] == 2 and received_cnt_dic['1'] == 2 and received_cnt_dic['2'] == 2 + assert count == 6 @pytest.mark.liveTest @pytest.mark.live_test_only From 2f18e7d564566adafedd4b68307620e6d25de4fd Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Mon, 20 Jul 2020 10:32:33 -0700 Subject: [PATCH 3/8] add schedule resend test and fix pylint --- .../azure/servicebus/_common/message.py | 2 +- .../azure/servicebus/_servicebus_sender.py | 2 +- .../azure-servicebus/tests/test_queues.py | 52 +++++++++++-------- 3 files changed, 32 insertions(+), 24 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 0842a0cc2918..92ce1ffa6a1f 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -156,7 +156,7 @@ def _set_message_annotations(self, key, value): def _from_received_message(cls, received_message): # type: (Message) -> Message amqp_message = received_message.message - amqp_body = amqp_message._body + amqp_body = amqp_message._body # pylint: disable=protected-access if isinstance(amqp_body, uamqp.message.DataBody): body = b''.join(amqp_body.data) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 7b86eccfd73b..ac77a4b6ec92 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -69,7 +69,7 @@ def _build_schedule_request(cls, schedule_time_utc, *messages): raise ValueError("Scheduling batch messages only supports iterables containing Message Objects." " Received instead: {}".format(message.__class__.__name__)) if isinstance(message, (PeekMessage, ReceivedMessageBase)): - message = Message._from_received_message(message) + message = Message._from_received_message(message) # pylint: disable=protected-access message.scheduled_enqueue_time_utc = schedule_time_utc message_data = {} message_data[MGMT_REQUEST_MESSAGE_ID] = message.message_id diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index a3c42ad27acf..4c37027b20f6 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1234,28 +1234,36 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ servicebus_namespace_connection_string, logging_enable=False) as sb_client: enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) - with sb_client.get_queue_receiver(servicebus_queue.name, - prefetch=20) as receiver: - with sb_client.get_queue_sender(servicebus_queue.name) as sender: - content = str(uuid.uuid4()) - message_id_a = uuid.uuid4() - message_a = Message(content) - message_a.message_id = message_id_a - message_id_b = uuid.uuid4() - message_b = Message(content) - message_b.message_id = message_id_b - message_arry = [message_a, message_b] - for message in message_arry: - message.properties = {'key': 'value'} - message.label = 'label' - message.content_type = 'application/text' - message.correlation_id = 'cid' - message.partition_key = 'pk' - message.via_partition_key = 'via_pk' - message.to = 'to' - message.reply_to = 'reply_to' - tokens = sender.schedule_messages(message_arry, enqueue_time) - assert len(tokens) == 2 + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20) + + with sender, receiver: + content = str(uuid.uuid4()) + message_id_a = uuid.uuid4() + message_a = Message(content) + message_a.message_id = message_id_a + message_id_b = uuid.uuid4() + message_b = Message(content) + message_b.message_id = message_id_b + message_arry = [message_a, message_b] + for message in message_arry: + message.properties = {'key': 'value'} + message.label = 'label' + message.content_type = 'application/text' + message.correlation_id = 'cid' + message.partition_key = 'pk' + message.via_partition_key = 'via_pk' + message.to = 'to' + message.reply_to = 'reply_to' + + sender.send_messages(message_arry) + + received_messages = receiver.receive_messages(max_batch_size=2, max_wait_time=5) + for message in received_messages: + message.complete() + + tokens = sender.schedule_messages(received_messages, enqueue_time) + assert len(tokens) == 2 messages = receiver.receive_messages(max_wait_time=120) messages.extend(receiver.receive_messages(max_wait_time=5)) From 8b15df1247351895669175aae4f9ca01a7243eb6 Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Mon, 20 Jul 2020 12:22:20 -0700 Subject: [PATCH 4/8] async test --- .../tests/async_tests/test_queues_async.py | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index f4b841af14ee..924cd8ada395 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -993,20 +993,27 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace servicebus_namespace_connection_string, logging_enable=False) as sb_client: enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) messages = [] - async with sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20) as receiver: - async with sb_client.get_queue_sender(servicebus_queue.name) as sender: - content = str(uuid.uuid4()) - message_id_a = uuid.uuid4() - message_a = Message(content) - message_a.message_id = message_id_a - message_id_b = uuid.uuid4() - message_b = Message(content) - message_b.message_id = message_id_b - tokens = await sender.schedule_messages([message_a, message_b], enqueue_time) - assert len(tokens) == 2 + receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20) + sender = sb_client.get_queue_sender(servicebus_queue.name) + async with sender, receiver: + content = str(uuid.uuid4()) + message_id_a = uuid.uuid4() + message_a = Message(content) + message_a.message_id = message_id_a + message_id_b = uuid.uuid4() + message_b = Message(content) + message_b.message_id = message_id_b - recv = await receiver.receive_messages(max_wait_time=120) - messages.extend(recv) + await sender.send_messages([message_a, message_b]) + + received_messages = await receiver.receive_messages(max_batch_size=2, max_wait_time=5) + for message in received_messages: + await message.complete() + + tokens = await sender.schedule_messages(received_messages, enqueue_time) + assert len(tokens) == 2 + + messages = await receiver.receive_messages(max_wait_time=120) recv = await receiver.receive_messages(max_wait_time=5) messages.extend(recv) if messages: @@ -1194,7 +1201,6 @@ def message_content(): message_received_cnt += len(messages) for m in messages: print_message(_logger, m) - await sender.send_messages(message) await m.complete() assert message_received_cnt == 20 From 183ebac7a74a8c4df98dba9ec78d2a3fbeff8a54 Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Mon, 20 Jul 2020 16:26:29 -0700 Subject: [PATCH 5/8] update implementation to be more pythonic --- .../azure/servicebus/_common/message.py | 66 +++++++++---------- .../azure/servicebus/_common/utils.py | 21 ++++++ .../azure/servicebus/_servicebus_sender.py | 11 ++-- .../aio/_servicebus_sender_async.py | 7 +- 4 files changed, 59 insertions(+), 46 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 92ce1ffa6a1f..0e84c471d211 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -53,7 +53,7 @@ SessionLockExpired, MessageSettleFailed, MessageContentTooLarge) -from .utils import utc_from_timestamp, utc_now +from .utils import utc_from_timestamp, utc_now, copy_messages_if_needed if TYPE_CHECKING: from .._servicebus_receiver import ServiceBusReceiver from .._servicebus_session_receiver import ServiceBusSessionReceiver @@ -152,35 +152,6 @@ def _set_message_annotations(self, key, value): else: self.message.annotations[ANNOTATION_SYMBOL_KEY_MAP[key]] = value - @classmethod - def _from_received_message(cls, received_message): - # type: (Message) -> Message - amqp_message = received_message.message - amqp_body = amqp_message._body # pylint: disable=protected-access - - if isinstance(amqp_body, uamqp.message.DataBody): - body = b''.join(amqp_body.data) - else: - # amqp_body is type of uamqp.message.ValueBody - body = amqp_body.data - - return cls( - body=body, - content_type=received_message.content_type, - correlation_id=received_message.correlation_id, - label=received_message.label, - message_id=received_message.message_id, - partition_key=received_message.partition_key, - properties=received_message.properties, - reply_to=received_message.reply_to, - reply_to_session_id=received_message.reply_to_session_id, - session_id=received_message.session_id, - scheduled_enqueue_time_utc=received_message.scheduled_enqueue_time_utc, - time_to_live=received_message.time_to_live, - to=received_message.to, - via_partition_key=received_message.via_partition_key - ) - @property def session_id(self): # type: () -> str @@ -545,8 +516,6 @@ def _from_list(self, messages): if not isinstance(each, Message): raise ValueError("Only Message or an iterable object containing Message objects are accepted." "Received instead: {}".format(each.__class__.__name__)) - if isinstance(each, (PeekMessage, ReceivedMessageBase)): - each = Message._from_received_message(each) # pylint: disable=protected-access self.add(each) @property @@ -571,9 +540,7 @@ def add(self, message): :rtype: None :raises: :class: ~azure.servicebus.exceptions.MessageContentTooLarge, when exceeding the size limit. """ - if isinstance(message, (PeekMessage, ReceivedMessageBase)): - message = Message._from_received_message(message) # pylint: disable=protected-access - + message = copy_messages_if_needed(message) message_size = message.message.get_message_encoded_size() # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that @@ -609,6 +576,35 @@ class PeekMessage(Message): def __init__(self, message): super(PeekMessage, self).__init__(None, message=message) + def _to_outgoing_message(self): + # type: () -> Message + amqp_message = self.message + amqp_body = amqp_message._body # pylint: disable=protected-access + + if isinstance(amqp_body, uamqp.message.DataBody): + body = b''.join(amqp_body.data) + else: + # amqp_body is type of uamqp.message.ValueBody + body = amqp_body.data + + return Message( + body=body, + content_type=self.content_type, + correlation_id=self.correlation_id, + label=self.label, + message_id=self.message_id, + partition_key=self.partition_key, + properties=self.properties, + reply_to=self.reply_to, + reply_to_session_id=self.reply_to_session_id, + session_id=self.session_id, + scheduled_enqueue_time_utc=self.scheduled_enqueue_time_utc, + time_to_live=self.time_to_live, + to=self.to, + via_partition_key=self.via_partition_key + ) + + @property def dead_letter_error_description(self): # type: () -> Optional[str] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index e6d047e8ad37..6a1631cdc5a8 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -15,6 +15,7 @@ except ImportError: from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor +from typing import List, Union from uamqp import authentication @@ -27,6 +28,7 @@ DEAD_LETTER_QUEUE_SUFFIX, TRANSFER_DEAD_LETTER_QUEUE_SUFFIX ) +from .message import BatchMessage _log = logging.getLogger(__name__) @@ -158,6 +160,25 @@ def generate_dead_letter_entity_name( return entity_name +def copy_messages_if_needed(messages): + # pylint: disable=protected-access + if isinstance(messages, BatchMessage): + return messages + try: + msgs_to_return = [] + for each in messages: + try: + msgs_to_return.append(each._to_outgoing_message()) + except AttributeError: + msgs_to_return.append(each) + return msgs_to_return + except TypeError: + try: + return messages._to_outgoing_message() + except AttributeError: + return messages + + class AutoLockRenew(object): """Auto renew locks for messages and sessions using a background thread pool. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index ac77a4b6ec92..649f77c77ec2 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -13,12 +13,12 @@ from ._base_handler import BaseHandler, ServiceBusSharedKeyCredential, _convert_connection_string_to_kwargs from ._common import mgmt_handlers -from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage, ReceivedMessageBase +from ._common.message import Message, BatchMessage from .exceptions import ( OperationTimeoutError, _ServiceBusErrorPolicy, ) -from ._common.utils import create_authentication +from ._common.utils import create_authentication, copy_messages_if_needed from ._common.constants import ( REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION, REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, @@ -68,8 +68,7 @@ def _build_schedule_request(cls, schedule_time_utc, *messages): if not isinstance(message, Message): raise ValueError("Scheduling batch messages only supports iterables containing Message Objects." " Received instead: {}".format(message.__class__.__name__)) - if isinstance(message, (PeekMessage, ReceivedMessageBase)): - message = Message._from_received_message(message) # pylint: disable=protected-access + message = copy_messages_if_needed(message) message.scheduled_enqueue_time_utc = schedule_time_utc message_data = {} message_data[MGMT_REQUEST_MESSAGE_ID] = message.message_id @@ -326,6 +325,7 @@ def send_messages(self, message): :caption: Send message. """ + message = copy_messages_if_needed(message) try: batch = self.create_batch() batch._from_list(message) # pylint: disable=protected-access @@ -335,9 +335,6 @@ def send_messages(self, message): if isinstance(message, BatchMessage) and len(message) == 0: # pylint: disable=len-as-condition raise ValueError("A BatchMessage or list of Message must have at least one Message") - if isinstance(message, (PeekMessage, ReceivedMessageBase)): - message = Message._from_received_message(message) # pylint: disable=protected-access - self._do_retryable_operation( self._send, message=message, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 3c93c7ac59bf..f6007e5ff368 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -9,7 +9,7 @@ import uamqp from uamqp import SendClientAsync, types -from .._common.message import Message, BatchMessage, PeekMessage, ReceivedMessageBase +from .._common.message import Message, BatchMessage from .._base_handler import _convert_connection_string_to_kwargs from .._servicebus_sender import SenderMixin from ._base_handler_async import BaseHandler, ServiceBusSharedKeyCredential @@ -19,6 +19,7 @@ MGMT_REQUEST_SEQUENCE_NUMBERS ) from .._common import mgmt_handlers +from .._common.utils import copy_messages_if_needed from ._async_utils import create_authentication if TYPE_CHECKING: @@ -265,6 +266,7 @@ async def send_messages(self, message): :caption: Send message. """ + message = copy_messages_if_needed(message) try: batch = await self.create_batch() batch._from_list(message) # pylint: disable=protected-access @@ -274,9 +276,6 @@ async def send_messages(self, message): if isinstance(message, BatchMessage) and len(message) == 0: # pylint: disable=len-as-condition raise ValueError("A BatchMessage or list of Message must have at least one Message") - if isinstance(message, (PeekMessage, ReceivedMessageBase)): - message = Message._from_received_message(message) # pylint: disable=protected-access - await self._do_retryable_operation( self._send, message=message, From 845af5cb47a63510e94fec700a3cde03be5de6f0 Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Tue, 21 Jul 2020 10:32:04 -0700 Subject: [PATCH 6/8] remove circular import --- .../azure-servicebus/azure/servicebus/_common/utils.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index 6a1631cdc5a8..cf19eb7d12a7 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -28,7 +28,6 @@ DEAD_LETTER_QUEUE_SUFFIX, TRANSFER_DEAD_LETTER_QUEUE_SUFFIX ) -from .message import BatchMessage _log = logging.getLogger(__name__) @@ -162,8 +161,6 @@ def generate_dead_letter_entity_name( def copy_messages_if_needed(messages): # pylint: disable=protected-access - if isinstance(messages, BatchMessage): - return messages try: msgs_to_return = [] for each in messages: From f6f34a75e519a1fc894e0ac154ca2df6ca7ae82e Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Tue, 21 Jul 2020 17:40:56 -0700 Subject: [PATCH 7/8] update according to review --- .../azure-servicebus/azure/servicebus/_common/message.py | 4 ++-- .../azure-servicebus/azure/servicebus/_common/utils.py | 5 ++++- .../azure-servicebus/azure/servicebus/_servicebus_sender.py | 6 +++--- .../azure/servicebus/aio/_servicebus_sender_async.py | 4 ++-- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 0e84c471d211..3ac801f1a606 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -53,7 +53,7 @@ SessionLockExpired, MessageSettleFailed, MessageContentTooLarge) -from .utils import utc_from_timestamp, utc_now, copy_messages_if_needed +from .utils import utc_from_timestamp, utc_now, copy_messages_to_sendable_if_needed if TYPE_CHECKING: from .._servicebus_receiver import ServiceBusReceiver from .._servicebus_session_receiver import ServiceBusSessionReceiver @@ -540,7 +540,7 @@ def add(self, message): :rtype: None :raises: :class: ~azure.servicebus.exceptions.MessageContentTooLarge, when exceeding the size limit. """ - message = copy_messages_if_needed(message) + message = copy_messages_to_sendable_if_needed(message) message_size = message.message.get_message_encoded_size() # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index cf19eb7d12a7..3db59127e6dc 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -159,7 +159,10 @@ def generate_dead_letter_entity_name( return entity_name -def copy_messages_if_needed(messages): +def copy_messages_to_sendable_if_needed(messages): + """ + This method is to convert single/multiple received messages to sendable messages to enable message resending. + """ # pylint: disable=protected-access try: msgs_to_return = [] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 649f77c77ec2..98921db2cc17 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -18,7 +18,7 @@ OperationTimeoutError, _ServiceBusErrorPolicy, ) -from ._common.utils import create_authentication, copy_messages_if_needed +from ._common.utils import create_authentication, copy_messages_to_sendable_if_needed from ._common.constants import ( REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION, REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, @@ -68,7 +68,7 @@ def _build_schedule_request(cls, schedule_time_utc, *messages): if not isinstance(message, Message): raise ValueError("Scheduling batch messages only supports iterables containing Message Objects." " Received instead: {}".format(message.__class__.__name__)) - message = copy_messages_if_needed(message) + message = copy_messages_to_sendable_if_needed(message) message.scheduled_enqueue_time_utc = schedule_time_utc message_data = {} message_data[MGMT_REQUEST_MESSAGE_ID] = message.message_id @@ -325,7 +325,7 @@ def send_messages(self, message): :caption: Send message. """ - message = copy_messages_if_needed(message) + message = copy_messages_to_sendable_if_needed(message) try: batch = self.create_batch() batch._from_list(message) # pylint: disable=protected-access diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index f6007e5ff368..b0cabec6b7ac 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -19,7 +19,7 @@ MGMT_REQUEST_SEQUENCE_NUMBERS ) from .._common import mgmt_handlers -from .._common.utils import copy_messages_if_needed +from .._common.utils import copy_messages_to_sendable_if_needed from ._async_utils import create_authentication if TYPE_CHECKING: @@ -266,7 +266,7 @@ async def send_messages(self, message): :caption: Send message. """ - message = copy_messages_if_needed(message) + message = copy_messages_to_sendable_if_needed(message) try: batch = await self.create_batch() batch._from_list(message) # pylint: disable=protected-access From 9f38c1ee0a648322d62639b124cb7f9fe0757a95 Mon Sep 17 00:00:00 2001 From: Adam Ling Date: Tue, 21 Jul 2020 18:23:23 -0700 Subject: [PATCH 8/8] remove unused import --- .../azure-servicebus/azure/servicebus/_common/utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index 61ab44fb1a4d..ca26b4f82814 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -17,7 +17,6 @@ except ImportError: from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor -from typing import List, Union from uamqp import authentication, types