Skip to content

initial temp/potential pickle event data fix #17259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 122 additions & 4 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import json
import logging
import copy
from typing import (
Union,
Dict,
Expand All @@ -20,7 +21,9 @@

import six

from uamqp import BatchMessage, Message, constants
from uamqp import BatchMessage, Message, constants, errors
import uamqp
from uamqp.message import MessageProperties, MessageHeader

from ._utils import set_message_partition_key, trace_message, utc_from_timestamp
from ._constants import (
Expand Down Expand Up @@ -153,6 +156,118 @@ def __str__(self):
event_str += " }"
return event_str

def __getstate__(self):
    """Return a picklable snapshot of this instance's state.

    The snapshot is a shallow copy of the instance ``__dict__`` in which the
    ``message`` entry is replaced by a plain dict derived from the wrapped
    message's own ``__dict__``:

    * ``_body`` becomes a list of the body's data chunks, or ``""`` when the
      body has no data.
    * ``_properties`` and ``_header`` become plain dicts of their fields when
      they are set; otherwise they are left as they are.
    * ``_message`` is dropped.
    * ``_response`` is replaced by its class when it is set.

    :return: The serializable state.
    :rtype: dict
    """
    # Reading ``offset`` first is deliberate: the author observed that the
    # message attributes are not all populated unless it has been accessed.
    # TODO: find out why that access is required.
    self.offset  # pylint: disable=pointless-statement

    property_fields = (
        "message_id",
        "user_id",
        "to",
        "subject",
        "reply_to",
        "correlation_id",
        "content_type",
        "content_encoding",
        "absolute_expiry_time",
        "creation_time",
        "group_id",
        "group_sequence",
        "reply_to_group_id",
    )
    header_fields = (
        "delivery_count",
        "time_to_live",
        "first_acquirer",
        "durable",
        "priority",
    )

    snapshot = self.__dict__.copy()
    message_snapshot = snapshot["message"].__dict__.copy()

    # Body: keep the raw chunks so they can be re-joined when unpickling.
    raw_body = message_snapshot["_body"]
    message_snapshot["_body"] = list(raw_body.data) if raw_body.data else ""

    # Properties and header: flatten to plain dicts, but only when present.
    amqp_properties = message_snapshot["_properties"]
    if amqp_properties:
        message_snapshot["_properties"] = {
            field: getattr(amqp_properties, field) for field in property_fields
        }

    amqp_header = message_snapshot["_header"]
    if amqp_header:
        message_snapshot["_header"] = {
            field: getattr(amqp_header, field) for field in header_fields
        }

    # The underlying ``_message`` object is not serialized.
    del message_snapshot["_message"]

    # Only the class of the response is kept; it is re-instantiated on load.
    response = message_snapshot["_response"]
    if response:
        message_snapshot["_response"] = response.__class__

    # TODO: check whether saving _message and _settler is needed/can be done

    snapshot["message"] = message_snapshot
    return snapshot

def __setstate__(self, state):
    """Restore the instance from a state dict produced by ``__getstate__``.

    Every entry of *state* is first copied onto the instance; the serialized
    ``message`` dict is then handed to ``_populate_message_properties``.

    :param dict state: The unpickled state; must contain a ``message`` entry.
    """
    vars(self).update(state)
    serialized_message = state["message"]
    self._populate_message_properties(serialized_message)

def _populate_message_properties(self, message_state):
    """Rebuild ``self.message`` from a message dict produced by ``__getstate__``.

    The serialized body chunks are decoded as UTF-8 and concatenated into a
    single text body. ``_message`` and ``_settler`` are not part of the
    serialized state, so ``Message`` is created with both set to ``None``.

    :param dict message_state: Serialized message attributes. Must contain the
     ``_body``, ``_properties``, ``_header`` and ``_response`` keys; every
     other entry is set verbatim as an attribute on the rebuilt message.
     The dict itself is not modified.
    :raises KeyError: If one of the required keys is missing.
    """
    # Work on a shallow copy so the caller's state dict is left untouched.
    remaining = dict(message_state)

    # deserialize _body
    data = remaining.pop("_body")
    try:
        body = "".join(b.decode("UTF-8") for b in cast(Iterable[bytes], data))
    except TypeError:
        body = six.text_type(data)

    # update message with serializable properties, if they exist
    serialized_properties = remaining.pop("_properties")
    properties = None
    if serialized_properties:
        properties = MessageProperties(**serialized_properties)

    serialized_header = remaining.pop("_header")
    header = None
    if serialized_header:
        # Assign the fields one by one instead of passing them as keyword
        # arguments: MessageHeader's documented constructor only takes an
        # existing header object to wrap (``header=None``).
        header = MessageHeader()
        for field, value in serialized_header.items():
            setattr(header, field, value)

    # Only the class of the response was serialized; instantiate it again.
    if remaining["_response"]:
        remaining["_response"] = remaining["_response"]()

    # TODO: deserialize _message and _settler if they are saved

    # ``body`` is always text at this point, so a single constructor call
    # covers every case.
    self.message = Message(
        body,
        properties=properties,
        header=header,
        message=None,
        settler=None,
    )

    # Re-apply the remaining plain attributes captured by __getstate__.
    for prop, value in remaining.items():
        setattr(self.message, prop, value)

@classmethod
def _from_message(cls, message):
# type: (Message) -> EventData
Expand Down Expand Up @@ -379,9 +494,11 @@ def _load_events(self, events):
try:
self.add(event_data)
except ValueError:
raise ValueError("The combined size of EventData collection exceeds the Event Hub frame size limit. "
"Please send a smaller collection of EventData, or use EventDataBatch, "
"which is guaranteed to be under the frame size limit")
raise ValueError(
"The combined size of EventData collection exceeds the Event Hub frame size limit. "
"Please send a smaller collection of EventData, or use EventDataBatch, "
"which is guaranteed to be under the frame size limit"
)

@property
def size_in_bytes(self):
Expand Down Expand Up @@ -438,6 +555,7 @@ def add(self, event_data):
self._size = size_after_add
self._count += 1


class DictMixin(object):
def __setitem__(self, key, item):
# type: (Any, Any) -> None
Expand Down
107 changes: 107 additions & 0 deletions sdk/eventhub/azure-eventhub/samples/sync_samples/pickle_event_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import pickle
from azure.eventhub import EventData, EventDataBatch, EventHubProducerClient
from multiprocessing import Pool
from typing import List
import uamqp
import os

# Connection settings are read from the environment; a missing variable
# raises KeyError as soon as the module is loaded.
CONNECTION_STR, EVENTHUB_NAME = (
    os.environ['EVENT_HUB_CONN_STR'],
    os.environ['EVENT_HUB_NAME'],
)

# Module-level producer shared by the batch helpers below.
producer = EventHubProducerClient.from_connection_string(
    eventhub_name=EVENTHUB_NAME,
    conn_str=CONNECTION_STR,
)

def build_event_data(i):
    """Return an EventData whose body is the formatted value of *i*."""
    body_text = '{}'.format(i)
    return EventData(body=body_text)

def compare_props(before, after):
    """Print each instance attribute of *before* whose value differs on *after*.

    For every mismatch the attribute name is printed with the value and type
    from *before*, then from *after*, followed by a blank line.
    """
    for name, expected in vars(before).items():
        actual = getattr(after, name)
        if actual != expected:
            print('the following two are not equal')
            for shown in (expected, actual):
                print(name, ":", shown)
                print(type(shown))
            print()

def compare_message_props(before, after):
    """Print every non-dunder attribute of the inner message that changed.

    Walks the attributes reported by ``dir()`` on
    ``before.message._body._message`` (skipping names that start with ``__``)
    and compares each with the same attribute on
    ``after.message._body._message``. An attribute that no longer exists on
    the *after* object is reported as missing instead of raising
    ``AttributeError``, since a dropped attribute is itself a difference.
    """
    before_m = before.message._body._message
    after_m = after.message._body._message
    missing = object()  # sentinel: distinguishes "absent" from a None value
    for attr in dir(before_m):
        if attr.startswith('__'):
            continue
        before_val = getattr(before_m, attr)
        after_val = getattr(after_m, attr, missing)
        if after_val is missing:
            print('the following is missing after pickling')
            print(attr, ":", before_val)
            print(type(before_val))
            print()
        elif after_val != before_val:
            print('the following two are not equal')
            print(attr, ":", before_val)
            print(type(before_val))
            print(attr, ":", after_val)
            print(type(after_val))
            print()

def pickle_event():
    """Round-trip a minimal EventData through pickle.

    :return: A ``(original, unpickled)`` pair.
    """
    print('pickle event')
    original = EventData('bye')
    restored = pickle.loads(pickle.dumps(original))
    return original, restored

def pickle_event_batch():
    """Round-trip a one-event batch from the module-level producer through pickle.

    :return: A ``(original_batch, unpickled_batch)`` pair.
    """
    print('pickle event batch')
    # Build a batch holding a single event that carries an application property.
    event = EventData('bye')
    event.properties = {'prop_key': 'val'}
    batch = producer.create_batch(partition_key='pkey', partition_id='0')
    batch.add(event)
    serialized = pickle.dumps(batch)
    return batch, pickle.loads(serialized)

def pickle_event_list():
    """Print a batch message's attributes before and after a pickle round trip.

    Builds a one-event batch with the module-level producer, prints each
    instance attribute of ``batch.message``, pickles and unpickles the batch,
    then prints the attributes of the restored message. Nothing is returned.
    """
    def show_attributes(message):
        # One "name : value" line per instance attribute.
        for name, value in vars(message).items():
            print(name, ":", value)

    print('pickle event batch')
    event = EventData('bye')
    event.properties = {'prop_key': 'val'}
    batch = producer.create_batch(partition_key='pkey', partition_id='0')
    batch.add(event)

    show_attributes(batch.message)
    restored = pickle.loads(pickle.dumps(batch))
    print()
    show_attributes(restored.message)
    print()

def pickle_event_with_props():
    """Round-trip an EventData with a long body and one application property.

    :return: A ``(original, unpickled)`` pair.
    """
    print('pickle event with props\n')
    payload = 'dkfhahg9w342ashgkaht1 3t9hgadihgiaw htgasjhdfkwytahgaury9ryh3jadhga9syf3hrfhafd'
    original = EventData(payload)
    original.properties = {'prop_key': 'val'}
    serialized = pickle.dumps(original)
    restored = pickle.loads(serialized)
    return original, restored

if __name__ == "__main__":
    # Build the events in worker processes; Pool.map sends each result back
    # to this process, which exercises EventData pickling.
    with Pool() as pool:
        eventdata: List[EventData] = pool.map(build_event_data, list(range(10)))

    # Alternative scenario: plain event round trip.
    #before, after = pickle_event()
    #compare_props(before.message, after.message)

    before, after = pickle_event_with_props()
    for shown in (before, after, before.message, after.message):
        print(shown)

    # Extra diagnostics for the inner message body.
    #print(type(before.message._body))
    #compare_message_props(before, after)
    #print(before.message._body.__dict__)
    #print(after.message._body.__dict__)
    #compare_props(before.message._body, after.message._body)

    # Alternative scenario: batch round trip.
    #before, after = pickle_event_batch()
    #compare_props(before, after)
84 changes: 84 additions & 0 deletions sdk/eventhub/azure-eventhub/tests/unittest/test_event_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import platform
import pytest
import pickle
import uamqp
from packaging import version
from azure.eventhub import _common
Expand Down Expand Up @@ -105,3 +106,86 @@ def test_event_data_batch():
assert batch.size_in_bytes == 89 and len(batch) == 1
with pytest.raises(ValueError):
batch.add(EventData("A"))

@pytest.mark.parametrize(
    "test_input, expected_result",
    [("", ""), ("AAA", "AAA"), (None, ValueError), (["a", "b", "c"], "abc"), (b"abc", "abc")],
)
def test_pickle_event_data(test_input, expected_result):
    """An EventData built from *test_input* survives two pickle round trips.

    When *expected_result* is an exception type, only the constructor failure
    is checked and nothing is pickled.
    """
    if isinstance(expected_result, type):
        with pytest.raises(expected_result):
            EventData(test_input)
        return

    event_data = EventData(test_input)
    pickled_event_data = pickle.loads(pickle.dumps(event_data))
    repickled_event_data = pickle.loads(pickle.dumps(pickled_event_data))

    # Mutate the source event after pickling; the copies are asserted to
    # have no properties further down.
    event_data.properties["a"] = "b"
    assert len(event_data.properties) == 1

    expected_str = "{{ body: '{}', properties: {{}} }}".format(expected_result)
    expected_repr = "EventData(body='{}', properties={{}}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None)".format(expected_result)

    # The twice-pickled event must look like a freshly built one.
    assert repickled_event_data.body_as_str() == expected_result
    assert repickled_event_data.partition_key is None
    assert len(repickled_event_data.properties) == 0
    assert repickled_event_data.enqueued_time is None
    assert repickled_event_data.offset is None
    assert repickled_event_data.sequence_number is None
    assert len(repickled_event_data.system_properties) == 0
    assert str(repickled_event_data) == expected_str
    assert repr(repickled_event_data) == expected_repr

    with pytest.raises(TypeError):
        pickled_event_data.body_as_json()


def test_pickle_body_json():
    """A JSON body is still readable as text and as JSON after unpickling."""
    source = EventData('{"a":"b"}')
    pickled_event_data = pickle.loads(pickle.dumps(source))

    expected_str = "{ body: '{\"a\":\"b\"}', properties: {} }"
    expected_repr = "EventData(body='{\"a\":\"b\"}', properties={}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None)"
    assert str(pickled_event_data) == expected_str
    assert repr(pickled_event_data) == expected_repr

    decoded = pickled_event_data.body_as_json()
    assert decoded["a"] == "b"


def test_pickle_app_properties():
    """Application properties set on an event are preserved by pickling."""
    source = EventData("")
    source.properties = {"a": "b"}
    pickled_event_data = pickle.loads(pickle.dumps(source))

    expected_str = "{ body: '', properties: {'a': 'b'} }"
    expected_repr = "EventData(body='', properties={'a': 'b'}, offset=None, sequence_number=None, partition_key=None, enqueued_time=None)"
    assert str(pickled_event_data) == expected_str
    assert repr(pickled_event_data) == expected_repr
    assert pickled_event_data.properties["a"] == "b"

def test_pickled_twice_sys_properties():
    """System properties of a message-backed event survive two pickle round trips."""
    # Field name -> value assigned on the AMQP message properties.
    field_values = [
        ("message_id", "message_id"),
        ("user_id", "user_id"),
        ("to", "to"),
        ("subject", "subject"),
        ("reply_to", "reply_to"),
        ("correlation_id", "correlation_id"),
        ("content_type", "content_type"),
        ("content_encoding", "content_encoding"),
        ("absolute_expiry_time", 1),
        ("creation_time", 1),
        ("group_id", "group_id"),
        ("group_sequence", 1),
        ("reply_to_group_id", "reply_to_group_id"),
    ]
    properties = uamqp.message.MessageProperties()
    for field, value in field_values:
        setattr(properties, field, value)

    message = uamqp.Message(properties=properties)
    message.annotations = {_common.PROP_OFFSET: "@latest"}
    ed = EventData._from_message(message)  # type: EventData
    pickle_ed = pickle.loads(pickle.dumps(ed))
    repickle_ed = pickle.loads(pickle.dumps(pickle_ed))

    assert repickle_ed.system_properties[_common.PROP_OFFSET] == "@latest"

    # System-property key -> name of the message property it must mirror.
    mirrored_fields = [
        (_common.PROP_CORRELATION_ID, "correlation_id"),
        (_common.PROP_MESSAGE_ID, "message_id"),
        (_common.PROP_CONTENT_ENCODING, "content_encoding"),
        (_common.PROP_CONTENT_TYPE, "content_type"),
        (_common.PROP_USER_ID, "user_id"),
        (_common.PROP_TO, "to"),
        (_common.PROP_SUBJECT, "subject"),
        (_common.PROP_REPLY_TO, "reply_to"),
        (_common.PROP_ABSOLUTE_EXPIRY_TIME, "absolute_expiry_time"),
        (_common.PROP_CREATION_TIME, "creation_time"),
        (_common.PROP_GROUP_ID, "group_id"),
        (_common.PROP_GROUP_SEQUENCE, "group_sequence"),
        (_common.PROP_REPLY_TO_GROUP_ID, "reply_to_group_id"),
    ]
    for system_key, field in mirrored_fields:
        assert repickle_ed.system_properties[system_key] == getattr(properties, field)