From d76d6e9d74de86266746d4381d9c95ed52c08106 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 29 Nov 2024 10:27:11 +0800 Subject: [PATCH 01/11] feat(backpressure): log it on recovery from backpressure Signed-off-by: Ning Yu --- .../s3/backpressure/DefaultBackPressureManager.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java b/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java index 5934972722..6eba371f93 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java @@ -49,6 +49,11 @@ public class DefaultBackPressureManager implements BackPressureManager { * Note: It should only be accessed in the {@link #checkerScheduler} thread. */ private long lastRegulateTime = System.currentTimeMillis(); + /** + * The last load level to trigger the regulator. + * Only used for logging. + */ + private LoadLevel lastRegulateLevel = LoadLevel.NORMAL; public DefaultBackPressureManager(Regulator regulator) { this(regulator, DEFAULT_COOLDOWN_MS); @@ -113,6 +118,9 @@ private LoadLevel currentLoadLevel() { private void regulate(LoadLevel loadLevel, long now) { if (LoadLevel.NORMAL.equals(loadLevel)) { + if (!LoadLevel.NORMAL.equals(lastRegulateLevel)) { + LOGGER.info("The system is back to a normal state, checkers: {}", loadLevels); + } if (LOGGER.isDebugEnabled()) { LOGGER.debug("The system is in a normal state, checkers: {}", loadLevels); } @@ -122,5 +130,6 @@ private void regulate(LoadLevel loadLevel, long now) { loadLevel.regulate(regulator); lastRegulateTime = now; + lastRegulateLevel = loadLevel; } } From 772271956e026985b257f983ab05b51f3bbf5201 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 29 Nov 2024 15:13:36 +0800 Subject: [PATCH 02/11] feat: add metric fetch_limiter_waiting_task_num Signed-off-by: Ning Yu --- .../main/scala/kafka/server/FairLimiter.java | 25 +++++++++++++++++++ core/src/main/scala/kafka/server/Limiter.java | 7 ++++++ .../main/scala/kafka/server/NoopLimiter.java | 5 ++++ .../streamaspect/ElasticReplicaManager.scala | 14 ++++++++--- .../S3StreamKafkaMetricsConstants.java | 1 + .../s3stream/S3StreamKafkaMetricsManager.java | 18 +++++++++++++ 6 files changed, 66 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/FairLimiter.java b/core/src/main/scala/kafka/server/FairLimiter.java index d6ad1edc6c..6b39a4b0ce 100644 --- a/core/src/main/scala/kafka/server/FairLimiter.java +++ b/core/src/main/scala/kafka/server/FairLimiter.java @@ -13,6 +13,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -26,6 +27,7 @@ public class FairLimiter implements Limiter { */ private final Lock lock = new ReentrantLock(true); private final Semaphore permits; + private final AtomicInteger waitingThreads = new AtomicInteger(0); public FairLimiter(int size) { maxPermits = size; @@ -34,6 +36,15 @@ public FairLimiter(int size) { @Override public Handler acquire(int permit) throws InterruptedException { + waitingThreads.incrementAndGet(); + try { + return acquire0(permit); + } finally { + waitingThreads.decrementAndGet(); + } + } + + private Handler acquire0(int permit) throws InterruptedException { lock.lock(); try { permits.acquire(permit); @@ -45,6 +56,15 @@ public Handler acquire(int permit) throws InterruptedException { @Override public Handler acquire(int permit, long timeoutMs) throws InterruptedException { + waitingThreads.incrementAndGet(); + try { + return acquire0(permit, timeoutMs); + } finally { + waitingThreads.decrementAndGet(); + } + } + + private Handler acquire0(int permit, long timeoutMs) throws InterruptedException { long start = System.nanoTime(); if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) { try { @@ -72,6 +92,11 @@ public int availablePermits() { return permits.availablePermits(); } + @Override + public int waitingThreads() { + return waitingThreads.get(); + } + private Handler acquireLocked(int permit, long timeoutNs) throws InterruptedException { if (permit > maxPermits) { permit = maxPermits; diff --git a/core/src/main/scala/kafka/server/Limiter.java b/core/src/main/scala/kafka/server/Limiter.java index 3e3511de77..86c9621520 100644 --- a/core/src/main/scala/kafka/server/Limiter.java +++ b/core/src/main/scala/kafka/server/Limiter.java @@ -49,6 +49,13 @@ public interface Limiter { */ int availablePermits(); + /** + * Return the number of threads waiting for permits. + * + * @return the number of threads waiting for permits + */ + int waitingThreads(); + /** * A handler to release acquired permits. */ diff --git a/core/src/main/scala/kafka/server/NoopLimiter.java b/core/src/main/scala/kafka/server/NoopLimiter.java index 1fab234492..1be07e5c80 100644 --- a/core/src/main/scala/kafka/server/NoopLimiter.java +++ b/core/src/main/scala/kafka/server/NoopLimiter.java @@ -38,6 +38,11 @@ public int availablePermits() { return Integer.MAX_VALUE; } + @Override + public int waitingThreads() { + return 0; + } + public static class NoopHandler implements Handler { @Override public void close() { diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala index 96da3503d0..ffb3fb944d 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala @@ -116,11 +116,17 @@ class ElasticReplicaManager( private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB - private val fetchLimiterGaugeMap = new util.HashMap[String, Integer]() + private val fetchLimiterWaitingTasksGaugeMap = new util.HashMap[String, Integer]() + S3StreamKafkaMetricsManager.setFetchLimiterWaitingTaskNumSupplier(() => { + fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.waitingThreads()) + fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.waitingThreads()) + fetchLimiterWaitingTasksGaugeMap + }) + private val fetchLimiterPermitsGaugeMap = new util.HashMap[String, Integer]() S3StreamKafkaMetricsManager.setFetchLimiterPermitNumSupplier(() => { - fetchLimiterGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits()) - fetchLimiterGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits()) - fetchLimiterGaugeMap + fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits()) + fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits()) + fetchLimiterPermitsGaugeMap }) /** diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java index 98ea1b5bf6..2273c36eff 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java @@ -20,6 +20,7 @@ public class S3StreamKafkaMetricsConstants { public static final String STREAM_SET_OBJECT_NUM = "stream_set_object_num"; public static final String STREAM_OBJECT_NUM = "stream_object_num"; public static final String FETCH_LIMITER_PERMIT_NUM = "fetch_limiter_permit_num"; + public static final String FETCH_LIMITER_WAITING_TASK_NUM = "fetch_limiter_waiting_task_num"; public static final String FETCH_PENDING_TASK_NUM = "fetch_pending_task_num"; public static final String LOG_APPEND_PERMIT_NUM = "log_append_permit_num"; public static final String SLOW_BROKER_METRIC_NAME = "slow_broker_count"; diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java index 27d2426eae..2f076380f3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java @@ -61,6 +61,8 @@ public class S3StreamKafkaMetricsManager { private static Supplier streamObjectNumSupplier = () -> 0; private static ObservableLongGauge fetchLimiterPermitNumMetrics = new NoopObservableLongGauge(); private static Supplier> fetchLimiterPermitNumSupplier = Collections::emptyMap; + private static ObservableLongGauge fetchLimiterWaitingTaskNumMetrics = new NoopObservableLongGauge(); + private static Supplier> fetchLimiterWaitingTaskNumSupplier = Collections::emptyMap; private static ObservableLongGauge fetchPendingTaskNumMetrics = new NoopObservableLongGauge(); private static Supplier> fetchPendingTaskNumSupplier = Collections::emptyMap; private static ObservableLongGauge logAppendPermitNumMetrics = new NoopObservableLongGauge(); @@ -193,6 +195,18 @@ private static void initFetchMetrics(Meter meter, String prefix) { } } }); + fetchLimiterWaitingTaskNumMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.FETCH_LIMITER_WAITING_TASK_NUM) + .setDescription("The number of tasks waiting for permits in fetch limiters") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + Map fetchLimiterWaitingTaskNumMap = fetchLimiterWaitingTaskNumSupplier.get(); + for (Map.Entry entry : fetchLimiterWaitingTaskNumMap.entrySet()) { + result.record(entry.getValue(), FETCH_LIMITER_ATTRIBUTES.get(entry.getKey())); + } + } + }); + fetchPendingTaskNumMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.FETCH_PENDING_TASK_NUM) .setDescription("The number of pending tasks in fetch executors") .ofLongs() @@ -258,6 +272,10 @@ public static void setFetchLimiterPermitNumSupplier(Supplier> fetchLimiterWaitingTaskNumSupplier) { + S3StreamKafkaMetricsManager.fetchLimiterWaitingTaskNumSupplier = fetchLimiterWaitingTaskNumSupplier; + } + public static void setFetchPendingTaskNumSupplier(Supplier> fetchPendingTaskNumSupplier) { S3StreamKafkaMetricsManager.fetchPendingTaskNumSupplier = fetchPendingTaskNumSupplier; } From 87e6e670239f3a55757e15735b9512e1578a0945 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 29 Nov 2024 15:37:48 +0800 Subject: [PATCH 03/11] feat: add metric fetch_limiter_timeout_count Signed-off-by: Ning Yu --- .../main/scala/kafka/server/FairLimiter.java | 20 ++++++++++++++++--- core/src/main/scala/kafka/server/Limiter.java | 7 +++++-- .../main/scala/kafka/server/NoopLimiter.java | 5 +++++ .../streamaspect/ElasticReplicaManager.scala | 13 ++++++++---- .../S3StreamKafkaMetricsConstants.java | 1 + .../s3stream/S3StreamKafkaMetricsManager.java | 19 ++++++++++++++++++ 6 files changed, 56 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/FairLimiter.java b/core/src/main/scala/kafka/server/FairLimiter.java index 6b39a4b0ce..86ac171494 100644 --- a/core/src/main/scala/kafka/server/FairLimiter.java +++ b/core/src/main/scala/kafka/server/FairLimiter.java @@ -27,11 +27,20 @@ public class FairLimiter implements Limiter { */ private final Lock lock = new ReentrantLock(true); private final Semaphore permits; + + /** + * The name of this limiter, used for metrics. + */ + private final String name; + /** + * The number of threads waiting for permits, used for metrics. + */ private final AtomicInteger waitingThreads = new AtomicInteger(0); - public FairLimiter(int size) { - maxPermits = size; - permits = new Semaphore(size); + public FairLimiter(int size, String name) { + this.maxPermits = size; + this.permits = new Semaphore(size); + this.name = name; } @Override @@ -97,6 +106,11 @@ public int waitingThreads() { return waitingThreads.get(); } + @Override + public String name() { + return name; + } + private Handler acquireLocked(int permit, long timeoutNs) throws InterruptedException { if (permit > maxPermits) { permit = maxPermits; diff --git a/core/src/main/scala/kafka/server/Limiter.java b/core/src/main/scala/kafka/server/Limiter.java index 86c9621520..b6742adc0b 100644 --- a/core/src/main/scala/kafka/server/Limiter.java +++ b/core/src/main/scala/kafka/server/Limiter.java @@ -51,11 +51,14 @@ public interface Limiter { /** * Return the number of threads waiting for permits. - * - * @return the number of threads waiting for permits */ int waitingThreads(); + /** + * Return the name of this limiter. + */ + String name(); + /** * A handler to release acquired permits. */ diff --git a/core/src/main/scala/kafka/server/NoopLimiter.java b/core/src/main/scala/kafka/server/NoopLimiter.java index 1be07e5c80..aaaa63ab7d 100644 --- a/core/src/main/scala/kafka/server/NoopLimiter.java +++ b/core/src/main/scala/kafka/server/NoopLimiter.java @@ -43,6 +43,11 @@ public int waitingThreads() { return 0; } + @Override + public String name() { + return "noop"; + } + public static class NoopHandler implements Handler { @Override public void close() { diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala index ffb3fb944d..4a768b1d8d 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala @@ -1,6 +1,7 @@ package kafka.server.streamaspect import com.automq.stream.api.exceptions.FastReadFailFastException +import com.automq.stream.s3.metrics.MetricsLevel import com.automq.stream.utils.FutureUtil import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor import kafka.cluster.Partition @@ -114,8 +115,8 @@ class ElasticReplicaManager( fetchExecutorQueueSizeGaugeMap }) - private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB - private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB + private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_FAST_NAME) // 200MiB + private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_SLOW_NAME) // 200MiB private val fetchLimiterWaitingTasksGaugeMap = new util.HashMap[String, Integer]() S3StreamKafkaMetricsManager.setFetchLimiterWaitingTaskNumSupplier(() => { fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.waitingThreads()) @@ -128,6 +129,10 @@ class ElasticReplicaManager( fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits()) fetchLimiterPermitsGaugeMap }) + private val fetchLimiterTimeoutCounterMap = util.Map.of( + fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(fastFetchLimiter.name), + slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(slowFetchLimiter.name) + ) /** * Used to reduce allocation in [[readFromLocalLogV2]] @@ -573,8 +578,8 @@ class ElasticReplicaManager( } if (handler == null) { - // handler maybe null if it timed out to acquire from limiter - // TODO add metrics for this + // the handler will be null if it timed out to acquire from limiter + fetchLimiterTimeoutCounterMap.get(limiter.name).add(MetricsLevel.INFO, 1) // warn(s"Returning emtpy fetch response for fetch request $readPartitionInfo since the wait time exceeds $timeoutMs ms.") ElasticReplicaManager.emptyReadResults(readPartitionInfo.map(_._1)) } else { diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java index 2273c36eff..82c98c9be8 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java @@ -22,6 +22,7 @@ public class S3StreamKafkaMetricsConstants { public static final String FETCH_LIMITER_PERMIT_NUM = "fetch_limiter_permit_num"; public static final String FETCH_LIMITER_WAITING_TASK_NUM = "fetch_limiter_waiting_task_num"; public static final String FETCH_PENDING_TASK_NUM = "fetch_pending_task_num"; + public static final String FETCH_LIMITER_TIMEOUT_COUNT = "fetch_limiter_timeout_count"; public static final String LOG_APPEND_PERMIT_NUM = "log_append_permit_num"; public static final String SLOW_BROKER_METRIC_NAME = "slow_broker_count"; public static final String TOPIC_PARTITION_COUNT_METRIC_NAME = "topic_partition_count"; diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java index 2f076380f3..0f7febcfaa 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java @@ -13,8 +13,10 @@ import com.automq.stream.s3.metrics.MetricsConfig; import com.automq.stream.s3.metrics.MetricsLevel; +import com.automq.stream.s3.metrics.NoopLongCounter; import com.automq.stream.s3.metrics.NoopObservableLongGauge; import com.automq.stream.s3.metrics.wrapper.ConfigListener; +import com.automq.stream.s3.metrics.wrapper.CounterMetric; import java.util.ArrayList; import java.util.Collections; @@ -24,6 +26,7 @@ import java.util.function.Supplier; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongGauge; @@ -59,12 +62,15 @@ public class S3StreamKafkaMetricsManager { private static Supplier> streamSetObjectNumSupplier = Collections::emptyMap; private static ObservableLongGauge streamObjectNumMetrics = new NoopObservableLongGauge(); private static Supplier streamObjectNumSupplier = () -> 0; + private static ObservableLongGauge fetchLimiterPermitNumMetrics = new NoopObservableLongGauge(); private static Supplier> fetchLimiterPermitNumSupplier = Collections::emptyMap; private static ObservableLongGauge fetchLimiterWaitingTaskNumMetrics = new NoopObservableLongGauge(); private static Supplier> fetchLimiterWaitingTaskNumSupplier = Collections::emptyMap; private static ObservableLongGauge fetchPendingTaskNumMetrics = new NoopObservableLongGauge(); private static Supplier> fetchPendingTaskNumSupplier = Collections::emptyMap; + private static LongCounter fetchLimiterTimeoutCount = new NoopLongCounter(); + private static ObservableLongGauge logAppendPermitNumMetrics = new NoopObservableLongGauge(); private static Supplier logAppendPermitNumSupplier = () -> 0; private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty()); @@ -218,6 +224,10 @@ private static void initFetchMetrics(Meter meter, String prefix) { } } }); + + fetchLimiterTimeoutCount = meter.counterBuilder(prefix + S3StreamKafkaMetricsConstants.FETCH_LIMITER_TIMEOUT_COUNT) + .setDescription("The number of acquire permits timeout in fetch limiters") + .build(); } private static void initLogAppendMetrics(Meter meter, String prefix) { @@ -280,6 +290,15 @@ public static void setFetchPendingTaskNumSupplier(Supplier> S3StreamKafkaMetricsManager.fetchPendingTaskNumSupplier = fetchPendingTaskNumSupplier; } + public static CounterMetric buildFetchLimiterTimeoutMetric(String limiterName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, Attributes.builder() + .put(S3StreamKafkaMetricsConstants.LABEL_FETCH_LIMITER_NAME, limiterName).build(), () -> fetchLimiterTimeoutCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + public static void setLogAppendPermitNumSupplier(Supplier logAppendPermitNumSupplier) { S3StreamKafkaMetricsManager.logAppendPermitNumSupplier = logAppendPermitNumSupplier; } From 7e4b0a6a62a5c8532f5bca4b3b1168d1050db0dd Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 29 Nov 2024 16:08:13 +0800 Subject: [PATCH 04/11] feat: add metric fetch_limiter_time Signed-off-by: Ning Yu --- .../streamaspect/ElasticReplicaManager.scala | 8 ++++++- .../S3StreamKafkaMetricsConstants.java | 1 + .../s3stream/S3StreamKafkaMetricsManager.java | 21 +++++++++++++++++-- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala index 4a768b1d8d..358c27d3f1 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala @@ -1,7 +1,7 @@ package kafka.server.streamaspect import com.automq.stream.api.exceptions.FastReadFailFastException -import com.automq.stream.s3.metrics.MetricsLevel +import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil} import com.automq.stream.utils.FutureUtil import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor import kafka.cluster.Partition @@ -133,6 +133,10 @@ class ElasticReplicaManager( fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(fastFetchLimiter.name), slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(slowFetchLimiter.name) ) + private val fetchLimiterTimeHistogramMap = util.Map.of( + fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, fastFetchLimiter.name), + slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, slowFetchLimiter.name) + ) /** * Used to reduce allocation in [[readFromLocalLogV2]] @@ -572,10 +576,12 @@ class ElasticReplicaManager( math.min(bytesNeedFromParam, limiter.maxPermits()) } + val timer: TimerUtil = new TimerUtil() val handler: Handler = timeoutMs match { case t if t > 0 => limiter.acquire(bytesNeed(), t) case _ => limiter.acquire(bytesNeed()) } + fetchLimiterTimeHistogramMap.get(limiter.name).record(timer.elapsedAs(TimeUnit.NANOSECONDS)) if (handler == null) { // the handler will be null if it timed out to acquire from limiter diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java index 82c98c9be8..e4dd6b6f93 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java @@ -23,6 +23,7 @@ public class S3StreamKafkaMetricsConstants { public static final String FETCH_LIMITER_WAITING_TASK_NUM = "fetch_limiter_waiting_task_num"; public static final String FETCH_PENDING_TASK_NUM = "fetch_pending_task_num"; public static final String FETCH_LIMITER_TIMEOUT_COUNT = "fetch_limiter_timeout_count"; + public static final String FETCH_LIMITER_TIME = "fetch_limiter_time"; public static final String LOG_APPEND_PERMIT_NUM = "log_append_permit_num"; public static final String SLOW_BROKER_METRIC_NAME = "slow_broker_count"; public static final String TOPIC_PARTITION_COUNT_METRIC_NAME = "topic_partition_count"; diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java index 0f7febcfaa..19fbca4e07 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java @@ -17,11 +17,14 @@ import com.automq.stream.s3.metrics.NoopObservableLongGauge; import com.automq.stream.s3.metrics.wrapper.ConfigListener; import com.automq.stream.s3.metrics.wrapper.CounterMetric; +import com.automq.stream.s3.metrics.wrapper.HistogramInstrument; +import com.automq.stream.s3.metrics.wrapper.HistogramMetric; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; import java.util.function.Supplier; @@ -33,6 +36,9 @@ public class S3StreamKafkaMetricsManager { private static final List BASE_ATTRIBUTES_LISTENERS = new ArrayList<>(); + + public static final List FETCH_LIMITER_TIME_METRICS = new CopyOnWriteArrayList<>(); + private static final MultiAttributes BROKER_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), S3StreamKafkaMetricsConstants.LABEL_NODE_ID); private static final MultiAttributes S3_OBJECT_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), @@ -70,6 +76,7 @@ public class S3StreamKafkaMetricsManager { private static ObservableLongGauge fetchPendingTaskNumMetrics = new NoopObservableLongGauge(); private static Supplier> fetchPendingTaskNumSupplier = Collections::emptyMap; private static LongCounter fetchLimiterTimeoutCount = new NoopLongCounter(); + private static HistogramInstrument fetchLimiterTime; private static ObservableLongGauge logAppendPermitNumMetrics = new NoopObservableLongGauge(); private static Supplier logAppendPermitNumSupplier = () -> 0; @@ -228,6 +235,8 @@ private static void initFetchMetrics(Meter meter, String prefix) { fetchLimiterTimeoutCount = meter.counterBuilder(prefix + S3StreamKafkaMetricsConstants.FETCH_LIMITER_TIMEOUT_COUNT) .setDescription("The number of acquire permits timeout in fetch limiters") .build(); + fetchLimiterTime = new HistogramInstrument(meter, prefix + S3StreamKafkaMetricsConstants.FETCH_LIMITER_TIME, + "The time cost of acquire permits in fetch limiters", "nanoseconds", () -> FETCH_LIMITER_TIME_METRICS); } private static void initLogAppendMetrics(Meter meter, String prefix) { @@ -292,9 +301,17 @@ public static void setFetchPendingTaskNumSupplier(Supplier> public static CounterMetric buildFetchLimiterTimeoutMetric(String limiterName) { synchronized (BASE_ATTRIBUTES_LISTENERS) { - CounterMetric metric = new CounterMetric(metricsConfig, Attributes.builder() - .put(S3StreamKafkaMetricsConstants.LABEL_FETCH_LIMITER_NAME, limiterName).build(), () -> fetchLimiterTimeoutCount); + CounterMetric metric = new CounterMetric(metricsConfig, FETCH_LIMITER_ATTRIBUTES.get(limiterName), () -> fetchLimiterTimeoutCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + + public static HistogramMetric buildFetchLimiterTimeMetric(MetricsLevel metricsLevel, String limiterName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + HistogramMetric metric = new HistogramMetric(metricsLevel, metricsConfig, FETCH_LIMITER_ATTRIBUTES.get(limiterName)); BASE_ATTRIBUTES_LISTENERS.add(metric); + FETCH_LIMITER_TIME_METRICS.add(metric); return metric; } } From 1a25298889a6441ee1555d4582fcc4eecac14899 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 29 Nov 2024 17:07:33 +0800 Subject: [PATCH 05/11] feat: add metric back_pressure_state Signed-off-by: Ning Yu --- .../DefaultBackPressureManager.java | 4 +++- .../s3/metrics/S3StreamMetricsConstant.java | 4 ++++ .../s3/metrics/S3StreamMetricsManager.java | 23 +++++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java b/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java index 6eba371f93..02ff4a07d4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java @@ -11,6 +11,7 @@ package com.automq.stream.s3.backpressure; +import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.utils.ThreadUtils; import com.automq.stream.utils.Threads; @@ -51,7 +52,7 @@ public class DefaultBackPressureManager implements BackPressureManager { private long lastRegulateTime = System.currentTimeMillis(); /** * The last load level to trigger the regulator. - * Only used for logging. + * Only used for logging and monitoring. */ private LoadLevel lastRegulateLevel = LoadLevel.NORMAL; @@ -67,6 +68,7 @@ public DefaultBackPressureManager(Regulator regulator, long cooldownMs) { @Override public void start() { this.checkerScheduler = Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("back-pressure-checker-%d", false), LOGGER); + S3StreamMetricsManager.registerBackPressureStateSupplier(this::currentLoadLevel); } @Override diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java index 38f07a4117..0739616a73 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java @@ -151,4 +151,8 @@ public class S3StreamMetricsConstant { public static final String LABEL_STAGE_GET_OBJECTS = "get_objects"; public static final String LABEL_STAGE_FIND_INDEX = "find_index"; public static final String LABEL_STAGE_COMPUTE = "compute"; + + // Back Pressure Constants + public static final String BACK_PRESSURE_STATE_METRIC_NAME = "back_pressure_state"; + public static final AttributeKey LABEL_BACK_PRESSURE_STATE = AttributeKey.stringKey("state"); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 32228a8f28..52c68e450d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -12,6 +12,7 @@ package com.automq.stream.s3.metrics; import com.automq.stream.s3.ByteBufAlloc; +import com.automq.stream.s3.backpressure.LoadLevel; import com.automq.stream.s3.metrics.operations.S3ObjectStage; import com.automq.stream.s3.metrics.operations.S3Stage; import com.automq.stream.s3.metrics.wrapper.ConfigListener; @@ -139,10 +140,16 @@ public class S3StreamMetricsManager { private static final MultiAttributes OPERATOR_INDEX_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), S3StreamMetricsConstant.LABEL_INDEX); + // Back Pressure Metrics + private static final MultiAttributes BACK_PRESSURE_STATE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), + S3StreamMetricsConstant.LABEL_BACK_PRESSURE_STATE); + private static ObservableLongGauge backPressureState = new NoopObservableLongGauge(); + private static Supplier backPressureStateSupplier = () -> LoadLevel.NORMAL; static { BASE_ATTRIBUTES_LISTENERS.add(ALLOC_TYPE_ATTRIBUTES); BASE_ATTRIBUTES_LISTENERS.add(OPERATOR_INDEX_ATTRIBUTES); + BASE_ATTRIBUTES_LISTENERS.add(BACK_PRESSURE_STATE_ATTRIBUTES); } public static void configure(MetricsConfig metricsConfig) { @@ -475,6 +482,18 @@ private static void initAsyncCacheMetrics(Meter meter, String prefix) { }); } + private static void initBackPressureMetrics(Meter meter, String prefix) { + backPressureState = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BACK_PRESSURE_STATE_METRIC_NAME) + .setDescription("Back pressure state") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + LoadLevel state = backPressureStateSupplier.get(); + result.record(state.ordinal(), BACK_PRESSURE_STATE_ATTRIBUTES.get(state.name())); + } + }); + } + public static void registerNetworkLimiterQueueSizeSupplier(AsyncNetworkBandwidthLimiter.Type type, Supplier networkLimiterQueueSizeSupplier) { switch (type) { @@ -907,4 +926,8 @@ public static void registerLocalStreamRangeIndexCacheSizeSupplier(Supplier localStreamRangeIndexCacheStreamNum) { S3StreamMetricsManager.localStreamRangeIndexCacheStreamNum = localStreamRangeIndexCacheStreamNum; } + + public static void registerBackPressureStateSupplier(Supplier backPressureStateSupplier) { + S3StreamMetricsManager.backPressureStateSupplier = backPressureStateSupplier; + } } From 187d3ec080fac45990f8ce6606f95e88701ad9de Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 29 Nov 2024 17:29:57 +0800 Subject: [PATCH 06/11] feat: add metric broker_quota_limit Signed-off-by: Ning Yu --- .../streamaspect/BrokerQuotaManager.scala | 8 +++++ .../s3/metrics/NoopObservableDoubleGauge.java | 17 ++++++++++ .../s3/metrics/S3StreamMetricsConstant.java | 6 +++- .../s3/metrics/S3StreamMetricsManager.java | 31 ++++++++++++++++++- 4 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableDoubleGauge.java diff --git a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala index 772e804ece..864fa41f96 100644 --- a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala @@ -11,6 +11,7 @@ package kafka.server.streamaspect +import com.automq.stream.s3.metrics.S3StreamMetricsManager import kafka.network.RequestChannel import kafka.server._ import kafka.utils.QuotaUtils @@ -42,6 +43,13 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, override def delayQueueSensor: Sensor = brokerDelayQueueSensor + S3StreamMetricsManager.registerBrokerQuotaLimitSupplier(() => java.util.Map.of( + QuotaType.RequestRate, quotaLimit(QuotaType.RequestRate), + QuotaType.Produce, quotaLimit(QuotaType.Produce), + QuotaType.Fetch, quotaLimit(QuotaType.Fetch), + QuotaType.SlowFetch, quotaLimit(QuotaType.SlowFetch) + )) + def getMaxValueInQuotaWindow(quotaType: QuotaType, request: RequestChannel.Request): Double = { if (shouldThrottle(request)) { quotaLimit(quotaType) diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableDoubleGauge.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableDoubleGauge.java new file mode 100644 index 0000000000..6486d55a9d --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableDoubleGauge.java @@ -0,0 +1,17 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3.metrics; + +import io.opentelemetry.api.metrics.ObservableDoubleGauge; + +public class NoopObservableDoubleGauge implements ObservableDoubleGauge { +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java index 0739616a73..592b579719 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java @@ -152,7 +152,11 @@ public class S3StreamMetricsConstant { public static final String LABEL_STAGE_FIND_INDEX = "find_index"; public static final String LABEL_STAGE_COMPUTE = "compute"; - // Back Pressure Constants + // Back Pressure public static final String BACK_PRESSURE_STATE_METRIC_NAME = "back_pressure_state"; public static final AttributeKey LABEL_BACK_PRESSURE_STATE = AttributeKey.stringKey("state"); + + // Broker Quota + public static final String BROKER_QUOTA_LIMIT_METRIC_NAME = "broker_quota_limit"; + public static final AttributeKey LABEL_BROKER_QUOTA_TYPE = AttributeKey.stringKey("type"); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 52c68e450d..32222a0ddf 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -22,6 +22,7 @@ import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; import com.automq.stream.s3.network.ThrottleStrategy; +import io.opentelemetry.api.metrics.ObservableDoubleGauge; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -140,16 +141,23 @@ public class S3StreamMetricsManager { private static final MultiAttributes OPERATOR_INDEX_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), S3StreamMetricsConstant.LABEL_INDEX); - // Back Pressure Metrics + // Back Pressure private static final MultiAttributes BACK_PRESSURE_STATE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), S3StreamMetricsConstant.LABEL_BACK_PRESSURE_STATE); private static ObservableLongGauge backPressureState = new NoopObservableLongGauge(); private static Supplier backPressureStateSupplier = () -> LoadLevel.NORMAL; + // Broker Quota + private static final MultiAttributes BROKER_QUOTA_TYPE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), + S3StreamMetricsConstant.LABEL_BROKER_QUOTA_TYPE); + private static ObservableDoubleGauge brokerQuotaLimit = new NoopObservableDoubleGauge(); + private static Supplier> brokerQuotaLimitSupplier = () -> new ConcurrentHashMap<>(); + static { BASE_ATTRIBUTES_LISTENERS.add(ALLOC_TYPE_ATTRIBUTES); BASE_ATTRIBUTES_LISTENERS.add(OPERATOR_INDEX_ATTRIBUTES); BASE_ATTRIBUTES_LISTENERS.add(BACK_PRESSURE_STATE_ATTRIBUTES); + BASE_ATTRIBUTES_LISTENERS.add(BROKER_QUOTA_TYPE_ATTRIBUTES); } public static void configure(MetricsConfig metricsConfig) { @@ -407,6 +415,8 @@ public static void initMetrics(Meter meter, String prefix) { }); initAsyncCacheMetrics(meter, prefix); + initBackPressureMetrics(meter, prefix); + initBrokerQuotaMetrics(meter, prefix); } private static void initAsyncCacheMetrics(Meter meter, String prefix) { @@ -494,6 +504,21 @@ private static void initBackPressureMetrics(Meter meter, String prefix) { }); } + private static void initBrokerQuotaMetrics(Meter meter, String prefix) { + brokerQuotaLimit = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BROKER_QUOTA_LIMIT_METRIC_NAME) + .setDescription("Broker quota limit") + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + Map brokerQuotaLimitMap = brokerQuotaLimitSupplier.get(); + for (Map.Entry entry : brokerQuotaLimitMap.entrySet()) { + String quotaType = entry.getKey(); + Double quotaLimit = entry.getValue(); + result.record(quotaLimit, BROKER_QUOTA_TYPE_ATTRIBUTES.get(quotaType)); + } + } + }); + } + public static void registerNetworkLimiterQueueSizeSupplier(AsyncNetworkBandwidthLimiter.Type type, Supplier networkLimiterQueueSizeSupplier) { switch (type) { @@ -930,4 +955,8 @@ public static void registerLocalStreamRangeIndexCacheStreamNumSupplier(Supplier< public static void registerBackPressureStateSupplier(Supplier backPressureStateSupplier) { S3StreamMetricsManager.backPressureStateSupplier = backPressureStateSupplier; } + + public static void registerBrokerQuotaLimitSupplier(Supplier> brokerQuotaLimitSupplier) { + S3StreamMetricsManager.brokerQuotaLimitSupplier = brokerQuotaLimitSupplier; + } } From 0f4bb150c3d703435eee74dfca64e746fac72b10 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 29 Nov 2024 18:07:56 +0800 Subject: [PATCH 07/11] fix(backpressure): run checkers with fixed delay Signed-off-by: Ning Yu --- .../stream/s3/backpressure/DefaultBackPressureManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java b/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java index 02ff4a07d4..3ccc553a2e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java @@ -73,7 +73,7 @@ public void start() { @Override public void registerChecker(Checker checker) { - checkerScheduler.scheduleAtFixedRate(() -> { + checkerScheduler.scheduleWithFixedDelay(() -> { loadLevels.put(checker.source(), checker.check()); maybeRegulate(); }, 0, checker.intervalMs(), TimeUnit.MILLISECONDS); From 1da31e25267c81b572404836747ab42dfd02ab97 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 29 Nov 2024 18:19:48 +0800 Subject: [PATCH 08/11] style: fix lint Signed-off-by: Ning Yu --- .../kafka/server/streamaspect/BrokerQuotaManager.scala | 8 ++++---- .../automq/stream/s3/metrics/S3StreamMetricsManager.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala index 864fa41f96..b9b5bd3ddd 100644 --- a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala @@ -44,10 +44,10 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, override def delayQueueSensor: Sensor = brokerDelayQueueSensor S3StreamMetricsManager.registerBrokerQuotaLimitSupplier(() => java.util.Map.of( - QuotaType.RequestRate, quotaLimit(QuotaType.RequestRate), - QuotaType.Produce, quotaLimit(QuotaType.Produce), - QuotaType.Fetch, quotaLimit(QuotaType.Fetch), - QuotaType.SlowFetch, quotaLimit(QuotaType.SlowFetch) + QuotaType.RequestRate.toString, quotaLimit(QuotaType.RequestRate), + QuotaType.Produce.toString, quotaLimit(QuotaType.Produce), + QuotaType.Fetch.toString, quotaLimit(QuotaType.Fetch), + QuotaType.SlowFetch.toString, quotaLimit(QuotaType.SlowFetch) )) def getMaxValueInQuotaWindow(quotaType: QuotaType, request: RequestChannel.Request): Double = { diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 32222a0ddf..750ddc9ba6 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -22,7 +22,6 @@ import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter; import com.automq.stream.s3.network.ThrottleStrategy; -import io.opentelemetry.api.metrics.ObservableDoubleGauge; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -36,6 +35,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleGauge; import io.opentelemetry.api.metrics.ObservableLongGauge; import static com.automq.stream.s3.metrics.S3StreamMetricsConstant.LABEL_CACHE_NAME; From 5ff7b344eb0ba84aa04524e933bf297c12420df2 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 29 Nov 2024 18:39:21 +0800 Subject: [PATCH 09/11] perf: drop too large values Signed-off-by: Ning Yu --- .../com/automq/stream/s3/metrics/S3StreamMetricsManager.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 750ddc9ba6..c8020c1c2a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -513,6 +513,10 @@ private static void initBrokerQuotaMetrics(Meter meter, String prefix) { for (Map.Entry entry : brokerQuotaLimitMap.entrySet()) { String quotaType = entry.getKey(); Double quotaLimit = entry.getValue(); + // drop too large values + if (quotaLimit > 1e15) { + continue; + } result.record(quotaLimit, BROKER_QUOTA_TYPE_ATTRIBUTES.get(quotaType)); } } From 2d24c04d068ff9ad5b1199fbb1a19d5713ebb251 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 29 Nov 2024 19:04:03 +0800 Subject: [PATCH 10/11] refactor: record -1 for other states Signed-off-by: Ning Yu --- .../automq/stream/s3/metrics/S3StreamMetricsManager.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index c8020c1c2a..056340b5df 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -500,6 +500,12 @@ private static void initBackPressureMetrics(Meter meter, String prefix) { if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { LoadLevel state = backPressureStateSupplier.get(); result.record(state.ordinal(), BACK_PRESSURE_STATE_ATTRIBUTES.get(state.name())); + // To beautify Grafana dashboard, we record -1 for other states + for (LoadLevel l : LoadLevel.values()) { + if (l != state) { + result.record(-1, BACK_PRESSURE_STATE_ATTRIBUTES.get(l.name())); + } + } } }); } From e625cb441c5192e7b00527e47b6959f5d45bdf04 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Fri, 29 Nov 2024 19:59:46 +0800 Subject: [PATCH 11/11] test: fix tests Signed-off-by: Ning Yu --- .../stream/s3/backpressure/DefaultBackPressureManagerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java index 41d1690e3f..9e5b2bbed5 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java @@ -58,7 +58,7 @@ public void setup() { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; - }).when(scheduler).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); + }).when(scheduler).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); doAnswer(invocation -> { Runnable runnable = invocation.getArgument(0); runnable.run();