From 08aebf7da9297a3583ac32bfec786a0b5f570b8b Mon Sep 17 00:00:00 2001 From: Daniel Schierbeck Date: Wed, 29 Nov 2017 14:39:31 +0100 Subject: [PATCH] Instrument the start of message/batch processing This makes it possible for subscribers to do things before the processing completes. --- lib/kafka/consumer.rb | 48 +++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/lib/kafka/consumer.rb b/lib/kafka/consumer.rb index 267019e22..35628608c 100644 --- a/lib/kafka/consumer.rb +++ b/lib/kafka/consumer.rb @@ -203,17 +203,21 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica batches.each do |batch| batch.messages.each do |message| - @instrumenter.instrument("process_message.consumer") do |notification| - notification.update( - topic: message.topic, - partition: message.partition, - offset: message.offset, - offset_lag: batch.highwater_mark_offset - message.offset - 1, - create_time: message.create_time, - key: message.key, - value: message.value, - ) - + notification = { + topic: message.topic, + partition: message.partition, + offset: message.offset, + offset_lag: batch.highwater_mark_offset - message.offset - 1, + create_time: message.create_time, + key: message.key, + value: message.value, + } + + # Instrument an event immediately so that subscribers don't have to wait until + # the block is completed. + @instrumenter.instrument("start_process_message.consumer", notification) + + @instrumenter.instrument("process_message.consumer", notification) do begin yield message @current_offsets[message.topic][message.partition] = message.offset @@ -278,15 +282,19 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall batches.each do |batch| unless batch.empty? - @instrumenter.instrument("process_batch.consumer") do |notification| - notification.update( - topic: batch.topic, - partition: batch.partition, - offset_lag: batch.offset_lag, - highwater_mark_offset: batch.highwater_mark_offset, - message_count: batch.messages.count, - ) - + notification = { + topic: batch.topic, + partition: batch.partition, + offset_lag: batch.offset_lag, + highwater_mark_offset: batch.highwater_mark_offset, + message_count: batch.messages.count, + } + + # Instrument an event immediately so that subscribers don't have to wait until + # the block is completed. + @instrumenter.instrument("start_process_batch.consumer", notification) + + @instrumenter.instrument("process_batch.consumer", notification) do begin yield batch @current_offsets[batch.topic][batch.partition] = batch.last_offset