Skip to content

Commit 77d7c28

Browse files
authored
[Event Hubs] Put uamqp.Message.properties into EventData.system_properties (#10508)
1 parent 59e7d42 commit 77d7c28

File tree

3 files changed

+114
-4
lines changed

3 files changed

+114
-4
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_common.py

+63-3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,19 @@
2929
PROP_PARTITION_KEY,
3030
PROP_PARTITION_KEY_AMQP_SYMBOL,
3131
PROP_TIMESTAMP,
32+
PROP_ABSOLUTE_EXPIRY_TIME,
33+
PROP_CONTENT_ENCODING,
34+
PROP_CONTENT_TYPE,
35+
PROP_CORRELATION_ID,
36+
PROP_GROUP_ID,
37+
PROP_GROUP_SEQUENCE,
38+
PROP_MESSAGE_ID,
39+
PROP_REPLY_TO,
40+
PROP_REPLY_TO_GROUP_ID,
41+
PROP_SUBJECT,
42+
PROP_TO,
43+
PROP_USER_ID,
44+
PROP_CREATION_TIME,
3245
)
3346

3447
if TYPE_CHECKING:
@@ -39,6 +52,22 @@
3952
# event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each
4053
_BATCH_MESSAGE_OVERHEAD_COST = [5, 8]
4154

55+
_SYS_PROP_KEYS_TO_MSG_PROPERTIES = (
56+
(PROP_MESSAGE_ID, "message_id"),
57+
(PROP_USER_ID, "user_id"),
58+
(PROP_TO, "to"),
59+
(PROP_SUBJECT, "subject"),
60+
(PROP_REPLY_TO, "reply_to"),
61+
(PROP_CORRELATION_ID, "correlation_id"),
62+
(PROP_CONTENT_TYPE, "content_type"),
63+
(PROP_CONTENT_ENCODING, "content_encoding"),
64+
(PROP_ABSOLUTE_EXPIRY_TIME, "absolute_expiry_time"),
65+
(PROP_CREATION_TIME, "creation_time"),
66+
(PROP_GROUP_ID, "group_id"),
67+
(PROP_GROUP_SEQUENCE, "group_sequence"),
68+
(PROP_REPLY_TO_GROUP_ID, "reply_to_group_id"),
69+
)
70+
4271

4372
class EventData(object):
4473
"""The EventData class is a container for event content.
@@ -60,6 +89,7 @@ class EventData(object):
6089
def __init__(self, body=None):
6190
# type: (Union[str, bytes, List[AnyStr]]) -> None
6291
self._last_enqueued_event_properties = {} # type: Dict[str, Any]
92+
self._sys_properties = None # type: Optional[Dict[bytes, Any]]
6393
if body and isinstance(body, list):
6494
self.message = Message(body[0])
6595
for more in body[1:]:
@@ -207,12 +237,42 @@ def properties(self, value):
207237

208238
@property
209239
def system_properties(self):
210-
# type: () -> Dict[Union[str, bytes], Any]
211-
"""Metadata set by the Event Hubs Service associated with the event
240+
# type: () -> Dict[bytes, Any]
241+
"""Metadata set by the Event Hubs Service associated with the event.
242+
243+
An EventData could have some or all of the following meta data depending on the source
244+
of the event data.
245+
246+
- b"x-opt-sequence-number" (int)
247+
- b"x-opt-offset" (bytes)
248+
- b"x-opt-partition-key" (bytes)
249+
- b"x-opt-enqueued-time" (int)
250+
- b"message-id" (bytes)
251+
- b"user-id" (bytes)
252+
- b"to" (bytes)
253+
- b"subject" (bytes)
254+
- b"reply-to" (bytes)
255+
- b"correlation-id" (bytes)
256+
- b"content-type" (bytes)
257+
- b"content-encoding" (bytes)
258+
- b"absolute-expiry-time" (int)
259+
- b"creation-time" (int)
260+
- b"group-id" (bytes)
261+
- b"group-sequence" (bytes)
262+
- b"reply-to-group-id" (bytes)
212263
213264
:rtype: dict
214265
"""
215-
return self.message.annotations
266+
267+
if self._sys_properties is None:
268+
self._sys_properties = {}
269+
if self.message.properties:
270+
for key, prop_name in _SYS_PROP_KEYS_TO_MSG_PROPERTIES:
271+
value = getattr(self.message.properties, prop_name, None)
272+
if value:
273+
self._sys_properties[key] = value
274+
self._sys_properties.update(self.message.annotations)
275+
return self._sys_properties
216276

217277
@property
218278
def body(self):

sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py

+14
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,20 @@
1717
PROP_LAST_ENQUEUED_TIME_UTC = b"last_enqueued_time_utc"
1818
PROP_RUNTIME_INFO_RETRIEVAL_TIME_UTC = b"runtime_info_retrieval_time_utc"
1919

20+
PROP_MESSAGE_ID = b"message-id"
21+
PROP_USER_ID = b"user-id"
22+
PROP_TO = b"to"
23+
PROP_SUBJECT = b"subject"
24+
PROP_REPLY_TO = b"reply-to"
25+
PROP_CORRELATION_ID = b"correlation-id"
26+
PROP_CONTENT_TYPE = b"content-type"
27+
PROP_CONTENT_ENCODING = b"content-encoding"
28+
PROP_ABSOLUTE_EXPIRY_TIME = b"absolute-expiry-time"
29+
PROP_CREATION_TIME = b"creation-time"
30+
PROP_GROUP_ID = b"group-id"
31+
PROP_GROUP_SEQUENCE = b"group-sequence"
32+
PROP_REPLY_TO_GROUP_ID = b"reply-to-group-id"
33+
2034
EPOCH_SYMBOL = b"com.microsoft:epoch"
2135
TIMEOUT_SYMBOL = b"com.microsoft:timeout"
2236
RECEIVER_RUNTIME_METRIC_SYMBOL = b"com.microsoft:enable-receiver-runtime-metric"

sdk/eventhub/azure-eventhub/tests/unittest/test_event_data.py

+37-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import platform
22
import pytest
3-
3+
import uamqp
4+
from azure.eventhub import _common
45

56
pytestmark = pytest.mark.skipif(platform.python_implementation() == "PyPy", reason="This is ignored for PyPy")
67

@@ -52,6 +53,41 @@ def test_app_properties():
5253
assert event_data.properties["a"] == "b"
5354

5455

56+
def test_sys_properties():
57+
properties = uamqp.message.MessageProperties()
58+
properties.message_id = "message_id"
59+
properties.user_id = "user_id"
60+
properties.to = "to"
61+
properties.subject = "subject"
62+
properties.reply_to = "reply_to"
63+
properties.correlation_id = "correlation_id"
64+
properties.content_type = "content_type"
65+
properties.content_encoding = "content_encoding"
66+
properties.absolute_expiry_time = 1
67+
properties.creation_time = 1
68+
properties.group_id = "group_id"
69+
properties.group_sequence = 1
70+
properties.reply_to_group_id = "reply_to_group_id"
71+
message = uamqp.Message(properties=properties)
72+
message.annotations = {_common.PROP_OFFSET: "@latest"}
73+
ed = EventData._from_message(message) # type: EventData
74+
75+
assert ed.system_properties[_common.PROP_OFFSET] == "@latest"
76+
assert ed.system_properties[_common.PROP_CORRELATION_ID] == properties.correlation_id
77+
assert ed.system_properties[_common.PROP_MESSAGE_ID] == properties.message_id
78+
assert ed.system_properties[_common.PROP_CONTENT_ENCODING] == properties.content_encoding
79+
assert ed.system_properties[_common.PROP_CONTENT_TYPE] == properties.content_type
80+
assert ed.system_properties[_common.PROP_USER_ID] == properties.user_id
81+
assert ed.system_properties[_common.PROP_TO] == properties.to
82+
assert ed.system_properties[_common.PROP_SUBJECT] == properties.subject
83+
assert ed.system_properties[_common.PROP_REPLY_TO] == properties.reply_to
84+
assert ed.system_properties[_common.PROP_ABSOLUTE_EXPIRY_TIME] == properties.absolute_expiry_time
85+
assert ed.system_properties[_common.PROP_CREATION_TIME] == properties.creation_time
86+
assert ed.system_properties[_common.PROP_GROUP_ID] == properties.group_id
87+
assert ed.system_properties[_common.PROP_GROUP_SEQUENCE] == properties.group_sequence
88+
assert ed.system_properties[_common.PROP_REPLY_TO_GROUP_ID] == properties.reply_to_group_id
89+
90+
5591
def test_event_data_batch():
5692
batch = EventDataBatch(max_size_in_bytes=100, partition_key="par")
5793
batch.add(EventData("A"))

0 commit comments

Comments
 (0)