Skip to content

fix(integrations) removing redis watch and changing key to a daily redis hash key #54001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 2, 2023
83 changes: 30 additions & 53 deletions src/sentry/integrations/request_buffer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime
from datetime import datetime, timedelta

from django.conf import settings
from redis.exceptions import WatchError
from freezegun import freeze_time

from sentry.utils import json, redis

Expand All @@ -27,29 +27,45 @@ def _convert_obj_to_dict(self, redis_object):
"""
Convert the request string stored in Redis to a python dict
"""

redis_object = redis_object.replace("'", '"')
return json.loads(redis_object)

def _get_all_from_buffer(self, buffer_key):
"""
Get the list at the buffer key.
"""

return self.client.lrange(buffer_key, 0, BUFFER_SIZE - 1)
ret = []
now = datetime.now()
for i in reversed(range(BUFFER_SIZE)):
with freeze_time(now - timedelta(days=i)):
cur = datetime.now().strftime("%Y-%m-%d")
buffer_key = self.integrationkey + cur
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buffer_key = f"{self.integrationkey}:{cur}"?

ret.append(self.client.hgetall(buffer_key))

return ret

def _get_broken_range_from_buffer(self, buffer_key):
"""
Get the list at the buffer key in the broken range.
"""

return self.client.lrange(buffer_key, 0, IS_BROKEN_RANGE - 1)
ret = []
now = datetime.now()
for i in reversed(range(IS_BROKEN_RANGE)):
with freeze_time(now - timedelta(days=i)):
cur = datetime.now().strftime("%Y-%m-%d")
buffer_key = self.integrationkey + cur
ret.append(self.client.hgetall(buffer_key))

return ret

def _get(self):
"""
Returns the list of daily aggregate error counts.
"""
return [
self._convert_obj_to_dict(obj)
self._convert_obj_to_dict(str(obj))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we stringifying the object here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json.loads() expects a str and redis object uses single quotes instead of double which json expects so I fix the quotes using .replace()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if json.dumps() could help here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is hgetall not returning a python dict for you?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hgetall does return a python dict so I removed this method :)

for obj in self._get_broken_range_from_buffer(self.integrationkey)
]

Expand All @@ -60,21 +76,15 @@ def is_integration_broken(self):
"""
items = self._get()

data = [
datetime.strptime(item.get("date"), "%Y-%m-%d").date()
for item in items
if item.get("fatal_count", 0) > 0 and item.get("date")
]
data = [item for item in items if int(item.get("fatal_count", 0)) > 0]

if len(data) > 0:
return True

data = [
datetime.strptime(item.get("date"), "%Y-%m-%d").date()
item
for item in items
if item.get("error_count", 0) > 0
and item.get("success_count", 0) == 0
and item.get("date")
if int(item.get("error_count", 0)) > 0 and int(item.get("success_count", 0)) == 0
]

if not len(data):
Expand All @@ -90,47 +100,14 @@ def add(self, count: str):
if count not in VALID_KEYS:
raise Exception("Requires a valid key param.")

other_count1, other_count2 = list(set(VALID_KEYS).difference([count]))[0:2]
now = datetime.now().strftime("%Y-%m-%d")

buffer_key = self.integrationkey
buffer_key = self.integrationkey + now
pipe = self.client.pipeline()

while True:
try:
pipe.watch(buffer_key)
recent_item_array = pipe.lrange(buffer_key, 0, 1) # get first element from array
pipe.multi()
if len(recent_item_array):
recent_item = self._convert_obj_to_dict(recent_item_array[0])
if recent_item.get("date") == now:
recent_item[f"{count}_count"] += 1
pipe.lset(buffer_key, 0, json.dumps(recent_item))
else:
data = {
"date": now,
f"{count}_count": 1,
f"{other_count1}_count": 0,
f"{other_count2}_count": 0,
}
pipe.lpush(buffer_key, json.dumps(data))

else:
data = {
"date": now,
f"{count}_count": 1,
f"{other_count1}_count": 0,
f"{other_count2}_count": 0,
}
pipe.lpush(buffer_key, json.dumps(data))
pipe.ltrim(buffer_key, 0, BUFFER_SIZE - 1)
pipe.expire(buffer_key, KEY_EXPIRY)
pipe.execute()
break
except WatchError:
continue
finally:
pipe.reset()
pipe.hincrby(buffer_key, count + "_count", 1)
pipe.expire(buffer_key, KEY_EXPIRY)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can maybe include an NX optional param for the expire command https://redis.io/commands/expire/ and only set the expire once. In practical terms, it probably won't matter since you're only refreshing the ttl for this day and then when the its the next day, the previous day hash won't ever be modified again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like NX also isn't supported

pipe.execute()
pipe.reset()

def record_error(self):
self.add("error")
Expand Down
2 changes: 1 addition & 1 deletion tests/sentry/integrations/slack/test_disable.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def test_error_integration(self):
with pytest.raises(ApiError):
client.post("/chat.postMessage", data=self.payload)
buffer = IntegrationRequestBuffer(client._get_redis_key())
assert (buffer._get()[0]["error_count"]) == 2
assert int(buffer._get()[-1]["error_count"]) == 2
assert buffer.is_integration_broken() is False

@responses.activate
Expand Down