Skip to content

Commit 58b6889

Browse files
garyrussellartembilan
authored andcommitted
GH-2344: AggReplyingKT Support Custom Correlation
Resolves #2344 The `ReplyingKafkaTemplate` supports the use of a custom header name for correlation, but the subclass `AggregatingReplyingKafkaTemplate` was hard coded to use the default header. Support the use of a custom header name in the subclass. **cherry-pick to 2.9.x, 2.8.x**
1 parent c978b86 commit 58b6889

File tree

3 files changed

+22
-8
lines changed

3 files changed

+22
-8
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.springframework.kafka.listener.BatchConsumerAwareMessageListener;
3939
import org.springframework.kafka.listener.ContainerProperties.AckMode;
4040
import org.springframework.kafka.listener.GenericMessageListenerContainer;
41-
import org.springframework.kafka.support.KafkaHeaders;
4241
import org.springframework.kafka.support.KafkaUtils;
4342
import org.springframework.util.Assert;
4443

@@ -124,12 +123,13 @@ public synchronized void setReturnPartialOnTimeout(boolean returnPartialOnTimeou
124123
@Override
125124
public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>> data, Consumer<?, ?> consumer) {
126125
List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>> completed = new ArrayList<>();
126+
String correlationHeaderName = getCorrelationHeaderName();
127127
data.forEach(record -> {
128-
Header correlation = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
128+
Header correlation = record.headers().lastHeader(correlationHeaderName);
129129
if (correlation == null) {
130130
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
131131
+ " - to use request/reply semantics, the responding server must return the correlation id "
132-
+ " in the '" + KafkaHeaders.CORRELATION_ID + "' header");
132+
+ " in the '" + correlationHeaderName + "' header");
133133
}
134134
else {
135135
CorrelationKey correlationId = new CorrelationKey(correlation.value());
@@ -142,7 +142,7 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
142142
ConsumerRecord<K, Collection<ConsumerRecord<K, R>>> done =
143143
new ConsumerRecord<>(AGGREGATED_RESULTS_TOPIC, 0, 0L, null, list);
144144
done.headers()
145-
.add(new RecordHeader(KafkaHeaders.CORRELATION_ID, correlationId
145+
.add(new RecordHeader(correlationHeaderName, correlationId
146146
.getCorrelationId()));
147147
this.pending.remove(correlationId);
148148
checkOffsetsAndCommitIfNecessary(list, consumer);

Diff for: spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

+9
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,15 @@ public void setCorrelationHeaderName(String correlationHeaderName) {
251251
this.correlationHeaderName = correlationHeaderName;
252252
}
253253

254+
/**
255+
* Return the correlation header name.
256+
* @return the header name.
257+
* @since 2.8.8
258+
*/
259+
protected String getCorrelationHeaderName() {
260+
return this.correlationHeaderName;
261+
}
262+
254263
/**
255264
* Set a custom header name for the reply topic. Default
256265
* {@link KafkaHeaders#REPLY_TOPIC}.

Diff for: spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,7 @@ public void testAggregateNormal() throws Exception {
448448
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
449449
new TopicPartitionOffset(D_REPLY, 0), 2, new AtomicInteger());
450450
try {
451+
template.setCorrelationHeaderName("customCorrelation");
451452
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
452453
ProducerRecord<Integer, String> record = new ProducerRecord<>(D_REQUEST, null, null, null, "foo");
453454
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
@@ -838,14 +839,18 @@ public HandlerReturn handlerReturn() {
838839

839840
@KafkaListener(id = "def1", topics = { D_REQUEST, E_REQUEST, F_REQUEST })
840841
@SendTo // default REPLY_TOPIC header
841-
public String dListener1(String in) {
842-
return in.toUpperCase();
842+
public Message<String> dListener1(String in, @Header("customCorrelation") byte[] correlation) {
843+
return MessageBuilder.withPayload(in.toUpperCase())
844+
.setHeader("customCorrelation", correlation)
845+
.build();
843846
}
844847

845848
@KafkaListener(id = "def2", topics = { D_REQUEST, E_REQUEST, F_REQUEST })
846849
@SendTo // default REPLY_TOPIC header
847-
public String dListener2(String in) {
848-
return in.substring(0, 1) + in.substring(1).toUpperCase();
850+
public Message<String> dListener2(String in, @Header("customCorrelation") byte[] correlation) {
851+
return MessageBuilder.withPayload(in.substring(0, 1) + in.substring(1).toUpperCase())
852+
.setHeader("customCorrelation", correlation)
853+
.build();
849854
}
850855

851856
@KafkaListener(id = G_REQUEST, topics = G_REQUEST)

0 commit comments

Comments
 (0)