diff --git a/CHANGELOG.md b/CHANGELOG.md index c1adb26e8..6e31a47d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ Changes and additions to the library will be listed here. ## Unreleased +- Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866). + ## 1.3.0 - Support custom assignment strategy (#846). diff --git a/lib/kafka/protocol/add_offsets_to_txn_response.rb b/lib/kafka/protocol/add_offsets_to_txn_response.rb index 830613dfb..7ac824cd7 100644 --- a/lib/kafka/protocol/add_offsets_to_txn_response.rb +++ b/lib/kafka/protocol/add_offsets_to_txn_response.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module Kafka module Protocol class AddOffsetsToTxnResponse diff --git a/lib/kafka/protocol/txn_offset_commit_response.rb b/lib/kafka/protocol/txn_offset_commit_response.rb index 6395bf030..628af7d66 100644 --- a/lib/kafka/protocol/txn_offset_commit_response.rb +++ b/lib/kafka/protocol/txn_offset_commit_response.rb @@ -1,17 +1,46 @@ +# frozen_string_literal: true + module Kafka module Protocol class TxnOffsetCommitResponse + class PartitionError + attr_reader :partition, :error_code + + def initialize(partition:, error_code:) + @partition = partition + @error_code = error_code + end + end + + class TopicPartitionsError + attr_reader :topic, :partitions + + def initialize(topic:, partitions:) + @topic = topic + @partitions = partitions + end + end - attr_reader :error_code + attr_reader :errors - def initialize(error_code:) - @error_code = error_code + def initialize(errors:) + @errors = errors end def self.decode(decoder) _throttle_time_ms = decoder.int32 - error_code = decoder.int16 - new(error_code: error_code) + errors = decoder.array do + TopicPartitionsError.new( + topic: decoder.string, + partitions: decoder.array do + PartitionError.new( + partition: decoder.int32, + error_code: decoder.int16 + ) + end + ) + end + new(errors: errors) end end end diff --git a/lib/kafka/transaction_manager.rb b/lib/kafka/transaction_manager.rb index fd7f9351f..23c8328c6 100644 --- a/lib/kafka/transaction_manager.rb +++ b/lib/kafka/transaction_manager.rb @@ -233,14 +233,23 @@ def send_offsets_to_txn(offsets:, group_id:) ) Protocol.handle_error(add_response.error_code) - send_response = transaction_coordinator.txn_offset_commit( + send_response = group_coordinator(group_id: group_id).txn_offset_commit( transactional_id: @transactional_id, group_id: group_id, producer_id: @producer_id, producer_epoch: @producer_epoch, offsets: offsets ) - Protocol.handle_error(send_response.error_code) + send_response.errors.each do |tp| + tp.partitions.each do |partition| + Protocol.handle_error(partition.error_code) + end + end + + nil + rescue + @transaction_state.transition_to!(TransactionStateMachine::ERROR) + raise end def in_transaction? @@ -283,6 +292,12 @@ def transaction_coordinator ) end + def group_coordinator(group_id:) + @cluster.get_group_coordinator( + group_id: group_id + ) + end + def complete_transaction @transaction_state.transition_to!(TransactionStateMachine::READY) @transaction_partitions = {} diff --git a/spec/transaction_manager_spec.rb b/spec/transaction_manager_spec.rb index b0fe7ee4c..015beb0aa 100644 --- a/spec/transaction_manager_spec.rb +++ b/spec/transaction_manager_spec.rb @@ -4,6 +4,7 @@ let!(:logger) { LOGGER } let!(:cluster) { double(:cluster) } let!(:transaction_coordinator) { double(:broker) } + let!(:group_coordinator) { double(:broker) } let!(:manager) do described_class.new(logger: logger, cluster: cluster) @@ -13,6 +14,9 @@ allow(cluster).to receive(:get_transaction_coordinator).and_return( transaction_coordinator ) + allow(cluster).to receive(:get_group_coordinator).and_return( + group_coordinator + ) allow(transaction_coordinator).to receive(:init_producer_id).and_return( Kafka::Protocol::InitProducerIDResponse.new( error_code: 0, @@ -586,9 +590,10 @@ error_code: 0 ) ) - allow(transaction_coordinator).to receive(:txn_offset_commit).and_return( - Kafka::Protocol::TxnOffsetCommitResponse.new( - error_code: 0 + allow(group_coordinator).to receive(:txn_offset_commit).and_return( + txn_offset_commit_response( + 'hello' => [1], + 'world' => [2] ) ) end @@ -596,7 +601,63 @@ it 'notifies transaction coordinator' do manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1) expect(transaction_coordinator).to have_received(:add_offsets_to_txn) - expect(transaction_coordinator).to have_received(:txn_offset_commit) + expect(group_coordinator).to have_received(:txn_offset_commit) + end + end + + context 'transaction coordinator returns error' do + before do + manager.init_transactions + manager.begin_transaction + allow(transaction_coordinator).to receive(:add_offsets_to_txn).and_return( + Kafka::Protocol::AddOffsetsToTxnResponse.new( + error_code: 47 + ) + ) + end + + it 'raises exception' do + expect do + manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1) + end.to raise_error(Kafka::InvalidProducerEpochError) + end + + it 'changes state to error' do + begin + manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1) + rescue; end + expect(manager.error?).to eql(true) + end + end + + context 'group coordinator returns error' do + before do + manager.init_transactions + manager.begin_transaction + allow(transaction_coordinator).to receive(:add_offsets_to_txn).and_return( + Kafka::Protocol::AddOffsetsToTxnResponse.new( + error_code: 0 + ) + ) + allow(group_coordinator).to receive(:txn_offset_commit).and_return( + txn_offset_commit_response( + { 'hello' => [1], 'world' => [2] }, + error_code: 47 + ) + ) + end + + it 'raises exception' do + expect do + manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1) + end.to raise_error(Kafka::InvalidProducerEpochError) + end + + it 'changes state to error' do + begin + manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1) + rescue; end + expect(manager.error?).to eql(true) end end end @@ -617,3 +678,19 @@ def success_add_partitions_to_txn_response(topics) end ) end + +def txn_offset_commit_response(topics, error_code: 0) + Kafka::Protocol::TxnOffsetCommitResponse.new( + errors: topics.map do |topic, partitions| + Kafka::Protocol::TxnOffsetCommitResponse::TopicPartitionsError.new( + topic: topic, + partitions: partitions.map do |partition| + Kafka::Protocol::TxnOffsetCommitResponse::PartitionError.new( + partition: partition, + error_code: error_code + ) + end + ) + end + ) +end