18
18
limitations under the License.
19
19
"""
20
20
__all__ = ["BalancedConsumer" ]
21
+ import functools
21
22
import itertools
22
23
import logging
23
24
import socket
25
+ import sys
24
26
import time
27
+ import traceback
25
28
from uuid import uuid4
26
29
import weakref
27
30
44
47
log = logging .getLogger (__name__ )
45
48
46
49
50
+ def _catch_thread_exception (fn ):
51
+ """Sets self._worker_exception when fn raises an exception"""
52
+ def wrapped (self , * args , ** kwargs ):
53
+ try :
54
+ ret = fn (self , * args , ** kwargs )
55
+ except Exception :
56
+ self ._worker_exception = sys .exc_info ()
57
+ else :
58
+ return ret
59
+ return wrapped
60
+
61
+
47
62
class BalancedConsumer ():
48
63
"""
49
64
A self-balancing consumer for Kafka that uses ZooKeeper to communicate
@@ -173,6 +188,8 @@ def __init__(self,
173
188
self ._zookeeper_connection_timeout_ms = zookeeper_connection_timeout_ms
174
189
self ._reset_offset_on_start = reset_offset_on_start
175
190
self ._running = False
191
+ self ._worker_exception = None
192
+ self ._worker_trace_logged = False
176
193
177
194
if not rdkafka and use_rdkafka :
178
195
raise ImportError ("use_rdkafka requires rdkafka to be installed" )
@@ -200,6 +217,10 @@ def __init__(self,
200
217
if auto_start is True :
201
218
self .start ()
202
219
220
def __del__(self):
    """Log the finalisation at debug level, then stop the consumer."""
    description = "Finalising {}".format(self)
    log.debug(description)
    self.stop()
223
+
203
224
def __repr__ (self ):
204
225
return "<{module}.{name} at {id_} (consumer_group={group})>" .format (
205
226
module = self .__class__ .__module__ ,
@@ -208,15 +229,33 @@ def __repr__(self):
208
229
group = self ._consumer_group
209
230
)
210
231
232
+ def _raise_worker_exceptions (self ):
233
+ """Raises exceptions encountered on worker threads"""
234
+ if self ._worker_exception is not None :
235
+ _ , ex , tb = self ._worker_exception
236
+ if not self ._worker_trace_logged :
237
+ self ._worker_trace_logged = True
238
+ log .error ("Exception encountered in worker thread:\n %s" ,
239
+ "" .join (traceback .format_tb (tb )))
240
+ raise ex
241
+
211
242
def _setup_checker_worker(self):
    """Spawn the thread that periodically checks held partitions.

    The worker only touches the consumer through a weak proxy. Each pass
    it sleeps 120 seconds and calls ``_rebalance()`` when
    ``_check_held_partitions()`` returns a falsy value. It exits when
    ``_running`` is falsy or when any exception is raised.

    :returns: Whatever ``self._cluster.handler.spawn`` returns for the
        checker function
    """
    consumer = weakref.proxy(self)

    def checker():
        while True:
            try:
                if not consumer._running:
                    break
                time.sleep(120)
                if consumer._check_held_partitions():
                    continue
                consumer._rebalance()
            except ReferenceError:
                # Raised by the dead proxy; nothing left to report to
                break
            except Exception:
                # surface all exceptions to the main thread
                consumer._worker_exception = sys.exc_info()
                break
        log.debug("Checker thread exiting")

    log.debug("Starting checker thread")
    return consumer._cluster.handler.spawn(checker)
@@ -242,17 +281,21 @@ def held_offsets(self):
242
281
243
282
def start(self):
    """Open connections and join a cluster.

    Sets up the ZooKeeper connection when none exists yet, ensures the
    topic path, attaches the ZooKeeper state listener, registers this
    consumer, marks it running, sets the watches, rebalances and starts
    the checker worker.

    Any ``Exception`` raised along the way is logged with its traceback
    and answered by calling ``stop()``; it is not re-raised to the caller.
    """
    try:
        if self._zookeeper is None:
            self._setup_zookeeper(self._zookeeper_connect,
                                  self._zookeeper_connection_timeout_ms)
        self._zookeeper.ensure_path(self._topic_path)
        self._zk_state_listener = self._get_zk_state_listener()
        self._zookeeper.add_listener(self._zk_state_listener)
        self._add_self()
        self._running = True
        self._set_watches()
        self._rebalance()
        self._setup_checker_worker()
    except Exception:
        # log.exception records the active traceback, so the cause of the
        # shutdown is not lost even though the error is swallowed here
        log.exception("Stopping consumer in response to error")
        self.stop()
256
299
257
300
def stop (self ):
258
301
"""Close the zookeeper connection and stop consuming.
@@ -273,10 +316,13 @@ def stop(self):
273
316
self ._zookeeper .stop ()
274
317
else :
275
318
self ._remove_partitions (self ._get_held_partitions ())
276
- self ._zookeeper .delete (self ._path_self )
277
- # additionally we'd want to remove watches here, but there are no
278
- # facilities for that in ChildrenWatch - as a workaround we check
279
- # self._running in the watcher callbacks (see further down)
319
+ try :
320
+ self ._zookeeper .delete (self ._path_self )
321
+ except :
322
+ pass
323
+ # additionally we'd want to remove watches here, but there are no
324
+ # facilities for that in ChildrenWatch - as a workaround we check
325
+ # self._running in the watcher callbacks (see further down)
280
326
281
327
def _setup_zookeeper (self , zookeeper_connect , timeout ):
282
328
"""Open a connection to a ZooKeeper host.
@@ -411,13 +457,18 @@ def _set_watches(self):
411
457
consumer group remains up-to-date with the current state of the
412
458
cluster.
413
459
"""
460
+ proxy = weakref .proxy (self )
461
+ _brokers_changed = functools .partial (BalancedConsumer ._brokers_changed , proxy )
462
+ _topics_changed = functools .partial (BalancedConsumer ._topics_changed , proxy )
463
+ _consumers_changed = functools .partial (BalancedConsumer ._consumers_changed , proxy )
464
+
414
465
self ._setting_watches = True
415
466
# Set all our watches and then rebalance
416
467
broker_path = '/brokers/ids'
417
468
try :
418
469
self ._broker_watcher = ChildrenWatch (
419
470
self ._zookeeper , broker_path ,
420
- self . _brokers_changed
471
+ _brokers_changed
421
472
)
422
473
except NoNodeException :
423
474
raise Exception (
@@ -428,12 +479,12 @@ def _set_watches(self):
428
479
self ._topics_watcher = ChildrenWatch (
429
480
self ._zookeeper ,
430
481
'/brokers/topics' ,
431
- self . _topics_changed
482
+ _topics_changed
432
483
)
433
484
434
485
self ._consumer_watcher = ChildrenWatch (
435
486
self ._zookeeper , self ._consumer_id_path ,
436
- self . _consumers_changed
487
+ _consumers_changed
437
488
)
438
489
self ._setting_watches = False
439
490
@@ -602,6 +653,7 @@ def listener(zk_state):
602
653
603
654
return listener
604
655
656
+ @_catch_thread_exception
605
657
def _brokers_changed (self , brokers ):
606
658
if not self ._running :
607
659
return False # `False` tells ChildrenWatch to disable this watch
@@ -611,6 +663,7 @@ def _brokers_changed(self, brokers):
611
663
self ._consumer_id ))
612
664
self ._rebalance ()
613
665
666
+ @_catch_thread_exception
614
667
def _consumers_changed (self , consumers ):
615
668
if not self ._running :
616
669
return False # `False` tells ChildrenWatch to disable this watch
@@ -620,6 +673,7 @@ def _consumers_changed(self, consumers):
620
673
self ._consumer_id ))
621
674
self ._rebalance ()
622
675
676
+ @_catch_thread_exception
623
677
def _topics_changed (self , topics ):
624
678
if not self ._running :
625
679
return False # `False` tells ChildrenWatch to disable this watch
@@ -641,6 +695,7 @@ def reset_offsets(self, partition_offsets=None):
641
695
:type partition_offsets: Iterable of
642
696
(:class:`pykafka.partition.Partition`, int)
643
697
"""
698
+ self ._raise_worker_exceptions ()
644
699
if not self ._consumer :
645
700
raise ConsumerStoppedException ("Internal consumer is stopped" )
646
701
self ._consumer .reset_offsets (partition_offsets = partition_offsets )
@@ -663,6 +718,7 @@ def consumer_timed_out():
663
718
message = None
664
719
self ._last_message_time = time .time ()
665
720
while message is None and not consumer_timed_out ():
721
+ self ._raise_worker_exceptions ()
666
722
try :
667
723
message = self ._consumer .consume (block = block )
668
724
except ConsumerStoppedException :
@@ -690,4 +746,5 @@ def commit_offsets(self):
690
746
691
747
Uses the offset commit/fetch API
692
748
"""
749
+ self ._raise_worker_exceptions ()
693
750
return self ._consumer .commit_offsets ()
0 commit comments