Skip to content

🎨 introduce include_children query parameter for activity monitor / project activity listings (πŸ—ƒοΈ) #7718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 22, 2025
Merged
10 changes: 6 additions & 4 deletions api/specs/web-server/_computations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
from models_library.api_schemas_webserver.computations import (
ComputationGet,
ComputationPathParams,
ComputationRunIterationsLatestListQueryParams,
ComputationRunIterationsListQueryParams,
ComputationRunPathParams,
ComputationRunRestGet,
ComputationRunWithFiltersListQueryParams,
ComputationStart,
ComputationStarted,
ComputationTaskRestGet,
)
from models_library.generics import Envelope
from simcore_service_webserver._meta import API_VTAG
from simcore_service_webserver.director_v2._controller.computations_rest import (
ComputationRunListQueryParams,
ComputationTaskListQueryParams,
ComputationTaskPathParams,
)
Expand Down Expand Up @@ -71,7 +71,9 @@ async def stop_computation(_path: Annotated[ComputationPathParams, Depends()]):
response_model=Page[ComputationRunRestGet],
)
async def list_computations_latest_iteration(
_query: Annotated[as_query(ComputationRunWithFiltersListQueryParams), Depends()],
_query: Annotated[
as_query(ComputationRunIterationsLatestListQueryParams), Depends()
],
): ...


Expand All @@ -80,7 +82,7 @@ async def list_computations_latest_iteration(
response_model=Page[ComputationRunRestGet],
)
async def list_computation_iterations(
_query: Annotated[as_query(ComputationRunListQueryParams), Depends()],
_query: Annotated[as_query(ComputationRunIterationsListQueryParams), Depends()],
_path: Annotated[ComputationRunPathParams, Depends()],
): ...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,20 @@ class ComputationRunListQueryParams(
): ...


class ComputationRunWithFiltersListQueryParams(ComputationRunListQueryParams):
class ComputationRunIterationsLatestListQueryParams(ComputationRunListQueryParams):
filter_only_running: bool = Field(
default=False,
description="If true, only running computations are returned",
)


class ComputationRunIterationsListQueryParams(ComputationRunListQueryParams):
include_children: bool = Field(
default=False,
description="If true, all computational runs of the project and its children are returned (Currently supported only for root projects)",
)


class ComputationRunRestGet(OutputSchema):
project_uuid: ProjectID
iteration: int
Expand Down Expand Up @@ -128,7 +135,11 @@ class ComputationTaskPathParams(BaseModel):
class ComputationTaskListQueryParams(
PageQueryParameters,
ComputationTaskListOrderParams, # type: ignore[misc, valid-type]
): ...
):
include_children: bool = Field(
default=False,
description="If true, all tasks of the project and its children are returned (Currently supported only for root projects)",
)


class ComputationTaskRestGet(OutputSchema):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""add index to projects_metadata

Revision ID: 4e7d8719855b
Revises: ba9c4816a31b
Create Date: 2025-05-21 11:48:34.062860+00:00

"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "4e7d8719855b"
down_revision = "ba9c4816a31b"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_index(
"idx_projects_metadata_root_parent_project_uuid",
"projects_metadata",
["root_parent_project_uuid"],
unique=False,
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"idx_projects_metadata_root_parent_project_uuid", table_name="projects_metadata"
)
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
These tables were designed to be controled by projects-plugin in
the webserver's service
These tables were designed to be controled by projects-plugin in
the webserver's service
"""

import sqlalchemy as sa
Expand Down Expand Up @@ -100,6 +100,10 @@
ondelete=RefActions.SET_NULL,
name="fk_projects_metadata_root_parent_node_id",
),
#######
sa.Index(
"idx_projects_metadata_root_parent_project_uuid", "root_parent_project_uuid"
),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def list_computations_iterations_page(
*,
product_name: ProductName,
user_id: UserID,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int = 0,
limit: int = 20,
Expand All @@ -76,7 +76,7 @@ async def list_computations_iterations_page(
_RPC_METHOD_NAME_ADAPTER.validate_python("list_computations_iterations_page"),
product_name=product_name,
user_id=user_id,
project_id=project_id,
project_ids=project_ids,
offset=offset,
limit=limit,
order_by=order_by,
Expand All @@ -92,7 +92,7 @@ async def list_computations_latest_iteration_tasks_page(
*,
product_name: ProductName,
user_id: UserID,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int = 0,
limit: int = 20,
Expand All @@ -106,7 +106,7 @@ async def list_computations_latest_iteration_tasks_page(
),
product_name=product_name,
user_id=user_id,
project_id=project_id,
project_ids=project_ids,
offset=offset,
limit=limit,
order_by=order_by,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class WalletTransactionError(OsparcErrorMixin, Exception):
msg_template = "{msg}"


class CreditTransactionNotFoundError(OsparcErrorMixin, Exception): ...
class CreditTransactionNotFoundError(OsparcErrorMixin, Exception):
msg_template = "Credit transaction for service run id {service_run_id} not found."


### Pricing Plans Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def list_computations_iterations_page(
*,
product_name: ProductName,
user_id: UserID,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int = 0,
limit: int = 20,
Expand All @@ -71,7 +71,7 @@ async def list_computations_iterations_page(
await comp_runs_repo.list_for_user_and_project_all_iterations(
product_name=product_name,
user_id=user_id,
project_id=project_id,
project_ids=project_ids,
offset=offset,
limit=limit,
order_by=order_by,
Expand All @@ -84,12 +84,12 @@ async def list_computations_iterations_page(


async def _fetch_task_log(
user_id: UserID, project_id: ProjectID, task: ComputationTaskForRpcDBGet
user_id: UserID, task: ComputationTaskForRpcDBGet
) -> TaskLogFileGet | None:
if not task.state.is_running():
return await dask_utils.get_task_log_file(
user_id=user_id,
project_id=project_id,
project_id=task.project_uuid,
node_id=task.node_id,
)
return None
Expand All @@ -101,7 +101,7 @@ async def list_computations_latest_iteration_tasks_page(
*,
product_name: ProductName,
user_id: UserID,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int = 0,
limit: int = 20,
Expand All @@ -114,20 +114,30 @@ async def list_computations_latest_iteration_tasks_page(
comp_tasks_repo = CompTasksRepository.instance(db_engine=app.state.engine)
comp_runs_repo = CompRunsRepository.instance(db_engine=app.state.engine)

comp_latest_run = await comp_runs_repo.get(
user_id=user_id, project_id=project_id, iteration=None # Returns last iteration
)

total, comp_tasks = await comp_tasks_repo.list_computational_tasks_rpc_domain(
project_id=project_id,
project_ids=project_ids,
offset=offset,
limit=limit,
order_by=order_by,
)

# Get unique set of all project_uuids from comp_tasks
unique_project_uuids = {task.project_uuid for task in comp_tasks}

# Fetch latest run for each project concurrently
latest_runs = await limited_gather(
*[
comp_runs_repo.get(user_id=user_id, project_id=project_uuid, iteration=None)
for project_uuid in unique_project_uuids
],
limit=20,
)
# Build a dict: project_uuid -> iteration
project_uuid_to_iteration = {run.project_uuid: run.iteration for run in latest_runs}

# Run all log fetches concurrently
log_files = await limited_gather(
*[_fetch_task_log(user_id, project_id, task) for task in comp_tasks],
*[_fetch_task_log(user_id, task) for task in comp_tasks],
limit=20,
)

Expand All @@ -142,7 +152,10 @@ async def list_computations_latest_iteration_tasks_page(
ended_at=task.ended_at,
log_download_link=log_file.download_link if log_file else None,
service_run_id=ServiceRunID.get_resource_tracking_run_id_for_computational(
user_id, project_id, task.node_id, comp_latest_run.iteration
user_id,
task.project_uuid,
task.node_id,
project_uuid_to_iteration[task.project_uuid],
),
)
for task, log_file in zip(comp_tasks, log_files, strict=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ async def list_for_user_and_project_all_iterations(
*,
product_name: str,
user_id: UserID,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int,
limit: int,
Expand All @@ -309,7 +309,11 @@ async def list_for_user_and_project_all_iterations(
*self._COMPUTATION_RUNS_RPC_GET_COLUMNS,
).where(
(comp_runs.c.user_id == user_id)
& (comp_runs.c.project_uuid == f"{project_id}")
& (
comp_runs.c.project_uuid.in_(
[f"{project_id}" for project_id in project_ids]
)
)
& (
comp_runs.c.metadata["product_name"].astext == product_name
) # <-- NOTE: We might create a separate column for this for fast retrieval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def list_computational_tasks(
async def list_computational_tasks_rpc_domain(
self,
*,
project_id: ProjectID,
project_ids: list[ProjectID],
# pagination
offset: int = 0,
limit: int = 20,
Expand All @@ -100,7 +100,11 @@ async def list_computational_tasks_rpc_domain(
)
.select_from(comp_tasks)
.where(
(comp_tasks.c.project_id == f"{project_id}")
(
comp_tasks.c.project_id.in_(
[f"{project_id}" for project_id in project_ids]
)
)
& (comp_tasks.c.node_class == NodeClass.COMPUTATIONAL)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def test_rpc_list_computation_runs_and_tasks(
# Tasks

output = await rpc_computations.list_computations_latest_iteration_tasks_page(
rpc_client, product_name="osparc", user_id=user["id"], project_id=proj.uuid
rpc_client, product_name="osparc", user_id=user["id"], project_ids=[proj.uuid]
)
assert output
assert output.total == 4
Expand Down Expand Up @@ -201,7 +201,7 @@ async def test_rpc_list_computation_runs_history(
)

output = await rpc_computations.list_computations_iterations_page(
rpc_client, product_name="osparc", user_id=user["id"], project_id=proj.uuid
rpc_client, product_name="osparc", user_id=user["id"], project_ids=[proj.uuid]
)
assert output.total == 3
assert isinstance(output, ComputationRunRpcGetPage)
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async def get_transaction_current_credits_by_service_run_id(
resource_tracker_credit_transactions.c.service_run_id == f"{service_run_id}"
)
result = await conn.execute(select_stmt)
row = result.first()
row = result.one_or_none()
if row is None:
raise CreditTransactionNotFoundError
raise CreditTransactionNotFoundError(service_run_id=service_run_id)
return Decimal(row[0])
Original file line number Diff line number Diff line change
Expand Up @@ -2619,6 +2619,13 @@ paths:
type: integer
default: 0
title: Offset
- name: include_children
in: query
required: false
schema:
type: boolean
default: false
title: Include Children
responses:
'200':
description: Successful Response
Expand Down Expand Up @@ -2664,6 +2671,13 @@ paths:
type: integer
default: 0
title: Offset
- name: include_children
in: query
required: false
schema:
type: boolean
default: false
title: Include Children
responses:
'200':
description: Successful Response
Expand Down
Loading
Loading