Skip to content

Commit e6e9043

Browse files
committed
fix(quota): limit the max throttle time (#2180)
Signed-off-by: Ning Yu <[email protected]>
1 parent fb106ec commit e6e9043

File tree

1 file changed

+3
-9
lines changed

1 file changed

+3
-9
lines changed

core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala

+3-9
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,8 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
8585
maybeRecordAndGetThrottleTimeMs(quotaType, value, timeMs)
8686
}
8787

88-
protected def throttleTime(quotaType: QuotaType, e: QuotaViolationException, timeMs: Long): Long = {
89-
quotaType match {
90-
case QuotaType.SlowFetch | QuotaType.RequestRate =>
91-
QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs)
92-
case _ =>
93-
QuotaUtils.throttleTime(e, timeMs)
94-
}
95-
88+
override protected def throttleTime(e: QuotaViolationException, timeMs: Long): Long = {
89+
QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs)
9690
}
9791

9892
private def isInternalClient(clientId: String): Boolean = {
@@ -119,7 +113,7 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
119113
0
120114
} catch {
121115
case e: QuotaViolationException =>
122-
val throttleTimeMs = throttleTime(quotaType, e, timeMs).toInt
116+
val throttleTimeMs = throttleTime(e, timeMs).toInt
123117
debug(s"Quota violated for sensor (${clientSensors.quotaSensor.name}). Delay time: ($throttleTimeMs)")
124118
throttleTimeMs
125119
}

0 commit comments

Comments
 (0)