Skip to content

Commit e19e6d4

Browse files
garyrussellartembilan
authored andcommitted
GH-3942: Fix Race in Kafka OB Gateway
Resolves #3942 When determining the default reply-to topic/partition, we need to wait for assignment. Already covered by `KafkaDslTests` (a recent build failure exposed this problem). **No back-port - 5.5.x uses 2.7.x by default, which does not support this.** 5.5.x users can call `waitForAssignment` on the `ReplyingKafkaTemplate` that is supplied to the gateways before sending messages.
1 parent 2895a1e commit e19e6d4

File tree

3 files changed

+37
-0
lines changed

3 files changed

+37
-0
lines changed

Diff for: spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaOutboundGatewaySpec.java

+12
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,18 @@ public S replyMessageConverter(RecordMessageConverter messageConverter) {
6060
return _this();
6161
}
6262

63+
/**
64+
* Set the time to wait for partition assignment, when used as a gateway, to determine
65+
* the default reply-to topic/partition.
66+
* @param duration the duration.
67+
* @return the spec.
68+
* @since 6.0
69+
*/
70+
public S assigmentDuration(Duration duration) {
71+
this.target.setAssignmentDuration(duration);
72+
return _this();
73+
}
74+
6375
/**
6476
* A {@link org.springframework.kafka.core.KafkaTemplate}-based {@link KafkaProducerMessageHandlerSpec} extension.
6577
*

Diff for: spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

+24
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.lang.reflect.Type;
2020
import java.nio.charset.StandardCharsets;
21+
import java.time.Duration;
2122
import java.util.Collection;
2223
import java.util.HashMap;
2324
import java.util.Map;
@@ -103,6 +104,10 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes
103104
*/
104105
private static final int DEFAULT_TIMEOUT_BUFFER = 5000;
105106

107+
private static final int TWENTY = 20;
108+
109+
private static final Duration DEFAULT_ASSIGNMENT_TIMEOUT = Duration.ofSeconds(TWENTY);
110+
106111
private final Map<String, Set<Integer>> replyTopicsAndPartitions = new HashMap<>();
107112

108113
private final KafkaTemplate<K, V> kafkaTemplate;
@@ -162,6 +167,8 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes
162167

163168
private boolean useTemplateConverter;
164169

170+
private Duration assignmentDuration = DEFAULT_ASSIGNMENT_TIMEOUT;
171+
165172
private volatile byte[] singleReplyTopic;
166173

167174
public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
@@ -415,6 +422,17 @@ public void setUseTemplateConverter(boolean useTemplateConverter) {
415422
this.useTemplateConverter = useTemplateConverter;
416423
}
417424

425+
/**
426+
* Set the time to wait for partition assignment, when used as a gateway, to determine
427+
* the default reply-to topic/partition.
428+
* @param assignmentDuration the assignmentDuration to set.
429+
* @since 6.0
430+
*/
431+
public void setAssignmentDuration(Duration assignmentDuration) {
432+
Assert.notNull(assignmentDuration, "'assignmentDuration' cannot be null");
433+
this.assignmentDuration = assignmentDuration;
434+
}
435+
418436
@Override
419437
public String getComponentType() {
420438
return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
@@ -647,6 +665,12 @@ private byte[] getSingleReplyTopic() {
647665

648666
private void determineValidReplyTopicsAndPartitions() {
649667
ReplyingKafkaTemplate<?, ?, ?> rkt = (ReplyingKafkaTemplate<?, ?, ?>) this.kafkaTemplate;
668+
try {
669+
rkt.waitForAssignment(this.assignmentDuration);
670+
}
671+
catch (InterruptedException e) {
672+
Thread.currentThread().interrupt();
673+
}
650674
Collection<TopicPartition> replyTopics = rkt.getAssignedReplyTopicPartitions();
651675
Map<String, Set<Integer>> topicsAndPartitions = new HashMap<>();
652676
if (replyTopics != null) {

Diff for: spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ public IntegrationFlow sourceFlow() {
400400
public IntegrationFlow outboundGateFlow() {
401401
return IntegrationFlow.from(Gate.class)
402402
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
403+
.assigmentDuration(Duration.ofSeconds(30))
403404
.flushExpression("true")
404405
.sync(true)
405406
.configureKafkaTemplate(t -> t.defaultReplyTimeout(Duration.ofSeconds(30))))

0 commit comments

Comments
 (0)