Skip to content

GH-666: Single global embedded Kafka with JUnit 5 #2308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 22, 2022

Conversation

artembilan
Copy link
Member

Fixes #666

  • Introduce a GlobalEmbeddedKafkaTestExecutionListener which is registered
    by the service loader from JUnit Platform
  • Make its activation conditional based on the spring.kafka.global.embedded.enabled
    system property
  • Expose some other configuration properties for the target global EmbeddedKafkaBroker
  • Verify its functionality via manual Launcher.execute()
  • Add more @DirtiesContext to some tests in the spring-kafka-test which don't close
    their embedded brokers on the exit

@artembilan
Copy link
Member Author

If this is OK, I'll go ahead with docs and perhaps some sample how to enable and configure this via Gradle which might be really a top interest from end-users.

(Perhaps the number of issue is so bad that it took for me a couple days to figure out how to test this 😢 😄 )

@garyrussell
Copy link
Contributor

Looks like your IDE is messing with Git again: 20bf523

Copy link
Contributor

@garyrussell garyrussell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I can review this adequately because I am not familiar with the concepts. Perhaps we can ask @sbrannen to take a look?

@artembilan
Copy link
Member Author

I am not familiar with the concepts

No problem, Gary!
I'm in a chat with Sam, so we can see together from JUnit perspective.

From your side I just need 👍 about a general idea to have such a global embedded Kafka for the whole test plan as it is requested by users and as they demonstrate it via their custom Gradle plugins.

Re. Git commit: I think I had several original commits before pushing and probably when I wanted to push to PR as a single commit I accidentally included one of yours then rebased.
That's how your name could slip into the final commit message.
So, the probem not with an IDE, but someone's blind eyes between screen and keyboard. 😄

@garyrussell
Copy link
Contributor

From your side I just need...

Yes; I think it's a good idea.

Copy link
Member

@sbrannen sbrannen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the GlobalEmbeddedKafkaTestExecutionListener implementation looks pretty good.

I only noticed a few very minor issues that I commented on.

@@ -0,0 +1,3 @@
spring.kafka.embedded.count=2
spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this instead be spring.kafka.embedded.broker.properties.location?

Or is that a property not used by your TestExecutionListener?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used by the broker itself (it tells the broker to update that system property - useful in Boot tests) - @artembilan I think we should add docs about this, especially for Boot users.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! Sorry, you just answered to Sam's question :shame:

But yeah, I have added more in docs anyway 😄

@artembilan
Copy link
Member Author

Thank you, @sbrannen , for consultation and review!
I have addressed all your concerns, unless I decided to not play with test suites since I still want it to pass properly in the IDE as well without extra configuration on the end-user side.
Perhaps eventually, when we come up without more similar sophisticated unit testing, we will come back to a suite approach and respective Developer Guide.

From here I guess we don't need to bother you any more and @garyrussell simple can continue to review the rest of the changes.

Copy link
Contributor

@garyrussell garyrussell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sample-05 tests fail for me

[ERROR] Errors: 
[ERROR]   Sample05Application1Tests.testKafkaListener:49 » ConditionTimeout Condition with com.example.Sample05Application1Tests was not fulfilled within 10 seconds.
[ERROR]   Sample05Application2Tests.testKafkaTemplateSend:53 » Kafka Send failed
[INFO] 
[ERROR] Tests run: 2, Failures: 0, Errors: 2, Skipped: 0

More info for the second one (because it waits for the future)...

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic nonExistingTopic not present in metadata after 1000 ms.

@@ -0,0 +1,3 @@
spring.kafka.embedded.count=2
spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used by the broker itself (it tells the broker to update that system property - useful in Boot tests) - @artembilan I think we should add docs about this, especially for Boot users.

@artembilan
Copy link
Member Author

The sample-05 tests fail for me

Sorry, I forgot to mention that you need to ./gradlew publishToMavenLocal the core project first: the sample is based on the content of this branch.

@garyrussell
Copy link
Contributor

I already did that - it won't compile otherwise.

@artembilan
Copy link
Member Author

This is used by the broker itself (it tells the broker to update that system property - useful in Boot tests)

Sorry, it is not clear what is your concern over here. This indeed was my intention to demonstrate how that property is populated by the embedded property and can be used by Spring Boot auto-configuration.

I think we should add docs about this, especially for Boot users.

We do have some mentioning about that setting in our docs: https://docs.spring.io/spring-kafka/docs/current/reference/html/#testing.
And we also have something in Spring Boot by itself: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.embedded
Nevertheless I will look shortly what else we can add into docs and explain.

Thanks

garyrussell and others added 6 commits June 22, 2022 15:54
Fixes spring-projects#666

* Introduce a `GlobalEmbeddedKafkaTestExecutionListener` which is registered
by the service loader from JUnit Platform
* Make its activation conditional based on the `spring.kafka.global.embedded.enabled`
system property
* Expose some other configuration properties for the target global `EmbeddedKafkaBroker`
* Verify its functionality via manual `Launcher.execute()`
* Add more `@DirtiesContext` to some tests in the `spring-kafka-test` which don't close
their embedded brokers on the exit
this is part of JUnit platform the test plan is going to be run
* Add missed new line in the end of `junit-platform.properties`
* Rethrow an exception from the local launcher in the `GlobalEmbeddedKafkaTestExecutionListenerTests`
to fail if its suite test classes have failed.
* Move system properties clean up in the `GlobalEmbeddedKafkaTestExecutionListenerTests` into `@AfterAll`
* Add docs for this new feature
* Add `sample-05` to demonstrate global embedded Kafka in action via Maven configuration
@garyrussell
Copy link
Contributor

Fixed the first one with spring.kafka.consumer.auto-offset-reset=earliest - the container started after the send completed.

@garyrussell
Copy link
Contributor

Of course, the second one fails -

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic nonExistingTopic not present in metadata after 1000 ms.

The future will be completed with the timeout exception.

@garyrussell
Copy link
Contributor

// May be an immediate failure
if (sendFuture.isDone()) {
try {
sendFuture.get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new KafkaException("Interrupted", e);
}
catch (ExecutionException e) {
throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace
}
}

I'm getting that Send failed exception with ...

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic nonExistingTopic not present in metadata after 1000 ms.

@artembilan
Copy link
Member Author

I'm getting that Send failed exception with ...

That's correct. See JavaDocs of that Sample05Application2Tests:

/**
 * This test is going to fail from IDE since there is no exposed {@code spring.embedded.kafka.brokers} system property.
 * This test is deliberately failing to demonstrate that global embedded Kafka broker config for
 * {@code auto.create.topics.enable=false} is in an effect.
 * See {@code /resources/kafka-broker.properties} and Maven Surefire plugin configuration.
 */

@garyrussell
Copy link
Contributor

garyrussell commented Jun 22, 2022

Right, but it fails under maven as well; we should be asserting the root cause as being the timeout exception due to metadata not present.

This is the full stack trace:

[ERROR] com.example.Sample05Application2Tests.testKafkaTemplateSend  Time elapsed: 1.015 s  <<< ERROR!
org.springframework.kafka.KafkaException: Send failed
	at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:656)
	at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:415)
	at com.example.Sample05Application2Tests.testKafkaTemplateSend(Sample05Application2Tests.java:53)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.apache.maven.surefire.junitplatform.LazyLauncher.execute(LazyLauncher.java:55)
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:223)
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:175)
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:139)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:456)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:169)
	at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:595)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:581)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic nonExistingTopic not present in metadata after 1000 ms.

@artembilan
Copy link
Member Author

but it fails under maven as well;

That's correct as well.
I say there in the JavaDocs This test is deliberately failing to demonstrate that global embedded Kafka broker config for {@code auto.create.topics.enable=false} is in an effect..

The sample is about showing a global embedded Kafka, not how to right tests.
The failing test and an output that global embedded Kafka is stopped in the end of plan is a good sign for us to be sure that we are on a right track.
The failing report is still a report.

I also say that explicitly in the README.adoc:

One of the tests is deliberately failing to demonstrate that global embedded Kafka broker config for auto.create.topics.enable=false is in an effect.

I'm not sure why you still insist that it has to pass...

* Add new lines into new files
* Add `spring.kafka.consumer.auto-offset-reset=earliest` to make a consumer to start from the beginning of the partition:
fixes a race condition when publishing happens before consumer is started
* Make `mvnw` as an executable
* Add `spring.embedded.kafka.brokers.property` explanation into docs
Comment on lines 50 to 56
@Test
void testKafkaTemplateSend() throws ExecutionException, InterruptedException, TimeoutException {
SendResult<String, String> sendResult =
this.kafkaTemplate.send("nonExistingTopic", "fake data").get(10, TimeUnit.SECONDS);

assertThat(sendResult).isNotNull();
}
Copy link
Contributor

@garyrussell garyrussell Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it's ugly seeing all those errors on the command line - I think this is better:

Suggested change
@Test
void testKafkaTemplateSend() throws ExecutionException, InterruptedException, TimeoutException {
SendResult<String, String> sendResult =
this.kafkaTemplate.send("nonExistingTopic", "fake data").get(10, TimeUnit.SECONDS);
assertThat(sendResult).isNotNull();
}
@Test
void testKafkaTemplateSend() {
Throwable thrown = catchThrowable(() ->
this.kafkaTemplate.send("nonExistingTopic", "fake data").get(10, TimeUnit.SECONDS));
assertThatExceptionOfType(TimeoutException.class).isThrownBy(() -> {
throw thrown.getCause();
})
.withMessageContaining("Topic nonExistingTopic")
.withMessageContaining("metadata");
}

Copy link
Member Author

@artembilan artembilan Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😸 I still think my idea to demonstrate a robustness of our solution even if some tests are failing in between is the way to go for this sample.
On the other hand we just can remove this sample altogether: for me clean finish for the plan is OK even if the report is about failure.
The successful tests was really not a goal for me in this sample, but rather how global embedded Kafka works.
See also GlobalEmbeddedKafkaTestExecutionListenerTests: one if its tests is also deliberately failing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just don't like to see the big stack trace, and having multiple tests clearly demonstrates that there is only one broker; reducing to 1 test, we'll lose that.

Co-authored-by: Gary Russell <[email protected]>
* Remove `deliberately failing` sentence from the JavaDoc an README
* Clean up unused imports
@garyrussell garyrussell merged commit 0f69553 into spring-projects:main Jun 22, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Single shared KafkaEmbedded is all JUnit tests
3 participants