Skip to content

Commit cfd7c98

Browse files
authored
jspecify nullability changes for the listener package. (#3775)
#3762 Signed-off-by: Soby Chacko <[email protected]>
1 parent 2af76c0 commit cfd7c98

File tree

55 files changed

+367
-297
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+367
-297
lines changed

Diff for: spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/dynamic/Application.kt

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2024 the original author or authors.
2+
* Copyright 2022-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -60,8 +60,8 @@ private fun createContainer(
6060
factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
6161
): ConcurrentMessageListenerContainer<String, String> {
6262
val container = factory.createContainer(topic)
63-
container.containerProperties.messageListener = MyListener()
64-
container.containerProperties.groupId = group
63+
container.containerProperties.setMessageListener(MyListener())
64+
container.containerProperties.setGroupId(group)
6565
container.beanName = group
6666
container.start()
6767
return container
@@ -104,9 +104,10 @@ fun pojo(id: String, topic: String): MyPojo {
104104

105105
// tag::listener[]
106106

107-
class MyListener : MessageListener<String?, String?> {
107+
class MyListener : MessageListener<String, String> {
108+
109+
override fun onMessage(data: ConsumerRecord<String, String>) {
108110

109-
override fun onMessage(data: ConsumerRecord<String?, String?>) {
110111
// ...
111112
}
112113

Diff for: spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/requestreply/Application.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class Application {
7474
factory: ConcurrentKafkaListenerContainerFactory<String, String>
7575
): ReplyingKafkaTemplate<String, String, String> {
7676
val replyContainer = factory.createContainer("replies")
77-
replyContainer.containerProperties.groupId = "request.replies"
77+
replyContainer.containerProperties.setGroupId("request.replies")
7878
val template = ReplyingKafkaTemplate<String, String, String>(pf, replyContainer)
7979
template.messageConverter = ByteArrayJsonMessageConverter()
8080
template.setDefaultTopic("requests")

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java

+1
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ interface ProducerCallback<K, V, T> {
321321
*/
322322
interface OperationsCallback<K, V, T> {
323323

324+
@Nullable
324325
T doInOperations(KafkaOperations<K, V> operations);
325326

326327
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,7 @@ public <T> T execute(ProducerCallback<K, V, T> callback) {
660660
}
661661

662662
@Override
663-
public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
663+
public <T> @Nullable T executeInTransaction(OperationsCallback<K, V, T> callback) {
664664
Assert.notNull(callback, "'callback' cannot be null");
665665
Assert.state(this.transactional, "Producer factory does not support transactions");
666666
Thread currentThread = Thread.currentThread();

Diff for: spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppingEvent.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.kafka.clients.consumer.Consumer;
2222
import org.apache.kafka.common.TopicPartition;
23+
import org.jspecify.annotations.Nullable;
2324

2425
/**
2526
* An event published when a consumer is stopped. While it is best practice to use
@@ -37,7 +38,7 @@ public class ConsumerStoppingEvent extends KafkaEvent {
3738

3839
private transient final Consumer<?, ?> consumer;
3940

40-
private transient final Collection<TopicPartition> partitions;
41+
private transient final @Nullable Collection<TopicPartition> partitions;
4142

4243
/**
4344
* Construct an instance with the provided source, consumer and partitions.
@@ -48,7 +49,7 @@ public class ConsumerStoppingEvent extends KafkaEvent {
4849
* @since 2.2.1
4950
*/
5051
public ConsumerStoppingEvent(Object source, Object container,
51-
Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
52+
Consumer<?, ?> consumer, @Nullable Collection<TopicPartition> partitions) {
5253
super(source, container);
5354
this.consumer = consumer;
5455
this.partitions = partitions;
@@ -58,7 +59,7 @@ public ConsumerStoppingEvent(Object source, Object container,
5859
return this.consumer;
5960
}
6061

61-
public Collection<TopicPartition> getPartitions() {
62+
public @Nullable Collection<TopicPartition> getPartitions() {
6263
return this.partitions;
6364
}
6465

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java

+16-14
Original file line numberDiff line numberDiff line change
@@ -65,21 +65,23 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
6565
}
6666

6767
@Override
68-
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
69-
partitions.forEach(tp -> {
70-
List<ConsumerSeekCallback> removedCallbacks = this.topicToCallbacks.remove(tp);
71-
if (removedCallbacks != null && !removedCallbacks.isEmpty()) {
72-
removedCallbacks.forEach(cb -> {
73-
List<TopicPartition> topics = this.callbackToTopics.get(cb);
74-
if (topics != null) {
75-
topics.remove(tp);
76-
if (topics.isEmpty()) {
77-
this.callbackToTopics.remove(cb);
68+
public void onPartitionsRevoked(@Nullable Collection<TopicPartition> partitions) {
69+
if (partitions != null) {
70+
partitions.forEach(tp -> {
71+
List<ConsumerSeekCallback> removedCallbacks = this.topicToCallbacks.remove(tp);
72+
if (removedCallbacks != null && !removedCallbacks.isEmpty()) {
73+
removedCallbacks.forEach(cb -> {
74+
List<TopicPartition> topics = this.callbackToTopics.get(cb);
75+
if (topics != null) {
76+
topics.remove(tp);
77+
if (topics.isEmpty()) {
78+
this.callbackToTopics.remove(cb);
79+
}
7880
}
79-
}
80-
});
81-
}
82-
});
81+
});
82+
}
83+
});
84+
}
8385
}
8486

8587
@Override

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,10 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.Objects;
20+
21+
import org.jspecify.annotations.Nullable;
22+
1923
import org.springframework.context.ApplicationContext;
2024
import org.springframework.context.ApplicationContextAware;
2125
import org.springframework.kafka.config.KafkaListenerConfigUtils;
@@ -32,9 +36,9 @@
3236
public abstract class AbstractKafkaBackOffManagerFactory
3337
implements KafkaBackOffManagerFactory, ApplicationContextAware {
3438

35-
private ApplicationContext applicationContext;
39+
private @Nullable ApplicationContext applicationContext;
3640

37-
private ListenerContainerRegistry listenerContainerRegistry;
41+
private @Nullable ListenerContainerRegistry listenerContainerRegistry;
3842

3943
/**
4044
* Creates an instance that will retrieve the {@link ListenerContainerRegistry} from
@@ -83,7 +87,7 @@ private ListenerContainerRegistry getListenerContainerFromContext() {
8387
}
8488

8589
protected <T> T getBean(String beanName, Class<T> beanClass) {
86-
return this.applicationContext.getBean(beanName, beanClass);
90+
return Objects.requireNonNull(this.applicationContext).getBean(beanName, beanClass);
8791
}
8892

8993
@Override

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

+16-14
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,9 @@ public abstract class AbstractMessageListenerContainer<K, V>
101101
@NonNull
102102
private String beanName = "noBeanNameSet";
103103

104-
private ApplicationEventPublisher applicationEventPublisher;
104+
private @Nullable ApplicationEventPublisher applicationEventPublisher;
105105

106-
private CommonErrorHandler commonErrorHandler;
106+
private @Nullable CommonErrorHandler commonErrorHandler;
107107

108108
private boolean autoStartup = true;
109109

@@ -114,15 +114,16 @@ public abstract class AbstractMessageListenerContainer<K, V>
114114

115115
private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT;
116116

117-
private RecordInterceptor<K, V> recordInterceptor;
117+
private @Nullable RecordInterceptor<K, V> recordInterceptor;
118118

119-
private BatchInterceptor<K, V> batchInterceptor;
119+
private @Nullable BatchInterceptor<K, V> batchInterceptor;
120120

121121
private boolean interceptBeforeTx = true;
122122

123+
@SuppressWarnings("NullAway.Init")
123124
private byte[] listenerInfo;
124125

125-
private ApplicationContext applicationContext;
126+
private @Nullable ApplicationContext applicationContext;
126127

127128
private volatile boolean running = false;
128129

@@ -149,13 +150,13 @@ public abstract class AbstractMessageListenerContainer<K, V>
149150
* @param containerProperties the properties.
150151
*/
151152
@SuppressWarnings("unchecked")
152-
protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
153+
protected AbstractMessageListenerContainer(@Nullable ConsumerFactory<? super K, ? super V> consumerFactory,
153154
ContainerProperties containerProperties) {
154155

155156
Assert.notNull(containerProperties, "'containerProperties' cannot be null");
156157
Assert.notNull(consumerFactory, "'consumerFactory' cannot be null");
157158
this.consumerFactory = (ConsumerFactory<K, V>) consumerFactory;
158-
String[] topics = containerProperties.getTopics();
159+
@Nullable String @Nullable [] topics = containerProperties.getTopics();
159160
if (topics != null) {
160161
this.containerProperties = new ContainerProperties(topics);
161162
}
@@ -165,7 +166,7 @@ protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V>
165166
this.containerProperties = new ContainerProperties(topicPattern);
166167
}
167168
else {
168-
TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
169+
@Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions();
169170
if (topicPartitions != null) {
170171
this.containerProperties = new ContainerProperties(topicPartitions);
171172
}
@@ -370,8 +371,8 @@ public String getMainListenerId() {
370371
return this.mainListenerId;
371372
}
372373

373-
@Nullable
374374
@Override
375+
@SuppressWarnings("NullAway") // Dataflow analysis limitation
375376
public byte[] getListenerInfo() {
376377
return this.listenerInfo != null ? Arrays.copyOf(this.listenerInfo, this.listenerInfo.length) : null;
377378
}
@@ -382,6 +383,7 @@ public byte[] getListenerInfo() {
382383
* @param listenerInfo the info.
383384
* @since 2.8.4
384385
*/
386+
@SuppressWarnings("NullAway") // Dataflow analysis limitation
385387
public void setListenerInfo(@Nullable byte[] listenerInfo) {
386388
this.listenerInfo = listenerInfo != null ? Arrays.copyOf(listenerInfo, listenerInfo.length) : null;
387389
}
@@ -458,7 +460,7 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
458460
this.kafkaAdmin = kafkaAdmin;
459461
}
460462

461-
protected RecordInterceptor<K, V> getRecordInterceptor() {
463+
protected @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
462464
return this.recordInterceptor;
463465
}
464466

@@ -469,11 +471,11 @@ protected RecordInterceptor<K, V> getRecordInterceptor() {
469471
* @since 2.2.7
470472
* @see #setInterceptBeforeTx(boolean)
471473
*/
472-
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
474+
public void setRecordInterceptor(@Nullable RecordInterceptor<K, V> recordInterceptor) {
473475
this.recordInterceptor = recordInterceptor;
474476
}
475477

476-
protected BatchInterceptor<K, V> getBatchInterceptor() {
478+
protected @Nullable BatchInterceptor<K, V> getBatchInterceptor() {
477479
return this.batchInterceptor;
478480
}
479481

@@ -483,7 +485,7 @@ protected BatchInterceptor<K, V> getBatchInterceptor() {
483485
* @since 2.6.6
484486
* @see #setInterceptBeforeTx(boolean)
485487
*/
486-
public void setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) {
488+
public void setBatchInterceptor(@Nullable BatchInterceptor<K, V> batchInterceptor) {
487489
this.batchInterceptor = batchInterceptor;
488490
}
489491

@@ -541,7 +543,7 @@ protected void checkTopics() {
541543
List<String> missing = null;
542544
try (AdminClient client = AdminClient.create(configs)) { // NOSONAR - false positive null check
543545
if (client != null) {
544-
String[] topics = this.containerProperties.getTopics();
546+
@Nullable String @Nullable[] topics = this.containerProperties.getTopics();
545547
if (topics == null) {
546548
topics = Arrays.stream(this.containerProperties.getTopicPartitions())
547549
.map(TopicPartitionOffset::getTopic)

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingConsumerAwareMessageListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,6 @@ default void onMessage(ConsumerRecord<K, V> data) {
4747
}
4848

4949
@Override
50-
void onMessage(ConsumerRecord<K, V> data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer);
50+
void onMessage(ConsumerRecord<K, V> data, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer);
5151

5252
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public interface BackOffHandler {
3535
* @param exception the exception.
3636
* @param nextBackOff the next back off.
3737
*/
38-
default void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
38+
default void onNextBackOff(@Nullable MessageListenerContainer container, @Nullable Exception exception, long nextBackOff) {
3939
throw new UnsupportedOperationException();
4040
}
4141

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/BatchAcknowledgingConsumerAwareMessageListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,6 @@ default void onMessage(List<ConsumerRecord<K, V>> data) {
5151
}
5252

5353
@Override
54-
void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer);
54+
void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer);
5555

5656
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/BatchAcknowledgingMessageListener.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import java.util.List;
2020

2121
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.jspecify.annotations.Nullable;
2223

2324
import org.springframework.kafka.support.Acknowledgment;
2425

@@ -49,6 +50,6 @@ default void onMessage(List<ConsumerRecord<K, V>> data) {
4950
}
5051

5152
@Override
52-
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
53+
void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Acknowledgment acknowledgment);
5354

5455
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/BatchConsumerAwareMessageListener.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import org.apache.kafka.clients.consumer.Consumer;
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.jspecify.annotations.Nullable;
2324

2425
/**
2526
* Listener for handling a batch of incoming Kafka messages; the list
@@ -47,6 +48,6 @@ default void onMessage(List<ConsumerRecord<K, V>> data) {
4748
}
4849

4950
@Override
50-
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
51+
void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Consumer<?, ?> consumer);
5152

5253
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class BatchListenerFailedException extends KafkaException {
3636

3737
private final int index;
3838

39-
private transient ConsumerRecord<?, ?> record;
39+
private transient @Nullable ConsumerRecord<?, ?> record;
4040

4141
/**
4242
* Construct an instance with the provided properties.

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222

2323
import org.apache.kafka.clients.consumer.Consumer;
2424
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
import org.jspecify.annotations.Nullable;
2526

2627
import org.springframework.util.Assert;
2728

@@ -53,7 +54,7 @@ public CompositeBatchInterceptor(BatchInterceptor<K, V>... delegates) {
5354
}
5455

5556
@Override
56-
public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
57+
public @Nullable ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
5758
ConsumerRecords<K, V> recordsToIntercept = records;
5859
for (BatchInterceptor<K, V> delegate : this.delegates) {
5960
recordsToIntercept = delegate.intercept(recordsToIntercept, consumer);

0 commit comments

Comments
 (0)