Skip to content

Single shared KafkaEmbedded is all JUnit tests #666

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
eugene-khyst opened this issue May 3, 2018 · 15 comments · Fixed by #2308
Closed

Single shared KafkaEmbedded is all JUnit tests #666

eugene-khyst opened this issue May 3, 2018 · 15 comments · Fixed by #2308

Comments

@eugene-khyst
Copy link

It seems like there is no convenient way to define single shared KafkaEmbedded that will be used in all JUnit tests.

There might be multiple tests that require KafkaEmbedded.
When @KafkaEmbedded is used with @ClassRule, Kafka server is started and stopped for every test class.
It may take a lot of time. Better to have embedded Kafka server started only once for all test classes.

Straightforward approach is to add to test sources configuration class defining a KafkaEmbedded bean:

@Configuration
public class KafkaEmbeddedConfig {

  @Bean
  public KafkaEmbedded kafkaEmbedded() {
    KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, false, 1, "test_topic");
    kafkaEmbedded.setKafkaPorts(9092);
    return kafkaEmbedded;
  }
}

The problem is that there might tests creating multiple test contexts:

@TestPropertySource("classpath:test.properties")
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaListenerTest {
...

and

@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureMockMvc
public class RestControllerTest {
...

In this example Kafka server will be started twice. Second executed test class will fail with the exception

Caused by: kafka.common.KafkaException: Socket server failed to bind to localhost:9092: Address already in use.

if port is specified or new Kafka server will be started if random port is used.

Moreover, if @DirtiesContext is used, together with the refresh of the test context Kafka server will be restarted.

As a workaround KafkaEmbedded can be defined as a constant in some base test class that test classes will extend:

@ActiveProfiles("kafka")
public class KafkaTestBase {

  public static final KafkaEmbedded KAFKA_EMBEDDED = createKafkaEmbedded();

  private static KafkaEmbedded createKafkaEmbedded() {
    AnnotationConfigApplicationContext context =
        new AnnotationConfigApplicationContext(KafkaEmbeddedConfig.class);
    KafkaEmbedded kafkaEmbedded = context.getBean(KafkaEmbedded.class);
    Runtime.getRuntime().addShutdownHook(new Thread(context::close));
    return kafkaEmbedded;
  }
}
@TestPropertySource("classpath:test.properties")
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaListenerTest extends KafkaTestBase {
...

This way Kafka server will be started in a separate Spring application context that will not be affected by @DirtiesContext or multiple test contexts (due to the usage of @AutoConfigureMockMvc etc.).

Can such approach be considered a normal solution or a workaround?
Are there any more efficient ways to have single shared KafkaEmbedded instance across all JUnit tests?

Sample:

@artembilan
Copy link
Member

How about this: https://docs.spring.io/spring-kafka/docs/2.1.5.RELEASE/reference/html/_reference.html#__embeddedkafka_annotation ?

Although that sounds like you just need a fully blown external Kafka running and that's all.

@eugene-khyst
Copy link
Author

eugene-khyst commented May 3, 2018

@artembilan thank you for suggesting @EmbeddedKafka.
On the first look everything works with @EmbeddedKafka.
There is one disadvantage. When I have multiple tests requiring Kafka, annotation with the list of all topics (@EmbeddedKafka(topics = {"test_topic", "test_topic2"})) have to be duplicated.
Sample (branch embedded-kafka-annotation): https://github.com/evgeniy-khist/spring-kafka-embedded-sample/tree/embedded-kafka-annotation

Of course, it is possible to run standalone Kafka server for tests but it is not convenient compared to embedded one.

@garyrussell
Copy link
Contributor

You can use a test suite and put the embedded kafka broker there.

It's generally not a good idea to use a fixed port in tests since that port might be in use elsewhere on the server; this is particularly a problem on CI servers which might have multiple builds running.

However, since it's best to define the topics to the embedded server so that they are created before the tests start, the suite would have to be configured with all the required topics for all the tests.

@eugene-khyst
Copy link
Author

@garyrussell thank you for the answer.
A JUnit test suite was the first idea that came to my mind. But when a test suite is used Kafka-related tests can only be executed from the central class (suite class). It's not an optimal solution when there are a lot of tests using Kafka.

Hardcoded port in the sample is used for simplicity. Port can be specified as a property:

@Value("${embedded.kafka.port:9092}")
private int port;

@Bean
public KafkaEmbedded kafkaEmbedded() {
  KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, false, 1, "test_topic");
  kafkaEmbedded.setKafkaPorts(port);
  return kafkaEmbedded;
}

To avoid problems on CI with the port conflicts, a random port can be passed as a property:

./gradlew test -Dembedded.kafka.port=<RANDOM PORT>

Usage of a fixed port makes sure that only 1 instance of Kafka server is started.

To use random port chosen by Spring either @EmbeddedKafka or @ClassRule has to be used to have the property spring.embedded.kafka.brokers populated before test context will be started.
If KafkaEmbedded defined as a @Bean in a @Configuration, the property spring.embedded.kafka.brokers will be not be resolved (test context will need to resolve it before it will be set).

@eugene-khyst
Copy link
Author

eugene-khyst commented May 4, 2018

@artembilan regarding @EmbeddedKafka.

This annotation with the list of all topics has to be added to every class that requires Kafka.
When there are a lot of topics it's a big code duplication. The solution might be to create base Kafka test class, a single place where @EmbeddedKafka will be added:

@EmbeddedKafka(topics = {"test_topic", "test_topic2"})
public class KafkaTestBase {

  @Autowired
  protected KafkaEmbedded kafkaEmbedded;

}

The problem is that with @EmbeddedKafka in the base class or each test class, second Kafka test class is failing from time to time. Messages are not delivered to the second listener when all tests executed in the group while passing when executed individually. The problem is floating.

  1. https://github.com/evgeniy-khist/spring-kafka-embedded-sample/tree/embedded-kafka-annotation
  2. https://github.com/evgeniy-khist/spring-kafka-embedded-sample/tree/embedded-kafka-annotation-in-base-test-class (with base tests class there are more failures than successful build but maybe just coincidence).

Am I using @EmbeddedKafka in a wrong way?

@artembilan
Copy link
Member

Well, I think there is just no solution for you on the matter.
Even caching applicationContext in Spring Test Framework is based on the same set of configurations.

I may suggest you to start a daemon application with such an KafkaEmbedded bean. Essentially it is really Kafka per se. It is expose a particular system property:

System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());

To be used in target test configurations:

@Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;

@Bean
public Map<String, Object> producerConfigs() {
	return KafkaTestUtils.senderProps(this.brokerAddresses);
}

@Bean
public ProducerFactory<Integer, String> producerFactory() {
	return new DefaultKafkaProducerFactory<>(producerConfigs());
}

Not sure when ad how to kill such a daemon, but looks like Gradle team checks periodically their daemon activity and eventually it kills itself.

Our practice is like do not cache resources between tests - performance in favor of stability.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue May 4, 2018
Resolves spring-projects#666

Allow a single `KafkaEmbedded` to be used across multiple test classes.
@garyrussell
Copy link
Contributor

@evgeniy-khist The attached PR seems to fit the bill.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue May 5, 2018
Resolves spring-projects#666

Allow arbitrary `AdminClient` operations and adding topics.

Polishing - PR Comments
@eugene-khyst
Copy link
Author

@garyrussell A section in the testing.adoc in the attached PR answers my question. Thanks!

artembilan pushed a commit that referenced this issue May 7, 2018
Resolves #666

Allow arbitrary `AdminClient` operations and adding topics.

Polishing - PR Comments
artembilan pushed a commit that referenced this issue May 7, 2018
Resolves #666

Allow arbitrary `AdminClient` operations and adding topics.

Polishing - PR Comments
denis554 added a commit to denis554/spring-kafka that referenced this issue Mar 27, 2019
Resolves spring-projects/spring-kafka#666

Allow arbitrary `AdminClient` operations and adding topics.

Polishing - PR Comments
@ashishkujoy
Copy link

Maybe an overkill, I was facing the same issue, I had created one gradle plugin to manage embedded Kafka for my tests.
https://github.com/ashishkujoy/gradle-embedded-kafka-plugin

@artembilan
Copy link
Member

Hi @ashishkujoy !

Thank you for sharing that with us. Really cool feature.

Looks like your solution is based on this project https://github.com/embeddedkafka/embedded-kafka, so it might be better to contribute your plugin back there.

The only problem that this is limited to Gradle and if you run unit tests from IDE, you still need to come up with some other solution.
And from here we really prefer Testcontainers where Docker environment is must have these days.

Nevertheless it is already out of this project scope.

@artembilan
Copy link
Member

I found another solution which comes from JUnit 5 Platform Launcher - the TestExecutionListener:

public class GlobalEmbeddedKafkaTestExecutionListener implements TestExecutionListener {

	private EmbeddedKafkaBroker embeddedKafkaBroker;

	@Override
	public void testPlanExecutionStarted(TestPlan testPlan) {
		ConfigurationParameters configurationParameters = testPlan.getConfigurationParameters();
		Integer count = configurationParameters.get("spring.kafka.embedded.count", Integer::parseInt).orElse(1);
		this.embeddedKafkaBroker = new EmbeddedKafkaBroker(count);
		this.embeddedKafkaBroker.afterPropertiesSet();
	}

	@Override
	public void testPlanExecutionFinished(TestPlan testPlan) {
		this.embeddedKafkaBroker.destroy();
	}

}

Then you register it as a Java’s ServiceLoader in the src\test\resources\META-INF\services\org.junit.platform.launcher.TestExecutionListener of your project and add a org.junit.platform:junit-platform-launcher dependency.
It can be customized respectively via provided TestPlan.getConfigurationParameters(). Those parameters can be configured in the junit-platform.properties file in the classpath - in the same src\test\resources dir, for example.
The property spring.kafka.embedded.count I chose randomly.

We may add this as an out-of-the-box solution with some external spring.kafka.global.embedded.enabled which can be set as an ENV var or system property from Maven/Gradle.
Perhaps some set of configuration properties also can be exposed to customize such a plan execution.

See more info in JUnit 5 docs: https://junit.org/junit5/docs/current/user-guide/#launcher-api-listeners-custom

@ashishkujoy
Copy link

ashishkujoy commented Jun 8, 2022

Thanks, @artembilan, for letting me know about this,

artembilan pushed a commit to artembilan/spring-kafka that referenced this issue Jun 15, 2022
Fixes spring-projects#666

* Introduce a `GlobalEmbeddedKafkaTestExecutionListener` which is registered
by the service loader from JUnit Platform
* Make its activation conditional based on the `spring.kafka.global.embedded.enabled`
system property
* Expose some other configuration properties for the target global `EmbeddedKafkaBroker`
* Verify its functionality via manual `Launcher.execute()`
* Add more `@DirtiesContext` to some tests in the `spring-kafka-test` which don't close
their embedded brokers on the exit
@artembilan
Copy link
Member

I have decided to go ahead and introduce the mentioned listener as a framework component.
See the linked PR: #2308

@artembilan artembilan reopened this Jun 15, 2022
artembilan pushed a commit to artembilan/spring-kafka that referenced this issue Jun 15, 2022
Fixes spring-projects#666

* Introduce a `GlobalEmbeddedKafkaTestExecutionListener` which is registered
by the service loader from JUnit Platform
* Make its activation conditional based on the `spring.kafka.global.embedded.enabled`
system property
* Expose some other configuration properties for the target global `EmbeddedKafkaBroker`
* Verify its functionality via manual `Launcher.execute()`
* Add more `@DirtiesContext` to some tests in the `spring-kafka-test` which don't close
their embedded brokers on the exit
artembilan pushed a commit to artembilan/spring-kafka that referenced this issue Jun 22, 2022
Fixes spring-projects#666

* Introduce a `GlobalEmbeddedKafkaTestExecutionListener` which is registered
by the service loader from JUnit Platform
* Make its activation conditional based on the `spring.kafka.global.embedded.enabled`
system property
* Expose some other configuration properties for the target global `EmbeddedKafkaBroker`
* Verify its functionality via manual `Launcher.execute()`
* Add more `@DirtiesContext` to some tests in the `spring-kafka-test` which don't close
their embedded brokers on the exit
garyrussell added a commit that referenced this issue Jun 22, 2022
* GH-666: Single global embedded Kafka with JUnit 5

Fixes #666

* Introduce a `GlobalEmbeddedKafkaTestExecutionListener` which is registered
by the service loader from JUnit Platform
* Make its activation conditional based on the `spring.kafka.global.embedded.enabled`
system property
* Expose some other configuration properties for the target global `EmbeddedKafkaBroker`
* Verify its functionality via manual `Launcher.execute()`
* Add more `@DirtiesContext` to some tests in the `spring-kafka-test` which don't close
their embedded brokers on the exit

* * Print local launcher summary into SOUT for tracing the nested problems

* * Fix prefix for `file:` resource in the test

* * Make `junit-platform-launcher` as provided dep -
this is part of JUnit platform the test plan is going to be run
* Add missed new line in the end of `junit-platform.properties`
* Rethrow an exception from the local launcher in the `GlobalEmbeddedKafkaTestExecutionListenerTests`
to fail if its suite test classes have failed.

* * Fix JavaDocs for `GlobalEmbeddedKafkaTestExecutionListener` constants
* Move system properties clean up in the `GlobalEmbeddedKafkaTestExecutionListenerTests` into `@AfterAll`
* Add docs for this new feature
* Add `sample-05` to demonstrate global embedded Kafka in action via Maven configuration

* * Mention `sample-05` in the common `README.adoc` for samples

* * Fix language in the `testing.adoc`
* Add new lines into new files
* Add `spring.kafka.consumer.auto-offset-reset=earliest` to make a consumer to start from the beginning of the partition:
fixes a race condition when publishing happens before consumer is started
* Make `mvnw` as an executable
* Add `spring.embedded.kafka.brokers.property` explanation into docs

* * Fix language in docs

Co-authored-by: Gary Russell <[email protected]>

* * Make `Sample05Application2Tests` passing
* Remove `deliberately failing` sentence from the JavaDoc an README
* Clean up unused imports

Co-authored-by: Gary Russell <[email protected]>
@estigma88
Copy link

estigma88 commented Jun 20, 2024

Facing the same challenge, I was checking the GlobalEmbeddedKafkaTestExecutionListener class, but I didn't find a way to access the EmbeddedKafkaBroker instance. I need to do something as follows:

val consumerProps: MutableMap<String, Any> =  KafkaTestUtils.consumerProps(name, "true", kafka)

consumerProps[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"

val cf: ConsumerFactory<String, String> = DefaultKafkaConsumerFactory(consumerProps, StringDeserializer(),           StringDeserializer())

val consumer: Consumer<String, String> = cf.createConsumer()

kafka.consumeFromAnEmbeddedTopic(consumer, topic)

Where kafka is the EmbeddedKafkaBroker instance.

Any advice would be appreciated.

Btw, I am using Spring Boot.

@artembilan
Copy link
Member

@estigma88 ,

This issue is old enough and resolved some way in mutual agreement.
It might be great if you raise your concern as new Stackoverflow question.
Just because I don't see relevance between your request and what we have done in this issue.

Now about the problem.
The GlobalEmbeddedKafkaTestExecutionListener is a JUnit feature. It has nothing (and knows nothing) about Spring.
Therefore when JUnit engine is started, this listener is triggered and all the test (Spring or not) might have access to embedded Kafka broker via regular Kafka Client connection.

I understand the convenience of the consumeFromAnEmbeddedTopic() API, but that is not available for when we use GlobalEmbeddedKafkaTestExecutionListener and cannot be because GlobalEmbeddedKafkaTestExecutionListener is not Spring environment component.
You may consider some other way: https://docs.spring.io/spring-kafka/reference/testing.html#same-broker-multiple-tests.

Or just design your tests if you would deal with real Kafka broker.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants