Skip to content

Commit 3232f8e

Browse files
committed
producer consumer interceptors
linter asyn producer interceptor nil consumer docs typo error details pending message revert lint empty interceptor linter reviews1 intercept batch test error reviews2 interceptors typos extra line rm nil check more reviews linter functional test revert dockercompose rm close revert docker compose hoost more tests rm close and base class single interceptors class
1 parent 8c106b1 commit 3232f8e

9 files changed

+266
-7
lines changed

lib/kafka/client.rb

+12-3
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,9 @@ def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partit
248248
# be in a message set before it should be compressed. Note that message sets
249249
# are per-partition rather than per-topic or per-producer.
250250
#
251+
# @param interceptors [Array<Object>] a list of producer interceptors that implement
252+
# `call(Kafka::PendingMessage)`.
253+
#
251254
# @return [Kafka::Producer] the Kafka producer.
252255
def producer(
253256
compression_codec: nil,
@@ -261,7 +264,8 @@ def producer(
261264
idempotent: false,
262265
transactional: false,
263266
transactional_id: nil,
264-
transactional_timeout: 60
267+
transactional_timeout: 60,
268+
interceptors: []
265269
)
266270
cluster = initialize_cluster
267271
compressor = Compressor.new(
@@ -291,6 +295,7 @@ def producer(
291295
retry_backoff: retry_backoff,
292296
max_buffer_size: max_buffer_size,
293297
max_buffer_bytesize: max_buffer_bytesize,
298+
interceptors: interceptors
294299
)
295300
end
296301

@@ -343,6 +348,8 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
343348
# @param refresh_topic_interval [Integer] interval of refreshing the topic list.
344349
# If it is 0, the topic list won't be refreshed (default)
345350
# If it is n (n > 0), the topic list will be refreshed every n seconds
351+
# @param interceptors [Array<Object>] a list of consumer interceptors that implement
352+
# `call(Kafka::FetchedBatch)`.
346353
# @return [Consumer]
347354
def consumer(
348355
group_id:,
@@ -353,7 +360,8 @@ def consumer(
353360
heartbeat_interval: 10,
354361
offset_retention_time: nil,
355362
fetcher_max_queue_size: 100,
356-
refresh_topic_interval: 0
363+
refresh_topic_interval: 0,
364+
interceptors: []
357365
)
358366
cluster = initialize_cluster
359367

@@ -407,7 +415,8 @@ def consumer(
407415
fetcher: fetcher,
408416
session_timeout: session_timeout,
409417
heartbeat: heartbeat,
410-
refresh_topic_interval: refresh_topic_interval
418+
refresh_topic_interval: refresh_topic_interval,
419+
interceptors: interceptors
411420
)
412421
end
413422

lib/kafka/consumer.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22

33
require "kafka/consumer_group"
4+
require "kafka/interceptors"
45
require "kafka/offset_manager"
56
require "kafka/fetcher"
67
require "kafka/pause"
@@ -44,7 +45,8 @@ module Kafka
4445
#
4546
class Consumer
4647

47-
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0)
48+
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:,
49+
session_timeout:, heartbeat:, refresh_topic_interval: 0, interceptors: [])
4850
@cluster = cluster
4951
@logger = TaggedLogger.new(logger)
5052
@instrumenter = instrumenter
@@ -54,6 +56,7 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
5456
@fetcher = fetcher
5557
@heartbeat = heartbeat
5658
@refresh_topic_interval = refresh_topic_interval
59+
@interceptors = Interceptors.new(interceptors: interceptors, logger: logger)
5760

5861
@pauses = Hash.new {|h, k|
5962
h[k] = Hash.new {|h2, k2|
@@ -220,6 +223,7 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica
220223
batches = fetch_batches
221224

222225
batches.each do |batch|
226+
batch = @interceptors.call(batch)
223227
batch.messages.each do |message|
224228
notification = {
225229
topic: message.topic,
@@ -311,6 +315,7 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall
311315
unless batch.empty?
312316
raw_messages = batch.messages
313317
batch.messages = raw_messages.reject(&:is_control_record)
318+
batch = @interceptors.call(batch)
314319

315320
notification = {
316321
topic: batch.topic,

lib/kafka/interceptors.rb

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

module Kafka
  # Holds a list of interceptors that implement `call`
  # and wraps calls to a chain of custom interceptors.
  class Interceptors
    # @param interceptors [Array<Object>, nil] objects responding to `call`; nil is
    #   treated as an empty chain.
    # @param logger [Logger] wrapped in a TaggedLogger for failure reporting.
    def initialize(interceptors:, logger:)
      @interceptors = interceptors || []
      @logger = TaggedLogger.new(logger)
    end

    # This method is called when the client produces a message or once the batches are
    # fetched. The result returned from the first interceptor's `call` is passed to the
    # second interceptor, and so on in an interceptor chain. This method does not raise
    # exceptions: a failing interceptor is logged and skipped, and the chain continues
    # with the last successfully intercepted value.
    #
    # @param intercepted [Kafka::PendingMessage, Kafka::FetchedBatch] the produced message or
    #   fetched batch; must respond to `topic` and `partition` for error reporting.
    #
    # @return [Kafka::PendingMessage, Kafka::FetchedBatch] the intercepted message or batch
    #   returned by the last successful interceptor.
    def call(intercepted)
      @interceptors.each do |interceptor|
        begin
          intercepted = interceptor.call(intercepted)
        rescue StandardError => e
          # Rescue StandardError rather than Exception so a misbehaving interceptor
          # cannot swallow signals, SystemExit or NoMemoryError.
          @logger.warn "Error executing interceptor for topic: #{intercepted.topic} partition: #{intercepted.partition}: #{e.message}\n#{e.backtrace.join("\n")}"
        end
      end

      intercepted
    end
  end
end

lib/kafka/producer.rb

+6-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require "kafka/pending_message_queue"
88
require "kafka/pending_message"
99
require "kafka/compressor"
10+
require "kafka/interceptors"
1011

1112
module Kafka
1213
# Allows sending messages to a Kafka cluster.
@@ -129,7 +130,8 @@ module Kafka
129130
class Producer
130131
class AbortTransaction < StandardError; end
131132

132-
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
133+
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:,
134+
required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, interceptors: [])
133135
@cluster = cluster
134136
@transaction_manager = transaction_manager
135137
@logger = TaggedLogger.new(logger)
@@ -141,6 +143,7 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso
141143
@max_buffer_size = max_buffer_size
142144
@max_buffer_bytesize = max_buffer_bytesize
143145
@compressor = compressor
146+
@interceptors = Interceptors.new(interceptors: interceptors, logger: logger)
144147

145148
# The set of topics that are produced to.
146149
@target_topics = Set.new
@@ -191,15 +194,15 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key:
191194
# We want to fail fast if `topic` isn't a String
192195
topic = topic.to_str
193196

194-
message = PendingMessage.new(
197+
message = @interceptors.call(PendingMessage.new(
195198
value: value && value.to_s,
196199
key: key && key.to_s,
197200
headers: headers,
198201
topic: topic,
199202
partition: partition && Integer(partition),
200203
partition_key: partition_key && partition_key.to_s,
201204
create_time: create_time
202-
)
205+
))
203206

204207
if buffer_size >= @max_buffer_size
205208
buffer_overflow topic,

spec/consumer_spec.rb

+48
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# frozen_string_literal: true
22

33
require "timecop"
4+
require "fake_consumer_interceptor"
5+
require "kafka/interceptors"
46

57
describe Kafka::Consumer do
68
let(:cluster) { double(:cluster) }
@@ -524,4 +526,50 @@
524526

525527
it { expect(method_original_name).to eq(:trigger_heartbeat!) }
526528
end
529+
530+
describe '#interceptor' do
531+
it "creates and stops a consumer with interceptor" do
532+
interceptor = FakeConsumerInterceptor.new
533+
consumer = Kafka::Consumer.new(
534+
cluster: cluster,
535+
logger: logger,
536+
instrumenter: instrumenter,
537+
group: group,
538+
offset_manager: offset_manager,
539+
fetcher: fetcher,
540+
session_timeout: session_timeout,
541+
heartbeat: heartbeat,
542+
interceptors: [interceptor]
543+
)
544+
545+
consumer.stop
546+
end
547+
548+
it "chains on_consume" do
549+
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2')
550+
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3')
551+
interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
552+
intercepted_batch = interceptors.call(fetched_batches[0])
553+
554+
expect(intercepted_batch.messages[0].value).to eq "hellohello2hello3"
555+
end
556+
557+
it "does not break the on_consume chain" do
558+
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2', on_call_error: true)
559+
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3')
560+
interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
561+
intercepted_batch = interceptors.call(fetched_batches[0])
562+
563+
expect(intercepted_batch.messages[0].value).to eq "hellohello3"
564+
end
565+
566+
it "returns original batch when all interceptors fail" do
567+
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2', on_call_error: true)
568+
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3', on_call_error: true)
569+
interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
570+
intercepted_batch = interceptors.call(fetched_batches[0])
571+
572+
expect(intercepted_batch.messages[0].value).to eq "hello"
573+
end
574+
end
527575
end

spec/fake_consumer_interceptor.rb

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
# frozen_string_literal: true

# Test double for a consumer interceptor. Rebuilds each fetched batch with a
# configurable suffix appended to every message value, or raises to simulate
# a misbehaving interceptor.
class FakeConsumerInterceptor
  def initialize(append_s: '', on_call_error: false)
    @append_s = append_s
    @on_call_error = on_call_error
  end

  # Returns a new Kafka::FetchedBatch whose message values carry the suffix.
  # Raises StandardError when constructed with on_call_error: true.
  def call(batch)
    raise StandardError, "Something went wrong in the consumer interceptor" if @on_call_error

    rewritten_messages = batch.messages.map do |original|
      protocol_message = Kafka::Protocol::Message.new(
        value: original.value + @append_s,
        key: original.key,
        create_time: original.create_time,
        offset: original.offset
      )
      Kafka::FetchedMessage.new(
        message: protocol_message,
        topic: original.topic,
        partition: original.partition
      )
    end

    Kafka::FetchedBatch.new(
      topic: batch.topic,
      partition: batch.partition,
      highwater_mark_offset: batch.highwater_mark_offset,
      messages: rewritten_messages,
      last_offset: batch.last_offset,
      leader_epoch: batch.leader_epoch
    )
  end
end

spec/fake_producer_interceptor.rb

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

# Test double for a producer interceptor. Returns a copy of the pending
# message with a configurable suffix appended to its value, or raises to
# simulate a misbehaving interceptor.
class FakeProducerInterceptor
  def initialize(append_s: '', on_call_error: false)
    @append_s = append_s
    @on_call_error = on_call_error
  end

  # Returns a new Kafka::PendingMessage with the suffix appended to `value`;
  # all other fields are copied through unchanged.
  # Raises StandardError when constructed with on_call_error: true.
  def call(pending_message)
    raise StandardError, "Something went wrong in the producer interceptor" if @on_call_error

    Kafka::PendingMessage.new(
      value: pending_message.value + @append_s,
      key: pending_message.key,
      headers: pending_message.headers,
      topic: pending_message.topic,
      partition: pending_message.partition,
      partition_key: pending_message.partition_key,
      create_time: pending_message.create_time
    )
  end
end

spec/functional/interceptors_spec.rb

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

require "fake_consumer_interceptor"
require "fake_producer_interceptor"

# Functional round-trip: a producer interceptor tags the value on the way in,
# a consumer interceptor tags it on the way out.
describe "Interceptors", functional: true do
  let!(:topic) { create_random_topic(num_partitions: 3) }

  example "intercept produced and consumed message" do
    producer = kafka.producer(
      max_retries: 3,
      retry_backoff: 1,
      interceptors: [FakeProducerInterceptor.new(append_s: 'producer')]
    )
    producer.produce("hello", topic: topic)
    producer.deliver_messages
    producer.shutdown

    consumer = kafka.consumer(
      group_id: SecureRandom.uuid,
      fetcher_max_queue_size: 1,
      interceptors: [FakeConsumerInterceptor.new(append_s: 'consumer')]
    )
    consumer.subscribe(topic)
    consumer.each_message do |message|
      # Both interceptors must have run, in produce-then-consume order.
      expect(message.value).to eq "helloproducerconsumer"
      break
    end
    consumer.stop
  end
end

0 commit comments

Comments
 (0)