Skip to content

Sending message from transactional Kafka listener: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION #645

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
eugene-khyst opened this issue Apr 13, 2018 · 13 comments

Comments

@eugene-khyst
Copy link

eugene-khyst commented Apr 13, 2018

I'm trying to send a message from Kafka listener in the same transaction to have message sending and offset committing in the same transaction but receives an exception: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION, and the transaction is rolled back.

I've added transaction support to my application (Spring Boot 2, Spring Kafka 2.1.4) using the following approach.

application.properties

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.group-id=transaction-sample
spring.kafka.listener.ack-mode=RECORD

spring.kafka.producer.transaction-id-prefix=transaction-sample-${random.uuid}

I've set kafkaTransactionManager to kafkaListenerContainerFactory containerProperties:

@SpringBootApplication
public class KafkaTransactionsSampleApplication {

  @Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
      ConsumerFactory<Object, Object> kafkaConsumerFactory,
      KafkaTransactionManager<Object, Object> kafkaTransactionManager) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    return factory;
  }

  public static void main(String[] args) {
    SpringApplication.run(KafkaTransactionsSampleApplication.class, args);
  }
}

Test listener should receive a message from input topic (transaction-sample-topic-in), log it, and forward it to the output topic (transaction-sample-topic-out). I want message sending and offset committing to be performed in the same transaction.

@Component
public class TestKafkaListener {

  private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaListener.class);

  public static final String INPUT_TEST_TOPIC = "transaction-sample-topic-in";
  public static final String OUTPUT_TEST_TOPIC = "transaction-sample-topic-out";

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  @KafkaListener(topics = INPUT_TEST_TOPIC)
  public void listen(ConsumerRecord<String, String> record) {
    LOGGER.info("Received Kafka record from {}: {}", INPUT_TEST_TOPIC, record);
    kafkaTemplate.send(OUTPUT_TEST_TOPIC, record.key(), record.value());
    LOGGER.info("Forwarded Kafka record to {}: {}", OUTPUT_TEST_TOPIC, record);
  }
}

In the tests, I've defined embedded Kafka with broker properties allowing to have transaction support with 1 broker instance.

@Bean
public KafkaEmbedded kafkaEmbedded() {
  KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, false, 1,
      INPUT_TEST_TOPIC, OUTPUT_TEST_TOPIC);
  kafkaEmbedded.setKafkaPorts(9092);
  kafkaEmbedded.brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1");
  kafkaEmbedded.brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1");
  return kafkaEmbedded;
}

When I run a simple test that sends a message to the input topic I receive an exception: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION. The transaction is rolled back and the message is not forwarded to the output topic.

@TestPropertySource("classpath:test.properties")
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaListenerTest {

  @Autowired
  private KafkaEmbedded kafkaEmbedded;

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  @Test
  public void shouldProcessEvent() throws Exception {
    String testKey = "test_key";
    String testData = "test_data";

    kafkaTemplate.send(INPUT_TEST_TOPIC, testKey, testData);

    try (Consumer<String, String> consumer = createConsumer()) {
      kafkaEmbedded.consumeFromAnEmbeddedTopic(consumer, OUTPUT_TEST_TOPIC);
      ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
      Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
      ConsumerRecord<String, String> record = iterator.next();

      assertEquals(testKey, record.value());
      assertEquals(testData, record.value());
      assertFalse(iterator.hasNext());
    }
  }

  private Consumer<String, String> createConsumer() {
    Map<String, Object> consumerProps =
        KafkaTestUtils.consumerProps("test-consumer", "true", kafkaEmbedded);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
        consumerProps, new StringDeserializer(), new StringDeserializer());
    return cf.createConsumer();
  }
}

The exception is

2018-04-13 16:22:46.856 ERROR 141936 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId transaction-sample-209b149f-7f97-42f6-82e7-257e1ac0d1950: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
	at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:141) ~[spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:137) ~[spring-tx-5.0.4.RELEASE.jar:5.0.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.innvokeRecordListenerInTx(KafkaMessageListenerContainer.java:949) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:929) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: org.apache.kafka.common.KafkaException: TransactionalId transaction-sample-209b149f-7f97-42f6-82e7-257e1ac0d1950: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:755) ~[kafka-clients-1.0.0.jar:na]
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:749) ~[kafka-clients-1.0.0.jar:na]
	at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215) ~[kafka-clients-1.0.0.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:564) ~[kafka-clients-1.0.0.jar:na]
	at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:285) ~[spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
	at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:60) ~[spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
	at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:126) ~[spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
	... 9 common frames omitted

I've created a sample application to reproduce the problem: https://github.com/evgeniy-khist/spring-kafka-transactions-sample

@eugene-khyst eugene-khyst changed the title Sending message from Kafka listener in transaction: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION Sending message from transactional Kafka listener: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION Apr 13, 2018
@garyrussell
Copy link
Contributor

garyrussell commented Apr 13, 2018

Your test send() needs to run in its own transaction:

kafkaTemplate.executeInTransaction(kt ->
    kt.send(INPUT_TEST_TOPIC, testKey, testData));

(or don't use a transactional template).

I also fixed your asserts:

assertEquals(testKey, record.key());
assertEquals(testData, record.value());

(they were both comparing the value).

@eugene-khyst
Copy link
Author

@garyrussell thank you. I've checked when test send() is done in its own transaction, everything works. It was not obvious that send in test should be in transaction if auto-configured KafkaTemplate is used. What is a proper way to get non-transactional template?

@garyrussell
Copy link
Contributor

You would need to define two producer factories and two templates (one with and one without a transaction id prefix).

I just issued a PR #647 to detect the condition and provide a meaningful exception.

@eklierka
Copy link

@garyrussell, did the test pass for you? I tried to run the test and assertion fails:

java.util.NoSuchElementException at org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52) at com.example.kafka.sample.transaction.KafkaListenerTest.shouldProcessEvent(KafkaListenerTest.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:73) at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:83) at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

It seems that listener method doesn't consume the event.

I removed the transaction manager from the listener container and annotated TestKafkaListener.listen() with @Transactional("kafkaTransactionManager") and only then test passed. Is it the right behaviour?

@garyrussell
Copy link
Contributor

Yes, it works for me - I just ran it again...

screen shot 2018-04-18 at 8 50 58 am

package com.example.kafka.sample.transaction;

import static com.example.kafka.sample.transaction.listener.TestKafkaListener.INPUT_TEST_TOPIC;
import static com.example.kafka.sample.transaction.listener.TestKafkaListener.OUTPUT_TEST_TOPIC;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.util.Iterator;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

@TestPropertySource("classpath:test.properties")
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaListenerTest {

  @Autowired
  private KafkaEmbedded kafkaEmbedded;

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  @Test
  public void shouldProcessEvent() throws Exception {
    String testKey = "test_key";
    String testData = "test_data";

    kafkaTemplate.executeInTransaction(kt ->
    	kt.send(INPUT_TEST_TOPIC, testKey, testData));

    try (Consumer<String, String> consumer = createConsumer()) {
      kafkaEmbedded.consumeFromAnEmbeddedTopic(consumer, OUTPUT_TEST_TOPIC);
      ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
      Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
      ConsumerRecord<String, String> record = iterator.next();

      assertEquals(testKey, record.key());
      assertEquals(testData, record.value());
      assertFalse(iterator.hasNext());
    }
  }

  private Consumer<String, String> createConsumer() {
    Map<String, Object> consumerProps =
        KafkaTestUtils.consumerProps("test-consumer", "true", kafkaEmbedded);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
        consumerProps, new StringDeserializer(), new StringDeserializer());
    return cf.createConsumer();
  }
}

@eklierka
Copy link

Test fails on Windows but passes on Ubuntu... probably the issue is related to Windows

@artembilan
Copy link
Member

probably the issue is related to Windows

Probably the issue is related to the Apache Kafka which doesn't support Windows properly.

Yes, I develop on Windows as well and I really see some issues with Kafka very often, especially with transactions.

I think the issue comes out of the scope of this project.

You can try to raise issue against Apache Kafka and cross-link it here.

Thanks for understanding

@eugene-khyst
Copy link
Author

@eklierka this is a known Kafka issue: https://issues.apache.org/jira/browse/KAFKA-6052. According to the ticket, fix will be included in the Kafka version 1.1.1.

@garyrussell
Copy link
Contributor

Closing for now; we can reopen if this tuns out to be a spring-kafka issue.

@vspiliopoulos
Copy link

vspiliopoulos commented Jun 15, 2018

Hello @eklierka and all!

I tried exactly the above in Windows (spring-kafka-2.1.6.RELEASE and kafka 1.0.1) as well and the behavior I am seeing is that the TestKafkaListener.listen() receives the record successfully, but OUTPUT_TEST_TOPIC never received the record. Is this the issue you had on Windows?

From my logs:

2018-06-15 17:11:05.568 DEBUG 13512 --- [           main] o.s.kafka.test.rule.KafkaEmbedded        : partitions assigned: [transaction-sample-topic-out-0]
2018-06-15 17:11:05.568 DEBUG 13512 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager          : Initiating transaction commit
2018-06-15 17:11:05.592  INFO 13512 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [transaction-sample-topic-in-0]
2018-06-15 17:11:05.592 DEBUG 13512 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-06-15 17:11:05.592 DEBUG 13512 --- [           main] o.s.kafka.test.rule.KafkaEmbedded        : Subscription Initiated
2018-06-15 17:11:05.592 DEBUG 13512 --- [           main] o.s.kafka.test.utils.KafkaTestUtils      : Polling...
2018-06-15 17:11:05.646 DEBUG 13512 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2018-06-15 17:11:05.648 DEBUG 13512 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2018-06-15 17:11:05.648 DEBUG 13512 --- [ntainer#0-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1e959a17]]
2018-06-15 17:11:05.653 DEBUG 13512 --- [ntainer#0-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=test_data, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@456a69df, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=test_key, kafka_receivedPartitionId=0, kafka_receivedTopic=transaction-sample-topic-in, kafka_receivedTimestamp=1529082665320}]]
2018-06-15 17:11:05.653  INFO 13512 --- [ntainer#0-0-C-1] c.e.q.t.s.k.listener.TestKafkaListener   : Received Kafka record from transaction-sample-topic-in: ConsumerRecord(topic = transaction-sample-topic-in, partition = 0, offset = 0, CreateTime = 1529082665320, serialized key size = 8, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = test_key, value = test_data)
2018-06-15 17:11:05.653  INFO 13512 --- [ntainer#0-0-C-1] c.e.q.t.s.k.listener.TestKafkaListener   : Forwarded Kafka record to transaction-sample-topic-out: ConsumerRecord(topic = transaction-sample-topic-in, partition = 0, offset = 0, CreateTime = 1529082665320, serialized key size = 8, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = test_key, value = test_data)
2018-06-15 17:11:05.653 DEBUG 13512 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {transaction-sample-topic-in-0=OffsetAndMetadata{offset=1, metadata=''}}
2018-06-15 17:12:05.593 DEBUG 13512 --- [           main] o.s.kafka.test.utils.KafkaTestUtils      : Received: 0, []

@artembilan
Copy link
Member

Yes, I'm observing some issue with Kafka transaction on Windows.
Mostly my thought is like there is some race condition around resource in Apache Kafka client per se.
Would be great if you escalate such an issue to Apache Kafka directly.

Thanks

@vspiliopoulos
Copy link

vspiliopoulos commented Jun 19, 2018

Thanks @artembilan,

It was indeed a windows related issue, as in my mac works fine!

@eugene-khyst
Copy link
Author

Hi @vspiliopoulos .
There is a bug in Kafka 1.0.0 https://issues.apache.org/jira/browse/KAFKA-6052
Due to this bug, Kafka transactions are not working on Windows.
So Kafka transactions sample can't be run on Windows.
The fix to the https://issues.apache.org/jira/browse/KAFKA-6052 is expected to be delivered in Kafka 1.1.1.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants