fix redis connection leaks + exclusions error: (fixes #1065) #1066

Merged · 2 commits · Aug 15, 2023

17 changes: 11 additions & 6 deletions backend/btrixcloud/basecrawls.py
@@ -6,10 +6,11 @@
from datetime import timedelta
from typing import Optional, List, Union
import urllib.parse
import contextlib

from pydantic import UUID4
from fastapi import HTTPException, Depends
from redis import asyncio as aioredis, exceptions
from redis import exceptions

from .models import (
CrawlFile,
@@ -216,8 +217,8 @@ async def _resolve_crawl_refs(
# more responsive, saves db update in operator
if crawl.state in RUNNING_STATES:
try:
redis = await self.get_redis(crawl.id)
crawl.stats = await get_redis_crawl_stats(redis, crawl.id)
async with self.get_redis(crawl.id) as redis:
crawl.stats = await get_redis_crawl_stats(redis, crawl.id)
# redis not available, ignore
except exceptions.ConnectionError:
pass
@@ -281,13 +282,17 @@ async def _update_presigned(self, updates):
for update in updates:
await self.crawls.find_one_and_update(*update)

@contextlib.asynccontextmanager
async def get_redis(self, crawl_id):
"""get redis url for crawl id"""
redis_url = self.crawl_manager.get_redis_url(crawl_id)

return await aioredis.from_url(
redis_url, encoding="utf-8", decode_responses=True
)
redis = await self.crawl_manager.get_redis_client(redis_url)

try:
yield redis
finally:
await redis.close()

async def add_to_collection(
self, crawl_ids: List[uuid.UUID], collection_id: uuid.UUID, org: Organization
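
The get_redis() change above turns the per-crawl client into an async context manager, so the connection is closed even when the body raises. A minimal standalone sketch of the same pattern (the open_redis name and example URL are illustrative, not taken from the codebase):

    import contextlib
    from redis import asyncio as aioredis

    @contextlib.asynccontextmanager
    async def open_redis(redis_url):
        """Yield a one-time-use redis client and always close it afterwards."""
        redis = await aioredis.from_url(redis_url, encoding="utf-8", decode_responses=True)
        try:
            yield redis
        finally:
            # runs on normal exit and on exceptions, so the connection never leaks
            await redis.close()

    # usage sketch:
    # async with open_redis("redis://localhost:6379/0") as redis:
    #     stats = await redis.hgetall("crawl:stats")
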
161 changes: 85 additions & 76 deletions backend/btrixcloud/crawls.py
@@ -363,106 +363,115 @@ async def get_crawl_queue(self, crawl_id, offset, count, regex):

total = 0
results = []
redis = None

try:
redis = await self.get_redis(crawl_id)
async with self.get_redis(crawl_id) as redis:
total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
results = await self._crawl_queue_range(
redis, f"{crawl_id}:q", offset, count
)
results = [json.loads(result)["url"] for result in results]

total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
results = await self._crawl_queue_range(
redis, f"{crawl_id}:q", offset, count
)
results = [json.loads(result)["url"] for result in results]
except exceptions.ConnectionError:
# can't connect to redis, likely not initialized yet
pass

matched = []
if regex:
regex = re.compile(regex)
try:
regex = re.compile(regex)
except re.error as exc:
raise HTTPException(status_code=400, detail="invalid_regex") from exc

matched = [result for result in results if regex.search(result)]

return {"total": total, "results": results, "matched": matched}
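
Compiling the caller-supplied regex inside try/except is what turns a malformed exclusion into a 400 response instead of an unhandled re.error (and a 500). A hedged sketch of that validation step on its own (the compile_user_regex helper is an assumption, not a function in the codebase):

    import re
    from fastapi import HTTPException

    def compile_user_regex(pattern):
        """Compile a user-supplied regex, rejecting bad patterns with a 400."""
        try:
            return re.compile(pattern)
        except re.error as exc:
            # report the bad pattern to the client rather than crashing the endpoint
            raise HTTPException(status_code=400, detail="invalid_regex") from exc

    # compile_user_regex("https://example.com/(skip")  ->  HTTPException(400, "invalid_regex")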

async def match_crawl_queue(self, crawl_id, regex):
"""get list of urls that match regex"""
total = 0
redis = None

try:
redis = await self.get_redis(crawl_id)
total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
except exceptions.ConnectionError:
# can't connect to redis, likely not initialized yet
pass

regex = re.compile(regex)
matched = []
step = 50

for count in range(0, total, step):
results = await self._crawl_queue_range(redis, f"{crawl_id}:q", count, step)
for result in results:
url = json.loads(result)["url"]
if regex.search(url):
matched.append(url)
async with self.get_redis(crawl_id) as redis:
try:
total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
except exceptions.ConnectionError:
# can't connect to redis, likely not initialized yet
pass

try:
regex = re.compile(regex)
except re.error as exc:
raise HTTPException(status_code=400, detail="invalid_regex") from exc

for count in range(0, total, step):
results = await self._crawl_queue_range(
redis, f"{crawl_id}:q", count, step
)
for result in results:
url = json.loads(result)["url"]
if regex.search(url):
matched.append(url)

return {"total": total, "matched": matched}

async def filter_crawl_queue(self, crawl_id, regex):
"""filter out urls that match regex"""
# pylint: disable=too-many-locals
total = 0
redis = None

q_key = f"{crawl_id}:q"
s_key = f"{crawl_id}:s"

try:
redis = await self.get_redis(crawl_id)
total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
except exceptions.ConnectionError:
# can't connect to redis, likely not initialized yet
pass

dircount = -1
regex = re.compile(regex)
step = 50

count = 0
num_removed = 0

# pylint: disable=fixme
# todo: do this in a more efficient way?
# currently quite inefficient as redis does not have a way
# to atomically check and remove value from list
# so removing each json block by value
while count < total:
if dircount == -1 and count > total / 2:
dircount = 1
results = await self._crawl_queue_range(redis, q_key, count, step)
count += step

qrems = []
srems = []

for result in results:
url = json.loads(result)["url"]
if regex.search(url):
srems.append(url)
# await redis.srem(s_key, url)
# res = await self._crawl_queue_rem(redis, q_key, result, dircount)
qrems.append(result)

if not srems:
continue

await redis.srem(s_key, *srems)
res = await self._crawl_queue_rem(redis, q_key, qrems, dircount)
if res:
count -= res
num_removed += res
print(f"Removed {res} from queue", flush=True)
async with self.get_redis(crawl_id) as redis:
try:
total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
except exceptions.ConnectionError:
# can't connect to redis, likely not initialized yet
pass

dircount = -1

try:
regex = re.compile(regex)
except re.error as exc:
raise HTTPException(status_code=400, detail="invalid_regex") from exc

count = 0

# pylint: disable=fixme
# todo: do this in a more efficient way?
# currently quite inefficient as redis does not have a way
# to atomically check and remove value from list
# so removing each json block by value
while count < total:
if dircount == -1 and count > total / 2:
dircount = 1
results = await self._crawl_queue_range(redis, q_key, count, step)
count += step

qrems = []
srems = []

for result in results:
url = json.loads(result)["url"]
if regex.search(url):
srems.append(url)
# await redis.srem(s_key, url)
# res = await self._crawl_queue_rem(redis, q_key, result, dircount)
qrems.append(result)

if not srems:
continue

await redis.srem(s_key, *srems)
res = await self._crawl_queue_rem(redis, q_key, qrems, dircount)
if res:
count -= res
num_removed += res
print(f"Removed {res} from queue", flush=True)

return num_removed
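
The _crawl_queue_range() and _crawl_queue_rem() helpers used above are not part of this diff; a simplified sketch of how paging and removal over a Redis list could look, assuming the queue is a list of JSON blobs (helper names and layout are assumptions, not the project's implementation):

    async def queue_range(redis, key, offset, count):
        # read one page of queued items from the list
        return await redis.lrange(key, offset, offset + count - 1)

    async def queue_remove(redis, key, values, direction):
        # LREM scans from the head (positive count) or the tail (negative count);
        # remove each matched blob once and report how many were dropped
        removed = 0
        for value in values:
            removed += await redis.lrem(key, direction, value)
        return removed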

@@ -475,13 +484,13 @@ async def get_errors_from_redis(
skip = page * page_size
upper_bound = skip + page_size - 1

try:
redis = await self.get_redis(crawl_id)
errors = await redis.lrange(f"{crawl_id}:e", skip, upper_bound)
total = await redis.llen(f"{crawl_id}:e")
except exceptions.ConnectionError:
# pylint: disable=raise-missing-from
raise HTTPException(status_code=503, detail="redis_connection_error")
async with self.get_redis(crawl_id) as redis:
try:
errors = await redis.lrange(f"{crawl_id}:e", skip, upper_bound)
total = await redis.llen(f"{crawl_id}:e")
except exceptions.ConnectionError:
# pylint: disable=raise-missing-from
raise HTTPException(status_code=503, detail="redis_connection_error")

parsed_errors = parse_jsonl_error_messages(errors)
return parsed_errors, total
14 changes: 14 additions & 0 deletions backend/btrixcloud/k8sapi.py
@@ -13,6 +13,9 @@
from kubernetes_asyncio.utils import create_from_dict
from kubernetes_asyncio.client.exceptions import ApiException

from redis.asyncio import Redis
from redis.asyncio.connection import ConnectionPool

from fastapi.templating import Jinja2Templates
from .utils import get_templates_dir, dt_now, to_k8s_date

@@ -62,6 +65,17 @@ def get_redis_url(self, crawl_id):
)
return redis_url

async def get_redis_client(self, redis_url):
"""return redis client with correct params for one-time use"""
# manual settings until redis 5.0.0 is released
pool = ConnectionPool.from_url(redis_url, decode_responses=True)
redis = Redis(
connection_pool=pool,
decode_responses=True,
)
redis.auto_close_connection_pool = True
return redis

# pylint: disable=too-many-arguments
async def new_crawl_job(
self, cid, userid, oid, scale=1, crawl_timeout=0, manual=True
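
get_redis_client() above builds a dedicated connection pool per client and sets auto_close_connection_pool, so closing the client also disconnects its pool and no sockets linger between calls. A hedged usage sketch (the probe_redis wrapper and URL handling are illustrative):

    async def probe_redis(crawl_manager, redis_url):
        """One-shot check against a crawl's redis using the helper above."""
        redis = await crawl_manager.get_redis_client(redis_url)
        try:
            return await redis.ping()
        finally:
            # with auto_close_connection_pool=True, close() also tears down the pool
            await redis.close()
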
34 changes: 21 additions & 13 deletions backend/btrixcloud/operator.py
@@ -13,7 +13,6 @@
import humanize

from pydantic import BaseModel
from redis import asyncio as aioredis

from .utils import (
from_k8s_date,
@@ -430,6 +429,7 @@ async def delete_pvc(self, crawl_id):
async def cancel_crawl(self, redis_url, crawl_id, cid, status, state):
"""immediately cancel crawl with specified state
return true if db mark_finished update succeeds"""
redis = None
try:
redis = await self._get_redis(redis_url)
await self.mark_finished(redis, crawl_id, uuid.UUID(cid), status, state)
@@ -438,6 +438,10 @@ async def cancel_crawl(self, redis_url, crawl_id, cid, status, state):
except:
return False

finally:
if redis:
await redis.close()

def _done_response(self, status, finalized=False):
"""done response for removing crawl"""
return {
@@ -462,15 +466,16 @@ async def _get_redis(self, redis_url):
"""init redis, ensure connectivity"""
redis = None
try:
redis = await aioredis.from_url(
redis_url, encoding="utf-8", decode_responses=True
)
redis = await self.get_redis_client(redis_url)
# test connection
await redis.ping()
return redis

# pylint: disable=bare-except
except:
if redis:
await redis.close()

return None

async def check_if_finished(self, crawl, status):
@@ -512,16 +517,16 @@ async def sync_crawl_state(self, redis_url, crawl, status, pods):
status.resync_after = self.fast_retry_secs
return status

# set state to running (if not already)
if status.state not in RUNNING_STATES:
await self.set_state(
"running",
status,
crawl.id,
allowed_from=["starting", "waiting_capacity"],
)

try:
# set state to running (if not already)
if status.state not in RUNNING_STATES:
await self.set_state(
"running",
status,
crawl.id,
allowed_from=["starting", "waiting_capacity"],
)

file_done = await redis.lpop(self.done_key)

while file_done:
Expand All @@ -547,6 +552,9 @@ async def sync_crawl_state(self, redis_url, crawl, status, pods):
print(f"Crawl get failed: {exc}, will try again")
return status

finally:
await redis.close()

def check_if_pods_running(self, pods):
"""check if at least one crawler pod has started"""
try:
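
The operator changes above follow the same discipline: acquire the client, then close it in finally (or immediately, if the readiness ping fails). A compact sketch of that acquire-check-cleanup flow (get_client stands in for get_redis_client() and is an assumption):

    async def connect_or_none(get_client, redis_url):
        """Return a live redis client, or None if redis is not reachable yet."""
        redis = None
        try:
            redis = await get_client(redis_url)
            await redis.ping()  # make sure the crawl's redis is actually up
            return redis
        # pylint: disable=broad-except
        except Exception:
            if redis:
                await redis.close()  # don't leak the half-opened client
            return None
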
2 changes: 1 addition & 1 deletion backend/requirements.txt
@@ -5,7 +5,7 @@ loguru
aiofiles
kubernetes-asyncio==22.6.5
aiobotocore
redis>=4.2.0rc1
redis>=5.0.0rc2
pyyaml
jinja2
humanize