From d42188a8e12e5ab2e93fc7f20d13c297038c025f Mon Sep 17 00:00:00 2001
From: Mohamed Abdel Wedoud
Date: Tue, 19 Mar 2024 21:00:36 +0100
Subject: [PATCH] feat(api-study): rearrange the aggregation processing

---
 antarest/study/common/studystorage.py         |   2 +-
 antarest/study/service.py                     |   2 +-
 .../study/storage/abstract_storage_service.py | 196 +++++++++++++-----
 .../variantstudy/variant_study_service.py     |   2 +-
 antarest/study/web/raw_studies_blueprint.py   |   2 +-
 5 files changed, 148 insertions(+), 56 deletions(-)

diff --git a/antarest/study/common/studystorage.py b/antarest/study/common/studystorage.py
index 1be7e7c33b..5717ae686a 100644
--- a/antarest/study/common/studystorage.py
+++ b/antarest/study/common/studystorage.py
@@ -58,7 +58,7 @@ def aggregate_data(
         mc_years: t.Sequence[str],
         areas_names: t.Sequence[str],
         columns_names: t.Sequence[str],
-    ) -> t.Any:
+    ) -> t.Dict[str, t.Any]:
         """
         Entry point to fetch data inside study.
 
diff --git a/antarest/study/service.py b/antarest/study/service.py
index fa9102bce5..fa15f50998 100644
--- a/antarest/study/service.py
+++ b/antarest/study/service.py
@@ -329,7 +329,7 @@ def aggregate(
         areas_names: t.Sequence[str],
         columns_names: t.Sequence[str],
         params: RequestParameters,
-    ) -> t.Any:
+    ) -> JSON:
         """
         Get study data inside filesystem
         Args:
diff --git a/antarest/study/storage/abstract_storage_service.py b/antarest/study/storage/abstract_storage_service.py
index ccd753d017..576f80e42f 100644
--- a/antarest/study/storage/abstract_storage_service.py
+++ b/antarest/study/storage/abstract_storage_service.py
@@ -4,9 +4,13 @@
 import tempfile
 import typing as t
 from abc import ABC
+from enum import Enum
 from pathlib import Path
 from uuid import uuid4
 
+import numpy as np
+import pandas as pd
+
 from antarest.core.config import Config
 from antarest.core.exceptions import BadOutputError, StudyOutputNotFoundError
 from antarest.core.interfaces.cache import CacheConstants, ICache
@@ -31,6 +35,7 @@
 from antarest.study.storage.rawstudy.model.filesystem.config.files import get_playlist
 from antarest.study.storage.rawstudy.model.filesystem.config.model import Simulation
 from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy, StudyFactory
+from antarest.study.storage.rawstudy.model.filesystem.folder_node import ChildNotFoundError
 from antarest.study.storage.rawstudy.model.filesystem.matrix.matrix import MatrixFrequency
 from antarest.study.storage.rawstudy.model.helpers import FileStudyHelpers
 from antarest.study.storage.utils import extract_output_name, fix_study_root, remove_from_cache
@@ -38,19 +43,56 @@
 logger = logging.getLogger(__name__)
 
 TEMPLATE_PARTS = "output,{sim_id},economy,mc-ind"
+# noinspection SpellCheckingInspection
+MCYEAR_COL = "mcYear"
 MC_YEAR_INDEX = 0
 FILE_TYPE_1_INDEX = 1
 """
 Index in path parts starting from the Monte Carlo year to determine whether we fetch links, areas or binding constraints.
 """
-FILE_TYPE_2_INDEX = -1
+FILE_TYPE_2_INDEX = -2
 """Index in path parts starting from the Monte Carlo year to determine whether we fetch values, details, etc."""
 AREA_NAME_INDEX = 2
 """Index in path parts starting from the Monte Carlo year to determine the area name."""
-FREQUENCY_INDEX = -1
+FREQUENCY_INDEX = -2
 """Index in path parts starting from the Monte Carlo year to determine matrix frequency."""
 
 
+class FileType1(str, Enum):
+    """
+    Enum of the first-level folder types under a Monte Carlo year: links, areas or binding constraints.
+    """
+
+    LINKS = "links"
+    AREAS = "areas"
+    BINDING_CONSTRAINTS = "binding_constraints"
+
+
+class FileType2(str, Enum):
+    """
+    Enum of the matrix file types we may fetch: values, details, etc.
+    """
+
+    VALUES = "values"
+    DETAILS = "details"
+    # noinspection SpellCheckingInspection
+    DETAILS_ST_STORAGE = "details-STstorage"
+    DETAILS_RES = "details-res"
+    BINDING_CONSTRAINTS = "binding-constraints"
+
+
+def stringify(col: t.Union[str, t.Tuple[str, ...]]) -> str:
+    """
+    Convert a column name (a plain string or a tuple of strings) to a single string, joining tuple elements with "|".
+    Args:
+        col: column name to convert
+
+    Returns: column name as a string
+
+    """
+    return "|".join(col) if isinstance(col, tuple) else col
+
+
 def flatten_tree(path_tree: t.Dict[str, t.Any]) -> t.List[t.Tuple[str, ...]]:
     """
     Flatten paths tree
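
For reviewers, a quick sketch (not part of the patch) of how the index constants and `stringify` are meant to line up; the sample tuple and column names are hypothetical. Path parts are taken relative to the mc-ind folder, starting at the Monte Carlo year; the last-but-one part is a matrix file name such as "values-hourly", which presumably carries both the file type 2 and the frequency — hence FILE_TYPE_2_INDEX and FREQUENCY_INDEX both being -2:

    # Hypothetical mc-ind path parts: (mc year, file type 1, area, matrix file, leaf)
    path_parts = ("00001", "areas", "fr", "values-hourly", "values-hourly")
    assert path_parts[MC_YEAR_INDEX] == "00001"
    assert path_parts[FILE_TYPE_1_INDEX] == FileType1.AREAS  # str enum compares equal to "areas"
    assert path_parts[AREA_NAME_INDEX] == "fr"
    assert path_parts[FILE_TYPE_2_INDEX].startswith(FileType2.VALUES)
    assert "hourly" in path_parts[FREQUENCY_INDEX]  # substring check, as in aggregate_data below

    # stringify flattens the tuple column headers coming out of matrix nodes:
    assert stringify(("OV. COST", "Euro", "EXP")) == "OV. COST|Euro|EXP"
    assert stringify("mcYear") == "mcYear"
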
""" -FILE_TYPE_2_INDEX = -1 +FILE_TYPE_2_INDEX = -2 """Index in path parts starting from the Monte Carlo year to determine the if we fetch values, details etc .""" AREA_NAME_INDEX = 2 """Index in path parts starting from the Monte Carlo year to determine the area name.""" -FREQUENCY_INDEX = -1 +FREQUENCY_INDEX = -2 """Index in path parts starting from the Monte Carlo year to determine matrix frequency.""" +class FileType1(str, Enum): + """ + Enum to determine the type of file we want to fetch in the study + """ + + LINKS = "links" + AREAS = "areas" + BINDING_CONSTRAINTS = "binding_constraints" + + +class FileType2(str, Enum): + """ + Enum to determine the type of file we want to fetch in the study + """ + + VALUES = "values" + DETAILS = "details" + # noinspection SpellCheckingInspection + DETAILS_ST_STORAGE = "details-STstorage" + DETAILS_RES = "details-res" + BINDING_CONSTRAINTS = "binding-constraints" + + +def stringify(col: t.Union[str, t.Tuple[str, ...]]) -> str: + """ + Convert a column name from Generic Hashable to a string without comers + Args: + col: column name to convert + + Returns: column name as string + + """ + return "|".join(col) if isinstance(col, tuple) else col + + def flatten_tree(path_tree: t.Dict[str, t.Any]) -> t.List[t.Tuple[str, ...]]: """ Flatten paths tree @@ -69,7 +111,9 @@ def flatten_tree(path_tree: t.Dict[str, t.Any]) -> t.List[t.Tuple[str, ...]]: return result -def parts_query_file_filtering(parts: t.List[t.Tuple[str, ...]], query_file: QueryFile) -> t.List[t.Tuple[str, ...]]: +def parts_query_file_filtering( + parts: t.List[t.Tuple[str, ...]], query_file: QueryFile +) -> t.Tuple[FileType1, FileType2, t.List[t.Tuple[str, ...]]]: """ Filter parts list Args: @@ -80,50 +124,81 @@ def parts_query_file_filtering(parts: t.List[t.Tuple[str, ...]], query_file: Que """ if query_file == QueryFile.LINKS_VALUES: - return [ - path_parts - for path_parts in parts - if path_parts[FILE_TYPE_1_INDEX] == "links" and path_parts[FILE_TYPE_2_INDEX].startswith("values") - ] + return ( + FileType1.LINKS, + FileType2.VALUES, + [ + path_parts + for path_parts in parts + if path_parts[FILE_TYPE_1_INDEX] == FileType1.LINKS + and path_parts[FILE_TYPE_2_INDEX].startswith(FileType2.VALUES) + ], + ) if query_file == QueryFile.LINKS_DETAILS: - return [ - path_parts - for path_parts in parts - if path_parts[FILE_TYPE_1_INDEX] == "links" and path_parts[FILE_TYPE_2_INDEX].startswith("details") - ] + return ( + FileType1.LINKS, + FileType2.DETAILS, + [ + path_parts + for path_parts in parts + if path_parts[FILE_TYPE_1_INDEX] == FileType1.LINKS + and path_parts[FILE_TYPE_2_INDEX].startswith(FileType2.DETAILS) + ], + ) if query_file == QueryFile.AREAS_VALUES: - return [ - path_parts - for path_parts in parts - if path_parts[FILE_TYPE_1_INDEX] == "areas" and path_parts[FILE_TYPE_2_INDEX].startswith("values") - ] + return ( + FileType1.AREAS, + FileType2.VALUES, + [ + path_parts + for path_parts in parts + if path_parts[FILE_TYPE_1_INDEX] == FileType1.AREAS + and path_parts[FILE_TYPE_2_INDEX].startswith(FileType2.VALUES) + ], + ) if query_file == QueryFile.AREAS_DETAILS: - # noinspection SpellCheckingInspection - return [ - path_parts - for path_parts in parts - if path_parts[FILE_TYPE_1_INDEX] == "areas" - and path_parts[FILE_TYPE_2_INDEX].startswith("details") - and not path_parts[FILE_TYPE_2_INDEX].startswith("details-STstorage") - and not path_parts[FILE_TYPE_2_INDEX].startswith("details-res") - ] + return ( + FileType1.AREAS, + FileType2.DETAILS, + [ + path_parts + for path_parts in parts + 
@@ -256,7 +331,7 @@ def aggregate_data(
         mc_years: t.Sequence[str],
         areas_names: t.Sequence[str],
         columns_names: t.Sequence[str],
-    ) -> t.Any:
+    ) -> t.Dict[str, t.Any]:
         """
         Entry point to fetch data inside study.
         Args:
@@ -274,20 +349,37 @@ def aggregate_data(
         self._check_study_exists(metadata)
         study = self.get_raw(metadata)
         parts = TEMPLATE_PARTS.format(sim_id=output_name.replace(",", "")).split(",")
-        all_paths = parts_query_file_filtering(flatten_tree(study.tree.get(parts)), query_file)
+        file_type_1, _, all_paths = parts_query_file_filtering(flatten_tree(study.tree.get(parts)), query_file)
         if mc_years:
             all_paths = [path_parts for path_parts in all_paths if path_parts[MC_YEAR_INDEX] in mc_years]
-        if areas_names:
-            if query_file not in [
-                QueryFile.AREAS_VALUES,
-                QueryFile.AREAS_DETAILS,
-                QueryFile.AREAS_DETAILS_ST_STORAGE,
-                QueryFile.AREAS_DETAILS_RES,
-            ]:
-                raise ValueError(f"You specified areas names for a query file that does not support it: {query_file}")
+        if areas_names and file_type_1 != FileType1.AREAS:
+            raise ValueError(f"You specified areas names for a query file that does not support it: {query_file}")
+        elif areas_names:
             all_paths = [path_parts for path_parts in all_paths if path_parts[AREA_NAME_INDEX] in areas_names]
+        all_paths = [path_parts for path_parts in all_paths if frequency.value in path_parts[FREQUENCY_INDEX]]
+
+        final_df = None
+        for path_parts in all_paths:
+            try:
+                node_data = study.tree.get(parts + list(path_parts[:-1]))
+            except ChildNotFoundError:
+                continue
+
+            columns, matrix = node_data["columns"], np.array(node_data["data"]).T.tolist()
+            columns = [stringify(col) for col in columns]
+            kept_columns = [col for col in columns if col in columns_names] if columns_names else columns
+            node_data = (
+                {file_type_1.value: [path_parts[FILE_TYPE_1_INDEX + 1]] * len(matrix[0])}
+                if file_type_1 != FileType1.BINDING_CONSTRAINTS
+                else {}
+            )
+            node_data.update({MCYEAR_COL: [path_parts[MC_YEAR_INDEX]] * len(matrix[0])})
+            node_data.update({col: vals for col, vals in zip(columns, matrix) if col in kept_columns})
+
+            df = pd.DataFrame(node_data)
+            final_df = df if final_df is None else pd.concat([final_df, df], ignore_index=True)  # type: ignore
 
-        return all_paths
+        return {} if final_df is None else final_df.fillna("N/A").to_dict(orient="list")  # type: ignore
 
     def get_study_sim_result(
         self,
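
To make the per-node reshaping in the loop above easier to review, here is the same transposition applied to a made-up matrix node (two columns, two timestamps). The real `node_data` comes from `study.tree.get(...)`, and the first column key is `file_type_1.value`:

    import numpy as np
    import pandas as pd

    node_data = {  # made-up payload of one matrix node
        "columns": [("OV. COST", "Euro", "EXP"), ("LOAD", "MWh", "EXP")],
        "data": [[100.0, 200.0], [500.0, 600.0]],  # one row per timestamp
    }
    columns = [stringify(c) for c in node_data["columns"]]  # ["OV. COST|Euro|EXP", "LOAD|MWh|EXP"]
    matrix = np.array(node_data["data"]).T.tolist()  # one list per column: [[100.0, 500.0], [200.0, 600.0]]
    row_block = {"areas": ["fr"] * len(matrix[0]), MCYEAR_COL: ["00001"] * len(matrix[0])}
    row_block.update(dict(zip(columns, matrix)))
    df = pd.DataFrame(row_block)
    # df has one row per timestamp:
    #    areas  mcYear  OV. COST|Euro|EXP  LOAD|MWh|EXP
    # 0     fr   00001              100.0         200.0
    # 1     fr   00001              500.0         600.0
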
diff --git a/antarest/study/storage/variantstudy/variant_study_service.py b/antarest/study/storage/variantstudy/variant_study_service.py
index f303ce7885..928a7e9031 100644
--- a/antarest/study/storage/variantstudy/variant_study_service.py
+++ b/antarest/study/storage/variantstudy/variant_study_service.py
@@ -503,7 +503,7 @@ def aggregate_data(
         mc_years: t.Sequence[str],
         areas_names: t.Sequence[str],
         columns_names: t.Sequence[str],
-    ) -> t.Any:
+    ) -> t.Dict[str, t.Any]:
         """
         Entry point to fetch data inside study.
         Args:
diff --git a/antarest/study/web/raw_studies_blueprint.py b/antarest/study/web/raw_studies_blueprint.py
index cc2228ec26..2d7ab7bf37 100644
--- a/antarest/study/web/raw_studies_blueprint.py
+++ b/antarest/study/web/raw_studies_blueprint.py
@@ -230,7 +230,7 @@ def aggregate_raw_data(
         areas_names: str = "",
         columns_names: str = "",
         current_user: JWTUser = Depends(auth.get_current_user),
-    ) -> t.Any:
+    ) -> t.Dict[str, t.Any]:
         """
         Create an aggregation of raw data
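
With these changes, the aggregation entry points return a dict of equal-length lists (pandas' `orient="list"`), with missing cells filled with "N/A". A sketch of what a client might do with the payload; the values below are illustrative, not actual study results:

    import pandas as pd

    payload = {  # illustrative response body
        "areas": ["fr", "fr", "de", "de"],
        "mcYear": ["00001", "00001", "00001", "00001"],
        "OV. COST|Euro|EXP": [100.0, 500.0, 80.0, "N/A"],
    }
    df = pd.DataFrame(payload)  # rebuild a DataFrame in one call
    assert list(df.columns) == ["areas", "mcYear", "OV. COST|Euro|EXP"]
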