From 8979f4db00e145f3059d0b3aa026b44478dc18a9 Mon Sep 17 00:00:00 2001 From: Chloe Date: Tue, 1 Aug 2023 16:18:44 -0700 Subject: [PATCH 1/9] changing redis key to a daily key using redis hashes --- src/sentry/integrations/request_buffer.py | 84 +++++++------------ .../sentry/integrations/slack/test_disable.py | 2 +- 2 files changed, 32 insertions(+), 54 deletions(-) diff --git a/src/sentry/integrations/request_buffer.py b/src/sentry/integrations/request_buffer.py index 4b79bfab8b52c3..1389acd570d0cb 100644 --- a/src/sentry/integrations/request_buffer.py +++ b/src/sentry/integrations/request_buffer.py @@ -1,7 +1,7 @@ -from datetime import datetime +from datetime import datetime, timedelta from django.conf import settings -from redis.exceptions import WatchError +from freezegun import freeze_time from sentry.utils import json, redis @@ -27,7 +27,7 @@ def _convert_obj_to_dict(self, redis_object): """ Convert the request string stored in Redis to a python dict """ - + redis_object = redis_object.replace("'", '"') return json.loads(redis_object) def _get_all_from_buffer(self, buffer_key): @@ -35,21 +35,38 @@ def _get_all_from_buffer(self, buffer_key): Get the list at the buffer key. """ - return self.client.lrange(buffer_key, 0, BUFFER_SIZE - 1) + ret = [] + now = datetime.now() + for i in reversed(range(BUFFER_SIZE)): + with freeze_time(now - timedelta(days=i)): + cur = datetime.now().strftime("%Y-%m-%d") + buffer_key = self.integrationkey + cur + ret.append(self.client.hgetall(buffer_key)) + + return ret def _get_broken_range_from_buffer(self, buffer_key): """ Get the list at the buffer key in the broken range. """ - return self.client.lrange(buffer_key, 0, IS_BROKEN_RANGE - 1) + # return self.client.lrange(buffer_key, 0, IS_BROKEN_RANGE - 1) + ret = [] + now = datetime.now() + for i in reversed(range(IS_BROKEN_RANGE)): + with freeze_time(now - timedelta(days=i)): + cur = datetime.now().strftime("%Y-%m-%d") + buffer_key = self.integrationkey + cur + ret.append(self.client.hgetall(buffer_key)) + + return ret def _get(self): """ Returns the list of daily aggregate error counts. """ return [ - self._convert_obj_to_dict(obj) + self._convert_obj_to_dict(str(obj)) for obj in self._get_broken_range_from_buffer(self.integrationkey) ] @@ -60,21 +77,15 @@ def is_integration_broken(self): """ items = self._get() - data = [ - datetime.strptime(item.get("date"), "%Y-%m-%d").date() - for item in items - if item.get("fatal_count", 0) > 0 and item.get("date") - ] + data = [item for item in items if int(item.get("fatal_count", 0)) > 0] if len(data) > 0: return True data = [ - datetime.strptime(item.get("date"), "%Y-%m-%d").date() + item for item in items - if item.get("error_count", 0) > 0 - and item.get("success_count", 0) == 0 - and item.get("date") + if int(item.get("error_count", 0)) > 0 and int(item.get("success_count", 0)) == 0 ] if not len(data): @@ -90,47 +101,14 @@ def add(self, count: str): if count not in VALID_KEYS: raise Exception("Requires a valid key param.") - other_count1, other_count2 = list(set(VALID_KEYS).difference([count]))[0:2] now = datetime.now().strftime("%Y-%m-%d") - buffer_key = self.integrationkey + buffer_key = self.integrationkey + now pipe = self.client.pipeline() - - while True: - try: - pipe.watch(buffer_key) - recent_item_array = pipe.lrange(buffer_key, 0, 1) # get first element from array - pipe.multi() - if len(recent_item_array): - recent_item = self._convert_obj_to_dict(recent_item_array[0]) - if recent_item.get("date") == now: - recent_item[f"{count}_count"] += 1 - pipe.lset(buffer_key, 0, json.dumps(recent_item)) - else: - data = { - "date": now, - f"{count}_count": 1, - f"{other_count1}_count": 0, - f"{other_count2}_count": 0, - } - pipe.lpush(buffer_key, json.dumps(data)) - - else: - data = { - "date": now, - f"{count}_count": 1, - f"{other_count1}_count": 0, - f"{other_count2}_count": 0, - } - pipe.lpush(buffer_key, json.dumps(data)) - pipe.ltrim(buffer_key, 0, BUFFER_SIZE - 1) - pipe.expire(buffer_key, KEY_EXPIRY) - pipe.execute() - break - except WatchError: - continue - finally: - pipe.reset() + pipe.hincrby(buffer_key, count + "_count", 1) + pipe.expire(buffer_key, KEY_EXPIRY) + pipe.execute() + pipe.reset() def record_error(self): self.add("error") diff --git a/tests/sentry/integrations/slack/test_disable.py b/tests/sentry/integrations/slack/test_disable.py index a1a74876f7bd32..4ee4440c4cc78d 100644 --- a/tests/sentry/integrations/slack/test_disable.py +++ b/tests/sentry/integrations/slack/test_disable.py @@ -128,7 +128,7 @@ def test_error_integration(self): with pytest.raises(ApiError): client.post("/chat.postMessage", data=self.payload) buffer = IntegrationRequestBuffer(client._get_redis_key()) - assert (buffer._get()[0]["error_count"]) == 2 + assert int(buffer._get()[-1]["error_count"]) == 2 assert buffer.is_integration_broken() is False @responses.activate From dab29a420e4d3b5103a49846fc15f50c87d16c23 Mon Sep 17 00:00:00 2001 From: Chloe Date: Tue, 1 Aug 2023 16:23:38 -0700 Subject: [PATCH 2/9] removing comments --- src/sentry/integrations/request_buffer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sentry/integrations/request_buffer.py b/src/sentry/integrations/request_buffer.py index 1389acd570d0cb..5b1130fe34b9c5 100644 --- a/src/sentry/integrations/request_buffer.py +++ b/src/sentry/integrations/request_buffer.py @@ -50,7 +50,6 @@ def _get_broken_range_from_buffer(self, buffer_key): Get the list at the buffer key in the broken range. """ - # return self.client.lrange(buffer_key, 0, IS_BROKEN_RANGE - 1) ret = [] now = datetime.now() for i in reversed(range(IS_BROKEN_RANGE)): From 6d77e09989f63167e4c57e706df338659e710265 Mon Sep 17 00:00:00 2001 From: Chloe Date: Tue, 1 Aug 2023 17:08:36 -0700 Subject: [PATCH 3/9] fixing time and key str formating --- src/sentry/integrations/request_buffer.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/sentry/integrations/request_buffer.py b/src/sentry/integrations/request_buffer.py index 5b1130fe34b9c5..0a1dc76bec2d7d 100644 --- a/src/sentry/integrations/request_buffer.py +++ b/src/sentry/integrations/request_buffer.py @@ -53,10 +53,9 @@ def _get_broken_range_from_buffer(self, buffer_key): ret = [] now = datetime.now() for i in reversed(range(IS_BROKEN_RANGE)): - with freeze_time(now - timedelta(days=i)): - cur = datetime.now().strftime("%Y-%m-%d") - buffer_key = self.integrationkey + cur - ret.append(self.client.hgetall(buffer_key)) + cur = (now - timedelta(days=i)).strftime("%Y-%m-%d") + buffer_key = f"{self.integrationkey}:{cur}" + ret.append(self.client.hgetall(buffer_key)) return ret @@ -102,7 +101,7 @@ def add(self, count: str): now = datetime.now().strftime("%Y-%m-%d") - buffer_key = self.integrationkey + now + buffer_key = f"{self.integrationkey}:{now}" pipe = self.client.pipeline() pipe.hincrby(buffer_key, count + "_count", 1) pipe.expire(buffer_key, KEY_EXPIRY) From 1810f508a660b77c17ba5a0c5ebb249333d0b605 Mon Sep 17 00:00:00 2001 From: Chloe Date: Tue, 1 Aug 2023 17:09:50 -0700 Subject: [PATCH 4/9] set key expiry once --- src/sentry/integrations/request_buffer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/integrations/request_buffer.py b/src/sentry/integrations/request_buffer.py index 0a1dc76bec2d7d..61ab41639375d1 100644 --- a/src/sentry/integrations/request_buffer.py +++ b/src/sentry/integrations/request_buffer.py @@ -104,7 +104,7 @@ def add(self, count: str): buffer_key = f"{self.integrationkey}:{now}" pipe = self.client.pipeline() pipe.hincrby(buffer_key, count + "_count", 1) - pipe.expire(buffer_key, KEY_EXPIRY) + pipe.expire(buffer_key, KEY_EXPIRY, nx=True) pipe.execute() pipe.reset() From 5d1cc0ec3d73d098a6c471b878544c3924f80222 Mon Sep 17 00:00:00 2001 From: Chloe Date: Tue, 1 Aug 2023 17:15:41 -0700 Subject: [PATCH 5/9] set key expiry once --- src/sentry/integrations/request_buffer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sentry/integrations/request_buffer.py b/src/sentry/integrations/request_buffer.py index 61ab41639375d1..03a039208bb01c 100644 --- a/src/sentry/integrations/request_buffer.py +++ b/src/sentry/integrations/request_buffer.py @@ -73,9 +73,9 @@ def is_integration_broken(self): Integration is broken if we have 7 consecutive days of errors and no successes OR have a fatal error """ - items = self._get() + items = self._get_broken_range_from_buffer() - data = [item for item in items if int(item.get("fatal_count", 0)) > 0] + data = [item for item in items if int(item.hget("fatal_count", 0)) > 0] if len(data) > 0: return True @@ -83,7 +83,7 @@ def is_integration_broken(self): data = [ item for item in items - if int(item.get("error_count", 0)) > 0 and int(item.get("success_count", 0)) == 0 + if int(item.hget("error_count", 0)) > 0 and int(item.hget("success_count", 0)) == 0 ] if not len(data): From 0d84fd09326e7de5cca9421038c5f65a2557eed9 Mon Sep 17 00:00:00 2001 From: Chloe Date: Tue, 1 Aug 2023 17:44:32 -0700 Subject: [PATCH 6/9] new get_all --- src/sentry/integrations/request_buffer.py | 35 +++++++------------ .../sentry/integrations/slack/test_disable.py | 4 +-- 2 files changed, 15 insertions(+), 24 deletions(-) diff --git a/src/sentry/integrations/request_buffer.py b/src/sentry/integrations/request_buffer.py index 03a039208bb01c..f3505ca72ac909 100644 --- a/src/sentry/integrations/request_buffer.py +++ b/src/sentry/integrations/request_buffer.py @@ -1,7 +1,6 @@ from datetime import datetime, timedelta from django.conf import settings -from freezegun import freeze_time from sentry.utils import json, redis @@ -27,47 +26,39 @@ def _convert_obj_to_dict(self, redis_object): """ Convert the request string stored in Redis to a python dict """ - redis_object = redis_object.replace("'", '"') return json.loads(redis_object) - def _get_all_from_buffer(self, buffer_key): + def _get_all_from_buffer(self): """ Get the list at the buffer key. """ ret = [] now = datetime.now() - for i in reversed(range(BUFFER_SIZE)): - with freeze_time(now - timedelta(days=i)): - cur = datetime.now().strftime("%Y-%m-%d") - buffer_key = self.integrationkey + cur - ret.append(self.client.hgetall(buffer_key)) + i = 0 + buffer_key = f"{self.integrationkey}:{now.strftime('%Y-%m-%d')}" + while self.client.hgetall(buffer_key): + cur = (now - timedelta(days=i)).strftime("%Y-%m-%d") + buffer_key = f"{self.integrationkey}:{cur}" + ret.append(self.client.hgetall(buffer_key)) + i += 1 return ret - def _get_broken_range_from_buffer(self, buffer_key): + def _get_broken_range_from_buffer(self): """ Get the list at the buffer key in the broken range. """ ret = [] now = datetime.now() - for i in reversed(range(IS_BROKEN_RANGE)): + for i in range(IS_BROKEN_RANGE): cur = (now - timedelta(days=i)).strftime("%Y-%m-%d") buffer_key = f"{self.integrationkey}:{cur}" ret.append(self.client.hgetall(buffer_key)) return ret - def _get(self): - """ - Returns the list of daily aggregate error counts. - """ - return [ - self._convert_obj_to_dict(str(obj)) - for obj in self._get_broken_range_from_buffer(self.integrationkey) - ] - def is_integration_broken(self): """ Integration is broken if we have 7 consecutive days of errors and no successes OR have a fatal error @@ -75,7 +66,7 @@ def is_integration_broken(self): """ items = self._get_broken_range_from_buffer() - data = [item for item in items if int(item.hget("fatal_count", 0)) > 0] + data = [item for item in items if int(item.get("fatal_count", 0)) > 0] if len(data) > 0: return True @@ -83,7 +74,7 @@ def is_integration_broken(self): data = [ item for item in items - if int(item.hget("error_count", 0)) > 0 and int(item.hget("success_count", 0)) == 0 + if int(item.get("error_count", 0)) > 0 and int(item.get("success_count", 0)) == 0 ] if not len(data): @@ -104,7 +95,7 @@ def add(self, count: str): buffer_key = f"{self.integrationkey}:{now}" pipe = self.client.pipeline() pipe.hincrby(buffer_key, count + "_count", 1) - pipe.expire(buffer_key, KEY_EXPIRY, nx=True) + pipe.expire(buffer_key, KEY_EXPIRY) pipe.execute() pipe.reset() diff --git a/tests/sentry/integrations/slack/test_disable.py b/tests/sentry/integrations/slack/test_disable.py index 4ee4440c4cc78d..a4ce5d2e8f83a2 100644 --- a/tests/sentry/integrations/slack/test_disable.py +++ b/tests/sentry/integrations/slack/test_disable.py @@ -128,7 +128,7 @@ def test_error_integration(self): with pytest.raises(ApiError): client.post("/chat.postMessage", data=self.payload) buffer = IntegrationRequestBuffer(client._get_redis_key()) - assert int(buffer._get()[-1]["error_count"]) == 2 + assert int(buffer._get_all_from_buffer()[0]["error_count"]) == 2 assert buffer.is_integration_broken() is False @responses.activate @@ -204,4 +204,4 @@ def test_expiry(self): buffer.record_error() with pytest.raises(ApiError): client.post("/chat.postMessage", data=self.payload) - assert len(buffer._get_all_from_buffer(buffer.integrationkey)) == 30 + assert len(buffer._get_all_from_buffer()) == 30 From ad8301b8b599b392505283060f0d98424df690ca Mon Sep 17 00:00:00 2001 From: Chloe Date: Wed, 2 Aug 2023 09:49:44 -0700 Subject: [PATCH 7/9] working on expiry --- src/sentry/integrations/request_buffer.py | 2 +- tests/sentry/integrations/slack/test_disable.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/sentry/integrations/request_buffer.py b/src/sentry/integrations/request_buffer.py index f3505ca72ac909..cbd6b0c7249a09 100644 --- a/src/sentry/integrations/request_buffer.py +++ b/src/sentry/integrations/request_buffer.py @@ -38,9 +38,9 @@ def _get_all_from_buffer(self): i = 0 buffer_key = f"{self.integrationkey}:{now.strftime('%Y-%m-%d')}" while self.client.hgetall(buffer_key): + ret.append(self.client.hgetall(buffer_key)) cur = (now - timedelta(days=i)).strftime("%Y-%m-%d") buffer_key = f"{self.integrationkey}:{cur}" - ret.append(self.client.hgetall(buffer_key)) i += 1 return ret diff --git a/tests/sentry/integrations/slack/test_disable.py b/tests/sentry/integrations/slack/test_disable.py index a4ce5d2e8f83a2..4f0309010e398d 100644 --- a/tests/sentry/integrations/slack/test_disable.py +++ b/tests/sentry/integrations/slack/test_disable.py @@ -1,3 +1,4 @@ +import time from datetime import datetime, timedelta import pytest @@ -204,4 +205,5 @@ def test_expiry(self): buffer.record_error() with pytest.raises(ApiError): client.post("/chat.postMessage", data=self.payload) + time.sleep(1) assert len(buffer._get_all_from_buffer()) == 30 From a1d6e00f55512fed9082d68541d59e6073dc2685 Mon Sep 17 00:00:00 2001 From: Gilbert Szeto Date: Wed, 2 Aug 2023 11:16:07 -0700 Subject: [PATCH 8/9] potential improvements to the class (#54035) --- src/sentry/integrations/request_buffer.py | 120 +++++++++++----------- 1 file changed, 62 insertions(+), 58 deletions(-) diff --git a/src/sentry/integrations/request_buffer.py b/src/sentry/integrations/request_buffer.py index cbd6b0c7249a09..490bb53bf56cd3 100644 --- a/src/sentry/integrations/request_buffer.py +++ b/src/sentry/integrations/request_buffer.py @@ -2,12 +2,14 @@ from django.conf import settings -from sentry.utils import json, redis +from sentry.utils import redis BUFFER_SIZE = 30 # 30 days KEY_EXPIRY = 60 * 60 * 24 * 30 # 30 days -IS_BROKEN_RANGE = 7 # 7 days +BROKEN_RANGE_DAYS = 7 # 7 days + +VALID_KEYS = ["success", "error", "fatal"] class IntegrationRequestBuffer: @@ -17,93 +19,95 @@ class IntegrationRequestBuffer: """ def __init__(self, key): - self.integrationkey = key + self.integration_key = key cluster_id = settings.SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER self.client = redis.redis_clusters.get(cluster_id) - def _convert_obj_to_dict(self, redis_object): - """ - Convert the request string stored in Redis to a python dict - """ - return json.loads(redis_object) - - def _get_all_from_buffer(self): - """ - Get the list at the buffer key. - """ - - ret = [] - now = datetime.now() - i = 0 - buffer_key = f"{self.integrationkey}:{now.strftime('%Y-%m-%d')}" - while self.client.hgetall(buffer_key): - ret.append(self.client.hgetall(buffer_key)) - cur = (now - timedelta(days=i)).strftime("%Y-%m-%d") - buffer_key = f"{self.integrationkey}:{cur}" - i += 1 - - return ret - - def _get_broken_range_from_buffer(self): - """ - Get the list at the buffer key in the broken range. - """ + def record_error(self): + self._add("error") - ret = [] - now = datetime.now() - for i in range(IS_BROKEN_RANGE): - cur = (now - timedelta(days=i)).strftime("%Y-%m-%d") - buffer_key = f"{self.integrationkey}:{cur}" - ret.append(self.client.hgetall(buffer_key)) + def record_success(self): + self._add("success") - return ret + def record_fatal(self): + self._add("fatal") def is_integration_broken(self): """ Integration is broken if we have 7 consecutive days of errors and no successes OR have a fatal error """ - items = self._get_broken_range_from_buffer() + broken_range_days_counts = self._get_broken_range_from_buffer() - data = [item for item in items if int(item.get("fatal_count", 0)) > 0] + days_fatal = [] + days_error = [] - if len(data) > 0: - return True + for day_count in broken_range_days_counts: + if int(day_count.get("fatal_count", 0)) > 0: + days_fatal.append(day_count) + elif ( + int(day_count.get("error_count", 0)) > 0 + and int(day_count.get("success_count", 0)) == 0 + ): + days_error.append(day_count) - data = [ - item - for item in items - if int(item.get("error_count", 0)) > 0 and int(item.get("success_count", 0)) == 0 - ] + if len(days_fatal) > 0: + return True - if not len(data): + if not len(days_error): return False - if len(data) < IS_BROKEN_RANGE: + if len(days_error) < BROKEN_RANGE_DAYS: return False return True - def add(self, count: str): - VALID_KEYS = ["success", "error", "fatal"] + def _add(self, count: str): if count not in VALID_KEYS: raise Exception("Requires a valid key param.") now = datetime.now().strftime("%Y-%m-%d") + buffer_key = f"{self.integration_key}:{now}" - buffer_key = f"{self.integrationkey}:{now}" pipe = self.client.pipeline() pipe.hincrby(buffer_key, count + "_count", 1) pipe.expire(buffer_key, KEY_EXPIRY) pipe.execute() - pipe.reset() - def record_error(self): - self.add("error") + def _get_all_from_buffer(self): + """ + Get the list at the buffer key. + """ - def record_success(self): - self.add("success") + now = datetime.now() + all_range = [ + f"{self.integration_key}:{(now - timedelta(days=i)).strftime('%Y-%m-%d')}" + for i in range(BUFFER_SIZE) + ] - def record_fatal(self): - self.add("fatal") + return self._get_range_buffers(all_range) + + def _get_broken_range_from_buffer(self): + """ + Get the list at the buffer key in the broken range. + """ + + now = datetime.now() + broken_range_keys = [ + f"{self.integration_key}:{(now - timedelta(days=i)).strftime('%Y-%m-%d')}" + for i in range(BROKEN_RANGE_DAYS) + ] + + return self._get_range_buffers(broken_range_keys) + + def _get_range_buffers(self, keys): + pipe = self.client.pipeline() + ret = [] + for key in keys: + pipe.hgetall(key) + response = pipe.execute() + for item in response: + ret.append(item) + + return ret From 98ee50ff776a6088a668bc899cbbea6fa95649a7 Mon Sep 17 00:00:00 2001 From: Gilbert Szeto Date: Wed, 2 Aug 2023 11:33:26 -0700 Subject: [PATCH 9/9] optional expiration ttl on constructor (#54044) fix test --- src/sentry/integrations/request_buffer.py | 8 ++++---- .../sentry/integrations/slack/test_disable.py | 18 ++++++++++++++++-- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/sentry/integrations/request_buffer.py b/src/sentry/integrations/request_buffer.py index 490bb53bf56cd3..bf337de0dc3d58 100644 --- a/src/sentry/integrations/request_buffer.py +++ b/src/sentry/integrations/request_buffer.py @@ -18,11 +18,11 @@ class IntegrationRequestBuffer: This should store the aggregate counts of each type for last 30 days for each integration """ - def __init__(self, key): - self.integration_key = key - + def __init__(self, key, expiration_seconds=KEY_EXPIRY): cluster_id = settings.SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER self.client = redis.redis_clusters.get(cluster_id) + self.integration_key = key + self.key_expiration_seconds = expiration_seconds def record_error(self): self._add("error") @@ -72,7 +72,7 @@ def _add(self, count: str): pipe = self.client.pipeline() pipe.hincrby(buffer_key, count + "_count", 1) - pipe.expire(buffer_key, KEY_EXPIRY) + pipe.expire(buffer_key, self.key_expiration_seconds) pipe.execute() def _get_all_from_buffer(self): diff --git a/tests/sentry/integrations/slack/test_disable.py b/tests/sentry/integrations/slack/test_disable.py index 4f0309010e398d..3b31999afcaf00 100644 --- a/tests/sentry/integrations/slack/test_disable.py +++ b/tests/sentry/integrations/slack/test_disable.py @@ -200,10 +200,24 @@ def test_expiry(self): client = SlackClient(integration_id=self.integration.id) buffer = IntegrationRequestBuffer(client._get_redis_key()) now = datetime.now() - timedelta(hours=1) - for i in reversed(range(32)): + for i in reversed(range(30)): with freeze_time(now - timedelta(days=i)): buffer.record_error() + + buffer_expired = IntegrationRequestBuffer(client._get_redis_key(), 1) + with freeze_time(now - timedelta(days=30)): + buffer_expired.record_error() + with freeze_time(now - timedelta(days=31)): + buffer_expired.record_error() + with pytest.raises(ApiError): client.post("/chat.postMessage", data=self.payload) time.sleep(1) - assert len(buffer._get_all_from_buffer()) == 30 + resp = buffer._get_range_buffers( + [ + f"{client._get_redis_key()}:{(now - timedelta(days=i)).strftime('%Y-%m-%d')}" + for i in range(32) + ] + ) + assert len(resp) == 32 + assert len([item for item in resp if item]) == 30