Skip to content

Commit 5d4194a

Browse files
committed
feat(quota): support to update broker request rate quota (#2158)
* refactor(quota): refactor `maybeRecordAndGetThrottleTimeMs` Signed-off-by: Ning Yu <[email protected]> * fix(quota): throttle the produce request whatever the acks is Signed-off-by: Ning Yu <[email protected]> * refactor(quota): separate `Request` in `ClientQuotaManager` and `RequestRate` in `BrokerQuotaManager` Signed-off-by: Ning Yu <[email protected]> * sytle: fix lint Signed-off-by: Ning Yu <[email protected]> * feat(quota): support to update broker request rate quota Signed-off-by: Ning Yu <[email protected]> * test(quota): test update quota Signed-off-by: Ning Yu <[email protected]> --------- Signed-off-by: Ning Yu <[email protected]>
1 parent 48eeb81 commit 5d4194a

File tree

6 files changed

+121
-37
lines changed

6 files changed

+121
-37
lines changed

clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java

+3
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,8 @@ public enum ClientQuotaType {
2323
PRODUCE,
2424
FETCH,
2525
REQUEST,
26+
// AutoMQ for Kafka inject start
27+
REQUEST_RATE,
28+
// AutoMQ for Kafka inject end
2629
CONTROLLER_MUTATION
2730
}

core/src/main/scala/kafka/server/QuotaFactory.scala

+13-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ object QuotaType {
3232
case object Fetch extends QuotaType
3333
case object Produce extends QuotaType
3434
case object Request extends QuotaType
35+
// AutoMQ for Kafka inject start
36+
/**
37+
* Quota type for request rate limiting.
38+
*/
39+
case object RequestRate extends QuotaType
40+
// AutoMQ for Kafka inject end
3541
case object ControllerMutation extends QuotaType
3642
case object LeaderReplication extends QuotaType
3743
case object FollowerReplication extends QuotaType
@@ -44,11 +50,15 @@ object QuotaType {
4450
case QuotaType.Fetch => ClientQuotaType.FETCH
4551
case QuotaType.Produce => ClientQuotaType.PRODUCE
4652
case QuotaType.Request => ClientQuotaType.REQUEST
53+
// AutoMQ for Kafka inject start
54+
case QuotaType.RequestRate => ClientQuotaType.REQUEST_RATE
55+
// AutoMQ for Kafka inject end
4756
case QuotaType.ControllerMutation => ClientQuotaType.CONTROLLER_MUTATION
4857
case _ => throw new IllegalArgumentException(s"Not a client quota type: $quotaType")
4958
}
5059
}
5160

61+
// AutoMQ for Kafka inject start
5262
// for test
5363
def fetch(): QuotaType = {
5464
QuotaType.Fetch
@@ -58,9 +68,10 @@ object QuotaType {
5868
QuotaType.Produce
5969
}
6070

61-
def request(): QuotaType = {
62-
QuotaType.Request
71+
def requestRate(): QuotaType = {
72+
QuotaType.RequestRate
6373
}
74+
// AutoMQ for Kafka inject end
6475
}
6576

6677
sealed trait QuotaType

core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala

+36-20
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import kafka.utils.QuotaUtils
1717
import org.apache.kafka.common.MetricName
1818
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
1919
import org.apache.kafka.common.metrics.{Metrics, Quota, QuotaViolationException, Sensor}
20-
import org.apache.kafka.common.requests.RequestContext
2120
import org.apache.kafka.common.security.auth.KafkaPrincipal
2221
import org.apache.kafka.common.utils.Time
2322
import org.apache.kafka.network.Session
@@ -60,11 +59,15 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
6059
return 0
6160
}
6261

63-
maybeRecordAndGetThrottleTimeMs(quotaType, request.session, request.context, value, timeMs)
62+
if (isInWhiteList(request.session.principal, request.context.clientId(), request.context.listenerName())) {
63+
return 0
64+
}
65+
66+
maybeRecordAndGetThrottleTimeMs(quotaType, value, timeMs)
6467
}
6568

6669
protected def throttleTime(quotaType: QuotaType, e: QuotaViolationException, timeMs: Long): Long = {
67-
if (quotaType == QuotaType.Request) {
70+
if (quotaType == QuotaType.RequestRate) {
6871
QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs)
6972
} else {
7073
QuotaUtils.throttleTime(e, timeMs)
@@ -84,11 +87,7 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
8487
}
8588
}
8689

87-
def maybeRecordAndGetThrottleTimeMs(quotaType: QuotaType, session: Session, context: RequestContext, value: Double,
88-
timeMs: Long): Int = {
89-
if (isInWhiteList(session.principal, context.clientId(), context.listenerName())) {
90-
return 0
91-
}
90+
def maybeRecordAndGetThrottleTimeMs(quotaType: QuotaType, value: Double, timeMs: Long): Int = {
9291
val clientSensors = getOrCreateQuotaSensors(quotaType)
9392
try {
9493
clientSensors.quotaSensor.record(value, timeMs, true)
@@ -112,34 +111,51 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
112111
whiteListCache.clear()
113112

114113
if (!config.quotaEnabled) {
115-
metrics.removeSensor(getQuotaSensorName(QuotaType.Request, metricsTags))
114+
metrics.removeSensor(getQuotaSensorName(QuotaType.RequestRate, metricsTags))
116115
metrics.removeSensor(getQuotaSensorName(QuotaType.Produce, metricsTags))
117116
metrics.removeSensor(getQuotaSensorName(QuotaType.Fetch, metricsTags))
118-
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Request, metricsTags))
117+
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.RequestRate, metricsTags))
119118
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Produce, metricsTags))
120119
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Fetch, metricsTags))
121120
return
122121
}
123122

124123
val allMetrics = metrics.metrics()
125124

126-
val requestMetrics = allMetrics.get(clientQuotaMetricName(QuotaType.Request, metricsTags))
127-
if (requestMetrics != null) {
128-
requestMetrics.config(getQuotaMetricConfig(quotaLimit(QuotaType.Request)))
125+
val requestRateMetric = allMetrics.get(clientQuotaMetricName(QuotaType.RequestRate, metricsTags))
126+
if (requestRateMetric != null) {
127+
requestRateMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.RequestRate)))
129128
}
130129

131-
val produceMetrics = allMetrics.get(clientQuotaMetricName(QuotaType.Produce, metricsTags))
132-
if (produceMetrics != null) {
133-
produceMetrics.config(getQuotaMetricConfig(quotaLimit(QuotaType.Produce)))
130+
val produceMetric = allMetrics.get(clientQuotaMetricName(QuotaType.Produce, metricsTags))
131+
if (produceMetric != null) {
132+
produceMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.Produce)))
134133
}
135134

136-
val fetchMetrics = allMetrics.get(clientQuotaMetricName(QuotaType.Fetch, metricsTags))
137-
if (fetchMetrics != null) {
138-
fetchMetrics.config(getQuotaMetricConfig(quotaLimit(QuotaType.Fetch)))
135+
val fetchMetric = allMetrics.get(clientQuotaMetricName(QuotaType.Fetch, metricsTags))
136+
if (fetchMetric != null) {
137+
fetchMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.Fetch)))
139138
}
140139
}
141140
}
142141

142+
def updateQuota(quotaType: QuotaType, quota: Double): Unit = {
143+
// update the quota in the config first to make sure the new quota will be used if {@link #updateQuotaMetricConfigs} is called
144+
quotaType match {
145+
case QuotaType.RequestRate => config.requestRateQuota(quota)
146+
case QuotaType.Produce => config.produceQuota(quota)
147+
case QuotaType.Fetch => config.fetchQuota(quota)
148+
case _ => throw new IllegalArgumentException(s"Unknown quota type $quotaType")
149+
}
150+
151+
// update the metric config
152+
val allMetrics = metrics.metrics()
153+
val metric = allMetrics.get(clientQuotaMetricName(quotaType, metricsTags))
154+
if (metric != null) {
155+
metric.config(getQuotaMetricConfig(quotaLimit(quotaType)))
156+
}
157+
}
158+
143159
def throttle(
144160
quotaType: QuotaType,
145161
throttleCallback: ThrottleCallback,
@@ -162,7 +178,7 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
162178
s"$quotaType-${metricTagsToSensorSuffix(metricTags)}"
163179

164180
private def quotaLimit(quotaType: QuotaType): Double = {
165-
if (quotaType == QuotaType.Request) config.requestQuota
181+
if (quotaType == QuotaType.RequestRate) config.requestRateQuota
166182
else if (quotaType == QuotaType.Produce) config.produceQuota
167183
else if (quotaType == QuotaType.Fetch) config.fetchQuota
168184
else throw new IllegalArgumentException(s"Unknown quota type $quotaType")

core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala

+4-6
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,7 @@ class ElasticKafkaApis(
312312
val requestThrottleTimeMs =
313313
if (produceRequest.acks == 0) 0
314314
else quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
315-
val brokerRequestThrottleTimeMs =
316-
if (produceRequest.acks == 0) 0
317-
else quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.Request, request, 1, timeMs)
315+
val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.RequestRate, request, 1, timeMs)
318316
val maxThrottleTimeMs = IntStream.of(bandwidthThrottleTimeMs, requestThrottleTimeMs, brokerBandwidthThrottleTimeMs, brokerRequestThrottleTimeMs).max().orElse(0)
319317
if (maxThrottleTimeMs > 0) {
320318
request.apiThrottleTimeMs = maxThrottleTimeMs
@@ -325,7 +323,7 @@ class ElasticKafkaApis(
325323
} else if (brokerBandwidthThrottleTimeMs == maxThrottleTimeMs) {
326324
requestHelper.throttle(QuotaType.Produce, quotas.broker, request, brokerBandwidthThrottleTimeMs)
327325
} else if (brokerRequestThrottleTimeMs == maxThrottleTimeMs) {
328-
requestHelper.throttle(QuotaType.Request, quotas.broker, request, brokerRequestThrottleTimeMs)
326+
requestHelper.throttle(QuotaType.RequestRate, quotas.broker, request, brokerRequestThrottleTimeMs)
329327
}
330328
}
331329
// AutoMQ for Kafka inject end
@@ -700,7 +698,7 @@ class ElasticKafkaApis(
700698
// AutoMQ for Kafka inject start
701699
val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
702700
val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)
703-
val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.Request, request, 1, timeMs)
701+
val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.RequestRate, request, 1, timeMs)
704702
val brokerBandwidthThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.Fetch, request, responseSize, timeMs)
705703

706704
val maxThrottleTimeMs = IntStream.of(bandwidthThrottleTimeMs, requestThrottleTimeMs, brokerBandwidthThrottleTimeMs, brokerRequestThrottleTimeMs).max().orElse(0)
@@ -717,7 +715,7 @@ class ElasticKafkaApis(
717715
} else if (brokerBandwidthThrottleTimeMs == maxThrottleTimeMs) {
718716
requestHelper.throttle(QuotaType.Fetch, quotas.broker, request, brokerBandwidthThrottleTimeMs)
719717
} else if (brokerRequestThrottleTimeMs == maxThrottleTimeMs) {
720-
requestHelper.throttle(QuotaType.Request, quotas.broker, request, brokerRequestThrottleTimeMs)
718+
requestHelper.throttle(QuotaType.RequestRate, quotas.broker, request, brokerRequestThrottleTimeMs)
721719
}
722720
// AutoMQ for Kafka inject end
723721

core/src/test/scala/unit/kafka/server/BrokerQuotaManagerTest.java

+49-5
Original file line numberDiff line numberDiff line change
@@ -109,23 +109,67 @@ public void testQuota() {
109109
properties.put(QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_CONFIG, 0);
110110
properties.put(QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, 1);
111111
brokerQuotaManager.updateQuotaConfigs(Option.apply(properties));
112-
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 1, time);
112+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time);
113113
assertEquals(0, result);
114-
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 1, time + 10);
114+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 10);
115115
assertEquals(0, result);
116-
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 1, time + second2millis);
116+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + second2millis);
117117
assertTrue(result > 0);
118118

119119
properties.put(QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, 10);
120120
brokerQuotaManager.updateQuotaConfigs(Option.apply(properties));
121-
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 0, time + second2millis);
121+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 0, time + second2millis);
122122
assertEquals(0, result);
123123
}
124124

125+
@Test
126+
public void testUpdateQuota() {
127+
int result;
128+
long time = this.time.milliseconds();
129+
130+
// enable quota
131+
Properties properties = new Properties();
132+
properties.put(QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, true);
133+
brokerQuotaManager.updateQuotaConfigs(Option.apply(properties));
134+
135+
brokerQuotaManager.updateQuota(QuotaType.requestRate(), 1);
136+
// rate = 1 / 2000ms
137+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time);
138+
assertEquals(0, result);
139+
// rate = 2 / 2010ms
140+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 10);
141+
assertEquals(0, result);
142+
// rate = 3 / 2999ms > 1
143+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
144+
assertEquals(1, result);
145+
146+
brokerQuotaManager.updateQuota(QuotaType.requestRate(), 2);
147+
// rate = 4 / 2999ms
148+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
149+
assertEquals(0, result);
150+
// rate = 5 / 2999ms
151+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
152+
assertEquals(0, result);
153+
// rate = 6 / 2999ms > 2
154+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
155+
assertEquals(1, result);
156+
157+
brokerQuotaManager.updateQuota(QuotaType.requestRate(), 1);
158+
// rate = 5 / 2999ms > 1
159+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999);
160+
assertEquals(1000, result);
161+
// rate = 2 / 2000ms
162+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999 + 1);
163+
assertEquals(0, result);
164+
// rate = 3 / 2999ms > 1
165+
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999 + 2999);
166+
assertEquals(1, result);
167+
}
168+
125169
@Test
126170
public void testThrottle() {
127171
AtomicInteger throttleCounter = new AtomicInteger(0);
128-
brokerQuotaManager.throttle(QuotaType.request(), new ThrottleCallback() {
172+
brokerQuotaManager.throttle(QuotaType.requestRate(), new ThrottleCallback() {
129173
@Override
130174
public void startThrottling() {
131175
throttleCounter.incrementAndGet();

server/src/main/java/org/apache/kafka/server/config/BrokerQuotaManagerConfig.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class BrokerQuotaManagerConfig extends ClientQuotaManagerConfig {
2626
private boolean quotaEnabled = false;
2727
private double produceQuota = Double.MAX_VALUE;
2828
private double fetchQuota = Double.MAX_VALUE;
29-
private double requestQuota = Double.MAX_VALUE;
29+
private double requestRateQuota = Double.MAX_VALUE;
3030

3131
private List<String> userWhiteList = List.of();
3232
private List<String> clientIdWhiteList = List.of();
@@ -42,7 +42,7 @@ public void update(Properties props) {
4242
quotaEnabled = getBoolean(map, QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, quotaEnabled);
4343
produceQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_PRODUCE_BYTES_CONFIG, produceQuota);
4444
fetchQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_CONFIG, fetchQuota);
45-
requestQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, requestQuota);
45+
requestRateQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, requestRateQuota);
4646

4747
String userWhiteListProp = props.getProperty(QuotaConfigs.BROKER_QUOTA_WHITE_LIST_USER_CONFIG);
4848
if (null != userWhiteListProp && !userWhiteListProp.isBlank()) {
@@ -72,12 +72,24 @@ public double produceQuota() {
7272
return produceQuota;
7373
}
7474

75+
public void produceQuota(double produceQuota) {
76+
this.produceQuota = produceQuota;
77+
}
78+
7579
public double fetchQuota() {
7680
return fetchQuota;
7781
}
7882

79-
public double requestQuota() {
80-
return requestQuota;
83+
public void fetchQuota(double fetchQuota) {
84+
this.fetchQuota = fetchQuota;
85+
}
86+
87+
public double requestRateQuota() {
88+
return requestRateQuota;
89+
}
90+
91+
public void requestRateQuota(double requestRateQuota) {
92+
this.requestRateQuota = requestRateQuota;
8193
}
8294

8395
public List<String> userWhiteList() {

0 commit comments

Comments
 (0)