Improving storage performance #1659

Merged
8 commits merged on Aug 3, 2020
196 changes: 113 additions & 83 deletions services/storage/src/simcore_service_storage/dsm.py
@@ -7,6 +7,7 @@
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from collections import deque

import aiobotocore
import aiofiles
@@ -23,6 +24,7 @@
from servicelib.aiopg_utils import DBAPIError, PostgresRetryPolicyUponOperation
from servicelib.client_session import get_client_session

from .utils import expo
from .datcore_wrapper import DatcoreWrapper
from .models import (
DatasetMetaData,
@@ -198,7 +200,7 @@ async def list_files(

Can filter upon regular expression (for now only on key: value pairs of the FileMetaData)
"""
data = []
data = deque()
if location == SIMCORE_S3_STR:
async with self.engine.acquire() as conn:
query = sa.select([file_meta_data]).where(
@@ -234,97 +236,41 @@ async def list_files(
# there seems to be no project whatsoever for user_id
return []

# only keep files from non-deleted project --> This needs to be fixed
clean_data = []
# only keep files from non-deleted project
clean_data = deque()
for dx in data:
d = dx.fmd
if d.project_id in uuid_name_dict:
d.project_name = uuid_name_dict[d.project_id]
if d.node_id in uuid_name_dict:
d.node_name = uuid_name_dict[d.node_id]

d.raw_file_path = str(
Path(d.project_id) / Path(d.node_id) / Path(d.file_name)
if d.project_id not in uuid_name_dict:
continue

d.project_name = uuid_name_dict[d.project_id]
if d.node_id in uuid_name_dict:
d.node_name = uuid_name_dict[d.node_id]

d.raw_file_path = str(
Path(d.project_id) / Path(d.node_id) / Path(d.file_name)
)
d.display_file_path = d.raw_file_path
d.file_id = d.file_uuid
if d.node_name and d.project_name:
d.display_file_path = str(
Path(d.project_name)
/ Path(d.node_name)
/ Path(d.file_name)
)
d.display_file_path = d.raw_file_path
d.file_id = d.file_uuid
if d.node_name and d.project_name:
d.display_file_path = str(
Path(d.project_name)
/ Path(d.node_name)
/ Path(d.file_name)
)
async with self.engine.acquire() as conn:
query = (
file_meta_data.update()
.where(
and_(
file_meta_data.c.node_id == d.node_id,
file_meta_data.c.user_id == d.user_id,
)
)
.values(
project_name=d.project_name,
node_name=d.node_name,
raw_file_path=d.raw_file_path,
file_id=d.file_id,
display_file_path=d.display_file_path,
)
)
await conn.execute(query)
clean_data.append(dx)
# at this point the data is in sync with the postgres metadata table
clean_data.append(dx)

data = clean_data

# same as above, make sure file is physically present on s3
clean_data = []
# FIXME: MaG: This is inefficient: Do this automatically when file is modified
session = aiobotocore.get_session()
async with session.create_client(
"s3",
endpoint_url=self.s3_client.endpoint_url,
aws_access_key_id=self.s3_client.access_key,
aws_secret_access_key=self.s3_client.secret_key,
) as client:
responses = await asyncio.gather(
*[
client.list_objects_v2(
Bucket=_d.bucket_name, Prefix=_d.object_name
)
for _d in [__d.fmd for __d in data]
]
)
for dx, resp in zip(data, responses):
if "Contents" in resp:
clean_data.append(dx)
d = dx.fmd
d.file_size = resp["Contents"][0]["Size"]
d.last_modified = str(resp["Contents"][0]["LastModified"])
async with self.engine.acquire() as conn:
query = (
file_meta_data.update()
.where(
and_(
file_meta_data.c.node_id == d.node_id,
file_meta_data.c.user_id == d.user_id,
)
)
.values(
file_size=d.file_size,
last_modified=d.last_modified,
)
)
await conn.execute(query)
data = clean_data

elif location == DATCORE_STR:
api_token, api_secret = self._get_datcore_tokens(user_id)
dcw = DatcoreWrapper(api_token, api_secret, self.loop, self.pool)
data = await dcw.list_files_raw()

if uuid_filter:
_query = re.compile(uuid_filter, re.IGNORECASE)
filtered_data = []
filtered_data = deque()
for dx in data:
d = dx.fmd
if _query.search(d.file_uuid):
@@ -334,7 +280,7 @@ async def list_files(

if regex:
_query = re.compile(regex, re.IGNORECASE)
filtered_data = []
filtered_data = deque()
for dx in data:
d = dx.fmd
_vars = vars(d)
@@ -344,7 +290,7 @@ async def list_files(
break
return filtered_data

return data
return list(data)

async def list_files_dataset(
self, user_id: str, location: str, dataset_id: str
@@ -468,6 +414,77 @@ async def upload_file_to_datcore(

# actually we have to query the master db

async def metadata_file_updater(
self,
file_uuid: str,
bucket_name: str,
object_name: str,
file_size: int,
last_modified: str,
max_update_retries: int = 50,
):
"""
Will retry up to max_update_retries times to update the metadata on the file after an upload.
If it is not successful, it will exit and log an error.
Comment on lines +426 to +427
@sanderegg (Member) commented on Jul 30, 2020:

why not use tenacity here? I think most of these features are available in the @Retry decorator.

stuff like this:

@retry(wait=wait_exponential(multiplier=1, min=4, max=10))
def wait_exponential_1():
    print("Wait 2^x * 1 second between each retry starting with 4 seconds, then up to 10 seconds, then 10 seconds afterwards")
    raise Exception

@GitHK (Contributor, Author) commented on Jul 30, 2020:

Because it was too generic for my use case. I needed to sleep only for a subsection of the function.

Edit: I have skipped tenacity on purpose here. Also the exponential backoff generator implementation comes from the backoff library.
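
For reference, a minimal sketch of the pattern described above: the exponential-delay generator is consumed inside the loop, so only one sub-section of the coroutine sleeps while the rest of it runs once. The wait_for_condition and condition names are illustrative and not part of this diff:

import asyncio

from simcore_service_storage.utils import expo  # the generator added by this PR

async def wait_for_condition(condition, max_retries: int = 50) -> bool:
    # Illustrative only: `condition` is any async callable returning a bool.
    sleep_generator = expo()
    for _ in range(max_retries):
        if await condition():
            # work that should not be retried can run around this call
            return True
        # only this step backs off; a tenacity @retry would wrap the whole function
        await asyncio.sleep(next(sleep_generator))
    return False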


Note: MinIO bucket notifications are not available with S3; that's why we have the
following hacky solution
"""
current_iteration = 0

session = aiobotocore.get_session()
async with session.create_client(
"s3",
endpoint_url=self.s3_client.endpoint_url,
aws_access_key_id=self.s3_client.access_key,
aws_secret_access_key=self.s3_client.secret_key,
) as client:
continue_loop = True
sleep_generator = expo()
update_succeeded = False

while continue_loop:
result = await client.list_objects_v2(
Bucket=bucket_name, Prefix=object_name
)
sleep_amount = next(sleep_generator)
current_iteration += 1
continue_loop = current_iteration <= max_update_retries

if "Contents" not in result:
logger.info("File '%s' was not found in the bucket", object_name)
await asyncio.sleep(sleep_amount)
continue

new_file_size = result["Contents"][0]["Size"]
new_last_modified = str(result["Contents"][0]["LastModified"])
if file_size == new_file_size or last_modified == new_last_modified:
logger.info("File '%s' did not change yet", object_name)
await asyncio.sleep(sleep_amount)
continue

# finally update the data in the database and exit
continue_loop = False

logger.info(
"Obtained this from S3: new_file_size=%s new_last_modified=%s",
new_file_size,
new_last_modified,
)

async with self.engine.acquire() as conn:
query = (
file_meta_data.update()
.where(file_meta_data.c.file_uuid == file_uuid)
.values(
file_size=new_file_size, last_modified=new_last_modified
)
) # primary key search is faster
await conn.execute(query)
update_succeeded = True
if not update_succeeded:
logger.error("Could not update file metadata for '%s'", file_uuid)

async def upload_link(self, user_id: str, file_uuid: str):
@retry(**postgres_service_retry_policy_kwargs)
async def _execute_query():
@@ -484,11 +501,24 @@ async def _execute_query():
if exists is None:
ins = file_meta_data.insert().values(**vars(fmd))
await conn.execute(ins)
return fmd.file_size, fmd.last_modified

await _execute_query()
file_size, last_modified = await _execute_query()

bucket_name = self.simcore_bucket_name
object_name = file_uuid

# a parallel task is started which will update the metadata of the uploaded file
# once the upload has finished.
asyncio.ensure_future(
self.metadata_file_updater(
file_uuid=file_uuid,
bucket_name=bucket_name,
object_name=object_name,
file_size=file_size,
last_modified=last_modified,
)
)
return self.s3_client.create_presigned_put_url(bucket_name, object_name)

async def copy_file_s3_s3(self, user_id: str, dest_uuid: str, source_uuid: str):
@@ -742,7 +772,7 @@ async def deep_copy_project_simcore_s3(
await conn.execute(ins)

async def delete_project_simcore_s3(
self, user_id: str, project_id: str, node_id: Optional[str]=None
self, user_id: str, project_id: str, node_id: Optional[str] = None
) -> web.Response:
""" Deletes all files from a given node in a project in simcore.s3 and updated db accordingly.
If node_id is not given, then all the project files db entries are deleted.
17 changes: 17 additions & 0 deletions services/storage/src/simcore_service_storage/utils.py
@@ -39,3 +39,20 @@ async def assert_enpoint_is_ok(

def is_url(location):
return bool(URL(str(location)).host)


def expo(base=1.2, factor=0.1, max_value=2):
"""Generator for exponential decay.
Args:
base: The mathematical base of the exponentiation operation
factor: Factor to multiply the exponentation by.
max_value: The maximum value until it will yield
"""
n = 0
while True:
a = factor * base ** n
if max_value is None or a < max_value:
yield a
n += 1
else:
yield max_value
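
As a quick illustration of the delays this produces with the defaults (values rounded for readability; this is an informal example, not part of the diff):

>>> gen = expo()  # base=1.2, factor=0.1, max_value=2
>>> [round(next(gen), 5) for _ in range(5)]
[0.1, 0.12, 0.144, 0.1728, 0.20736]

Each yielded delay grows by 20% until it reaches max_value (2 seconds here), after which max_value is yielded indefinitely.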