From dc38c66c8cf30bd5dba43fbd80eeb21e309c341f Mon Sep 17 00:00:00 2001 From: Steffen Schildknecht Date: Thu, 5 Jul 2018 10:19:11 +0200 Subject: [PATCH 1/2] Test: Handle stored offsets on join group --- spec/consumer_spec.rb | 97 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index f01e4c774..975ebb022 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -90,6 +90,46 @@ end end + shared_context 'with partition reassignment' do + let(:messages_after_partition_reassignment) { + [ + double(:message, { + value: "hello", + key: nil, + headers: {}, + topic: "greetings", + partition: 1, + offset: 10, + create_time: Time.now, + is_control_record: false + }) + ] + } + + let(:batches_after_partition_reassignment) { + [ + Kafka::FetchedBatch.new( + topic: "greetings", + partition: 1, + highwater_mark_offset: 42, + messages: messages_after_partition_reassignment, + ) + ] + } + + before do + @count = 0 + allow(fetcher).to receive(:poll) { + @count += 1 + if @count == 1 + [:batches, fetched_batches] + else + [:batches, batches_after_partition_reassignment] + end + } + end + end + before do allow(cluster).to receive(:add_target_topics) allow(cluster).to receive(:disconnect) @@ -272,6 +312,63 @@ expect(@yield_count).to eq 1 end end + + context 'consumer joins a new group' do + include_context 'with partition reassignment' + + let(:group) { double(:group).as_null_object } + let(:fetcher) { double(:fetcher).as_null_object } + let(:current_offsets) { consumer.instance_variable_get(:@current_offsets) } + let(:assigned_partitions) { { 'greetings' => [0] } } + let(:reassigned_partitions) { { 'greetings' => [1] } } + + before do + allow(heartbeat).to receive(:trigger) do + next unless @encounter_rebalance + @encounter_rebalance = false + raise Kafka::RebalanceInProgress + end + + consumer.each_message do |message| + consumer.stop + end + + allow(group).to receive(:assigned_partitions).and_return(reassigned_partitions) + allow(group).to receive(:assigned_to?).with('greetings', 1) { true } + allow(group).to receive(:assigned_to?).with('greetings', 0) { false } + allow(group).to receive(:generation_id).and_return(*generation_ids) + + @encounter_rebalance = true + end + + context 'with subsequent group generations' do + let(:generation_ids) { [1, 2] } + + it 'removes local offsets for partitions it is no longer assigned' do + expect(offset_manager).to receive(:clear_offsets_excluding).with(reassigned_partitions) + + expect do + consumer.each_message do |message| + consumer.stop + end + end.to change { current_offsets['greetings'].keys }.from([0]).to([1]) + end + end + + context 'with group generations further apart' do + let(:generation_ids) { [1, 3] } + + it 'clears local offsets' do + expect(offset_manager).to receive(:clear_offsets) + + expect do + consumer.each_message do |message| + consumer.stop + end + end.to change { current_offsets['greetings'].keys }.from([0]).to([1]) + end + end + end end describe "#commit_offsets" do From 1ef03bdcbd78369eafe4703db9f91e9634cbc8d9 Mon Sep 17 00:00:00 2001 From: Steffen Schildknecht Date: Mon, 2 Jul 2018 15:27:22 +0200 Subject: [PATCH 2/2] Clear @current_offsets when joining group --- lib/kafka/consumer.rb | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/kafka/consumer.rb b/lib/kafka/consumer.rb index 225a63309..be9da0abe 100644 --- a/lib/kafka/consumer.rb +++ b/lib/kafka/consumer.rb @@ -444,6 +444,7 @@ def join_group if old_generation_id && @group.generation_id != old_generation_id + 1 # We've been out of the group for at least an entire generation, no # sense in trying to hold on to offset data + clear_current_offsets @offset_manager.clear_offsets else # After rejoining the group we may have been assigned a new set of @@ -451,6 +452,7 @@ def join_group # having the consumer go back and reprocess messages if it's assigned # a partition it used to be assigned to way back. For that reason, we # only keep commits for the partitions that we're still assigned. + clear_current_offsets(excluding: @group.assigned_partitions) @offset_manager.clear_offsets_excluding(@group.assigned_partitions) end @@ -537,5 +539,13 @@ def fetch_batches def pause_for(topic, partition) @pauses[topic][partition] end + + def clear_current_offsets(excluding: {}) + @current_offsets.each do |topic, partitions| + partitions.keep_if do |partition, _| + excluding.fetch(topic, []).include?(partition) + end + end + end end end