Skip to content

Settlement refactor #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,10 @@ class ServiceBusReceivedMessage(ServiceBusMessage):

def __init__(self, message, receive_mode=ServiceBusReceiveMode.PEEK_LOCK, **kwargs):
# type: (Message, Union[ServiceBusReceiveMode, str], Any) -> None
frame, message = message
self.delivery_tag = frame[2]
self.delivery_id = frame[1]

super(ServiceBusReceivedMessage, self).__init__(None, message=message) # type: ignore
self._settled = receive_mode == ServiceBusReceiveMode.RECEIVE_AND_DELETE
self._received_timestamp_utc = utc_now()
Expand Down Expand Up @@ -1078,8 +1082,8 @@ def lock_token(self):
if self._settled:
return None

if self.message.delivery_tag:
return uuid.UUID(bytes_le=self.message.delivery_tag)
if self.delivery_tag:
return uuid.UUID(bytes_le=self.delivery_tag)

delivery_annotations = self._raw_amqp_message.delivery_annotations
if delivery_annotations:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Optional, Callable

from .._pyamqp.endpoints import Source
from .._pyamqp.error import AMQPError

from .message import ServiceBusReceivedMessage
from .constants import (
Expand Down Expand Up @@ -139,21 +140,34 @@ def _settle_message_via_receiver_link(
# type: (ServiceBusReceivedMessage, str, Optional[str], Optional[str]) -> Callable
# pylint: disable=no-self-use
if settle_operation == MESSAGE_COMPLETE:
return functools.partial(message.message.accept)
self._handler.settle_messages(message.delivery_id, 'accepted')
if settle_operation == MESSAGE_ABANDON:
return functools.partial(message.message.modify, True, False)
self._handler.settle_messages(
message.delivery_id,
'modified',
delivery_failed=True,
undeliverable_here=False
)
if settle_operation == MESSAGE_DEAD_LETTER:
return functools.partial(
message.message.reject,
condition=DEADLETTERNAME,
description=dead_letter_error_description,
info={
RECEIVER_LINK_DEAD_LETTER_REASON: dead_letter_reason,
RECEIVER_LINK_DEAD_LETTER_ERROR_DESCRIPTION: dead_letter_error_description,
},
self._handler.settle_messages(
message.delivery_id,
'rejected',
error=AMQPError(
condition=DEADLETTERNAME,
description=dead_letter_error_description,
info={
RECEIVER_LINK_DEAD_LETTER_REASON: dead_letter_reason,
RECEIVER_LINK_DEAD_LETTER_ERROR_DESCRIPTION: dead_letter_error_description,
}
)
)
if settle_operation == MESSAGE_DEFER:
return functools.partial(message.message.modify, True, True)
self._handler.settle_messages(
message.delivery_id,
'modified',
delivery_failed=True,
undeliverable_here=True
)
raise ValueError(
"Unsupported settle operation type: {}".format(settle_operation)
)
Expand All @@ -177,10 +191,10 @@ def _populate_message_properties(self, message):
if self._session:
message[MGMT_REQUEST_SESSION_ID] = self._session_id

def _enhanced_message_received(self, message):
def _enhanced_message_received(self, frame, message):
# pylint: disable=protected-access
self._handler._was_message_received = True
if self._receive_context.is_set():
self._handler._received_messages.put(message)
self._handler._received_messages.put((frame, message))
else:
message.release()
121 changes: 106 additions & 15 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import logging
import time
from typing import Any, Dict, Literal, Optional, Tuple, Union, overload
import uuid
import certifi
import queue
Expand All @@ -27,9 +28,17 @@
ErrorCondition,
MessageException,
MessageSendFailed,
RetryPolicy
RetryPolicy,
AMQPError
)
from ._counter import TickCounter
from .outcomes import(
Received,
Rejected,
Released,
Accepted,
Modified
)

from .constants import (
MessageDeliveryState,
Expand Down Expand Up @@ -613,14 +622,15 @@ class ReceiveClient(AMQPClient):

def __init__(self, hostname, source, auth=None, **kwargs):
self.source = source
self._streaming_receive = kwargs.pop("streaming_receive", False) # TODO: whether public?
self._streaming_receive = kwargs.pop("streaming_receive", False)
self._received_messages = queue.Queue()
self._message_received_callback = kwargs.pop("message_received_callback", None) # TODO: whether public?
self._on_incoming_transfer = kwargs.pop("on_incoming_transfer", None)

# Sender and Link settings
# Receiver and Link settings
self._max_message_size = kwargs.pop('max_message_size', None) or MAX_FRAME_SIZE_BYTES
self._link_properties = kwargs.pop('link_properties', None)
self._link_credit = kwargs.pop('link_credit', 300)
self._on_transfer_received = kwargs.pop('on_transfer_received', None)
self._counter = TickCounter()
super(ReceiveClient, self).__init__(hostname, auth=auth, **kwargs)

Expand All @@ -642,7 +652,7 @@ def _client_ready(self):
send_settle_mode=self._send_settle_mode,
rcv_settle_mode=self._receive_settle_mode,
max_message_size=self._max_message_size,
on_message_received=self._message_received,
on_transfer_received=self._message_received,
properties=self._link_properties,
desired_capabilities=self._desired_capabilities
)
Expand All @@ -667,7 +677,7 @@ def _client_run(self, **kwargs):
return False
return True

def _message_received(self, message):
def _message_received(self, frame, message):
"""Callback run on receipt of every message. If there is
a user-defined callback, this will be called.
Additionally if the client is retrieving messages for a batch
Expand All @@ -676,14 +686,10 @@ def _message_received(self, message):
:param message: Received message.
:type message: ~uamqp.message.Message
"""
if self._message_received_callback:
self._message_received_callback(message)
if self._on_incoming_transfer:
self._on_incoming_transfer(frame, message)
if not self._streaming_receive:
self._received_messages.put(message)
# TODO: do we need settled property for a message?
#elif not message.settled:
# # Message was received with callback processing and wasn't settled.
# _logger.info("Message was not settled.")
self._received_messages.put((frame, message))

def _receive_message_batch_impl(self, max_batch_size=None, on_message_received=None, timeout=0):
self._message_received_callback = on_message_received
Expand All @@ -694,7 +700,8 @@ def _receive_message_batch_impl(self, max_batch_size=None, on_message_received=N
self.open()
while len(batch) < max_batch_size:
try:
batch.append(self._received_messages.get_nowait())
_, message = self._received_messages.get_nowait()
batch.append(message)
self._received_messages.task_done()
except queue.Empty:
break
Expand Down Expand Up @@ -722,7 +729,8 @@ def _receive_message_batch_impl(self, max_batch_size=None, on_message_received=N

while len(batch) < max_batch_size:
try:
batch.append(self._received_messages.get_nowait())
_, message = self._received_messages.get_nowait()
batch.append(message)
self._received_messages.task_done()
except queue.Empty:
break
Expand Down Expand Up @@ -761,3 +769,86 @@ def receive_message_batch(self, **kwargs):
self._receive_message_batch_impl,
**kwargs
)

@overload
def settle_messages(
self,
delivery_id: Union[int, Tuple[int, int]],
outcome: Literal["accepted"],
*,
batchable: Optional[bool] = None
):
...

@overload
def settle_messages(
self,
delivery_id: Union[int, Tuple[int, int]],
outcome: Literal["released"],
*,
batchable: Optional[bool] = None
):
...

@overload
def settle_messages(
self,
delivery_id: Union[int, Tuple[int, int]],
outcome: Literal["rejected"],
*,
error: Optional[AMQPError] = None,
batchable: Optional[bool] = None
):
...

@overload
def settle_messages(
self,
delivery_id: Union[int, Tuple[int, int]],
outcome: Literal["modified"],
*,
delivery_failed: Optional[bool] = None,
undeliverable_here: Optional[bool] = None,
message_annotations: Optional[Dict[Union[str, bytes], Any]],
batchable: Optional[bool] = None
):
...

@overload
def settle_messages(
self,
delivery_id: Union[int, Tuple[int, int]],
outcome: Literal["received"],
*,
section_number: int,
section_offset: int,
batchable: Optional[bool] = None
):
...

def settle_messages(self, delivery_id: Union[int, Tuple[int, int]], outcome: str, **kwargs):
batchable = kwargs.pop('batchable', None)
if outcome.lower() == 'accepted':
state = Accepted()
elif outcome.lower() == 'released':
state = Released()
elif outcome.lower() == 'rejected':
state = Rejected(**kwargs)
elif outcome.lower() == 'modified':
state = Modified(**kwargs)
elif outcome.lower() == 'received':
state = Received(**kwargs)
else:
raise ValueError("Unrecognized message output: {}".format(outcome))
try:
first, last = delivery_id
except TypeError:
first = delivery_id
last = None
self._link.send_disposition(
first_delivery_id=first,
last_delivery_id=last,
settled=True,
delivery_state=state,
batchable=batchable
)
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,9 @@
this is taken to be the same as first.
:param bool settled: Indicates deliveries are settled.
If true, indicates that the referenced deliveries are considered settled by the issuing endpoint.
:param bytes state: Indicates state of deliveries.
:param state: Indicates state of deliveries.
Communicates the state of all the deliveries referenced by this disposition.
:paramtype state:
:param bool batchable: Batchable hint.
If true, then the issuer is hinting that there is no need for the peer to urgently communicate the impact
of the updated delivery states. This hint may be used to artificially increase the amount of batching an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import uuid
import logging
from io import BytesIO
from typing import Optional, Union

from ._decode import decode_payload
from .constants import DEFAULT_LINK_CREDIT, Role
Expand All @@ -27,6 +28,13 @@
DispositionFrame,
FlowFrame,
)
from .outcomes import (
Received,
Accepted,
Rejected,
Released,
Modified
)


_LOGGER = logging.getLogger(__name__)
Expand All @@ -40,16 +48,13 @@ def __init__(self, session, handle, source_address, **kwargs):
if 'target_address' not in kwargs:
kwargs['target_address'] = "receiver-link-{}".format(name)
super(ReceiverLink, self).__init__(session, handle, name, role, source_address=source_address, **kwargs)
self.on_message_received = kwargs.get('on_message_received')
self.on_transfer_received = kwargs.get('on_transfer_received')
if not self.on_message_received and not self.on_transfer_received:
raise ValueError("Must specify either a message or transfer handler.")

def _process_incoming_message(self, frame, message):
try:
if self.on_message_received:
return self.on_message_received(message)
elif self.on_transfer_received:
if self.on_transfer_received:
return self.on_transfer_received(frame, message)
except Exception as e:
_LOGGER.error("Handler function failed with error: %r", e)
Expand Down Expand Up @@ -83,25 +88,44 @@ def _incoming_transfer(self, frame):
message = decode_payload(frame[11])
delivery_state = self._process_incoming_message(frame, message)
if not frame[4] and delivery_state: # settled
self._outgoing_disposition(frame[1], delivery_state)
self._outgoing_disposition(first=frame[1], settled=True, state=delivery_state)
if self.current_link_credit <= 0:
self.current_link_credit = self.link_credit
self._outgoing_flow()

def _outgoing_disposition(self, delivery_id, delivery_state):
def _outgoing_disposition(
self,
first: int,
last: Optional[int],
settled: Optional[bool],
state: Optional[Union[Received, Accepted, Rejected, Released, Modified]],
batchable: Optional[bool]):
disposition_frame = DispositionFrame(
role=self.role,
first=delivery_id,
last=delivery_id,
settled=True,
state=delivery_state,
batchable=None
first=first,
last=last,
settled=settled,
state=state,
batchable=batchable
)
if self.network_trace:
_LOGGER.info("-> %r", DispositionFrame(*disposition_frame), extra=self.network_trace_params)
self._session._outgoing_disposition(disposition_frame)

def send_disposition(self, delivery_id, delivery_state=None):
def send_disposition(
self,
first_delivery_id: int,
last_delivery_id: Optional[int] = None,
settled: Optional[bool] = None,
delivery_state: Optional[Union[Received, Accepted, Rejected, Released, Modified]] = None,
batchable: Optional[bool] = None
):
if self._is_closed:
raise ValueError("Link already closed.")
self._outgoing_disposition(delivery_id, delivery_state)
self._outgoing_disposition(
first_delivery_id,
last_delivery_id,
settled,
delivery_state,
batchable
)
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def _create_uamqp_connection(self):
self._connection = Connection(
hostname=self.fully_qualified_namespace,
sasl=auth,
debug=self._config.logging_enable,
nextwork_trace=self._config.logging_enable,
)

def close(self):
Expand Down
Loading