Skip to content

Commit f106797

Browse files
committed
[Chore] Try to debug on CircleCI
1 parent 5891055 commit f106797

File tree

4 files changed

+14
-0
lines changed

4 files changed

+14
-0
lines changed

lib/kafka/consumer.rb

+5
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ def make_final_offsets_commit!(attempts = 3)
469469
end
470470

471471
def join_group
472+
puts "Join group"
472473
old_generation_id = @group.generation_id
473474

474475
@group.join
@@ -490,6 +491,7 @@ def join_group
490491

491492
@fetcher.reset
492493

494+
puts "assigned_partitions: #{@group.assigned_partitions}"
493495
@group.assigned_partitions.each do |topic, partitions|
494496
partitions.each do |partition|
495497
if paused?(topic, partition)
@@ -499,9 +501,11 @@ def join_group
499501
end
500502
end
501503
end
504+
puts "finished join group"
502505
end
503506

504507
def seek_to_next(topic, partition)
508+
puts "Seek to #{topic} - #{partition}"
505509
# When automatic marking is off, the first poll needs to be based on the last committed
506510
# offset from Kafka, that's why we fallback in case of nil (it may not be 0)
507511
if @current_offsets[topic].key?(partition)
@@ -534,6 +538,7 @@ def fetch_batches
534538
# Return early if the consumer has been stopped.
535539
return [] if shutting_down?
536540

541+
puts "@group.member #{@group.member?}"
537542
join_group unless @group.member?
538543

539544
trigger_heartbeat

lib/kafka/consumer_group.rb

+4
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,23 @@ def member?
4141
end
4242

4343
def join
44+
puts "Consumer group joining"
4445
if @topics.empty?
4546
raise Kafka::Error, "Cannot join group without at least one topic subscription"
4647
end
4748

4849
join_group
4950
synchronize
51+
puts "Consumer group joined"
5052
rescue NotCoordinatorForGroup
5153
@logger.error "Failed to find coordinator for group `#{@group_id}`; retrying..."
54+
puts "Failed to find coordinator for group `#{@group_id}`; retrying..."
5255
sleep 1
5356
@coordinator = nil
5457
retry
5558
rescue ConnectionError
5659
@logger.error "Connection error while trying to join group `#{@group_id}`; retrying..."
60+
puts "Connection error while trying to join group `#{@group_id}`; retrying..."
5761
sleep 1
5862
@cluster.mark_as_stale!
5963
@coordinator = nil

lib/kafka/fetcher.rb

+3
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ def loop
109109

110110
@logger.debug "Handling fetcher command: #{cmd}"
111111

112+
puts "Handling fetcher command: #{cmd} - #{args}"
112113
send("handle_#{cmd}", *args)
113114
elsif @queue.size < @max_queue_size
114115
step
@@ -158,6 +159,7 @@ def handle_seek(topic, partition, offset)
158159

159160
def step
160161
batches = fetch_batches
162+
puts "Fetched #{batches.size} messages"
161163

162164
batches.each do |batch|
163165
unless batch.empty?
@@ -197,6 +199,7 @@ def fetch_batches
197199
max_bytes = @max_bytes_per_partition[topic]
198200

199201
partitions.each do |partition, offset|
202+
puts "Fetching from #{topic} - partition #{partition} - offset #{offset}"
200203
operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes)
201204
end
202205
end

spec/functional/consumer_group_spec.rb

+2
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@
123123

124124
producer = Kafka.new(kafka_brokers, client_id: "test").producer
125125

126+
puts "Starting pushing a"
126127
messages_a.each { |i| producer.produce(i.to_s, topic: topic_a) }
127128
producer.deliver_messages
128129

@@ -145,6 +146,7 @@
145146
end
146147
thread.abort_on_exception = true
147148

149+
puts "Starting pushing b"
148150
messages_b.each { |i| producer.produce(i.to_s, topic: topic_b) }
149151
producer.deliver_messages
150152

0 commit comments

Comments
 (0)