diff --git a/lib/kafka/consumer.rb b/lib/kafka/consumer.rb
index 267019e22..35628608c 100644
--- a/lib/kafka/consumer.rb
+++ b/lib/kafka/consumer.rb
@@ -203,17 +203,21 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica
 
         batches.each do |batch|
           batch.messages.each do |message|
-            @instrumenter.instrument("process_message.consumer") do |notification|
-              notification.update(
-                topic: message.topic,
-                partition: message.partition,
-                offset: message.offset,
-                offset_lag: batch.highwater_mark_offset - message.offset - 1,
-                create_time: message.create_time,
-                key: message.key,
-                value: message.value,
-              )
-
+            notification = {
+              topic: message.topic,
+              partition: message.partition,
+              offset: message.offset,
+              offset_lag: batch.highwater_mark_offset - message.offset - 1,
+              create_time: message.create_time,
+              key: message.key,
+              value: message.value,
+            }
+
+            # Instrument an event immediately so that subscribers don't have to wait until
+            # the block is completed.
+            @instrumenter.instrument("start_process_message.consumer", notification)
+
+            @instrumenter.instrument("process_message.consumer", notification) do
               begin
                 yield message
                 @current_offsets[message.topic][message.partition] = message.offset
@@ -278,15 +282,19 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall
 
         batches.each do |batch|
           unless batch.empty?
-            @instrumenter.instrument("process_batch.consumer") do |notification|
-              notification.update(
-                topic: batch.topic,
-                partition: batch.partition,
-                offset_lag: batch.offset_lag,
-                highwater_mark_offset: batch.highwater_mark_offset,
-                message_count: batch.messages.count,
-              )
-
+            notification = {
+              topic: batch.topic,
+              partition: batch.partition,
+              offset_lag: batch.offset_lag,
+              highwater_mark_offset: batch.highwater_mark_offset,
+              message_count: batch.messages.count,
+            }
+
+            # Instrument an event immediately so that subscribers don't have to wait until
+            # the block is completed.
+            @instrumenter.instrument("start_process_batch.consumer", notification)
+
+            @instrumenter.instrument("process_batch.consumer", notification) do
               begin
                 yield batch
                 @current_offsets[batch.topic][batch.partition] = batch.last_offset