Skip to content

Commit 7d35094

Browse files
committed
Send spec initial
1 parent 04a61c4 commit 7d35094

File tree

4 files changed

+143
-28
lines changed

4 files changed

+143
-28
lines changed

sdk/eventgrid/azure-eventgrid/azure/eventgrid/_helpers.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,11 @@ def _get_authentication_policy(credential):
7070
if isinstance(credential, EventGridSharedAccessSignatureCredential):
7171
authentication_policy = EventGridSharedAccessSignatureCredentialPolicy(credential=credential, name=constants.EVENTGRID_TOKEN_HEADER)
7272
return authentication_policy
73+
74+
def _is_cloud_event(event):
75+
# type: dict -> bool
76+
required = ('id', 'source', 'specversion', 'type')
77+
try:
78+
return all([_ in event for _ in required]) and event['specversion'] == "1.0"
79+
except TypeError:
80+
pass

sdk/eventgrid/azure-eventgrid/azure/eventgrid/_publisher_client.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,23 @@
1313

1414
if TYPE_CHECKING:
1515
# pylint: disable=unused-import,ungrouped-imports
16-
from typing import Any
16+
from typing import Any, Union, Dict, List
17+
SendType = Union[
18+
CloudEvent,
19+
EventGridEvent,
20+
CustomEvent,
21+
Dict,
22+
List[CloudEvent],
23+
List[EventGridEvent],
24+
List[CustomEvent]
25+
]
1726

1827
from ._models import CloudEvent, EventGridEvent, CustomEvent
19-
from ._helpers import _get_topic_hostname_only_fqdn, _get_authentication_policy
28+
from ._helpers import _get_topic_hostname_only_fqdn, _get_authentication_policy, _is_cloud_event
2029
from ._generated._event_grid_publisher_client import EventGridPublisherClient as EventGridPublisherClientImpl
2130
from . import _constants as constants
2231

32+
2333
class EventGridPublisherClient(object):
2434
"""EventGrid Python Publisher Client.
2535
@@ -36,20 +46,24 @@ def __init__(self, topic_hostname, credential, **kwargs):
3646
self._topic_hostname = topic_hostname
3747
auth_policy = _get_authentication_policy(credential)
3848
self._client = EventGridPublisherClientImpl(authentication_policy=auth_policy, **kwargs)
39-
49+
4050
def send(self, events, **kwargs):
41-
# type: (Union[List[CloudEvent], List[EventGridEvent], List[CustomEvent]], Any) -> None
51+
# type: (SendType, Any) -> None
4252
"""Sends event data to topic hostname specified during client initialization.
4353
4454
:param events: A list of CloudEvent/EventGridEvent/CustomEvent to be sent.
4555
:type events: Union[List[models.CloudEvent], List[models.EventGridEvent], List[models.CustomEvent]]
4656
:rtype: None
4757
raise: :class:`ValueError`, when events do not follow specified SendType.
4858
"""
59+
if not isinstance(events, list):
60+
events = [events]
4961

50-
if all(isinstance(e, CloudEvent) for e in events):
62+
if all(isinstance(e, CloudEvent) for e in events) or all(_is_cloud_event(e) for e in events):
63+
kwargs.setdefault("content_type", "application/cloudevents-batch+json; charset=utf-8")
5164
self._client.publish_cloud_event_events(self._topic_hostname, events, **kwargs)
52-
elif all(isinstance(e, EventGridEvent) for e in events):
65+
elif all(isinstance(e, EventGridEvent) for e in events) or all(isinstance(e, dict) for e in events):
66+
kwargs.setdefault("content_type", "application/json; charset=utf-8")
5367
self._client.publish_events(self._topic_hostname, events, **kwargs)
5468
elif all(isinstance(e, CustomEvent) for e in events):
5569
serialized_events = [dict(e) for e in events]

sdk/eventgrid/azure-eventgrid/azure/eventgrid/aio/_publisher_client_async.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,24 @@
1212
from msrest import Deserializer, Serializer
1313

1414
from .._models import CloudEvent, EventGridEvent
15-
from .._helpers import _get_topic_hostname_only_fqdn, _get_authentication_policy
15+
from .._helpers import _get_topic_hostname_only_fqdn, _get_authentication_policy, _is_cloud_event
1616
from azure.core.pipeline.policies import AzureKeyCredentialPolicy
1717
from azure.core.credentials import AzureKeyCredential
1818
from .._generated.aio import EventGridPublisherClient as EventGridPublisherClientAsync
1919
from .. import _constants as constants
2020

21+
if TYPE_CHECKING:
22+
# pylint: disable=unused-import,ungrouped-imports
23+
from typing import Any, Union, Dict, List
24+
SendType = Union[
25+
CloudEvent,
26+
EventGridEvent,
27+
CustomEvent,
28+
Dict,
29+
List[CloudEvent],
30+
List[EventGridEvent],
31+
List[CustomEvent]
32+
]
2133

2234
class EventGridPublisherClient(object):
2335
"""Asynchronous EventGrid Python Publisher Client.
@@ -35,22 +47,26 @@ def __init__(self, topic_hostname, credential, **kwargs):
3547

3648

3749
async def send(self, events, **kwargs):
38-
# type: (Union[List[CloudEvent], List[EventGridEvent], List[CustomEvent]], Any) -> None
50+
# type: (SendType) -> None
3951
"""Sends event data to topic hostname specified during client initialization.
4052
4153
:param events: A list of CloudEvent/EventGridEvent/CustomEvent to be sent.
4254
:type events: Union[List[models.CloudEvent], List[models.EventGridEvent], List[models.CustomEvent]]
4355
:rtype: None
4456
raise: :class:`ValueError`, when events do not follow specified SendType.
4557
"""
58+
if not isinstance(events, list):
59+
events = [events]
4660

47-
if all(isinstance(e, CloudEvent) for e in events):
48-
await self._client.publish_cloud_event_events(self._topic_hostname, events, **kwargs)
49-
elif all(isinstance(e, EventGridEvent) for e in events):
50-
await self._client.publish_events(self._topic_hostname, events, **kwargs)
61+
if all(isinstance(e, CloudEvent) for e in events) or all(_is_cloud_event(e) for e in events):
62+
kwargs.setdefault("content_type", "application/cloudevents-batch+json; charset=utf-8")
63+
self._client.publish_cloud_event_events(self._topic_hostname, events, **kwargs)
64+
elif all(isinstance(e, EventGridEvent) for e in events) or all(isinstance(e, dict) for e in events):
65+
kwargs.setdefault("content_type", "application/json; charset=utf-8")
66+
self._client.publish_events(self._topic_hostname, events, **kwargs)
5167
elif all(isinstance(e, CustomEvent) for e in events):
5268
serialized_events = [dict(e) for e in events]
53-
await self._client.publish_custom_event_events(self._topic_hostname, serialized_events, **kwargs)
69+
self._client.publish_custom_event_events(self._topic_hostname, serialized_events, **kwargs)
5470
else:
5571
raise ValueError("Event schema is not correct.")
5672

sdk/eventgrid/azure-eventgrid/tests/test_eg_publisher_client.py

Lines changed: 92 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class EventGridPublisherClientTests(AzureMgmtTestCase):
2525
@pytest.mark.liveTest
2626
@CachedResourceGroupPreparer(name_prefix='eventgridtest')
2727
@CachedEventGridTopicPreparer(name_prefix='eventgridtest')
28-
def test_eg_publisher_client_publish_event_grid_event_data_dict(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
28+
def test_send_event_grid_event_data_dict(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
2929
akc_credential = AzureKeyCredential(eventgrid_topic_primary_key)
3030
client = EventGridPublisherClient(eventgrid_topic_endpoint, akc_credential)
3131
eg_event = EventGridEvent(
@@ -34,12 +34,32 @@ def test_eg_publisher_client_publish_event_grid_event_data_dict(self, resource_g
3434
event_type="Sample.EventGrid.Event",
3535
data_version="2.0"
3636
)
37-
client.send([eg_event])
37+
client.send(eg_event)
3838

3939
@pytest.mark.liveTest
4040
@CachedResourceGroupPreparer(name_prefix='eventgridtest')
4141
@CachedEventGridTopicPreparer(name_prefix='eventgridtest')
42-
def test_eg_publisher_client_publish_event_grid_event_data_str(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
42+
def test_send_event_grid_event_data_as_list(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
43+
akc_credential = AzureKeyCredential(eventgrid_topic_primary_key)
44+
client = EventGridPublisherClient(eventgrid_topic_endpoint, akc_credential)
45+
eg_event1 = EventGridEvent(
46+
subject="sample",
47+
data="eventgridevent",
48+
event_type="Sample.EventGrid.Event",
49+
data_version="2.0"
50+
)
51+
eg_event2 = EventGridEvent(
52+
subject="sample2",
53+
data="eventgridevent2",
54+
event_type="Sample.EventGrid.Event",
55+
data_version="2.0"
56+
)
57+
client.send([eg_event1, eg_event2])
58+
59+
@pytest.mark.liveTest
60+
@CachedResourceGroupPreparer(name_prefix='eventgridtest')
61+
@CachedEventGridTopicPreparer(name_prefix='eventgridtest')
62+
def test_send_event_grid_event_data_str(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
4363
akc_credential = AzureKeyCredential(eventgrid_topic_primary_key)
4464
client = EventGridPublisherClient(eventgrid_topic_endpoint, akc_credential)
4565
eg_event = EventGridEvent(
@@ -48,38 +68,67 @@ def test_eg_publisher_client_publish_event_grid_event_data_str(self, resource_gr
4868
event_type="Sample.EventGrid.Event",
4969
data_version="2.0"
5070
)
51-
client.send([eg_event])
71+
client.send(eg_event)
5272

5373
@pytest.mark.liveTest
5474
@CachedResourceGroupPreparer(name_prefix='eventgridtest')
5575
@CachedEventGridTopicPreparer(name_prefix='cloudeventgridtest')
56-
def test_eg_publisher_client_publish_cloud_event_data_dict(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
76+
def test_send_cloud_event_data_dict(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
5777
akc_credential = AzureKeyCredential(eventgrid_topic_primary_key)
5878
client = EventGridPublisherClient(eventgrid_topic_endpoint, akc_credential)
5979
cloud_event = CloudEvent(
6080
source = "http://samplesource.dev",
6181
data = {"sample": "cloudevent"},
6282
type="Sample.Cloud.Event"
6383
)
64-
client.send([cloud_event])
84+
client.send(cloud_event)
85+
86+
# @pytest.mark.liveTest
87+
# @CachedResourceGroupPreparer(name_prefix='eventgridtest')
88+
# @CachedEventGridTopicPreparer(name_prefix='cloudeventgridtest')
89+
# def test_send_cloud_event_data_str(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
90+
# akc_credential = AzureKeyCredential(eventgrid_topic_primary_key)
91+
# client = EventGridPublisherClient(eventgrid_topic_endpoint, akc_credential)
92+
# cloud_event = CloudEvent(
93+
# source = "http://samplesource.dev",
94+
# data = "cloudevent",
95+
# type="Sample.Cloud.Event"
96+
# )
97+
# client.send(cloud_event)
98+
6599

66100
@pytest.mark.liveTest
67101
@CachedResourceGroupPreparer(name_prefix='eventgridtest')
68-
@CachedEventGridTopicPreparer(name_prefix='cloudeventgridtest')
69-
def test_eg_publisher_client_publish_cloud_event_data_str(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
102+
@CachedEventGridTopicPreparer(name_prefix='eventgridtest')
103+
def test_send_cloud_event_data_as_list(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
70104
akc_credential = AzureKeyCredential(eventgrid_topic_primary_key)
71105
client = EventGridPublisherClient(eventgrid_topic_endpoint, akc_credential)
72-
cloud_event = CloudEvent(
106+
cloud_event1 = CloudEvent(
73107
source = "http://samplesource.dev",
74108
data = "cloudevent",
75109
type="Sample.Cloud.Event"
76110
)
77-
client.send([cloud_event])
78-
111+
client.send([cloud_event1])
112+
79113
@pytest.mark.liveTest
80114
@CachedResourceGroupPreparer(name_prefix='eventgridtest')
81115
@CachedEventGridTopicPreparer(name_prefix='eventgridtest')
82-
def test_eg_publisher_client_publish_signature_credential(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
116+
def test_send_cloud_event_dict(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
117+
akc_credential = AzureKeyCredential(eventgrid_topic_primary_key)
118+
client = EventGridPublisherClient(eventgrid_topic_endpoint, akc_credential)
119+
cloud_event1 = {
120+
"id": "1234",
121+
"source": "http://samplesource.dev",
122+
"specversion": "1.0",
123+
"data": "cloudevent",
124+
"type": "Sample.Cloud.Event"
125+
}
126+
client.send(cloud_event1)
127+
128+
@pytest.mark.liveTest
129+
@CachedResourceGroupPreparer(name_prefix='eventgridtest')
130+
@CachedEventGridTopicPreparer(name_prefix='eventgridtest')
131+
def test_send_signature_credential(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
83132
expiration_date_utc = dt.datetime.now(UTC()) + timedelta(hours=1)
84133
signature = generate_shared_access_signature(eventgrid_topic_endpoint, eventgrid_topic_primary_key, expiration_date_utc)
85134
credential = EventGridSharedAccessSignatureCredential(signature)
@@ -90,12 +139,12 @@ def test_eg_publisher_client_publish_signature_credential(self, resource_group,
90139
event_type="Sample.EventGrid.Event",
91140
data_version="2.0"
92141
)
93-
client.send([eg_event])
142+
client.send(eg_event)
94143

95144
@pytest.mark.liveTest
96145
@CachedResourceGroupPreparer(name_prefix='eventgridtest')
97146
@CachedEventGridTopicPreparer(name_prefix='customeventgridtest')
98-
def test_eg_publisher_client_publish_custom_schema_event(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
147+
def test_send_custom_schema_event(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
99148
akc_credential = AzureKeyCredential(eventgrid_topic_primary_key)
100149
client = EventGridPublisherClient(eventgrid_topic_endpoint, akc_credential)
101150
custom_event = CustomEvent(
@@ -108,4 +157,32 @@ def test_eg_publisher_client_publish_custom_schema_event(self, resource_group, e
108157
"customData": "sample data"
109158
}
110159
)
111-
client.send([custom_event])
160+
client.send(custom_event)
161+
162+
@pytest.mark.liveTest
163+
@CachedResourceGroupPreparer(name_prefix='eventgridtest')
164+
@CachedEventGridTopicPreparer(name_prefix='customeventgridtest')
165+
def test_send_custom_schema_event_as_list(self, resource_group, eventgrid_topic, eventgrid_topic_primary_key, eventgrid_topic_endpoint):
166+
akc_credential = AzureKeyCredential(eventgrid_topic_primary_key)
167+
client = EventGridPublisherClient(eventgrid_topic_endpoint, akc_credential)
168+
custom_event1 = CustomEvent(
169+
{
170+
"customSubject": "sample",
171+
"customEventType": "sample.event",
172+
"customDataVersion": "2.0",
173+
"customId": "1234",
174+
"customEventTime": dt.datetime.now(UTC()).isoformat(),
175+
"customData": "sample data"
176+
}
177+
)
178+
custom_event2 = CustomEvent(
179+
{
180+
"customSubject": "sample2",
181+
"customEventType": "sample.event",
182+
"customDataVersion": "2.0",
183+
"customId": "12345",
184+
"customEventTime": dt.datetime.now(UTC()).isoformat(),
185+
"customData": "sample data 2"
186+
}
187+
)
188+
client.send([custom_event1, custom_event2])

0 commit comments

Comments
 (0)