Skip to content

Commit 6ccbb0f

Browse files
authored
Merge pull request #837 from Shopify/producer-consumer-interceptors
add producer consumer interceptors
2 parents 07b6077 + efb67e3 commit 6ccbb0f

9 files changed

+266
-7
lines changed

lib/kafka/client.rb

+12-3
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,9 @@ def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partit
248248
# be in a message set before it should be compressed. Note that message sets
249249
# are per-partition rather than per-topic or per-producer.
250250
#
251+
# @param interceptors [Array<Object>] a list of producer interceptors that implement
252+
# `call(Kafka::PendingMessage)`.
253+
#
251254
# @return [Kafka::Producer] the Kafka producer.
252255
def producer(
253256
compression_codec: nil,
@@ -261,7 +264,8 @@ def producer(
261264
idempotent: false,
262265
transactional: false,
263266
transactional_id: nil,
264-
transactional_timeout: 60
267+
transactional_timeout: 60,
268+
interceptors: []
265269
)
266270
cluster = initialize_cluster
267271
compressor = Compressor.new(
@@ -291,6 +295,7 @@ def producer(
291295
retry_backoff: retry_backoff,
292296
max_buffer_size: max_buffer_size,
293297
max_buffer_bytesize: max_buffer_bytesize,
298+
interceptors: interceptors
294299
)
295300
end
296301

@@ -343,6 +348,8 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
343348
# @param refresh_topic_interval [Integer] interval of refreshing the topic list.
344349
# If it is 0, the topic list won't be refreshed (default)
345350
# If it is n (n > 0), the topic list will be refreshed every n seconds
351+
# @param interceptors [Array<Object>] a list of consumer interceptors that implement
352+
# `call(Kafka::FetchedBatch)`.
346353
# @return [Consumer]
347354
def consumer(
348355
group_id:,
@@ -353,7 +360,8 @@ def consumer(
353360
heartbeat_interval: 10,
354361
offset_retention_time: nil,
355362
fetcher_max_queue_size: 100,
356-
refresh_topic_interval: 0
363+
refresh_topic_interval: 0,
364+
interceptors: []
357365
)
358366
cluster = initialize_cluster
359367

@@ -407,7 +415,8 @@ def consumer(
407415
fetcher: fetcher,
408416
session_timeout: session_timeout,
409417
heartbeat: heartbeat,
410-
refresh_topic_interval: refresh_topic_interval
418+
refresh_topic_interval: refresh_topic_interval,
419+
interceptors: interceptors
411420
)
412421
end
413422

lib/kafka/consumer.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22

33
require "kafka/consumer_group"
4+
require "kafka/interceptors"
45
require "kafka/offset_manager"
56
require "kafka/fetcher"
67
require "kafka/pause"
@@ -44,7 +45,8 @@ module Kafka
4445
#
4546
class Consumer
4647

47-
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0)
48+
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:,
49+
session_timeout:, heartbeat:, refresh_topic_interval: 0, interceptors: [])
4850
@cluster = cluster
4951
@logger = TaggedLogger.new(logger)
5052
@instrumenter = instrumenter
@@ -54,6 +56,7 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
5456
@fetcher = fetcher
5557
@heartbeat = heartbeat
5658
@refresh_topic_interval = refresh_topic_interval
59+
@interceptors = Interceptors.new(interceptors: interceptors, logger: logger)
5760

5861
@pauses = Hash.new {|h, k|
5962
h[k] = Hash.new {|h2, k2|
@@ -220,6 +223,7 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica
220223
batches = fetch_batches
221224

222225
batches.each do |batch|
226+
batch = @interceptors.call(batch)
223227
batch.messages.each do |message|
224228
notification = {
225229
topic: message.topic,
@@ -311,6 +315,7 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall
311315
unless batch.empty?
312316
raw_messages = batch.messages
313317
batch.messages = raw_messages.reject(&:is_control_record)
318+
batch = @interceptors.call(batch)
314319

315320
notification = {
316321
topic: batch.topic,

lib/kafka/interceptors.rb

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

module Kafka
  # Holds an ordered chain of interceptors that each implement `call`,
  # and applies them in sequence to a produced message or fetched batch.
  class Interceptors
    # @param interceptors [Array<Object>, nil] interceptors responding to
    #   `call(intercepted)`; `nil` is treated as an empty chain.
    # @param logger [Logger] logger used to report interceptor failures.
    def initialize(interceptors:, logger:)
      @interceptors = interceptors || []
      @logger = TaggedLogger.new(logger)
    end

    # This method is called when the client produces a message or once the
    # batches are fetched. The value returned from one interceptor call is
    # passed to the next interceptor in the chain. Interceptor errors are
    # logged and do not abort the chain: the value from the last successful
    # interceptor (or the original input, if none succeeded) is carried on.
    #
    # @param intercepted [Kafka::PendingMessage, Kafka::FetchedBatch] the produced
    #   message or fetched batch.
    #
    # @return [Kafka::PendingMessage, Kafka::FetchedBatch] the intercepted message
    #   or batch returned by the last successful interceptor.
    def call(intercepted)
      @interceptors.each do |interceptor|
        begin
          intercepted = interceptor.call(intercepted)
        rescue StandardError => e
          # Rescue StandardError rather than Exception so that fatal errors
          # (SignalException, SystemExit, NoMemoryError) still propagate
          # instead of being silently swallowed here.
          @logger.warn "Error executing interceptor for topic: #{intercepted.topic} partition: #{intercepted.partition}: #{e.message}\n#{e.backtrace.join("\n")}"
        end
      end

      intercepted
    end
  end
end

lib/kafka/producer.rb

+6-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require "kafka/pending_message_queue"
88
require "kafka/pending_message"
99
require "kafka/compressor"
10+
require "kafka/interceptors"
1011

1112
module Kafka
1213
# Allows sending messages to a Kafka cluster.
@@ -129,7 +130,8 @@ module Kafka
129130
class Producer
130131
class AbortTransaction < StandardError; end
131132

132-
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
133+
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:,
134+
required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, interceptors: [])
133135
@cluster = cluster
134136
@transaction_manager = transaction_manager
135137
@logger = TaggedLogger.new(logger)
@@ -141,6 +143,7 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso
141143
@max_buffer_size = max_buffer_size
142144
@max_buffer_bytesize = max_buffer_bytesize
143145
@compressor = compressor
146+
@interceptors = Interceptors.new(interceptors: interceptors, logger: logger)
144147

145148
# The set of topics that are produced to.
146149
@target_topics = Set.new
@@ -191,15 +194,15 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key:
191194
# We want to fail fast if `topic` isn't a String
192195
topic = topic.to_str
193196

194-
message = PendingMessage.new(
197+
message = @interceptors.call(PendingMessage.new(
195198
value: value && value.to_s,
196199
key: key && key.to_s,
197200
headers: headers,
198201
topic: topic,
199202
partition: partition && Integer(partition),
200203
partition_key: partition_key && partition_key.to_s,
201204
create_time: create_time
202-
)
205+
))
203206

204207
if buffer_size >= @max_buffer_size
205208
buffer_overflow topic,

spec/consumer_spec.rb

+48
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# frozen_string_literal: true
22

33
require "timecop"
4+
require "fake_consumer_interceptor"
5+
require "kafka/interceptors"
46

57
describe Kafka::Consumer do
68
let(:cluster) { double(:cluster) }
@@ -514,4 +516,50 @@
514516

515517
it { expect(method_original_name).to eq(:trigger_heartbeat!) }
516518
end
519+
520+
describe '#interceptor' do
  it "creates and stops a consumer with interceptor" do
    # Building a consumer with an interceptor list must not raise.
    consumer = Kafka::Consumer.new(
      cluster: cluster,
      logger: logger,
      instrumenter: instrumenter,
      group: group,
      offset_manager: offset_manager,
      fetcher: fetcher,
      session_timeout: session_timeout,
      heartbeat: heartbeat,
      interceptors: [FakeConsumerInterceptor.new]
    )

    consumer.stop
  end

  it "chains call" do
    # Each interceptor's output feeds the next one's input.
    first = FakeConsumerInterceptor.new(append_s: 'hello2')
    second = FakeConsumerInterceptor.new(append_s: 'hello3')
    chain = Kafka::Interceptors.new(interceptors: [first, second], logger: logger)
    result = chain.call(fetched_batches[0])

    expect(result.messages[0].value).to eq "hellohello2hello3"
  end

  it "does not break the call chain" do
    # A failing interceptor is skipped; the rest of the chain still runs.
    first = FakeConsumerInterceptor.new(append_s: 'hello2', on_call_error: true)
    second = FakeConsumerInterceptor.new(append_s: 'hello3')
    chain = Kafka::Interceptors.new(interceptors: [first, second], logger: logger)
    result = chain.call(fetched_batches[0])

    expect(result.messages[0].value).to eq "hellohello3"
  end

  it "returns original batch when all interceptors fail" do
    first = FakeConsumerInterceptor.new(append_s: 'hello2', on_call_error: true)
    second = FakeConsumerInterceptor.new(append_s: 'hello3', on_call_error: true)
    chain = Kafka::Interceptors.new(interceptors: [first, second], logger: logger)
    result = chain.call(fetched_batches[0])

    expect(result.messages[0].value).to eq "hello"
  end
end
517565
end

spec/fake_consumer_interceptor.rb

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
# frozen_string_literal: true

# Test double for a consumer interceptor: appends a fixed suffix to every
# message value in a fetched batch, or raises when configured to fail.
class FakeConsumerInterceptor
  def initialize(append_s: '', on_call_error: false)
    @append_s = append_s
    @on_call_error = on_call_error
  end

  # Returns a new Kafka::FetchedBatch whose message values each have the
  # configured suffix appended. Raises when @on_call_error is set so specs
  # can exercise the error path of the interceptor chain.
  def call(batch)
    raise StandardError, "Something went wrong in the consumer interceptor" if @on_call_error

    transformed = batch.messages.map do |message|
      Kafka::FetchedMessage.new(
        message: Kafka::Protocol::Message.new(
          value: message.value + @append_s,
          key: message.key,
          create_time: message.create_time,
          offset: message.offset
        ),
        topic: message.topic,
        partition: message.partition
      )
    end

    Kafka::FetchedBatch.new(
      topic: batch.topic,
      partition: batch.partition,
      highwater_mark_offset: batch.highwater_mark_offset,
      messages: transformed,
      last_offset: batch.last_offset,
      leader_epoch: batch.leader_epoch
    )
  end
end

spec/fake_producer_interceptor.rb

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

# Test double for a producer interceptor: appends a fixed suffix to the
# pending message's value, or raises when configured to fail.
class FakeProducerInterceptor
  def initialize(append_s: '', on_call_error: false)
    @append_s = append_s
    @on_call_error = on_call_error
  end

  # Returns a new Kafka::PendingMessage with the suffix appended to the value.
  # Raises when @on_call_error is set so specs can exercise the error path.
  def call(pending_message)
    raise StandardError, "Something went wrong in the producer interceptor" if @on_call_error

    new_value = pending_message.value + @append_s
    Kafka::PendingMessage.new(
      value: new_value,
      key: pending_message.key,
      headers: pending_message.headers,
      topic: pending_message.topic,
      partition: pending_message.partition,
      partition_key: pending_message.partition_key,
      create_time: pending_message.create_time
    )
  end
end

spec/functional/interceptors_spec.rb

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

require "fake_consumer_interceptor"
require "fake_producer_interceptor"

describe "Interceptors", functional: true do
  let!(:topic) { create_random_topic(num_partitions: 3) }

  example "intercept produced and consumed message" do
    # The producer interceptor tags the value on the way out ...
    producer = kafka.producer(
      max_retries: 3,
      retry_backoff: 1,
      interceptors: [FakeProducerInterceptor.new(append_s: 'producer')]
    )
    producer.produce("hello", topic: topic)
    producer.deliver_messages
    producer.shutdown

    # ... and the consumer interceptor tags it again on the way in.
    consumer = kafka.consumer(
      group_id: SecureRandom.uuid,
      fetcher_max_queue_size: 1,
      interceptors: [FakeConsumerInterceptor.new(append_s: 'consumer')]
    )
    consumer.subscribe(topic)
    consumer.each_message do |message|
      expect(message.value).to eq "helloproducerconsumer"
      break
    end
    consumer.stop
  end
end

0 commit comments

Comments
 (0)