Skip to content

Add support for simulation runs #2145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cognite/client/_api/simulators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from cognite.client._api.simulators.logs import SimulatorLogsAPI
from cognite.client._api.simulators.models import SimulatorModelsAPI
from cognite.client._api.simulators.routines import SimulatorRoutinesAPI
from cognite.client._api.simulators.runs import SimulatorRunsAPI
from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.data_classes.simulators.simulators import Simulator, SimulatorList
Expand All @@ -24,6 +25,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
super().__init__(config, api_version, cognite_client)
self.integrations = SimulatorIntegrationsAPI(config, api_version, cognite_client)
self.models = SimulatorModelsAPI(config, api_version, cognite_client)
self.runs = SimulatorRunsAPI(config, api_version, cognite_client)
self.routines = SimulatorRoutinesAPI(config, api_version, cognite_client)
self.logs = SimulatorLogsAPI(config, api_version, cognite_client)
self._warning = FeaturePreviewWarning(
Expand Down
219 changes: 219 additions & 0 deletions cognite/client/_api/simulators/runs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
from __future__ import annotations

from collections.abc import Iterator, Sequence
from typing import TYPE_CHECKING, overload

from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.data_classes.simulators.filters import SimulatorRunsFilter
from cognite.client.data_classes.simulators.runs import SimulationRun, SimulationRunWrite, SimulatorRunsList
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._validation import assert_type
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig


class SimulatorRunsAPI(APIClient):
_RESOURCE_PATH = "/simulators/runs"
_RESOURCE_PATH_RUN = "/simulators/run"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self._CREATE_LIMIT = 1
self._warning = FeaturePreviewWarning(
api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators"
)

def __iter__(self) -> Iterator[SimulationRun]:
"""Iterate over simulation runs

Fetches simulation runs as they are iterated over, so you keep a limited number of simulation runs in memory.

Returns:
Iterator[SimulationRun]: yields simulation runs one by one.
"""
return self()

@overload
def __call__(
self,
chunk_size: int,
limit: int | None = None,
status: str | None = None,
run_type: str | None = None,
model_external_ids: SequenceNotStr[str] | None = None,
simulator_integration_external_ids: SequenceNotStr[str] | None = None,
simulator_external_ids: SequenceNotStr[str] | None = None,
routine_external_ids: SequenceNotStr[str] | None = None,
routine_revision_external_ids: SequenceNotStr[str] | None = None,
model_revision_external_ids: SequenceNotStr[str] | None = None,
) -> Iterator[SimulatorRunsList]: ...

@overload
def __call__(
self,
chunk_size: None = None,
limit: int | None = None,
status: str | None = None,
run_type: str | None = None,
model_external_ids: SequenceNotStr[str] | None = None,
simulator_integration_external_ids: SequenceNotStr[str] | None = None,
simulator_external_ids: SequenceNotStr[str] | None = None,
routine_external_ids: SequenceNotStr[str] | None = None,
routine_revision_external_ids: SequenceNotStr[str] | None = None,
model_revision_external_ids: SequenceNotStr[str] | None = None,
) -> Iterator[SimulationRun]: ...

def __call__(
self,
chunk_size: int | None = None,
limit: int | None = None,
status: str | None = None,
run_type: str | None = None,
model_external_ids: SequenceNotStr[str] | None = None,
simulator_integration_external_ids: SequenceNotStr[str] | None = None,
simulator_external_ids: SequenceNotStr[str] | None = None,
routine_external_ids: SequenceNotStr[str] | None = None,
routine_revision_external_ids: SequenceNotStr[str] | None = None,
model_revision_external_ids: SequenceNotStr[str] | None = None,
) -> Iterator[SimulationRun] | Iterator[SimulatorRunsList]:
"""Iterate over simulation runs

Fetches simulation runs as they are iterated over, so you keep a limited number of simulation runs in memory.

Args:
chunk_size (int | None): Number of simulation runs to return in each chunk. Defaults to yielding one simulation run a time.
limit (int | None): The maximum number of simulation runs to return, pass None to return all.
status (str | None): Filter by simulation run status
run_type (str | None): Filter by simulation run type
model_external_ids (SequenceNotStr[str] | None): Filter by simulator model external ids
simulator_integration_external_ids (SequenceNotStr[str] | None): Filter by simulator integration external ids
simulator_external_ids (SequenceNotStr[str] | None): Filter by simulator external ids
routine_external_ids (SequenceNotStr[str] | None): Filter by routine external ids
routine_revision_external_ids (SequenceNotStr[str] | None): Filter by routine revision external ids
model_revision_external_ids (SequenceNotStr[str] | None): Filter by model revision external ids

Returns:
Iterator[SimulationRun] | Iterator[SimulatorRunsList]: yields Simulation Run one by one if chunk is not specified, else SimulatorRunsList objects.
"""

filter_runs = SimulatorRunsFilter(
status=status,
run_type=run_type,
model_external_ids=model_external_ids,
simulator_integration_external_ids=simulator_integration_external_ids,
simulator_external_ids=simulator_external_ids,
routine_external_ids=routine_external_ids,
routine_revision_external_ids=routine_revision_external_ids,
model_revision_external_ids=model_revision_external_ids,
)

return self._list_generator(
list_cls=SimulatorRunsList,
resource_cls=SimulationRun,
method="POST",
filter=filter_runs.dump(),
chunk_size=chunk_size,
limit=limit,
)

def list(
self,
limit: int | None = DEFAULT_LIMIT_READ,
status: str | None = None,
run_type: str | None = None,
model_external_ids: SequenceNotStr[str] | None = None,
simulator_integration_external_ids: SequenceNotStr[str] | None = None,
simulator_external_ids: SequenceNotStr[str] | None = None,
routine_external_ids: SequenceNotStr[str] | None = None,
routine_revision_external_ids: SequenceNotStr[str] | None = None,
model_revision_external_ids: SequenceNotStr[str] | None = None,
) -> SimulatorRunsList:
"""`Filter simulation runs <https://developer.cognite.com/api#tag/Simulation-Runs/operation/filter_simulation_runs_simulators_runs_list_post>`_
Retrieves a list of simulation runs that match the given criteria

Args:
limit (int | None): The maximum number of simulation runs to return, pass None to return all.
status (str | None): Filter by simulation run status
run_type (str | None): Filter by simulation run type
model_external_ids (SequenceNotStr[str] | None): Filter by simulator model external ids
simulator_integration_external_ids (SequenceNotStr[str] | None): Filter by simulator integration external ids
simulator_external_ids (SequenceNotStr[str] | None): Filter by simulator external ids
routine_external_ids (SequenceNotStr[str] | None): Filter by routine external ids
routine_revision_external_ids (SequenceNotStr[str] | None): Filter by routine revision external ids
model_revision_external_ids (SequenceNotStr[str] | None): Filter by model revision external ids

Returns:
SimulatorRunsList: List of simulation runs

Examples:

List simulation runs:
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> res = client.simulators.runs.list()

Filter runs by status and simulator external ids:
>>> res = client.simulators.runs.list(
... simulator_external_ids=["PROSPER", "DWSIM"],
... status="success"
... )
"""

filter_runs = SimulatorRunsFilter(
status=status,
run_type=run_type,
model_external_ids=model_external_ids,
simulator_integration_external_ids=simulator_integration_external_ids,
simulator_external_ids=simulator_external_ids,
routine_external_ids=routine_external_ids,
routine_revision_external_ids=routine_revision_external_ids,
model_revision_external_ids=model_revision_external_ids,
)
self._warning.warn()
return self._list(
method="POST",
limit=limit,
resource_cls=SimulationRun,
list_cls=SimulatorRunsList,
filter=filter_runs.dump(),
)

@overload
def create(self, run: SimulationRunWrite) -> SimulationRun: ...

@overload
def create(self, run: Sequence[SimulationRunWrite]) -> SimulatorRunsList: ...

def create(self, run: SimulationRunWrite | Sequence[SimulationRunWrite]) -> SimulationRun | SimulatorRunsList:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to you plan to add support for create by routine reversion. It seems you only have by routine now
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it's a feature, can it be a separate PR? we are over 500 limit already

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expected it to be a separate PR, but I was wondering if these two will have a shared base class which will have consequences for this PR. However, as this is alpha we can do a breaking change later if necessary, so approving for now.

"""`Create simulation runs <https://developer.cognite.com/api#tag/Simulation-Runs/operation/filter_simulation_runs_simulators_runs_list_post>`_
Args:
run (SimulationRunWrite | Sequence[SimulationRunWrite]): The simulation run(s) to execute.
Returns:
SimulationRun | SimulatorRunsList: Created simulation run(s)
Examples:
Create new simulation run:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.simulators.runs import SimulationRunWrite
>>> client = CogniteClient()
>>> run = [
... SimulationRunWrite(
... routine_external_id="routine1",
... log_severity="Debug",
... run_type="external",
... ),
... ]
>>> res = client.simulators.runs.create(run)
"""
assert_type(run, "simulation_run", [SimulationRunWrite, Sequence])

return self._create_multiple(
list_cls=SimulatorRunsList,
resource_cls=SimulationRun,
items=run,
input_resource_cls=SimulationRunWrite,
resource_path=self._RESOURCE_PATH_RUN,
)
10 changes: 10 additions & 0 deletions cognite/client/data_classes/simulators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
SimulatorModelRevisionWrite,
SimulatorModelWrite,
)
from cognite.client.data_classes.simulators.runs import (
SimulationRun,
SimulationRunWrite,
SimulationRunWriteList,
SimulatorRunsList,
)
from cognite.client.data_classes.simulators.simulators import (
Simulator,
SimulatorIntegration,
Expand All @@ -28,6 +34,9 @@
__all__ = [
"PropertySort",
"PropertySort",
"SimulationRun",
"SimulationRunWrite",
"SimulationRunWriteList",
"Simulator",
"SimulatorIntegration",
"SimulatorIntegrationFilter",
Expand All @@ -41,6 +50,7 @@
"SimulatorModelRevisionsFilter",
"SimulatorModelWrite",
"SimulatorModelsFilter",
"SimulatorRunsList",
"SimulatorStep",
"SimulatorStepField",
"SimulatorStepOption",
Expand Down
30 changes: 30 additions & 0 deletions cognite/client/data_classes/simulators/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@
from cognite.client.utils.useful_types import SequenceNotStr


def _parse_str_or_sequence(value: str | SequenceNotStr[str] | None) -> list[str] | None:
if isinstance(value, str):
return [value]
elif isinstance(value, SequenceNotStr):
return list(value)
return None


class SimulatorIntegrationFilter(CogniteFilter):
def __init__(
self,
Expand Down Expand Up @@ -39,6 +47,28 @@ def __init__(
self.model_external_ids = [model_external_ids] if isinstance(model_external_ids, str) else model_external_ids


class SimulatorRunsFilter(CogniteFilter):
def __init__(
self,
status: str | None = None,
run_type: str | None = None,
model_external_ids: str | SequenceNotStr[str] | None = None,
simulator_integration_external_ids: str | SequenceNotStr[str] | None = None,
simulator_external_ids: str | SequenceNotStr[str] | None = None,
routine_external_ids: str | SequenceNotStr[str] | None = None,
routine_revision_external_ids: str | SequenceNotStr[str] | None = None,
model_revision_external_ids: str | SequenceNotStr[str] | None = None,
) -> None:
self.model_external_ids = _parse_str_or_sequence(model_external_ids)
self.simulator_integration_external_ids = _parse_str_or_sequence(simulator_integration_external_ids)
self.simulator_external_ids = _parse_str_or_sequence(simulator_external_ids)
self.routine_external_ids = _parse_str_or_sequence(routine_external_ids)
self.routine_revision_external_ids = _parse_str_or_sequence(routine_revision_external_ids)
self.model_revision_external_ids = _parse_str_or_sequence(model_revision_external_ids)
self.status = status
self.run_type = run_type


class SimulatorRoutinesFilter(CogniteFilter):
def __init__(
self,
Expand Down
Loading