Skip to content

Commit 6fe2974

Browse files
authored
Backpressure handling prototype (#2189)
* new Monitor class performs health checks in a thread every 10s * current health checks are - transport worker queue is not full and transport is not rate limited * if not healthy, we downsample / halve in steps till healthy again * we will record client reports with reason `backpressure` for when we are downsampling * exposed as experimental `enable_backpressure_handling` related to #2095 and https://github.com/getsentry/team-webplatform-meta/issues/50
1 parent d26fe80 commit 6fe2974

File tree

7 files changed

+235
-6
lines changed

7 files changed

+235
-6
lines changed

sentry_sdk/client.py

+10
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from sentry_sdk.envelope import Envelope
3333
from sentry_sdk.profiler import has_profiling_enabled, setup_profiler
3434
from sentry_sdk.scrubber import EventScrubber
35+
from sentry_sdk.monitor import Monitor
3536

3637
from sentry_sdk._types import TYPE_CHECKING
3738

@@ -210,6 +211,13 @@ def _capture_envelope(envelope):
210211
_client_init_debug.set(self.options["debug"])
211212
self.transport = make_transport(self.options)
212213

214+
self.monitor = None
215+
if self.transport:
216+
if self.options["_experiments"].get(
217+
"enable_backpressure_handling", False
218+
):
219+
self.monitor = Monitor(self.transport)
220+
213221
self.session_flusher = SessionFlusher(capture_func=_capture_envelope)
214222

215223
request_bodies = ("always", "never", "small", "medium")
@@ -571,6 +579,8 @@ def close(
571579
if self.transport is not None:
572580
self.flush(timeout=timeout, callback=callback)
573581
self.session_flusher.kill()
582+
if self.monitor:
583+
self.monitor.kill()
574584
self.transport.kill()
575585
self.transport = None
576586

sentry_sdk/consts.py

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
# TODO: Remove these 2 profiling related experiments
3737
"profiles_sample_rate": Optional[float],
3838
"profiler_mode": Optional[ProfilerMode],
39+
"enable_backpressure_handling": Optional[bool],
3940
},
4041
total=False,
4142
)

sentry_sdk/monitor.py

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import os
2+
import time
3+
from threading import Thread, Lock
4+
5+
import sentry_sdk
6+
from sentry_sdk.utils import logger
7+
from sentry_sdk._types import TYPE_CHECKING
8+
9+
if TYPE_CHECKING:
10+
from typing import Optional
11+
12+
13+
class Monitor(object):
14+
"""
15+
Performs health checks in a separate thread once every interval seconds
16+
and updates the internal state. Other parts of the SDK only read this state
17+
and act accordingly.
18+
"""
19+
20+
name = "sentry.monitor"
21+
22+
def __init__(self, transport, interval=10):
23+
# type: (sentry_sdk.transport.Transport, float) -> None
24+
self.transport = transport # type: sentry_sdk.transport.Transport
25+
self.interval = interval # type: float
26+
27+
self._healthy = True
28+
self._downsample_factor = 1 # type: int
29+
30+
self._thread = None # type: Optional[Thread]
31+
self._thread_lock = Lock()
32+
self._thread_for_pid = None # type: Optional[int]
33+
self._running = True
34+
35+
def _ensure_running(self):
36+
# type: () -> None
37+
if self._thread_for_pid == os.getpid() and self._thread is not None:
38+
return None
39+
40+
with self._thread_lock:
41+
if self._thread_for_pid == os.getpid() and self._thread is not None:
42+
return None
43+
44+
def _thread():
45+
# type: (...) -> None
46+
while self._running:
47+
time.sleep(self.interval)
48+
if self._running:
49+
self.run()
50+
51+
thread = Thread(name=self.name, target=_thread)
52+
thread.daemon = True
53+
thread.start()
54+
self._thread = thread
55+
self._thread_for_pid = os.getpid()
56+
57+
return None
58+
59+
def run(self):
60+
# type: () -> None
61+
self.check_health()
62+
self.set_downsample_factor()
63+
64+
def set_downsample_factor(self):
65+
# type: () -> None
66+
if self._healthy:
67+
if self._downsample_factor > 1:
68+
logger.debug(
69+
"[Monitor] health check positive, reverting to normal sampling"
70+
)
71+
self._downsample_factor = 1
72+
else:
73+
self._downsample_factor *= 2
74+
logger.debug(
75+
"[Monitor] health check negative, downsampling with a factor of %d",
76+
self._downsample_factor,
77+
)
78+
79+
def check_health(self):
80+
# type: () -> None
81+
"""
82+
Perform the actual health checks,
83+
currently only checks if the transport is rate-limited.
84+
TODO: augment in the future with more checks.
85+
"""
86+
self._healthy = self.transport.is_healthy()
87+
88+
def is_healthy(self):
89+
# type: () -> bool
90+
self._ensure_running()
91+
return self._healthy
92+
93+
@property
94+
def downsample_factor(self):
95+
# type: () -> int
96+
self._ensure_running()
97+
return self._downsample_factor
98+
99+
def kill(self):
100+
# type: () -> None
101+
self._running = False
102+
103+
def __del__(self):
104+
# type: () -> None
105+
self.kill()

sentry_sdk/tracing.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -595,9 +595,12 @@ def finish(self, hub=None, end_timestamp=None):
595595
# exclusively based on sample rate but also traces sampler, but
596596
# we handle this the same here.
597597
if client.transport and has_tracing_enabled(client.options):
598-
client.transport.record_lost_event(
599-
"sample_rate", data_category="transaction"
600-
)
598+
if client.monitor and client.monitor.downsample_factor > 1:
599+
reason = "backpressure"
600+
else:
601+
reason = "sample_rate"
602+
603+
client.transport.record_lost_event(reason, data_category="transaction")
601604

602605
return None
603606

@@ -749,9 +752,12 @@ def _set_initial_sampling_decision(self, sampling_context):
749752

750753
self.sample_rate = float(sample_rate)
751754

755+
if client.monitor:
756+
self.sample_rate /= client.monitor.downsample_factor
757+
752758
# if the function returned 0 (or false), or if `traces_sample_rate` is
753759
# 0, it's a sign the transaction should be dropped
754-
if not sample_rate:
760+
if not self.sample_rate:
755761
logger.debug(
756762
"[Tracing] Discarding {transaction_description} because {reason}".format(
757763
transaction_description=transaction_description,
@@ -768,7 +774,7 @@ def _set_initial_sampling_decision(self, sampling_context):
768774
# Now we roll the dice. random.random is inclusive of 0, but not of 1,
769775
# so strict < is safe here. In case sample_rate is a boolean, cast it
770776
# to a float (True becomes 1.0 and False becomes 0.0)
771-
self.sampled = random.random() < float(sample_rate)
777+
self.sampled = random.random() < self.sample_rate
772778

773779
if self.sampled:
774780
logger.debug(
@@ -780,7 +786,7 @@ def _set_initial_sampling_decision(self, sampling_context):
780786
logger.debug(
781787
"[Tracing] Discarding {transaction_description} because it's not included in the random sample (sampling rate = {sample_rate})".format(
782788
transaction_description=transaction_description,
783-
sample_rate=float(sample_rate),
789+
sample_rate=self.sample_rate,
784790
)
785791
)
786792

sentry_sdk/transport.py

+16
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ def record_lost_event(
107107
"""
108108
return None
109109

110+
def is_healthy(self):
111+
# type: () -> bool
112+
return True
113+
110114
def __del__(self):
111115
# type: () -> None
112116
try:
@@ -311,6 +315,18 @@ def _disabled(bucket):
311315

312316
return _disabled(category) or _disabled(None)
313317

318+
def _is_rate_limited(self):
319+
# type: () -> bool
320+
return any(ts > datetime.utcnow() for ts in self._disabled_until.values())
321+
322+
def _is_worker_full(self):
323+
# type: () -> bool
324+
return self._worker.full()
325+
326+
def is_healthy(self):
327+
# type: () -> bool
328+
return not (self._is_worker_full() or self._is_rate_limited())
329+
314330
def _send_event(
315331
self, event # type: Event
316332
):

sentry_sdk/worker.py

+4
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ def flush(self, timeout, callback=None):
9595
self._wait_flush(timeout, callback)
9696
logger.debug("background worker flushed")
9797

98+
def full(self):
99+
# type: () -> bool
100+
return self._queue.full()
101+
98102
def _wait_flush(self, timeout, callback):
99103
# type: (float, Optional[Any]) -> None
100104
initial_timeout = min(0.1, timeout)

tests/test_monitor.py

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import random
2+
3+
from sentry_sdk import Hub, start_transaction
4+
from sentry_sdk.transport import Transport
5+
6+
7+
class HealthyTestTransport(Transport):
8+
def _send_event(self, event):
9+
pass
10+
11+
def _send_envelope(self, envelope):
12+
pass
13+
14+
def is_healthy(self):
15+
return True
16+
17+
18+
class UnhealthyTestTransport(HealthyTestTransport):
19+
def is_healthy(self):
20+
return False
21+
22+
23+
def test_no_monitor_if_disabled(sentry_init):
24+
sentry_init(transport=HealthyTestTransport())
25+
assert Hub.current.client.monitor is None
26+
27+
28+
def test_monitor_if_enabled(sentry_init):
29+
sentry_init(
30+
transport=HealthyTestTransport(),
31+
_experiments={"enable_backpressure_handling": True},
32+
)
33+
34+
monitor = Hub.current.client.monitor
35+
assert monitor is not None
36+
assert monitor._thread is None
37+
38+
assert monitor.is_healthy() is True
39+
assert monitor.downsample_factor == 1
40+
assert monitor._thread is not None
41+
assert monitor._thread.name == "sentry.monitor"
42+
43+
44+
def test_monitor_unhealthy(sentry_init):
45+
sentry_init(
46+
transport=UnhealthyTestTransport(),
47+
_experiments={"enable_backpressure_handling": True},
48+
)
49+
50+
monitor = Hub.current.client.monitor
51+
monitor.interval = 0.1
52+
53+
assert monitor.is_healthy() is True
54+
monitor.run()
55+
assert monitor.is_healthy() is False
56+
assert monitor.downsample_factor == 2
57+
monitor.run()
58+
assert monitor.downsample_factor == 4
59+
60+
61+
def test_transaction_uses_downsampled_rate(
62+
sentry_init, capture_client_reports, monkeypatch
63+
):
64+
sentry_init(
65+
traces_sample_rate=1.0,
66+
transport=UnhealthyTestTransport(),
67+
_experiments={"enable_backpressure_handling": True},
68+
)
69+
70+
reports = capture_client_reports()
71+
72+
monitor = Hub.current.client.monitor
73+
monitor.interval = 0.1
74+
75+
# make sure rng doesn't sample
76+
monkeypatch.setattr(random, "random", lambda: 0.9)
77+
78+
assert monitor.is_healthy() is True
79+
monitor.run()
80+
assert monitor.is_healthy() is False
81+
assert monitor.downsample_factor == 2
82+
83+
with start_transaction(name="foobar") as transaction:
84+
assert transaction.sampled is False
85+
assert transaction.sample_rate == 0.5
86+
87+
assert reports == [("backpressure", "transaction")]

0 commit comments

Comments
 (0)