6
6
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
7
7
# --------------------------------------------------------------------------
8
8
9
- from typing import Any
9
+ from typing import Any , TYPE_CHECKING
10
10
11
11
from azure .core import AsyncPipelineClient
12
12
from msrest import Deserializer , Serializer
13
13
14
- from .._models import CloudEvent , EventGridEvent
15
- from .._helpers import _get_topic_hostname_only_fqdn , _get_authentication_policy
14
+ from .._models import CloudEvent , EventGridEvent , CustomEvent
15
+ from .._helpers import _get_topic_hostname_only_fqdn , _get_authentication_policy , _is_cloud_event
16
16
from azure .core .pipeline .policies import AzureKeyCredentialPolicy
17
17
from azure .core .credentials import AzureKeyCredential
18
18
from .._generated .aio import EventGridPublisherClient as EventGridPublisherClientAsync
19
19
from .. import _constants as constants
20
20
21
+ if TYPE_CHECKING :
22
+ # pylint: disable=unused-import,ungrouped-imports
23
+ from typing import Any , Union , Dict , List
24
+ SendType = Union [
25
+ CloudEvent ,
26
+ EventGridEvent ,
27
+ CustomEvent ,
28
+ Dict ,
29
+ List [CloudEvent ],
30
+ List [EventGridEvent ],
31
+ List [CustomEvent ],
32
+ List [Dict ]
33
+ ]
21
34
22
35
class EventGridPublisherClient (object ):
23
36
"""Asynchronous EventGrid Python Publisher Client.
@@ -30,23 +43,30 @@ class EventGridPublisherClient(object):
30
43
def __init__ (self , topic_hostname , credential , ** kwargs ):
31
44
# type: (str, Union[AzureKeyCredential, EventGridSharedAccessSignatureCredential], Any) -> None
32
45
auth_policy = _get_authentication_policy (credential )
33
- self ._client = EventGridPublisherClientAsync (authentication_policy = auth_policy )
46
+ self ._client = EventGridPublisherClientAsync (authentication_policy = auth_policy , ** kwargs )
47
+ topic_hostname = _get_topic_hostname_only_fqdn (topic_hostname )
34
48
self ._topic_hostname = topic_hostname
35
49
36
50
37
51
async def send (self , events , ** kwargs ):
38
- # type: (Union[List[CloudEvent], List[EventGridEvent], List[CustomEvent]], Any ) -> None
52
+ # type: (SendType ) -> None
39
53
"""Sends event data to topic hostname specified during client initialization.
40
54
41
55
:param events: A list of CloudEvent/EventGridEvent/CustomEvent to be sent.
42
56
:type events: Union[List[models.CloudEvent], List[models.EventGridEvent], List[models.CustomEvent]]
57
+ :keyword str content_type: The type of content to be used to send the events.
58
+ Has default value "application/json; charset=utf-8" for EventGridEvents, with "cloudevents-batch+json" for CloudEvents
43
59
:rtype: None
44
- raise: :class:`ValueError`, when events do not follow specified SendType.
60
+ : raise: :class:`ValueError`, when events do not follow specified SendType.
45
61
"""
62
+ if not isinstance (events , list ):
63
+ events = [events ]
46
64
47
- if all (isinstance (e , CloudEvent ) for e in events ):
65
+ if all (isinstance (e , CloudEvent ) for e in events ) or all (_is_cloud_event (e ) for e in events ):
66
+ kwargs .setdefault ("content_type" , "application/cloudevents-batch+json; charset=utf-8" )
48
67
await self ._client .publish_cloud_event_events (self ._topic_hostname , events , ** kwargs )
49
- elif all (isinstance (e , EventGridEvent ) for e in events ):
68
+ elif all (isinstance (e , EventGridEvent ) for e in events ) or all (isinstance (e , dict ) for e in events ):
69
+ kwargs .setdefault ("content_type" , "application/json; charset=utf-8" )
50
70
await self ._client .publish_events (self ._topic_hostname , events , ** kwargs )
51
71
elif all (isinstance (e , CustomEvent ) for e in events ):
52
72
serialized_events = [dict (e ) for e in events ]
0 commit comments