You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The b3 header generated by spring-cloud-sleuth for Kafka messages is not getting propagated to retry topics. The same message when sent to retry topic after an exception, has a new b3 header.
Please see the whenMessageSendWithB3HeaderToRetryableTopic_sameHeaderShouldBeCarriedForward test case below-
import ...
@Slf4j
@SpringBootTest(classes = TestApplication.class)
@EmbeddedKafka(
topics = {
RetryableTopicIntegrationTest.MAIN_RETRYABLE_TOPIC
},
partitions = 1)
class RetryableTopicIntegrationTest {
static final String MAIN_RETRYABLE_TOPIC = "main-retryable-topic";
private static final String MAIN_TOPIC_CONTAINER_FACTORY = "kafkaListenerContainerFactory";
private static final String CUSTOM_TRACE_ID = "custom-trace-id";
public static final String TRACE_ID = "traceId";
String mainTopicTraceId;
String retryTopicTraceId;
private final CountDownLatch retryableTopicLatch = new CountDownLatch(1);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
void whenMessageSendWithB3HeaderToRetryableTopic_sameHeaderShouldBeCarriedForward() throws InterruptedException {
// given
final ProducerRecord<String, String> record = new ProducerRecord<>(MAIN_RETRYABLE_TOPIC, "Test message");
record.headers().add(CUSTOM_TRACE_ID, "12345".getBytes());
// when
log.info("Sending message :{}", record);
kafkaTemplate.send(record);
retryableTopicLatch.await(60, TimeUnit.SECONDS);
// then
assertThat(retryableTopicLatch.getCount()).isZero();
log.info("Retry topic traceId: {}, Main topic traceId: {}", retryTopicTraceId, mainTopicTraceId);
assertThat(retryTopicTraceId).isEqualTo(mainTopicTraceId);
}
@RetryableTopic(dltStrategy = DltStrategy.NO_DLT, attempts = "2")
@KafkaListener(id = "test-consumer", topics = MAIN_RETRYABLE_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
public void listen(ConsumerRecord consumerRecord) {
log.info("Received: {}", consumerRecord);
Headers headers = consumerRecord.headers();
String traceId = MDC.get(TRACE_ID);
if (Objects.nonNull(headers.lastHeader(KafkaHeaders.ORIGINAL_TOPIC))) {
this.retryTopicTraceId = traceId;
retryableTopicLatch.countDown();
} else {
mainTopicTraceId = traceId;
}
throw new RuntimeException("Woooops... in topic with consumerRecord: " + consumerRecord);
}
}
Hello @deepesh-verma, nice to see you again! That's a very interesting issue, thanks. TBH I don't know much about b3 headers, but here's what I thought about this.
Since sleuth uses thread-based MDC for the headers, I think it's expected that they won't be propagated to a new message that'll be consumed by a different thread, so I guess this is more an enhancement than a bug, right?
The solution I've used in the past for this kind of problem is to wrap the original message into a message containing the ThreadLocal values, and then set the ThreadLocal values again in message consumption while unwrapping the message so the user receives the original message.
But this has implications that go beyond the application's boundaries, because it interferes with the messages' contracts, which might be consumed by a different application, so I don't think it's necessarily a good solution for a framework.
What do you think, did you think of a solution for this?
The b3 header generated by
spring-cloud-sleuth
for Kafka messages is not getting propagated to retry topics. The same message when sent to retry topic after an exception, has a new b3 header.Please see the
whenMessageSendWithB3HeaderToRetryableTopic_sameHeaderShouldBeCarriedForward
test case below-Complete test class - https://github.com/deepesh-verma/spring-kafka-sample/blob/main/src/test/java/dev/deepesh/springkafkasample/RetryableTopicIntegrationTest.java
Repository with complete code - https://github.com/deepesh-verma/spring-kafka-sample
The text was updated successfully, but these errors were encountered: