Skip to content
This repository was archived by the owner on Mar 24, 2021. It is now read-only.

Commit b43d645

Browse files
committed
rdkafka/producer: minor changes to poll()
Checking for ReferenceError is just noise since we always join() the poller thread before stopping/finalising. Signed-off-by: Yung-Chin Oei <[email protected]>
1 parent de5d945 commit b43d645

File tree

1 file changed

+3
-5
lines changed

1 file changed

+3
-5
lines changed

pykafka/rdkafka/producer.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,9 @@ def start(self):
5252
self._running = True
5353

5454
def poll(self):
55-
try:
56-
while self._running or self._rdk_producer.outq_len() > 0:
57-
self._rdk_producer.poll(timeout_ms=1000)
58-
except ReferenceError: # weakref'd self
59-
pass
55+
while self._running or self._rdk_producer.outq_len() > 0:
56+
self._rdk_producer.poll(timeout_ms=1000)
57+
assert(not self._rdk_producer._pending_messages)
6058
log.debug("Exiting RdKafkaProducer poller thread cleanly.")
6159

6260
self._poller_thread = self._cluster.handler.spawn(

0 commit comments

Comments
 (0)