Skip to content

Commit 419b569

Browse files
authored
Add JSpecify based Nullability checks in spring-kafka-test module
Signed-off-by: Soby Chacko <[email protected]>
1 parent 66b79d4 commit 419b569

18 files changed

+94
-39
lines changed

Diff for: spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 the original author or authors.
2+
* Copyright 2023-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

Diff for: spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2024 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import java.util.HashSet;
2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.Objects;
3132
import java.util.Properties;
3233
import java.util.Set;
3334
import java.util.UUID;
@@ -54,6 +55,7 @@
5455
import org.apache.kafka.common.TopicPartition;
5556
import org.apache.kafka.common.utils.Exit;
5657
import org.apache.kafka.common.utils.Utils;
58+
import org.jspecify.annotations.Nullable;
5759

5860
import org.springframework.core.log.LogAccessor;
5961
import org.springframework.util.Assert;
@@ -93,7 +95,7 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {
9395
private static final boolean IS_KAFKA_39_OR_LATER = ClassUtils.isPresent(
9496
"org.apache.kafka.server.config.AbstractKafkaConfig", EmbeddedKafkaKraftBroker.class.getClassLoader());
9597

96-
private static final Method SET_CONFIG_METHOD;
98+
private static final @Nullable Method SET_CONFIG_METHOD;
9799

98100
static {
99101
if (IS_KAFKA_39_OR_LATER) {
@@ -117,7 +119,7 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {
117119

118120
private final AtomicBoolean initialized = new AtomicBoolean();
119121

120-
private KafkaClusterTestKit cluster;
122+
private @Nullable KafkaClusterTestKit cluster;
121123

122124
private int[] kafkaPorts;
123125

@@ -131,7 +133,7 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {
131133
* @param partitions partitions per topic.
132134
* @param topics the topics to create.
133135
*/
134-
public EmbeddedKafkaKraftBroker(int count, int partitions, String... topics) {
136+
public EmbeddedKafkaKraftBroker(int count, int partitions, String @Nullable ... topics) {
135137
this.count = count;
136138
this.kafkaPorts = new int[this.count]; // random ports by default.
137139
if (topics != null) {
@@ -261,7 +263,9 @@ private void start() {
261263
private static void setConfigProperty(KafkaClusterTestKit.Builder clusterBuilder, String key, Object value) {
262264
if (IS_KAFKA_39_OR_LATER) {
263265
// For Kafka 3.9.0+: use reflection
264-
ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, key, value);
266+
if (SET_CONFIG_METHOD != null) {
267+
ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, key, value);
268+
}
265269
}
266270
else {
267271
// For Kafka 3.8.0: direct call
@@ -484,10 +488,12 @@ public int getPartitionsPerTopic() {
484488

485489
@Override
486490
public String getBrokersAsString() {
487-
return (String) this.cluster.clientProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
491+
Assert.notNull(this.cluster, "cluster cannot be null");
492+
String brokersString = (String) this.cluster.clientProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
493+
return Objects.requireNonNull(brokersString);
488494
}
489495

490-
public KafkaClusterTestKit getCluster() {
496+
public @Nullable KafkaClusterTestKit getCluster() {
491497
return this.cluster;
492498
}
493499

Diff for: spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java

+13-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2024 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,6 +66,7 @@
6666
import org.apache.zookeeper.client.ZKClientConfig;
6767
import org.apache.zookeeper.server.NIOServerCnxnFactory;
6868
import org.apache.zookeeper.server.ZooKeeperServer;
69+
import org.jspecify.annotations.Nullable;
6970

7071
import org.springframework.core.log.LogAccessor;
7172
import org.springframework.kafka.test.core.BrokerAddress;
@@ -117,9 +118,9 @@ public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {
117118

118119
private final AtomicBoolean initialized = new AtomicBoolean();
119120

120-
private EmbeddedZookeeper zookeeper;
121+
private @Nullable EmbeddedZookeeper zookeeper;
121122

122-
private String zkConnect;
123+
private @Nullable String zkConnect;
123124

124125
private int zkPort;
125126

@@ -133,7 +134,7 @@ public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {
133134

134135
private String brokerListProperty = "spring.kafka.bootstrap-servers";
135136

136-
private volatile ZooKeeperClient zooKeeperClient;
137+
private volatile @Nullable ZooKeeperClient zooKeeperClient;
137138

138139
public EmbeddedKafkaZKBroker(int count) {
139140
this(count, false);
@@ -145,7 +146,7 @@ public EmbeddedKafkaZKBroker(int count) {
145146
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
146147
* @param topics the topics to create (2 partitions per).
147148
*/
148-
public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... topics) {
149+
public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String @Nullable ... topics) {
149150
this(count, controlledShutdown, 2, topics);
150151
}
151152

@@ -156,7 +157,7 @@ public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... to
156157
* @param partitions partitions per topic.
157158
* @param topics the topics to create.
158159
*/
159-
public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String... topics) {
160+
public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String @Nullable ... topics) {
160161
this.count = count;
161162
this.kafkaPorts = new int[this.count]; // random ports by default.
162163
this.controlledShutdown = controlledShutdown;
@@ -557,8 +558,10 @@ public void destroy() {
557558
}
558559
}
559560
try {
560-
this.zookeeper.shutdown();
561-
this.zkConnect = null;
561+
if (this.zookeeper != null) {
562+
this.zookeeper.shutdown();
563+
this.zkConnect = null;
564+
}
562565
}
563566
catch (Exception e) {
564567
// do nothing
@@ -582,7 +585,7 @@ public KafkaServer getKafkaServer(int id) {
582585
return this.kafkaServers.get(id);
583586
}
584587

585-
public EmbeddedZookeeper getZookeeper() {
588+
public @Nullable EmbeddedZookeeper getZookeeper() {
586589
return this.zookeeper;
587590
}
588591

@@ -599,7 +602,7 @@ public synchronized ZooKeeperClient getZooKeeperClient() {
599602
return this.zooKeeperClient;
600603
}
601604

602-
public String getZookeeperConnectionString() {
605+
public @Nullable String getZookeeperConnectionString() {
603606
return this.zkConnect;
604607
}
605608

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Provides a class for assertj conditions.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.kafka.test.assertj;
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Provides classes for JUnit5 conditions.
33
*/
4+
@org.jspecify.annotations.NullMarked
45
package org.springframework.kafka.test.condition;

Diff for: spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java

+6-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2024 the original author or authors.
2+
* Copyright 2017-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,17 +16,13 @@
1616

1717
package org.springframework.kafka.test.context;
1818

19-
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
20-
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
21-
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
22-
import org.springframework.beans.factory.support.RootBeanDefinition;
2319
import org.springframework.context.ConfigurableApplicationContext;
20+
import org.springframework.context.support.GenericApplicationContext;
2421
import org.springframework.core.env.ConfigurableEnvironment;
2522
import org.springframework.kafka.test.EmbeddedKafkaBroker;
2623
import org.springframework.kafka.test.EmbeddedKafkaBrokerFactory;
2724
import org.springframework.test.context.ContextCustomizer;
2825
import org.springframework.test.context.MergedContextConfiguration;
29-
import org.springframework.util.Assert;
3026

3127
/**
3228
* The {@link ContextCustomizer} implementation for the {@link EmbeddedKafkaBroker} bean registration.
@@ -51,16 +47,15 @@ class EmbeddedKafkaContextCustomizer implements ContextCustomizer {
5147

5248
@Override
5349
public void customizeContext(ConfigurableApplicationContext context, MergedContextConfiguration mergedConfig) {
54-
ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
55-
Assert.isInstanceOf(DefaultSingletonBeanRegistry.class, beanFactory);
56-
5750
ConfigurableEnvironment environment = context.getEnvironment();
5851

5952
EmbeddedKafkaBroker embeddedKafkaBroker =
6053
EmbeddedKafkaBrokerFactory.create(this.embeddedKafka, environment::resolvePlaceholders);
6154

62-
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(EmbeddedKafkaBroker.BEAN_NAME,
63-
new RootBeanDefinition(EmbeddedKafkaBroker.class, () -> embeddedKafkaBroker));
55+
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) context;
56+
57+
genericApplicationContext.registerBean(EmbeddedKafkaBroker.BEAN_NAME,
58+
EmbeddedKafkaBroker.class, () -> embeddedKafkaBroker);
6459
}
6560

6661
@Override

Diff for: spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerFactory.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@
1818

1919
import java.util.List;
2020

21+
import org.jspecify.annotations.Nullable;
22+
2123
import org.springframework.test.context.ContextConfigurationAttributes;
2224
import org.springframework.test.context.ContextCustomizer;
2325
import org.springframework.test.context.ContextCustomizerFactory;
@@ -35,6 +37,7 @@
3537
class EmbeddedKafkaContextCustomizerFactory implements ContextCustomizerFactory {
3638

3739
@Override
40+
@Nullable
3841
public ContextCustomizer createContextCustomizer(Class<?> testClass,
3942
List<ContextConfigurationAttributes> configAttributes) {
4043
EmbeddedKafka embeddedKafka =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Provides classes for EmbeddedKafka context customization.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.kafka.test.context;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* core package for spring-kafka-test module.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.kafka.test.core;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Provides hamcrest matchers.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.kafka.test.hamcrest;

Diff for: spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -90,8 +90,10 @@ public class GlobalEmbeddedKafkaTestExecutionListener implements TestExecutionLi
9090
public static final String BROKER_PROPERTIES_LOCATION_PROPERTY_NAME =
9191
"spring.kafka.embedded.broker.properties.location";
9292

93+
@SuppressWarnings("NullAway.Init")
9394
private EmbeddedKafkaBroker embeddedKafkaBroker;
9495

96+
@SuppressWarnings("NullAway.Init")
9597
private Log logger;
9698

9799
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Provides JUnit specific extensions in spring-kafka-test.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.kafka.test.junit;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Provides top-level API for EmbeddedKafka.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.kafka.test;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Provides JUnit rules.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.kafka.test.rule;

Diff for: spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/ContainerTestUtils.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.concurrent.atomic.AtomicReference;
2424

25+
import org.springframework.util.Assert;
2526
import org.springframework.util.ReflectionUtils;
2627

2728
/**
@@ -52,6 +53,7 @@ public static void waitForAssignment(Object container, int partitions) { // NOSO
5253
return;
5354
}
5455
List<?> containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class);
56+
Assert.notNull(containers, "Containers must not be null");
5557
int n = 0;
5658
int count = 0;
5759
Method getAssignedPartitions = null;

Diff for: spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -45,11 +45,11 @@
4545
import org.apache.kafka.common.serialization.StringDeserializer;
4646
import org.apache.kafka.common.serialization.StringSerializer;
4747
import org.apache.kafka.streams.StreamsConfig;
48+
import org.jspecify.annotations.Nullable;
4849

4950
import org.springframework.beans.DirectFieldAccessor;
5051
import org.springframework.core.log.LogAccessor;
5152
import org.springframework.kafka.test.EmbeddedKafkaBroker;
52-
import org.springframework.lang.Nullable;
5353
import org.springframework.util.Assert;
5454

5555
/**
@@ -66,6 +66,7 @@ public final class KafkaTestUtils {
6666

6767
private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(KafkaTestUtils.class)); // NOSONAR
6868

69+
@SuppressWarnings("NullAway.Init")
6970
private static Properties defaults;
7071

7172
private KafkaTestUtils() {
@@ -261,7 +262,7 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
261262
* @throws Exception if an exception occurs.
262263
* @since 2.3
263264
*/
264-
public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition)
265+
public static @Nullable OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition)
265266
throws Exception { // NOSONAR
266267

267268
try (AdminClient client = AdminClient
@@ -281,7 +282,7 @@ public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String
281282
* @throws Exception if an exception occurs.
282283
* @since 3.0
283284
*/
284-
public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition)
285+
public static @Nullable OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition)
285286
throws Exception { // NOSONAR
286287

287288
return adminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get() // NOSONAR false positive
@@ -395,7 +396,7 @@ public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, D
395396
* @param propertyPath The path.
396397
* @return The field.
397398
*/
398-
public static Object getPropertyValue(Object root, String propertyPath) {
399+
public static @Nullable Object getPropertyValue(Object root, String propertyPath) {
399400
Object value = null;
400401
DirectFieldAccessor accessor = new DirectFieldAccessor(root);
401402
String[] tokens = propertyPath.split("\\.");
@@ -424,7 +425,7 @@ else if (i == tokens.length - 1) {
424425
* @see #getPropertyValue(Object, String)
425426
*/
426427
@SuppressWarnings("unchecked")
427-
public static <T> T getPropertyValue(Object root, String propertyPath, Class<T> type) {
428+
public static <T> @Nullable T getPropertyValue(Object root, String propertyPath, Class<T> type) {
428429
Object value = getPropertyValue(root, propertyPath);
429430
if (value != null) {
430431
Assert.isAssignable(type, value.getClass());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Utils package.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.kafka.test.utils;

0 commit comments

Comments
 (0)