Skip to content

🐛 do not allow moving folder to a child folder #6370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
from servicelib.aiohttp.application_keys import APP_FIRE_AND_FORGET_TASKS_KEY
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
from servicelib.utils import fire_and_forget_task
from simcore_service_webserver.workspaces._workspaces_api import (
check_user_workspace_access,
)

from ..folders.errors import FolderValueNotPermittedError
from ..projects.projects_api import submit_delete_project_task
from ..users.api import get_user
from ..workspaces._workspaces_api import check_user_workspace_access
from ..workspaces.errors import (
WorkspaceAccessForbiddenError,
WorkspaceFolderInconsistencyError,
Expand Down Expand Up @@ -238,6 +237,24 @@ async def update_folder(
workspace_id=folder_db.workspace_id,
)

if folder_db.parent_folder_id != parent_folder_id and parent_folder_id is not None:
# Check user has access to the parent folder
await folders_db.get_for_user_or_workspace(
app,
folder_id=parent_folder_id,
product_name=product_name,
user_id=user_id if workspace_is_private else None,
workspace_id=folder_db.workspace_id,
)
# Do not allow to move to a child folder id
_child_folders = await folders_db.get_folders_recursively(
app, folder_id=folder_id, product_name=product_name
)
if parent_folder_id in _child_folders:
raise FolderValueNotPermittedError(
reason="Parent folder id should not be one of children"
)

folder_db = await folders_db.update(
app,
folder_id=folder_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,40 @@ async def get_projects_recursively_only_if_user_is_owner(
rows = await result.fetchall() or []
results = [ProjectID(row[0]) for row in rows]
return results


async def get_folders_recursively(
app: web.Application,
*,
folder_id: FolderID,
product_name: ProductName,
) -> list[FolderID]:
async with get_database_engine(app).acquire() as conn, conn.begin():
# Step 1: Define the base case for the recursive CTE
base_query = select(
folders_v2.c.folder_id, folders_v2.c.parent_folder_id
).where(
(folders_v2.c.folder_id == folder_id) # <-- specified folder id
& (folders_v2.c.product_name == product_name)
)
folder_hierarchy_cte = base_query.cte(name="folder_hierarchy", recursive=True)
# Step 2: Define the recursive case
folder_alias = aliased(folders_v2)
recursive_query = select(
folder_alias.c.folder_id, folder_alias.c.parent_folder_id
).select_from(
folder_alias.join(
folder_hierarchy_cte,
folder_alias.c.parent_folder_id == folder_hierarchy_cte.c.folder_id,
)
)
# Step 3: Combine base and recursive cases into a CTE
folder_hierarchy_cte = folder_hierarchy_cte.union_all(recursive_query)
# Step 4: Execute the query to get all descendants
final_query = select(folder_hierarchy_cte)
result = await conn.execute(final_query)
rows = ( # list of tuples [(folder_id, parent_folder_id), ...] ex. [(1, None), (2, 1)]
await result.fetchall() or []
)

return [FolderID(row[0]) for row in rows]
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from servicelib.mimetype_constants import MIMETYPE_APPLICATION_JSON
from servicelib.request_keys import RQT_USERID_KEY
from servicelib.rest_constants import RESPONSE_MODEL_POLICY
from simcore_postgres_database.utils_folders import FoldersError

from .._constants import RQ_PRODUCT_KEY
from .._meta import API_VTAG as VTAG
Expand All @@ -41,7 +40,12 @@
WorkspaceNotFoundError,
)
from . import _folders_api
from .errors import FolderAccessForbiddenError, FolderNotFoundError
from .errors import (
FolderAccessForbiddenError,
FolderNotFoundError,
FoldersValueError,
FolderValueNotPermittedError,
)

_logger = logging.getLogger(__name__)

Expand All @@ -62,7 +66,7 @@ async def wrapper(request: web.Request) -> web.StreamResponse:
) as exc:
raise web.HTTPForbidden(reason=f"{exc}") from exc

except FoldersError as exc:
except (FolderValueNotPermittedError, FoldersValueError) as exc:
raise web.HTTPBadRequest(reason=f"{exc}") from exc

return wrapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ class FoldersValueError(WebServerBaseError, ValueError):
...


class FolderValueNotPermittedError(FoldersValueError):
msg_template = "Provided value is not permitted. {reason}"


class FolderNotFoundError(FoldersValueError):
msg_template = "Folder not found. {reason}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,19 @@ async def test_sub_folders_full_workflow(
assert data[0]["name"] == "My sub sub folder"
assert data[0]["parentFolderId"] == subfolder_folder["folderId"]

# try to move sub folder to sub sub folder (should not be allowed to)
url = client.app.router["replace_folder"].url_for(
folder_id=f"{subfolder_folder['folderId']}",
)
resp = await client.put(
url.path,
json={
"name": "My Updated Folder",
"parentFolderId": f"{subsubfolder_folder['folderId']}",
},
)
await assert_status(resp, status.HTTP_400_BAD_REQUEST)

# move sub sub folder to root folder
url = client.app.router["replace_folder"].url_for(
folder_id=f"{subsubfolder_folder['folderId']}"
Expand Down
Loading