Skip to content

feat(quota): support to get current quota metric value... #2170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,13 @@ public void config(MetricConfig config) {
this.config = config;
}
}

// AutoMQ inject start
/**
* A public method to expose the {@link #measurableValue} method.
*/
public double measurableValueV2(long timeMs) {
return measurableValue(timeMs);
}
// AutoMQ inject end
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,6 @@ public void increase() {
@Override
public void decrease() {
}

@Override
public void minimize() {
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
private val quotaCallback: Option[ClientQuotaCallback])
extends ClientQuotaManager(config, metrics, QuotaType.Request, time, threadNamePrefix, quotaCallback) {

protected val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
private val exemptMetricName = metrics.metricName("exempt-request-time",
QuotaType.Request.toString, "Tracking exempt-request-time utilization percentage")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.Session
import org.apache.kafka.server.config.BrokerQuotaManagerConfig

import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.{Optional, Properties}
import scala.collection.mutable
import scala.jdk.CollectionConverters._

Expand All @@ -31,6 +32,7 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
private val time: Time,
private val threadNamePrefix: String)
extends ClientRequestQuotaManager(config, metrics, time, threadNamePrefix, None) {
private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds * this.config.numQuotaSamples)
private val metricsTags = Map("domain" -> "broker", "nodeId" -> String.valueOf(config.nodeId()))
private val whiteListCache = mutable.HashMap[String, Boolean]()

Expand All @@ -48,6 +50,16 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
}
}

/**
* Get the value of the metric for the given quota type at the given time.
* It return empty if the metric is not found, which is possible if the quota is disabled or no request has been
* processed yet.
*/
def getQuotaMetricValue(quotaType: QuotaType, timeMs: Long): Optional[java.lang.Double] = {
Optional.ofNullable(metrics.metric(clientQuotaMetricName(quotaType, metricsTags)))
.map(_.measurableValueV2(timeMs))
}

def recordNoThrottle(quotaType: QuotaType, value: Double): Unit = {
val clientSensors = getOrCreateQuotaSensors(quotaType)
clientSensors.quotaSensor.record(value, time.milliseconds(), false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,34 +170,43 @@ public void testUpdateQuota() {
brokerQuotaManager.updateQuota(QuotaType.requestRate(), 1);
// rate = 1 / 2000ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 1 / 2, time);
assertEquals(0, result);
// rate = 2 / 2010ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 10);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 2 / 2.01, time + 10);
assertEquals(0, result);
// rate = 3 / 2999ms > 1
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 3 / 2.999, time + 2999);
assertEquals(1, result);

brokerQuotaManager.updateQuota(QuotaType.requestRate(), 2);
// rate = 4 / 2999ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 4 / 2.999, time + 2999);
assertEquals(0, result);
// rate = 5 / 2999ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 5 / 2.999, time + 2999);
assertEquals(0, result);
// rate = 6 / 2999ms > 2
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 6 / 2.999, time + 2999);
assertEquals(1, result);

brokerQuotaManager.updateQuota(QuotaType.requestRate(), 1);
// rate = 5 / 2999ms > 1
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 5 / 2.999, time + 2999 + 2999);
assertEquals(1000, result);
// rate = 2 / 2000ms
// rate = 2 / 2001ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999 + 1);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 2 / 2.001, time + 2999 + 2999 + 1);
assertEquals(0, result);
// rate = 3 / 2999ms > 1
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999 + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 3 / 2.999, time + 2999 + 2999 + 2999);
assertEquals(1, result);
}

Expand Down Expand Up @@ -263,4 +272,9 @@ public void testWhiteList() {
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.produce(), request, 1000, time.milliseconds());
assertEquals(0, result);
}

private void assertQuotaMetricValue(QuotaType quotaType, double expected, long timeMs) {
double value = brokerQuotaManager.getQuotaMetricValue(quotaType, timeMs).get();
assertEquals(expected, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public class DefaultBackPressureManager implements BackPressureManager {

public static final long DEFAULT_COOLDOWN_MS = TimeUnit.SECONDS.toMillis(20);
public static final long DEFAULT_COOLDOWN_MS = TimeUnit.SECONDS.toMillis(15);

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBackPressureManager.class);

Expand Down Expand Up @@ -82,25 +82,15 @@ private void maybeRegulate() {
}

/**
* Regulate the system if necessary, which means
* <ul>
* <li>the system is in a {@link LoadLevel#CRITICAL} state.</li>
* <li>the cooldown time has passed.</li>
* </ul>
* Regulate the system if the cooldown time has passed.
*
* @param isInternal True if it is an internal call, which means it should not schedule the next regulate action.
*/
private void maybeRegulate(boolean isInternal) {
LoadLevel loadLevel = currentLoadLevel();
long now = System.currentTimeMillis();

if (LoadLevel.CRITICAL.equals(loadLevel)) {
// Regulate immediately regardless of the cooldown time.
regulate(loadLevel, now);
return;
}

long timeElapsed = now - lastRegulateTime;

if (timeElapsed < cooldownMs) {
// Skip regulating if the cooldown time has not passed.
if (!isInternal) {
Expand All @@ -109,7 +99,6 @@ private void maybeRegulate(boolean isInternal) {
}
return;
}

regulate(loadLevel, now);
}

Expand All @@ -123,8 +112,10 @@ private LoadLevel currentLoadLevel() {
}

private void regulate(LoadLevel loadLevel, long now) {
if (LoadLevel.NORMAL.equals(loadLevel) && LOGGER.isDebugEnabled()) {
LOGGER.debug("The system is in a normal state, checkers: {}", loadLevels);
if (LoadLevel.NORMAL.equals(loadLevel)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("The system is in a normal state, checkers: {}", loadLevels);
}
} else {
LOGGER.info("The system is in a {} state, checkers: {}", loadLevel, loadLevels);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,6 @@ public void regulate(Regulator regulator) {
public void regulate(Regulator regulator) {
regulator.decrease();
}
},
/**
* The system is in a critical state, and the most severe actions should be taken.
*/
CRITICAL {
@Override
public void regulate(Regulator regulator) {
regulator.minimize();
}
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,4 @@ public interface Regulator {
* If the rate is already at the minimum, this method does nothing.
*/
void decrease();

/**
* Minimize the rate of incoming requests.
*/
void minimize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class DefaultBackPressureManagerTest {
Regulator regulator;
int regulatorIncreaseCalled = 0;
int regulatorDecreaseCalled = 0;
int regulatorMinimizeCalled = 0;

ScheduledExecutorService scheduler;
int schedulerScheduleCalled = 0;
Expand All @@ -53,10 +52,6 @@ public void setup() {
regulatorDecreaseCalled++;
return null;
}).when(regulator).decrease();
doAnswer(invocation -> {
regulatorMinimizeCalled++;
return null;
}).when(regulator).minimize();

// Mock the scheduler to run the scheduled task immediately and only once
doAnswer(invocation -> {
Expand All @@ -77,11 +72,10 @@ public void setup() {
public void testPriority1() {
initManager(0);

callChecker(sourceA, LoadLevel.CRITICAL);
callChecker(sourceB, LoadLevel.HIGH);
callChecker(sourceC, LoadLevel.NORMAL);

assertRegulatorCalled(0, 0, 3);
assertRegulatorCalled(0, 2);
}

@Test
Expand All @@ -90,9 +84,8 @@ public void testPriority2() {

callChecker(sourceC, LoadLevel.NORMAL);
callChecker(sourceB, LoadLevel.HIGH);
callChecker(sourceA, LoadLevel.CRITICAL);

assertRegulatorCalled(1, 1, 1);
assertRegulatorCalled(1, 1);
}

@Test
Expand All @@ -101,10 +94,9 @@ public void testOverride() {

callChecker(sourceA, LoadLevel.NORMAL);
callChecker(sourceA, LoadLevel.HIGH);
callChecker(sourceA, LoadLevel.CRITICAL);
callChecker(sourceA, LoadLevel.NORMAL);

assertRegulatorCalled(2, 1, 1);
assertRegulatorCalled(2, 1);
}

@Test
Expand All @@ -114,22 +106,13 @@ public void testCooldown() {

initManager(cooldownMs);

callChecker(sourceA, LoadLevel.CRITICAL);
assertRegulatorCalled(0, 0, 1);
assertSchedulerCalled(0);

callChecker(sourceA, LoadLevel.HIGH);
assertRegulatorCalled(0, 0, 1);
assertRegulatorCalled(0, 0);
assertSchedulerCalled(1);
assertEquals(cooldownMs, schedulerScheduleDelay, tolerance);

callChecker(sourceA, LoadLevel.NORMAL);
assertRegulatorCalled(0, 0, 1);
assertSchedulerCalled(2);
assertEquals(cooldownMs, schedulerScheduleDelay, tolerance);

callChecker(sourceA, LoadLevel.CRITICAL);
assertRegulatorCalled(0, 0, 2);
assertRegulatorCalled(0, 0);
assertSchedulerCalled(2);
assertEquals(cooldownMs, schedulerScheduleDelay, tolerance);
}
Expand Down Expand Up @@ -161,10 +144,9 @@ public long intervalMs() {
});
}

private void assertRegulatorCalled(int increase, int decrease, int minimize) {
private void assertRegulatorCalled(int increase, int decrease) {
assertEquals(increase, regulatorIncreaseCalled);
assertEquals(decrease, regulatorDecreaseCalled);
assertEquals(minimize, regulatorMinimizeCalled);
}

private void assertSchedulerCalled(int times) {
Expand Down
Loading