Skip to content

Commit 9d91a79

Browse files
committed
jspecify nullability changes for the requestreply package
docs related Kotlin code changes Signed-off-by: Soby Chacko <[email protected]>
1 parent c06adf4 commit 9d91a79

File tree

8 files changed

+34
-25
lines changed

8 files changed

+34
-25
lines changed

Diff for: spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/requestreply/Application.kt

+9-9
Original file line numberDiff line numberDiff line change
@@ -68,33 +68,33 @@ class Application {
6868
// tag::beans[]
6969
@Bean
7070
fun template(
71-
pf: ProducerFactory<String?, String>?,
72-
factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
73-
): ReplyingKafkaTemplate<String?, String, String?> {
71+
pf: ProducerFactory<String, String>,
72+
factory: ConcurrentKafkaListenerContainerFactory<String, String>
73+
): ReplyingKafkaTemplate<String, String, String> {
7474
val replyContainer = factory.createContainer("replies")
7575
replyContainer.containerProperties.groupId = "request.replies"
76-
val template = ReplyingKafkaTemplate(pf, replyContainer)
76+
val template = ReplyingKafkaTemplate<String, String, String>(pf, replyContainer)
7777
template.messageConverter = ByteArrayJsonMessageConverter()
7878
template.defaultTopic = "requests"
7979
return template
8080
}
8181
// end::beans[]
8282

8383
@Bean
84-
fun runner(template: ReplyingKafkaTemplate<String?, String?, String?>): ApplicationRunner {
84+
fun runner(template: ReplyingKafkaTemplate<String, String, String>): ApplicationRunner {
8585
return ApplicationRunner { _ ->
8686
// tag::sendReceive[]
87-
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing> =
87+
val future1: RequestReplyTypedMessageFuture<String, String, Thing> =
8888
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
8989
object : ParameterizedTypeReference<Thing>() {})
90-
log.info(future1.sendFuture.get(10, TimeUnit.SECONDS).recordMetadata.toString())
90+
log.info(future1.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
9191
val thing = future1.get(10, TimeUnit.SECONDS).payload
9292
log.info(thing.toString())
9393

94-
val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing>> =
94+
val future2: RequestReplyTypedMessageFuture<String, String, List<Thing>> =
9595
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
9696
object : ParameterizedTypeReference<List<Thing>>() {})
97-
log.info(future2.sendFuture.get(10, TimeUnit.SECONDS).recordMetadata.toString())
97+
log.info(future2.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
9898
val things = future2.get(10, TimeUnit.SECONDS).payload
9999
things.forEach { thing1 -> log.info(thing1.toString()) }
100100
// end::sendReceive[]

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/CorrelationKey.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@
1818

1919
import java.util.Arrays;
2020

21+
import org.jspecify.annotations.Nullable;
22+
2123
import org.springframework.util.Assert;
2224

2325
/**
@@ -33,9 +35,9 @@ public final class CorrelationKey {
3335

3436
private final byte[] correlationId;
3537

36-
private String asString;
38+
private @Nullable String asString;
3739

38-
private volatile Integer hashCode;
40+
private volatile @Nullable Integer hashCode;
3941

4042
public CorrelationKey(byte[] correlationId) { // NOSONAR array reference
4143
Assert.notNull(correlationId, "'correlationId' cannot be null");

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaOperations.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ default <P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> me
9696
* @since 2.7
9797
*/
9898
default <P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message, Duration replyTimeout,
99-
ParameterizedTypeReference<P> returnType) {
99+
@Nullable ParameterizedTypeReference<P> returnType) {
100100

101101
throw new UnsupportedOperationException();
102102
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen
113113

114114
private String replyPartitionHeaderName = KafkaHeaders.REPLY_PARTITION;
115115

116-
private Function<ConsumerRecord<?, ?>, Exception> replyErrorChecker = rec -> null;
116+
private Function<ConsumerRecord<?, ?>, @Nullable Exception> replyErrorChecker = rec -> null;
117117

118118
private CountDownLatch assignLatch = new CountDownLatch(1);
119119

@@ -127,6 +127,7 @@ public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
127127
this(producerFactory, replyContainer, false);
128128
}
129129

130+
@SuppressWarnings("NullAway") // Dataflow analysis limitation
130131
public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
131132
GenericMessageListenerContainer<K, R> replyContainer, boolean autoFlush) {
132133

@@ -290,7 +291,7 @@ public void setReplyPartitionHeaderName(String replyPartitionHeaderName) {
290291
* @param replyErrorChecker the error checker function.
291292
* @since 2.6.7
292293
*/
293-
public void setReplyErrorChecker(Function<ConsumerRecord<?, ?>, Exception> replyErrorChecker) {
294+
public void setReplyErrorChecker(Function<ConsumerRecord<?, ?>, @Nullable Exception> replyErrorChecker) {
294295
Assert.notNull(replyErrorChecker, "'replyErrorChecker' cannot be null");
295296
this.replyErrorChecker = replyErrorChecker;
296297
}
@@ -363,7 +364,7 @@ public RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message) {
363364
}
364365

365366
@Override
366-
public RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message, Duration replyTimeout) {
367+
public RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message, @Nullable Duration replyTimeout) {
367368
return sendAndReceive(message, replyTimeout, null);
368369
}
369370

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyFuture.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.CompletableFuture;
2020

2121
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.jspecify.annotations.Nullable;
2223

2324
import org.springframework.kafka.support.SendResult;
2425

@@ -35,7 +36,7 @@
3536
*/
3637
public class RequestReplyFuture<K, V, R> extends CompletableFuture<ConsumerRecord<K, R>> {
3738

38-
private volatile CompletableFuture<SendResult<K, V>> sendFuture;
39+
private volatile @Nullable CompletableFuture<SendResult<K, V>> sendFuture;
3940

4041
protected void setSendFuture(CompletableFuture<SendResult<K, V>> sendFuture) {
4142
this.sendFuture = sendFuture;
@@ -45,7 +46,7 @@ protected void setSendFuture(CompletableFuture<SendResult<K, V>> sendFuture) {
4546
* Return the send future.
4647
* @return the send future.
4748
*/
48-
public CompletableFuture<SendResult<K, V>> getSendFuture() {
49+
public @Nullable CompletableFuture<SendResult<K, V>> getSendFuture() {
4950
return this.sendFuture;
5051
}
5152

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyMessageFuture.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@
1818

1919
import java.util.concurrent.CompletableFuture;
2020

21+
import org.jspecify.annotations.Nullable;
22+
2123
import org.springframework.kafka.support.SendResult;
2224
import org.springframework.messaging.Message;
2325

@@ -33,17 +35,17 @@
3335
*/
3436
public class RequestReplyMessageFuture<K, V> extends CompletableFuture<Message<?>> {
3537

36-
private final CompletableFuture<SendResult<K, V>> sendFuture; // NOSONAR
38+
private final @Nullable CompletableFuture<SendResult<K, V>> sendFuture; // NOSONAR
3739

38-
RequestReplyMessageFuture(CompletableFuture<SendResult<K, V>> sendFuture) {
40+
RequestReplyMessageFuture(@Nullable CompletableFuture<SendResult<K, V>> sendFuture) {
3941
this.sendFuture = sendFuture;
4042
}
4143

4244
/**
4345
* Return the send future.
4446
* @return the send future.
4547
*/
46-
public CompletableFuture<SendResult<K, V>> getSendFuture() {
48+
public @Nullable CompletableFuture<SendResult<K, V>> getSendFuture() {
4749
return this.sendFuture;
4850
}
4951

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/RequestReplyTypedMessageFuture.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,8 @@
2121
import java.util.concurrent.TimeUnit;
2222
import java.util.concurrent.TimeoutException;
2323

24+
import org.jspecify.annotations.Nullable;
25+
2426
import org.springframework.kafka.support.SendResult;
2527
import org.springframework.messaging.Message;
2628

@@ -37,7 +39,7 @@
3739
*/
3840
public class RequestReplyTypedMessageFuture<K, V, P> extends RequestReplyMessageFuture<K, V> {
3941

40-
RequestReplyTypedMessageFuture(CompletableFuture<SendResult<K, V>> sendFuture) {
42+
RequestReplyTypedMessageFuture(@Nullable CompletableFuture<SendResult<K, V>> sendFuture) {
4143
super(sendFuture);
4244
}
4345

Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Provides classes for request/reply semantics.
33
*/
4+
@org.jspecify.annotations.NullMarked
45
package org.springframework.kafka.requestreply;

0 commit comments

Comments
 (0)