Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix Kafka::TransactionManager#send_offsets_to_txn #866

Merged
merged 3 commits into from
Nov 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Changes and additions to the library will be listed here.

## Unreleased

- Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866).

## 1.3.0

- Support custom assignment strategy (#846).
Expand Down
2 changes: 2 additions & 0 deletions lib/kafka/protocol/add_offsets_to_txn_response.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

module Kafka
module Protocol
class AddOffsetsToTxnResponse
Expand Down
39 changes: 34 additions & 5 deletions lib/kafka/protocol/txn_offset_commit_response.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,46 @@
# frozen_string_literal: true

module Kafka
module Protocol
class TxnOffsetCommitResponse
class PartitionError
attr_reader :partition, :error_code

def initialize(partition:, error_code:)
@partition = partition
@error_code = error_code
end
end

class TopicPartitionsError
attr_reader :topic, :partitions

def initialize(topic:, partitions:)
@topic = topic
@partitions = partitions
end
end

attr_reader :error_code
attr_reader :errors

def initialize(error_code:)
@error_code = error_code
def initialize(errors:)
@errors = errors
end

def self.decode(decoder)
_throttle_time_ms = decoder.int32
error_code = decoder.int16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get how this worked in the first place...

new(error_code: error_code)
errors = decoder.array do
TopicPartitionsError.new(
topic: decoder.string,
partitions: decoder.array do
PartitionError.new(
partition: decoder.int32,
error_code: decoder.int16
)
end
)
end
new(errors: errors)
end
end
end
Expand Down
19 changes: 17 additions & 2 deletions lib/kafka/transaction_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,23 @@ def send_offsets_to_txn(offsets:, group_id:)
)
Protocol.handle_error(add_response.error_code)

send_response = transaction_coordinator.txn_offset_commit(
send_response = group_coordinator(group_id: group_id).txn_offset_commit(
transactional_id: @transactional_id,
group_id: group_id,
producer_id: @producer_id,
producer_epoch: @producer_epoch,
offsets: offsets
)
Protocol.handle_error(send_response.error_code)
send_response.errors.each do |tp|
tp.partitions.each do |partition|
Protocol.handle_error(partition.error_code)
end
end

nil
rescue
@transaction_state.transition_to!(TransactionStateMachine::ERROR)
raise
end

def in_transaction?
Expand Down Expand Up @@ -283,6 +292,12 @@ def transaction_coordinator
)
end

def group_coordinator(group_id:)
@cluster.get_group_coordinator(
group_id: group_id
)
end

def complete_transaction
@transaction_state.transition_to!(TransactionStateMachine::READY)
@transaction_partitions = {}
Expand Down
85 changes: 81 additions & 4 deletions spec/transaction_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
let!(:logger) { LOGGER }
let!(:cluster) { double(:cluster) }
let!(:transaction_coordinator) { double(:broker) }
let!(:group_coordinator) { double(:broker) }

let!(:manager) do
described_class.new(logger: logger, cluster: cluster)
Expand All @@ -13,6 +14,9 @@
allow(cluster).to receive(:get_transaction_coordinator).and_return(
transaction_coordinator
)
allow(cluster).to receive(:get_group_coordinator).and_return(
group_coordinator
)
allow(transaction_coordinator).to receive(:init_producer_id).and_return(
Kafka::Protocol::InitProducerIDResponse.new(
error_code: 0,
Expand Down Expand Up @@ -586,17 +590,74 @@
error_code: 0
)
)
allow(transaction_coordinator).to receive(:txn_offset_commit).and_return(
Kafka::Protocol::TxnOffsetCommitResponse.new(
error_code: 0
allow(group_coordinator).to receive(:txn_offset_commit).and_return(
txn_offset_commit_response(
'hello' => [1],
'world' => [2]
)
)
end

it 'notifies transaction coordinator' do
manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
expect(transaction_coordinator).to have_received(:add_offsets_to_txn)
expect(transaction_coordinator).to have_received(:txn_offset_commit)
expect(group_coordinator).to have_received(:txn_offset_commit)
end
end

context 'transaction coordinator returns error' do
before do
manager.init_transactions
manager.begin_transaction
allow(transaction_coordinator).to receive(:add_offsets_to_txn).and_return(
Kafka::Protocol::AddOffsetsToTxnResponse.new(
error_code: 47
)
)
end

it 'raises exception' do
expect do
manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
end.to raise_error(Kafka::InvalidProducerEpochError)
end

it 'changes state to error' do
begin
manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
rescue; end
expect(manager.error?).to eql(true)
end
end

context 'group coordinator returns error' do
before do
manager.init_transactions
manager.begin_transaction
allow(transaction_coordinator).to receive(:add_offsets_to_txn).and_return(
Kafka::Protocol::AddOffsetsToTxnResponse.new(
error_code: 0
)
)
allow(group_coordinator).to receive(:txn_offset_commit).and_return(
txn_offset_commit_response(
{ 'hello' => [1], 'world' => [2] },
error_code: 47
)
)
end

it 'raises exception' do
expect do
manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
end.to raise_error(Kafka::InvalidProducerEpochError)
end

it 'changes state to error' do
begin
manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
rescue; end
expect(manager.error?).to eql(true)
end
end
end
Expand All @@ -617,3 +678,19 @@ def success_add_partitions_to_txn_response(topics)
end
)
end

def txn_offset_commit_response(topics, error_code: 0)
Kafka::Protocol::TxnOffsetCommitResponse.new(
errors: topics.map do |topic, partitions|
Kafka::Protocol::TxnOffsetCommitResponse::TopicPartitionsError.new(
topic: topic,
partitions: partitions.map do |partition|
Kafka::Protocol::TxnOffsetCommitResponse::PartitionError.new(
partition: partition,
error_code: error_code
)
end
)
end
)
end