|
| 1 | +import logging |
| 2 | +import time |
| 3 | +from enum import Enum |
| 4 | +from math import ceil |
| 5 | +from typing import Any, Literal, NotRequired, TypedDict, cast, overload |
| 6 | + |
| 7 | +from django.core.cache import cache |
| 8 | + |
| 9 | +from sentry.ratelimits.sliding_windows import ( |
| 10 | + GrantedQuota, |
| 11 | + Quota, |
| 12 | + RedisSlidingWindowRateLimiter, |
| 13 | + RequestedQuota, |
| 14 | +) |
| 15 | + |
| 16 | +logger = logging.getLogger(__name__) |
| 17 | + |
| 18 | + |
| 19 | +class CircuitBreakerState(Enum): |
| 20 | + CLOSED = "circuit_closed" |
| 21 | + BROKEN = "circuit_broken" |
| 22 | + RECOVERY = "recovery" |
| 23 | + |
| 24 | + |
| 25 | +class CircuitBreakerConfig(TypedDict): |
| 26 | + # The number of errors within the given time period necessary to trip the breaker |
| 27 | + error_limit: int |
| 28 | + # The time period, in seconds, over which we're tracking errors |
| 29 | + error_limit_window: int |
| 30 | + # The length, in seconds, of each time bucket ("granule") used by the underlying rate limiter - |
| 31 | + # effectively the resolution of the time window. Will be set automatically based on |
| 32 | + # `error_limit_window` if not provided. |
| 33 | + error_limit_window_granularity: NotRequired[int] |
| 34 | + # How long, in seconds, to stay in the broken state (blocking all requests) before entering the |
| 35 | + # recovery phase |
| 36 | + broken_state_duration: int |
| 37 | + # The number of errors within the given time period necessary to trip the breaker while in recovery |
| 38 | + recovery_error_limit: int |
| 39 | + # The time period, in seconds, over which we're tracking errors in recovery |
| 40 | + recovery_error_limit_window: int |
| 41 | + # The length, in seconds, of each time bucket ("granule") used by the underlying rate limiter - |
| 42 | + # effectively the resolution of the time window. Will be set automatically based on |
| 43 | + # `recovery_error_limit_window` if not provided. |
| 44 | + recovery_error_limit_window_granularity: NotRequired[int] |
| 45 | + # How long, in seconds, to stay in the recovery state (allowing requests but with a stricter |
| 46 | + # error limit) before returning to normal operation. |
| 47 | + recovery_duration: int |
| 48 | + |
| 49 | + |
| 50 | +# TODO: These limits were estimated based on EA traffic. (In an average 10 min period, there are |
| 51 | +# roughly 35K events without matching hashes. About 2% of orgs are EA, so for simplicity, assume 2% |
| 52 | +# of those events are from EA orgs. If we're willing to tolerate up to a 95% failure rate, then we |
| 53 | +# need 35K * 0.02 * 0.95 events to fail to trip the breaker. Technically that's 665, not 666, but |
| 54 | +# we're talking about everything going to hell, so the bump to 666 seemed appropriate!) |
| 55 | +# |
| 56 | +# When we GA, we should multiply both the limits by 50 (to remove the 2% part of the current |
| 57 | +# calculation). |
| 58 | +CIRCUIT_BREAKER_DEFAULT_CONFIG: CircuitBreakerConfig = { |
| 59 | + "error_limit": 666, |
| 60 | + "error_limit_window": 600, # 10 min |
| 61 | + "broken_state_duration": 300, # 5 min |
| 62 | + "recovery_error_limit": 3, # In recovery, we're twice as strict as normal limit |
| 63 | + "recovery_error_limit_window": 60, # And we bail much more quickly |
| 64 | + "recovery_duration": 300, # 5 min |
| 65 | +} |
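|  | + |
|  | +# A sketch of overriding only some defaults (the key name and values below are hypothetical); |
|  | +# `CircuitBreaker.__init__` also merges any passed config over these defaults, so spreading them |
|  | +# here just keeps the example a complete `CircuitBreakerConfig`: |
|  | +# |
|  | +#     config: CircuitBreakerConfig = { |
|  | +#         **CIRCUIT_BREAKER_DEFAULT_CONFIG, |
|  | +#         "error_limit": 100, |
|  | +#         "error_limit_window": 300,  # 5 min |
|  | +#     } |
|  | +#     breaker = CircuitBreaker("my_flaky_service", config) |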
| 66 | + |
| 67 | + |
| 68 | +class CircuitBreaker: |
| 69 | + """ |
| 70 | +    A circuit breaker for temporarily blocking requests to (or calls of) a service or function |
| 71 | +    that is throwing too many errors. |
| 72 | +
|
| 73 | + The breaker has three states: circuit CLOSED (normal operation), circuit BROKEN (all requests |
| 74 | + blocked), and RECOVERY (requests allowed under a stricter error limit). |
| 75 | +
|
| 76 | + In a CLOSED state (normal operation), the breaker tracks errors but allows through all |
| 77 | + requests. If the frequency of errors passes a given threshold, it moves to BROKEN state. |
| 78 | +
|
| 79 | + In a BROKEN state, all requests are blocked. Once a set amount of time has passed, it moves |
| 80 | + to RECOVERY state. |
| 81 | +
|
| 82 | + RECOVERY state is identical to CLOSED state, except that the threshold for the circuit |
| 83 | + breaking (moving back into BROKEN state) is much stricter. Once a set amount of time has |
| 84 | + passed without the breaker being tripped, it moves back to CLOSED state. |
| 85 | +
|
| 86 | + The overall idea is to stop hitting a service which seems to have broken, but periodically make |
| 87 | + short attempts to use it in order to be able to resume requests once it comes back up. |
| 88 | +
|
| 89 | + Usage: |
| 90 | +
|
| 91 | + # See `CircuitBreakerConfig` class for config options |
| 92 | + breaker = CircuitBreaker("squirrel_chasing", config) |
| 93 | +
|
| 94 | + def get_top_dogs(payload): |
| 95 | + try: |
| 96 | + if breaker.should_allow_request(): |
| 97 | + response = call_chase_simulation_service("/hall-of-fame", payload) |
| 98 | + else: |
| 99 | + logger.warning("Request blocked by circuit breaker!") |
| 100 | + return None |
| 101 | + except TimeoutError: |
| 102 | + breaker.record_error() |
| 103 | + return None |
| 104 | +
|
| 105 | + if response.status == 500: |
| 106 | + breaker.record_error() |
| 107 | + return None |
| 108 | +
|
| 109 | + return format_hof_entries(response) |
| 110 | +
|
| 111 | + The `breaker.should_allow_request()` check can alternatively be used outside of |
| 112 | + `get_top_dogs`, to prevent calls to it. In that case, the original `breaker` object can be |
| 113 | + imported alongside `get_top_dogs` or reinstantiated with the same config - it has no state of |
| 114 | + its own, instead relying on redis and the cache to track error count and breaker status. |
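|  | + |
|  | +    A sketch of that call-site pattern (`handle_request` is a hypothetical caller; the rest |
|  | +    matches the example above): |
|  | + |
|  | +        breaker = CircuitBreaker("squirrel_chasing", config) |
|  | + |
|  | +        def handle_request(payload): |
|  | +            if not breaker.should_allow_request(): |
|  | +                logger.warning("Request blocked by circuit breaker!") |
|  | +                return None |
|  | +            return get_top_dogs(payload) |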
| 115 | + """ |
| 116 | + |
| 117 | + def __init__(self, key: str, config: CircuitBreakerConfig | None = None): |
| 118 | + self.key = key |
| 119 | + self.broken_state_key = f"{key}.circuit_breaker.broken" |
| 120 | + self.recovery_state_key = f"{key}.circuit_breaker.in_recovery" |
| 121 | + |
| 122 | + final_config: CircuitBreakerConfig = { |
| 123 | + **CIRCUIT_BREAKER_DEFAULT_CONFIG, |
| 124 | + **(config or cast(Any, {})), |
| 125 | + } |
| 126 | + default_window_granularity = self._get_default_window_granularity( |
| 127 | + final_config["error_limit_window"] |
| 128 | + ) |
| 129 | + default_recovery_window_granularity = self._get_default_window_granularity( |
| 130 | + final_config["recovery_error_limit_window"] |
| 131 | + ) |
| 132 | + |
| 133 | + self.limiter = RedisSlidingWindowRateLimiter() |
| 134 | + self.primary_quota = Quota( |
| 135 | + final_config["error_limit_window"], |
| 136 | + final_config.get("error_limit_window_granularity", default_window_granularity), |
| 137 | + final_config["error_limit"], |
| 138 | + f"{key}.circuit_breaker", |
| 139 | + ) |
| 140 | + self.recovery_quota = Quota( |
| 141 | + final_config["recovery_error_limit_window"], |
| 142 | + final_config.get( |
| 143 | + "recovery_error_limit_window_granularity", default_recovery_window_granularity |
| 144 | + ), |
| 145 | + final_config["recovery_error_limit"], |
| 146 | + f"{key}.circuit_breaker_recovery", |
| 147 | + ) |
| 148 | + |
| 149 | + self.broken_state_duration = final_config["broken_state_duration"] |
| 150 | + self.recovery_duration = final_config["recovery_duration"] |
| 151 | + |
| 152 | + if self.recovery_duration < final_config["recovery_error_limit_window"]: |
| 153 | + logger.warning( |
| 154 | + "Circuit breaker %s has `recovery_duration` < `recovery_error_limit_window`." |
| 155 | + + " Recovery duration has been reset to match the window.", |
| 156 | + key, |
| 157 | + ) |
| 158 | + self.recovery_duration = final_config["recovery_error_limit_window"] |
| 166 | + |
| 167 | + def record_error(self) -> None: |
| 168 | + state, seconds_left_in_state = self._get_state_and_remaining_time() |
| 169 | + |
| 170 | + if state == CircuitBreakerState.BROKEN: |
| 171 | + # If the circuit is broken, and `should_allow_request` is being used correctly, requests |
| 172 | + # should be blocked and we shouldn't even be here. That said, maybe there was a race |
| 173 | + # condition, so make sure the circuit hasn't just broken before crying foul. |
| 174 | + seconds_elapsed_in_state = self.broken_state_duration - seconds_left_in_state |
| 175 | + if seconds_elapsed_in_state > 5: |
| 176 | + logger.warning( |
| 177 | + "Attempt to record circuit breaker error while circuit is broken", |
| 178 | + extra={"key": self.key, "time_in_state": seconds_elapsed_in_state}, |
| 179 | + ) |
| 180 | + # We shouldn't have made the request, so don't record the error |
| 181 | + return |
| 182 | + |
| 183 | + # We track errors with the primary quota even during recovery (when we're not checking it), |
| 184 | + # because they still happened, and eventually switching back to the closed state doesn't |
| 185 | + # make that untrue |
| 186 | + quotas = ( |
| 187 | + [self.primary_quota, self.recovery_quota] |
| 188 | + if state == CircuitBreakerState.RECOVERY |
| 189 | + else [self.primary_quota] |
| 190 | + ) |
| 191 | + self.limiter.use_quotas( |
| 192 | + [RequestedQuota(self.key, 1, quotas)], [GrantedQuota(self.key, 1, [])], int(time.time()) |
| 193 | + ) |
| 194 | + |
| 195 | + # If incrementing has made us hit the current limit, switch to the broken state |
| 196 | + controlling_quota = self._get_controlling_quota(state) |
| 197 | + remaining_errors_allowed = self.get_remaining_error_quota(controlling_quota) |
| 198 | + if remaining_errors_allowed == 0: |
| 199 | + logger.warning( |
| 200 | + "Circuit breaker '%s' error limit hit", |
| 201 | + self.key, |
| 202 | + extra={ |
| 203 | + "current_state": state, |
| 204 | + "error_limit": controlling_quota.limit, |
| 205 | + "error_limit_window": controlling_quota.window_seconds, |
| 206 | + }, |
| 207 | + ) |
| 208 | + |
| 209 | + # Recovery will only start after the broken state has expired, so push out the recovery |
| 210 | + # expiry time. We'll store the expiries as our cache values so we can determine how long |
| 211 | + # we've been in a given state. |
| 212 | + now = int(time.time()) |
| 213 | + broken_state_timeout = self.broken_state_duration |
| 214 | + recovery_state_timeout = self.broken_state_duration + self.recovery_duration |
| 215 | + broken_state_expiry = now + broken_state_timeout |
| 216 | + recovery_state_expiry = now + recovery_state_timeout |
| 217 | + |
| 218 | + # Set cache keys for switching state. While they're both set (starting now) we'll be in |
| 219 | + # the broken state. Once the broken state key expires we'll switch to recovery, and then |
| 220 | + # once the recovery key expires we'll be back to normal. |
| 221 | + cache.set(self.broken_state_key, broken_state_expiry, broken_state_timeout) |
| 222 | + cache.set(self.recovery_state_key, recovery_state_expiry, recovery_state_timeout) |
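|  | +            # As a worked example with the default config (broken_state_duration=300, |
|  | +            # recovery_duration=300): the broken-state key expires at now + 300 and the |
|  | +            # recovery key at now + 600, so requests are blocked for the first 5 minutes and |
|  | +            # the stricter recovery limit applies for the next 5. |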
| 223 | + |
| 224 | + def should_allow_request(self) -> bool: |
| 225 | + state, _ = self._get_state_and_remaining_time() |
| 226 | + |
| 227 | + if state == CircuitBreakerState.BROKEN: |
| 228 | + return False |
| 229 | + |
| 230 | + controlling_quota = self._get_controlling_quota(state) |
| 231 | + |
| 232 | + return self.get_remaining_error_quota(controlling_quota) > 0 |
| 233 | + |
| 234 | + @overload |
| 235 | + def get_remaining_error_quota(self, quota: None, window_end: int | None) -> None: |
| 236 | + ... |
| 237 | + |
| 238 | + @overload |
| 239 | + def get_remaining_error_quota(self, quota: Quota, window_end: int | None) -> int: |
| 240 | + ... |
| 241 | + |
| 242 | + @overload |
| 243 | + def get_remaining_error_quota(self, quota: None) -> None: |
| 244 | + ... |
| 245 | + |
| 246 | + @overload |
| 247 | + def get_remaining_error_quota(self, quota: Quota) -> int: |
| 248 | + ... |
| 249 | + |
| 250 | + @overload |
| 251 | + def get_remaining_error_quota(self) -> int | None: |
| 252 | + ... |
| 253 | + |
| 254 | + def get_remaining_error_quota( |
| 255 | + self, quota: Quota | None = None, window_end: int | None = None |
| 256 | + ) -> int | None: |
| 257 | + """ |
| 258 | + # TODO: write me |
| 259 | + returns None when in broken state |
| 260 | + """ |
| 261 | + if not quota: |
| 262 | + quota = self._get_controlling_quota() |
| 263 | + if quota is None: # broken state |
| 264 | + return None |
| 265 | + |
| 266 | + now = int(time.time()) |
| 267 | + window_end = window_end or now |
| 268 | + |
| 269 | + _, result = self.limiter.check_within_quotas( |
| 270 | + [RequestedQuota(self.key, quota.limit, [quota])], |
| 271 | + window_end, |
| 272 | + ) |
| 273 | + |
| 274 | + return result[0].granted |
| 275 | + |
| 276 | + def _get_default_window_granularity(self, window_duration: int) -> int: |
| 277 | + # Never more than 10 buckets, and no bucket smaller than 5 seconds. If greater precision is |
| 278 | + # needed, the `error_limit_window_granularity` and `recovery_error_limit_window_granularity` |
| 279 | + # config options can be used. |
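|  | +        # For example, a 600-second window gets 60-second buckets (10 buckets), while a |
|  | +        # 30-second window gets 5-second buckets (6 buckets, since buckets never shrink below |
|  | +        # 5 seconds). |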
| 280 | + return max(ceil(window_duration / 10), 5) |
| 281 | + |
| 282 | + @overload |
| 283 | + def _get_controlling_quota( |
| 284 | + self, state: Literal[CircuitBreakerState.CLOSED, CircuitBreakerState.RECOVERY] |
| 285 | + ) -> Quota: |
| 286 | + ... |
| 287 | + |
| 288 | + @overload |
| 289 | + def _get_controlling_quota(self, state: Literal[CircuitBreakerState.BROKEN]) -> None: |
| 290 | + ... |
| 291 | + |
| 292 | + @overload |
| 293 | + def _get_controlling_quota(self) -> Quota | None: |
| 294 | + ... |
| 295 | + |
| 296 | + def _get_controlling_quota(self, state: CircuitBreakerState | None = None) -> Quota | None: |
| 297 | + """ |
| 298 | + # TODO: write me |
| 299 | + returns None when in broken state |
| 300 | + """ |
| 301 | + controlling_quota_by_state = { |
| 302 | + CircuitBreakerState.CLOSED: self.primary_quota, |
| 303 | + CircuitBreakerState.BROKEN: None, |
| 304 | + CircuitBreakerState.RECOVERY: self.recovery_quota, |
| 305 | + } |
| 306 | + |
| 307 | + _state = state or self._get_state_and_remaining_time()[0] |
| 308 | + |
| 309 | + return controlling_quota_by_state[_state] |
| 310 | + |
| 311 | + def _get_state_and_remaining_time(self) -> tuple[CircuitBreakerState, int]: |
| 312 | + """ |
| 313 | + Return the current state of the breaker (closed, broken, or in recovery), along with the |
| 314 | + number of seconds until that state expires. |
| 315 | + """ |
| 316 | + now = int(time.time()) |
| 317 | + |
| 318 | + # The broken state key should always expire before the recovery state one, so check it first |
| 319 | + if cache.has_key(self.broken_state_key): |
| 320 | + return (CircuitBreakerState.BROKEN, cache.get(self.broken_state_key) - now) |
| 321 | + |
| 322 | + if cache.has_key(self.recovery_state_key): |
| 323 | + return (CircuitBreakerState.RECOVERY, cache.get(self.recovery_state_key) - now) |
| 324 | + |
| 325 | + # TODO Fix this with overloads? |
| 326 | + # 0 here is just a placeholder, as "remaining seconds" doesn't really apply to a state we |
| 327 | + # hope to stay in indefinitely |
| 328 | + return (CircuitBreakerState.CLOSED, 0) |