Skip to content

Commit feb90ec

Browse files
authored
[EventHubs] exception/kwargs testing (#26867)
* fix flushing list batches in bp * add send list bp test
1 parent deb1378 commit feb90ec

File tree

4 files changed

+18
-6
lines changed

4 files changed

+18
-6
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -165,15 +165,21 @@ def flush(self, timeout_time=None, raise_error=True):
165165
self.partition_id,
166166
len(batch),
167167
)
168-
self._on_success(batch._internal_events, self.partition_id)
168+
try:
169+
self._on_success(batch._internal_events, self.partition_id)
170+
except AttributeError:
171+
self._on_success(batch, self.partition_id)
169172
except Exception as exc: # pylint: disable=broad-except
170173
_LOGGER.info(
171174
"Partition %r sending %r events failed due to exception: %r ",
172175
self.partition_id,
173176
len(batch),
174177
exc,
175178
)
176-
self._on_error(batch._internal_events, self.partition_id, exc)
179+
try:
180+
self._on_error(batch._internal_events, self.partition_id, exc)
181+
except AttributeError:
182+
self._on_error(batch, self.partition_id, exc)
177183
finally:
178184
self._cur_buffered_len -= len(batch)
179185
else:

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -167,15 +167,21 @@ async def _flush(self, timeout_time=None, raise_error=True):
167167
self.partition_id,
168168
len(batch),
169169
)
170-
await self._on_success(batch._internal_events, self.partition_id)
170+
try:
171+
await self._on_success(batch._internal_events, self.partition_id)
172+
except AttributeError:
173+
await self._on_success(batch, self.partition_id)
171174
except Exception as exc: # pylint: disable=broad-except
172175
_LOGGER.info(
173176
"Partition %r sending %r events failed due to exception: %r",
174177
self.partition_id,
175178
len(batch),
176179
exc,
177180
)
178-
await self._on_error(batch._internal_events, self.partition_id, exc)
181+
try:
182+
await self._on_error(batch._internal_events, self.partition_id, exc)
183+
except AttributeError:
184+
await self._on_error(batch, self.partition_id, exc)
179185
finally:
180186
self._cur_buffered_len -= len(batch)
181187
# If flush could not get the semaphore, we log and raise error if wanted

sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_buffered_producer_async.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ async def on_error(events, pid, err):
407407

408408
async with producer:
409409
partitions = await producer.get_partition_ids()
410-
await producer.send_event(EventData('data'))
410+
await producer.send_batch([EventData('data')])
411411
await asyncio.sleep(5)
412412
assert not sent_events
413413
await asyncio.sleep(20)

sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_buffered_producer.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ def on_error(events, pid, err):
419419

420420
with producer:
421421
partitions = producer.get_partition_ids()
422-
producer.send_event(EventData('data'))
422+
producer.send_batch([EventData('data')])
423423
time.sleep(5)
424424
assert not sent_events
425425
time.sleep(10)

0 commit comments

Comments
 (0)