Skip to content

Clear current offsets on join group #612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lib/kafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,15 @@ def join_group
if old_generation_id && @group.generation_id != old_generation_id + 1
# We've been out of the group for at least an entire generation, no
# sense in trying to hold on to offset data
clear_current_offsets
@offset_manager.clear_offsets
else
# After rejoining the group we may have been assigned a new set of
# partitions. Keeping the old offset commits around forever would risk
# having the consumer go back and reprocess messages if it's assigned
# a partition it used to be assigned to way back. For that reason, we
# only keep commits for the partitions that we're still assigned.
clear_current_offsets(excluding: @group.assigned_partitions)
@offset_manager.clear_offsets_excluding(@group.assigned_partitions)
end

Expand Down Expand Up @@ -537,5 +539,13 @@ def fetch_batches
def pause_for(topic, partition)
@pauses[topic][partition]
end

def clear_current_offsets(excluding: {})
@current_offsets.each do |topic, partitions|
partitions.keep_if do |partition, _|
excluding.fetch(topic, []).include?(partition)
end
end
end
end
end
97 changes: 97 additions & 0 deletions spec/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,46 @@
end
end

shared_context 'with partition reassignment' do
let(:messages_after_partition_reassignment) {
[
double(:message, {
value: "hello",
key: nil,
headers: {},
topic: "greetings",
partition: 1,
offset: 10,
create_time: Time.now,
is_control_record: false
})
]
}

let(:batches_after_partition_reassignment) {
[
Kafka::FetchedBatch.new(
topic: "greetings",
partition: 1,
highwater_mark_offset: 42,
messages: messages_after_partition_reassignment,
)
]
}

before do
@count = 0
allow(fetcher).to receive(:poll) {
@count += 1
if @count == 1
[:batches, fetched_batches]
else
[:batches, batches_after_partition_reassignment]
end
}
end
end

before do
allow(cluster).to receive(:add_target_topics)
allow(cluster).to receive(:disconnect)
Expand Down Expand Up @@ -272,6 +312,63 @@
expect(@yield_count).to eq 1
end
end

context 'consumer joins a new group' do
include_context 'with partition reassignment'

let(:group) { double(:group).as_null_object }
let(:fetcher) { double(:fetcher).as_null_object }
let(:current_offsets) { consumer.instance_variable_get(:@current_offsets) }
let(:assigned_partitions) { { 'greetings' => [0] } }
let(:reassigned_partitions) { { 'greetings' => [1] } }

before do
allow(heartbeat).to receive(:trigger) do
next unless @encounter_rebalance
@encounter_rebalance = false
raise Kafka::RebalanceInProgress
end

consumer.each_message do |message|
consumer.stop
end

allow(group).to receive(:assigned_partitions).and_return(reassigned_partitions)
allow(group).to receive(:assigned_to?).with('greetings', 1) { true }
allow(group).to receive(:assigned_to?).with('greetings', 0) { false }
allow(group).to receive(:generation_id).and_return(*generation_ids)

@encounter_rebalance = true
end

context 'with subsequent group generations' do
let(:generation_ids) { [1, 2] }

it 'removes local offsets for partitions it is no longer assigned' do
expect(offset_manager).to receive(:clear_offsets_excluding).with(reassigned_partitions)

expect do
consumer.each_message do |message|
consumer.stop
end
end.to change { current_offsets['greetings'].keys }.from([0]).to([1])
end
end

context 'with group generations further apart' do
let(:generation_ids) { [1, 3] }

it 'clears local offsets' do
expect(offset_manager).to receive(:clear_offsets)

expect do
consumer.each_message do |message|
consumer.stop
end
end.to change { current_offsets['greetings'].keys }.from([0]).to([1])
end
end
end
end

describe "#commit_offsets" do
Expand Down