Skip to content

Commit ab17e3d

Browse files
committed
GH-3658: Fix @EmbeddedKafka for adminTimeout resolution (#3664)
Fixes: #3658 Issue link: #3658 When `@EmbeddedKafka` is used with Spring context, the `adminTimeout` is not resolved. Apparently when `adminTimeout` was introduced, it was covered only by the `EmbeddedKafkaCondition`. * Extract `EmbeddedKafkaBrokerFactory` to encapsulate an `EmbeddedKafkaBroker` creation logic (including the mentioned `adminTimeout`) * Replace the logic in the `EmbeddedKafkaCondition` and `EmbeddedKafkaContextCustomizer` with that new `EmbeddedKafkaBrokerFactory`, essentially, introducing a single place of truth. * Pull `adminTimeout(int)` property to the `EmbeddedKafkaBroker` interface, making the logic in the `EmbeddedKafkaBrokerFactory` simpler * Add `adminTimeout` attribute verification into tests for condition, as well as Spring-based # Conflicts: # spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java # spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java
1 parent 88026ed commit ab17e3d

File tree

8 files changed

+179
-182
lines changed

8 files changed

+179
-182
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

+8
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ default void afterPropertiesSet() {
8989
*/
9090
EmbeddedKafkaBroker brokerListProperty(String brokerListProperty);
9191

92+
/**
93+
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
94+
* @param adminTimeout the timeout.
95+
* @return the {@link EmbeddedKafkaBroker}
96+
* @since 2.8.5
97+
*/
98+
EmbeddedKafkaBroker adminTimeout(int adminTimeout);
99+
92100
/**
93101
* Get the bootstrap server addresses as a String.
94102
* @return the bootstrap servers.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.test;
18+
19+
import java.io.IOException;
20+
import java.io.InputStream;
21+
import java.io.StringReader;
22+
import java.util.Arrays;
23+
import java.util.Map;
24+
import java.util.Properties;
25+
import java.util.function.Function;
26+
27+
import org.springframework.core.io.Resource;
28+
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
29+
import org.springframework.kafka.test.context.EmbeddedKafka;
30+
import org.springframework.util.StringUtils;
31+
32+
/**
33+
* The factory to encapsulate an {@link EmbeddedKafkaBroker} creation logic.
34+
*
35+
* @author Artem Bilan
36+
*
37+
* @since 3.2.6
38+
*/
39+
public final class EmbeddedKafkaBrokerFactory {
40+
41+
private static final String TRANSACTION_STATE_LOG_REPLICATION_FACTOR = "transaction.state.log.replication.factor";
42+
43+
/**
44+
* Create an {@link EmbeddedKafkaBroker} based on the {@code EmbeddedKafka} annotation.
45+
* @param embeddedKafka the {@code EmbeddedKafka} annotation.
46+
* @return a new {@link EmbeddedKafkaBroker} instance.
47+
*/
48+
public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka) {
49+
return create(embeddedKafka, Function.identity());
50+
}
51+
52+
/**
53+
* Create an {@link EmbeddedKafkaBroker} based on the {@code EmbeddedKafka} annotation.
54+
* @param embeddedKafka the {@code EmbeddedKafka} annotation.
55+
* @param propertyResolver the {@link Function} for placeholders in the annotation attributes.
56+
* @return a new {@link EmbeddedKafkaBroker} instance.
57+
*/
58+
@SuppressWarnings("unchecked")
59+
public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka, Function<String, String> propertyResolver) {
60+
String[] topics =
61+
Arrays.stream(embeddedKafka.topics())
62+
.map(propertyResolver)
63+
.toArray(String[]::new);
64+
65+
EmbeddedKafkaBroker embeddedKafkaBroker;
66+
if (embeddedKafka.kraft()) {
67+
embeddedKafkaBroker = kraftBroker(embeddedKafka, topics);
68+
}
69+
else {
70+
embeddedKafkaBroker = zkBroker(embeddedKafka, topics);
71+
}
72+
int[] ports = setupPorts(embeddedKafka);
73+
74+
embeddedKafkaBroker.kafkaPorts(ports)
75+
.adminTimeout(embeddedKafka.adminTimeout());
76+
77+
Properties properties = new Properties();
78+
79+
for (String pair : embeddedKafka.brokerProperties()) {
80+
if (!StringUtils.hasText(pair)) {
81+
continue;
82+
}
83+
try {
84+
properties.load(new StringReader(propertyResolver.apply(pair)));
85+
}
86+
catch (Exception ex) {
87+
throw new IllegalStateException("Failed to load broker property from [" + pair + "]", ex);
88+
}
89+
}
90+
91+
String brokerPropertiesLocation = embeddedKafka.brokerPropertiesLocation();
92+
if (StringUtils.hasText(brokerPropertiesLocation)) {
93+
String propertiesLocation = propertyResolver.apply(brokerPropertiesLocation);
94+
Resource propertiesResource = new PathMatchingResourcePatternResolver().getResource(propertiesLocation);
95+
if (!propertiesResource.exists()) {
96+
throw new IllegalStateException(
97+
"Failed to load broker properties from [" + propertiesResource + "]: resource does not exist.");
98+
}
99+
try (InputStream in = propertiesResource.getInputStream()) {
100+
Properties p = new Properties();
101+
p.load(in);
102+
p.forEach((key, value) -> properties.putIfAbsent(key, propertyResolver.apply((String) value)));
103+
}
104+
catch (IOException ex) {
105+
throw new IllegalStateException("Failed to load broker properties from [" + propertiesResource + "]", ex);
106+
}
107+
}
108+
109+
properties.putIfAbsent(TRANSACTION_STATE_LOG_REPLICATION_FACTOR,
110+
String.valueOf(Math.min(3, embeddedKafka.count())));
111+
112+
embeddedKafkaBroker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
113+
String bootstrapServersProperty = embeddedKafka.bootstrapServersProperty();
114+
if (StringUtils.hasText(bootstrapServersProperty)) {
115+
embeddedKafkaBroker.brokerListProperty(bootstrapServersProperty);
116+
}
117+
118+
// Safe to start an embedded broker eagerly before context refresh
119+
embeddedKafkaBroker.afterPropertiesSet();
120+
121+
return embeddedKafkaBroker;
122+
}
123+
124+
private static int[] setupPorts(EmbeddedKafka embedded) {
125+
int[] ports = embedded.ports();
126+
if (embedded.count() > 1 && ports.length == 1 && ports[0] == 0) {
127+
ports = new int[embedded.count()];
128+
}
129+
return ports;
130+
}
131+
132+
private static EmbeddedKafkaBroker kraftBroker(EmbeddedKafka embedded, String[] topics) {
133+
return new EmbeddedKafkaKraftBroker(embedded.count(), embedded.partitions(), topics);
134+
}
135+
136+
private static EmbeddedKafkaBroker zkBroker(EmbeddedKafka embedded, String[] topics) {
137+
return new EmbeddedKafkaZKBroker(embedded.count(), embedded.controlledShutdown(), embedded.partitions(), topics)
138+
.zkPort(embedded.zookeeperPort())
139+
.zkConnectionTimeout(embedded.zkConnectionTimeout())
140+
.zkSessionTimeout(embedded.zkSessionTimeout());
141+
}
142+
143+
private EmbeddedKafkaBrokerFactory() {
144+
}
145+
146+
}

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,7 @@ public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) {
173173
return this;
174174
}
175175

176-
/**
177-
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
178-
* @param adminTimeout the timeout.
179-
* @return the {@link EmbeddedKafkaKraftBroker}
180-
* @since 2.8.5
181-
*/
176+
@Override
182177
public EmbeddedKafkaBroker adminTimeout(int adminTimeout) {
183178
this.adminTimeout = Duration.ofSeconds(adminTimeout);
184179
return this;

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -246,12 +246,7 @@ public void setZkPort(int zkPort) {
246246
this.zkPort = zkPort;
247247
}
248248

249-
/**
250-
* Set the timeout in seconds for admin operations (e.g. topic creation, close).
251-
* @param adminTimeout the timeout.
252-
* @return the {@link EmbeddedKafkaBroker}
253-
* @since 2.8.5
254-
*/
249+
@Override
255250
public EmbeddedKafkaBroker adminTimeout(int adminTimeout) {
256251
this.adminTimeout = Duration.ofSeconds(adminTimeout);
257252
return this;

spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java

+10-86
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,9 @@
1616

1717
package org.springframework.kafka.test.condition;
1818

19-
import java.io.IOException;
20-
import java.io.InputStream;
21-
import java.io.StringReader;
2219
import java.lang.reflect.AnnotatedElement;
2320
import java.util.Arrays;
24-
import java.util.Map;
2521
import java.util.Optional;
26-
import java.util.Properties;
2722

2823
import org.junit.jupiter.api.extension.AfterAllCallback;
2924
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
@@ -37,15 +32,11 @@
3732
import org.junit.jupiter.api.extension.ParameterResolver;
3833

3934
import org.springframework.core.annotation.AnnotatedElementUtils;
40-
import org.springframework.core.io.Resource;
41-
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
4235
import org.springframework.kafka.test.EmbeddedKafkaBroker;
43-
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
44-
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;
36+
import org.springframework.kafka.test.EmbeddedKafkaBrokerFactory;
4537
import org.springframework.kafka.test.context.EmbeddedKafka;
4638
import org.springframework.test.context.junit.jupiter.SpringExtension;
4739
import org.springframework.util.Assert;
48-
import org.springframework.util.StringUtils;
4940

5041
/**
5142
* JUnit5 condition for an embedded broker.
@@ -117,89 +108,22 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con
117108
private boolean springTestContext(AnnotatedElement annotatedElement) {
118109
return AnnotatedElementUtils.findAllMergedAnnotations(annotatedElement, ExtendWith.class)
119110
.stream()
120-
.filter(extended -> Arrays.asList(extended.value()).contains(SpringExtension.class))
121-
.findFirst()
122-
.isPresent();
111+
.map(ExtendWith::value)
112+
.flatMap(Arrays::stream)
113+
.anyMatch(SpringExtension.class::isAssignableFrom);
123114
}
124115

125-
@SuppressWarnings("unchecked")
126116
private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
127-
int[] ports = setupPorts(embedded);
128-
EmbeddedKafkaBroker broker;
129-
if (embedded.kraft()) {
130-
broker = kraftBroker(embedded, ports);
131-
}
132-
else {
133-
broker = zkBroker(embedded, ports);
134-
}
135-
Properties properties = new Properties();
136-
137-
for (String pair : embedded.brokerProperties()) {
138-
if (!StringUtils.hasText(pair)) {
139-
continue;
140-
}
141-
try {
142-
properties.load(new StringReader(pair));
143-
}
144-
catch (Exception ex) {
145-
throw new IllegalStateException("Failed to load broker property from [" + pair + "]",
146-
ex);
147-
}
148-
}
149-
if (StringUtils.hasText(embedded.brokerPropertiesLocation())) {
150-
Resource propertiesResource = new PathMatchingResourcePatternResolver()
151-
.getResource(embedded.brokerPropertiesLocation());
152-
if (!propertiesResource.exists()) {
153-
throw new IllegalStateException(
154-
"Failed to load broker properties from [" + propertiesResource
155-
+ "]: resource does not exist.");
156-
}
157-
try (InputStream in = propertiesResource.getInputStream()) {
158-
Properties p = new Properties();
159-
p.load(in);
160-
p.forEach(properties::putIfAbsent);
161-
}
162-
catch (IOException ex) {
163-
throw new IllegalStateException(
164-
"Failed to load broker properties from [" + propertiesResource + "]", ex);
165-
}
166-
}
167-
broker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
168-
if (StringUtils.hasText(embedded.bootstrapServersProperty())) {
169-
broker.brokerListProperty(embedded.bootstrapServersProperty());
170-
}
171-
broker.afterPropertiesSet();
172-
return broker;
173-
}
174-
175-
private EmbeddedKafkaBroker kraftBroker(EmbeddedKafka embedded, int[] ports) {
176-
return new EmbeddedKafkaKraftBroker(embedded.count(), embedded.partitions(), embedded.topics())
177-
.kafkaPorts(ports)
178-
.adminTimeout(embedded.adminTimeout());
179-
}
180-
181-
private EmbeddedKafkaBroker zkBroker(EmbeddedKafka embedded, int[] ports) {
182-
return new EmbeddedKafkaZKBroker(embedded.count(), embedded.controlledShutdown(),
183-
embedded.partitions(), embedded.topics())
184-
.zkPort(embedded.zookeeperPort())
185-
.kafkaPorts(ports)
186-
.zkConnectionTimeout(embedded.zkConnectionTimeout())
187-
.zkSessionTimeout(embedded.zkSessionTimeout())
188-
.adminTimeout(embedded.adminTimeout());
189-
}
190-
191-
private int[] setupPorts(EmbeddedKafka embedded) {
192-
int[] ports = embedded.ports();
193-
if (embedded.count() > 1 && ports.length == 1 && ports[0] == 0) {
194-
ports = new int[embedded.count()];
195-
}
196-
return ports;
117+
return EmbeddedKafkaBrokerFactory.create(embedded);
197118
}
198119

199120
private EmbeddedKafkaBroker getBrokerFromStore(ExtensionContext context) {
200-
return getParentStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class) == null
121+
EmbeddedKafkaBroker embeddedKafkaBrokerFromParentStore =
122+
getParentStore(context)
123+
.get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class);
124+
return embeddedKafkaBrokerFromParentStore == null
201125
? getStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class)
202-
: getParentStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class);
126+
: embeddedKafkaBrokerFromParentStore;
203127
}
204128

205129
private Store getStore(ExtensionContext context) {

0 commit comments

Comments
 (0)