Skip to content

GH-2344: AggReplyingKT Support Custom Correlation #2347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.springframework.kafka.listener.BatchConsumerAwareMessageListener;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -124,12 +123,13 @@ public synchronized void setReturnPartialOnTimeout(boolean returnPartialOnTimeou
@Override
public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>> data, Consumer<?, ?> consumer) {
List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>> completed = new ArrayList<>();
String correlationHeaderName = getCorrelationHeaderName();
data.forEach(record -> {
Header correlation = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
Header correlation = record.headers().lastHeader(correlationHeaderName);
if (correlation == null) {
this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record)
+ " - to use request/reply semantics, the responding server must return the correlation id "
+ " in the '" + KafkaHeaders.CORRELATION_ID + "' header");
+ " in the '" + correlationHeaderName + "' header");
}
else {
CorrelationKey correlationId = new CorrelationKey(correlation.value());
Expand All @@ -142,7 +142,7 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
ConsumerRecord<K, Collection<ConsumerRecord<K, R>>> done =
new ConsumerRecord<>(AGGREGATED_RESULTS_TOPIC, 0, 0L, null, list);
done.headers()
.add(new RecordHeader(KafkaHeaders.CORRELATION_ID, correlationId
.add(new RecordHeader(correlationHeaderName, correlationId
.getCorrelationId()));
this.pending.remove(correlationId);
checkOffsetsAndCommitIfNecessary(list, consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,15 @@ public void setCorrelationHeaderName(String correlationHeaderName) {
this.correlationHeaderName = correlationHeaderName;
}

/**
* Return the correlation header name.
* @return the header name.
* @since 2.8.8
*/
protected String getCorrelationHeaderName() {
return this.correlationHeaderName;
}

/**
* Set a custom header name for the reply topic. Default
* {@link KafkaHeaders#REPLY_TOPIC}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ public void testAggregateNormal() throws Exception {
AggregatingReplyingKafkaTemplate<Integer, String, String> template = aggregatingTemplate(
new TopicPartitionOffset(D_REPLY, 0), 2, new AtomicInteger());
try {
template.setCorrelationHeaderName("customCorrelation");
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
ProducerRecord<Integer, String> record = new ProducerRecord<>(D_REQUEST, null, null, null, "foo");
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
Expand Down Expand Up @@ -838,14 +839,18 @@ public HandlerReturn handlerReturn() {

@KafkaListener(id = "def1", topics = { D_REQUEST, E_REQUEST, F_REQUEST })
@SendTo // default REPLY_TOPIC header
public String dListener1(String in) {
return in.toUpperCase();
public Message<String> dListener1(String in, @Header("customCorrelation") byte[] correlation) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader("customCorrelation", correlation)
.build();
}

@KafkaListener(id = "def2", topics = { D_REQUEST, E_REQUEST, F_REQUEST })
@SendTo // default REPLY_TOPIC header
public String dListener2(String in) {
return in.substring(0, 1) + in.substring(1).toUpperCase();
public Message<String> dListener2(String in, @Header("customCorrelation") byte[] correlation) {
return MessageBuilder.withPayload(in.substring(0, 1) + in.substring(1).toUpperCase())
.setHeader("customCorrelation", correlation)
.build();
}

@KafkaListener(id = G_REQUEST, topics = G_REQUEST)
Expand Down