Skip to content

[ServiceBus] Resend received message #12457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jul 23, 2020
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

* Added new properties to Message, PeekMessage and ReceivedMessage: `content_type`, `correlation_id`, `label`,
`message_id`, `reply_to`, `reply_to_session_id` and `to`. Please refer to the docstring for further information.

* Add new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`,
* Added new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`,
`dead_letter_reason`, `dead_letter_source`, `delivery_count` and `expires_at_utc`. Please refer to the docstring for further information.
* Added support for sending received messages via `ServiceBusSender.send_messages`.

**Breaking Changes**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,35 @@ def _set_message_annotations(self, key, value):
else:
self.message.annotations[ANNOTATION_SYMBOL_KEY_MAP[key]] = value

@classmethod
def _from_received_message(cls, received_message):
# type: (Message) -> Message
amqp_message = received_message.message
amqp_body = amqp_message._body # pylint: disable=protected-access

if isinstance(amqp_body, uamqp.message.DataBody):
body = b''.join(amqp_body.data)
else:
# amqp_body is type of uamqp.message.ValueBody
body = amqp_body.data

return cls(
body=body,
content_type=received_message.content_type,
correlation_id=received_message.correlation_id,
label=received_message.label,
message_id=received_message.message_id,
partition_key=received_message.partition_key,
properties=received_message.properties,
reply_to=received_message.reply_to,
reply_to_session_id=received_message.reply_to_session_id,
session_id=received_message.session_id,
scheduled_enqueue_time_utc=received_message.scheduled_enqueue_time_utc,
time_to_live=received_message.time_to_live,
to=received_message.to,
via_partition_key=received_message.via_partition_key
)

@property
def session_id(self):
# type: () -> str
Expand Down Expand Up @@ -516,6 +545,8 @@ def _from_list(self, messages):
if not isinstance(each, Message):
raise ValueError("Only Message or an iterable object containing Message objects are accepted."
"Received instead: {}".format(each.__class__.__name__))
if isinstance(each, (PeekMessage, ReceivedMessageBase)):
each = Message._from_received_message(each) # pylint: disable=protected-access
self.add(each)

@property
Expand All @@ -540,6 +571,9 @@ def add(self, message):
:rtype: None
:raises: :class: ~azure.servicebus.exceptions.MessageContentTooLarge, when exceeding the size limit.
"""
if isinstance(message, (PeekMessage, ReceivedMessageBase)):
message = Message._from_received_message(message) # pylint: disable=protected-access

message_size = message.message.get_message_encoded_size()

# For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from ._base_handler import BaseHandler, ServiceBusSharedKeyCredential, _convert_connection_string_to_kwargs
from ._common import mgmt_handlers
from ._common.message import Message, BatchMessage
from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage, ReceivedMessageBase
from .exceptions import (
OperationTimeoutError,
_ServiceBusErrorPolicy,
Expand Down Expand Up @@ -68,6 +68,8 @@ def _build_schedule_request(cls, schedule_time_utc, *messages):
if not isinstance(message, Message):
raise ValueError("Scheduling batch messages only supports iterables containing Message Objects."
" Received instead: {}".format(message.__class__.__name__))
if isinstance(message, (PeekMessage, ReceivedMessageBase)):
message = Message._from_received_message(message) # pylint: disable=protected-access
message.scheduled_enqueue_time_utc = schedule_time_utc
message_data = {}
message_data[MGMT_REQUEST_MESSAGE_ID] = message.message_id
Expand Down Expand Up @@ -333,6 +335,9 @@ def send_messages(self, message):
if isinstance(message, BatchMessage) and len(message) == 0: # pylint: disable=len-as-condition
raise ValueError("A BatchMessage or list of Message must have at least one Message")

if isinstance(message, (PeekMessage, ReceivedMessageBase)):
message = Message._from_received_message(message) # pylint: disable=protected-access

self._do_retryable_operation(
self._send,
message=message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import uamqp
from uamqp import SendClientAsync, types

from .._common.message import Message, BatchMessage
from .._common.message import Message, BatchMessage, PeekMessage, ReceivedMessageBase
from .._base_handler import _convert_connection_string_to_kwargs
from .._servicebus_sender import SenderMixin
from ._base_handler_async import BaseHandler, ServiceBusSharedKeyCredential
Expand Down Expand Up @@ -274,6 +274,9 @@ async def send_messages(self, message):
if isinstance(message, BatchMessage) and len(message) == 0: # pylint: disable=len-as-condition
raise ValueError("A BatchMessage or list of Message must have at least one Message")

if isinstance(message, (PeekMessage, ReceivedMessageBase)):
message = Message._from_received_message(message) # pylint: disable=protected-access

await self._do_retryable_operation(
self._send,
message=message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1156,13 +1156,15 @@ def message_content():
for i in range(20):
yield Message("Message no. {}".format(i))

async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name)

async with sender, receiver:
message = BatchMessage()
for each in message_content():
message.add(each)
await sender.send_messages(message)

async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
Expand All @@ -1173,6 +1175,26 @@ def message_content():
message_received_cnt += len(messages)
for m in messages:
print_message(_logger, m)
await sender.send_messages(message)
await m.complete()

assert message_received_cnt == 20
# Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration
assert receive_counter < 10 # Dynamic link credit issuing come info effect

# received resent messages

receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
if not messages:
break
receive_counter += 1
message_received_cnt += len(messages)
for m in messages:
print_message(_logger, m)
await sender.send_messages(message)
await m.complete()

assert message_received_cnt == 20
Expand Down
179 changes: 141 additions & 38 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,23 +744,65 @@ def test_queue_by_servicebus_client_browse_messages_with_receiver(self, serviceb

with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

with sb_client.get_queue_receiver(servicebus_queue.name,
idle_timeout=5,
mode=ReceiveSettleMode.PeekLock) as receiver:
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
for i in range(5):
message = Message("Test message no. {}".format(i))
sender.send_messages(message)

receiver = sb_client.get_queue_receiver(servicebus_queue.name,
idle_timeout=5,
mode=ReceiveSettleMode.PeekLock)
sender = sb_client.get_queue_sender(servicebus_queue.name)
with receiver, sender:
for i in range(5):
message = Message(
body="Test message",
properties={'key': 'value'},
label='label',
content_type='application/text',
correlation_id='cid',
message_id='mid',
partition_key='pk',
via_partition_key='via_pk',
to='to',
reply_to='reply_to',
time_to_live=timedelta(seconds=60)
)
sender.send_messages(message)

messages = receiver.peek_messages(5)
assert len(messages) > 0
assert all(isinstance(m, PeekMessage) for m in messages)
for message in messages:
print_message(_logger, message)
assert b''.join(message.body) == b'Test message'
assert message.properties[b'key'] == b'value'
assert message.label == 'label'
assert message.content_type == 'application/text'
assert message.correlation_id == 'cid'
assert message.message_id == 'mid'
assert message.partition_key == 'pk'
assert message.via_partition_key == 'via_pk'
assert message.to == 'to'
assert message.reply_to == 'reply_to'
assert message.time_to_live == timedelta(seconds=60)
with pytest.raises(AttributeError):
message.complete()


sender.send_messages(message)

cnt = 0
for message in receiver:
assert b''.join(message.body) == b'Test message'
assert message.properties[b'key'] == b'value'
assert message.label == 'label'
assert message.content_type == 'application/text'
assert message.correlation_id == 'cid'
assert message.message_id == 'mid'
assert message.partition_key == 'pk'
assert message.via_partition_key == 'via_pk'
assert message.to == 'to'
assert message.reply_to == 'reply_to'
assert message.time_to_live == timedelta(seconds=60)
message.complete()
cnt += 1
assert cnt == 10

@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down Expand Up @@ -1192,28 +1234,36 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
with sb_client.get_queue_receiver(servicebus_queue.name,
prefetch=20) as receiver:
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
content = str(uuid.uuid4())
message_id_a = uuid.uuid4()
message_a = Message(content)
message_a.message_id = message_id_a
message_id_b = uuid.uuid4()
message_b = Message(content)
message_b.message_id = message_id_b
message_arry = [message_a, message_b]
for message in message_arry:
message.properties = {'key': 'value'}
message.label = 'label'
message.content_type = 'application/text'
message.correlation_id = 'cid'
message.partition_key = 'pk'
message.via_partition_key = 'via_pk'
message.to = 'to'
message.reply_to = 'reply_to'
tokens = sender.schedule_messages(message_arry, enqueue_time)
assert len(tokens) == 2
sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20)

with sender, receiver:
content = str(uuid.uuid4())
message_id_a = uuid.uuid4()
message_a = Message(content)
message_a.message_id = message_id_a
message_id_b = uuid.uuid4()
message_b = Message(content)
message_b.message_id = message_id_b
message_arry = [message_a, message_b]
for message in message_arry:
message.properties = {'key': 'value'}
message.label = 'label'
message.content_type = 'application/text'
message.correlation_id = 'cid'
message.partition_key = 'pk'
message.via_partition_key = 'via_pk'
message.to = 'to'
message.reply_to = 'reply_to'

sender.send_messages(message_arry)

received_messages = receiver.receive_messages(max_batch_size=2, max_wait_time=5)
for message in received_messages:
message.complete()

tokens = sender.schedule_messages(received_messages, enqueue_time)
assert len(tokens) == 2

messages = receiver.receive_messages(max_wait_time=120)
messages.extend(receiver.receive_messages(max_wait_time=5))
Expand Down Expand Up @@ -1469,15 +1519,29 @@ def test_queue_receive_batch_without_setting_prefetch(self, servicebus_namespace

def message_content():
for i in range(20):
yield Message("Message no. {}".format(i))

with sb_client.get_queue_sender(servicebus_queue.name) as sender:
yield Message(
body="Test message",
properties={'key': 'value'},
label='label',
content_type='application/text',
correlation_id='cid',
message_id='mid',
partition_key='pk',
via_partition_key='via_pk',
to='to',
reply_to='reply_to',
time_to_live=timedelta(seconds=60)
)

sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name)

with sender, receiver:
message = BatchMessage()
for each in message_content():
message.add(each)
sender.send_messages(message)

with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
Expand All @@ -1486,9 +1550,48 @@ def message_content():
break
receive_counter += 1
message_received_cnt += len(messages)
for m in messages:
print_message(_logger, m)
m.complete()
for message in messages:
print_message(_logger, message)
assert b''.join(message.body) == b'Test message'
assert message.properties[b'key'] == b'value'
assert message.label == 'label'
assert message.content_type == 'application/text'
assert message.correlation_id == 'cid'
assert message.message_id == 'mid'
assert message.partition_key == 'pk'
assert message.via_partition_key == 'via_pk'
assert message.to == 'to'
assert message.reply_to == 'reply_to'
assert message.time_to_live == timedelta(seconds=60)
message.complete()
sender.send_messages(message) # resending received message

assert message_received_cnt == 20
# Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration
assert receive_counter < 10 # Dynamic link credit issuing come info effect

receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5)
if not messages:
break
receive_counter += 1
message_received_cnt += len(messages)
for message in messages:
print_message(_logger, message)
assert b''.join(message.body) == b'Test message'
assert message.properties[b'key'] == b'value'
assert message.label == 'label'
assert message.content_type == 'application/text'
assert message.correlation_id == 'cid'
assert message.message_id == 'mid'
assert message.partition_key == 'pk'
assert message.via_partition_key == 'via_pk'
assert message.to == 'to'
assert message.reply_to == 'reply_to'
assert message.time_to_live == timedelta(seconds=60)
message.complete()

assert message_received_cnt == 20
# Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration
Expand Down
Loading