Skip to content

Commit 4803c38

Browse files
committed
1 parent 9ae265c commit 4803c38

File tree

1 file changed

+10
-0
lines changed

1 file changed

+10
-0
lines changed

Diff for: lib/kafka/consumer.rb

+10
Original file line numberDiff line numberDiff line change
@@ -438,13 +438,15 @@ def join_group
438438
if old_generation_id && @group.generation_id != old_generation_id + 1
439439
# We've been out of the group for at least an entire generation, no
440440
# sense in trying to hold on to offset data
441+
clear_current_offsets
441442
@offset_manager.clear_offsets
442443
else
443444
# After rejoining the group we may have been assigned a new set of
444445
# partitions. Keeping the old offset commits around forever would risk
445446
# having the consumer go back and reprocess messages if it's assigned
446447
# a partition it used to be assigned to way back. For that reason, we
447448
# only keep commits for the partitions that we're still assigned.
449+
clear_current_offsets(excluding: @group.assigned_partitions)
448450
@offset_manager.clear_offsets_excluding(@group.assigned_partitions)
449451
end
450452

@@ -531,5 +533,13 @@ def fetch_batches
531533
def pause_for(topic, partition)
532534
@pauses[topic][partition]
533535
end
536+
537+
def clear_current_offsets(excluding: {})
538+
@current_offsets.each do |topic, partitions|
539+
partitions.keep_if do |partition, _|
540+
excluding.fetch(topic, []).include?(partition)
541+
end
542+
end
543+
end
534544
end
535545
end

0 commit comments

Comments
 (0)