Skip to content

Commit 3f30f58

Browse files
committed
producer consumer interceptors
Squashed review fixups: linter; async producer interceptor; nil consumer docs typo; error details; pending message revert; lint; empty interceptor linter; reviews1; intercept batch test error; reviews2; interceptors typos; extra line; rm nil check; more reviews; linter; functional test; revert docker-compose; rm close; revert docker compose host; more tests; rm close and base class
1 parent 8c106b1 commit 3f30f58

10 files changed

+293
-7
lines changed

lib/kafka/client.rb

+10-3
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,8 @@ def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partit
248248
# be in a message set before it should be compressed. Note that message sets
249249
# are per-partition rather than per-topic or per-producer.
250250
#
251+
# @param interceptors [Array<Kafka::ProducerInterceptor>] a list of producer interceptors.
252+
#
251253
# @return [Kafka::Producer] the Kafka producer.
252254
def producer(
253255
compression_codec: nil,
@@ -261,7 +263,8 @@ def producer(
261263
idempotent: false,
262264
transactional: false,
263265
transactional_id: nil,
264-
transactional_timeout: 60
266+
transactional_timeout: 60,
267+
interceptors: []
265268
)
266269
cluster = initialize_cluster
267270
compressor = Compressor.new(
@@ -291,6 +294,7 @@ def producer(
291294
retry_backoff: retry_backoff,
292295
max_buffer_size: max_buffer_size,
293296
max_buffer_bytesize: max_buffer_bytesize,
297+
interceptors: interceptors
294298
)
295299
end
296300

@@ -343,6 +347,7 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
343347
# @param refresh_topic_interval [Integer] interval of refreshing the topic list.
344348
# If it is 0, the topic list won't be refreshed (default)
345349
# If it is n (n > 0), the topic list will be refreshed every n seconds
350+
# @param interceptors [Array<Kafka::ConsumerInterceptor>] a list of consumer interceptors.
346351
# @return [Consumer]
347352
def consumer(
348353
group_id:,
@@ -353,7 +358,8 @@ def consumer(
353358
heartbeat_interval: 10,
354359
offset_retention_time: nil,
355360
fetcher_max_queue_size: 100,
356-
refresh_topic_interval: 0
361+
refresh_topic_interval: 0,
362+
interceptors: []
357363
)
358364
cluster = initialize_cluster
359365

@@ -407,7 +413,8 @@ def consumer(
407413
fetcher: fetcher,
408414
session_timeout: session_timeout,
409415
heartbeat: heartbeat,
410-
refresh_topic_interval: refresh_topic_interval
416+
refresh_topic_interval: refresh_topic_interval,
417+
interceptors: interceptors
411418
)
412419
end
413420

lib/kafka/consumer.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22

33
require "kafka/consumer_group"
4+
require "kafka/consumer_interceptors"
45
require "kafka/offset_manager"
56
require "kafka/fetcher"
67
require "kafka/pause"
@@ -44,7 +45,8 @@ module Kafka
4445
#
4546
class Consumer
4647

47-
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0)
48+
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:,
49+
session_timeout:, heartbeat:, refresh_topic_interval: 0, interceptors: [])
4850
@cluster = cluster
4951
@logger = TaggedLogger.new(logger)
5052
@instrumenter = instrumenter
@@ -54,6 +56,7 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
5456
@fetcher = fetcher
5557
@heartbeat = heartbeat
5658
@refresh_topic_interval = refresh_topic_interval
59+
@interceptors = ConsumerInterceptors.new(interceptors: interceptors, logger: logger)
5760

5861
@pauses = Hash.new {|h, k|
5962
h[k] = Hash.new {|h2, k2|
@@ -220,6 +223,7 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica
220223
batches = fetch_batches
221224

222225
batches.each do |batch|
226+
batch = @interceptors.on_consume(batch)
223227
batch.messages.each do |message|
224228
notification = {
225229
topic: message.topic,
@@ -311,6 +315,7 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall
311315
unless batch.empty?
312316
raw_messages = batch.messages
313317
batch.messages = raw_messages.reject(&:is_control_record)
318+
batch = @interceptors.on_consume(batch)
314319

315320
notification = {
316321
topic: batch.topic,

lib/kafka/consumer_interceptors.rb

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# frozen_string_literal: true

module Kafka
  # Holds a list of consumer interceptors that implement on_consume
  # and wraps calls to a chain of custom consumer interceptors.
  class ConsumerInterceptors
    # @param interceptors [Array<#on_consume>, nil] the interceptor chain; nil is treated as empty.
    # @param logger [Logger] logger used to report interceptor failures.
    def initialize(interceptors:, logger:)
      @interceptors = interceptors || []
      @logger = TaggedLogger.new(logger)
    end

    # This is called from Kafka::Consumer once the batches are fetched.
    # The batch returned from the first ConsumerInterceptor.on_consume is passed to the second interceptor on_consume,
    # and so on in an interceptor chain. This method does not throw exceptions:
    # a failing interceptor is logged and skipped, and the chain continues with
    # the batch produced by the last successful interceptor.
    #
    # @param batch [Kafka::FetchedBatch] fetched batch.
    #
    # @return [Kafka::FetchedBatch] intercepted batch.
    def on_consume(batch)
      @interceptors.each do |interceptor|
        begin
          batch = interceptor.on_consume(batch)
        # Rescue StandardError rather than Exception so process-control exceptions
        # (SignalException, SystemExit, NoMemoryError) still propagate.
        rescue StandardError => e
          @logger.warn "Error executing interceptor on_consume for topic: #{batch.topic} partition: #{batch.partition} #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
        end
      end

      batch
    end
  end
end

lib/kafka/producer.rb

+6-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require "kafka/pending_message_queue"
88
require "kafka/pending_message"
99
require "kafka/compressor"
10+
require "kafka/producer_interceptors"
1011

1112
module Kafka
1213
# Allows sending messages to a Kafka cluster.
@@ -129,7 +130,8 @@ module Kafka
129130
class Producer
130131
class AbortTransaction < StandardError; end
131132

132-
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
133+
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:,
134+
required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, interceptors: [])
133135
@cluster = cluster
134136
@transaction_manager = transaction_manager
135137
@logger = TaggedLogger.new(logger)
@@ -141,6 +143,7 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso
141143
@max_buffer_size = max_buffer_size
142144
@max_buffer_bytesize = max_buffer_bytesize
143145
@compressor = compressor
146+
@interceptors = ProducerInterceptors.new(interceptors: interceptors, logger: logger)
144147

145148
# The set of topics that are produced to.
146149
@target_topics = Set.new
@@ -191,15 +194,15 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key:
191194
# We want to fail fast if `topic` isn't a String
192195
topic = topic.to_str
193196

194-
message = PendingMessage.new(
197+
message = @interceptors.on_send(PendingMessage.new(
195198
value: value && value.to_s,
196199
key: key && key.to_s,
197200
headers: headers,
198201
topic: topic,
199202
partition: partition && Integer(partition),
200203
partition_key: partition_key && partition_key.to_s,
201204
create_time: create_time
202-
)
205+
))
203206

204207
if buffer_size >= @max_buffer_size
205208
buffer_overflow topic,

lib/kafka/producer_interceptors.rb

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# frozen_string_literal: true

module Kafka
  # Holds a list of producer interceptors that implement on_send
  # and wraps calls to a chain of custom producer interceptors.
  class ProducerInterceptors
    # @param interceptors [Array<#on_send>, nil] the interceptor chain; nil is treated as empty.
    # @param logger [Logger] logger used to report interceptor failures.
    def initialize(interceptors:, logger:)
      @interceptors = interceptors || []
      @logger = TaggedLogger.new(logger)
    end

    # This method is called when the client produces a message, before it gets queued to be sent to the Kafka brokers.
    # The message returned from the first ProducerInterceptor.on_send is passed to the second interceptor on_send, and so on in an
    # interceptor chain. This method does not throw exceptions: a failing
    # interceptor is logged and skipped, and the chain continues with the
    # message produced by the last successful interceptor.
    #
    # @param pending_message [Kafka::PendingMessage] the produced message.
    #
    # @return [Kafka::PendingMessage] the intercepted message returned by the last interceptor.
    def on_send(pending_message)
      @interceptors.each do |interceptor|
        begin
          pending_message = interceptor.on_send(pending_message)
        # Rescue StandardError rather than Exception so process-control exceptions
        # (SignalException, SystemExit, NoMemoryError) still propagate.
        rescue StandardError => e
          @logger.warn "Error executing interceptor on_send for topic: #{pending_message.topic} partition: #{pending_message.partition} #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
        end
      end

      pending_message
    end
  end
end

spec/consumer_spec.rb

+48
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# frozen_string_literal: true
22

33
require "timecop"
4+
require "fake_consumer_interceptor"
5+
require "kafka/consumer_interceptors"
46

57
describe Kafka::Consumer do
68
let(:cluster) { double(:cluster) }
@@ -524,4 +526,50 @@
524526

525527
it { expect(method_original_name).to eq(:trigger_heartbeat!) }
526528
end
529+
530+
describe '#interceptor' do
531+
it "creates and stops a consumer with interceptor" do
532+
interceptor = FakeConsumerInterceptor.new
533+
consumer = Kafka::Consumer.new(
534+
cluster: cluster,
535+
logger: logger,
536+
instrumenter: instrumenter,
537+
group: group,
538+
offset_manager: offset_manager,
539+
fetcher: fetcher,
540+
session_timeout: session_timeout,
541+
heartbeat: heartbeat,
542+
interceptors: [interceptor]
543+
)
544+
545+
consumer.stop
546+
end
547+
548+
it "chains on_consume" do
549+
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2')
550+
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3')
551+
interceptors = Kafka::ConsumerInterceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
552+
intercepted_batch = interceptors.on_consume(fetched_batches[0])
553+
554+
expect(intercepted_batch.messages[0].value).to eq "hellohello2hello3"
555+
end
556+
557+
it "does not break the on_consume chain" do
558+
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2', on_consume_error: true)
559+
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3')
560+
interceptors = Kafka::ConsumerInterceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
561+
intercepted_batch = interceptors.on_consume(fetched_batches[0])
562+
563+
expect(intercepted_batch.messages[0].value).to eq "hellohello3"
564+
end
565+
566+
it "returns original batch when all interceptors fail" do
567+
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2', on_consume_error: true)
568+
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3', on_consume_error: true)
569+
interceptors = Kafka::ConsumerInterceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
570+
intercepted_batch = interceptors.on_consume(fetched_batches[0])
571+
572+
expect(intercepted_batch.messages[0].value).to eq "hello"
573+
end
574+
end
527575
end

spec/fake_consumer_interceptor.rb

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# frozen_string_literal: true

# Test double for a consumer interceptor: rebuilds the batch with a suffix
# appended to every message value, or raises to simulate a broken interceptor.
class FakeConsumerInterceptor
  def initialize(append_s: '', on_consume_error: false)
    @append_s = append_s
    @on_consume_error = on_consume_error
  end

  def on_consume(batch)
    raise StandardError, "Something went wrong in the consumer interceptor" if @on_consume_error

    # Rewrap each message with the suffix appended to its value.
    rewritten = batch.messages.map do |original|
      Kafka::FetchedMessage.new(
        message: Kafka::Protocol::Message.new(
          value: original.value + @append_s,
          key: original.key,
          create_time: original.create_time,
          offset: original.offset
        ),
        topic: original.topic,
        partition: original.partition
      )
    end

    # Return a fresh batch carrying the rewritten messages; all other batch
    # attributes are copied through unchanged.
    Kafka::FetchedBatch.new(
      topic: batch.topic,
      partition: batch.partition,
      highwater_mark_offset: batch.highwater_mark_offset,
      messages: rewritten,
      last_offset: batch.last_offset,
      leader_epoch: batch.leader_epoch
    )
  end
end

spec/fake_producer_interceptor.rb

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# frozen_string_literal: true

# Test double for a producer interceptor: returns a new pending message with
# a suffix appended to its value, or raises to simulate a broken interceptor.
class FakeProducerInterceptor
  def initialize(append_s: '', on_send_error: false)
    @append_s = append_s
    @on_send_error = on_send_error
  end

  def on_send(pending_message)
    raise StandardError, "Something went wrong in the producer interceptor" if @on_send_error

    # Copy every attribute through unchanged except the value, which gets the
    # configured suffix appended.
    Kafka::PendingMessage.new(
      value: pending_message.value + @append_s,
      key: pending_message.key,
      headers: pending_message.headers,
      topic: pending_message.topic,
      partition: pending_message.partition,
      partition_key: pending_message.partition_key,
      create_time: pending_message.create_time
    )
  end
end

spec/functional/interceptors_spec.rb

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# frozen_string_literal: true

require "fake_consumer_interceptor"
require "fake_producer_interceptor"

describe "Interceptors", functional: true do
  let!(:topic) { create_random_topic(num_partitions: 3) }

  example "intercept produced and consumed message" do
    # Produce one message through an interceptor that appends "producer".
    send_interceptor = FakeProducerInterceptor.new(append_s: 'producer')
    producer = kafka.producer(max_retries: 3, retry_backoff: 1, interceptors: [send_interceptor])
    producer.produce("hello", topic: topic)
    producer.deliver_messages
    producer.shutdown

    # Consume it back through an interceptor that appends "consumer";
    # the value should carry both suffixes.
    receive_interceptor = FakeConsumerInterceptor.new(append_s: 'consumer')
    consumer = kafka.consumer(group_id: SecureRandom.uuid, fetcher_max_queue_size: 1, interceptors: [receive_interceptor])
    consumer.subscribe(topic)
    consumer.each_message do |message|
      expect(message.value).to eq "helloproducerconsumer"
      break
    end
    consumer.stop
  end
end

0 commit comments

Comments
 (0)