|
59 | 59 | import org.springframework.retry.support.RetryTemplate;
|
60 | 60 | import org.springframework.util.Assert;
|
61 | 61 |
|
| 62 | +import kafka.common.KafkaException; |
62 | 63 | import kafka.server.KafkaConfig;
|
63 | 64 | import kafka.server.KafkaServer;
|
64 | 65 | import kafka.server.NotRunning;
|
@@ -234,19 +235,45 @@ public void before() throws Exception { //NOSONAR
|
234 | 235 | this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT);
|
235 | 236 | }
|
236 | 237 | }
|
237 |
| - Map<String, Object> adminConfigs = new HashMap<>(); |
238 |
| - adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString()); |
239 |
| - AdminClient admin = AdminClient.create(adminConfigs); |
240 |
| - List<NewTopic> newTopics = Arrays.stream(this.topics) |
241 |
| - .map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count)) |
242 |
| - .collect(Collectors.toList()); |
243 |
| - CreateTopicsResult createTopics = admin.createTopics(newTopics); |
244 |
| - createTopics.all().get(); |
245 |
| - admin.close(); |
| 238 | + addTopics(this.topics); |
246 | 239 | System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
|
247 | 240 | System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
|
248 | 241 | }
|
249 | 242 |
|
| 243 | + /** |
| 244 | + * Add topics to the existing broker(s) using the configured number of partitions. |
| 245 | + * @param topics the topics. |
| 246 | + * @since 2.1 |
| 247 | + */ |
| 248 | + public void addTopics(String... topics) { |
| 249 | + doWithAdmin(admin -> { |
| 250 | + List<NewTopic> newTopics = Arrays.stream(topics) |
| 251 | + .map(t -> new NewTopic(t, this.partitionsPerTopic, (short) this.count)) |
| 252 | + .collect(Collectors.toList()); |
| 253 | + CreateTopicsResult createTopics = admin.createTopics(newTopics); |
| 254 | + try { |
| 255 | + createTopics.all().get(); |
| 256 | + } |
| 257 | + catch (Exception e) { |
| 258 | + throw new KafkaException(e); |
| 259 | + } |
| 260 | + }); |
| 261 | + } |
| 262 | + |
| 263 | + /** |
| 264 | + * Create an {@link AdminClient} invoke the callback and reliable close the |
| 265 | + * admin. |
| 266 | + * @param callback the callback. |
| 267 | + * @since 2.1 |
| 268 | + */ |
| 269 | + public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) { |
| 270 | + Map<String, Object> adminConfigs = new HashMap<>(); |
| 271 | + adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokersAsString()); |
| 272 | + try (AdminClient admin = AdminClient.create(adminConfigs)) { |
| 273 | + callback.accept(admin); |
| 274 | + } |
| 275 | + } |
| 276 | + |
250 | 277 | public Properties createBrokerProperties(int i) {
|
251 | 278 | if (testUtilsCreateBrokerConfigMethod == null) {
|
252 | 279 | return TestUtils.createBrokerConfig(i, this.zkConnect, this.controlledShutdown,
|
|
0 commit comments