
Commit 68c18a3

Authored by chloeho7, getsantry[bot], and ceorourke
feat(integration-slack): store request error counts and disable on broken (#53621)
Continuing #52994 after revert, which itself continued #51126. Milestone 1 of the [Notify on Disabled Integration project](https://www.notion.so/sentry/Tech-Spec-Notify-on-Disabled-Integration-Spec-e7ea0f86ccd6419cb3e564067cf4a2ef?pvs=4).

This implements IntegrationRequestBuffer, a Redis-backed structure for logging request errors and detecting broken integrations so they can be disabled. The buffer stores daily aggregate error, fatal, and success counts per integration for 30 days. An integration is considered broken if it fails consistently for 7 days or returns a fatal response. Later, users will be alerted of broken integrations by email or in-app notification.

Disabling is implemented behind the "slack-disable-on-broken" feature flag. The conditions for logging are implemented in BaseApiClient and SlackClient.

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
Co-authored-by: Colleen O'Rourke <[email protected]>
1 parent bc16615 · commit 68c18a3

10 files changed: +531, -9 lines

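Before the per-file diffs, here is a minimal sketch of how the pieces described in the commit message are intended to fit together. The wrapper function, the buffer key format, and the call site are illustrative assumptions rather than code from this commit; only IntegrationRequestBuffer, its record_*/is_integration_broken methods, Integration.disable, and the feature flag come from the diffs below.

from sentry import features
from sentry.integrations.request_buffer import IntegrationRequestBuffer


def record_request_outcome(integration, organization, is_error: bool, is_fatal: bool) -> None:
    # Hypothetical key format; the actual key construction is not part of this diff.
    buffer = IntegrationRequestBuffer(f"sentry-integration-error:{integration.id}")

    # Bump today's aggregate bucket for this integration.
    if is_fatal:
        buffer.record_fatal()
    elif is_error:
        buffer.record_error()
    else:
        buffer.record_success()

    # Disabling is gated behind the feature flag added in this commit.
    if buffer.is_integration_broken() and features.has(
        "organizations:slack-disable-on-broken", organization
    ):
        integration.disable()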

Diff for: src/sentry/conf/server.py (+3)

@@ -112,6 +112,7 @@ def env(
SENTRY_TRANSACTION_NAMES_REDIS_CLUSTER = "default"
SENTRY_WEBHOOK_LOG_REDIS_CLUSTER = "default"
SENTRY_ARTIFACT_BUNDLES_INDEXING_REDIS_CLUSTER = "default"
+SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER = "default"
SENTRY_DEBUG_FILES_REDIS_CLUSTER = "default"

# Hosts that are allowed to use system token authentication.
@@ -1340,6 +1341,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
    "organizations:create": True,
    # Enable usage of customer domains on the frontend
    "organizations:customer-domains": False,
+    # Allow disabling integrations when broken is detected
+    "organizations:slack-disable-on-broken": False,
    # Enable the 'discover' interface.
    "organizations:discover": False,
    # Enables events endpoint rate limit
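If a deployment wants to keep these counters off the default cluster, the new setting can presumably be pointed at a dedicated Redis cluster in sentry.conf.py; the cluster name below is hypothetical and would need to exist in the deployment's Redis cluster configuration.

# sentry.conf.py (sketch): route integration error counters to a dedicated cluster.
# "integration-errors" is a hypothetical cluster name; the default in the diff above is "default".
SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER = "integration-errors"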

Diff for: src/sentry/features/__init__.py (+2)

@@ -259,9 +259,11 @@
default_manager.add("organizations:pr-comment-bot", OrganizationFeature, FeatureHandlerStrategy.INTERNAL)
default_manager.add("organizations:ds-org-recalibration", OrganizationFeature, FeatureHandlerStrategy.INTERNAL)
default_manager.add("organizations:slack-use-new-lookup", OrganizationFeature, FeatureHandlerStrategy.REMOTE)
+default_manager.add("organizations:slack-disable-on-broken", OrganizationFeature, FeatureHandlerStrategy.REMOTE)
default_manager.add("organizations:sourcemaps-bundle-flat-file-indexing", OrganizationFeature, FeatureHandlerStrategy.REMOTE)
default_manager.add("organizations:recap-server", OrganizationFeature, FeatureHandlerStrategy.INTERNAL)

+
# Project scoped features
default_manager.add("projects:alert-filters", ProjectFeature, FeatureHandlerStrategy.INTERNAL)
default_manager.add("projects:custom-inbound-filters", ProjectFeature, FeatureHandlerStrategy.INTERNAL)

Diff for: src/sentry/integrations/discord/client.py (+1, -1)

@@ -37,7 +37,7 @@ def __init__(
            org_integration_id = infer_org_integration(
                integration_id=integration_id, ctx_logger=logger
            )
-        super().__init__(org_integration_id, verify_ssl, logging_context)
+        super().__init__(integration_id, org_integration_id, verify_ssl, logging_context)

    @control_silo_function
    def authorize_request(self, prepared_request: PreparedRequest) -> PreparedRequest:

Diff for: src/sentry/integrations/request_buffer.py (new file, +147)

@@ -0,0 +1,147 @@
import logging
from datetime import datetime

from django.conf import settings
from redis.exceptions import WatchError

from sentry.utils import json, redis

BUFFER_SIZE = 30  # 30 days
KEY_EXPIRY = 60 * 60 * 24 * 30  # 30 days

IS_BROKEN_RANGE = 7  # 7 days


class IntegrationRequestBuffer:
    """
    Create a data structure to store daily successful and failed request counts for each installed integration in Redis
    This should store the aggregate counts of each type for last 30 days for each integration
    """

    def __init__(self, key):
        self.integrationkey = key
        logger = logging.getLogger(__name__)

        try:
            cluster_id = settings.SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER
            self.client = redis.redis_clusters.get(cluster_id)
        except KeyError as e:
            logger.info("no_redis_cluster", extra={"error": e, "cluster_id": cluster_id})

    def _convert_obj_to_dict(self, redis_object):
        """
        Convert the request string stored in Redis to a python dict
        """

        return json.loads(redis_object)

    def _get_all_from_buffer(self, buffer_key):
        """
        Get the list at the buffer key.
        """

        return self.client.lrange(buffer_key, 0, BUFFER_SIZE - 1)

    def _get_broken_range_from_buffer(self, buffer_key):
        """
        Get the list at the buffer key in the broken range.
        """

        return self.client.lrange(buffer_key, 0, IS_BROKEN_RANGE - 1)

    def _get(self):
        """
        Returns the list of daily aggregate error counts.
        """
        return [
            self._convert_obj_to_dict(obj)
            for obj in self._get_broken_range_from_buffer(self.integrationkey)
        ]

    def is_integration_broken(self):
        """
        Integration is broken if we have 7 consecutive days of errors and no successes OR have a fatal error

        """
        items = self._get()

        data = [
            datetime.strptime(item.get("date"), "%Y-%m-%d").date()
            for item in items
            if item.get("fatal_count", 0) > 0 and item.get("date")
        ]

        if len(data) > 0:
            return True

        data = [
            datetime.strptime(item.get("date"), "%Y-%m-%d").date()
            for item in items
            if item.get("error_count", 0) > 0
            and item.get("success_count", 0) == 0
            and item.get("date")
        ]

        if not len(data):
            return False

        if len(data) < IS_BROKEN_RANGE:
            return False

        return True

    def add(self, count: str):
        VALID_KEYS = ["success", "error", "fatal"]
        if count not in VALID_KEYS:
            raise Exception("Requires a valid key param.")

        other_count1, other_count2 = list(set(VALID_KEYS).difference([count]))[0:2]
        now = datetime.now().strftime("%Y-%m-%d")

        buffer_key = self.integrationkey
        pipe = self.client.pipeline()

        while True:
            try:
                pipe.watch(buffer_key)
                recent_item_array = pipe.lrange(buffer_key, 0, 1)  # get first element from array
                pipe.multi()
                if len(recent_item_array):
                    recent_item = self._convert_obj_to_dict(recent_item_array[0])
                    if recent_item.get("date") == now:
                        recent_item[f"{count}_count"] += 1
                        pipe.lset(buffer_key, 0, json.dumps(recent_item))
                    else:
                        data = {
                            "date": now,
                            f"{count}_count": 1,
                            f"{other_count1}_count": 0,
                            f"{other_count2}_count": 0,
                        }
                        pipe.lpush(buffer_key, json.dumps(data))

                else:
                    data = {
                        "date": now,
                        f"{count}_count": 1,
                        f"{other_count1}_count": 0,
                        f"{other_count2}_count": 0,
                    }
                    pipe.lpush(buffer_key, json.dumps(data))
                pipe.ltrim(buffer_key, 0, BUFFER_SIZE - 1)
                pipe.expire(buffer_key, KEY_EXPIRY)
                pipe.execute()
                break
            except WatchError:
                continue
            finally:
                pipe.reset()

    def record_error(self):
        self.add("error")

    def record_success(self):
        self.add("success")

    def record_fatal(self):
        self.add("fatal")
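To make the Redis layout concrete, here is an illustrative picture of what the buffer holds and how is_integration_broken reads it; the values below are made up for the example.

# Each list entry is a JSON document with one day's aggregate counts, newest first.
# add() either increments today's bucket in place (lset at index 0) or pushes a new
# bucket (lpush), then ltrim keeps at most BUFFER_SIZE (30) buckets and expire
# refreshes the 30-day TTL on the key.
sample_buffer = [
    {"date": "2023-07-27", "success_count": 0, "error_count": 4, "fatal_count": 0},
    {"date": "2023-07-26", "success_count": 0, "error_count": 9, "fatal_count": 0},
    # ... five more all-error, zero-success days ...
]

# is_integration_broken() looks only at the newest IS_BROKEN_RANGE (7) buckets:
# it returns True if any of them has fatal_count > 0, or if at least 7 of them
# have errors and zero successes; otherwise it returns False.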

Diff for: src/sentry/integrations/slack/client.py (+13, -3)

@@ -38,7 +38,12 @@ def __init__(
                integration_id=self.integration_id, ctx_logger=logger
            )

-        super().__init__(org_integration_id, verify_ssl, logging_context)
+        super().__init__(
+            org_integration_id=org_integration_id,
+            verify_ssl=verify_ssl,
+            integration_id=integration_id,
+            logging_context=logging_context,
+        )

    @control_silo_function
    def authorize_request(self, prepared_request: PreparedRequest) -> PreparedRequest:
@@ -57,13 +62,19 @@ def authorize_request(self, prepared_request: PreparedRequest) -> PreparedRequest:
        if not integration:
            logger.info("no_integration", extra={"path_url": prepared_request.path_url})
            return prepared_request
-
        token = (
            integration.metadata.get("user_access_token") or integration.metadata["access_token"]
        )
        prepared_request.headers["Authorization"] = f"Bearer {token}"
        return prepared_request

+    def is_response_fatal(self, response: Response) -> bool:
+        resp_json = response.json()
+        if not resp_json.get("ok"):
+            if "account_inactive" == resp_json.get("error", ""):
+                return True
+        return False
+
    def track_response_data(
        self,
        code: Union[str, int],
@@ -74,7 +85,6 @@ def track_response_data(
        # if no span was passed, create a dummy to which to add data to avoid having to wrap every
        # span call in `if span`
        span = span or Span()
-
        try:
            span.set_http_status(int(code))
        except ValueError:
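The commit message notes that the logging conditions also live in BaseApiClient, which is not among the files shown here. As a rough, assumption-laden sketch of how a hook like is_response_fatal could feed the request buffer from the shared client (the helper name, the buffer attribute, and the status-code check are all hypothetical):

    def _record_request_outcome(self, response) -> None:
        # Hypothetical helper: the real condition checks live in BaseApiClient,
        # which is not part of this excerpt. `self.request_buffer` is assumed to
        # be an IntegrationRequestBuffer keyed to this client's integration.
        if self.is_response_fatal(response):
            self.request_buffer.record_fatal()
        elif response.status_code >= 400:
            self.request_buffer.record_error()
        else:
            self.request_buffer.record_success()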

Diff for: src/sentry/integrations/slack/integration.py (+1, -1)

@@ -77,7 +77,7 @@ class SlackIntegration(SlackNotifyBasicMixin, IntegrationInstallation):
    def get_client(self) -> SlackClient:
        if not self.org_integration:
            raise IntegrationError("Organization Integration does not exist")
-        return SlackClient(org_integration_id=self.org_integration.id)
+        return SlackClient(org_integration_id=self.org_integration.id, integration_id=self.model.id)

    def get_config_data(self) -> Mapping[str, str]:
        metadata_ = self.model.metadata

Diff for: src/sentry/models/integrations/integration.py (+10, -2)

@@ -68,7 +68,7 @@ def get_provider(self) -> IntegrationProvider:

    def delete(self, *args, **kwds):
        with outbox_context(
-            transaction.atomic(router.db_for_write(OrganizationIntegration)), flush=False
+            transaction.atomic(using=router.db_for_write(OrganizationIntegration)), flush=False
        ):
            for organization_integration in self.organizationintegration_set.all():
                organization_integration.delete()
@@ -107,7 +107,7 @@ def add_organization(self, organization: RpcOrganization, user=None, default_aut
        from sentry.models import OrganizationIntegration

        try:
-            with transaction.atomic(router.db_for_write(OrganizationIntegration)):
+            with transaction.atomic(using=router.db_for_write(OrganizationIntegration)):
                org_integration, created = OrganizationIntegration.objects.get_or_create(
                    organization_id=organization.id,
                    integration_id=self.id,
@@ -134,3 +134,11 @@ def add_organization(self, organization: RpcOrganization, user=None, default_aut
                },
            )
            return False
+
+    def disable(self):
+        """
+        Disable this integration
+        """
+
+        self.update(status=ObjectStatus.DISABLED)
+        self.save()
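For reference, a minimal usage sketch of the new Integration.disable method; the wrapper function name is hypothetical, and ObjectStatus.DISABLED is the status the method sets per the diff above.

from sentry.constants import ObjectStatus
from sentry.models import Integration


def disable_broken_integration(integration: Integration) -> None:
    # Marks the integration as disabled; per the diff above, disable() sets
    # status to ObjectStatus.DISABLED via self.update and then saves the row.
    integration.disable()
    assert integration.status == ObjectStatus.DISABLED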
