Skip to content

Commit 8fba445

Browse files
authored
Merge pull request #468 from zendesk/dasch/max-fetch-bytes
Support setting the max bytes to fetch per request
2 parents 95e3a21 + b1ec289 commit 8fba445

File tree

5 files changed: +19 −6 lines changed

Diff for: lib/kafka/client.rb

+1
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_by
369369
cluster: @cluster,
370370
logger: @logger,
371371
min_bytes: min_bytes,
372+
max_bytes: max_bytes,
372373
max_wait_time: max_wait_time,
373374
)
374375

Diff for: lib/kafka/consumer.rb

+10-3
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ def paused?(topic, partition)
179179
# @param min_bytes [Integer] the minimum number of bytes to read before
180180
# returning messages from each broker; if `max_wait_time` is reached, this
181181
# is ignored.
182+
# @param max_bytes [Integer] the maximum number of bytes to read before
183+
# returning messages from each broker.
182184
# @param max_wait_time [Integer, Float] the maximum duration of time to wait before
183185
# returning messages from each broker, in seconds.
184186
# @param automatically_mark_as_processed [Boolean] whether to automatically
@@ -190,10 +192,11 @@ def paused?(topic, partition)
190192
# The original exception will be returned by calling `#cause` on the
191193
# {Kafka::ProcessingError} instance.
192194
# @return [nil]
193-
def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed: true)
195+
def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
194196
consumer_loop do
195197
batches = fetch_batches(
196198
min_bytes: min_bytes,
199+
max_bytes: max_bytes,
197200
max_wait_time: max_wait_time,
198201
automatically_mark_as_processed: automatically_mark_as_processed
199202
)
@@ -253,6 +256,8 @@ def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed
253256
# @param min_bytes [Integer] the minimum number of bytes to read before
254257
# returning messages from each broker; if `max_wait_time` is reached, this
255258
# is ignored.
259+
# @param max_bytes [Integer] the maximum number of bytes to read before
260+
# returning messages from each broker.
256261
# @param max_wait_time [Integer, Float] the maximum duration of time to wait before
257262
# returning messages from each broker, in seconds.
258263
# @param automatically_mark_as_processed [Boolean] whether to automatically
@@ -261,10 +266,11 @@ def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed
261266
# messages can be committed to Kafka.
262267
# @yieldparam batch [Kafka::FetchedBatch] a message batch fetched from Kafka.
263268
# @return [nil]
264-
def each_batch(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed: true)
269+
def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
265270
consumer_loop do
266271
batches = fetch_batches(
267272
min_bytes: min_bytes,
273+
max_bytes: max_bytes,
268274
max_wait_time: max_wait_time,
269275
automatically_mark_as_processed: automatically_mark_as_processed
270276
)
@@ -400,7 +406,7 @@ def join_group
400406
end
401407
end
402408

403-
def fetch_batches(min_bytes:, max_wait_time:, automatically_mark_as_processed:)
409+
def fetch_batches(min_bytes:, max_bytes:, max_wait_time:, automatically_mark_as_processed:)
404410
join_group unless @group.member?
405411

406412
subscribed_partitions = @group.subscribed_partitions
@@ -411,6 +417,7 @@ def fetch_batches(min_bytes:, max_wait_time:, automatically_mark_as_processed:)
411417
cluster: @cluster,
412418
logger: @logger,
413419
min_bytes: min_bytes,
420+
max_bytes: max_bytes,
414421
max_wait_time: max_wait_time,
415422
)
416423

Diff for: lib/kafka/fetch_operation.rb

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ module Kafka
1818
# operation.execute
1919
#
2020
class FetchOperation
21-
def initialize(cluster:, logger:, min_bytes: 1, max_wait_time: 5)
21+
def initialize(cluster:, logger:, min_bytes: 1, max_bytes: 10485760, max_wait_time: 5)
2222
@cluster = cluster
2323
@logger = logger
2424
@min_bytes = min_bytes
25+
@max_bytes = max_bytes
2526
@max_wait_time = max_wait_time
2627
@topics = {}
2728
end
@@ -66,6 +67,7 @@ def execute
6667
options = {
6768
max_wait_time: @max_wait_time * 1000, # Kafka expects ms, not secs
6869
min_bytes: @min_bytes,
70+
max_bytes: @max_bytes,
6971
topics: topics,
7072
}
7173

Diff for: lib/kafka/protocol/fetch_request.rb

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ class FetchRequest
1919
# @param max_wait_time [Integer]
2020
# @param min_bytes [Integer]
2121
# @param topics [Hash]
22-
def initialize(max_wait_time:, min_bytes:, topics:)
22+
def initialize(max_wait_time:, min_bytes:, max_bytes:, topics:)
2323
@replica_id = REPLICA_ID
2424
@max_wait_time = max_wait_time
2525
@min_bytes = min_bytes
26+
@max_bytes = max_bytes
2627
@topics = topics
2728
end
2829

@@ -31,7 +32,7 @@ def api_key
3132
end
3233

3334
def api_version
34-
2
35+
3
3536
end
3637

3738
def response_class
@@ -42,6 +43,7 @@ def encode(encoder)
4243
encoder.write_int32(@replica_id)
4344
encoder.write_int32(@max_wait_time)
4445
encoder.write_int32(@min_bytes)
46+
encoder.write_int32(@max_bytes)
4547

4648
encoder.write_array(@topics) do |topic, partitions|
4749
encoder.write_string(topic)

Diff for: spec/broker_spec.rb

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ def send_request(request)
8787
actual_response = broker.fetch_messages(
8888
max_wait_time: 0,
8989
min_bytes: 0,
90+
max_bytes: 10 * 1024,
9091
topics: {}
9192
)
9293

0 commit comments

Comments (0)