Skip to content

Commit f351a7a

Browse files
authored
GH-2638: Support Dynamic Tags via Observation
See #2648
1 parent 0f8dc80 commit f351a7a

File tree

3 files changed

+34
-4
lines changed

3 files changed

+34
-4
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

+5-1
Original file line numberDiff line numberDiff line change
@@ -3447,7 +3447,11 @@ The default implementations add the `bean.name` tag for template observations an
34473447

34483448
You can either subclass `DefaultKafkaTemplateObservationConvention` or `DefaultKafkaListenerObservationConvention` or provide completely new implementations.
34493449

3450-
See <<observation-gen>> for details of the observations that are recorded.
3450+
See <<observation-gen>> for details of the default observations that are recorded.
3451+
3452+
Starting with version 3.0.6, you can add dynamic tags to the timers and traces, based on information in the consumer or producer records.
3453+
To do so, add a custom `KafkaListenerObservationConvention` and/or `KafkaTemplateObservationConvention` to the listener container properties or `KafkaTemplate` respectively.
3454+
The `record` property in both observation contexts contains the `ConsumerRecord` or `ProducerRecord` respectively.
34513455

34523456
[[transactions]]
34533457
==== Transactions

spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordReceiverContext.java

+13
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ public KafkaRecordReceiverContext(ConsumerRecord<?, ?> record, String listenerId
5252
setRemoteServiceName("Apache Kafka" + (cluster != null ? ": " + cluster : ""));
5353
}
5454

55+
/**
56+
* Return the listener id.
57+
* @return the listener id.
58+
*/
5559
public String getListenerId() {
5660
return this.listenerId;
5761
}
@@ -64,4 +68,13 @@ public String getSource() {
6468
return this.record.topic();
6569
}
6670

71+
/**
72+
* Return the consumer record.
73+
* @return the record the record.
74+
* @since 3.0.6
75+
*/
76+
public ConsumerRecord<?, ?> getRecord() {
77+
return this.record;
78+
}
79+
6780
}

spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,21 @@ public class KafkaRecordSenderContext extends SenderContext<ProducerRecord<?, ?>
3434

3535
private final String beanName;
3636

37-
private final String destination;
37+
private final ProducerRecord<?, ?> record;
3838

3939
public KafkaRecordSenderContext(ProducerRecord<?, ?> record, String beanName, Supplier<String> clusterId) {
4040
super((carrier, key, value) -> record.headers().add(key, value.getBytes(StandardCharsets.UTF_8)));
4141
setCarrier(record);
4242
this.beanName = beanName;
43-
this.destination = record.topic();
43+
this.record = record;
4444
String cluster = clusterId.get();
4545
setRemoteServiceName("Apache Kafka" + (cluster != null ? ": " + cluster : ""));
4646
}
4747

48+
/**
49+
* Return the template's bean name.
50+
* @return the name.
51+
*/
4852
public String getBeanName() {
4953
return this.beanName;
5054
}
@@ -54,7 +58,16 @@ public String getBeanName() {
5458
* @return the topic.
5559
*/
5660
public String getDestination() {
57-
return this.destination;
61+
return this.record.topic();
62+
}
63+
64+
/**
65+
* Return the producer record.
66+
* @return the record the record.
67+
* @since 3.0.6
68+
*/
69+
public ProducerRecord<?, ?> getRecord() {
70+
return this.record;
5871
}
5972

6073
}

0 commit comments

Comments
 (0)