Skip to content

Commit 37ae176

Browse files
committed
feat(backpressure): add metrics (#2198)
* feat(backpressure): log it on recovery from backpressure Signed-off-by: Ning Yu <[email protected]> * feat: add metric fetch_limiter_waiting_task_num Signed-off-by: Ning Yu <[email protected]> * feat: add metric fetch_limiter_timeout_count Signed-off-by: Ning Yu <[email protected]> * feat: add metric fetch_limiter_time Signed-off-by: Ning Yu <[email protected]> * feat: add metric back_pressure_state Signed-off-by: Ning Yu <[email protected]> * feat: add metric broker_quota_limit Signed-off-by: Ning Yu <[email protected]> * fix(backpressure): run checkers with fixed delay Signed-off-by: Ning Yu <[email protected]> * style: fix lint Signed-off-by: Ning Yu <[email protected]> * perf: drop too large values Signed-off-by: Ning Yu <[email protected]> * refactor: record -1 for other states Signed-off-by: Ning Yu <[email protected]> * test: fix tests Signed-off-by: Ning Yu <[email protected]> --------- Signed-off-by: Ning Yu <[email protected]>
1 parent 8e004ee commit 37ae176

File tree

12 files changed

+252
-13
lines changed

12 files changed

+252
-13
lines changed

core/src/main/scala/kafka/server/FairLimiter.java

+42-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import java.util.concurrent.Semaphore;
1515
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.atomic.AtomicInteger;
1617
import java.util.concurrent.locks.Lock;
1718
import java.util.concurrent.locks.ReentrantLock;
1819

@@ -27,13 +28,32 @@ public class FairLimiter implements Limiter {
2728
private final Lock lock = new ReentrantLock(true);
2829
private final Semaphore permits;
2930

30-
public FairLimiter(int size) {
31-
maxPermits = size;
32-
permits = new Semaphore(size);
31+
/**
32+
* The name of this limiter, used for metrics.
33+
*/
34+
private final String name;
35+
/**
36+
* The number of threads waiting for permits, used for metrics.
37+
*/
38+
private final AtomicInteger waitingThreads = new AtomicInteger(0);
39+
40+
public FairLimiter(int size, String name) {
41+
this.maxPermits = size;
42+
this.permits = new Semaphore(size);
43+
this.name = name;
3344
}
3445

3546
@Override
3647
public Handler acquire(int permit) throws InterruptedException {
48+
waitingThreads.incrementAndGet();
49+
try {
50+
return acquire0(permit);
51+
} finally {
52+
waitingThreads.decrementAndGet();
53+
}
54+
}
55+
56+
private Handler acquire0(int permit) throws InterruptedException {
3757
lock.lock();
3858
try {
3959
permits.acquire(permit);
@@ -45,6 +65,15 @@ public Handler acquire(int permit) throws InterruptedException {
4565

4666
@Override
4767
public Handler acquire(int permit, long timeoutMs) throws InterruptedException {
68+
waitingThreads.incrementAndGet();
69+
try {
70+
return acquire0(permit, timeoutMs);
71+
} finally {
72+
waitingThreads.decrementAndGet();
73+
}
74+
}
75+
76+
private Handler acquire0(int permit, long timeoutMs) throws InterruptedException {
4877
long start = System.nanoTime();
4978
if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
5079
try {
@@ -72,6 +101,16 @@ public int availablePermits() {
72101
return permits.availablePermits();
73102
}
74103

104+
@Override
105+
public int waitingThreads() {
106+
return waitingThreads.get();
107+
}
108+
109+
@Override
110+
public String name() {
111+
return name;
112+
}
113+
75114
private Handler acquireLocked(int permit, long timeoutNs) throws InterruptedException {
76115
if (permit > maxPermits) {
77116
permit = maxPermits;

core/src/main/scala/kafka/server/Limiter.java

+10
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ public interface Limiter {
4949
*/
5050
int availablePermits();
5151

52+
/**
53+
* Return the number of threads waiting for permits.
54+
*/
55+
int waitingThreads();
56+
57+
/**
58+
* Return the name of this limiter.
59+
*/
60+
String name();
61+
5262
/**
5363
* A handler to release acquired permits.
5464
*/

core/src/main/scala/kafka/server/NoopLimiter.java

+10
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ public int availablePermits() {
3838
return Integer.MAX_VALUE;
3939
}
4040

41+
@Override
42+
public int waitingThreads() {
43+
return 0;
44+
}
45+
46+
@Override
47+
public String name() {
48+
return "noop";
49+
}
50+
4151
public static class NoopHandler implements Handler {
4252
@Override
4353
public void close() {

core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala

+8
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
package kafka.server.streamaspect
1313

14+
import com.automq.stream.s3.metrics.S3StreamMetricsManager
1415
import kafka.network.RequestChannel
1516
import kafka.server._
1617
import kafka.utils.QuotaUtils
@@ -42,6 +43,13 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
4243

4344
override def delayQueueSensor: Sensor = brokerDelayQueueSensor
4445

46+
S3StreamMetricsManager.registerBrokerQuotaLimitSupplier(() => java.util.Map.of(
47+
QuotaType.RequestRate.toString, quotaLimit(QuotaType.RequestRate),
48+
QuotaType.Produce.toString, quotaLimit(QuotaType.Produce),
49+
QuotaType.Fetch.toString, quotaLimit(QuotaType.Fetch),
50+
QuotaType.SlowFetch.toString, quotaLimit(QuotaType.SlowFetch)
51+
))
52+
4553
def getMaxValueInQuotaWindow(quotaType: QuotaType, request: RequestChannel.Request): Double = {
4654
if (shouldThrottle(request)) {
4755
quotaLimit(quotaType)

core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala

+25-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kafka.server.streamaspect
22

33
import com.automq.stream.api.exceptions.FastReadFailFastException
4+
import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil}
45
import com.automq.stream.utils.FutureUtil
56
import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
67
import kafka.cluster.Partition
@@ -114,14 +115,28 @@ class ElasticReplicaManager(
114115
fetchExecutorQueueSizeGaugeMap
115116
})
116117

117-
private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB
118-
private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB
119-
private val fetchLimiterGaugeMap = new util.HashMap[String, Integer]()
118+
private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_FAST_NAME) // 200MiB
119+
private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_SLOW_NAME) // 200MiB
120+
private val fetchLimiterWaitingTasksGaugeMap = new util.HashMap[String, Integer]()
121+
S3StreamKafkaMetricsManager.setFetchLimiterWaitingTaskNumSupplier(() => {
122+
fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.waitingThreads())
123+
fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.waitingThreads())
124+
fetchLimiterWaitingTasksGaugeMap
125+
})
126+
private val fetchLimiterPermitsGaugeMap = new util.HashMap[String, Integer]()
120127
S3StreamKafkaMetricsManager.setFetchLimiterPermitNumSupplier(() => {
121-
fetchLimiterGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits())
122-
fetchLimiterGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits())
123-
fetchLimiterGaugeMap
128+
fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits())
129+
fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits())
130+
fetchLimiterPermitsGaugeMap
124131
})
132+
private val fetchLimiterTimeoutCounterMap = util.Map.of(
133+
fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(fastFetchLimiter.name),
134+
slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(slowFetchLimiter.name)
135+
)
136+
private val fetchLimiterTimeHistogramMap = util.Map.of(
137+
fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, fastFetchLimiter.name),
138+
slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, slowFetchLimiter.name)
139+
)
125140

126141
/**
127142
* Used to reduce allocation in [[readFromLocalLogV2]]
@@ -558,14 +573,16 @@ class ElasticReplicaManager(
558573
math.min(bytesNeedFromParam, limiter.maxPermits())
559574
}
560575

576+
val timer: TimerUtil = new TimerUtil()
561577
val handler: Handler = timeoutMs match {
562578
case t if t > 0 => limiter.acquire(bytesNeed(), t)
563579
case _ => limiter.acquire(bytesNeed())
564580
}
581+
fetchLimiterTimeHistogramMap.get(limiter.name).record(timer.elapsedAs(TimeUnit.NANOSECONDS))
565582

566583
if (handler == null) {
567-
// handler maybe null if it timed out to acquire from limiter
568-
// TODO add metrics for this
584+
// the handler will be null if it timed out to acquire from limiter
585+
fetchLimiterTimeoutCounterMap.get(limiter.name).add(MetricsLevel.INFO, 1)
569586
// warn(s"Returning emtpy fetch response for fetch request $readPartitionInfo since the wait time exceeds $timeoutMs ms.")
570587
ElasticReplicaManager.emptyReadResults(readPartitionInfo.map(_._1))
571588
} else {

s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
package com.automq.stream.s3.backpressure;
1313

14+
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
1415
import com.automq.stream.utils.ThreadUtils;
1516
import com.automq.stream.utils.Threads;
1617
import java.util.HashMap;
@@ -47,6 +48,11 @@ public class DefaultBackPressureManager implements BackPressureManager {
4748
* Note: It should only be accessed in the {@link #checkerScheduler} thread.
4849
*/
4950
private long lastRegulateTime = System.currentTimeMillis();
51+
/**
52+
* The last load level to trigger the regulator.
53+
* Only used for logging and monitoring.
54+
*/
55+
private LoadLevel lastRegulateLevel = LoadLevel.NORMAL;
5056

5157
public DefaultBackPressureManager(Regulator regulator) {
5258
this(regulator, DEFAULT_COOLDOWN_MS);
@@ -60,11 +66,12 @@ public DefaultBackPressureManager(Regulator regulator, long cooldownMs) {
6066
@Override
6167
public void start() {
6268
this.checkerScheduler = Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("back-pressure-checker-%d", false), LOGGER);
69+
S3StreamMetricsManager.registerBackPressureStateSupplier(this::currentLoadLevel);
6370
}
6471

6572
@Override
6673
public void registerChecker(Checker checker) {
67-
checkerScheduler.scheduleAtFixedRate(() -> {
74+
checkerScheduler.scheduleWithFixedDelay(() -> {
6875
loadLevels.put(checker.source(), checker.check());
6976
maybeRegulate();
7077
}, 0, checker.intervalMs(), TimeUnit.MILLISECONDS);
@@ -111,6 +118,9 @@ private LoadLevel currentLoadLevel() {
111118

112119
private void regulate(LoadLevel loadLevel, long now) {
113120
if (LoadLevel.NORMAL.equals(loadLevel)) {
121+
if (!LoadLevel.NORMAL.equals(lastRegulateLevel)) {
122+
LOGGER.info("The system is back to a normal state, checkers: {}", loadLevels);
123+
}
114124
if (LOGGER.isDebugEnabled()) {
115125
LOGGER.debug("The system is in a normal state, checkers: {}", loadLevels);
116126
}
@@ -120,5 +130,6 @@ private void regulate(LoadLevel loadLevel, long now) {
120130

121131
loadLevel.regulate(regulator);
122132
lastRegulateTime = now;
133+
lastRegulateLevel = loadLevel;
123134
}
124135
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright 2024, AutoMQ HK Limited.
3+
*
4+
* The use of this file is governed by the Business Source License,
5+
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
package com.automq.stream.s3.metrics;
13+
14+
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
15+
16+
public class NoopObservableDoubleGauge implements ObservableDoubleGauge {
17+
}

s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java

+8
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,12 @@ public class S3StreamMetricsConstant {
151151
public static final String LABEL_STAGE_GET_OBJECTS = "get_objects";
152152
public static final String LABEL_STAGE_FIND_INDEX = "find_index";
153153
public static final String LABEL_STAGE_COMPUTE = "compute";
154+
155+
// Back Pressure
156+
public static final String BACK_PRESSURE_STATE_METRIC_NAME = "back_pressure_state";
157+
public static final AttributeKey<String> LABEL_BACK_PRESSURE_STATE = AttributeKey.stringKey("state");
158+
159+
// Broker Quota
160+
public static final String BROKER_QUOTA_LIMIT_METRIC_NAME = "broker_quota_limit";
161+
public static final AttributeKey<String> LABEL_BROKER_QUOTA_TYPE = AttributeKey.stringKey("type");
154162
}

s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java

+62
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
package com.automq.stream.s3.metrics;
1313

1414
import com.automq.stream.s3.ByteBufAlloc;
15+
import com.automq.stream.s3.backpressure.LoadLevel;
1516
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
1617
import com.automq.stream.s3.metrics.operations.S3Stage;
1718
import com.automq.stream.s3.metrics.wrapper.ConfigListener;
@@ -34,6 +35,7 @@
3435
import io.opentelemetry.api.common.Attributes;
3536
import io.opentelemetry.api.metrics.LongCounter;
3637
import io.opentelemetry.api.metrics.Meter;
38+
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
3739
import io.opentelemetry.api.metrics.ObservableLongGauge;
3840

3941
import static com.automq.stream.s3.metrics.S3StreamMetricsConstant.LABEL_CACHE_NAME;
@@ -139,10 +141,23 @@ public class S3StreamMetricsManager {
139141
private static final MultiAttributes<String> OPERATOR_INDEX_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
140142
S3StreamMetricsConstant.LABEL_INDEX);
141143

144+
// Back Pressure
145+
private static final MultiAttributes<String> BACK_PRESSURE_STATE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
146+
S3StreamMetricsConstant.LABEL_BACK_PRESSURE_STATE);
147+
private static ObservableLongGauge backPressureState = new NoopObservableLongGauge();
148+
private static Supplier<LoadLevel> backPressureStateSupplier = () -> LoadLevel.NORMAL;
149+
150+
// Broker Quota
151+
private static final MultiAttributes<String> BROKER_QUOTA_TYPE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
152+
S3StreamMetricsConstant.LABEL_BROKER_QUOTA_TYPE);
153+
private static ObservableDoubleGauge brokerQuotaLimit = new NoopObservableDoubleGauge();
154+
private static Supplier<Map<String, Double>> brokerQuotaLimitSupplier = () -> new ConcurrentHashMap<>();
142155

143156
static {
144157
BASE_ATTRIBUTES_LISTENERS.add(ALLOC_TYPE_ATTRIBUTES);
145158
BASE_ATTRIBUTES_LISTENERS.add(OPERATOR_INDEX_ATTRIBUTES);
159+
BASE_ATTRIBUTES_LISTENERS.add(BACK_PRESSURE_STATE_ATTRIBUTES);
160+
BASE_ATTRIBUTES_LISTENERS.add(BROKER_QUOTA_TYPE_ATTRIBUTES);
146161
}
147162

148163
public static void configure(MetricsConfig metricsConfig) {
@@ -400,6 +415,8 @@ public static void initMetrics(Meter meter, String prefix) {
400415
});
401416

402417
initAsyncCacheMetrics(meter, prefix);
418+
initBackPressureMetrics(meter, prefix);
419+
initBrokerQuotaMetrics(meter, prefix);
403420
}
404421

405422
private static void initAsyncCacheMetrics(Meter meter, String prefix) {
@@ -475,6 +492,43 @@ private static void initAsyncCacheMetrics(Meter meter, String prefix) {
475492
});
476493
}
477494

495+
private static void initBackPressureMetrics(Meter meter, String prefix) {
496+
backPressureState = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BACK_PRESSURE_STATE_METRIC_NAME)
497+
.setDescription("Back pressure state")
498+
.ofLongs()
499+
.buildWithCallback(result -> {
500+
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
501+
LoadLevel state = backPressureStateSupplier.get();
502+
result.record(state.ordinal(), BACK_PRESSURE_STATE_ATTRIBUTES.get(state.name()));
503+
// To beautify Grafana dashboard, we record -1 for other states
504+
for (LoadLevel l : LoadLevel.values()) {
505+
if (l != state) {
506+
result.record(-1, BACK_PRESSURE_STATE_ATTRIBUTES.get(l.name()));
507+
}
508+
}
509+
}
510+
});
511+
}
512+
513+
private static void initBrokerQuotaMetrics(Meter meter, String prefix) {
514+
brokerQuotaLimit = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BROKER_QUOTA_LIMIT_METRIC_NAME)
515+
.setDescription("Broker quota limit")
516+
.buildWithCallback(result -> {
517+
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
518+
Map<String, Double> brokerQuotaLimitMap = brokerQuotaLimitSupplier.get();
519+
for (Map.Entry<String, Double> entry : brokerQuotaLimitMap.entrySet()) {
520+
String quotaType = entry.getKey();
521+
Double quotaLimit = entry.getValue();
522+
// drop too large values
523+
if (quotaLimit > 1e15) {
524+
continue;
525+
}
526+
result.record(quotaLimit, BROKER_QUOTA_TYPE_ATTRIBUTES.get(quotaType));
527+
}
528+
}
529+
});
530+
}
531+
478532
public static void registerNetworkLimiterQueueSizeSupplier(AsyncNetworkBandwidthLimiter.Type type,
479533
Supplier<Integer> networkLimiterQueueSizeSupplier) {
480534
switch (type) {
@@ -907,4 +961,12 @@ public static void registerLocalStreamRangeIndexCacheSizeSupplier(Supplier<Integ
907961
public static void registerLocalStreamRangeIndexCacheStreamNumSupplier(Supplier<Integer> localStreamRangeIndexCacheStreamNum) {
908962
S3StreamMetricsManager.localStreamRangeIndexCacheStreamNum = localStreamRangeIndexCacheStreamNum;
909963
}
964+
965+
public static void registerBackPressureStateSupplier(Supplier<LoadLevel> backPressureStateSupplier) {
966+
S3StreamMetricsManager.backPressureStateSupplier = backPressureStateSupplier;
967+
}
968+
969+
public static void registerBrokerQuotaLimitSupplier(Supplier<Map<String, Double>> brokerQuotaLimitSupplier) {
970+
S3StreamMetricsManager.brokerQuotaLimitSupplier = brokerQuotaLimitSupplier;
971+
}
910972
}

s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void setup() {
5757
Runnable runnable = invocation.getArgument(0);
5858
runnable.run();
5959
return null;
60-
}).when(scheduler).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
60+
}).when(scheduler).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
6161
doAnswer(invocation -> {
6262
Runnable runnable = invocation.getArgument(0);
6363
runnable.run();

0 commit comments

Comments
 (0)