Skip to content

Commit 20bf523

Browse files
garyrussellartembilan
authored andcommitted
GH-666: Single global embedded Kafka with JUnit 5
Fixes #666 * Introduce a `GlobalEmbeddedKafkaTestExecutionListener` which is registered by the service loader from JUnit Platform * Make its activation conditional based on the `spring.kafka.global.embedded.enabled` system property * Expose some other configuration properties for the target global `EmbeddedKafkaBroker` * Verify its functionality via manual `Launcher.execute()` * Add more `@DirtiesContext` to some tests in the `spring-kafka-test` which don't close their embedded brokers on the exit
1 parent db617ec commit 20bf523

File tree

7 files changed

+286
-2
lines changed

7 files changed

+286
-2
lines changed

Diff for: build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,7 @@ project ('spring-kafka-test') {
373373
api "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
374374
api "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
375375
api 'org.junit.jupiter:junit-jupiter-api'
376+
api 'org.junit.platform:junit-platform-launcher'
376377
optionalApi "org.hamcrest:hamcrest-core:$hamcrestVersion"
377378
optionalApi "org.mockito:mockito-core:$mockitoVersion"
378379
optionalApi ("junit:junit:$junit4Version") {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.test.junit;
18+
19+
import java.io.IOException;
20+
import java.io.UncheckedIOException;
21+
import java.util.Map;
22+
23+
import org.junit.platform.engine.ConfigurationParameters;
24+
import org.junit.platform.launcher.TestExecutionListener;
25+
import org.junit.platform.launcher.TestPlan;
26+
27+
import org.springframework.core.io.DefaultResourceLoader;
28+
import org.springframework.core.io.Resource;
29+
import org.springframework.core.io.support.PropertiesLoaderUtils;
30+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
31+
import org.springframework.util.StringUtils;
32+
33+
/**
34+
* The {@link TestExecutionListener} to start an {@link EmbeddedKafkaBroker}
35+
* in the beginning of the test plan and stop in the end.
36+
* This approach ensures one global Kafka cluster for all the unit tests to execute.
37+
* <p>
38+
* The {@link GlobalEmbeddedKafkaTestExecutionListener} is disabled by default.
39+
* Set {@link GlobalEmbeddedKafkaTestExecutionListener#LISTENER_ENABLED_PROPERTY_NAME}
40+
* system property (or respective {@link ConfigurationParameters#CONFIG_FILE_NAME} entry)
41+
* to enable it.
42+
*
43+
* @author Artem Bilan
44+
*
45+
* @since 3.0
46+
*/
47+
public class GlobalEmbeddedKafkaTestExecutionListener implements TestExecutionListener {
48+
49+
50+
/**
51+
* Property name used to enable the {@code GlobalEmbeddedKafkaTestExecutionListener}.
52+
* The {@code GlobalEmbeddedKafkaTestExecutionListener} is registered automatically via
53+
* Java's {@link java.util.ServiceLoader} mechanism but disabled by default.
54+
* Set the value of this property to {@code true} to enable this listener.
55+
*/
56+
public static final String LISTENER_ENABLED_PROPERTY_NAME = "spring.kafka.global.embedded.enabled";
57+
58+
/**
59+
* The number of brokers for {@link EmbeddedKafkaBroker}.
60+
*/
61+
public static final String COUNT_PROPERTY_NAME = "spring.kafka.embedded.count";
62+
63+
/**
64+
* The topics to create on the embedded broker(s).
65+
*/
66+
public static final String TOPICS_PROPERTY_NAME = "spring.kafka.embedded.topics";
67+
68+
/**
69+
* The topics to create on the embedded broker(s).
70+
*/
71+
public static final String PORTS_PROPERTY_NAME = "spring.kafka.embedded.ports";
72+
73+
/**
74+
* The number of partitions on topics to create on the embedded broker(s).
75+
*/
76+
public static final String PARTITIONS_PROPERTY_NAME = "spring.kafka.embedded.partitions";
77+
78+
/**
79+
* The number of partitions on topics to create on the embedded broker(s).
80+
*/
81+
public static final String BROKER_PROPERTIES_LOCATION_PROPERTY_NAME =
82+
"spring.kafka.embedded.broker.properties.location";
83+
84+
private EmbeddedKafkaBroker embeddedKafkaBroker;
85+
86+
@Override
87+
public void testPlanExecutionStarted(TestPlan testPlan) {
88+
ConfigurationParameters configurationParameters = testPlan.getConfigurationParameters();
89+
boolean enabled = configurationParameters.getBoolean(LISTENER_ENABLED_PROPERTY_NAME).orElse(false);
90+
if (enabled) {
91+
Integer count = configurationParameters.get(COUNT_PROPERTY_NAME, Integer::parseInt).orElse(1);
92+
String[] topics =
93+
configurationParameters.get(TOPICS_PROPERTY_NAME, StringUtils::commaDelimitedListToStringArray)
94+
.orElse(null);
95+
Integer partitions = configurationParameters.get(PARTITIONS_PROPERTY_NAME, Integer::parseInt).orElse(2);
96+
Map<String, String> brokerProperties =
97+
configurationParameters.get(BROKER_PROPERTIES_LOCATION_PROPERTY_NAME, this::brokerProperties)
98+
.orElse(null);
99+
String brokerListProperty = configurationParameters.get(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY)
100+
.orElse(null);
101+
int[] ports =
102+
configurationParameters.get(PORTS_PROPERTY_NAME, this::ports)
103+
.orElse(new int[count]);
104+
105+
this.embeddedKafkaBroker =
106+
new EmbeddedKafkaBroker(count, false, partitions, topics)
107+
.brokerProperties(brokerProperties)
108+
.brokerListProperty(brokerListProperty)
109+
.kafkaPorts(ports);
110+
111+
this.embeddedKafkaBroker.afterPropertiesSet();
112+
}
113+
}
114+
115+
@SuppressWarnings({ "rawtypes", "unchecked" })
116+
private Map<String, String> brokerProperties(String propertiesLocation) {
117+
Resource propertiesResource = new DefaultResourceLoader().getResource(propertiesLocation);
118+
try {
119+
return (Map) PropertiesLoaderUtils.loadProperties(propertiesResource);
120+
}
121+
catch (IOException ex) {
122+
throw new UncheckedIOException(ex);
123+
}
124+
}
125+
126+
private int[] ports(String ports) {
127+
return StringUtils.commaDelimitedListToSet(ports)
128+
.stream()
129+
.mapToInt(Integer::parseInt)
130+
.toArray();
131+
}
132+
133+
@Override
134+
public void testPlanExecutionFinished(TestPlan testPlan) {
135+
if (this.embeddedKafkaBroker != null) {
136+
this.embeddedKafkaBroker.destroy();
137+
}
138+
}
139+
140+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.springframework.kafka.test.junit.GlobalEmbeddedKafkaTestExecutionListener

Diff for: spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/WithSpringTestContextTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import org.springframework.context.annotation.Configuration;
2525
import org.springframework.kafka.test.EmbeddedKafkaBroker;
2626
import org.springframework.kafka.test.context.EmbeddedKafka;
27+
import org.springframework.test.annotation.DirtiesContext;
2728
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
2829

2930
/**
@@ -33,6 +34,7 @@
3334
*/
3435
@SpringJUnitConfig
3536
@EmbeddedKafka
37+
@DirtiesContext
3638
public class WithSpringTestContextTests {
3739

3840
@Test
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.test.junit;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.io.BufferedOutputStream;
22+
import java.io.File;
23+
import java.io.FileOutputStream;
24+
import java.io.IOException;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Properties;
28+
import java.util.Set;
29+
import java.util.concurrent.ExecutionException;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.TimeoutException;
32+
33+
import org.apache.kafka.clients.admin.AdminClient;
34+
import org.apache.kafka.clients.admin.AdminClientConfig;
35+
import org.apache.kafka.clients.producer.KafkaProducer;
36+
import org.apache.kafka.clients.producer.ProducerConfig;
37+
import org.apache.kafka.clients.producer.ProducerRecord;
38+
import org.apache.kafka.common.serialization.StringSerializer;
39+
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
41+
import org.junit.platform.engine.discovery.DiscoverySelectors;
42+
import org.junit.platform.launcher.core.LauncherDiscoveryRequestBuilder;
43+
import org.junit.platform.launcher.core.LauncherFactory;
44+
import org.junit.platform.launcher.listeners.SummaryGeneratingListener;
45+
46+
import org.springframework.util.DefaultPropertiesPersister;
47+
48+
/**
49+
* @author Artem Bilan
50+
*
51+
* @since 3.0
52+
*/
53+
public class GlobalEmbeddedKafkaTestExecutionListenerTests {
54+
55+
@Test
56+
void testGlobalEmbeddedKafkaTestExecutionListener() throws IOException {
57+
System.setProperty(GlobalEmbeddedKafkaTestExecutionListener.LISTENER_ENABLED_PROPERTY_NAME, "true");
58+
59+
var brokerProperties = new Properties();
60+
brokerProperties.setProperty("auto.create.topics.enable", "false");
61+
62+
var propertiesFile = File.createTempFile("kafka-broker", ".properties");
63+
64+
try (var outputStream = new BufferedOutputStream(new FileOutputStream(propertiesFile))) {
65+
new DefaultPropertiesPersister().store(brokerProperties, outputStream, "Last entry");
66+
}
67+
68+
System.setProperty(GlobalEmbeddedKafkaTestExecutionListener.BROKER_PROPERTIES_LOCATION_PROPERTY_NAME,
69+
"file:/" + propertiesFile.getAbsolutePath());
70+
71+
var discoveryRequest =
72+
LauncherDiscoveryRequestBuilder.request()
73+
.selectors(DiscoverySelectors.selectClass(TestClass1.class),
74+
DiscoverySelectors.selectClass(TestClass2.class))
75+
.build();
76+
77+
var summaryGeneratingListener = new SummaryGeneratingListener();
78+
LauncherFactory.create().execute(discoveryRequest, summaryGeneratingListener);
79+
80+
var summary = summaryGeneratingListener.getSummary();
81+
assertThat(summary.getTestsStartedCount()).isEqualTo(2);
82+
assertThat(summary.getTestsSucceededCount()).isEqualTo(1);
83+
assertThat(summary.getTestsFailedCount()).isEqualTo(1);
84+
85+
System.clearProperty(GlobalEmbeddedKafkaTestExecutionListener.LISTENER_ENABLED_PROPERTY_NAME);
86+
System.clearProperty(GlobalEmbeddedKafkaTestExecutionListener.BROKER_PROPERTIES_LOCATION_PROPERTY_NAME);
87+
}
88+
89+
@EnabledIfSystemProperty(named = GlobalEmbeddedKafkaTestExecutionListener.LISTENER_ENABLED_PROPERTY_NAME,
90+
matches = "true")
91+
static class TestClass1 {
92+
93+
@Test
94+
void testDescribeTopic() throws ExecutionException, InterruptedException, TimeoutException {
95+
Map<String, Object> adminConfigs =
96+
Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
97+
System.getProperty("spring.kafka.bootstrap-servers"));
98+
try (var admin = AdminClient.create(adminConfigs)) {
99+
var topicsMap =
100+
admin.describeTopics(Set.of("topic1", "topic2"))
101+
.allTopicNames()
102+
.get(10, TimeUnit.SECONDS);
103+
104+
assertThat(topicsMap).containsOnlyKeys("topic1", "topic2");
105+
}
106+
}
107+
108+
}
109+
110+
@EnabledIfSystemProperty(named = GlobalEmbeddedKafkaTestExecutionListener.LISTENER_ENABLED_PROPERTY_NAME,
111+
matches = "true")
112+
static class TestClass2 {
113+
114+
@Test
115+
void testCannotAutoCreateTopic() throws ExecutionException, InterruptedException, TimeoutException {
116+
Map<String, Object> producerConfigs = new HashMap<>();
117+
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
118+
System.getProperty("spring.kafka.bootstrap-servers"));
119+
producerConfigs.put(ProducerConfig.RETRIES_CONFIG, 1);
120+
producerConfigs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1);
121+
producerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10);
122+
123+
StringSerializer serializer = new StringSerializer();
124+
try (var kafkaProducer = new KafkaProducer<>(producerConfigs, serializer, serializer)) {
125+
var recordMetadata =
126+
kafkaProducer.send(new ProducerRecord<>("nonExistingTopic", "testValue"))
127+
.get(10, TimeUnit.SECONDS);
128+
129+
assertThat(recordMetadata).isNotNull();
130+
}
131+
}
132+
133+
}
134+
135+
}

Diff for: spring-kafka-test/src/test/java/org/springframework/kafka/test/rule/AddressableEmbeddedBrokerTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
3636
import org.springframework.context.annotation.Configuration;
3737
import org.springframework.kafka.test.EmbeddedKafkaBroker;
3838
import org.springframework.kafka.test.utils.KafkaTestUtils;
39+
import org.springframework.test.annotation.DirtiesContext;
3940
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4041

4142
/**
@@ -48,6 +49,7 @@
4849
*
4950
*/
5051
@SpringJUnitConfig
52+
@DirtiesContext
5153
public class AddressableEmbeddedBrokerTests {
5254

5355
private static final String TEST_EMBEDDED = "testEmbedded";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
spring.kafka.embedded.count=2
2+
spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers
3+
spring.kafka.embedded.topics=topic1,topic2

0 commit comments

Comments
 (0)