From 875045f33d021561a478e54f0e2141ac736fe168 Mon Sep 17 00:00:00 2001
From: Mohamed Abdel Wedoud
Date: Thu, 22 Aug 2024 20:56:56 +0200
Subject: [PATCH] feat(aggregation-api): update following code review

---
 antarest/core/exceptions.py                  |  19 ++
 .../study/business/aggregator_management.py  | 199 ++++++++++--------
 2 files changed, 132 insertions(+), 86 deletions(-)

diff --git a/antarest/core/exceptions.py b/antarest/core/exceptions.py
index 8358cc385d..adb38cd21b 100644
--- a/antarest/core/exceptions.py
+++ b/antarest/core/exceptions.py
@@ -392,6 +392,20 @@ def __str__(self) -> str:
         return self.detail
 
 
+class OutputSubFolderNotFound(HTTPException):
+    """
+    Exception raised when an output sub-folder does not exist
+    """
+
+    def __init__(self, output_id: str, mc_root: str) -> None:
+        message = f"The output '{output_id}' sub-folder '{mc_root}' does not exist"
+        super().__init__(HTTPStatus.NOT_FOUND, message)
+
+    def __str__(self) -> str:
+        """Return a string representation of the exception."""
+        return self.detail
+
+
 class BadZipBinary(HTTPException):
     def __init__(self, message: str) -> None:
         super().__init__(HTTPStatus.UNSUPPORTED_MEDIA_TYPE, message)
@@ -446,6 +460,11 @@ def __init__(self, message: str) -> None:
         super().__init__(HTTPStatus.UNPROCESSABLE_ENTITY, message)
 
 
+class MCRootNotHandled(HTTPException):
+    def __init__(self, message: str) -> None:
+        super().__init__(HTTPStatus.UNPROCESSABLE_ENTITY, message)
+
+
 class MatrixWidthMismatchError(HTTPException):
     def __init__(self, message: str) -> None:
         super().__init__(HTTPStatus.UNPROCESSABLE_ENTITY, message)
diff --git a/antarest/study/business/aggregator_management.py b/antarest/study/business/aggregator_management.py
index ef836a034f..ac88b9d82b 100644
--- a/antarest/study/business/aggregator_management.py
+++ b/antarest/study/business/aggregator_management.py
@@ -6,7 +6,7 @@
 import numpy as np
 import pandas as pd
 
-from antarest.core.exceptions import FileTooLargeError, InvalidFieldForVersionError, OutputNotFound
+from antarest.core.exceptions import FileTooLargeError, MCRootNotHandled, OutputNotFound, OutputSubFolderNotFound
 from antarest.study.storage.rawstudy.ini_reader import IniReader
 from antarest.study.storage.rawstudy.model.filesystem.matrix.date_serializer import (
     FactoryDateSerializer,
@@ -91,18 +91,13 @@ def _columns_ordering(df_cols: t.List[str], column_name: str, is_details: bool,
         org_cols = [col for col in org_cols if col not in {column_name, MCYEAR_COL, TIME_COL}]
         new_column_order = [column_name] + ([CLUSTER_ID_COL] if is_details else []) + [TIME_ID_COL, TIME_COL] + org_cols
     else:
-        raise InvalidFieldForVersionError(f"Unknown Monte Carlo root: {mc_root}")
+        raise MCRootNotHandled(f"Unknown Monte Carlo root: {mc_root}")
 
     return new_column_order
 
 
 def _infer_production_column(cols: t.Sequence[str]) -> t.Optional[str]:
-    prod_col = None
-    for c in cols:
-        if PRODUCTION_COLUMN_REGEX in c.lower().strip():
-            prod_col = c
-            break
-    return prod_col
+    return next((c for c in cols if PRODUCTION_COLUMN_REGEX in c.lower().strip()), None)
 
 
 def _infer_time_id(df: pd.DataFrame, is_details: bool) -> t.List[int]:
@@ -112,6 +107,19 @@ def _infer_time_id(df: pd.DataFrame, is_details: bool) -> t.List[int]:
     return list(range(1, len(df) + 1))
 
 
+def _filtered_files_listing(
+    folders_to_check: t.List[Path],
+    query_file: str,
+    frequency: str,
+) -> t.Dict[str, t.MutableSequence[str]]:
+    filtered_files: t.Dict[str, t.MutableSequence[str]] = {}
+    for folder_path in folders_to_check:
+        for file in folder_path.iterdir():
+            if file.stem == f"{query_file}-{frequency}":
+                filtered_files.setdefault(folder_path.name, []).append(file.name)
+    return filtered_files
+
+
 class AggregatorManager:
     def __init__(
         self,
@@ -123,13 +131,13 @@ def __init__(
         columns_names: t.Sequence[str],
         mc_years: t.Optional[t.Sequence[int]] = None,
     ):
-        self.study_path: Path = study_path
-        self.output_id: str = output_id
+        self.study_path = study_path
+        self.output_id = output_id
         self.query_file = query_file
-        self.frequency: MatrixFrequency = frequency
-        self.mc_years: t.Optional[t.Sequence[int]] = mc_years
-        self.columns_names: t.Sequence[str] = columns_names
-        self.ids_to_consider: t.Sequence[str] = ids_to_consider
+        self.frequency = frequency
+        self.mc_years = mc_years
+        self.columns_names = columns_names
+        self.ids_to_consider = ids_to_consider
         self.output_type = (
             "areas"
             if (isinstance(query_file, MCIndAreasQueryFile) or isinstance(query_file, MCAllAreasQueryFile))
@@ -191,56 +199,49 @@ def _filter_ids(self, folder_path: Path) -> t.List[str]:
             return [link for link in links_ids if link in self.ids_to_consider]
         return links_ids
 
-    def _gather_all_files_to_consider__ind(self) -> t.Sequence[Path]:
-        # Monte Carlo years filtering
-        all_mc_years = [d.name for d in self.mc_ind_path.iterdir()]
-        if self.mc_years:
-            all_mc_years = [year for year in all_mc_years if int(year) in self.mc_years]
-        if not all_mc_years:
-            return []
-
-        # Links / Areas ids filtering
-
-        # The list of areas and links is the same whatever the MC year under consideration:
-        # Therefore we choose the first year by default avoiding useless scanning directory operations.
-        first_mc_year = all_mc_years[0]
-        areas_or_links_ids = self._filter_ids(self.mc_ind_path / first_mc_year / self.output_type)
-
-        # Frequency and query file filtering
-        folders_to_check = [self.mc_ind_path / first_mc_year / self.output_type / id for id in areas_or_links_ids]
-        filtered_files: t.Dict[str, t.MutableSequence[str]] = {}
-        for folder_path in folders_to_check:
-            for file in folder_path.iterdir():
-                if file.stem == f"{self.query_file.value}-{self.frequency}":
-                    filtered_files.setdefault(folder_path.name, []).append(file.name)
-
-        # Loop on MC years to return the whole list of files
-        all_output_files = [
-            self.mc_ind_path / mc_year / self.output_type / area_or_link / file
-            for mc_year in all_mc_years
-            for area_or_link, files in filtered_files.items()
-            for file in files
-        ]
-        return all_output_files
-
-    def _gather_all_files_to_consider__all(self) -> t.Sequence[Path]:
-        # Links / Areas ids filtering
-        areas_or_links_ids = self._filter_ids(self.mc_all_path / self.output_type)
-
-        # Frequency and query file filtering
-        folders_to_check = [self.mc_all_path / self.output_type / id for id in areas_or_links_ids]
-        filtered_files: t.Dict[str, t.MutableSequence[str]] = {}
-        for folder_path in folders_to_check:
-            for file in folder_path.iterdir():
-                if file.stem == f"{self.query_file.value}-{self.frequency}":
-                    filtered_files.setdefault(folder_path.name, []).append(file.name)
-
-        # Loop to return the whole list of files
-        all_output_files = [
-            self.mc_all_path / self.output_type / area_or_link / file
-            for area_or_link, files in filtered_files.items()
-            for file in files
-        ]
+    def _gather_all_files_to_consider(self) -> t.Sequence[Path]:
+        if self.mc_root == MCRoot.MC_IND:
+            # Monte Carlo years filtering
+            all_mc_years = [d.name for d in self.mc_ind_path.iterdir()]
+            if self.mc_years:
+                all_mc_years = [year for year in all_mc_years if int(year) in self.mc_years]
+            if not all_mc_years:
+                return []
+
+            # Links / Areas ids filtering
+
+            # The list of areas and links is the same whatever the MC year under consideration:
+            # Therefore we choose the first year by default avoiding useless scanning directory operations.
+            first_mc_year = all_mc_years[0]
+            areas_or_links_ids = self._filter_ids(self.mc_ind_path / first_mc_year / self.output_type)
+
+            # Frequency and query file filtering
+            folders_to_check = [self.mc_ind_path / first_mc_year / self.output_type / id for id in areas_or_links_ids]
+            filtered_files = _filtered_files_listing(folders_to_check, self.query_file, self.frequency)
+
+            # Loop on MC years to return the whole list of files
+            all_output_files = [
+                self.mc_ind_path / mc_year / self.output_type / area_or_link / file
+                for mc_year in all_mc_years
+                for area_or_link, files in filtered_files.items()
+                for file in files
+            ]
+        elif self.mc_root == MCRoot.MC_ALL:
+            # Links / Areas ids filtering
+            areas_or_links_ids = self._filter_ids(self.mc_all_path / self.output_type)
+
+            # Frequency and query file filtering
+            folders_to_check = [self.mc_all_path / self.output_type / id for id in areas_or_links_ids]
+            filtered_files = _filtered_files_listing(folders_to_check, self.query_file, self.frequency)
+
+            # Loop to return the whole list of files
+            all_output_files = [
+                self.mc_all_path / self.output_type / area_or_link / file
+                for area_or_link, files in filtered_files.items()
+                for file in files
+            ]
+        else:
+            raise MCRootNotHandled(f"Unknown Monte Carlo root: {self.mc_root}")
         return all_output_files
 
     def columns_filtering(self, df: pd.DataFrame, is_details: bool) -> pd.DataFrame:
@@ -261,16 +262,38 @@ def columns_filtering(self, df: pd.DataFrame, is_details: bool) -> pd.DataFrame:
         return df
 
     def _process_df(self, file_path: Path, is_details: bool) -> pd.DataFrame:
+        """
+        Process the output file to return a DataFrame with the correct columns and values
+            - In the case of a details file, the columns are made of two parts: the cluster name and the actual column name
+            - In other cases, the columns contain only the actual column name
+
+        Thus, the DataFrame is normalized to carry the real column names in both cases, and for details files a new
+        column is added to record the cluster id.
+
+        Args:
+            file_path: the path of the file to extract the DataFrame from
+            is_details: whether the file is a details file or not
+
+        Returns:
+            the DataFrame with the correct columns and values
+        """
+
         if is_details:
+            # extract the data frame from the file without processing the columns
             un_normalized_df = self._parse_output_file(file_path, normalize_column_name=False)
+            # number of rows in the data frame
             df_len = len(un_normalized_df)
             cluster_dummy_product_cols = sorted(
                 set([(x[CLUSTER_ID_COMPONENT], x[DUMMY_COMPONENT]) for x in un_normalized_df.columns])
            )
+            # actual columns without the cluster id (NODU, production etc.)
             actual_cols = sorted(set(un_normalized_df.columns.map(lambda x: x[ACTUAL_COLUMN_COMPONENT])))
-            new_obj: t.Dict[str, t.Any] = {k: [] for k in actual_cols}
-            new_obj[CLUSTER_ID_COL] = []
-            new_obj[TIME_ID_COL] = []
+
+            # using a dictionary to build the new data frame with the base columns (NODU, production etc.)
+            # and the cluster id and time id
+            new_obj: t.Dict[str, t.Any] = {k: [] for k in [CLUSTER_ID_COL, TIME_ID_COL] + actual_cols}
+
+            # loop over the cluster ids to extract the values of the actual columns
             for cluster_id, dummy_component in cluster_dummy_product_cols:
                 for actual_col in actual_cols:
                     col_values = un_normalized_df[(cluster_id, actual_col, dummy_component)].tolist()  # type: ignore
                     new_obj[actual_col] += col_values
                 new_obj[CLUSTER_ID_COL] += [cluster_id for _ in range(df_len)]
                 new_obj[TIME_ID_COL] += list(range(1, df_len + 1))
 
+            # check if there is a production column to rename it to `PRODUCTION_COLUMN_NAME`
             prod_col = _infer_production_column(actual_cols)
             if prod_col is not None:
                 new_obj[PRODUCTION_COLUMN_NAME] = new_obj.pop(prod_col)
                 actual_cols.remove(prod_col)
 
+            # reorganize the data frame
+            # first the production column if it exists
             add_prod = [PRODUCTION_COLUMN_NAME] if prod_col is not None else []
             columns_order = [CLUSTER_ID_COL, TIME_ID_COL] + add_prod + list(actual_cols)
             df = pd.DataFrame(new_obj).reindex(columns=columns_order).sort_values(by=[TIME_ID_COL, CLUSTER_ID_COL])
@@ -291,11 +317,12 @@ def _process_df(self, file_path: Path, is_details: bool) -> pd.DataFrame:
             return df
 
         else:
+            # just extract the data frame from the file by merging the column components
             return self._parse_output_file(file_path)
 
     def _build_dataframe(self, files: t.Sequence[Path], horizon: int) -> pd.DataFrame:
         if self.mc_root not in [MCRoot.MC_IND, MCRoot.MC_ALL]:
-            raise InvalidFieldForVersionError(f"Unknown Monte Carlo root: {self.mc_root}")
+            raise MCRootNotHandled(f"Unknown Monte Carlo root: {self.mc_root}")
         is_details = self.query_file in [
             MCIndAreasQueryFile.DETAILS,
             MCAllAreasQueryFile.DETAILS,
@@ -352,32 +379,32 @@ def _build_dataframe(self, files: t.Sequence[Path], horizon: int) -> pd.DataFram
         return final_df
 
+    def _check_mc_root_folder_exists(self) -> None:
+        if self.mc_root == MCRoot.MC_IND:
+            if not self.mc_ind_path.exists():
+                raise OutputSubFolderNotFound(self.output_id, f"economy/{MCRoot.MC_IND.value}")
+        elif self.mc_root == MCRoot.MC_ALL:
+            if not self.mc_all_path.exists():
+                raise OutputSubFolderNotFound(self.output_id, f"economy/{MCRoot.MC_ALL.value}")
+        else:
+            raise MCRootNotHandled(f"Unknown Monte Carlo root: {self.mc_root}")
+
     def aggregate_output_data(self) -> pd.DataFrame:
         """
         Aggregates the output data of a study and returns it as a DataFrame
         """
-        if self.mc_root == MCRoot.MC_IND:
-            # check that the `mc_years` is Sequence[int]
-            assert self.mc_years is not None, "mc_years should be a `Sequence` of integers"
+        output_folder = (self.mc_ind_path or self.mc_all_path).parent.parent
 
-            # Checks if mc-ind results exist
-            if not self.mc_ind_path.exists():
-                raise OutputNotFound(self.output_id)
+        # checks if the output folder exists
+        if not output_folder.exists():
+            raise OutputNotFound(self.output_id)
 
-            # filters files to consider
-            all_output_files = sorted(self._gather_all_files_to_consider__ind())
+        # checks if the mc root folder exists
+        self._check_mc_root_folder_exists()
 
-        elif self.mc_root == MCRoot.MC_ALL:
-            # Checks if mc-all results exist
-            if not self.mc_all_path.exists():
-                raise OutputNotFound(self.output_id)
-
-            # filters files to consider
-            all_output_files = sorted(self._gather_all_files_to_consider__all())
-
-        else:
-            raise InvalidFieldForVersionError(f"Unknown Monte Carlo root: {self.mc_root}")
+        # filters files to consider
+        all_output_files = sorted(self._gather_all_files_to_consider())
 
         # Retrieves the horizon from the study output
         horizon_path = self.study_path / HORIZON_TEMPLATE.format(sim_id=self.output_id)
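
Note for reviewers: a minimal, self-contained sketch of the folder-scanning behaviour that the new
_filtered_files_listing helper factors out of the two removed _gather_all_files_to_consider__* methods.
The area names and the "values"/"annual" file naming below are illustrative assumptions, not values
taken from the patch; the patch itself calls the helper with self.query_file and self.frequency.

    import tempfile
    import typing as t
    from pathlib import Path


    def _filtered_files_listing(
        folders_to_check: t.List[Path],
        query_file: str,
        frequency: str,
    ) -> t.Dict[str, t.MutableSequence[str]]:
        # Same body as the helper added by the patch: keep, per area/link folder,
        # the files whose stem is exactly "<query_file>-<frequency>".
        filtered_files: t.Dict[str, t.MutableSequence[str]] = {}
        for folder_path in folders_to_check:
            for file in folder_path.iterdir():
                if file.stem == f"{query_file}-{frequency}":
                    filtered_files.setdefault(folder_path.name, []).append(file.name)
        return filtered_files


    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        # Hypothetical layout: one folder per area; only "fr" holds a matching "values-annual" file.
        for area in ("fr", "de"):
            (root / area).mkdir()
        (root / "fr" / "values-annual.txt").touch()
        (root / "de" / "details-annual.txt").touch()
        print(_filtered_files_listing([root / "fr", root / "de"], "values", "annual"))
        # prints: {'fr': ['values-annual.txt']}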