From 48c2cbb9a0b90ac6bd64dc39104ee0db11795aff Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Thu, 25 Mar 2021 21:26:48 -0400 Subject: [PATCH 1/9] add setstate and getstate to message --- tests/test_message.py | 55 ++++++- uamqp/message.py | 338 +++++++++++++++++++++++++++++------------- 2 files changed, 287 insertions(+), 106 deletions(-) diff --git a/tests/test_message.py b/tests/test_message.py index 946be41b7..714f5a9ce 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -1,4 +1,5 @@ -from uamqp.message import MessageProperties +from uamqp.message import MessageProperties, MessageHeader, Message +import pickle def test_message_proeprties(): @@ -29,3 +30,55 @@ def test_message_proeprties(): properties = MessageProperties() properties.user_id = 'werid/0\0\1\t\n' assert properties.user_id == b'werid/0\0\1\t\n' + +def test_message_pickle(): + properties = MessageProperties() + properties.message_id = '2' + properties.user_id = '1' + properties.to = 'dkfj' + properties.subject = 'dsljv' + properties.reply_to = "kdjfk" + properties.correlation_id = 'ienag' + properties.content_type = 'b' + properties.content_encoding = '39ru' + properties.absolute_expiry_time = 24 + properties.creation_time = 10 + properties.group_id = '3irow' + properties.group_sequence = 39 + properties.reply_to_group_id = '39rud' + + header = MessageHeader() + header.delivery_count = 3 + header.time_to_live = 5 + header.first_acquirer = 'dkfj' + header.durable = True + header.priority = 4 + + + message = Message(properties=properties, header=header) + message.footer = {'a':2} + pickled = pickle.loads(pickle.dumps(message)) + + assert message.footer == pickled.footer + assert message.application_properties == pickled.application_properties + assert message.annotations == pickled.annotations + assert message.delivery_annotations == pickled.delivery_annotations + assert message.settled == pickled.settled + assert message.properties.message_id == pickled.properties.message_id + assert message.properties.user_id == pickled.properties.user_id + assert message.properties.to == pickled.properties.to + assert message.properties.subject == pickled.properties.subject + assert message.properties.reply_to == pickled.properties.reply_to + assert message.properties.correlation_id == pickled.properties.correlation_id + assert message.properties.content_type == pickled.properties.content_type + assert message.properties.content_encoding == pickled.properties.content_encoding + assert message.properties.absolute_expiry_time == pickled.properties.absolute_expiry_time + assert message.properties.creation_time == pickled.properties.creation_time + assert message.properties.group_id == pickled.properties.group_id + assert message.properties.group_sequence == pickled.properties.group_sequence + assert message.properties.reply_to_group_id == pickled.properties.reply_to_group_id + assert message.header.delivery_count == pickled.header.delivery_count + assert message.header.time_to_live == pickled.header.time_to_live + assert message.header.first_acquirer == pickled.header.first_acquirer + assert message.header.durable == pickled.header.durable + assert message.header.priority == pickled.header.priority diff --git a/uamqp/message.py b/uamqp/message.py index 55a22a727..c6913bf62 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -1,8 +1,8 @@ -#------------------------------------------------------------------------- +# ------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for # license information. -#-------------------------------------------------------------------------- +# -------------------------------------------------------------------------- # pylint: disable=too-many-lines @@ -60,17 +60,19 @@ class Message(object): :type encoding: str """ - def __init__(self, - body=None, - properties=None, - application_properties=None, - annotations=None, - header=None, - msg_format=None, - message=None, - settler=None, - delivery_no=None, - encoding='UTF-8'): + def __init__( + self, + body=None, + properties=None, + application_properties=None, + annotations=None, + header=None, + msg_format=None, + message=None, + settler=None, + delivery_no=None, + encoding="UTF-8", + ): self.state = constants.MessageState.WaitingToBeSent self.idle_time = 0 self.retries = 0 @@ -102,7 +104,9 @@ def __init__(self, if isinstance(body, (six.text_type, six.binary_type)): self._body = DataBody(self._message) self._body.append(body) - elif isinstance(body, list) and all([isinstance(b, (six.text_type, six.binary_type)) for b in body]): + elif isinstance(body, list) and all( + [isinstance(b, (six.text_type, six.binary_type)) for b in body] + ): self._body = DataBody(self._message) for value in body: self._body.append(value) @@ -116,6 +120,41 @@ def __init__(self, self._annotations = annotations self._header = header + def __getstate__(self): + state = self.__dict__.copy() + if state["_body"].data: + body = list(state["_body"].data) + else: + body = "" + state["_body"] = body + + state["state"] = None + state["delivery_no"] = None + state["delivery_tag"] = None + state["_response"] = None + state["_settler"] = None + state["on_send_complete"] = None + state["_message"] = None + + return state + + def __setstate__(self, state): + self.__dict__.update(state) + body = state["_body"] + self._message = c_uamqp.create_message() + if isinstance(body, (six.text_type, six.binary_type)): + self._body = DataBody(self._message) + self._body.append(body) + elif isinstance(body, list) and all( + [isinstance(b, (six.text_type, six.binary_type)) for b in body] + ): + self._body = DataBody(self._message) + for value in body: + self._body.append(value) + else: + self._body = ValueBody(self._message) + self._body.set(body) + @property def properties(self): if self._need_further_parse: @@ -151,7 +190,8 @@ def footer(self, value): if value and not isinstance(value, dict): raise TypeError("Footer must be a dictionary") footer_props = c_uamqp.create_footer( - utils.data_factory(value, encoding=self._encoding)) + utils.data_factory(value, encoding=self._encoding) + ) self._message.footer = footer_props self._footer = value @@ -210,8 +250,12 @@ def _parse_message_properties(self): if self._need_further_parse: _props = self._message.properties if _props: - _logger.debug("Parsing received message properties %r.", self.delivery_no) - self._properties = MessageProperties(properties=_props, encoding=self._encoding) + _logger.debug( + "Parsing received message properties %r.", self.delivery_no + ) + self._properties = MessageProperties( + properties=_props, encoding=self._encoding + ) _header = self._message.header if _header: _logger.debug("Parsing received message header %r.", self.delivery_no) @@ -222,15 +266,23 @@ def _parse_message_properties(self): self._footer = _footer.map _app_props = self._message.application_properties if _app_props: - _logger.debug("Parsing received message application properties %r.", self.delivery_no) + _logger.debug( + "Parsing received message application properties %r.", + self.delivery_no, + ) self._application_properties = _app_props.map _ann = self._message.message_annotations if _ann: - _logger.debug("Parsing received message annotations %r.", self.delivery_no) + _logger.debug( + "Parsing received message annotations %r.", self.delivery_no + ) self._annotations = _ann.map _delivery_ann = self._message.delivery_annotations if _delivery_ann: - _logger.debug("Parsing received message delivery annotations %r.", self.delivery_no) + _logger.debug( + "Parsing received message delivery annotations %r.", + self.delivery_no, + ) self._delivery_annotations = _delivery_ann.map self._need_further_parse = False @@ -270,19 +322,23 @@ def _populate_message_attributes(self, c_message): if self.application_properties: if not isinstance(self.application_properties, dict): raise TypeError("Application properties must be a dictionary.") - amqp_props = utils.data_factory(self.application_properties, encoding=self._encoding) + amqp_props = utils.data_factory( + self.application_properties, encoding=self._encoding + ) c_message.application_properties = amqp_props if self.annotations: if not isinstance(self.annotations, dict): raise TypeError("Message annotations must be a dictionary.") ann_props = c_uamqp.create_message_annotations( - utils.data_factory(self.annotations, encoding=self._encoding)) + utils.data_factory(self.annotations, encoding=self._encoding) + ) c_message.message_annotations = ann_props if self.delivery_annotations: if not isinstance(self.delivery_annotations, dict): raise TypeError("Delivery annotations must be a dictionary.") delivery_ann_props = c_uamqp.create_delivery_annotations( - utils.data_factory(self.delivery_annotations, encoding=self._encoding)) + utils.data_factory(self.delivery_annotations, encoding=self._encoding) + ) c_message.delivery_annotations = delivery_ann_props if self.header: c_message.header = self.header.get_header_obj() @@ -290,10 +346,10 @@ def _populate_message_attributes(self, c_message): if not isinstance(self.footer, dict): raise TypeError("Footer must be a dictionary.") footer = c_uamqp.create_footer( - utils.data_factory(self.footer, encoding=self._encoding)) + utils.data_factory(self.footer, encoding=self._encoding) + ) c_message.footer = footer - @property def settled(self): """Whether the message transaction for this message has been completed. @@ -412,7 +468,8 @@ def reject(self, condition=None, description=None, info=None): condition=condition, description=description, info=info, - encoding=self._encoding) + encoding=self._encoding, + ) self._settler(self._response) self.state = constants.MessageState.ReceivedSettled return True @@ -456,10 +513,8 @@ def modify(self, failed, deliverable, annotations=None): """ if self._can_settle_message(): self._response = errors.MessageModified( - failed, - deliverable, - annotations=annotations, - encoding=self._encoding) + failed, deliverable, annotations=annotations, encoding=self._encoding + ) self._settler(self._response) self.state = constants.MessageState.ReceivedSettled return True @@ -518,14 +573,16 @@ class BatchMessage(Message): max_message_length = constants.MAX_MESSAGE_LENGTH_BYTES size_offset = 0 - def __init__(self, - data=None, - properties=None, - application_properties=None, - annotations=None, - header=None, - multi_messages=False, - encoding='UTF-8'): + def __init__( + self, + data=None, + properties=None, + application_properties=None, + annotations=None, + header=None, + multi_messages=False, + encoding="UTF-8", + ): # pylint: disable=super-init-not-called self._multi_messages = multi_messages self._body_gen = data @@ -543,12 +600,14 @@ def _create_batch_message(self): :rtype: ~uamqp.message.Message """ - return Message(body=[], - properties=self.properties, - annotations=self.annotations, - msg_format=self.batch_format, - header=self.header, - encoding=self._encoding) + return Message( + body=[], + properties=self.properties, + annotations=self.annotations, + msg_format=self.batch_format, + header=self.header, + encoding=self._encoding, + ) def _multi_message_generator(self): """Generate multiple ~uamqp.message.Message objects from a single data @@ -565,7 +624,9 @@ def _multi_message_generator(self): message_size = new_message.get_message_encoded_size() + self.size_offset body_size = 0 if unappended_message_bytes: - new_message._body.append(unappended_message_bytes) # pylint: disable=protected-access + new_message._body.append( + unappended_message_bytes + ) # pylint: disable=protected-access body_size += len(unappended_message_bytes) try: for data in self._body_gen: @@ -578,13 +639,18 @@ def _multi_message_generator(self): internal_uamqp_message = data try: # uamqp Message - if not internal_uamqp_message.application_properties and self.application_properties: - internal_uamqp_message.application_properties = self.application_properties + if ( + not internal_uamqp_message.application_properties + and self.application_properties + ): + internal_uamqp_message.application_properties = ( + self.application_properties + ) message_bytes = internal_uamqp_message.encode_message() except AttributeError: # raw data wrap_message = Message( body=internal_uamqp_message, - application_properties=self.application_properties + application_properties=self.application_properties, ) message_bytes = wrap_message.encode_message() body_size += len(message_bytes) @@ -593,7 +659,9 @@ def _multi_message_generator(self): unappended_message_bytes = message_bytes yield new_message raise StopIteration() - new_message._body.append(message_bytes) # pylint: disable=protected-access + new_message._body.append( + message_bytes + ) # pylint: disable=protected-access except StopIteration: _logger.debug("Sent partial message.") continue @@ -627,11 +695,19 @@ def gather(self): internal_uamqp_message = data try: # uamqp Message - if not internal_uamqp_message.application_properties and self.application_properties: - internal_uamqp_message.application_properties = self.application_properties + if ( + not internal_uamqp_message.application_properties + and self.application_properties + ): + internal_uamqp_message.application_properties = ( + self.application_properties + ) message_bytes = internal_uamqp_message.encode_message() except AttributeError: # raw data - wrap_message = Message(body=internal_uamqp_message, application_properties=self.application_properties) + wrap_message = Message( + body=internal_uamqp_message, + application_properties=self.application_properties, + ) message_bytes = wrap_message.encode_message() body_size += len(message_bytes) if (body_size + message_size) > self.max_message_length: @@ -680,22 +756,24 @@ class MessageProperties(object): :vartype reply_to_group_id: """ - def __init__(self, - message_id=None, - user_id=None, - to=None, - subject=None, - reply_to=None, - correlation_id=None, - content_type=None, - content_encoding=None, - absolute_expiry_time=None, - creation_time=None, - group_id=None, - group_sequence=None, - reply_to_group_id=None, - properties=None, - encoding='UTF-8'): + def __init__( + self, + message_id=None, + user_id=None, + to=None, + subject=None, + reply_to=None, + correlation_id=None, + content_type=None, + content_encoding=None, + absolute_expiry_time=None, + creation_time=None, + group_id=None, + group_sequence=None, + reply_to_group_id=None, + properties=None, + encoding="UTF-8", + ): self._encoding = encoding if properties: self._message_id = properties.message_id @@ -727,21 +805,33 @@ def __init__(self, self.reply_to_group_id = reply_to_group_id def __str__(self): - return str({ - 'message_id': self.message_id, - 'user_id': self.user_id, - 'to': self.to, - 'subject': self.subject, - 'reply_to': self.reply_to, - 'correlation_id': self.correlation_id, - 'content_type': self.content_type, - 'content_encoding': self.content_encoding, - 'absolute_expiry_time': self.absolute_expiry_time, - 'creation_time': self.creation_time, - 'group_id': self.group_id, - 'group_sequence': self.group_sequence, - 'reply_to_group_id': self.reply_to_group_id - }) + return str( + { + "message_id": self.message_id, + "user_id": self.user_id, + "to": self.to, + "subject": self.subject, + "reply_to": self.reply_to, + "correlation_id": self.correlation_id, + "content_type": self.content_type, + "content_encoding": self.content_encoding, + "absolute_expiry_time": self.absolute_expiry_time, + "creation_time": self.creation_time, + "group_id": self.group_id, + "group_sequence": self.group_sequence, + "reply_to_group_id": self.reply_to_group_id, + } + ) + + def __getstate__(self): + state = self._get_properties_dict() + state["_encoding"] = self._encoding + return state + + def __setstate__(self, state): + self._encoding = state.pop("_encoding") + for key, val in state.items(): + self.__setattr__(key, val) @property def message_id(self): @@ -774,7 +864,9 @@ def user_id(self, value): # user_id is type of binary according to the spec. # convert byte string into bytearray then wrap the data into c_uamqp.BinaryValue. if value is not None: - self._user_id = utils.data_factory(bytearray(value), encoding=self._encoding) + self._user_id = utils.data_factory( + bytearray(value), encoding=self._encoding + ) else: self._user_id = None @@ -912,25 +1004,42 @@ def _set_attr(self, attr, properties): if attr_value is not None: setattr(properties, attr, attr_value) + def _get_properties_dict(self): + return { + "message_id": self.message_id, + "user_id": self.user_id, + "to": self.to, + "subject": self.subject, + "reply_to": self.reply_to, + "correlation_id": self.correlation_id, + "content_type": self.content_type, + "content_encoding": self.content_encoding, + "absolute_expiry_time": self.absolute_expiry_time, + "creation_time": self.creation_time, + "group_id": self.group_id, + "group_sequence": self.group_sequence, + "reply_to_group_id": self.reply_to_group_id, + } + def get_properties_obj(self): """Get the underlying C reference from this object. :rtype: uamqp.c_uamqp.cProperties """ properties = c_uamqp.cProperties() - self._set_attr('message_id', properties) - self._set_attr('user_id', properties) - self._set_attr('to', properties) - self._set_attr('subject', properties) - self._set_attr('reply_to', properties) - self._set_attr('correlation_id', properties) - self._set_attr('content_type', properties) - self._set_attr('content_encoding', properties) - self._set_attr('absolute_expiry_time', properties) - self._set_attr('creation_time', properties) - self._set_attr('group_id', properties) - self._set_attr('group_sequence', properties) - self._set_attr('reply_to_group_id', properties) + self._set_attr("message_id", properties) + self._set_attr("user_id", properties) + self._set_attr("to", properties) + self._set_attr("subject", properties) + self._set_attr("reply_to", properties) + self._set_attr("correlation_id", properties) + self._set_attr("content_type", properties) + self._set_attr("content_encoding", properties) + self._set_attr("absolute_expiry_time", properties) + self._set_attr("creation_time", properties) + self._set_attr("group_id", properties) + self._set_attr("group_sequence", properties) + self._set_attr("reply_to_group_id", properties) return properties @@ -939,7 +1048,7 @@ class MessageBody(object): not be used directly. """ - def __init__(self, c_message, encoding='UTF-8'): + def __init__(self, c_message, encoding="UTF-8"): self._message = c_message self._encoding = encoding @@ -1106,13 +1215,32 @@ def __init__(self, header=None): self.priority = header.priority def __str__(self): - return str({ - 'delivery_count': self.delivery_count, - 'time_to_live': self.time_to_live, - 'first_acquirer': self.first_acquirer, - 'durable': self.durable, - 'priority': self.priority - }) + return str( + { + "delivery_count": self.delivery_count, + "time_to_live": self.time_to_live, + "first_acquirer": self.first_acquirer, + "durable": self.durable, + "priority": self.priority, + } + ) + + def __getstate__(self): + state = self._get_properties_dict() + return state + + def __setstate__(self, state): + for key, val in state.items(): + self.__setattr__(key, val) + + def _get_properties_dict(self): + return { + "delivery_count": self.delivery_count, + "time_to_live": self.time_to_live, + "first_acquirer": self.first_acquirer, + "durable": self.durable, + "priority": self.priority, + } def get_header_obj(self): """Get the underlying C reference from this object. From 335211f2b79c9dcfa262011251491dfd962d3199 Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Fri, 26 Mar 2021 13:40:23 -0400 Subject: [PATCH 2/9] save _response for settled property --- uamqp/message.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/uamqp/message.py b/uamqp/message.py index c6913bf62..b07ef1707 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -122,16 +122,13 @@ def __init__( def __getstate__(self): state = self.__dict__.copy() - if state["_body"].data: - body = list(state["_body"].data) - else: - body = "" - state["_body"] = body + state["_body"] = list(state["_body"].data) if state["_body"].data else "" state["state"] = None state["delivery_no"] = None state["delivery_tag"] = None - state["_response"] = None + # need _response for settled property + state["_response"] = self.settled state["_settler"] = None state["on_send_complete"] = None state["_message"] = None @@ -155,6 +152,10 @@ def __setstate__(self, state): self._body = ValueBody(self._message) self._body.set(body) + # update response with arbitrary error, for settled property + if self._response: + self._response = errors.MessageAccepted() + @property def properties(self): if self._need_further_parse: From 41c3b3105c442a2d4e298ea5ed5fdc0ea16eff28 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Tue, 30 Mar 2021 09:38:29 -0700 Subject: [PATCH 3/9] add different body type support --- tests/test_message.py | 29 ++++++++++++++++++++++++++--- uamqp/constants.py | 7 +++++++ uamqp/message.py | 28 ++++++++++++++-------------- 3 files changed, 47 insertions(+), 17 deletions(-) diff --git a/tests/test_message.py b/tests/test_message.py index 99267b538..d9b4f919d 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -35,6 +35,7 @@ def test_message_properties(): properties.user_id = 'werid/0\0\1\t\n' assert properties.user_id == b'werid/0\0\1\t\n' + def test_message_pickle(): properties = MessageProperties() properties.message_id = '2' @@ -58,11 +59,33 @@ def test_message_pickle(): header.durable = True header.priority = 4 - - message = Message(properties=properties, header=header) + data_message = Message(body=[b'testmessage1', b'testmessage2']) + pickled = pickle.loads(pickle.dumps(data_message)) + body = list(pickled.get_data()) + assert len(body) == 2 + assert body == [b'testmessage1', b'testmessage2'] + + sequence_message = Message( + body=[[1234.56, b'testmessage2', True], [-1234.56, {b'key': b'value'}, False]], + body_type=MessageBodyType.Sequence + ) + pickled = pickle.loads(pickle.dumps(sequence_message)) + body = list(pickled.get_data()) + assert len(body) == 2 + assert body == [[1234.56, b'testmessage2', True], [-1234.56, {b'key': b'value'}, False]] + + value_message = Message( + body={b'key': [1, b'str', False]}, + body_type=MessageBodyType.Value + ) + pickled = pickle.loads(pickle.dumps(value_message)) + body = pickled.get_data() + assert body == {b'key': [1, b'str', False]} + + message = Message(body="test", properties=properties, header=header) message.footer = {'a':2} pickled = pickle.loads(pickle.dumps(message)) - + assert list(message.get_data()) == [b"test"] assert message.footer == pickled.footer assert message.application_properties == pickled.application_properties assert message.annotations == pickled.annotations diff --git a/uamqp/constants.py b/uamqp/constants.py index ce7b6af8d..3c7b24171 100644 --- a/uamqp/constants.py +++ b/uamqp/constants.py @@ -188,3 +188,10 @@ class MessageBodyType(Enum): Data = c_uamqp.MessageBodyType.DataType Value = c_uamqp.MessageBodyType.ValueType Sequence = c_uamqp.MessageBodyType.SequenceType + + +BODY_TYPE_C_PYTHON_MAP = { + c_uamqp.MessageBodyType.DataType.value: MessageBodyType.Data, + c_uamqp.MessageBodyType.SequenceType.value: MessageBodyType.Sequence, + c_uamqp.MessageBodyType.ValueType.value: MessageBodyType.Value +} diff --git a/uamqp/message.py b/uamqp/message.py index c4abbb121..57ec66a8c 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -138,7 +138,12 @@ def __init__( def __getstate__(self): state = self.__dict__.copy() - state["_body"] = list(state["_body"].data) if state["_body"].data else "" + + state["_body_type"] = self._body.type.value if self._body else None + if isinstance(self._body, (DataBody, SequenceBody)): + state["_body"] = list(self._body.data) + elif isinstance(self._body, ValueBody): + state["_body"] = self._body.data state["state"] = None state["delivery_no"] = None @@ -153,20 +158,15 @@ def __getstate__(self): def __setstate__(self, state): self.__dict__.update(state) - body = state["_body"] + body = state.get("_body") + body_type = constants.BODY_TYPE_C_PYTHON_MAP.get(state.get("_body_type")) self._message = c_uamqp.create_message() - if isinstance(body, (six.text_type, six.binary_type)): - self._body = DataBody(self._message) - self._body.append(body) - elif isinstance(body, list) and all( - [isinstance(b, (six.text_type, six.binary_type)) for b in body] - ): - self._body = DataBody(self._message) - for value in body: - self._body.append(value) - else: - self._body = ValueBody(self._message) - self._body.set(body) + self._body = None + if body: + if not body_type: + self._auto_set_body(body) + else: + self._set_body_by_body_type(body, body_type) # update response with arbitrary error, for settled property if self._response: From 7b9a66e4612850ee57309d1f689ac9db03050513 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Tue, 30 Mar 2021 10:27:51 -0700 Subject: [PATCH 4/9] fix pylint --- uamqp/message.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/uamqp/message.py b/uamqp/message.py index 57ec66a8c..114630646 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -157,11 +157,11 @@ def __getstate__(self): return state def __setstate__(self, state): + state["_body"] = None self.__dict__.update(state) body = state.get("_body") body_type = constants.BODY_TYPE_C_PYTHON_MAP.get(state.get("_body_type")) self._message = c_uamqp.create_message() - self._body = None if body: if not body_type: self._auto_set_body(body) @@ -687,9 +687,9 @@ def _multi_message_generator(self): message_size = new_message.get_message_encoded_size() + self.size_offset body_size = 0 if unappended_message_bytes: - new_message._body.append( + new_message._body.append( # pylint: disable=protected-access unappended_message_bytes - ) # pylint: disable=protected-access + ) body_size += len(unappended_message_bytes) try: for data in self._body_gen: @@ -722,9 +722,9 @@ def _multi_message_generator(self): unappended_message_bytes = message_bytes yield new_message raise StopIteration() - new_message._body.append( + new_message._body.append( # pylint: disable=protected-access message_bytes - ) # pylint: disable=protected-access + ) except StopIteration: _logger.debug("Sent partial message.") continue From e6d63e37fb047ea758e4a5dbaf82fca3abf6de90 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Tue, 30 Mar 2021 11:21:54 -0700 Subject: [PATCH 5/9] fix body and travis pylint after black --- uamqp/message.py | 85 ++++++++++++++++++++++++------------------------ 1 file changed, 42 insertions(+), 43 deletions(-) diff --git a/uamqp/message.py b/uamqp/message.py index 114630646..ac52acacf 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -78,20 +78,20 @@ class Message(object): def __init__( - self, - body=None, - properties=None, - application_properties=None, - annotations=None, - header=None, - msg_format=None, - message=None, - settler=None, - delivery_no=None, - encoding='UTF-8', - body_type=None, - footer=None, - delivery_annotations=None + self, + body=None, + properties=None, + application_properties=None, + annotations=None, + header=None, + msg_format=None, + message=None, + settler=None, + delivery_no=None, + encoding='UTF-8', + body_type=None, + footer=None, + delivery_annotations=None ): self.state = constants.MessageState.WaitingToBeSent self.idle_time = 0 @@ -157,7 +157,6 @@ def __getstate__(self): return state def __setstate__(self, state): - state["_body"] = None self.__dict__.update(state) body = state.get("_body") body_type = constants.BODY_TYPE_C_PYTHON_MAP.get(state.get("_body_type")) @@ -637,14 +636,14 @@ class BatchMessage(Message): size_offset = 0 def __init__( - self, - data=None, - properties=None, - application_properties=None, - annotations=None, - header=None, - multi_messages=False, - encoding="UTF-8", + self, + data=None, + properties=None, + application_properties=None, + annotations=None, + header=None, + multi_messages=False, + encoding="UTF-8", ): # pylint: disable=super-init-not-called self._multi_messages = multi_messages @@ -703,8 +702,8 @@ def _multi_message_generator(self): try: # uamqp Message if ( - not internal_uamqp_message.application_properties - and self.application_properties + not internal_uamqp_message.application_properties + and self.application_properties ): internal_uamqp_message.application_properties = ( self.application_properties @@ -759,8 +758,8 @@ def gather(self): try: # uamqp Message if ( - not internal_uamqp_message.application_properties - and self.application_properties + not internal_uamqp_message.application_properties + and self.application_properties ): internal_uamqp_message.application_properties = ( self.application_properties @@ -820,22 +819,22 @@ class MessageProperties(object): """ def __init__( - self, - message_id=None, - user_id=None, - to=None, - subject=None, - reply_to=None, - correlation_id=None, - content_type=None, - content_encoding=None, - absolute_expiry_time=None, - creation_time=None, - group_id=None, - group_sequence=None, - reply_to_group_id=None, - properties=None, - encoding="UTF-8", + self, + message_id=None, + user_id=None, + to=None, + subject=None, + reply_to=None, + correlation_id=None, + content_type=None, + content_encoding=None, + absolute_expiry_time=None, + creation_time=None, + group_id=None, + group_sequence=None, + reply_to_group_id=None, + properties=None, + encoding="UTF-8", ): self._encoding = encoding if properties: From f973e16fb855e8199de15bbf6430f7ef831a2f7f Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Tue, 30 Mar 2021 15:40:58 -0400 Subject: [PATCH 6/9] hide private variables --- tests/test_message.py | 4 +++- uamqp/message.py | 42 +++++++++++++++++++++++++++--------------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/tests/test_message.py b/tests/test_message.py index d9b4f919d..5cd4fa467 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -1,4 +1,4 @@ -from uamqp.message import MessageProperties, MessageHeader, Message +from uamqp.message import MessageProperties, MessageHeader, Message, errors import pickle import pytest @@ -84,6 +84,8 @@ def test_message_pickle(): message = Message(body="test", properties=properties, header=header) message.footer = {'a':2} + message._response = errors.MessageAccepted() + pickled = pickle.loads(pickle.dumps(message)) assert list(message.get_data()) == [b"test"] assert message.footer == pickled.footer diff --git a/uamqp/message.py b/uamqp/message.py index ac52acacf..ce0390ccf 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -137,27 +137,43 @@ def __init__( self._footer = footer def __getstate__(self): - state = self.__dict__.copy() - + state = {} + state["delivery_no"] = self.delivery_no + state["delivery_tag"] = self.delivery_tag + state["properties"] = self.properties + state["application_properties"] = self.application_properties + state["annotations"] = self.annotations + state["header"] = self.header + state["footer"] = self.footer + state["delivery_annotations"] = self._delivery_annotations + state["_response"] = str(type(self._response).__name__) if self._response else None state["_body_type"] = self._body.type.value if self._body else None if isinstance(self._body, (DataBody, SequenceBody)): state["_body"] = list(self._body.data) elif isinstance(self._body, ValueBody): state["_body"] = self._body.data - state["state"] = None - state["delivery_no"] = None - state["delivery_tag"] = None - # need _response for settled property - state["_response"] = self.settled - state["_settler"] = None - state["on_send_complete"] = None - state["_message"] = None - return state def __setstate__(self, state): + state["state"] = constants.MessageState.WaitingToBeSent + state["idle_time"] = 0 + state["retries"] = 0 + state["_settler"] = None + state["_encoding"] = "UTF-8" + state["on_send_complete"] = None + state["_need_further_parse"] = False + state["_properties"] = state.pop("properties") + state["_application_properties"] = state.pop("application_properties") + state["_annotations"] = state.pop("annotations") + state["_header"] = state.pop("header") + state["_footer"] = state.pop("footer") + state["_delivery_annotations"] = state.pop("delivery_annotations") self.__dict__.update(state) + + if self._response: + self._response = getattr(errors, self._response)() + body = state.get("_body") body_type = constants.BODY_TYPE_C_PYTHON_MAP.get(state.get("_body_type")) self._message = c_uamqp.create_message() @@ -167,10 +183,6 @@ def __setstate__(self, state): else: self._set_body_by_body_type(body, body_type) - # update response with arbitrary error, for settled property - if self._response: - self._response = errors.MessageAccepted() - @property def properties(self): if self._need_further_parse: From 6de2d06bcf95a867ed4ea60fcd3cd3195b213e3e Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Wed, 31 Mar 2021 10:59:36 -0400 Subject: [PATCH 7/9] remove _response/settled testing --- tests/test_message.py | 8 +++++--- uamqp/message.py | 8 +++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_message.py b/tests/test_message.py index 5cd4fa467..c32e7d527 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -1,4 +1,4 @@ -from uamqp.message import MessageProperties, MessageHeader, Message, errors +from uamqp.message import MessageProperties, MessageHeader, Message, constants import pickle import pytest @@ -84,15 +84,17 @@ def test_message_pickle(): message = Message(body="test", properties=properties, header=header) message.footer = {'a':2} - message._response = errors.MessageAccepted() + message.state = constants.MessageState.ReceivedSettled pickled = pickle.loads(pickle.dumps(message)) assert list(message.get_data()) == [b"test"] assert message.footer == pickled.footer + assert message.state == pickled.state assert message.application_properties == pickled.application_properties assert message.annotations == pickled.annotations assert message.delivery_annotations == pickled.delivery_annotations - assert message.settled == pickled.settled + # settled will not be tested, undecided about serializing _response + # assert message.settled == pickled.settled assert message.properties.message_id == pickled.properties.message_id assert message.properties.user_id == pickled.properties.user_id assert message.properties.to == pickled.properties.to diff --git a/uamqp/message.py b/uamqp/message.py index ce0390ccf..b09d39915 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -146,7 +146,7 @@ def __getstate__(self): state["header"] = self.header state["footer"] = self.footer state["delivery_annotations"] = self._delivery_annotations - state["_response"] = str(type(self._response).__name__) if self._response else None + state["state"] = self.state.value state["_body_type"] = self._body.type.value if self._body else None if isinstance(self._body, (DataBody, SequenceBody)): state["_body"] = list(self._body.data) @@ -156,9 +156,10 @@ def __getstate__(self): return state def __setstate__(self, state): - state["state"] = constants.MessageState.WaitingToBeSent + state["state"] = constants.MessageState(state.get("state")) state["idle_time"] = 0 state["retries"] = 0 + state["_response"] = None state["_settler"] = None state["_encoding"] = "UTF-8" state["on_send_complete"] = None @@ -171,9 +172,6 @@ def __setstate__(self, state): state["_delivery_annotations"] = state.pop("delivery_annotations") self.__dict__.update(state) - if self._response: - self._response = getattr(errors, self._response)() - body = state.get("_body") body_type = constants.BODY_TYPE_C_PYTHON_MAP.get(state.get("_body_type")) self._message = c_uamqp.create_message() From 44966752985302868a6cff2d9e1b9bdd3da910bd Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Wed, 31 Mar 2021 17:32:48 -0400 Subject: [PATCH 8/9] add private ivars + pickle errors --- tests/test_message.py | 39 ++++++++++++++++++++++++++++++++++++--- uamqp/errors.py | 13 +++++++++++++ uamqp/message.py | 24 ++---------------------- 3 files changed, 51 insertions(+), 25 deletions(-) diff --git a/tests/test_message.py b/tests/test_message.py index c32e7d527..220b0cc14 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -1,4 +1,4 @@ -from uamqp.message import MessageProperties, MessageHeader, Message, constants +from uamqp.message import MessageProperties, MessageHeader, Message, constants, errors, c_uamqp import pickle import pytest @@ -35,6 +35,12 @@ def test_message_properties(): properties.user_id = 'werid/0\0\1\t\n' assert properties.user_id == b'werid/0\0\1\t\n' +def send_complete_callback(result, error): + # helper for test below not in test, b/c results in: + # AttributeError: Can't pickle local object + print(result) + print(error) + def test_message_pickle(): properties = MessageProperties() @@ -82,7 +88,12 @@ def test_message_pickle(): body = pickled.get_data() assert body == {b'key': [1, b'str', False]} + error = errors.MessageModified(False, False, {b'key': b'value'}) + pickled_error = pickle.loads(pickle.dumps(error)) + assert pickled_error._annotations == {b'key': b'value'} # pylint: disable=protected-access + message = Message(body="test", properties=properties, header=header) + message.on_send_complete = send_complete_callback message.footer = {'a':2} message.state = constants.MessageState.ReceivedSettled @@ -93,8 +104,7 @@ def test_message_pickle(): assert message.application_properties == pickled.application_properties assert message.annotations == pickled.annotations assert message.delivery_annotations == pickled.delivery_annotations - # settled will not be tested, undecided about serializing _response - # assert message.settled == pickled.settled + assert message.settled == pickled.settled assert message.properties.message_id == pickled.properties.message_id assert message.properties.user_id == pickled.properties.user_id assert message.properties.to == pickled.properties.to @@ -114,6 +124,29 @@ def test_message_pickle(): assert message.header.durable == pickled.header.durable assert message.header.priority == pickled.header.priority + # send with message param + settler = errors.MessageAlreadySettled + internal_message = c_uamqp.create_message() + internal_message.add_body_data(b"hi") + message_w_message_param = Message( + message=internal_message, + settler=settler, + delivery_no=1 + ) + pickled = pickle.loads(pickle.dumps(message_w_message_param)) + message_data = str(message_w_message_param.get_data()) + pickled_data = str(pickled.get_data()) + + assert message_data == pickled_data + assert message_w_message_param.footer == pickled.footer + assert message_w_message_param.state == pickled.state + assert message_w_message_param.application_properties == pickled.application_properties + assert message_w_message_param.annotations == pickled.annotations + assert message_w_message_param.delivery_annotations == pickled.delivery_annotations + assert message_w_message_param.settled == pickled.settled + assert pickled.delivery_no == 1 + assert type(pickled._settler()) == type(settler()) # pylint: disable=protected-access + def test_message_auto_body_type(): single_data = b'!@#$%^&*()_+1234567890' single_data_message = Message(body=single_data) diff --git a/uamqp/errors.py b/uamqp/errors.py index daf87092b..0415236f5 100644 --- a/uamqp/errors.py +++ b/uamqp/errors.py @@ -259,6 +259,9 @@ def __init__(self): response = "Invalid operation: this message is already settled." super(MessageAlreadySettled, self).__init__(response) + def __reduce__(self): + return (self.__class__, ()) + class MessageAccepted(MessageResponse): pass @@ -267,6 +270,8 @@ class MessageAccepted(MessageResponse): class MessageRejected(MessageResponse): def __init__(self, condition=None, description=None, encoding='UTF-8', info=None): + self._encoding = encoding + self._info = info if condition: self.error_condition = condition.encode(encoding) if isinstance(condition, six.text_type) else condition else: @@ -282,6 +287,9 @@ def __init__(self, condition=None, description=None, encoding='UTF-8', info=None self.error_info = utils.data_factory(info, encoding=encoding) if info else None super(MessageRejected, self).__init__() + def __reduce__(self): + return (self.__class__, (self.error_condition, self.error_description, self._encoding, self._info)) + class MessageReleased(MessageResponse): pass @@ -292,11 +300,16 @@ class MessageModified(MessageResponse): def __init__(self, failed, undeliverable, annotations=None, encoding='UTF-8'): self.failed = failed self.undeliverable = undeliverable + self._encoding = encoding + self._annotations = annotations if annotations and not isinstance(annotations, dict): raise TypeError("Disposition annotations must be a dictionary.") self.annotations = utils.data_factory(annotations, encoding=encoding) if annotations else None super(MessageModified, self).__init__() + def __reduce__(self): + return (self.__class__, (self.failed, self.undeliverable, self._annotations, self._encoding)) + class ErrorResponse(object): diff --git a/uamqp/message.py b/uamqp/message.py index b09d39915..ccb39848c 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -137,16 +137,9 @@ def __init__( self._footer = footer def __getstate__(self): - state = {} - state["delivery_no"] = self.delivery_no - state["delivery_tag"] = self.delivery_tag - state["properties"] = self.properties - state["application_properties"] = self.application_properties - state["annotations"] = self.annotations - state["header"] = self.header - state["footer"] = self.footer - state["delivery_annotations"] = self._delivery_annotations + state = self.__dict__.copy() state["state"] = self.state.value + state["_message"] = None state["_body_type"] = self._body.type.value if self._body else None if isinstance(self._body, (DataBody, SequenceBody)): state["_body"] = list(self._body.data) @@ -157,19 +150,6 @@ def __getstate__(self): def __setstate__(self, state): state["state"] = constants.MessageState(state.get("state")) - state["idle_time"] = 0 - state["retries"] = 0 - state["_response"] = None - state["_settler"] = None - state["_encoding"] = "UTF-8" - state["on_send_complete"] = None - state["_need_further_parse"] = False - state["_properties"] = state.pop("properties") - state["_application_properties"] = state.pop("application_properties") - state["_annotations"] = state.pop("annotations") - state["_header"] = state.pop("header") - state["_footer"] = state.pop("footer") - state["_delivery_annotations"] = state.pop("delivery_annotations") self.__dict__.update(state) body = state.get("_body") From b82733f566fb4480c55975b0eb9d7952dd43a47f Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Thu, 1 Apr 2021 11:14:10 -0400 Subject: [PATCH 9/9] remove header ivars --- uamqp/message.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/uamqp/message.py b/uamqp/message.py index ccb39848c..1906ff93c 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -1344,23 +1344,6 @@ def __str__(self): } ) - def __getstate__(self): - state = self._get_properties_dict() - return state - - def __setstate__(self, state): - for key, val in state.items(): - self.__setattr__(key, val) - - def _get_properties_dict(self): - return { - "delivery_count": self.delivery_count, - "time_to_live": self.time_to_live, - "first_acquirer": self.first_acquirer, - "durable": self.durable, - "priority": self.priority, - } - def get_header_obj(self): """Get the underlying C reference from this object.