diff --git a/lib/kafka/transaction_manager.rb b/lib/kafka/transaction_manager.rb index 3cc03bab5..fd7f9351f 100644 --- a/lib/kafka/transaction_manager.rb +++ b/lib/kafka/transaction_manager.rb @@ -95,7 +95,7 @@ def add_partitions_to_transaction(topic_partitions) force_transactional! if @transaction_state.uninitialized? - raise 'Transaction is uninitialized' + raise Kafka::InvalidTxnStateError, 'Transaction is uninitialized' end # Extract newly created partitions @@ -138,8 +138,8 @@ def add_partitions_to_transaction(topic_partitions) def begin_transaction force_transactional! - raise 'Transaction has already started' if @transaction_state.in_transaction? - raise 'Transaction is not ready' unless @transaction_state.ready? + raise Kafka::InvalidTxnStateError, 'Transaction has already started' if @transaction_state.in_transaction? + raise Kafka::InvalidTxnStateError, 'Transaction is not ready' unless @transaction_state.ready? @transaction_state.transition_to!(TransactionStateMachine::IN_TRANSACTION) @logger.info "Begin transaction #{@transactional_id}, Producer ID: #{@producer_id} (Epoch #{@producer_epoch})" @@ -159,7 +159,7 @@ def commit_transaction end unless @transaction_state.in_transaction? - raise 'Transaction is not valid to commit' + raise Kafka::InvalidTxnStateError, 'Transaction is not valid to commit' end @transaction_state.transition_to!(TransactionStateMachine::COMMITTING_TRANSACTION) @@ -192,7 +192,8 @@ def abort_transaction end unless @transaction_state.in_transaction? - raise 'Transaction is not valid to abort' + @logger.warn('Aborting transaction that was never opened on brokers') + return end @transaction_state.transition_to!(TransactionStateMachine::ABORTING_TRANSACTION) @@ -221,7 +222,7 @@ def send_offsets_to_txn(offsets:, group_id:) force_transactional! unless @transaction_state.in_transaction? - raise 'Transaction is not valid to send offsets' + raise Kafka::InvalidTxnStateError, 'Transaction is not valid to send offsets' end add_response = transaction_coordinator.add_offsets_to_txn( @@ -250,6 +251,10 @@ def error? @transaction_state.error? end + def ready? + @transaction_state.ready? + end + def close if in_transaction? @logger.warn("Aborting pending transaction ...") @@ -264,11 +269,11 @@ def close def force_transactional! unless transactional? - raise 'Please turn on transactional mode to use transaction' + raise Kafka::InvalidTxnStateError, 'Please turn on transactional mode to use transaction' end if @transactional_id.nil? || @transactional_id.empty? - raise 'Please provide a transaction_id to use transactional mode' + raise Kafka::InvalidTxnStateError, 'Please provide a transaction_id to use transactional mode' end end diff --git a/spec/transaction_manager_spec.rb b/spec/transaction_manager_spec.rb index 6e59c86ba..b0fe7ee4c 100644 --- a/spec/transaction_manager_spec.rb +++ b/spec/transaction_manager_spec.rb @@ -210,7 +210,7 @@ manager.add_partitions_to_transaction( 'hello' => [1, 2, 3] ) - end.to raise_error(/please turn on transactional mode/i) + end.to raise_error(Kafka::InvalidTxnStateError, /please turn on transactional mode/i) end end end @@ -256,7 +256,7 @@ it 'raises exception' do expect do manager.init_transactions - end.to raise_error(/please turn on transactional mode/i) + end.to raise_error(Kafka::InvalidTxnStateError, /please turn on transactional mode/i) end it 'changes state to error' do @@ -291,7 +291,7 @@ it 'raises exception' do expect do manager.begin_transaction - end.to raise_error(/transaction has already started/i) + end.to raise_error(Kafka::InvalidTxnStateError, /transaction has already started/i) end it 'changes state to error' do @@ -306,7 +306,7 @@ it 'raises exception' do expect do manager.begin_transaction - end.to raise_error(/transaction is not ready/i) + end.to raise_error(Kafka::InvalidTxnStateError, /transaction is not ready/i) end it 'changes state to error' do @@ -325,7 +325,7 @@ it 'raises exception' do expect do manager.begin_transaction - end.to raise_error(/please turn on transactional mode/i) + end.to raise_error(Kafka::InvalidTxnStateError, /please turn on transactional mode/i) end it 'changes state to error' do @@ -401,17 +401,15 @@ manager.init_transactions end - it 'raises exception' do - expect do - manager.abort_transaction - end.to raise_error(/transaction is not valid to abort/i) + it 'does not raise an exception' do + expect { manager.abort_transaction }.not_to raise_error end - it 'changes state to error' do + it 'leaves transaction manager in ready state' do begin manager.abort_transaction rescue; end - expect(manager.error?).to eql(true) + expect(manager.ready?).to eql(true) end end @@ -423,7 +421,7 @@ it 'raises exception' do expect do manager.abort_transaction - end.to raise_error(/please turn on transactional mode/i) + end.to raise_error(Kafka::InvalidTxnStateError, /please turn on transactional mode/i) end it 'changes state to error' do @@ -502,7 +500,7 @@ it 'raises exception' do expect do manager.commit_transaction - end.to raise_error(/transaction is not valid to commit/i) + end.to raise_error(Kafka::InvalidTxnStateError, /transaction is not valid to commit/i) end it 'changes state to error' do @@ -521,7 +519,7 @@ it 'raises exception' do expect do manager.commit_transaction - end.to raise_error(/please turn on transactional mode/i) + end.to raise_error(Kafka::InvalidTxnStateError, /please turn on transactional mode/i) end it 'changes state to error' do