Skip to content

Commit a57700e

Browse files
committed
Add producer and consumer interceptors
Squashed history: linter; async producer interceptor; nil consumer; docs typo; error details; pending message; revert lint; empty interceptor; linter; reviews 1; intercept batch test; error; reviews 2; interceptors typos; extra line; remove nil check; more reviews; linter; functional test; revert docker-compose; remove close; revert docker-compose host; more tests
1 parent 8c106b1 commit a57700e

13 files changed

+379
-7
lines changed

lib/kafka/client.rb

+10-3
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,8 @@ def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partit
248248
# be in a message set before it should be compressed. Note that message sets
249249
# are per-partition rather than per-topic or per-producer.
250250
#
251+
# @param interceptors [Array<Kafka::ProducerInterceptor>] a list of producer interceptors.
252+
#
251253
# @return [Kafka::Producer] the Kafka producer.
252254
def producer(
253255
compression_codec: nil,
@@ -261,7 +263,8 @@ def producer(
261263
idempotent: false,
262264
transactional: false,
263265
transactional_id: nil,
264-
transactional_timeout: 60
266+
transactional_timeout: 60,
267+
interceptors: []
265268
)
266269
cluster = initialize_cluster
267270
compressor = Compressor.new(
@@ -291,6 +294,7 @@ def producer(
291294
retry_backoff: retry_backoff,
292295
max_buffer_size: max_buffer_size,
293296
max_buffer_bytesize: max_buffer_bytesize,
297+
interceptors: interceptors
294298
)
295299
end
296300

@@ -343,6 +347,7 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
343347
# @param refresh_topic_interval [Integer] interval of refreshing the topic list.
344348
# If it is 0, the topic list won't be refreshed (default)
345349
# If it is n (n > 0), the topic list will be refreshed every n seconds
350+
# @param interceptors [Array<Kafka::ConsumerInterceptor>] a list of consumer interceptors.
346351
# @return [Consumer]
347352
def consumer(
348353
group_id:,
@@ -353,7 +358,8 @@ def consumer(
353358
heartbeat_interval: 10,
354359
offset_retention_time: nil,
355360
fetcher_max_queue_size: 100,
356-
refresh_topic_interval: 0
361+
refresh_topic_interval: 0,
362+
interceptors: []
357363
)
358364
cluster = initialize_cluster
359365

@@ -407,7 +413,8 @@ def consumer(
407413
fetcher: fetcher,
408414
session_timeout: session_timeout,
409415
heartbeat: heartbeat,
410-
refresh_topic_interval: refresh_topic_interval
416+
refresh_topic_interval: refresh_topic_interval,
417+
interceptors: interceptors
411418
)
412419
end
413420

lib/kafka/consumer.rb

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22

33
require "kafka/consumer_group"
4+
require "kafka/consumer_interceptors"
45
require "kafka/offset_manager"
56
require "kafka/fetcher"
67
require "kafka/pause"
@@ -44,7 +45,8 @@ module Kafka
4445
#
4546
class Consumer
4647

47-
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0)
48+
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:,
49+
session_timeout:, heartbeat:, refresh_topic_interval: 0, interceptors: [])
4850
@cluster = cluster
4951
@logger = TaggedLogger.new(logger)
5052
@instrumenter = instrumenter
@@ -54,6 +56,7 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
5456
@fetcher = fetcher
5557
@heartbeat = heartbeat
5658
@refresh_topic_interval = refresh_topic_interval
59+
@interceptors = ConsumerInterceptors.new(interceptors: interceptors, logger: logger)
5760

5861
@pauses = Hash.new {|h, k|
5962
h[k] = Hash.new {|h2, k2|
@@ -124,6 +127,7 @@ def subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, m
124127
# @return [nil]
125128
def stop
126129
@running = false
130+
@interceptors.close
127131
@fetcher.stop
128132
end
129133

@@ -220,6 +224,7 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica
220224
batches = fetch_batches
221225

222226
batches.each do |batch|
227+
batch = @interceptors.on_consume(batch)
223228
batch.messages.each do |message|
224229
notification = {
225230
topic: message.topic,
@@ -311,6 +316,7 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall
311316
unless batch.empty?
312317
raw_messages = batch.messages
313318
batch.messages = raw_messages.reject(&:is_control_record)
319+
batch = @interceptors.on_consume(batch)
314320

315321
notification = {
316322
topic: batch.topic,

lib/kafka/consumer_interceptor.rb

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true

module Kafka
  # Abstract base class: allows the client to intercept the messages received
  # by the consumer. Subclasses must override both #on_consume and #close.
  class ConsumerInterceptor
    # This is called from Kafka::Consumer once the batches are fetched.
    #
    # @param batch [Kafka::FetchedBatch]
    # @return [Kafka::FetchedBatch] intercepted batch.
    # @raise [NotImplementedError] unless overridden in a child class.
    def on_consume(batch)
      raise NotImplementedError, "Implement this method in a child class"
    end

    # This is called when the interceptor is closed.
    #
    # @return [nil]
    # @raise [NotImplementedError] unless overridden in a child class.
    def close
      raise NotImplementedError, "Implement this method in a child class"
    end
  end
end

lib/kafka/consumer_interceptors.rb

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# frozen_string_literal: true

require "kafka/interceptors"

module Kafka
  # Holds a list of Kafka::ConsumerInterceptor
  # and wraps calls to a chain of custom consumer interceptors.
  class ConsumerInterceptors < Kafka::Interceptors
    # This is called from Kafka::Consumer once the batches are fetched.
    # The batch returned from the first ConsumerInterceptor.on_consume is passed
    # to the second interceptor's on_consume, and so on in an interceptor chain.
    # This method does not throw exceptions: a failing interceptor is logged and
    # skipped, and the chain continues with the batch produced so far.
    #
    # @param batch [Kafka::FetchedBatch] fetched batch.
    #
    # @return [Kafka::FetchedBatch] intercepted batch.
    def on_consume(batch)
      @interceptors.each do |interceptor|
        begin
          batch = interceptor.on_consume(batch)
        rescue StandardError => e
          # Rescue StandardError rather than Exception so that signals,
          # SystemExit, and other fatal conditions still propagate.
          @logger.warn "Error executing interceptor on_consume for topic: #{batch.topic} partition: #{batch.partition} #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
        end
      end

      batch
    end
  end
end

lib/kafka/interceptors.rb

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# frozen_string_literal: true

module Kafka
  # Holds a list of interceptors
  # and wraps calls to a chain of custom interceptors.
  class Interceptors
    # @param interceptors [Array, nil] the interceptor objects; nil is treated as empty.
    # @param logger [Logger] used to report interceptor failures.
    def initialize(interceptors:, logger:)
      @interceptors = interceptors || []
      @logger = TaggedLogger.new(logger)
    end

    # Closes every interceptor. Errors are logged, never raised, so that one
    # failing interceptor cannot prevent the remaining ones from being closed.
    #
    # @return [nil]
    def close
      @interceptors.each do |interceptor|
        begin
          interceptor.close
        rescue StandardError => e
          # Rescue StandardError rather than Exception so that signals and
          # SystemExit during shutdown still propagate.
          @logger.warn "Failed to close #{self.class} #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
        end
      end
    end
  end
end

lib/kafka/producer.rb

+7-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require "kafka/pending_message_queue"
88
require "kafka/pending_message"
99
require "kafka/compressor"
10+
require "kafka/producer_interceptors"
1011

1112
module Kafka
1213
# Allows sending messages to a Kafka cluster.
@@ -129,7 +130,8 @@ module Kafka
129130
class Producer
130131
class AbortTransaction < StandardError; end
131132

132-
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
133+
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:,
134+
required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, interceptors: [])
133135
@cluster = cluster
134136
@transaction_manager = transaction_manager
135137
@logger = TaggedLogger.new(logger)
@@ -141,6 +143,7 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso
141143
@max_buffer_size = max_buffer_size
142144
@max_buffer_bytesize = max_buffer_bytesize
143145
@compressor = compressor
146+
@interceptors = ProducerInterceptors.new(interceptors: interceptors, logger: logger)
144147

145148
# The set of topics that are produced to.
146149
@target_topics = Set.new
@@ -191,15 +194,15 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key:
191194
# We want to fail fast if `topic` isn't a String
192195
topic = topic.to_str
193196

194-
message = PendingMessage.new(
197+
message = @interceptors.on_send(PendingMessage.new(
195198
value: value && value.to_s,
196199
key: key && key.to_s,
197200
headers: headers,
198201
topic: topic,
199202
partition: partition && Integer(partition),
200203
partition_key: partition_key && partition_key.to_s,
201204
create_time: create_time
202-
)
205+
))
203206

204207
if buffer_size >= @max_buffer_size
205208
buffer_overflow topic,
@@ -283,6 +286,7 @@ def clear_buffer
283286
#
284287
# @return [nil]
285288
def shutdown
289+
@interceptors.close
286290
@transaction_manager.close
287291
@cluster.disconnect
288292
end

lib/kafka/producer_interceptor.rb

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# frozen_string_literal: true

module Kafka
  # Abstract base class: allows the client to intercept messages before they
  # get sent to the Kafka brokers. Subclasses must override both #on_send and #close.
  class ProducerInterceptor
    # This method is called by Kafka::Producer.produce
    # before the message gets queued to be sent to the Kafka brokers.
    #
    # @param pending_message [Kafka::PendingMessage] the message.
    #
    # @return [Kafka::PendingMessage] the intercepted message.
    # @raise [NotImplementedError] unless overridden in a child class.
    def on_send(pending_message)
      raise NotImplementedError, "Implement this method in a child class"
    end

    # This is called when the interceptor is closed.
    #
    # @return [nil]
    # @raise [NotImplementedError] unless overridden in a child class.
    def close
      raise NotImplementedError, "Implement this method in a child class"
    end
  end
end

lib/kafka/producer_interceptors.rb

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# frozen_string_literal: true

require "kafka/interceptors"

module Kafka
  # Holds a list of Kafka::ProducerInterceptor
  # and wraps calls to a chain of custom producer interceptors.
  class ProducerInterceptors < Kafka::Interceptors
    # This method is called when the client produces a message, before it gets
    # queued to be sent to the Kafka brokers. The message returned from the first
    # ProducerInterceptor.on_send is passed to the second interceptor's on_send,
    # and so on in an interceptor chain. This method does not throw exceptions:
    # a failing interceptor is logged and skipped, and the chain continues with
    # the message produced so far.
    #
    # @param pending_message [Kafka::PendingMessage] the produced message.
    #
    # @return [Kafka::PendingMessage] the intercepted message returned by the last interceptor.
    def on_send(pending_message)
      @interceptors.each do |interceptor|
        begin
          pending_message = interceptor.on_send(pending_message)
        rescue StandardError => e
          # Rescue StandardError rather than Exception so that signals,
          # SystemExit, and other fatal conditions still propagate.
          @logger.warn "Error executing interceptor on_send for topic: #{pending_message.topic} partition: #{pending_message.partition} #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
        end
      end

      pending_message
    end
  end
end

spec/consumer_spec.rb

+49
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# frozen_string_literal: true
22

33
require "timecop"
4+
require "fake_consumer_interceptor"
5+
require "kafka/consumer_interceptors"
46

57
describe Kafka::Consumer do
68
let(:cluster) { double(:cluster) }
@@ -524,4 +526,51 @@
524526

525527
it { expect(method_original_name).to eq(:trigger_heartbeat!) }
526528
end
529+
530+
describe '#interceptor' do
531+
it "creates and stops a consumer with interceptor" do
532+
interceptor = FakeConsumerInterceptor.new
533+
consumer = Kafka::Consumer.new(
534+
cluster: cluster,
535+
logger: logger,
536+
instrumenter: instrumenter,
537+
group: group,
538+
offset_manager: offset_manager,
539+
fetcher: fetcher,
540+
session_timeout: session_timeout,
541+
heartbeat: heartbeat,
542+
interceptors: [interceptor]
543+
)
544+
545+
consumer.stop
546+
expect(interceptor.close_count).to eq 1
547+
end
548+
549+
it "chains on_consume" do
550+
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2')
551+
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3')
552+
interceptors = Kafka::ConsumerInterceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
553+
intercepted_batch = interceptors.on_consume(fetched_batches[0])
554+
555+
expect(intercepted_batch.messages[0].value).to eq "hellohello2hello3"
556+
end
557+
558+
it "does not break the on_consume chain" do
559+
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2', on_consume_error: true)
560+
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3')
561+
interceptors = Kafka::ConsumerInterceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
562+
intercepted_batch = interceptors.on_consume(fetched_batches[0])
563+
564+
expect(intercepted_batch.messages[0].value).to eq "hellohello3"
565+
end
566+
567+
it "returns original batch when all interceptors fail" do
568+
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2', on_consume_error: true)
569+
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3', on_consume_error: true)
570+
interceptors = Kafka::ConsumerInterceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
571+
intercepted_batch = interceptors.on_consume(fetched_batches[0])
572+
573+
expect(intercepted_batch.messages[0].value).to eq "hello"
574+
end
575+
end
527576
end

spec/fake_consumer_interceptor.rb

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# frozen_string_literal: true

require "kafka/consumer_interceptor"

# Test double: a consumer interceptor that appends a configurable suffix to
# every message value (or raises, when configured to fail) and counts how many
# times it has been closed.
class FakeConsumerInterceptor < Kafka::ConsumerInterceptor
  # @return [Integer] number of times #close has been called.
  attr_reader :close_count

  # @param append_s [String] suffix appended to each message value on consume.
  # @param on_consume_error [Boolean] when true, #on_consume raises instead of intercepting.
  def initialize(append_s: '', on_consume_error: false)
    @close_count = 0
    @append_s = append_s
    @on_consume_error = on_consume_error
  end

  # Builds a new batch whose messages carry the suffixed values; all other
  # batch and message attributes are copied through unchanged.
  #
  # @param batch [Kafka::FetchedBatch]
  # @return [Kafka::FetchedBatch] the intercepted batch.
  def on_consume(batch)
    if @on_consume_error
      raise StandardError, "Something went wrong in the consumer interceptor"
    end

    intercepted_messages = batch.messages.map do |message|
      Kafka::FetchedMessage.new(
        message: Kafka::Protocol::Message.new(
          value: message.value + @append_s,
          key: message.key,
          create_time: message.create_time,
          offset: message.offset
        ),
        topic: message.topic,
        partition: message.partition
      )
    end

    Kafka::FetchedBatch.new(
      topic: batch.topic,
      partition: batch.partition,
      highwater_mark_offset: batch.highwater_mark_offset,
      messages: intercepted_messages,
      last_offset: batch.last_offset,
      leader_epoch: batch.leader_epoch
    )
  end

  # Records that the interceptor was closed.
  # @return [Integer] the updated close count.
  def close
    @close_count += 1
  end
end

0 commit comments

Comments
 (0)