diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 22208d70f..9ee2feb2d 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -248,6 +248,9 @@ def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partit # be in a message set before it should be compressed. Note that message sets # are per-partition rather than per-topic or per-producer. # + # @param interceptors [Array] a list of producer interceptors that implement + # `call(Kafka::PendingMessage)`. + # # @return [Kafka::Producer] the Kafka producer. def producer( compression_codec: nil, @@ -261,7 +264,8 @@ def producer( idempotent: false, transactional: false, transactional_id: nil, - transactional_timeout: 60 + transactional_timeout: 60, + interceptors: [] ) cluster = initialize_cluster compressor = Compressor.new( @@ -291,6 +295,7 @@ def producer( retry_backoff: retry_backoff, max_buffer_size: max_buffer_size, max_buffer_bytesize: max_buffer_bytesize, + interceptors: interceptors ) end @@ -343,6 +348,8 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: # @param refresh_topic_interval [Integer] interval of refreshing the topic list. # If it is 0, the topic list won't be refreshed (default) # If it is n (n > 0), the topic list will be refreshed every n seconds + # @param interceptors [Array] a list of consumer interceptors that implement + # `call(Kafka::FetchedBatch)`. 
# @return [Consumer] def consumer( group_id:, @@ -353,7 +360,8 @@ def consumer( heartbeat_interval: 10, offset_retention_time: nil, fetcher_max_queue_size: 100, - refresh_topic_interval: 0 + refresh_topic_interval: 0, + interceptors: [] ) cluster = initialize_cluster @@ -407,7 +415,8 @@ def consumer( fetcher: fetcher, session_timeout: session_timeout, heartbeat: heartbeat, - refresh_topic_interval: refresh_topic_interval + refresh_topic_interval: refresh_topic_interval, + interceptors: interceptors ) end diff --git a/lib/kafka/consumer.rb b/lib/kafka/consumer.rb index 6768089a0..d3d41526c 100644 --- a/lib/kafka/consumer.rb +++ b/lib/kafka/consumer.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "kafka/consumer_group" +require "kafka/interceptors" require "kafka/offset_manager" require "kafka/fetcher" require "kafka/pause" @@ -44,7 +45,8 @@ module Kafka # class Consumer - def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0) + def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, + session_timeout:, heartbeat:, refresh_topic_interval: 0, interceptors: []) @cluster = cluster @logger = TaggedLogger.new(logger) @instrumenter = instrumenter @@ -54,6 +56,7 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage @fetcher = fetcher @heartbeat = heartbeat @refresh_topic_interval = refresh_topic_interval + @interceptors = Interceptors.new(interceptors: interceptors, logger: logger) @pauses = Hash.new {|h, k| h[k] = Hash.new {|h2, k2| @@ -220,6 +223,7 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica batches = fetch_batches batches.each do |batch| + batch = @interceptors.call(batch) batch.messages.each do |message| notification = { topic: message.topic, @@ -311,6 +315,7 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall unless batch.empty? 
raw_messages = batch.messages batch.messages = raw_messages.reject(&:is_control_record) + batch = @interceptors.call(batch) notification = { topic: batch.topic, diff --git a/lib/kafka/interceptors.rb b/lib/kafka/interceptors.rb new file mode 100644 index 000000000..1ae0bd7c1 --- /dev/null +++ b/lib/kafka/interceptors.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +module Kafka + # Holds a list of interceptors that implement `call` + # and wraps calls to a chain of custom interceptors. + class Interceptors + def initialize(interceptors:, logger:) + @interceptors = interceptors || [] + @logger = TaggedLogger.new(logger) + end + + # This method is called when the client produces a message or once the batches are fetched. + # The message returned from the first call is passed to the second interceptor call, and so on in an + # interceptor chain. This method does not throw exceptions. + # + # @param intercepted [Kafka::PendingMessage || Kafka::FetchedBatch] the produced message or + # fetched batch. + # + # @return [Kafka::PendingMessage || Kafka::FetchedBatch] the intercepted message or batch + # returned by the last interceptor. + def call(intercepted) + @interceptors.each do |interceptor| + begin + intercepted = interceptor.call(intercepted) + rescue Exception => e + @logger.warn "Error executing interceptor for topic: #{intercepted.topic} partition: #{intercepted.partition}: #{e.message}\n#{e.backtrace.join("\n")}" + end + end + + intercepted + end + end +end diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb index 709453487..fe909f7e2 100644 --- a/lib/kafka/producer.rb +++ b/lib/kafka/producer.rb @@ -7,6 +7,7 @@ require "kafka/pending_message_queue" require "kafka/pending_message" require "kafka/compressor" +require "kafka/interceptors" module Kafka # Allows sending messages to a Kafka cluster. 
@@ -129,7 +130,8 @@ module Kafka class Producer class AbortTransaction < StandardError; end - def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) + def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, + required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, interceptors: []) @cluster = cluster @transaction_manager = transaction_manager @logger = TaggedLogger.new(logger) @@ -141,6 +143,7 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso @max_buffer_size = max_buffer_size @max_buffer_bytesize = max_buffer_bytesize @compressor = compressor + @interceptors = Interceptors.new(interceptors: interceptors, logger: logger) # The set of topics that are produced to. @target_topics = Set.new @@ -191,7 +194,7 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: # We want to fail fast if `topic` isn't a String topic = topic.to_str - message = PendingMessage.new( + message = @interceptors.call(PendingMessage.new( value: value && value.to_s, key: key && key.to_s, headers: headers, @@ -199,7 +202,7 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: partition: partition && Integer(partition), partition_key: partition_key && partition_key.to_s, create_time: create_time - ) + )) if buffer_size >= @max_buffer_size buffer_overflow topic, diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index 6dc780239..436ed7bb3 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true require "timecop" +require "fake_consumer_interceptor" +require "kafka/interceptors" describe Kafka::Consumer do let(:cluster) { double(:cluster) } @@ -524,4 +526,50 @@ it { expect(method_original_name).to eq(:trigger_heartbeat!) 
} end + + describe '#interceptor' do + it "creates and stops a consumer with interceptor" do + interceptor = FakeConsumerInterceptor.new + consumer = Kafka::Consumer.new( + cluster: cluster, + logger: logger, + instrumenter: instrumenter, + group: group, + offset_manager: offset_manager, + fetcher: fetcher, + session_timeout: session_timeout, + heartbeat: heartbeat, + interceptors: [interceptor] + ) + + consumer.stop + end + + it "chains call" do + interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2') + interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3') + interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger) + intercepted_batch = interceptors.call(fetched_batches[0]) + + expect(intercepted_batch.messages[0].value).to eq "hellohello2hello3" + end + + it "does not break the call chain" do + interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2', on_call_error: true) + interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3') + interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger) + intercepted_batch = interceptors.call(fetched_batches[0]) + + expect(intercepted_batch.messages[0].value).to eq "hellohello3" + end + + it "returns original batch when all interceptors fail" do + interceptor1 = FakeConsumerInterceptor.new(append_s: 'hello2', on_call_error: true) + interceptor2 = FakeConsumerInterceptor.new(append_s: 'hello3', on_call_error: true) + interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger) + intercepted_batch = interceptors.call(fetched_batches[0]) + + expect(intercepted_batch.messages[0].value).to eq "hello" + end + end end diff --git a/spec/fake_consumer_interceptor.rb b/spec/fake_consumer_interceptor.rb new file mode 100644 index 000000000..85bfa9f30 --- /dev/null +++ b/spec/fake_consumer_interceptor.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +class FakeConsumerInterceptor + def 
initialize(append_s: '', on_call_error: false) + @append_s = append_s + @on_call_error = on_call_error + end + + def call(batch) + if @on_call_error + raise StandardError, "Something went wrong in the consumer interceptor" + end + intercepted_messages = batch.messages.collect do |message| + Kafka::FetchedMessage.new( + message: Kafka::Protocol::Message.new( + value: message.value + @append_s, + key: message.key, + create_time: message.create_time, + offset: message.offset + ), + topic: message.topic, + partition: message.partition + ) + end + Kafka::FetchedBatch.new( + topic: batch.topic, + partition: batch.partition, + highwater_mark_offset: batch.highwater_mark_offset, + messages: intercepted_messages, + last_offset: batch.last_offset, + leader_epoch: batch.leader_epoch + ) + end +end diff --git a/spec/fake_producer_interceptor.rb b/spec/fake_producer_interceptor.rb new file mode 100644 index 000000000..1bebfd099 --- /dev/null +++ b/spec/fake_producer_interceptor.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +class FakeProducerInterceptor + def initialize(append_s: '', on_call_error: false) + @append_s = append_s + @on_call_error = on_call_error + end + + def call(pending_message) + if @on_call_error + raise StandardError, "Something went wrong in the producer interceptor" + end + Kafka::PendingMessage.new( + value: pending_message.value + @append_s, + key: pending_message.key, + headers: pending_message.headers, + topic: pending_message.topic, + partition: pending_message.partition, + partition_key: pending_message.partition_key, + create_time: pending_message.create_time + ) + end +end diff --git a/spec/functional/interceptors_spec.rb b/spec/functional/interceptors_spec.rb new file mode 100644 index 000000000..6787b3851 --- /dev/null +++ b/spec/functional/interceptors_spec.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +require "fake_consumer_interceptor" +require "fake_producer_interceptor" + +describe "Interceptors", functional: true do + 
let!(:topic) { create_random_topic(num_partitions: 3) } + + example "intercept produced and consumed message" do + producer_interceptor = FakeProducerInterceptor.new(append_s: 'producer') + producer = kafka.producer(max_retries: 3, retry_backoff: 1, interceptors: [producer_interceptor]) + producer.produce("hello", topic: topic) + producer.deliver_messages + producer.shutdown + + consumer_interceptor = FakeConsumerInterceptor.new(append_s: 'consumer') + consumer = kafka.consumer(group_id: SecureRandom.uuid, fetcher_max_queue_size: 1, interceptors: [consumer_interceptor]) + consumer.subscribe(topic) + consumer.each_message do |message| + expect(message.value).to eq "helloproducerconsumer" + break + end + consumer.stop + end +end diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index 16af7692d..0a48605ab 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -1,7 +1,9 @@ # frozen_string_literal: true require "fake_broker" +require "fake_producer_interceptor" require "timecop" +require "kafka/interceptors" describe Kafka::Producer do let(:logger) { LOGGER } @@ -20,6 +22,7 @@ allow(cluster).to receive(:get_leader).with("greetings", 0) { broker1 } allow(cluster).to receive(:get_leader).with("greetings", 1) { broker2 } + allow(cluster).to receive(:disconnect).and_return(nil) allow(compressor).to receive(:compress) {|message_set| message_set } @@ -30,6 +33,7 @@ allow(transaction_manager).to receive(:producer_epoch).and_return(0) allow(transaction_manager).to receive(:transactional_id).and_return(nil) allow(transaction_manager).to receive(:send_offsets_to_txn).and_return(nil) + allow(transaction_manager).to receive(:close).and_return(nil) end describe "#produce" do @@ -360,6 +364,81 @@ end end + describe '#interceptor' do + let(:now) { Time.now } + let(:pending_message) { + Kafka::PendingMessage.new( + value: "hello1", + key: nil, + headers: { + hello: 'World' + }, + topic: "greetings", + partition: 0, + partition_key: nil, + create_time: now + ) + } + 
+ it "creates and shuts down a producer with interceptor" do + interceptor = FakeProducerInterceptor.new + producer = initialize_producer( + interceptors: [interceptor], + cluster: cluster, + transaction_manager: transaction_manager + ) + + producer.shutdown + end + + it "chains call" do + interceptor1 = FakeProducerInterceptor.new(append_s: 'hello2') + interceptor2 = FakeProducerInterceptor.new(append_s: 'hello3') + interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger) + intercepted_message = interceptors.call(pending_message) + + expect(intercepted_message).to eq Kafka::PendingMessage.new( + value: "hello1hello2hello3", + key: nil, + headers: { + hello: 'World' + }, + topic: "greetings", + partition: 0, + partition_key: nil, + create_time: now + ) + end + + it "does not break the call chain" do + interceptor1 = FakeProducerInterceptor.new(append_s: 'hello2', on_call_error: true) + interceptor2 = FakeProducerInterceptor.new(append_s: 'hello3') + interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger) + intercepted_message = interceptors.call(pending_message) + + expect(intercepted_message).to eq Kafka::PendingMessage.new( + value: "hello1hello3", + key: nil, + headers: { + hello: 'World' + }, + topic: "greetings", + partition: 0, + partition_key: nil, + create_time: now + ) + end + + it "returns original message when all interceptors fail" do + interceptor1 = FakeProducerInterceptor.new(append_s: 'hello2', on_call_error: true) + interceptor2 = FakeProducerInterceptor.new(append_s: 'hello3', on_call_error: true) + interceptors = Kafka::Interceptors.new(interceptors: [interceptor1, interceptor2], logger: logger) + intercepted_message = interceptors.call(pending_message) + + expect(intercepted_message).to eq pending_message + end + end + def initialize_producer(**options) default_options = { cluster: cluster,