From 126d0c902c121a8de025a84e4edd53c23cc88659 Mon Sep 17 00:00:00 2001 From: Luis Pereira <91738660+lpereiracgn@users.noreply.github.com> Date: Tue, 3 Dec 2024 13:42:22 +0100 Subject: [PATCH] feat(workflows): Add support for simulation integration (#1999) --- CHANGELOG.md | 6 + cognite/client/_version.py | 3 +- cognite/client/data_classes/__init__.py | 4 + .../data_classes/simulators/__init__.py | 1 + .../client/data_classes/simulators/runs.py | 58 +++++ cognite/client/data_classes/workflows.py | 104 +++++++- pyproject.toml | 3 +- .../test_api/test_simulators/__init__.py | 0 .../test_simulators/seed/empty_model.json | 1 + .../test_simulators/seed/resources.py | 230 ++++++++++++++++++ .../test_api/test_workflows.py | 78 ++++++ .../data/workflow_execution.json | 24 ++ .../test_data_classes/test_workflows.py | 16 +- 13 files changed, 524 insertions(+), 4 deletions(-) create mode 100644 cognite/client/data_classes/simulators/__init__.py create mode 100644 cognite/client/data_classes/simulators/runs.py create mode 100644 tests/tests_integration/test_api/test_simulators/__init__.py create mode 100644 tests/tests_integration/test_api/test_simulators/seed/empty_model.json create mode 100644 tests/tests_integration/test_api/test_simulators/seed/resources.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 3570316260..9b9d2ce5cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. + +## [7.70.0] - 2024-12-02 +### Added +- Workflow support for "simulation" task type. + ## [7.69.4] - 2024-12-02 ### Added - An IsNull filter has been added for use in Data Modeling. @@ -27,6 +32,7 @@ Changes are grouped as follows ### Fixed - Revoking sessions through `client.iam.sessions.revoke` no longer raises an API error for very large payloads + ## [7.69.2] - 2024-11-28 ### Improved - Handle conversion of instance lists like NodeList to pandas DataFrame in scenarios where: a) properties are expanded diff --git a/cognite/client/_version.py b/cognite/client/_version.py index 5fb03bbbe8..f039101b95 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,5 @@ from __future__ import annotations -__version__ = "7.69.4" +__version__ = "7.70.0" + __api_subversion__ = "20230101" diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index fadaae74a9..e6a268e7ca 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -291,6 +291,8 @@ DynamicTaskParameters, FunctionTaskOutput, FunctionTaskParameters, + SimulationTaskOutput, + SimulationTaskParameters, SubworkflowTaskParameters, TransformationTaskOutput, TransformationTaskParameters, @@ -473,6 +475,8 @@ "DatapointSubscriptionWriteList", "OidcCredentials", "RawTable", + "SimulationTaskParameters", + "SimulationTaskOutput", "Transformation", "TransformationWrite", "TransformationBlockedInfo", diff --git a/cognite/client/data_classes/simulators/__init__.py b/cognite/client/data_classes/simulators/__init__.py new file mode 100644 index 0000000000..9d48db4f9f --- /dev/null +++ b/cognite/client/data_classes/simulators/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/cognite/client/data_classes/simulators/runs.py b/cognite/client/data_classes/simulators/runs.py new file mode 100644 index 0000000000..2d730dba38 --- /dev/null +++ b/cognite/client/data_classes/simulators/runs.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from 
dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteObject, +) +from cognite.client.utils._experimental import FeaturePreviewWarning + +if TYPE_CHECKING: + from cognite.client import CogniteClient + +_WARNING = FeaturePreviewWarning(api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators") + + +@dataclass +class SimulationValueUnitName(CogniteObject): + name: str + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + name=resource["name"], + ) + + def __post_init__(self) -> None: + _WARNING.warn() + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return super().dump(camel_case=camel_case) + + +@dataclass +class SimulationInputOverride(CogniteObject): + reference_id: str + value: str | int | float | list[str] | list[int] | list[float] + unit: SimulationValueUnitName | None = None + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + reference_id=resource["referenceId"], + value=resource["value"], + unit=SimulationValueUnitName._load(resource["unit"], cognite_client) if "unit" in resource else None, + ) + + def __post_init__(self) -> None: + _WARNING.warn() + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + output = super().dump(camel_case=camel_case) + if self.unit is not None: + output["unit"] = self.unit.dump(camel_case=camel_case) + + return output diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 836c7a264c..bcaf0aef70 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -20,6 +20,10 @@ WriteableCogniteResourceList, ) from cognite.client.data_classes.data_modeling.query import Query, ResultSetExpression, Select +from cognite.client.data_classes.simulators.runs import ( + SimulationInputOverride, +) +from cognite.client.utils._experimental import FeaturePreviewWarning from cognite.client.utils._text import convert_all_keys_to_camel_case, to_snake_case if TYPE_CHECKING: @@ -131,7 +135,7 @@ def as_write(self) -> WorkflowUpsertList: return WorkflowUpsertList([workflow.as_write() for workflow in self.data]) -ValidTaskType = Literal["function", "transformation", "cdf", "dynamic", "subworkflow"] +ValidTaskType = Literal["function", "transformation", "cdf", "dynamic", "subworkflow", "simulation"] class WorkflowTaskParameters(CogniteObject, ABC): @@ -159,6 +163,8 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters: return SubworkflowTaskParameters._load(parameters) elif type_ == "subworkflow" and "workflowExternalId" in parameters["subworkflow"]: return SubworkflowReferenceParameters._load(parameters) + elif type_ == "simulation": + return SimulationTaskParameters._load(parameters) else: raise ValueError(f"Unknown task type: {type_}. Expected {ValidTaskType}") @@ -238,6 +244,59 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: return output +_SIMULATORS_WARNING = FeaturePreviewWarning( + api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators" +) + + +class SimulationTaskParameters(WorkflowTaskParameters): + """ + The simulation parameters are used to specify the simulation routine to be executed. + Args: + routine_external_id (str): The external ID of the simulation routine to be executed. 
+ run_time (int | None): Reference timestamp used for data pre-processing and data sampling. + inputs (list[SimulationInputOverride] | None): List of input overrides. + """ + + task_type = "simulation" + + def __init__( + self, + routine_external_id: str, + run_time: int | None = None, + inputs: list[SimulationInputOverride] | None = None, + ) -> None: + self.routine_external_id = routine_external_id + self.run_time = run_time + self.inputs = inputs + + def __post_init__(self) -> None: + _SIMULATORS_WARNING.warn() + + @classmethod + def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> SimulationTaskParameters: + simulation: dict[str, Any] = resource["simulation"] + + return cls( + routine_external_id=simulation["routineExternalId"], + run_time=simulation.get("runTime"), + inputs=[SimulationInputOverride._load(item) for item in simulation.get("inputs", [])] + if simulation.get("inputs") + else None, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + simulation: dict[str, Any] = { + "routineExternalId" if camel_case else "routine_external_id": self.routine_external_id, + } + if self.run_time: + simulation["runTime" if camel_case else "run_time"] = self.run_time + if self.inputs: + simulation["inputs" if camel_case else "inputs"] = [item.dump(camel_case) for item in self.inputs] + + return {"simulation": simulation} + + class TransformationTaskParameters(WorkflowTaskParameters): """ The transformation parameters are used to specify the transformation to be called. @@ -540,6 +599,8 @@ def load_output(cls, data: dict) -> WorkflowTaskOutput: return DynamicTaskOutput.load(data) elif task_type == "subworkflow": return SubworkflowTaskOutput.load(data) + elif task_type == "simulation": + return SimulationTaskOutput.load(data) else: raise ValueError(f"Unknown task type: {task_type}") @@ -579,6 +640,47 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: } +class SimulationTaskOutput(WorkflowTaskOutput): + """ + The class represent the output of Simulation execution. + Args: + run_id (int | None): The run ID of the simulation run. + log_id (int | None): The log ID of the simulation run. + status_message (str | None): Status message of the simulation execution. + """ + + task_type: ClassVar[str] = "simulation" + + def __post_init__(self) -> None: + _SIMULATORS_WARNING.warn() + + def __init__( + self, + run_id: int | None, + log_id: int | None, + status_message: str | None, + ) -> None: + self.run_id = run_id + self.log_id = log_id + self.status_message = status_message + + @classmethod + def load(cls, data: dict[str, Any]) -> SimulationTaskOutput: + output = data["output"] + return cls( + run_id=output.get("runId"), + log_id=output.get("logId"), + status_message=output.get("statusMessage"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "runId" if camel_case else "run_id": self.run_id, + "logId" if camel_case else "log_id": self.log_id, + "statusMessage" if camel_case else "status_message": self.status_message, + } + + class TransformationTaskOutput(WorkflowTaskOutput): """ The transformation output is used to specify the output of a transformation task. 
diff --git a/pyproject.toml b/pyproject.toml index aaea85ea3a..dd6637c882 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,8 @@ [tool.poetry] name = "cognite-sdk" -version = "7.69.4" +version = "7.70.0" + description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com" diff --git a/tests/tests_integration/test_api/test_simulators/__init__.py b/tests/tests_integration/test_api/test_simulators/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/tests_integration/test_api/test_simulators/seed/empty_model.json b/tests/tests_integration/test_api/test_simulators/seed/empty_model.json new file mode 100644 index 0000000000..0967ef424b --- /dev/null +++ b/tests/tests_integration/test_api/test_simulators/seed/empty_model.json @@ -0,0 +1 @@ +{} diff --git a/tests/tests_integration/test_api/test_simulators/seed/resources.py b/tests/tests_integration/test_api/test_simulators/seed/resources.py new file mode 100644 index 0000000000..9f76a53112 --- /dev/null +++ b/tests/tests_integration/test_api/test_simulators/seed/resources.py @@ -0,0 +1,230 @@ +import time + +from cognite.client._cognite_client import CogniteClient +from cognite.client.data_classes.data_sets import DataSet +from cognite.client.exceptions import CogniteAPIError + + +def get_workflow_seed_data(data_set_id: int, file_id: int): + timestamp = int(time.time() * 1000) + simulator = { + "name": "test_sim_for_workflow", + "externalId": "test_sim_for_workflow", + "fileExtensionTypes": ["json"], + "modelTypes": [{"name": "Steady State", "key": "SteadyState"}], + "stepFields": [ + { + "stepType": "get/set", + "fields": [ + { + "info": "The address of the input/output", + "name": "address", + "label": "Address", + } + ], + }, + { + "stepType": "command", + "fields": [ + { + "info": "The command to execute", + "name": "command", + "label": "Command", + "options": [{"label": "Solve Flowsheet", "value": "Solve"}], + } + ], + }, + ], + "unitQuantities": [ + { + "name": "temperature", + "label": "Temperature", + "units": [ + {"label": "K", "name": "K"}, + {"label": "R", "name": "R"}, + {"label": "C", "name": "C"}, + {"label": "F", "name": "F"}, + ], + }, + ], + } + + simulator_integration = { + "externalId": "integration_tests_workflow_connector", + "simulatorExternalId": simulator["externalId"], + "heartbeat": timestamp, + "dataSetId": data_set_id, + "connectorVersion": "1.0.0", + "simulatorVersion": "1.0.0", + } + + simulator_model = { + "externalId": "integration_tests_workflow_model", + "simulatorExternalId": simulator["externalId"], + "name": "Test Simulator Model", + "description": "Test Simulator Model Desc", + "dataSetId": data_set_id, + "type": "SteadyState", + } + + simulator_model_revision = { + "externalId": "integration_tests_workflow_model_revision", + "modelExternalId": "integration_tests_workflow_model", + "description": "test sim model revision description", + "fileId": file_id, + } + + simulator_routine = { + "externalId": "integration_tests_workflow_routine", + "modelExternalId": simulator_model["externalId"], + "simulatorIntegrationExternalId": simulator_integration["externalId"], + "name": "Routine test", + } + + simulator_routine_revision = { + "externalId": "integration_tests_workflow_routine_revision", + "routineExternalId": simulator_routine["externalId"], + "configuration": { + "schedule": {"enabled": False}, + "dataSampling": {"enabled": False}, + "logicalCheck": [], + "steadyStateDetection": [], + "inputs": [ + { + "name": 
"Cold Water Temperature", + "referenceId": "CWT", + "value": 10, + "valueType": "DOUBLE", + "unit": {"name": "C", "quantity": "temperature"}, + }, + ], + "outputs": [ + { + "name": "Shower Temperature", + "referenceId": "ST", + "unit": {"name": "C", "quantity": "temperature"}, + "valueType": "DOUBLE", + } + ], + }, + "script": [ + { + "order": 1, + "description": "Set Inputs", + "steps": [ + { + "order": 1, + "stepType": "Set", + "arguments": {"referenceId": "CWT", "address": "input.com"}, + } + ], + }, + { + "order": 2, + "description": "Solve the flowsheet", + "steps": [{"order": 1, "stepType": "Command", "arguments": {"command": "Solve"}}], + }, + { + "order": 3, + "description": "Set simulation outputs", + "steps": [ + { + "order": 1, + "stepType": "Get", + "arguments": {"referenceId": "ST", "address": "output.com"}, + }, + ], + }, + ], + } + + return { + "": simulator, + "/integrations": simulator_integration, + "/models": simulator_model, + "/models/revisions": simulator_model_revision, + "/routines": simulator_routine, + "/routines/revisions": simulator_routine_revision, + } + + +def update_seed_integration(integration_id: int, cognite_client: CogniteClient): + cognite_client.post( + f"/api/v1/projects/{cognite_client.config.project}/simulators/integrations/update", + json={"items": [{"id": integration_id, "update": {"heartbeat": {"set": int(time.time() * 1000)}}}]}, + ) + + +def get_seed_simulator_integration(cognite_client: CogniteClient): + integrations_list = cognite_client.post( + f"/api/v1/projects/{cognite_client.config.project}/simulators/integrations/list", + json={}, + ).json()["items"] + integrations = [item for item in integrations_list if item["externalId"] == "integration_tests_workflow_connector"] + if len(integrations) == 0: + return None + return integrations[0] + + +def ensure_workflow_simint_routine(cognite_client: CogniteClient) -> str: + data_set = cognite_client.data_sets.retrieve(external_id="integration_tests_workflow") + + if data_set is None: + data_set = cognite_client.data_sets.create( + DataSet( + external_id="integration_tests_workflow", + name="Integration Tests Workflow", + description="Data set for integration tests of the workflow API", + ) + ) + + file = cognite_client.files.retrieve(external_id="integration_tests_workflow_model_file") + + if file is None: + file = cognite_client.files.upload( + path="tests/tests_integration/test_api/test_workflows/seed_model.json", + external_id="integration_tests_workflow_model_file", + name="seed_mode.json", + data_set_id=data_set.id, + ) + + integration = get_seed_simulator_integration(cognite_client) + + seed_data = get_workflow_seed_data(data_set.id, file.id) + if integration is None: + for path, item in seed_data.items(): + try: + cognite_client.post( + f"/api/v1/projects/{cognite_client.config.project}/simulators{path}", + json={"items": [item]}, + ) + except CogniteAPIError: + pass + + integration = get_seed_simulator_integration(cognite_client) + update_seed_integration(integration["id"], cognite_client) + + return seed_data["/routines"]["externalId"] + + +def finish_simulation_runs(cognite_client: CogniteClient, routine_external_id: str): + list_runs = cognite_client.post( + f"/api/v1/projects/{cognite_client.config.project}/simulators/runs/list", + json={ + "filter": { + "routineExternalIds": [routine_external_id], + "status": "ready", + "createdTime": {"min": int(time.time() * 1000) - 1000 * 60}, + }, + "limit": 10, + }, + ).json()["items"] + + for run in list_runs: + try: + cognite_client.post( + 
f"/api/v1/projects/{cognite_client.config.project}/simulators/run/callback", + json={"items": [{"id": run["id"], "status": "success"}]}, + ) + except CogniteAPIError: + pass diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index 3f46dc8120..ef95fb6426 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -10,9 +10,12 @@ from cognite.client.data_classes import DataSet from cognite.client.data_classes.data_modeling import ViewId from cognite.client.data_classes.data_modeling.query import NodeResultSetExpression, Select, SourceSelector +from cognite.client.data_classes.simulators.runs import SimulationInputOverride, SimulationValueUnitName from cognite.client.data_classes.workflows import ( CDFTaskParameters, FunctionTaskParameters, + SimulationTaskOutput, + SimulationTaskParameters, SubworkflowReferenceParameters, SubworkflowTaskParameters, TransformationTaskParameters, @@ -35,6 +38,15 @@ from cognite.client.exceptions import CogniteAPIError from cognite.client.utils import timestamp_to_ms from cognite.client.utils._text import random_string +from tests.tests_integration.test_api.test_simulators.seed.resources import ( + ensure_workflow_simint_routine, + finish_simulation_runs, +) + + +@pytest.fixture +def workflow_simint_routine(cognite_client: CogniteClient) -> str: + return ensure_workflow_simint_routine(cognite_client) @pytest.fixture(autouse=True, scope="module") @@ -350,6 +362,72 @@ def test_list_workflows(self, cognite_client: CogniteClient, workflow_list: Work class TestWorkflowVersions: + def test_upsert_run_delete_with_simulation_task( + self, + cognite_client: CogniteClient, + workflow_simint_routine: str, + ): + workflow_id = "integration_test-workflow_for_simulator_integration" + random_string(5) + + version = WorkflowVersionUpsert( + workflow_external_id=workflow_id, + version="1", + workflow_definition=WorkflowDefinitionUpsert( + tasks=[ + WorkflowTask( + external_id=f"{workflow_id}-1-task1" + random_string(5), + parameters=SimulationTaskParameters( + routine_external_id=workflow_simint_routine, + inputs=[ + SimulationInputOverride( + reference_id="CWT", value=11, unit=SimulationValueUnitName(name="F") + ) + ], + ), + timeout=100, + ) + ], + description=None, + ), + ) + + cognite_client.workflows.versions.delete(version.as_id(), ignore_unknown_ids=True) + created_version: WorkflowVersion | None = None + + try: + created_version = cognite_client.workflows.versions.upsert(version) + assert created_version.workflow_external_id == workflow_id + assert created_version.workflow_definition.tasks[0].type == "simulation" + assert len(created_version.workflow_definition.tasks) > 0 + + execution = cognite_client.workflows.executions.run(workflow_id, version.version) + execution_detailed = None + simulation_task = None + + for _ in range(20): + execution_detailed = cognite_client.workflows.executions.retrieve_detailed(execution.id) + simulation_task = execution_detailed.executed_tasks[0] + + if simulation_task.status == "in_progress": + finish_simulation_runs(cognite_client, workflow_simint_routine) + + if execution_detailed.status == "completed" or execution_detailed.status == "failed": + break + + time.sleep(1.5) + + assert isinstance(simulation_task.output, SimulationTaskOutput) + assert simulation_task.status == "completed" + assert simulation_task.output.run_id is not None + assert simulation_task.output.log_id is not None + + finally: + if 
created_version is not None: + cognite_client.workflows.versions.delete( + created_version.as_id(), + ) + cognite_client.workflows.delete(created_version.workflow_external_id) + def test_upsert_preexisting(self, cognite_client: CogniteClient, new_workflow_version: WorkflowVersion) -> None: new_workflow_version.workflow_definition.description = "Updated description for testing purposes" updated_version = cognite_client.workflows.versions.upsert(new_workflow_version.as_write()) diff --git a/tests/tests_unit/test_data_classes/data/workflow_execution.json b/tests/tests_unit/test_data_classes/data/workflow_execution.json index 37a1004d03..52a71b52b1 100644 --- a/tests/tests_unit/test_data_classes/data/workflow_execution.json +++ b/tests/tests_unit/test_data_classes/data/workflow_execution.json @@ -40,6 +40,30 @@ } ] }, + { + "externalId": "testSimulation", + "type": "simulation", + "parameters": { + "simulation": { + "routineExternalId": "routine-1", + "inputs": [ + { + "referenceId": "some-ref", + "value": 1 + } + ], + "runTime": 123 + } + }, + "retries": 0, + "timeout": 3600, + "onFailure": "skipTask", + "dependsOn": [ + { + "externalId": "testTaskDispatcher" + } + ] + }, { "externalId": "testMatrixCalculation", "type": "dynamic", diff --git a/tests/tests_unit/test_data_classes/test_workflows.py b/tests/tests_unit/test_data_classes/test_workflows.py index c3c976cd39..9d9b55058b 100644 --- a/tests/tests_unit/test_data_classes/test_workflows.py +++ b/tests/tests_unit/test_data_classes/test_workflows.py @@ -10,6 +10,8 @@ DynamicTaskParameters, FunctionTaskOutput, FunctionTaskParameters, + SimulationInputOverride, + SimulationTaskParameters, TransformationTaskOutput, TransformationTaskParameters, WorkflowDefinition, @@ -156,8 +158,20 @@ def test_definition_parsed_correctly(self, execution_data: dict): on_failure="skipTask", depends_on=["testTaskDispatcher"], ), + WorkflowTask( + external_id="testSimulation", + parameters=SimulationTaskParameters( + inputs=[SimulationInputOverride(reference_id="some-ref", value=1)], + routine_external_id="routine-1", + run_time=123, + ), + retries=0, + timeout=3600, + on_failure="skipTask", + depends_on=["testTaskDispatcher"], + ), ] - assert len(wf_execution.workflow_definition.tasks) == 4 + assert len(wf_execution.workflow_definition.tasks) == 5 for expected_task, actual_task in zip(expected, wf_execution.workflow_definition.tasks): assert actual_task.external_id == expected_task.external_id
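
---

Reviewer note: below is a minimal usage sketch of the new "simulation" task type, adapted from the integration test in this patch. The workflow, task, and routine external IDs, the input override values, and the run time are illustrative only; a `CogniteClient` configured with valid credentials and an existing simulator routine are assumed.

```python
from cognite.client import CogniteClient
from cognite.client.data_classes.simulators.runs import (
    SimulationInputOverride,
    SimulationValueUnitName,
)
from cognite.client.data_classes.workflows import (
    SimulationTaskParameters,
    WorkflowDefinitionUpsert,
    WorkflowTask,
    WorkflowVersionUpsert,
)

client = CogniteClient()  # assumes credentials/config are set up separately

# Define a single-task workflow version that runs a simulator routine.
version = WorkflowVersionUpsert(
    workflow_external_id="my-simulation-workflow",  # illustrative external ID
    version="1",
    workflow_definition=WorkflowDefinitionUpsert(
        tasks=[
            WorkflowTask(
                external_id="my-simulation-workflow-task1",
                parameters=SimulationTaskParameters(
                    # Must reference an existing simulator routine (illustrative ID here).
                    routine_external_id="my-simulation-routine",
                    # Optional reference timestamp (ms) for data sampling.
                    run_time=1733227342000,
                    # Optional overrides for routine inputs.
                    inputs=[
                        SimulationInputOverride(
                            reference_id="CWT",
                            value=11,
                            unit=SimulationValueUnitName(name="C"),
                        )
                    ],
                ),
                timeout=3600,
            )
        ],
        description=None,
    ),
)

client.workflows.versions.upsert(version)
execution = client.workflows.executions.run("my-simulation-workflow", "1")
```

Once the execution completes, `client.workflows.executions.retrieve_detailed(execution.id)` exposes the task result as a `SimulationTaskOutput` carrying `run_id`, `log_id`, and `status_message`, as exercised in `test_upsert_run_delete_with_simulation_task` above.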