Skip to content

Commit ddb7a3f

Browse files
yunhaolingannatisch
authored andcommitted
EventHubs Refactor (#8515)
* init commit * Remove eventhubclient * Small fix * apply black style format * Draft refactor of the push mode * Start * Remove iterator test * Callback model receive test * Sync eventhub consumer refactor * Improve sync implementation * Async refactor * Verify send result with uamqp ReceiveClient * Start * Remove iterator test * Callback model receive test * Verify send result with uamqp ReceiveClient * api description update * Verify send result with uamqp ReceiveClient * Draft sync single thread * small fix * Fix tests for on_event (single event) * remove test_producer_client.py * Parse offset as byte * improve variable name in producer client * Consumer constructor accept callback method * fix bug in process error * Update _consumers * Update event processor unit test * Add test method for wrong json string * Fix reconnect test * put back delay in async * Invalid partition check * Change async test * revert back to cancel * Change async test * remove iterator on consumer * more remove * Fix a bug of validating wrong partition id * Add async test * async while true * fix async error handling * Catch link stolen and throw out if owner level is set * Add test for owner level * increase waiting time * Increase wait time to receive event * Correct test case for wrong key to send * Fix pylint error * update the version number * try version b6 * test code small fix * Further refactoring (#5) * Started some code clean up * More clean up * Refactored producer * Made async eventprocessor internal * Async clean up * Fixed locking behaviour * Merge updates * Fix missing lock * Review feedback * Fix multi-line strings * Test fixes * Pylint fixes * Fixed capabilities * Typo * Fix for producer shutdown * Fix editing error * Update ALL_PARTITION * Renamed logger * utc timestamps * Refactor _Address * Review feedback * More review feedback * Pylint fixes * Skip wrong hostname test on darwin * Skip wrong hostname test on darwin * Review feedback * Retry exception context * increase wait time in live test * increase wait time for owner level test * Review feedback * Perf patch * Pylint fixes * using None for default timeout * User agent prefix constant * Updated samples and snippets * Increase wait time to 6 seconds for client auth test * Add aiohttp in eventhubs dev_req * Only install aiohttp for py3.5+ * Fix client auth async test * Enlarge wait time to receive * Enlarge wait time to receive
1 parent 5ec0b1d commit ddb7a3f

File tree

75 files changed

+2722
-4018
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+2722
-4018
lines changed

sdk/eventhub/azure-eventhubs/HISTORY.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Release History
22

3+
## 2019-11-04 5.0.0b6
4+
5+
**Breaking changes**
6+
37
## 2019-11-04 5.0.0b5
48

59
**Breaking changes**

sdk/eventhub/azure-eventhubs/README.md

+11-12
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,13 @@ client = EventHubConsumerClient.from_connection_string(connection_str, event_hub
189189

190190
logger = logging.getLogger("azure.eventhub")
191191

192-
def on_events(partition_context, events):
193-
logger.info("Received {} events from partition {}".format(len(events), partition_context.partition_id))
192+
def on_event(partition_context, event):
193+
logger.info("Received event from partition {}".format(partition_context.partition_id))
194194

195195
with client:
196-
client.receive(on_events=on_events, consumer_group="$Default")
196+
client.receive(on_event=on_event, consumer_group="$Default")
197197
# receive events from specified partition:
198-
# client.receive(on_events=on_events, consumer_group="$Default", partition_id='0')
198+
# client.receive(on_event=on_event, consumer_group="$Default", partition_id='0')
199199
```
200200

201201
### Async publish events to an Event Hub
@@ -273,15 +273,15 @@ event_hub_path = '<< NAME OF THE EVENT HUB >>'
273273

274274
logger = logging.getLogger("azure.eventhub")
275275

276-
async def on_events(partition_context, events):
277-
logger.info("Received {} events from partition {}".format(len(events), partition_context.partition_id))
276+
async def on_event(partition_context, event):
277+
logger.info("Received event from partition {}".format(partition_context.partition_id))
278278

279279
async def receive():
280280
client = EventHubConsumerClient.from_connection_string(connection_str, event_hub_path=event_hub_path)
281281
async with client:
282-
received = await client.receive(on_events=on_events, consumer_group='$Default')
282+
received = await client.receive(on_event=on_event, consumer_group='$Default')
283283
# receive events from specified partition:
284-
# received = await client.receive(on_events=on_events, consumer_group='$Default', partition_id='0')
284+
# received = await client.receive(on_event=on_event, consumer_group='$Default', partition_id='0')
285285

286286
if __name__ == '__main__':
287287
loop = asyncio.get_event_loop()
@@ -330,13 +330,12 @@ async def do_operation(event):
330330
print(event)
331331

332332

333-
async def process_events(partition_context, events):
334-
await asyncio.gather(*[do_operation(event) for event in events])
335-
await partition_context.update_checkpoint(events[-1])
333+
async def process_event(partition_context, event):
334+
await partition_context.update_checkpoint(event)
336335

337336
async def receive(client):
338337
try:
339-
await client.receive(on_events=process_events, consumer_group="$Default")
338+
await client.receive(on_event=process_event, consumer_group="$Default")
340339
except KeyboardInterrupt:
341340
await client.close()
342341

sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py

+11-5
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,23 @@
44
# --------------------------------------------------------------------------------------------
55

66
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
7-
__version__ = "5.0.0b5"
7+
__version__ = "5.0.0b6"
88
from uamqp import constants # type: ignore
9-
from .common import EventData, EventDataBatch, EventPosition
10-
from .error import EventHubError, EventDataError, ConnectError, \
11-
AuthenticationError, EventDataSendError, ConnectionLostError
9+
from ._common import EventData, EventDataBatch, EventPosition
1210
from ._producer_client import EventHubProducerClient
1311
from ._consumer_client import EventHubConsumerClient
14-
from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential
12+
from ._common import EventHubSharedKeyCredential, EventHubSASTokenCredential
1513
from ._eventprocessor.partition_manager import PartitionManager
1614
from ._eventprocessor.common import CloseReason, OwnershipLostError
1715
from ._eventprocessor.partition_context import PartitionContext
16+
from .exceptions import (
17+
EventHubError,
18+
EventDataError,
19+
ConnectError,
20+
AuthenticationError,
21+
EventDataSendError,
22+
ConnectionLostError
23+
)
1824

1925
TransportType = constants.TransportType
2026

0 commit comments

Comments
 (0)