Skip to content

Commit 5261dec

Browse files
committed
jspecify nullability changes in retrytopic related classes/package
#3762 Signed-off-by: Soby Chacko <[email protected]>
1 parent 35542b0 commit 5261dec

26 files changed

+115
-78
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void setMethods(List<Method> methods) {
9595
* @param defaultMethod the default method.
9696
* @since 3.2
9797
*/
98-
public void setDefaultMethod(Method defaultMethod) {
98+
public void setDefaultMethod(@Nullable Method defaultMethod) {
9999
this.defaultMethod = defaultMethod;
100100
}
101101

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public AbstractKafkaBackOffManagerFactory() {
5353
* which will be used to fetch the {@link MessageListenerContainer} to back off.
5454
* @param listenerContainerRegistry the listenerContainerRegistry to use.
5555
*/
56-
public AbstractKafkaBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {
56+
public AbstractKafkaBackOffManagerFactory(@Nullable ListenerContainerRegistry listenerContainerRegistry) {
5757
this.listenerContainerRegistry = listenerContainerRegistry;
5858
}
5959

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class ContainerPartitionPausingBackOffManagerFactory extends AbstractKafk
3737
* @param listenerContainerRegistry the registry.
3838
* @param applicationContext the application context.
3939
*/
40-
public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry,
40+
public ContainerPartitionPausingBackOffManagerFactory(@Nullable ListenerContainerRegistry listenerContainerRegistry,
4141
ApplicationContext applicationContext) {
4242

4343
super(listenerContainerRegistry);

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/BackOffValuesGenerator.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2024 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.util.stream.IntStream;
2323
import java.util.stream.LongStream;
2424

25+
import org.jspecify.annotations.Nullable;
26+
2527
import org.springframework.retry.backoff.BackOffContext;
2628
import org.springframework.retry.backoff.BackOffPolicy;
2729
import org.springframework.retry.backoff.FixedBackOffPolicy;
@@ -50,7 +52,7 @@ public class BackOffValuesGenerator {
5052

5153
private final BackOffPolicy backOffPolicy;
5254

53-
public BackOffValuesGenerator(int providedMaxAttempts, BackOffPolicy providedBackOffPolicy) {
55+
public BackOffValuesGenerator(int providedMaxAttempts, @Nullable BackOffPolicy providedBackOffPolicy) {
5456
this.numberOfValuesToCreate = getMaxAttempts(providedMaxAttempts) - 1;
5557
BackOffPolicy policy = providedBackOffPolicy != null ? providedBackOffPolicy : DEFAULT_BACKOFF_POLICY;
5658
checkBackOffPolicyType(policy);

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public class DeadLetterPublishingRecovererFactory {
7171

7272
private Consumer<DeadLetterPublishingRecoverer> recovererCustomizer = recoverer -> { };
7373

74+
@Nullable
7475
private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction;
7576

7677
private ListenerExceptionLoggingStrategy loggingStrategy = ListenerExceptionLoggingStrategy.AFTER_RETRIES_EXHAUSTED;
@@ -268,7 +269,7 @@ else if (ListenerExceptionLoggingStrategy.EACH_ATTEMPT.equals(this.loggingStrate
268269
}
269270

270271
private DeadLetterPublishingRecoverer create(
271-
Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
272+
Function<ProducerRecord<?, ?>, @Nullable KafkaOperations<?, ?>> templateResolver,
272273
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
273274

274275
return new DeadLetterPublishingRecoverer(templateResolver, destinationResolver);

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicProcessor.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.util.function.Consumer;
2323
import java.util.stream.Collectors;
2424

25+
import org.jspecify.annotations.Nullable;
26+
2527
/**
2628
*
2729
* Default implementation of the {@link DestinationTopicProcessor} interface.
@@ -49,7 +51,7 @@ public void processDestinationTopicProperties(Consumer<DestinationTopic.Properti
4951
}
5052

5153
@Override
52-
public void registerDestinationTopic(String mainTopicName, String destinationTopicName,
54+
public void registerDestinationTopic(String mainTopicName, @Nullable String destinationTopicName,
5355
DestinationTopic.Properties destinationTopicProperties, Context context) {
5456
List<DestinationTopic> topicDestinations = context.destinationsByTopicMap
5557
.computeIfAbsent(mainTopicName, newTopic -> new ArrayList<>());

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class DefaultDestinationTopicResolver extends ExceptionClassifier
7070

7171
private final Clock clock;
7272

73+
@SuppressWarnings("NullAway.Init")
7374
private ApplicationContext applicationContext;
7475

7576
private boolean contextRefreshed;
@@ -111,7 +112,8 @@ private Boolean isNotFatalException(Exception e) {
111112
return getClassifier().classify(e);
112113
}
113114

114-
private Throwable maybeUnwrapException(Throwable e) {
115+
private Throwable maybeUnwrapException(@Nullable Throwable e) {
116+
Assert.state(e != null, "Exception cannot be null");
115117
return FRAMEWORK_EXCEPTIONS
116118
.stream()
117119
.filter(frameworkException -> frameworkException.isAssignableFrom(e.getClass()))
@@ -160,21 +162,21 @@ public DestinationTopic getDestinationTopicByName(String mainListenerId, String
160162

161163
@Nullable
162164
@Override
163-
public DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e) {
165+
public DestinationTopic getDltFor(String mainListenerId, String topicName, @Nullable Exception e) {
164166
DestinationTopic destination = getDltOrNoOpsDestination(mainListenerId, topicName, e);
165167
return destination.isNoOpsTopic()
166168
? null
167169
: destination;
168170
}
169171

170-
private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, String topic, Exception e) {
172+
private DestinationTopic getDltOrNoOpsDestination(String mainListenerId, @Nullable String topic, @Nullable Exception e) {
171173
DestinationTopic destination = getNextDestinationTopicFor(mainListenerId, topic);
172174
return isMatchingDltTopic(destination, e) || destination.isNoOpsTopic() ?
173175
destination :
174176
getDltOrNoOpsDestination(mainListenerId, destination.getDestinationName(), e);
175177
}
176178

177-
private static boolean isMatchingDltTopic(DestinationTopic destination, Exception e) {
179+
private static boolean isMatchingDltTopic(DestinationTopic destination, @Nullable Exception e) {
178180
if (!destination.isDltTopic()) {
179181
return false;
180182
}
@@ -185,7 +187,7 @@ private static boolean isMatchingDltTopic(DestinationTopic destination, Exceptio
185187
return isDltIntendedForCurrentExc || isGenericPurposeDlt;
186188
}
187189

188-
private static boolean isDirectExcOrCause(Exception e, Class<? extends Throwable> excType) {
190+
private static boolean isDirectExcOrCause(@Nullable Exception e, Class<? extends Throwable> excType) {
189191
if (e == null) {
190192
return false;
191193
}
@@ -205,17 +207,17 @@ private static boolean isDirectExcOrCause(Exception e, Class<? extends Throwable
205207
}
206208

207209
@Override
208-
public DestinationTopic getNextDestinationTopicFor(String mainListenerId, String topic) {
210+
public DestinationTopic getNextDestinationTopicFor(String mainListenerId, @Nullable String topic) {
209211
return getDestinationHolderFor(mainListenerId, topic).getNextDestination();
210212
}
211213

212-
private DestinationTopicHolder getDestinationHolderFor(String mainListenerId, String topic) {
214+
private DestinationTopicHolder getDestinationHolderFor(String mainListenerId, @Nullable String topic) {
213215
return this.contextRefreshed
214216
? doGetDestinationFor(mainListenerId, topic)
215217
: getDestinationTopicSynchronized(mainListenerId, topic);
216218
}
217219

218-
private DestinationTopicHolder getDestinationTopicSynchronized(String mainListenerId, String topic) {
220+
private DestinationTopicHolder getDestinationTopicSynchronized(String mainListenerId, @Nullable String topic) {
219221
try {
220222
this.sourceDestinationsHolderLock.lock();
221223
return doGetDestinationFor(mainListenerId, topic);
@@ -225,7 +227,7 @@ private DestinationTopicHolder getDestinationTopicSynchronized(String mainListen
225227
}
226228
}
227229

228-
private DestinationTopicHolder doGetDestinationFor(String mainListenerId, String topic) {
230+
private DestinationTopicHolder doGetDestinationFor(String mainListenerId, @Nullable String topic) {
229231
Map<String, DestinationTopicHolder> map = this.sourceDestinationsHolderMap.get(mainListenerId);
230232
Assert.notNull(map, () -> "No destination resolution information for listener " + mainListenerId);
231233
return Objects.requireNonNull(map.get(topic),

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.jspecify.annotations.Nullable;
2525

2626
import org.springframework.kafka.core.KafkaOperations;
27+
import org.springframework.util.Assert;
2728

2829
/**
2930
*
@@ -38,11 +39,11 @@
3839
*/
3940
public class DestinationTopic {
4041

41-
private final String destinationName;
42+
private final @Nullable String destinationName;
4243

4344
private final Properties properties;
4445

45-
public DestinationTopic(String destinationName, Properties properties) {
46+
public DestinationTopic(@Nullable String destinationName, Properties properties) {
4647
this.destinationName = destinationName;
4748
this.properties = properties;
4849
}
@@ -80,7 +81,7 @@ public boolean isMainTopic() {
8081
return Type.MAIN.equals(this.properties.type);
8182
}
8283

83-
public String getDestinationName() {
84+
public @Nullable String getDestinationName() {
8485
return this.destinationName;
8586
}
8687

@@ -113,6 +114,7 @@ public boolean equals(Object o) {
113114
return false;
114115
}
115116
DestinationTopic that = (DestinationTopic) o;
117+
Assert.state(this.destinationName != null, "destination name must not be null");
116118
return this.destinationName.equals(that.destinationName) && this.properties.equals(that.properties);
117119
}
118120

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ default DestinationTopic getDltFor(String mainListenerId, String topicName) {
8585
* @return The {@link DestinationTopic} instance corresponding to the DLT.
8686
*/
8787
@Nullable
88-
default DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc) {
88+
default DestinationTopic getDltFor(String mainListenerId, String topicName, @Nullable Exception exc) {
8989
return null;
9090
}
9191
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicProcessor.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import java.util.Map;
2323
import java.util.function.Consumer;
2424

25+
import org.jspecify.annotations.Nullable;
26+
2527
/**
2628
*
2729
* The {@link DestinationTopicProcessor} creates and registers the
@@ -56,7 +58,7 @@ public interface DestinationTopicProcessor {
5658
* @param destinationTopicProperties the destination topic properties.
5759
* @param context the context.
5860
*/
59-
void registerDestinationTopic(String mainTopicName, String destinationTopicName,
61+
void registerDestinationTopic(String mainTopicName, @Nullable String destinationTopicName,
6062
DestinationTopic.Properties destinationTopicProperties, Context context);
6163

6264
class Context {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
117117
* @param dltRoutingRules the specification of which DLT should be used for the particular exception type
118118
* @since 3.2.0
119119
*/
120-
public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List<Long> backOffValues,
120+
public DestinationTopicPropertiesFactory(@Nullable String retryTopicSuffix, @Nullable String dltSuffix, List<Long> backOffValues,
121121
BinaryExceptionClassifier exceptionClassifier,
122122
int numPartitions, KafkaOperations<?, ?> kafkaOperations,
123123
DltStrategy dltStrategy,
@@ -253,7 +253,7 @@ public static class DestinationTopicSuffixes {
253253

254254
private final String dltSuffix;
255255

256-
public DestinationTopicSuffixes(String retryTopicSuffix, String dltSuffix) {
256+
public DestinationTopicSuffixes(@Nullable String retryTopicSuffix, @Nullable String dltSuffix) {
257257
this.retryTopicSuffix = StringUtils.hasText(retryTopicSuffix)
258258
? retryTopicSuffix
259259
: RetryTopicConstants.DEFAULT_RETRY_SUFFIX;

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizer.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2024 the original author or authors.
2+
* Copyright 2021-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@
1818

1919
import java.util.Collection;
2020

21+
import org.jspecify.annotations.Nullable;
22+
2123
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
2224

2325
/**
@@ -48,9 +50,9 @@ class TopicNamesHolder {
4850

4951
private final String mainTopic;
5052

51-
private final String customizedTopic;
53+
private final @Nullable String customizedTopic;
5254

53-
TopicNamesHolder(String mainTopic, String customizedTopic) {
55+
TopicNamesHolder(String mainTopic, @Nullable String customizedTopic) {
5456
this.mainTopic = mainTopic;
5557
this.customizedTopic = customizedTopic;
5658
}
@@ -59,6 +61,7 @@ String getMainTopic() {
5961
return this.mainTopic;
6062
}
6163

64+
@Nullable
6265
String getCustomizedTopic() {
6366
return this.customizedTopic;
6467
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2024 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,8 @@
2121
import java.util.Collection;
2222
import java.util.stream.Stream;
2323

24+
import org.jspecify.annotations.Nullable;
25+
2426
import org.springframework.beans.factory.BeanFactory;
2527
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
2628
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
@@ -145,7 +147,7 @@ private void configurationEndpoint(MethodKafkaListenerEndpoint<?, ?> endpoint,
145147

146148
private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties,
147149
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
148-
TopicPartitionOffset[] topicPartitionOffsets) {
150+
@Nullable TopicPartitionOffset[] topicPartitionOffsets) {
149151

150152
return Stream.of(topicPartitionOffsets)
151153
.map(tpo -> properties.isMainEndpoint()
@@ -180,7 +182,7 @@ protected Collection<EndpointCustomizer.TopicNamesHolder> customizeAndRegisterTo
180182
private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> endpoint) {
181183
Collection<String> topics = endpoint.getTopics();
182184
if (topics.isEmpty()) {
183-
TopicPartitionOffset[] topicPartitionsToAssign = endpoint.getTopicPartitionsToAssign();
185+
@Nullable TopicPartitionOffset[] topicPartitionsToAssign = endpoint.getTopicPartitionsToAssign();
184186
if (topicPartitionsToAssign != null && topicPartitionsToAssign.length > 0) {
185187
topics = Arrays.stream(topicPartitionsToAssign)
186188
.map(TopicPartitionOffset::getTopic)

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,14 @@ public class ListenerContainerFactoryConfigurer {
6262
@Nullable
6363
private BackOff providedBlockingBackOff;
6464

65-
@Nullable
66-
private Class<? extends Exception>[] blockingExceptionTypes;
65+
private @Nullable Class<? extends Exception> @Nullable [] blockingExceptionTypes;
6766

6867
private boolean retainStandardFatal;
6968

7069
private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
7170
};
7271

73-
private Consumer<DefaultErrorHandler> errorHandlerCustomizer = errorHandler -> {
72+
private @Nullable Consumer<DefaultErrorHandler> errorHandlerCustomizer = errorHandler -> {
7473
};
7574

7675
private final DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory;
@@ -131,7 +130,7 @@ public KafkaListenerContainerFactory<?> decorateFactory(ConcurrentKafkaListenerC
131130
* @since 2.8.4
132131
* @see DefaultErrorHandler
133132
*/
134-
public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
133+
public void setBlockingRetriesBackOff(@Nullable BackOff blockingBackOff) {
135134
Assert.notNull(blockingBackOff, "The provided BackOff cannot be null");
136135
Assert.state(this.providedBlockingBackOff == null, () ->
137136
"Blocking retries back off has already been set. Current: "
@@ -168,12 +167,12 @@ public void setRetainStandardFatal(boolean retainStandardFatal) {
168167
this.retainStandardFatal = retainStandardFatal;
169168
}
170169

171-
public void setContainerCustomizer(Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer) {
170+
public void setContainerCustomizer(@Nullable Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer) {
172171
Assert.notNull(containerCustomizer, "'containerCustomizer' cannot be null");
173172
this.containerCustomizer = containerCustomizer;
174173
}
175174

176-
public void setErrorHandlerCustomizer(Consumer<DefaultErrorHandler> errorHandlerCustomizer) {
175+
public void setErrorHandlerCustomizer(@Nullable Consumer<DefaultErrorHandler> errorHandlerCustomizer) {
177176
this.errorHandlerCustomizer = errorHandlerCustomizer;
178177
}
179178

@@ -185,7 +184,9 @@ protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer de
185184
if (this.blockingExceptionTypes != null) {
186185
errorHandler.addRetryableExceptions(this.blockingExceptionTypes);
187186
}
188-
this.errorHandlerCustomizer.accept(errorHandler);
187+
if (this.errorHandlerCustomizer != null) {
188+
this.errorHandlerCustomizer.accept(errorHandler);
189+
}
189190
return errorHandler;
190191
}
191192

@@ -206,7 +207,8 @@ protected void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListener
206207
}
207208

208209
@SuppressWarnings("unchecked")
209-
private <T> T checkAndCast(Object obj, Class<T> clazz) {
210+
private <T> T checkAndCast(@Nullable Object obj, Class<T> clazz) {
211+
Assert.state(obj != null, "Object cannot be null");
210212
Assert.isAssignable(clazz, obj.getClass(),
211213
() -> String.format("The provided class %s is not assignable from %s",
212214
obj.getClass().getSimpleName(), clazz.getSimpleName()));

0 commit comments

Comments
 (0)