diff --git a/lib/kafka/consumer.rb b/lib/kafka/consumer.rb index 242cc188d..6ddbaf798 100644 --- a/lib/kafka/consumer.rb +++ b/lib/kafka/consumer.rb @@ -308,6 +308,7 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall topic: batch.topic, partition: batch.partition, last_offset: batch.last_offset, + last_create_time: batch.messages.last.try(:create_time), offset_lag: batch.offset_lag, highwater_mark_offset: batch.highwater_mark_offset, message_count: batch.messages.count, diff --git a/lib/kafka/datadog.rb b/lib/kafka/datadog.rb index 0c3e12a71..c5710c432 100644 --- a/lib/kafka/datadog.rb +++ b/lib/kafka/datadog.rb @@ -160,6 +160,8 @@ def process_message(event) def process_batch(event) offset = event.payload.fetch(:last_offset) messages = event.payload.fetch(:message_count) + create_time = event.payload.fetch(:last_create_time) + time_lag = create_time && ((Time.now - create_time) * 1000).to_i tags = { client: event.payload.fetch(:client_id), @@ -176,6 +178,10 @@ def process_batch(event) end gauge("consumer.offset", offset, tags: tags) + + if time_lag + gauge("consumer.time_lag", time_lag, tags: tags) + end end def fetch_batch(event) diff --git a/spec/prometheus_spec.rb b/spec/prometheus_spec.rb index 83d4c1dfc..46046c268 100644 --- a/spec/prometheus_spec.rb +++ b/spec/prometheus_spec.rb @@ -114,6 +114,7 @@ topic: 'AAA', partition: 4, last_offset: 100, + last_create_time: Time.now, message_count: 7 } end