Skip to content

Commit 8e004ee

Browse files
committed
fix(quota): check whether the client in white list before fetch (#2181)
Signed-off-by: Ning Yu <[email protected]>
1 parent d924569 commit 8e004ee

File tree

2 files changed

+14
-18
lines changed

2 files changed

+14
-18
lines changed

Diff for: core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala

+13-17
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
4242

4343
override def delayQueueSensor: Sensor = brokerDelayQueueSensor
4444

45-
def getMaxValueInQuotaWindow(quotaType: QuotaType): Double = {
46-
if (config.quotaEnabled) {
45+
def getMaxValueInQuotaWindow(quotaType: QuotaType, request: RequestChannel.Request): Double = {
46+
if (shouldThrottle(request)) {
4747
quotaLimit(quotaType)
4848
} else {
4949
Double.MaxValue
@@ -67,28 +67,24 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
6767

6868
def maybeRecordAndGetThrottleTimeMs(quotaType: QuotaType, request: RequestChannel.Request, value: Double,
6969
timeMs: Long): Int = {
70-
if (!config.quotaEnabled) {
71-
// Quota is disabled, no need to throttle
72-
return 0
73-
}
74-
75-
if (isInternalClient(request.context.clientId())) {
76-
// Internal clients are exempt from quota
77-
return 0
78-
}
79-
80-
if (isInWhiteList(request.session.principal, request.context.clientId(), request.context.listenerName())) {
81-
// Client is in the white list, no need to throttle
82-
return 0
70+
if (shouldThrottle(request)) {
71+
maybeRecordAndGetThrottleTimeMs(quotaType, value, timeMs)
72+
} else {
73+
0
8374
}
84-
85-
maybeRecordAndGetThrottleTimeMs(quotaType, value, timeMs)
8675
}
8776

8877
override protected def throttleTime(e: QuotaViolationException, timeMs: Long): Long = {
8978
QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs)
9079
}
9180

81+
private def shouldThrottle(request: RequestChannel.Request): Boolean = {
82+
val quotaEnabled = config.quotaEnabled
83+
val isInternal = isInternalClient(request.context.clientId())
84+
val isWhiteListed = isInWhiteList(request.session.principal, request.context.clientId(), request.context.listenerName())
85+
quotaEnabled && !isInternal && !isWhiteListed
86+
}
87+
9288
private def isInternalClient(clientId: String): Boolean = {
9389
clientId.startsWith(QuotaConfigs.INTERNAL_CLIENT_ID_PREFIX)
9490
}

Diff for: core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,7 @@ class ElasticKafkaApis(
753753
Int.MaxValue
754754
else {
755755
val maxValue = quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt
756-
val brokerMaxValue = quotas.broker.getMaxValueInQuotaWindow(QuotaType.Fetch).toInt
756+
val brokerMaxValue = quotas.broker.getMaxValueInQuotaWindow(QuotaType.Fetch, request).toInt
757757
math.min(maxValue, brokerMaxValue)
758758
}
759759

0 commit comments

Comments
 (0)