Skip to content

Commit fc0b22b

Browse files
authored
ref(dyanmic_sampling): Add more intstrumentation to sliding window org (#53450)
1 parent 37d4a80 commit fc0b22b

File tree

5 files changed

+56
-26
lines changed

5 files changed

+56
-26
lines changed

src/sentry/dynamic_sampling/tasks/boost_low_volume_projects.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,10 @@ def boost_low_volume_projects_of_org(
114114
Tuple[ProjectId, int, DecisionKeepCount, DecisionDropCount]
115115
],
116116
) -> None:
117-
adjust_sample_rates_of_projects(org_id, projects_with_tx_count_and_rates)
117+
# secondary tasks should not log the context, I need the context only for calling
118+
# `adjust_sample_rates_of_projects`, the accumulated info will be ignored.
119+
context = TaskContext("not_used", MAX_TASK_SECONDS)
120+
adjust_sample_rates_of_projects(org_id, projects_with_tx_count_and_rates, context)
118121

119122

120123
def fetch_projects_with_total_root_transaction_count_and_rates(
@@ -239,6 +242,7 @@ def fetch_projects_with_total_root_transaction_count_and_rates(
239242
def adjust_sample_rates_of_projects(
240243
org_id: int,
241244
projects_with_tx_count: Sequence[Tuple[ProjectId, int, DecisionKeepCount, DecisionDropCount]],
245+
context: TaskContext,
242246
) -> None:
243247
"""
244248
Adjusts the sample rates of projects belonging to a specific org.
@@ -253,7 +257,7 @@ def adjust_sample_rates_of_projects(
253257

254258
# We get the sample rate either directly from quotas or from the new sliding window org mechanism.
255259
if organization is not None and is_sliding_window_org_enabled(organization):
256-
sample_rate = get_adjusted_base_rate_from_cache_or_compute(org_id)
260+
sample_rate = get_adjusted_base_rate_from_cache_or_compute(org_id, context)
257261
log_sample_rate_source(
258262
org_id, None, "boost_low_volume_projects", "sliding_window_org", sample_rate
259263
)

src/sentry/dynamic_sampling/tasks/common.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,11 @@ def are_equal_with_epsilon(a: Optional[float], b: Optional[float]) -> bool:
583583

584584

585585
def compute_guarded_sliding_window_sample_rate(
586-
org_id: int, project_id: Optional[int], total_root_count: int, window_size: int
586+
org_id: int,
587+
project_id: Optional[int],
588+
total_root_count: int,
589+
window_size: int,
590+
context: TaskContext,
587591
) -> Optional[float]:
588592
"""
589593
Computes the actual sliding window sample rate by guarding any exceptions and returning None in case
@@ -592,14 +596,16 @@ def compute_guarded_sliding_window_sample_rate(
592596
try:
593597
# We want to compute the sliding window sample rate by considering a window of time.
594598
# This piece of code is very delicate, thus we want to guard it properly and capture any errors.
595-
return compute_sliding_window_sample_rate(org_id, project_id, total_root_count, window_size)
599+
return compute_sliding_window_sample_rate(
600+
org_id, project_id, total_root_count, window_size, context
601+
)
596602
except Exception as e:
597603
sentry_sdk.capture_exception(e)
598604
return None
599605

600606

601607
def compute_sliding_window_sample_rate(
602-
org_id: int, project_id: Optional[int], total_root_count: int, window_size: int
608+
org_id: int, project_id: Optional[int], total_root_count: int, window_size: int, context
603609
) -> Optional[float]:
604610
"""
605611
Computes the actual sample rate for the sliding window given the total root count and the size of the
@@ -608,7 +614,8 @@ def compute_sliding_window_sample_rate(
608614
The org_id is used only because it is required on the quotas side to determine whether dynamic sampling is
609615
enabled in the first place for that project.
610616
"""
611-
extrapolated_volume = extrapolate_monthly_volume(volume=total_root_count, hours=window_size)
617+
with context.get_timer("extrapolate_monthly_volume"):
618+
extrapolated_volume = extrapolate_monthly_volume(volume=total_root_count, hours=window_size)
612619
if extrapolated_volume is None:
613620
with sentry_sdk.push_scope() as scope:
614621
scope.set_extra("org_id", org_id)
@@ -622,9 +629,10 @@ def compute_sliding_window_sample_rate(
622629
org_id, project_id, total_root_count, extrapolated_volume, window_size
623630
)
624631

625-
sampling_tier = quotas.get_transaction_sampling_tier_for_volume( # type:ignore
626-
org_id, extrapolated_volume
627-
)
632+
with context.get_timer("get_transaction_sampling_tier_for_volume"):
633+
sampling_tier = quotas.get_transaction_sampling_tier_for_volume( # type:ignore
634+
org_id, extrapolated_volume
635+
)
628636
if sampling_tier is None:
629637
return None
630638

@@ -636,7 +644,9 @@ def compute_sliding_window_sample_rate(
636644
return float(sample_rate)
637645

638646

639-
def get_adjusted_base_rate_from_cache_or_compute(org_id: int) -> Optional[float]:
647+
def get_adjusted_base_rate_from_cache_or_compute(
648+
org_id: int, context: TaskContext
649+
) -> Optional[float]:
640650
"""
641651
Gets the adjusted base sample rate from the sliding window directly from the Redis cache or tries to compute
642652
it synchronously.
@@ -656,7 +666,11 @@ def get_adjusted_base_rate_from_cache_or_compute(org_id: int) -> Optional[float]
656666
)
657667
if (org_total_root_count := orgs_with_counts.get(org_id)) is not None:
658668
return compute_guarded_sliding_window_sample_rate(
659-
org_id, None, org_total_root_count, window_size
669+
org_id,
670+
None,
671+
org_total_root_count,
672+
window_size,
673+
context,
660674
)
661675

662676
return None

src/sentry/dynamic_sampling/tasks/recalibrate_orgs.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ def recalibrate_org(org_volume: OrganizationDataVolume, context: TaskContext) ->
114114
"ready_for_recalibration", {"org_id": org_volume.org_id}, orgs_to_check(org_volume)
115115
)
116116

117-
target_sample_rate = get_adjusted_base_rate_from_cache_or_compute(org_volume.org_id)
117+
target_sample_rate = get_adjusted_base_rate_from_cache_or_compute(
118+
org_volume.org_id, context
119+
)
118120
log_sample_rate_source(
119121
org_volume.org_id, None, "recalibrate_orgs", "sliding_window_org", target_sample_rate
120122
)

src/sentry/dynamic_sampling/tasks/sliding_window.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,11 @@ def adjust_base_sample_rates_of_projects(
133133

134134
for project_id, total_root_count in projects_with_total_root_count:
135135
sample_rate = compute_guarded_sliding_window_sample_rate(
136-
org_id, project_id, total_root_count, window_size
136+
org_id,
137+
project_id,
138+
total_root_count,
139+
window_size,
140+
context,
137141
)
138142

139143
# If the sample rate is None, we want to add a sentinel value into Redis, the goal being that when generating

src/sentry/dynamic_sampling/tasks/sliding_window_org.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -83,25 +83,31 @@ def adjust_base_sample_rate_of_org(
8383
if time.monotonic() > context.expiration_time:
8484
raise TimeoutException(context)
8585

86-
name = adjust_base_sample_rate_of_org.__name__
87-
timer = context.get_timer(name)
86+
func_name = adjust_base_sample_rate_of_org.__name__
87+
timer = context.get_timer(func_name)
8888
with timer:
89-
sample_rate = compute_guarded_sliding_window_sample_rate(
90-
org_id, None, total_root_count, window_size
91-
)
89+
with context.get_timer(compute_guarded_sliding_window_sample_rate.__name__):
90+
sample_rate = compute_guarded_sliding_window_sample_rate(
91+
org_id,
92+
None,
93+
total_root_count,
94+
window_size,
95+
context,
96+
)
9297
# If the sample rate is None, we don't want to store a value into Redis, but we prefer to keep the system
9398
# with the old value.
9499
if sample_rate is None:
95100
return
96101

97-
redis_client = get_redis_client_for_ds()
98-
with redis_client.pipeline(transaction=False) as pipeline:
99-
cache_key = generate_sliding_window_org_cache_key(org_id=org_id)
100-
pipeline.set(cache_key, sample_rate)
101-
pipeline.pexpire(cache_key, DEFAULT_REDIS_CACHE_KEY_TTL)
102-
pipeline.execute()
102+
with context.get_timer("redis_updates"):
103+
redis_client = get_redis_client_for_ds()
104+
with redis_client.pipeline(transaction=False) as pipeline:
105+
cache_key = generate_sliding_window_org_cache_key(org_id=org_id)
106+
pipeline.set(cache_key, sample_rate)
107+
pipeline.pexpire(cache_key, DEFAULT_REDIS_CACHE_KEY_TTL)
108+
pipeline.execute()
103109

104-
state = context.get_function_state(name)
110+
state = context.get_function_state(func_name)
105111
state.num_orgs += 1
106112
state.num_iterations += 1
107-
context.set_function_state(name, state)
113+
context.set_function_state(func_name, state)

0 commit comments

Comments
 (0)