Skip to content

add setstate and getstate to message for pickling #222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 2, 2021
113 changes: 113 additions & 0 deletions tests/test_message.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from uamqp.message import MessageProperties, MessageHeader, Message, constants, errors, c_uamqp
import pickle
import pytest

from uamqp.message import MessageProperties, Message, SequenceBody, DataBody, ValueBody
Expand Down Expand Up @@ -33,6 +35,117 @@ def test_message_properties():
properties.user_id = 'werid/0\0\1\t\n'
assert properties.user_id == b'werid/0\0\1\t\n'

def send_complete_callback(result, error):
# helper for test below not in test, b/c results in:
# AttributeError: Can't pickle local object
print(result)
print(error)


def test_message_pickle():
properties = MessageProperties()
properties.message_id = '2'
properties.user_id = '1'
properties.to = 'dkfj'
properties.subject = 'dsljv'
properties.reply_to = "kdjfk"
properties.correlation_id = 'ienag'
properties.content_type = 'b'
properties.content_encoding = '39ru'
properties.absolute_expiry_time = 24
properties.creation_time = 10
properties.group_id = '3irow'
properties.group_sequence = 39
properties.reply_to_group_id = '39rud'

header = MessageHeader()
header.delivery_count = 3
header.time_to_live = 5
header.first_acquirer = 'dkfj'
header.durable = True
header.priority = 4

data_message = Message(body=[b'testmessage1', b'testmessage2'])
pickled = pickle.loads(pickle.dumps(data_message))
body = list(pickled.get_data())
assert len(body) == 2
assert body == [b'testmessage1', b'testmessage2']

sequence_message = Message(
body=[[1234.56, b'testmessage2', True], [-1234.56, {b'key': b'value'}, False]],
body_type=MessageBodyType.Sequence
)
pickled = pickle.loads(pickle.dumps(sequence_message))
body = list(pickled.get_data())
assert len(body) == 2
assert body == [[1234.56, b'testmessage2', True], [-1234.56, {b'key': b'value'}, False]]

value_message = Message(
body={b'key': [1, b'str', False]},
body_type=MessageBodyType.Value
)
pickled = pickle.loads(pickle.dumps(value_message))
body = pickled.get_data()
assert body == {b'key': [1, b'str', False]}

error = errors.MessageModified(False, False, {b'key': b'value'})
pickled_error = pickle.loads(pickle.dumps(error))
assert pickled_error._annotations == {b'key': b'value'} # pylint: disable=protected-access

message = Message(body="test", properties=properties, header=header)
message.on_send_complete = send_complete_callback
message.footer = {'a':2}
message.state = constants.MessageState.ReceivedSettled

pickled = pickle.loads(pickle.dumps(message))
assert list(message.get_data()) == [b"test"]
assert message.footer == pickled.footer
assert message.state == pickled.state
assert message.application_properties == pickled.application_properties
assert message.annotations == pickled.annotations
assert message.delivery_annotations == pickled.delivery_annotations
assert message.settled == pickled.settled
assert message.properties.message_id == pickled.properties.message_id
assert message.properties.user_id == pickled.properties.user_id
assert message.properties.to == pickled.properties.to
assert message.properties.subject == pickled.properties.subject
assert message.properties.reply_to == pickled.properties.reply_to
assert message.properties.correlation_id == pickled.properties.correlation_id
assert message.properties.content_type == pickled.properties.content_type
assert message.properties.content_encoding == pickled.properties.content_encoding
assert message.properties.absolute_expiry_time == pickled.properties.absolute_expiry_time
assert message.properties.creation_time == pickled.properties.creation_time
assert message.properties.group_id == pickled.properties.group_id
assert message.properties.group_sequence == pickled.properties.group_sequence
assert message.properties.reply_to_group_id == pickled.properties.reply_to_group_id
assert message.header.delivery_count == pickled.header.delivery_count
assert message.header.time_to_live == pickled.header.time_to_live
assert message.header.first_acquirer == pickled.header.first_acquirer
assert message.header.durable == pickled.header.durable
assert message.header.priority == pickled.header.priority

# send with message param
settler = errors.MessageAlreadySettled
internal_message = c_uamqp.create_message()
internal_message.add_body_data(b"hi")
message_w_message_param = Message(
message=internal_message,
settler=settler,
delivery_no=1
)
pickled = pickle.loads(pickle.dumps(message_w_message_param))
message_data = str(message_w_message_param.get_data())
pickled_data = str(pickled.get_data())

assert message_data == pickled_data
assert message_w_message_param.footer == pickled.footer
assert message_w_message_param.state == pickled.state
assert message_w_message_param.application_properties == pickled.application_properties
assert message_w_message_param.annotations == pickled.annotations
assert message_w_message_param.delivery_annotations == pickled.delivery_annotations
assert message_w_message_param.settled == pickled.settled
assert pickled.delivery_no == 1
assert type(pickled._settler()) == type(settler()) # pylint: disable=protected-access

def test_message_auto_body_type():
single_data = b'!@#$%^&*()_+1234567890'
Expand Down
7 changes: 7 additions & 0 deletions uamqp/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,10 @@ class MessageBodyType(Enum):
Data = c_uamqp.MessageBodyType.DataType
Value = c_uamqp.MessageBodyType.ValueType
Sequence = c_uamqp.MessageBodyType.SequenceType


BODY_TYPE_C_PYTHON_MAP = {
c_uamqp.MessageBodyType.DataType.value: MessageBodyType.Data,
c_uamqp.MessageBodyType.SequenceType.value: MessageBodyType.Sequence,
c_uamqp.MessageBodyType.ValueType.value: MessageBodyType.Value
}
13 changes: 13 additions & 0 deletions uamqp/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ def __init__(self):
response = "Invalid operation: this message is already settled."
super(MessageAlreadySettled, self).__init__(response)

def __reduce__(self):
return (self.__class__, ())


class MessageAccepted(MessageResponse):
pass
Expand All @@ -267,6 +270,8 @@ class MessageAccepted(MessageResponse):
class MessageRejected(MessageResponse):

def __init__(self, condition=None, description=None, encoding='UTF-8', info=None):
self._encoding = encoding
self._info = info
if condition:
self.error_condition = condition.encode(encoding) if isinstance(condition, six.text_type) else condition
else:
Expand All @@ -282,6 +287,9 @@ def __init__(self, condition=None, description=None, encoding='UTF-8', info=None
self.error_info = utils.data_factory(info, encoding=encoding) if info else None
super(MessageRejected, self).__init__()

def __reduce__(self):
return (self.__class__, (self.error_condition, self.error_description, self._encoding, self._info))


class MessageReleased(MessageResponse):
pass
Expand All @@ -292,11 +300,16 @@ class MessageModified(MessageResponse):
def __init__(self, failed, undeliverable, annotations=None, encoding='UTF-8'):
self.failed = failed
self.undeliverable = undeliverable
self._encoding = encoding
self._annotations = annotations
if annotations and not isinstance(annotations, dict):
raise TypeError("Disposition annotations must be a dictionary.")
self.annotations = utils.data_factory(annotations, encoding=encoding) if annotations else None
super(MessageModified, self).__init__()

def __reduce__(self):
return (self.__class__, (self.failed, self.undeliverable, self._annotations, self._encoding))


class ErrorResponse(object):

Expand Down
Loading