7
7
require "kafka/pending_message_queue"
8
8
require "kafka/pending_message"
9
9
require "kafka/compressor"
10
+ require "kafka/producer_interceptors"
10
11
11
12
module Kafka
12
13
# Allows sending messages to a Kafka cluster.
@@ -129,7 +130,8 @@ module Kafka
129
130
class Producer
130
131
class AbortTransaction < StandardError ; end
131
132
132
- def initialize ( cluster :, transaction_manager :, logger :, instrumenter :, compressor :, ack_timeout :, required_acks :, max_retries :, retry_backoff :, max_buffer_size :, max_buffer_bytesize :)
133
+ def initialize ( cluster :, transaction_manager :, logger :, instrumenter :, compressor :, ack_timeout :,
134
+ required_acks :, max_retries :, retry_backoff :, max_buffer_size :, max_buffer_bytesize :, interceptors : [ ] )
133
135
@cluster = cluster
134
136
@transaction_manager = transaction_manager
135
137
@logger = TaggedLogger . new ( logger )
@@ -141,6 +143,7 @@ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compresso
141
143
@max_buffer_size = max_buffer_size
142
144
@max_buffer_bytesize = max_buffer_bytesize
143
145
@compressor = compressor
146
+ @interceptors = ProducerInterceptors . new ( interceptors : interceptors , logger : logger )
144
147
145
148
# The set of topics that are produced to.
146
149
@target_topics = Set . new
@@ -191,15 +194,15 @@ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key:
191
194
# We want to fail fast if `topic` isn't a String
192
195
topic = topic . to_str
193
196
194
- message = PendingMessage . new (
197
+ message = @interceptors . on_send ( PendingMessage . new (
195
198
value : value && value . to_s ,
196
199
key : key && key . to_s ,
197
200
headers : headers ,
198
201
topic : topic ,
199
202
partition : partition && Integer ( partition ) ,
200
203
partition_key : partition_key && partition_key . to_s ,
201
204
create_time : create_time
202
- )
205
+ ) )
203
206
204
207
if buffer_size >= @max_buffer_size
205
208
buffer_overflow topic ,
@@ -283,6 +286,7 @@ def clear_buffer
283
286
#
284
287
# @return [nil]
285
288
def shutdown
289
+ @interceptors . close
286
290
@transaction_manager . close
287
291
@cluster . disconnect
288
292
end
0 commit comments