Skip to content

Commit 32e2e76

Browse files
Remove meters that are not published by Kafka anymore
fixes gh-2843
1 parent 42af250 commit 32e2e76

File tree

2 files changed

+156
-13
lines changed

2 files changed

+156
-13
lines changed

micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java

+29-13
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.micrometer.core.instrument.Meter;
2222
import io.micrometer.core.instrument.MeterRegistry;
2323
import io.micrometer.core.instrument.Tag;
24+
import io.micrometer.core.instrument.Tags;
2425
import io.micrometer.core.instrument.binder.MeterBinder;
2526
import io.micrometer.core.instrument.util.NamedThreadFactory;
2627
import io.micrometer.core.lang.NonNullApi;
@@ -44,6 +45,7 @@
4445
import org.apache.kafka.common.Metric;
4546
import org.apache.kafka.common.MetricName;
4647

48+
import static io.micrometer.core.instrument.Meter.Type.OTHER;
4749
import static java.util.Collections.emptyList;
4850

4951
/**
@@ -117,7 +119,7 @@ public void bindTo(MeterRegistry registry) {
117119

118120
private Iterable<Tag> getCommonTags(MeterRegistry registry) {
119121
// FIXME hack until we have proper API to retrieve common tags
120-
Meter.Id dummyId = Meter.builder("delete.this", Meter.Type.OTHER, Collections.emptyList()).register(registry).getId();
122+
Meter.Id dummyId = Meter.builder("delete.this", OTHER, Collections.emptyList()).register(registry).getId();
121123
registry.remove(dummyId);
122124
return dummyId.getTags();
123125
}
@@ -129,20 +131,20 @@ void prepareToBindMetrics(MeterRegistry registry) {
129131
this.metrics.set(this.metricsSupplier.get());
130132
Map<MetricName, ? extends Metric> metrics = this.metrics.get();
131133
// Collect static metrics and tags
132-
Metric startTime = null;
134+
MetricName startTime = null;
133135

134136
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
135137
MetricName name = entry.getKey();
136138
if (METRIC_GROUP_APP_INFO.equals(name.group()))
137139
if (VERSION_METRIC_NAME.equals(name.name())) {
138140
kafkaVersion = (String) entry.getValue().metricValue();
139141
} else if (START_TIME_METRIC_NAME.equals(name.name())) {
140-
startTime = entry.getValue();
142+
startTime = entry.getKey();
141143
}
142144
}
143145

144146
if (startTime != null) {
145-
bindMeter(registry, startTime.metricName(), meterName(startTime), meterTags(startTime));
147+
bindMeter(registry, startTime, meterName(startTime), meterTags(startTime));
146148
}
147149
}
148150

@@ -162,6 +164,16 @@ void checkAndBindMetrics(MeterRegistry registry) {
162164
Map<MetricName, ? extends Metric> metrics = this.metrics.get();
163165

164166
if (!currentMeters.equals(metrics.keySet())) {
167+
Set<MetricName> metricsToRemove = currentMeters.stream()
168+
.filter(metricName -> !metrics.containsKey(metricName))
169+
.collect(Collectors.toSet());
170+
171+
for (MetricName metricName : metricsToRemove) {
172+
Meter.Id id = meterIdForComparison(metricName);
173+
Meter meter = registry.remove(id);
174+
registeredMeters.remove(meter);
175+
}
176+
165177
currentMeters = new HashSet<>(metrics.keySet());
166178

167179
Map<String, List<Meter>> registryMetersByNames = registry.getMeters().stream()
@@ -176,14 +188,14 @@ void checkAndBindMetrics(MeterRegistry registry) {
176188
return;
177189
}
178190

179-
String meterName = meterName(metric);
191+
String meterName = meterName(name);
180192

181193
// Kafka has metrics with lower number of tags (e.g. with/without topic or partition tag)
182194
// Remove meters with lower number of tags
183195
boolean hasLessTags = false;
184196
for (Meter other : registryMetersByNames.getOrDefault(meterName, emptyList())) {
185197
List<Tag> tags = other.getId().getTags();
186-
List<Tag> meterTagsWithCommonTags = meterTags(metric, true);
198+
List<Tag> meterTagsWithCommonTags = meterTags(name, true);
187199
if (tags.size() < meterTagsWithCommonTags.size()) {
188200
registry.remove(other);
189201
registeredMeters.remove(other);
@@ -196,7 +208,7 @@ else if (tags.size() == meterTagsWithCommonTags.size())
196208
}
197209
if (hasLessTags) return;
198210

199-
List<Tag> tags = meterTags(metric);
211+
List<Tag> tags = meterTags(name);
200212
try {
201213
Meter meter = bindMeter(registry, metric.metricName(), meterName, tags);
202214
List<Meter> meters = registryMetersByNames.computeIfAbsent(meterName, k -> new ArrayList<>());
@@ -252,9 +264,9 @@ private double toDouble(@Nullable Metric metric) {
252264
return (metric != null) ? ((Number) metric.metricValue()).doubleValue() : Double.NaN;
253265
}
254266

255-
private List<Tag> meterTags(Metric metric, boolean includeCommonTags) {
267+
private List<Tag> meterTags(MetricName metricName, boolean includeCommonTags) {
256268
List<Tag> tags = new ArrayList<>();
257-
metric.metricName().tags().forEach((key, value) -> tags.add(Tag.of(key.replaceAll("-", "."), value)));
269+
metricName.tags().forEach((key, value) -> tags.add(Tag.of(key.replaceAll("-", "."), value)));
258270
tags.add(Tag.of(KAFKA_VERSION_TAG_NAME, kafkaVersion));
259271
extraTags.forEach(tags::add);
260272
if (includeCommonTags) {
@@ -263,15 +275,19 @@ private List<Tag> meterTags(Metric metric, boolean includeCommonTags) {
263275
return tags;
264276
}
265277

266-
private List<Tag> meterTags(Metric metric) {
267-
return meterTags(metric, false);
278+
private List<Tag> meterTags(MetricName metricName) {
279+
return meterTags(metricName, false);
268280
}
269281

270-
private String meterName(Metric metric) {
271-
String name = METRIC_NAME_PREFIX + metric.metricName().group() + "." + metric.metricName().name();
282+
private String meterName(MetricName metricName) {
283+
String name = METRIC_NAME_PREFIX + metricName.group() + "." + metricName.name();
272284
return name.replaceAll("-metrics", "").replaceAll("-", ".");
273285
}
274286

287+
private Meter.Id meterIdForComparison(MetricName metricName) {
288+
return new Meter.Id(meterName(metricName), Tags.of(meterTags(metricName, true)), null, null, OTHER);
289+
}
290+
275291
@Override
276292
public void close() {
277293
this.scheduler.shutdownNow();

micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaMetricsTest.java

+127
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.atomic.AtomicReference;
3838
import java.util.function.Supplier;
3939

40+
import static java.util.Collections.EMPTY_MAP;
4041
import static org.assertj.core.api.Assertions.assertThat;
4142

4243
class KafkaMetricsTest {
@@ -402,4 +403,130 @@ void shouldUseMetricFromSupplierIndirectly() {
402403
.isEqualTo(2.0)
403404
); // referencing the new value since the map was updated in checkAndBindMetrics
404405
}
406+
407+
@Issue("#2843")
408+
@Test
409+
void shouldRemoveOldMeters() {
410+
Map<MetricName, Metric> kafkaMetricMap = new HashMap<>();
411+
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> kafkaMetricMap;
412+
kafkaMetrics = new KafkaMetrics(supplier);
413+
MeterRegistry registry = new SimpleMeterRegistry();
414+
registry.config().commonTags("commonTest", "42");
415+
kafkaMetrics.bindTo(registry);
416+
assertThat(registry.getMeters()).hasSize(0);
417+
418+
MetricName aMetric = createMetricName("a");
419+
MetricName bMetric = createMetricName("b");
420+
421+
kafkaMetricMap.put(aMetric, createKafkaMetric(aMetric));
422+
kafkaMetrics.checkAndBindMetrics(registry);
423+
assertThat(registry.getMeters()).hasSize(1);
424+
425+
kafkaMetricMap.clear();
426+
kafkaMetrics.checkAndBindMetrics(registry);
427+
assertThat(registry.getMeters()).hasSize(0);
428+
429+
kafkaMetricMap.put(aMetric, createKafkaMetric(aMetric));
430+
kafkaMetricMap.put(bMetric, createKafkaMetric(bMetric));
431+
kafkaMetrics.checkAndBindMetrics(registry);
432+
assertThat(registry.getMeters()).hasSize(2);
433+
434+
kafkaMetricMap.clear();
435+
kafkaMetrics.checkAndBindMetrics(registry);
436+
assertThat(registry.getMeters()).hasSize(0);
437+
438+
kafkaMetricMap.put(aMetric, createKafkaMetric(aMetric));
439+
kafkaMetricMap.put(bMetric, createKafkaMetric(bMetric));
440+
kafkaMetrics.checkAndBindMetrics(registry);
441+
assertThat(registry.getMeters()).hasSize(2);
442+
443+
kafkaMetricMap.remove(bMetric);
444+
kafkaMetrics.checkAndBindMetrics(registry);
445+
assertThat(registry.getMeters()).hasSize(1);
446+
assertThat(registry.getMeters().get(0).getId().getName()).isEqualTo("kafka.test.a");
447+
448+
kafkaMetricMap.clear();
449+
kafkaMetrics.checkAndBindMetrics(registry);
450+
assertThat(registry.getMeters()).hasSize(0);
451+
}
452+
453+
@Issue("#2843")
454+
@Test
455+
void shouldRemoveOldMetersWithTags() {
456+
Map<MetricName, Metric> kafkaMetricMap = new HashMap<>();
457+
Supplier<Map<MetricName, ? extends Metric>> supplier = () -> kafkaMetricMap;
458+
kafkaMetrics = new KafkaMetrics(supplier);
459+
MeterRegistry registry = new SimpleMeterRegistry();
460+
registry.config().commonTags("commonTest", "42");
461+
kafkaMetrics.bindTo(registry);
462+
assertThat(registry.getMeters()).hasSize(0);
463+
464+
MetricName aMetricV1 = createMetricName("a", "foo", "v1");
465+
MetricName aMetricV2 = createMetricName("a", "foo", "v2");
466+
MetricName bMetric = createMetricName("b", "foo", "n/a");
467+
468+
kafkaMetricMap.put(aMetricV1, createKafkaMetric(aMetricV1));
469+
kafkaMetrics.checkAndBindMetrics(registry);
470+
assertThat(registry.getMeters()).hasSize(1);
471+
472+
kafkaMetricMap.clear();
473+
kafkaMetrics.checkAndBindMetrics(registry);
474+
assertThat(registry.getMeters()).hasSize(0);
475+
476+
kafkaMetricMap.put(aMetricV1, createKafkaMetric(aMetricV1));
477+
kafkaMetricMap.put(aMetricV2, createKafkaMetric(aMetricV2));
478+
kafkaMetricMap.put(bMetric, createKafkaMetric(bMetric));
479+
kafkaMetrics.checkAndBindMetrics(registry);
480+
assertThat(registry.getMeters()).hasSize(3);
481+
482+
kafkaMetricMap.clear();
483+
kafkaMetrics.checkAndBindMetrics(registry);
484+
assertThat(registry.getMeters()).hasSize(0);
485+
486+
kafkaMetricMap.put(aMetricV1, createKafkaMetric(aMetricV1));
487+
kafkaMetricMap.put(aMetricV2, createKafkaMetric(aMetricV2));
488+
kafkaMetrics.checkAndBindMetrics(registry);
489+
assertThat(registry.getMeters()).hasSize(2);
490+
491+
kafkaMetricMap.remove(aMetricV1);
492+
kafkaMetrics.checkAndBindMetrics(registry);
493+
assertThat(registry.getMeters()).hasSize(1);
494+
assertThat(registry.getMeters().get(0).getId().getName()).isEqualTo("kafka.test.a");
495+
assertThat(registry.getMeters().get(0).getId().getTags()).containsExactlyInAnyOrder(
496+
Tag.of("commonTest", "42"),
497+
Tag.of("kafka.version", "unknown"),
498+
Tag.of("foo", "v2")
499+
);
500+
501+
kafkaMetricMap.clear();
502+
kafkaMetrics.checkAndBindMetrics(registry);
503+
assertThat(registry.getMeters()).hasSize(0);
504+
}
505+
506+
@SuppressWarnings("unchecked")
507+
private MetricName createMetricName(String name) {
508+
return createMetricName(name, EMPTY_MAP);
509+
}
510+
511+
private MetricName createMetricName(String name, String... keyValues) {
512+
if (keyValues.length % 2 == 1) {
513+
throw new IllegalArgumentException("size must be even, it is a set of key=value pairs");
514+
}
515+
else {
516+
Map<String, String> tagsMap = new HashMap<>();
517+
for (int i = 0; i < keyValues.length; i += 2) {
518+
tagsMap.put(keyValues[i], keyValues[i + 1]);
519+
}
520+
521+
return createMetricName(name, tagsMap);
522+
}
523+
}
524+
525+
private MetricName createMetricName(String name, Map<String, String> tags) {
526+
return new MetricName(name, "test", "for testing", tags);
527+
}
528+
529+
private KafkaMetric createKafkaMetric(MetricName metricName) {
530+
return new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM);
531+
}
405532
}

0 commit comments

Comments
 (0)