Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add producer consumer interceptors #837

Merged
merged 1 commit into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions lib/kafka/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partit
# be in a message set before it should be compressed. Note that message sets
# are per-partition rather than per-topic or per-producer.
#
# @param interceptors [Array<Object>] a list of producer interceptors that implement
# `call(Kafka::PendingMessage)`.
#
# @return [Kafka::Producer] the Kafka producer.
def producer(
compression_codec: nil,
Expand All @@ -261,7 +264,8 @@ def producer(
idempotent: false,
transactional: false,
transactional_id: nil,
transactional_timeout: 60
transactional_timeout: 60,
interceptors: []
)
cluster = initialize_cluster
compressor = Compressor.new(
Expand Down Expand Up @@ -291,6 +295,7 @@ def producer(
retry_backoff: retry_backoff,
max_buffer_size: max_buffer_size,
max_buffer_bytesize: max_buffer_bytesize,
interceptors: interceptors
)
end

Expand Down Expand Up @@ -343,6 +348,8 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
# @param refresh_topic_interval [Integer] interval of refreshing the topic list.
# If it is 0, the topic list won't be refreshed (default)
# If it is n (n > 0), the topic list will be refreshed every n seconds
# @param interceptors [Array<Object>] a list of consumer interceptors that implement
# `call(Kafka::FetchedBatch)`.
# @return [Consumer]
def consumer(
group_id:,
Expand All @@ -353,7 +360,8 @@ def consumer(
heartbeat_interval: 10,
offset_retention_time: nil,
fetcher_max_queue_size: 100,
refresh_topic_interval: 0
refresh_topic_interval: 0,
interceptors: []
)
cluster = initialize_cluster

Expand Down Expand Up @@ -407,7 +415,8 @@ def consumer(
fetcher: fetcher,
session_timeout: session_timeout,
heartbeat: heartbeat,
refresh_topic_interval: refresh_topic_interval
refresh_topic_interval: refresh_topic_interval,
interceptors: interceptors
)
end

Expand Down
7 changes: 6 additions & 1 deletion lib/kafka/consumer.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "kafka/consumer_group"
require "kafka/interceptors"
require "kafka/offset_manager"
require "kafka/fetcher"
require "kafka/pause"
Expand Down Expand Up @@ -44,7 +45,8 @@ module Kafka
#
class Consumer

def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0)
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:,
session_timeout:, heartbeat:, refresh_topic_interval: 0, interceptors: [])
@cluster = cluster
@logger = TaggedLogger.new(logger)
@instrumenter = instrumenter
Expand All @@ -54,6 +56,7 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
@fetcher = fetcher
@heartbeat = heartbeat
@refresh_topic_interval = refresh_topic_interval
@interceptors = Interceptors.new(interceptors: interceptors, logger: logger)

@pauses = Hash.new {|h, k|
h[k] = Hash.new {|h2, k2|
Expand Down Expand Up @@ -220,6 +223,7 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica
batches = fetch_batches

batches.each do |batch|
batch = @interceptors.call(batch)
batch.messages.each do |message|
notification = {
topic: message.topic,
Expand Down Expand Up @@ -311,6 +315,7 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall
unless batch.empty?
raw_messages = batch.messages
batch.messages = raw_messages.reject(&:is_control_record)
batch = @interceptors.call(batch)

notification = {
topic: batch.topic,
Expand Down
33 changes: 33 additions & 0 deletions lib/kafka/interceptors.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

module Kafka
  # Holds a list of interceptors that implement `call`
  # and wraps calls to a chain of custom interceptors.
  class Interceptors
    # @param interceptors [Array<Object>, nil] the interceptor chain; each entry
    #   must respond to `call(intercepted)`. `nil` is treated as an empty chain.
    # @param logger [Logger] logger used to report failing interceptors.
    def initialize(interceptors:, logger:)
      @interceptors = interceptors || []
      @logger = TaggedLogger.new(logger)
    end

    # This method is called when the client produces a message or once the batches are fetched.
    # The value returned from the first interceptor call is passed to the second interceptor,
    # and so on in an interceptor chain. This method does not propagate interceptor errors:
    # a failing interceptor is logged and skipped, and the chain continues with the last
    # successfully intercepted value (the original input if every interceptor fails).
    #
    # @param intercepted [Kafka::PendingMessage, Kafka::FetchedBatch] the produced message or
    #   fetched batch.
    #
    # @return [Kafka::PendingMessage, Kafka::FetchedBatch] the intercepted message or batch
    #   returned by the last interceptor.
    def call(intercepted)
      @interceptors.each do |interceptor|
        begin
          intercepted = interceptor.call(intercepted)
        rescue StandardError => e
          # Rescue StandardError rather than Exception so that fatal conditions
          # (SignalException, SystemExit, NoMemoryError) are not swallowed by a
          # misbehaving interceptor.
          @logger.warn "Error executing interceptor for topic: #{intercepted.topic} partition: #{intercepted.partition}: #{e.message}\n#{e.backtrace.join("\n")}"
        end
      end

      intercepted
    end
  end
end
9 changes: 6 additions & 3 deletions lib/kafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require "kafka/pending_message_queue"
require "kafka/pending_message"
require "kafka/compressor"
require "kafka/interceptors"

module Kafka
# Allows sending messages to a Kafka cluster.
Expand Down Expand Up @@ -129,7 +130,8 @@ module Kafka
class Producer
class AbortTransaction < StandardError; end

def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:,
required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, interceptors: [])
@cluster = cluster
@transaction_manager = transaction_manager
@logger = TaggedLogger.new(logger)
Expand All @@ -141,6 +143,7 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso
@max_buffer_size = max_buffer_size
@max_buffer_bytesize = max_buffer_bytesize
@compressor = compressor
@interceptors = Interceptors.new(interceptors: interceptors, logger: logger)

# The set of topics that are produced to.
@target_topics = Set.new
Expand Down Expand Up @@ -191,15 +194,15 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key:
# We want to fail fast if `topic` isn't a String
topic = topic.to_str

message = PendingMessage.new(
message = @interceptors.call(PendingMessage.new(
value: value && value.to_s,
key: key && key.to_s,
headers: headers,
topic: topic,
partition: partition && Integer(partition),
partition_key: partition_key && partition_key.to_s,
create_time: create_time
)
))

if buffer_size >= @max_buffer_size
buffer_overflow topic,
Expand Down
48 changes: 48 additions & 0 deletions spec/consumer_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

require "timecop"
require "fake_consumer_interceptor"
require "kafka/interceptors"

describe Kafka::Consumer do
let(:cluster) { double(:cluster) }
Expand Down Expand Up @@ -524,4 +526,50 @@

it { expect(method_original_name).to eq(:trigger_heartbeat!) }
end

describe '#interceptor' do
it "creates and stops a consumer with interceptor" do
interceptor = FakeConsumerInterceptor.new
consumer = Kafka::Consumer.new(
cluster: cluster,
logger: logger,
instrumenter: instrumenter,
group: group,
offset_manager: offset_manager,
fetcher: fetcher,
session_timeout: session_timeout,
heartbeat: heartbeat,
interceptors: [interceptor]
)

consumer.stop
end

it "chains call" do
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2')
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3')
interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
intercepted_batch = interceptors.call(fetched_batches[0])

expect(intercepted_batch.messages[0].value).to eq "hellohello2hello3"
end

it "does not break the call chain" do
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2', on_call_error: true)
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3')
interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
intercepted_batch = interceptors.call(fetched_batches[0])

expect(intercepted_batch.messages[0].value).to eq "hellohello3"
end

it "returns original batch when all interceptors fail" do
interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2', on_call_error: true)
interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3', on_call_error: true)
interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger)
intercepted_batch = interceptors.call(fetched_batches[0])

expect(intercepted_batch.messages[0].value).to eq "hello"
end
end
end
34 changes: 34 additions & 0 deletions spec/fake_consumer_interceptor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# frozen_string_literal: true

# Test double for a consumer interceptor: returns a copy of the fetched batch
# with a configurable suffix appended to every message value, or raises to
# simulate a misbehaving interceptor.
class FakeConsumerInterceptor
  def initialize(append_s: '', on_call_error: false)
    @append_s = append_s
    @on_call_error = on_call_error
  end

  def call(batch)
    raise StandardError, "Something went wrong in the consumer interceptor" if @on_call_error

    transformed = batch.messages.map do |original|
      protocol_message = Kafka::Protocol::Message.new(
        value: original.value + @append_s,
        key: original.key,
        create_time: original.create_time,
        offset: original.offset
      )
      Kafka::FetchedMessage.new(
        message: protocol_message,
        topic: original.topic,
        partition: original.partition
      )
    end

    Kafka::FetchedBatch.new(
      topic: batch.topic,
      partition: batch.partition,
      highwater_mark_offset: batch.highwater_mark_offset,
      messages: transformed,
      last_offset: batch.last_offset,
      leader_epoch: batch.leader_epoch
    )
  end
end
23 changes: 23 additions & 0 deletions spec/fake_producer_interceptor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

# Test double for a producer interceptor: returns a copy of the pending message
# with a configurable suffix appended to its value, or raises to simulate a
# misbehaving interceptor.
class FakeProducerInterceptor
  def initialize(append_s: '', on_call_error: false)
    @append_s = append_s
    @on_call_error = on_call_error
  end

  def call(pending_message)
    raise StandardError, "Something went wrong in the producer interceptor" if @on_call_error

    Kafka::PendingMessage.new(
      value: pending_message.value + @append_s,
      key: pending_message.key,
      headers: pending_message.headers,
      topic: pending_message.topic,
      partition: pending_message.partition,
      partition_key: pending_message.partition_key,
      create_time: pending_message.create_time
    )
  end
end
25 changes: 25 additions & 0 deletions spec/functional/interceptors_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

require "fake_consumer_interceptor"
require "fake_producer_interceptor"

# End-to-end check that producer and consumer interceptors are both applied:
# the produced value passes through the producer interceptor on the way in and
# the consumer interceptor on the way out.
describe "Interceptors", functional: true do
  let!(:topic) { create_random_topic(num_partitions: 3) }

  example "intercept produced and consumed message" do
    # Produce one message through an interceptor that appends "producer".
    interceptor_for_producer = FakeProducerInterceptor.new(append_s: 'producer')
    producer = kafka.producer(max_retries: 3, retry_backoff: 1, interceptors: [interceptor_for_producer])
    producer.produce("hello", topic: topic)
    producer.deliver_messages
    producer.shutdown

    # Consume it back through an interceptor that appends "consumer"; the
    # delivered value must carry both suffixes in order.
    interceptor_for_consumer = FakeConsumerInterceptor.new(append_s: 'consumer')
    consumer = kafka.consumer(group_id: SecureRandom.uuid, fetcher_max_queue_size: 1, interceptors: [interceptor_for_consumer])
    consumer.subscribe(topic)
    consumer.each_message do |message|
      expect(message.value).to eq "helloproducerconsumer"
      break
    end
    consumer.stop
  end
end
Loading