|
| 1 | +# Azure Event Hubs client library for Python |
| 2 | + |
| 3 | +Azure Event Hubs is a big data streaming platform and event ingestion service. It can receive and process millions of events per second. |
| 4 | + |
| 5 | +Use the Event Hubs client library for Python to: |
| 6 | + |
| 7 | +- Publish events to the Event Hubs service through a producer. |
| 8 | +- Read events from the Event Hubs service through a consumer. |
| 9 | + |
| 10 | +On Python 3.5.3 and above, it also includes: |
| 11 | + |
| 12 | +- An async producer and consumer that supports async/await methods. |
| 13 | +- An Event Processor Host module that manages the distribution of partition readers. |
| 14 | + |
| 15 | +[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) | [Package (PyPi)](https://pypi.org/project/azure-eventhub/) | [API reference documentation](https://docs.microsoft.com/python/api/azure-eventhub) | [Product documentation](https://docs.microsoft.com/en-ca/azure/event-hubs/) |
| 16 | + |
| 17 | +# Getting started |
| 18 | + |
| 19 | +## Install the package |
| 20 | + |
| 21 | +Install the Azure Event Hubs client library for Python with pip: |
| 22 | + |
| 23 | +``` |
| 24 | +$ pip install azure-eventhub |
| 25 | +``` |
| 26 | + |
| 27 | +### Prerequisites |
| 28 | + |
| 29 | +- An Azure subscription. |
| 30 | +- Python 2.7, 3.5 or later. |
| 31 | +- An existing Event Hubs namespace and event hub. You can create these entities by following the instructions in [this article](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create) |
| 32 | + |
| 33 | +## Authenticate the client |
| 34 | + |
| 35 | +Interaction with Event Hubs starts with an instance of the EventHubClient class. You need the host name, SAS/AAD credential and event hub name to instantiate the client object. |
| 36 | + |
| 37 | +### Get credentials |
| 38 | + |
| 39 | +You can find credential information in [Azure Portal](https://portal.azure.com/). |
| 40 | + |
| 41 | +### Create client |
| 42 | + |
| 43 | +There are several ways to instantiate the EventHubClient object and the following code snippets demonstrate one way: |
| 44 | + |
| 45 | +```python |
| 46 | +import os |
| 47 | +from azure.eventhub import EventHubClient |
| 48 | + |
| 49 | +connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format( |
| 50 | + os.environ['EVENT_HUB_HOSTNAME'], |
| 51 | + os.environ['EVENT_HUB_SAS_POLICY'], |
| 52 | + os.environ['EVENT_HUB_SAS_KEY'], |
| 53 | + os.environ['EVENT_HUB_NAME']) |
| 54 | +client = EventHubClient.from_connection_string(connection_str) |
| 55 | +``` |
| 56 | + |
| 57 | +# Key concepts |
| 58 | + |
| 59 | +- **Namespace:** An Event Hubs namespace provides a unique scoping container, referenced by its fully qualified domain name, in which you create one or more event hubs or Kafka topics. |
| 60 | + |
| 61 | +- **Event publishers**: Any entity that sends data to an event hub is an event producer, or event publisher. Event publishers can publish events using HTTPS or AMQP 1.0 or Kafka 1.0 and later. Event publishers use a Shared Access Signature (SAS) token to identify themselves to an event hub, and can have a unique identity, or use a common SAS token. |
| 62 | + |
| 63 | +- **Event consumers**: Any entity that reads event data from an event hub is an event consumer. All Event Hubs consumers connect via the AMQP 1.0 session and events are delivered through the session as they become available. The client does not need to poll for data availability. |
| 64 | + |
| 65 | +- **SAS tokens**: Event Hubs uses Shared Access Signatures, which are available at the namespace and event hub level. A SAS token is generated from a SAS key and is an SHA hash of a URL, encoded in a specific format. Using the name of the key (policy) and the token, Event Hubs can regenerate the hash and thus authenticate the sender. |
| 66 | + |
| 67 | +For more information about these concepts, see [Features and terminology in Azure Event Hubs](https://docs.microsoft.com/en-ca/azure/event-hubs/event-hubs-features). |
| 68 | + |
| 69 | +# Examples |
| 70 | + |
| 71 | +The following sections provide several code snippets covering some of the most common Event Hubs tasks, including: |
| 72 | + |
| 73 | +- [Send event data](#send-event-data) |
| 74 | +- [Receive event data](#receive-event-data) |
| 75 | +- [Async send event data](#async-send-event-data) |
| 76 | +- [Async receive event data](#async-receive-event-data) |
| 77 | + |
| 78 | +## Send event data |
| 79 | + |
| 80 | +Sends an event data and blocks until acknowledgement is received or operation times out. |
| 81 | + |
| 82 | +```python |
| 83 | +import os |
| 84 | +from azure.eventhub import EventHubClient, EventData |
| 85 | + |
| 86 | +connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format( |
| 87 | + os.environ['EVENT_HUB_HOSTNAME'], |
| 88 | + os.environ['EVENT_HUB_SAS_POLICY'], |
| 89 | + os.environ['EVENT_HUB_SAS_KEY'], |
| 90 | + os.environ['EVENT_HUB_NAME']) |
| 91 | +client = EventHubClient.from_connection_string(connection_str) |
| 92 | +sender = client.create_producer(partition_id="0") |
| 93 | + |
| 94 | +try: |
| 95 | + event_list = [] |
| 96 | + for i in range(10): |
| 97 | + event_list.append(EventData(b"A single event")) |
| 98 | + |
| 99 | + with sender: |
| 100 | + sender.send(event_list) |
| 101 | +except: |
| 102 | + raise |
| 103 | +finally: |
| 104 | + pass |
| 105 | +``` |
| 106 | + |
| 107 | +## Receive event data |
| 108 | + |
| 109 | +Receive events from the EventHub. |
| 110 | + |
| 111 | +```python |
| 112 | +import os |
| 113 | +import logging |
| 114 | +from azure.eventhub import EventHubClient, EventData, EventPosition |
| 115 | + |
| 116 | +connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format( |
| 117 | + os.environ['EVENT_HUB_HOSTNAME'], |
| 118 | + os.environ['EVENT_HUB_SAS_POLICY'], |
| 119 | + os.environ['EVENT_HUB_SAS_KEY'], |
| 120 | + os.environ['EVENT_HUB_NAME']) |
| 121 | +client = EventHubClient.from_connection_string(connection_str) |
| 122 | +receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition.new_events_only()) |
| 123 | + |
| 124 | +try: |
| 125 | + logger = logging.getLogger("azure.eventhub") |
| 126 | + with receiver: |
| 127 | + received = receiver.receive(max_batch_size=100, timeout=5) |
| 128 | + for event_data in received: |
| 129 | + logger.info("Message received:{}".format(event_data.body_as_str())) |
| 130 | +except: |
| 131 | + raise |
| 132 | +finally: |
| 133 | + pass |
| 134 | +``` |
| 135 | + |
| 136 | +## Async send event data |
| 137 | + |
| 138 | +Sends an event data and asynchronously. |
| 139 | + |
| 140 | +```python |
| 141 | +import os |
| 142 | +from azure.eventhub.aio import EventHubClient |
| 143 | +from azure.eventhub import EventData |
| 144 | + |
| 145 | +connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format( |
| 146 | + os.environ['EVENT_HUB_HOSTNAME'], |
| 147 | + os.environ['EVENT_HUB_SAS_POLICY'], |
| 148 | + os.environ['EVENT_HUB_SAS_KEY'], |
| 149 | + os.environ['EVENT_HUB_NAME']) |
| 150 | +client = EventHubClient.from_connection_string(connection_str) |
| 151 | +sender = client.create_producer(partition_id="0") |
| 152 | + |
| 153 | +try: |
| 154 | + event_list = [] |
| 155 | + for i in range(10): |
| 156 | + event_list.append(EventData(b"A single event")) |
| 157 | + |
| 158 | + async with sender: |
| 159 | + await sender.send(event_list) |
| 160 | +except: |
| 161 | + raise |
| 162 | +finally: |
| 163 | + pass |
| 164 | +``` |
| 165 | + |
| 166 | +## Async receive event data |
| 167 | + |
| 168 | +Receive events asynchronously from the EventHub. |
| 169 | + |
| 170 | +```python |
| 171 | +import os |
| 172 | +import logging |
| 173 | +from azure.eventhub.aio import EventHubClient |
| 174 | +from azure.eventhub import EventData, EventPosition |
| 175 | + |
| 176 | +connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format( |
| 177 | + os.environ['EVENT_HUB_HOSTNAME'], |
| 178 | + os.environ['EVENT_HUB_SAS_POLICY'], |
| 179 | + os.environ['EVENT_HUB_SAS_KEY'], |
| 180 | + os.environ['EVENT_HUB_NAME']) |
| 181 | +client = EventHubClient.from_connection_string(connection_str) |
| 182 | +receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition.new_events_only()) |
| 183 | + |
| 184 | +try: |
| 185 | + logger = logging.getLogger("azure.eventhub") |
| 186 | + async with receiver: |
| 187 | + received = await receiver.receive(max_batch_size=100, timeout=5) |
| 188 | + for event_data in received: |
| 189 | + logger.info("Message received:{}".format(event_data.body_as_str())) |
| 190 | +except: |
| 191 | + raise |
| 192 | +finally: |
| 193 | + pass |
| 194 | +``` |
| 195 | + |
| 196 | +# Troubleshooting |
| 197 | + |
| 198 | +## General |
| 199 | + |
| 200 | +The Event Hubs APIs generate the following exceptions. |
| 201 | + |
| 202 | +- **AuthenticationError:** Failed to authenticate because of wrong address, SAS policy/key pair, SAS token or azure identity. |
| 203 | +- **ConnectError:** Failed to connect to the EventHubs. The AuthenticationError is a type of ConnectError. |
| 204 | +- **ConnectionLostError:** Lose connection after a connection has been built. |
| 205 | +- **EventDataError:** The EventData to be sent fails data validation. |
| 206 | +For instance, this error is raised if you try to send an EventData that is already sent. |
| 207 | +- **EventDataSendError:** The Eventhubs service responds with an error when an EventData is sent. |
| 208 | +- **EventHubError:** All other Eventhubs related errors. It is also the root error class of all the above mentioned errors. |
| 209 | + |
| 210 | +# Next steps |
| 211 | + |
| 212 | +## Examples |
| 213 | + |
| 214 | +- ./examples/send.py - use sender to publish events |
| 215 | +- ./examples/recv.py - use receiver to read events |
| 216 | +- ./examples/send_async.py - async/await support of a sender |
| 217 | +- ./examples/recv_async.py - async/await support of a receiver |
| 218 | +- ./examples/eph.py - event processor host |
| 219 | + |
| 220 | +## Documentation |
| 221 | + |
| 222 | +Reference documentation is available at https://docs.microsoft.com/python/api/azure-eventhub. |
| 223 | + |
| 224 | +## Logging |
| 225 | + |
| 226 | +- Enable 'azure.eventhub' logger to collect traces from the library. |
| 227 | +- Enable 'uamqp' logger to collect traces from the underlying uAMQP library. |
| 228 | +- Enable AMQP frame level trace by setting `network_tracing=True` when creating the client. |
| 229 | + |
| 230 | +## Provide Feedback |
| 231 | + |
| 232 | +If you encounter any bugs or have suggestions, please file an issue in the [Issues](https://github.com/Azure/azure-sdk-for-python/issues) section of the project. |
| 233 | + |
| 234 | +# Contributing |
| 235 | + |
| 236 | +This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com. |
| 237 | + |
| 238 | +When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA. |
| 239 | + |
| 240 | +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). |
| 241 | +For more information see the [Code of Conduct FAQ ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [[email protected]](mailto:[email protected]) with any additional questions or comments. |
0 commit comments