Skip to content

Commit e731729

Browse files
committed
spring-projectsGH-2348: Custom Correlation Consumer Side
Resolves spring-projects#2348 The replying template supports a custom header for correlation for cases when the consumer side is not a Spring app and uses a different header. Support a custom header name on the consumer side, for cases where the client side is not Spring and uses a different header.
1 parent 9066598 commit e731729

File tree

6 files changed

+132
-35
lines changed

6 files changed

+132
-35
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,9 @@ These header names are used by the `@KafkaListener` infrastructure to route the
726726
Starting with version 2.3, you can customize the header names - the template has 3 properties `correlationHeaderName`, `replyTopicHeaderName`, and `replyPartitionHeaderName`.
727727
This is useful if your server is not a Spring application (or does not use the `@KafkaListener`).
728728

729+
NOTE: Conversely, if the requesting application is not a spring application and puts correlation information in a different header, starting with version 3.0, you can configure a custom `correlationHeaderName` on the listener container factory and that header will be echoed back.
730+
Previously, the listener had to echo custom correlation headers.
731+
729732
[[exchanging-messages]]
730733
====== Request/Reply with `Message<?>` s
731734

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,9 @@ See <<kafka-template>>.
5252

5353
The futures returned by this class are now `CompletableFuture` s instead of `ListenableFuture` s.
5454
See <<replying-template>> and <<exchanging-messages>>.
55+
56+
[[x30-listener]]
57+
==== `@KafkaListener` Changes
58+
59+
You can now use a custom correlation header which will be echoed in any reply message.
60+
See the note at the end of <<replying-template>> for more information.

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
4545
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
4646
import org.springframework.kafka.support.JavaUtils;
47+
import org.springframework.kafka.support.KafkaHeaders;
4748
import org.springframework.kafka.support.TopicPartitionOffset;
4849
import org.springframework.kafka.support.converter.MessageConverter;
4950
import org.springframework.util.Assert;
@@ -108,6 +109,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
108109

109110
private ContainerCustomizer<K, V, C> containerCustomizer;
110111

112+
private String correlationHeaderName;
113+
111114
@Override
112115
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
113116
this.applicationContext = applicationContext;
@@ -321,6 +324,17 @@ public void setContainerCustomizer(ContainerCustomizer<K, V, C> containerCustomi
321324
this.containerCustomizer = containerCustomizer;
322325
}
323326

327+
/**
328+
* Set a custom header name for the correlation id. Default
329+
* {@link KafkaHeaders#CORRELATION_ID}. This header will be echoed back in any reply
330+
* message.
331+
* @param correlationHeaderName the header name.
332+
* @since 3.0
333+
*/
334+
public void setCorrelationHeaderName(String correlationHeaderName) {
335+
this.correlationHeaderName = correlationHeaderName;
336+
}
337+
324338
@SuppressWarnings("deprecation")
325339
@Override
326340
public void afterPropertiesSet() {
@@ -363,7 +377,8 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint)
363377
.acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded)
364378
.acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
365379
.acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer)
366-
.acceptIfNotNull(this.batchToRecordAdapter, aklEndpoint::setBatchToRecordAdapter);
380+
.acceptIfNotNull(this.batchToRecordAdapter, aklEndpoint::setBatchToRecordAdapter)
381+
.acceptIfNotNull(this.correlationHeaderName, aklEndpoint::setCorrelationHeaderName);
367382
if (aklEndpoint.getBatchListener() == null) {
368383
JavaUtils.INSTANCE
369384
.acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener);

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
4646
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
4747
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
48+
import org.springframework.kafka.support.JavaUtils;
49+
import org.springframework.kafka.support.KafkaHeaders;
4850
import org.springframework.kafka.support.TopicPartitionOffset;
4951
import org.springframework.kafka.support.converter.MessageConverter;
5052
import org.springframework.lang.Nullable;
@@ -117,6 +119,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
117119

118120
private byte[] listenerInfo;
119121

122+
private String correlationHeaderName;
123+
120124
@Override
121125
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
122126
this.beanFactory = beanFactory;
@@ -445,6 +449,16 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdap
445449
this.batchToRecordAdapter = batchToRecordAdapter;
446450
}
447451

452+
/**
453+
* Set a custom header name for the correlation id. Default
454+
* {@link KafkaHeaders#CORRELATION_ID}. This header will be echoed back in any reply
455+
* message.
456+
* @param correlationHeaderName the header name.
457+
* @since 3.0
458+
*/
459+
public void setCorrelationHeaderName(String correlationHeaderName) {
460+
this.correlationHeaderName = correlationHeaderName;
461+
}
448462

449463
@Override
450464
public void afterPropertiesSet() {
@@ -485,9 +499,9 @@ private void setupMessageListener(MessageListenerContainer container,
485499
@Nullable MessageConverter messageConverter) {
486500

487501
MessagingMessageListenerAdapter<K, V> adapter = createMessageListener(container, messageConverter);
488-
if (this.replyHeadersConfigurer != null) {
489-
adapter.setReplyHeadersConfigurer(this.replyHeadersConfigurer);
490-
}
502+
JavaUtils.INSTANCE
503+
.acceptIfNotNull(this.replyHeadersConfigurer, adapter::setReplyHeadersConfigurer)
504+
.acceptIfNotNull(this.correlationHeaderName, adapter::setCorrelationHeaderName);
491505
adapter.setSplitIterables(this.splitIterables);
492506
Object messageListener = adapter;
493507
boolean isBatchListener = isBatchListener();

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,30 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
128128

129129
private boolean splitIterables = true;
130130

131+
private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;
132+
133+
/**
134+
* Create an instance with the provided bean and method.
135+
* @param bean the bean.
136+
* @param method the method.
137+
*/
131138
public MessagingMessageListenerAdapter(Object bean, Method method) {
132139
this.bean = bean;
133140
this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
134141
}
135142

143+
/**
144+
* Set a custom header name for the correlation id. Default
145+
* {@link KafkaHeaders#CORRELATION_ID}. This header will be echoed back in any reply
146+
* message.
147+
* @param correlationHeaderName the header name.
148+
* @since 3.0
149+
*/
150+
public void setCorrelationHeaderName(String correlationHeaderName) {
151+
Assert.notNull(correlationHeaderName, "'correlationHeaderName' cannot be null");
152+
this.correlationHeaderName = correlationHeaderName;
153+
}
154+
136155
/**
137156
* Set the MessageConverter.
138157
* @param messageConverter the converter.
@@ -478,7 +497,7 @@ private Message<?> checkHeaders(Object result, String topic, Object source) { //
478497
MessageHeaders headers = reply.getHeaders();
479498
boolean needsTopic = headers.get(KafkaHeaders.TOPIC) == null;
480499
boolean sourceIsMessage = source instanceof Message;
481-
boolean needsCorrelation = headers.get(KafkaHeaders.CORRELATION_ID) == null && sourceIsMessage;
500+
boolean needsCorrelation = headers.get(this.correlationHeaderName) == null && sourceIsMessage;
482501
boolean needsPartition = headers.get(KafkaHeaders.PARTITION) == null && sourceIsMessage
483502
&& getReplyPartition((Message<?>) source) != null;
484503
if (needsTopic || needsCorrelation || needsPartition) {
@@ -487,8 +506,8 @@ private Message<?> checkHeaders(Object result, String topic, Object source) { //
487506
builder.setHeader(KafkaHeaders.TOPIC, topic);
488507
}
489508
if (needsCorrelation && sourceIsMessage) {
490-
builder.setHeader(KafkaHeaders.CORRELATION_ID,
491-
((Message<?>) source).getHeaders().get(KafkaHeaders.CORRELATION_ID));
509+
builder.setHeader(this.correlationHeaderName,
510+
((Message<?>) source).getHeaders().get(this.correlationHeaderName));
492511
}
493512
if (sourceIsMessage && reply.getHeaders().get(KafkaHeaders.REPLY_PARTITION) == null) {
494513
setPartition(builder, (Message<?>) source);
@@ -503,8 +522,8 @@ private void sendSingleResult(Object result, String topic, @Nullable Object sour
503522
byte[] correlationId = null;
504523
boolean sourceIsMessage = source instanceof Message;
505524
if (sourceIsMessage
506-
&& ((Message<?>) source).getHeaders().get(KafkaHeaders.CORRELATION_ID) != null) {
507-
correlationId = ((Message<?>) source).getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
525+
&& ((Message<?>) source).getHeaders().get(this.correlationHeaderName) != null) {
526+
correlationId = ((Message<?>) source).getHeaders().get(this.correlationHeaderName, byte[].class);
508527
}
509528
if (sourceIsMessage) {
510529
sendReplyForMessageSource(result, topic, source, correlationId);
@@ -515,15 +534,15 @@ private void sendSingleResult(Object result, String topic, @Nullable Object sour
515534
}
516535

517536
@SuppressWarnings("unchecked")
518-
private void sendReplyForMessageSource(Object result, String topic, Object source, byte[] correlationId) {
537+
private void sendReplyForMessageSource(Object result, String topic, Object source, @Nullable byte[] correlationId) {
519538
MessageBuilder<Object> builder = MessageBuilder.withPayload(result)
520539
.setHeader(KafkaHeaders.TOPIC, topic);
521540
if (this.replyHeadersConfigurer != null) {
522541
Map<String, Object> headersToCopy = ((Message<?>) source).getHeaders().entrySet().stream()
523542
.filter(e -> {
524543
String key = e.getKey();
525544
return !key.equals(MessageHeaders.ID) && !key.equals(MessageHeaders.TIMESTAMP)
526-
&& !key.equals(KafkaHeaders.CORRELATION_ID)
545+
&& !key.equals(this.correlationHeaderName)
527546
&& !key.startsWith(KafkaHeaders.RECEIVED);
528547
})
529548
.filter(e -> this.replyHeadersConfigurer.shouldCopy(e.getKey(), e.getValue()))
@@ -537,7 +556,7 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc
537556
}
538557
}
539558
if (correlationId != null) {
540-
builder.setHeader(KafkaHeaders.CORRELATION_ID, correlationId);
559+
builder.setHeader(this.correlationHeaderName, correlationId);
541560
}
542561
setPartition(builder, ((Message<?>) source));
543562
this.replyTemplate.send(builder.build());

0 commit comments

Comments
 (0)