Skip to content

Commit 7af2e94

Browse files
committed
spring-projectsGH-1883: Fix replyTimeout Logic
Resolves spring-projects#1883 NPE when `null` `replyTimeout` supplied (javadoc says it's allowed and means use default). `replyTimeout` argument ignored on message-based methods. **cherry-pick to 2.7.x**
1 parent e20c2ec commit 7af2e94

File tree

2 files changed

+73
-4
lines changed

2 files changed

+73
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -333,11 +333,12 @@ public <P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> mes
333333

334334
@SuppressWarnings("unchecked")
335335
@Override
336-
public <P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message, Duration replyTimeout,
336+
public <P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
337+
@Nullable Duration replyTimeout,
337338
@Nullable ParameterizedTypeReference<P> returnType) {
338339

339340
RequestReplyFuture<K, V, R> future = sendAndReceive((ProducerRecord<K, V>) getMessageConverter()
340-
.fromMessage(message, getDefaultTopic()));
341+
.fromMessage(message, getDefaultTopic()), replyTimeout);
341342
RequestReplyTypedMessageFuture<K, V, P> replyFuture =
342343
new RequestReplyTypedMessageFuture<>(future.getSendFuture());
343344
future.addCallback(
@@ -360,8 +361,12 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
360361
}
361362

362363
@Override
363-
public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, Duration replyTimeout) {
364+
public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout) {
364365
Assert.state(this.running, "Template has not been start()ed"); // NOSONAR (sync)
366+
Duration timeout = replyTimeout;
367+
if (timeout == null) {
368+
timeout = this.defaultReplyTimeout;
369+
}
365370
CorrelationKey correlationId = this.correlationStrategy.apply(record);
366371
Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
367372
Headers headers = record.headers();
@@ -383,7 +388,7 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, D
383388
this.futures.remove(correlationId);
384389
throw new KafkaException("Send failed", e);
385390
}
386-
scheduleTimeout(record, correlationId, replyTimeout);
391+
scheduleTimeout(record, correlationId, timeout);
387392
return future;
388393
}
389394

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.CountDownLatch;
3838
import java.util.concurrent.ExecutionException;
3939
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.TimeoutException;
4041
import java.util.concurrent.atomic.AtomicInteger;
4142
import java.util.concurrent.atomic.AtomicReference;
4243

@@ -45,9 +46,11 @@
4546
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
4647
import org.apache.kafka.clients.consumer.ConsumerRecord;
4748
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
49+
import org.apache.kafka.clients.producer.Callback;
4850
import org.apache.kafka.clients.producer.Producer;
4951
import org.apache.kafka.clients.producer.ProducerConfig;
5052
import org.apache.kafka.clients.producer.ProducerRecord;
53+
import org.apache.kafka.clients.producer.RecordMetadata;
5154
import org.apache.kafka.common.TopicPartition;
5255
import org.apache.kafka.common.header.Headers;
5356
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -676,6 +679,67 @@ public void withCustomHeaders() throws Exception {
676679
}
677680
}
678681

682+
@SuppressWarnings({ "rawtypes", "unchecked" })
683+
@Test
684+
void nullDuration() throws Exception {
685+
ProducerFactory pf = mock(ProducerFactory.class);
686+
Producer producer = mock(Producer.class);
687+
willAnswer(invocation -> {
688+
Callback callback = invocation.getArgument(1);
689+
SettableListenableFuture<Object> future = new SettableListenableFuture<>();
690+
future.set("done");
691+
callback.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0, null, 0, 0), null);
692+
return future;
693+
}).given(producer).send(any(), any());
694+
given(pf.createProducer()).willReturn(producer);
695+
GenericMessageListenerContainer container = mock(GenericMessageListenerContainer.class);
696+
ContainerProperties properties = new ContainerProperties("two");
697+
given(container.getContainerProperties()).willReturn(properties);
698+
ReplyingKafkaTemplate template = new ReplyingKafkaTemplate(pf, container);
699+
template.start();
700+
Message<?> msg = MessageBuilder.withPayload("foo".getBytes())
701+
.setHeader(KafkaHeaders.TOPIC, "foo")
702+
.build();
703+
// was NPE here
704+
template.sendAndReceive(new ProducerRecord("foo", 0, "bar", "baz"), null).getSendFuture().get();
705+
}
706+
707+
@SuppressWarnings({ "rawtypes", "unchecked" })
708+
@Test
709+
void requestTimeoutWithMessage() {
710+
ProducerFactory pf = mock(ProducerFactory.class);
711+
Producer producer = mock(Producer.class);
712+
willAnswer(invocation -> {
713+
return new SettableListenableFuture<>();
714+
}).given(producer).send(any(), any());
715+
given(pf.createProducer()).willReturn(producer);
716+
GenericMessageListenerContainer container = mock(GenericMessageListenerContainer.class);
717+
ContainerProperties properties = new ContainerProperties("two");
718+
given(container.getContainerProperties()).willReturn(properties);
719+
ReplyingKafkaTemplate template = new ReplyingKafkaTemplate(pf, container);
720+
template.start();
721+
Message<?> msg = MessageBuilder.withPayload("foo".getBytes())
722+
.setHeader(KafkaHeaders.TOPIC, "foo")
723+
.build();
724+
long t1 = System.currentTimeMillis();
725+
RequestReplyTypedMessageFuture<String, String, Foo> future = template.sendAndReceive(msg, Duration.ofMillis(10),
726+
new ParameterizedTypeReference<Foo>() {
727+
});
728+
try {
729+
future.get(10, TimeUnit.SECONDS);
730+
}
731+
catch (TimeoutException ex) {
732+
fail("get timed out");
733+
}
734+
catch (InterruptedException e) {
735+
Thread.currentThread().interrupt();
736+
fail("Interrupted");
737+
}
738+
catch (ExecutionException e) {
739+
assertThat(System.currentTimeMillis() - t1).isLessThan(3000L);
740+
}
741+
}
742+
679743
@Configuration
680744
@EnableKafka
681745
public static class Config {

0 commit comments

Comments
 (0)