Skip to content

Commit 7bd628a

Browse files
add setstate and getstate to message for pickling (#222)
* add setstate and getstate to message * save _response for settled property * add different body type support * fix pylint * fix body and travis pylint after black * hide private variables * remove _response/settled testing * add private ivars + pickle errors * remove header ivars Co-authored-by: Adam Ling (MSFT) <[email protected]>
1 parent daef293 commit 7bd628a

File tree

4 files changed

+340
-106
lines changed

4 files changed

+340
-106
lines changed

tests/test_message.py

+113
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from uamqp.message import MessageProperties, MessageHeader, Message, constants, errors, c_uamqp
2+
import pickle
13
import pytest
24

35
from uamqp.message import MessageProperties, Message, SequenceBody, DataBody, ValueBody
@@ -33,6 +35,117 @@ def test_message_properties():
3335
properties.user_id = 'werid/0\0\1\t\n'
3436
assert properties.user_id == b'werid/0\0\1\t\n'
3537

38+
def send_complete_callback(result, error):
39+
# helper for test below not in test, b/c results in:
40+
# AttributeError: Can't pickle local object
41+
print(result)
42+
print(error)
43+
44+
45+
def test_message_pickle():
46+
properties = MessageProperties()
47+
properties.message_id = '2'
48+
properties.user_id = '1'
49+
properties.to = 'dkfj'
50+
properties.subject = 'dsljv'
51+
properties.reply_to = "kdjfk"
52+
properties.correlation_id = 'ienag'
53+
properties.content_type = 'b'
54+
properties.content_encoding = '39ru'
55+
properties.absolute_expiry_time = 24
56+
properties.creation_time = 10
57+
properties.group_id = '3irow'
58+
properties.group_sequence = 39
59+
properties.reply_to_group_id = '39rud'
60+
61+
header = MessageHeader()
62+
header.delivery_count = 3
63+
header.time_to_live = 5
64+
header.first_acquirer = 'dkfj'
65+
header.durable = True
66+
header.priority = 4
67+
68+
data_message = Message(body=[b'testmessage1', b'testmessage2'])
69+
pickled = pickle.loads(pickle.dumps(data_message))
70+
body = list(pickled.get_data())
71+
assert len(body) == 2
72+
assert body == [b'testmessage1', b'testmessage2']
73+
74+
sequence_message = Message(
75+
body=[[1234.56, b'testmessage2', True], [-1234.56, {b'key': b'value'}, False]],
76+
body_type=MessageBodyType.Sequence
77+
)
78+
pickled = pickle.loads(pickle.dumps(sequence_message))
79+
body = list(pickled.get_data())
80+
assert len(body) == 2
81+
assert body == [[1234.56, b'testmessage2', True], [-1234.56, {b'key': b'value'}, False]]
82+
83+
value_message = Message(
84+
body={b'key': [1, b'str', False]},
85+
body_type=MessageBodyType.Value
86+
)
87+
pickled = pickle.loads(pickle.dumps(value_message))
88+
body = pickled.get_data()
89+
assert body == {b'key': [1, b'str', False]}
90+
91+
error = errors.MessageModified(False, False, {b'key': b'value'})
92+
pickled_error = pickle.loads(pickle.dumps(error))
93+
assert pickled_error._annotations == {b'key': b'value'} # pylint: disable=protected-access
94+
95+
message = Message(body="test", properties=properties, header=header)
96+
message.on_send_complete = send_complete_callback
97+
message.footer = {'a':2}
98+
message.state = constants.MessageState.ReceivedSettled
99+
100+
pickled = pickle.loads(pickle.dumps(message))
101+
assert list(message.get_data()) == [b"test"]
102+
assert message.footer == pickled.footer
103+
assert message.state == pickled.state
104+
assert message.application_properties == pickled.application_properties
105+
assert message.annotations == pickled.annotations
106+
assert message.delivery_annotations == pickled.delivery_annotations
107+
assert message.settled == pickled.settled
108+
assert message.properties.message_id == pickled.properties.message_id
109+
assert message.properties.user_id == pickled.properties.user_id
110+
assert message.properties.to == pickled.properties.to
111+
assert message.properties.subject == pickled.properties.subject
112+
assert message.properties.reply_to == pickled.properties.reply_to
113+
assert message.properties.correlation_id == pickled.properties.correlation_id
114+
assert message.properties.content_type == pickled.properties.content_type
115+
assert message.properties.content_encoding == pickled.properties.content_encoding
116+
assert message.properties.absolute_expiry_time == pickled.properties.absolute_expiry_time
117+
assert message.properties.creation_time == pickled.properties.creation_time
118+
assert message.properties.group_id == pickled.properties.group_id
119+
assert message.properties.group_sequence == pickled.properties.group_sequence
120+
assert message.properties.reply_to_group_id == pickled.properties.reply_to_group_id
121+
assert message.header.delivery_count == pickled.header.delivery_count
122+
assert message.header.time_to_live == pickled.header.time_to_live
123+
assert message.header.first_acquirer == pickled.header.first_acquirer
124+
assert message.header.durable == pickled.header.durable
125+
assert message.header.priority == pickled.header.priority
126+
127+
# send with message param
128+
settler = errors.MessageAlreadySettled
129+
internal_message = c_uamqp.create_message()
130+
internal_message.add_body_data(b"hi")
131+
message_w_message_param = Message(
132+
message=internal_message,
133+
settler=settler,
134+
delivery_no=1
135+
)
136+
pickled = pickle.loads(pickle.dumps(message_w_message_param))
137+
message_data = str(message_w_message_param.get_data())
138+
pickled_data = str(pickled.get_data())
139+
140+
assert message_data == pickled_data
141+
assert message_w_message_param.footer == pickled.footer
142+
assert message_w_message_param.state == pickled.state
143+
assert message_w_message_param.application_properties == pickled.application_properties
144+
assert message_w_message_param.annotations == pickled.annotations
145+
assert message_w_message_param.delivery_annotations == pickled.delivery_annotations
146+
assert message_w_message_param.settled == pickled.settled
147+
assert pickled.delivery_no == 1
148+
assert type(pickled._settler()) == type(settler()) # pylint: disable=protected-access
36149

37150
def test_message_auto_body_type():
38151
single_data = b'!@#$%^&*()_+1234567890'

uamqp/constants.py

+7
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,10 @@ class MessageBodyType(Enum):
188188
Data = c_uamqp.MessageBodyType.DataType
189189
Value = c_uamqp.MessageBodyType.ValueType
190190
Sequence = c_uamqp.MessageBodyType.SequenceType
191+
192+
193+
BODY_TYPE_C_PYTHON_MAP = {
194+
c_uamqp.MessageBodyType.DataType.value: MessageBodyType.Data,
195+
c_uamqp.MessageBodyType.SequenceType.value: MessageBodyType.Sequence,
196+
c_uamqp.MessageBodyType.ValueType.value: MessageBodyType.Value
197+
}

uamqp/errors.py

+13
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,9 @@ def __init__(self):
259259
response = "Invalid operation: this message is already settled."
260260
super(MessageAlreadySettled, self).__init__(response)
261261

262+
def __reduce__(self):
263+
return (self.__class__, ())
264+
262265

263266
class MessageAccepted(MessageResponse):
264267
pass
@@ -267,6 +270,8 @@ class MessageAccepted(MessageResponse):
267270
class MessageRejected(MessageResponse):
268271

269272
def __init__(self, condition=None, description=None, encoding='UTF-8', info=None):
273+
self._encoding = encoding
274+
self._info = info
270275
if condition:
271276
self.error_condition = condition.encode(encoding) if isinstance(condition, six.text_type) else condition
272277
else:
@@ -282,6 +287,9 @@ def __init__(self, condition=None, description=None, encoding='UTF-8', info=None
282287
self.error_info = utils.data_factory(info, encoding=encoding) if info else None
283288
super(MessageRejected, self).__init__()
284289

290+
def __reduce__(self):
291+
return (self.__class__, (self.error_condition, self.error_description, self._encoding, self._info))
292+
285293

286294
class MessageReleased(MessageResponse):
287295
pass
@@ -292,11 +300,16 @@ class MessageModified(MessageResponse):
292300
def __init__(self, failed, undeliverable, annotations=None, encoding='UTF-8'):
293301
self.failed = failed
294302
self.undeliverable = undeliverable
303+
self._encoding = encoding
304+
self._annotations = annotations
295305
if annotations and not isinstance(annotations, dict):
296306
raise TypeError("Disposition annotations must be a dictionary.")
297307
self.annotations = utils.data_factory(annotations, encoding=encoding) if annotations else None
298308
super(MessageModified, self).__init__()
299309

310+
def __reduce__(self):
311+
return (self.__class__, (self.failed, self.undeliverable, self._annotations, self._encoding))
312+
300313

301314
class ErrorResponse(object):
302315

0 commit comments

Comments
 (0)