Skip to content

Commit 91d96a8

Browse files
authored
[ServiceBus] Settle non-deferred message through receiver link (Azure#10800)
* settle non-deferred message through receiver link except dead_letter * revert dead-letter back to t1 as well * improve settlement and put is_deferred_letter into kwargs * add test * update according to comment * fix a bug in dead_letter through receiver_link
1 parent ce8cfad commit 91d96a8

File tree

5 files changed

+154
-55
lines changed

5 files changed

+154
-55
lines changed

sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py

Lines changed: 80 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66

77
import datetime
88
import uuid
9+
import functools
10+
import logging
911
from typing import Optional, List, Union, Generator
1012

1113
import uamqp
12-
from uamqp import types
14+
from uamqp import types, errors
1315

1416
from .constants import (
1517
_BATCH_MESSAGE_OVERHEAD_COST,
@@ -34,7 +36,8 @@
3436
MESSAGE_DEAD_LETTER,
3537
MESSAGE_ABANDON,
3638
MESSAGE_DEFER,
37-
MESSAGE_RENEW_LOCK
39+
MESSAGE_RENEW_LOCK,
40+
DEADLETTERNAME
3841
)
3942
from ..exceptions import (
4043
MessageAlreadySettled,
@@ -44,6 +47,8 @@
4447
)
4548
from .utils import utc_from_timestamp, utc_now
4649

50+
_LOGGER = logging.getLogger(__name__)
51+
4752

4853
class Message(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes
4954
"""A Service Bus Message.
@@ -436,9 +441,10 @@ class ReceivedMessage(PeekMessage):
436441
:dedent: 4
437442
:caption: Checking the properties on a received message.
438443
"""
439-
def __init__(self, message, mode=ReceiveSettleMode.PeekLock):
444+
def __init__(self, message, mode=ReceiveSettleMode.PeekLock, **kwargs):
440445
super(ReceivedMessage, self).__init__(message=message)
441446
self._settled = (mode == ReceiveSettleMode.ReceiveAndDelete)
447+
self._is_deferred_message = kwargs.get("is_deferred_message", False)
442448
self.auto_renew_error = None
443449

444450
def _is_live(self, action):
@@ -458,6 +464,69 @@ def _is_live(self, action):
458464
except AttributeError:
459465
pass
460466

467+
def _settle_message(
468+
self,
469+
settle_operation,
470+
dead_letter_details=None
471+
):
472+
try:
473+
if not self._is_deferred_message:
474+
try:
475+
self._settle_via_receiver_link(settle_operation, dead_letter_details)()
476+
return
477+
except RuntimeError as exception:
478+
_LOGGER.info(
479+
"Message settling: %r has encountered an exception (%r)."
480+
"Trying to settle through management link",
481+
settle_operation,
482+
exception
483+
)
484+
self._settle_via_mgmt_link(settle_operation, dead_letter_details)()
485+
except Exception as e:
486+
raise MessageSettleFailed(settle_operation, e)
487+
488+
def _settle_via_mgmt_link(self, settle_operation, dead_letter_details=None):
489+
# pylint: disable=protected-access
490+
if settle_operation == MESSAGE_COMPLETE:
491+
return functools.partial(
492+
self._receiver._settle_message,
493+
SETTLEMENT_COMPLETE,
494+
[self.lock_token],
495+
)
496+
if settle_operation == MESSAGE_ABANDON:
497+
return functools.partial(
498+
self._receiver._settle_message,
499+
SETTLEMENT_ABANDON,
500+
[self.lock_token],
501+
)
502+
if settle_operation == MESSAGE_DEAD_LETTER:
503+
return functools.partial(
504+
self._receiver._settle_message,
505+
SETTLEMENT_DEADLETTER,
506+
[self.lock_token],
507+
dead_letter_details=dead_letter_details
508+
)
509+
if settle_operation == MESSAGE_DEFER:
510+
return functools.partial(
511+
self._receiver._settle_message,
512+
SETTLEMENT_DEFER,
513+
[self.lock_token],
514+
)
515+
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))
516+
517+
def _settle_via_receiver_link(self, settle_operation, dead_letter_details=None):
518+
if settle_operation == MESSAGE_COMPLETE:
519+
return functools.partial(self.message.accept)
520+
if settle_operation == MESSAGE_ABANDON:
521+
return functools.partial(self.message.modify, True, False)
522+
if settle_operation == MESSAGE_DEAD_LETTER:
523+
# note: message.reject() can not set reason and description properly due to the issue
524+
# https://github.com/Azure/azure-uamqp-python/issues/155
525+
return functools.partial(self.message.reject, condition=DEADLETTERNAME)
526+
if settle_operation == MESSAGE_DEFER:
527+
return functools.partial(self.message.modify, True, True)
528+
raise ValueError("Unsupported settle operation type: {}".format(settle_operation))
529+
461530
@property
462531
def settled(self):
463532
# type: () -> bool
@@ -535,11 +604,9 @@ def complete(self):
535604
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
536605
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
537606
"""
607+
# pylint: disable=protected-access
538608
self._is_live(MESSAGE_COMPLETE)
539-
try:
540-
self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token]) # pylint: disable=protected-access
541-
except Exception as e:
542-
raise MessageSettleFailed(MESSAGE_COMPLETE, e)
609+
self._settle_message(MESSAGE_COMPLETE)
543610
self._settled = True
544611

545612
def dead_letter(self, reason=None, description=None):
@@ -560,17 +627,12 @@ def dead_letter(self, reason=None, description=None):
560627
"""
561628
# pylint: disable=protected-access
562629
self._is_live(MESSAGE_DEAD_LETTER)
630+
563631
details = {
564632
MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "",
565633
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""}
566-
try:
567-
self._receiver._settle_message(
568-
SETTLEMENT_DEADLETTER,
569-
[self.lock_token],
570-
dead_letter_details=details
571-
)
572-
except Exception as e:
573-
raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e)
634+
635+
self._settle_message(MESSAGE_DEAD_LETTER, dead_letter_details=details)
574636
self._settled = True
575637

576638
def abandon(self):
@@ -585,11 +647,9 @@ def abandon(self):
585647
:raises: ~azure.servicebus.common.errors.SessionLockExpired if session lock has already expired.
586648
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
587649
"""
650+
# pylint: disable=protected-access
588651
self._is_live(MESSAGE_ABANDON)
589-
try:
590-
self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token]) # pylint: disable=protected-access
591-
except Exception as e:
592-
raise MessageSettleFailed(MESSAGE_ABANDON, e)
652+
self._settle_message(MESSAGE_ABANDON)
593653
self._settled = True
594654

595655
def defer(self):
@@ -606,10 +666,7 @@ def defer(self):
606666
:raises: ~azure.servicebus.common.errors.MessageSettleFailed if message settle operation fails.
607667
"""
608668
self._is_live(MESSAGE_DEFER)
609-
try:
610-
self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token]) # pylint: disable=protected-access
611-
except Exception as e:
612-
raise MessageSettleFailed(MESSAGE_DEFER, e)
669+
self._settle_message(MESSAGE_DEFER)
613670
self._settled = True
614671

615672
def renew_lock(self):

sdk/servicebus/azure-servicebus/azure/servicebus/_common/mgmt_handlers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def deferred_message_op(
6969
parsed = []
7070
for m in message.get_data()[b'messages']:
7171
wrapped = uamqp.Message.decode_from_bytes(bytearray(m[b'message']))
72-
parsed.append(message_type(wrapped, mode))
72+
parsed.append(message_type(wrapped, mode, is_deferred_message=True))
7373
return parsed
7474
if status_code in [202, 204]:
7575
return []

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,13 @@
33
# Licensed under the MIT License. See License.txt in the project root for
44
# license information.
55
# -------------------------------------------------------------------------
6+
import logging
67
from typing import Optional
78

89
from .._common import message as sync_message
910
from .._common.constants import (
10-
SETTLEMENT_ABANDON,
11-
SETTLEMENT_COMPLETE,
12-
SETTLEMENT_DEFER,
13-
SETTLEMENT_DEADLETTER,
1411
ReceiveSettleMode,
1512
MGMT_RESPONSE_MESSAGE_EXPIRATION,
16-
MGMT_REQUEST_DEAD_LETTER_REASON,
17-
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION,
1813
MESSAGE_COMPLETE,
1914
MESSAGE_DEAD_LETTER,
2015
MESSAGE_ABANDON,
@@ -24,15 +19,41 @@
2419
from .._common.utils import get_running_loop, utc_from_timestamp
2520
from ..exceptions import MessageSettleFailed
2621

22+
_LOGGER = logging.getLogger(__name__)
23+
2724

2825
class ReceivedMessage(sync_message.ReceivedMessage):
2926
"""A Service Bus Message received from service side.
3027
3128
"""
3229

33-
def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None):
30+
def __init__(self, message, mode=ReceiveSettleMode.PeekLock, loop=None, **kwargs):
3431
self._loop = loop or get_running_loop()
35-
super(ReceivedMessage, self).__init__(message=message, mode=mode)
32+
super(ReceivedMessage, self).__init__(message=message, mode=mode, **kwargs)
33+
34+
async def _settle_message(
35+
self,
36+
settle_operation,
37+
dead_letter_details=None
38+
):
39+
try:
40+
if not self._is_deferred_message:
41+
try:
42+
await self._loop.run_in_executor(
43+
None,
44+
self._settle_via_receiver_link(settle_operation, dead_letter_details)
45+
)
46+
return
47+
except RuntimeError as exception:
48+
_LOGGER.info(
49+
"Message settling: %r has encountered an exception (%r)."
50+
"Trying to settle through management link",
51+
settle_operation,
52+
exception
53+
)
54+
await self._settle_via_mgmt_link(settle_operation, dead_letter_details)()
55+
except Exception as e:
56+
raise MessageSettleFailed(settle_operation, e)
3657

3758
async def complete(self):
3859
# type: () -> None
@@ -48,10 +69,7 @@ async def complete(self):
4869
"""
4970
# pylint: disable=protected-access
5071
self._is_live(MESSAGE_COMPLETE)
51-
try:
52-
await self._receiver._settle_message(SETTLEMENT_COMPLETE, [self.lock_token])
53-
except Exception as e:
54-
raise MessageSettleFailed(MESSAGE_COMPLETE, e)
72+
await self._settle_message(MESSAGE_COMPLETE)
5573
self._settled = True
5674

5775
async def dead_letter(self, reason=None, description=None):
@@ -71,17 +89,7 @@ async def dead_letter(self, reason=None, description=None):
7189
"""
7290
# pylint: disable=protected-access
7391
self._is_live(MESSAGE_DEAD_LETTER)
74-
details = {
75-
MGMT_REQUEST_DEAD_LETTER_REASON: str(reason) if reason else "",
76-
MGMT_REQUEST_DEAD_LETTER_DESCRIPTION: str(description) if description else ""}
77-
try:
78-
await self._receiver._settle_message(
79-
SETTLEMENT_DEADLETTER,
80-
[self.lock_token],
81-
dead_letter_details=details
82-
)
83-
except Exception as e:
84-
raise MessageSettleFailed(MESSAGE_DEAD_LETTER, e)
92+
await self._settle_message(MESSAGE_DEAD_LETTER)
8593
self._settled = True
8694

8795
async def abandon(self):
@@ -95,10 +103,7 @@ async def abandon(self):
95103
"""
96104
# pylint: disable=protected-access
97105
self._is_live(MESSAGE_ABANDON)
98-
try:
99-
await self._receiver._settle_message(SETTLEMENT_ABANDON, [self.lock_token])
100-
except Exception as e:
101-
raise MessageSettleFailed(MESSAGE_ABANDON, e)
106+
await self._settle_message(MESSAGE_ABANDON)
102107
self._settled = True
103108

104109
async def defer(self):
@@ -112,10 +117,7 @@ async def defer(self):
112117
"""
113118
# pylint: disable=protected-access
114119
self._is_live(MESSAGE_DEFER)
115-
try:
116-
await self._receiver._settle_message(SETTLEMENT_DEFER, [self.lock_token])
117-
except Exception as e:
118-
raise MessageSettleFailed(MESSAGE_DEFER, e)
120+
await self._settle_message(MESSAGE_DEFER)
119121
self._settled = True
120122

121123
async def renew_lock(self):

sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,3 +1076,23 @@ def test_queue_message_http_proxy_setting(self):
10761076
receiver = sb_client.get_queue_receiver(queue_name="mock")
10771077
assert receiver._config.http_proxy == http_proxy
10781078
assert receiver._config.transport_type == TransportType.AmqpOverWebsocket
1079+
1080+
@pytest.mark.liveTest
1081+
@pytest.mark.live_test_only
1082+
@CachedResourceGroupPreparer(name_prefix='servicebustest')
1083+
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
1084+
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
1085+
async def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
1086+
async with ServiceBusClient.from_connection_string(
1087+
servicebus_namespace_connection_string,
1088+
logging_enable=False) as sb_client:
1089+
1090+
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
1091+
message = Message("Test")
1092+
await sender.send(message)
1093+
1094+
async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
1095+
messages = await receiver.receive(max_wait_time=5)
1096+
await receiver._handler.message_handler.destroy_async() # destroy the underlying receiver link
1097+
assert len(messages) == 1
1098+
await messages[0].complete()

sdk/servicebus/azure-servicebus/tests/test_queues.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,3 +1202,23 @@ def test_queue_message_http_proxy_setting(self):
12021202
receiver = sb_client.get_queue_receiver(queue_name="mock")
12031203
assert receiver._config.http_proxy == http_proxy
12041204
assert receiver._config.transport_type == TransportType.AmqpOverWebsocket
1205+
1206+
@pytest.mark.liveTest
1207+
@pytest.mark.live_test_only
1208+
@CachedResourceGroupPreparer(name_prefix='servicebustest')
1209+
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
1210+
@ServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True)
1211+
def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
1212+
with ServiceBusClient.from_connection_string(
1213+
servicebus_namespace_connection_string,
1214+
logging_enable=False) as sb_client:
1215+
1216+
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
1217+
message = Message("Test")
1218+
sender.send(message)
1219+
1220+
with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
1221+
messages = receiver.receive(max_wait_time=5)
1222+
receiver._handler.message_handler.destroy() # destroy the underlying receiver link
1223+
assert len(messages) == 1
1224+
messages[0].complete()

0 commit comments

Comments
 (0)