diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index d558bb548..614080b2b 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -369,6 +369,7 @@ def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_by cluster: @cluster, logger: @logger, min_bytes: min_bytes, + max_bytes: max_bytes, max_wait_time: max_wait_time, ) diff --git a/lib/kafka/consumer.rb b/lib/kafka/consumer.rb index a2b3e182f..7b3337be4 100644 --- a/lib/kafka/consumer.rb +++ b/lib/kafka/consumer.rb @@ -179,6 +179,8 @@ def paused?(topic, partition) # @param min_bytes [Integer] the minimum number of bytes to read before # returning messages from each broker; if `max_wait_time` is reached, this # is ignored. + # @param max_bytes [Integer] the maximum number of bytes to read before + # returning messages from each broker. # @param max_wait_time [Integer, Float] the maximum duration of time to wait before # returning messages from each broker, in seconds. # @param automatically_mark_as_processed [Boolean] whether to automatically @@ -190,10 +192,11 @@ def paused?(topic, partition) # The original exception will be returned by calling `#cause` on the # {Kafka::ProcessingError} instance. # @return [nil] - def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed: true) + def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true) consumer_loop do batches = fetch_batches( min_bytes: min_bytes, + max_bytes: max_bytes, max_wait_time: max_wait_time, automatically_mark_as_processed: automatically_mark_as_processed ) @@ -253,6 +256,8 @@ def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed # @param min_bytes [Integer] the minimum number of bytes to read before # returning messages from each broker; if `max_wait_time` is reached, this # is ignored. + # @param max_bytes [Integer] the maximum number of bytes to read before + # returning messages from each broker. 
# @param max_wait_time [Integer, Float] the maximum duration of time to wait before # returning messages from each broker, in seconds. # @param automatically_mark_as_processed [Boolean] whether to automatically @@ -261,10 +266,11 @@ def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed # messages can be committed to Kafka. # @yieldparam batch [Kafka::FetchedBatch] a message batch fetched from Kafka. # @return [nil] - def each_batch(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed: true) + def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true) consumer_loop do batches = fetch_batches( min_bytes: min_bytes, + max_bytes: max_bytes, max_wait_time: max_wait_time, automatically_mark_as_processed: automatically_mark_as_processed ) @@ -400,7 +406,7 @@ def join_group end end - def fetch_batches(min_bytes:, max_wait_time:, automatically_mark_as_processed:) + def fetch_batches(min_bytes:, max_bytes:, max_wait_time:, automatically_mark_as_processed:) join_group unless @group.member? 
subscribed_partitions = @group.subscribed_partitions @@ -411,6 +417,7 @@ def fetch_batches(min_bytes:, max_bytes:, max_wait_time:, automatically_mark_as_processed:) cluster: @cluster, logger: @logger, min_bytes: min_bytes, + max_bytes: max_bytes, max_wait_time: max_wait_time, ) diff --git a/lib/kafka/fetch_operation.rb b/lib/kafka/fetch_operation.rb index c181ab191..517b55a50 100644 --- a/lib/kafka/fetch_operation.rb +++ b/lib/kafka/fetch_operation.rb @@ -18,10 +18,11 @@ module Kafka # operation.execute # class FetchOperation - def initialize(cluster:, logger:, min_bytes: 1, max_wait_time: 5) + def initialize(cluster:, logger:, min_bytes: 1, max_bytes: 10485760, max_wait_time: 5) @cluster = cluster @logger = logger @min_bytes = min_bytes + @max_bytes = max_bytes @max_wait_time = max_wait_time @topics = {} end @@ -66,6 +67,7 @@ def execute options = { max_wait_time: @max_wait_time * 1000, # Kafka expects ms, not secs min_bytes: @min_bytes, + max_bytes: @max_bytes, topics: topics, } diff --git a/lib/kafka/protocol/fetch_request.rb b/lib/kafka/protocol/fetch_request.rb index 226f7fed4..6f4aa9cf6 100644 --- a/lib/kafka/protocol/fetch_request.rb +++ b/lib/kafka/protocol/fetch_request.rb @@ -19,10 +19,12 @@ class FetchRequest # @param max_wait_time [Integer] # @param min_bytes [Integer] + # @param max_bytes [Integer] # @param topics [Hash] - def initialize(max_wait_time:, min_bytes:, topics:) + def initialize(max_wait_time:, min_bytes:, max_bytes:, topics:) @replica_id = REPLICA_ID @max_wait_time = max_wait_time @min_bytes = min_bytes + @max_bytes = max_bytes @topics = topics end @@ -31,7 +32,7 @@ def api_key end def api_version - 2 + 3 end def response_class @@ -42,6 +43,7 @@ def encode(encoder) encoder.write_int32(@replica_id) encoder.write_int32(@max_wait_time) encoder.write_int32(@min_bytes) + encoder.write_int32(@max_bytes) encoder.write_array(@topics) do |topic, partitions| encoder.write_string(topic) diff --git a/spec/broker_spec.rb b/spec/broker_spec.rb index 54e3fe3b3..57a5f640d 100644 ---
a/spec/broker_spec.rb +++ b/spec/broker_spec.rb @@ -87,6 +87,7 @@ def send_request(request) actual_response = broker.fetch_messages( max_wait_time: 0, min_bytes: 0, + max_bytes: 10 * 1024, topics: {} )