File tree 3 files changed +13
-0
lines changed
3 files changed +13
-0
lines changed Original file line number Diff line number Diff line change @@ -469,6 +469,7 @@ def make_final_offsets_commit!(attempts = 3)
469
469
end
470
470
471
471
def join_group
472
+ puts "Join group"
472
473
old_generation_id = @group . generation_id
473
474
474
475
@group . join
@@ -490,6 +491,7 @@ def join_group
490
491
491
492
@fetcher . reset
492
493
494
+ puts "assigned_partitions: #{ @group . assigned_partitions } "
493
495
@group . assigned_partitions . each do |topic , partitions |
494
496
partitions . each do |partition |
495
497
if paused? ( topic , partition )
@@ -499,9 +501,11 @@ def join_group
499
501
end
500
502
end
501
503
end
504
+ puts "finished join group"
502
505
end
503
506
504
507
def seek_to_next ( topic , partition )
508
+ puts "Seek to #{ topic } - #{ partition } "
505
509
# When automatic marking is off, the first poll needs to be based on the last committed
506
510
# offset from Kafka, that's why we fallback in case of nil (it may not be 0)
507
511
if @current_offsets [ topic ] . key? ( partition )
@@ -534,6 +538,7 @@ def fetch_batches
534
538
# Return early if the consumer has been stopped.
535
539
return [ ] if shutting_down?
536
540
541
+ puts "@group.member #{ @group . member? } "
537
542
join_group unless @group . member?
538
543
539
544
trigger_heartbeat
Original file line number Diff line number Diff line change @@ -41,19 +41,23 @@ def member?
41
41
end
42
42
43
43
def join
44
+ puts "Consumer group joining"
44
45
if @topics . empty?
45
46
raise Kafka ::Error , "Cannot join group without at least one topic subscription"
46
47
end
47
48
48
49
join_group
49
50
synchronize
51
+ puts "Consumer group joined"
50
52
rescue NotCoordinatorForGroup
51
53
@logger . error "Failed to find coordinator for group `#{ @group_id } `; retrying..."
54
+ puts "Failed to find coordinator for group `#{ @group_id } `; retrying..."
52
55
sleep 1
53
56
@coordinator = nil
54
57
retry
55
58
rescue ConnectionError
56
59
@logger . error "Connection error while trying to join group `#{ @group_id } `; retrying..."
60
+ puts "Connection error while trying to join group `#{ @group_id } `; retrying..."
57
61
sleep 1
58
62
@cluster . mark_as_stale!
59
63
@coordinator = nil
@@ -174,6 +178,7 @@ def synchronize
174
178
if group_leader?
175
179
@logger . info "Chosen as leader of group `#{ @group_id } `"
176
180
181
+ puts "assignment strategy #{ @members . keys } - #{ @topics } "
177
182
group_assignment = @assignment_strategy . assign (
178
183
members : @members . keys ,
179
184
topics : @topics ,
@@ -192,6 +197,7 @@ def synchronize
192
197
193
198
response . member_assignment . topics . each do |topic , assigned_partitions |
194
199
@logger . info "Partitions assigned for `#{ topic } `: #{ assigned_partitions . join ( ', ' ) } "
200
+ puts "Partitions assigned for `#{ topic } `: #{ assigned_partitions . join ( ', ' ) } "
195
201
end
196
202
197
203
@assigned_partitions . replace ( response . member_assignment . topics )
Original file line number Diff line number Diff line change 123
123
124
124
producer = Kafka . new ( kafka_brokers , client_id : "test" ) . producer
125
125
126
+ puts "Starting pushing a"
126
127
messages_a . each { |i | producer . produce ( i . to_s , topic : topic_a ) }
127
128
producer . deliver_messages
128
129
145
146
end
146
147
thread . abort_on_exception = true
147
148
149
+ puts "Starting pushing b"
148
150
messages_b . each { |i | producer . produce ( i . to_s , topic : topic_b ) }
149
151
producer . deliver_messages
150
152
You can’t perform that action at this time.
0 commit comments