Skip to content

Commit 1f72b1f

Browse files
authored
Merge pull request #866 from stasCF/fix-send-offsets-to-txn
fix Kafka::TransactionManager#send_offsets_to_txn
2 parents d3796da + d48aab8 commit 1f72b1f

5 files changed

+136
-11
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ Changes and additions to the library will be listed here.
44

55
## Unreleased
66

7+
- Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866).
8+
79
## 1.3.0
810

911
- Support custom assignment strategy (#846).

lib/kafka/protocol/add_offsets_to_txn_response.rb

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# frozen_string_literal: true
2+
13
module Kafka
24
module Protocol
35
class AddOffsetsToTxnResponse

lib/kafka/protocol/txn_offset_commit_response.rb

+34-5
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,46 @@
1+
# frozen_string_literal: true
2+
13
module Kafka
24
module Protocol
35
class TxnOffsetCommitResponse
6+
class PartitionError
7+
attr_reader :partition, :error_code
8+
9+
def initialize(partition:, error_code:)
10+
@partition = partition
11+
@error_code = error_code
12+
end
13+
end
14+
15+
class TopicPartitionsError
16+
attr_reader :topic, :partitions
17+
18+
def initialize(topic:, partitions:)
19+
@topic = topic
20+
@partitions = partitions
21+
end
22+
end
423

5-
attr_reader :error_code
24+
attr_reader :errors
625

7-
def initialize(error_code:)
8-
@error_code = error_code
26+
def initialize(errors:)
27+
@errors = errors
928
end
1029

1130
def self.decode(decoder)
1231
_throttle_time_ms = decoder.int32
13-
error_code = decoder.int16
14-
new(error_code: error_code)
32+
errors = decoder.array do
33+
TopicPartitionsError.new(
34+
topic: decoder.string,
35+
partitions: decoder.array do
36+
PartitionError.new(
37+
partition: decoder.int32,
38+
error_code: decoder.int16
39+
)
40+
end
41+
)
42+
end
43+
new(errors: errors)
1544
end
1645
end
1746
end

lib/kafka/transaction_manager.rb

+17-2
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,23 @@ def send_offsets_to_txn(offsets:, group_id:)
233233
)
234234
Protocol.handle_error(add_response.error_code)
235235

236-
send_response = transaction_coordinator.txn_offset_commit(
236+
send_response = group_coordinator(group_id: group_id).txn_offset_commit(
237237
transactional_id: @transactional_id,
238238
group_id: group_id,
239239
producer_id: @producer_id,
240240
producer_epoch: @producer_epoch,
241241
offsets: offsets
242242
)
243-
Protocol.handle_error(send_response.error_code)
243+
send_response.errors.each do |tp|
244+
tp.partitions.each do |partition|
245+
Protocol.handle_error(partition.error_code)
246+
end
247+
end
248+
249+
nil
250+
rescue
251+
@transaction_state.transition_to!(TransactionStateMachine::ERROR)
252+
raise
244253
end
245254

246255
def in_transaction?
@@ -283,6 +292,12 @@ def transaction_coordinator
283292
)
284293
end
285294

295+
def group_coordinator(group_id:)
296+
@cluster.get_group_coordinator(
297+
group_id: group_id
298+
)
299+
end
300+
286301
def complete_transaction
287302
@transaction_state.transition_to!(TransactionStateMachine::READY)
288303
@transaction_partitions = {}

spec/transaction_manager_spec.rb

+81-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
let!(:logger) { LOGGER }
55
let!(:cluster) { double(:cluster) }
66
let!(:transaction_coordinator) { double(:broker) }
7+
let!(:group_coordinator) { double(:broker) }
78

89
let!(:manager) do
910
described_class.new(logger: logger, cluster: cluster)
@@ -13,6 +14,9 @@
1314
allow(cluster).to receive(:get_transaction_coordinator).and_return(
1415
transaction_coordinator
1516
)
17+
allow(cluster).to receive(:get_group_coordinator).and_return(
18+
group_coordinator
19+
)
1620
allow(transaction_coordinator).to receive(:init_producer_id).and_return(
1721
Kafka::Protocol::InitProducerIDResponse.new(
1822
error_code: 0,
@@ -586,17 +590,74 @@
586590
error_code: 0
587591
)
588592
)
589-
allow(transaction_coordinator).to receive(:txn_offset_commit).and_return(
590-
Kafka::Protocol::TxnOffsetCommitResponse.new(
591-
error_code: 0
593+
allow(group_coordinator).to receive(:txn_offset_commit).and_return(
594+
txn_offset_commit_response(
595+
'hello' => [1],
596+
'world' => [2]
592597
)
593598
)
594599
end
595600

596601
it 'notifies transaction coordinator' do
597602
manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
598603
expect(transaction_coordinator).to have_received(:add_offsets_to_txn)
599-
expect(transaction_coordinator).to have_received(:txn_offset_commit)
604+
expect(group_coordinator).to have_received(:txn_offset_commit)
605+
end
606+
end
607+
608+
context 'transaction coordinator returns error' do
609+
before do
610+
manager.init_transactions
611+
manager.begin_transaction
612+
allow(transaction_coordinator).to receive(:add_offsets_to_txn).and_return(
613+
Kafka::Protocol::AddOffsetsToTxnResponse.new(
614+
error_code: 47
615+
)
616+
)
617+
end
618+
619+
it 'raises exception' do
620+
expect do
621+
manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
622+
end.to raise_error(Kafka::InvalidProducerEpochError)
623+
end
624+
625+
it 'changes state to error' do
626+
begin
627+
manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
628+
rescue; end
629+
expect(manager.error?).to eql(true)
630+
end
631+
end
632+
633+
context 'group coordinator returns error' do
634+
before do
635+
manager.init_transactions
636+
manager.begin_transaction
637+
allow(transaction_coordinator).to receive(:add_offsets_to_txn).and_return(
638+
Kafka::Protocol::AddOffsetsToTxnResponse.new(
639+
error_code: 0
640+
)
641+
)
642+
allow(group_coordinator).to receive(:txn_offset_commit).and_return(
643+
txn_offset_commit_response(
644+
{ 'hello' => [1], 'world' => [2] },
645+
error_code: 47
646+
)
647+
)
648+
end
649+
650+
it 'raises exception' do
651+
expect do
652+
manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
653+
end.to raise_error(Kafka::InvalidProducerEpochError)
654+
end
655+
656+
it 'changes state to error' do
657+
begin
658+
manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
659+
rescue; end
660+
expect(manager.error?).to eql(true)
600661
end
601662
end
602663
end
@@ -617,3 +678,19 @@ def success_add_partitions_to_txn_response(topics)
617678
end
618679
)
619680
end
681+
682+
def txn_offset_commit_response(topics, error_code: 0)
683+
Kafka::Protocol::TxnOffsetCommitResponse.new(
684+
errors: topics.map do |topic, partitions|
685+
Kafka::Protocol::TxnOffsetCommitResponse::TopicPartitionsError.new(
686+
topic: topic,
687+
partitions: partitions.map do |partition|
688+
Kafka::Protocol::TxnOffsetCommitResponse::PartitionError.new(
689+
partition: partition,
690+
error_code: error_code
691+
)
692+
end
693+
)
694+
end
695+
)
696+
end

0 commit comments

Comments
 (0)