Skip to content

Commit

Permalink
feat(aggregation-api): update following code review
Browse files Browse the repository at this point in the history
  • Loading branch information
mabw-rte committed Aug 22, 2024
1 parent 9d7fae1 commit 875045f
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 86 deletions.
19 changes: 19 additions & 0 deletions antarest/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,20 @@ def __str__(self) -> str:
return self.detail


class OutputSubFolderNotFound(HTTPException):
    """
    Exception raised when a sub-folder of an output does not exist.

    The error is reported with the HTTP status `NOT_FOUND`.
    """

    def __init__(self, output_id: str, mc_root: str) -> None:
        """
        Build the error message from the output and the missing sub-folder.

        Args:
            output_id: identifier of the output, quoted in the message
            mc_root: sub-folder that could not be found, quoted in the message
        """
        super().__init__(
            HTTPStatus.NOT_FOUND,
            f"The output '{output_id}' sub-folder '{mc_root}' does not exist",
        )

    def __str__(self) -> str:
        """Return the detail message of the exception."""
        return self.detail


class BadZipBinary(HTTPException):
    """
    HTTP exception reported with the status `UNSUPPORTED_MEDIA_TYPE`.

    NOTE(review): judging by its name, presumably raised for an invalid ZIP binary - confirm against callers.
    """

    def __init__(self, message: str) -> None:
        """
        Args:
            message: detail message forwarded to the base exception
        """
        status = HTTPStatus.UNSUPPORTED_MEDIA_TYPE
        super().__init__(status, message)
Expand Down Expand Up @@ -446,6 +460,11 @@ def __init__(self, message: str) -> None:
super().__init__(HTTPStatus.UNPROCESSABLE_ENTITY, message)


class MCRootNotHandled(HTTPException):
    """
    Exception raised when a Monte Carlo root is not one of the handled values.

    The error is reported with the HTTP status `UNPROCESSABLE_ENTITY`.
    """

    def __init__(self, message: str) -> None:
        """
        Args:
            message: detail message forwarded to the base exception
        """
        status = HTTPStatus.UNPROCESSABLE_ENTITY
        super().__init__(status, message)


class MatrixWidthMismatchError(HTTPException):
    """
    HTTP exception reported with the status `UNPROCESSABLE_ENTITY`.

    NOTE(review): judging by its name, presumably raised when matrix widths do not match - confirm against callers.
    """

    def __init__(self, message: str) -> None:
        """
        Args:
            message: detail message forwarded to the base exception
        """
        status = HTTPStatus.UNPROCESSABLE_ENTITY
        super().__init__(status, message)
Expand Down
199 changes: 113 additions & 86 deletions antarest/study/business/aggregator_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import numpy as np
import pandas as pd

from antarest.core.exceptions import FileTooLargeError, InvalidFieldForVersionError, OutputNotFound
from antarest.core.exceptions import FileTooLargeError, MCRootNotHandled, OutputNotFound, OutputSubFolderNotFound
from antarest.study.storage.rawstudy.ini_reader import IniReader
from antarest.study.storage.rawstudy.model.filesystem.matrix.date_serializer import (
FactoryDateSerializer,
Expand Down Expand Up @@ -91,18 +91,13 @@ def _columns_ordering(df_cols: t.List[str], column_name: str, is_details: bool,
org_cols = [col for col in org_cols if col not in {column_name, MCYEAR_COL, TIME_COL}]
new_column_order = [column_name] + ([CLUSTER_ID_COL] if is_details else []) + [TIME_ID_COL, TIME_COL] + org_cols
else:
raise InvalidFieldForVersionError(f"Unknown Monte Carlo root: {mc_root}")
raise MCRootNotHandled(f"Unknown Monte Carlo root: {mc_root}")

return new_column_order


def _infer_production_column(cols: t.Sequence[str]) -> t.Optional[str]:
    """
    Return the first column whose name contains `PRODUCTION_COLUMN_REGEX`.

    Each name is lower-cased and stripped before the test; the matching name is returned unchanged.

    Args:
        cols: candidate column names, scanned in order
    Returns:
        the first matching column name, or `None` when no column matches
    """
    # Despite its name, the constant is used for a plain substring test (`in`), not as a regular expression.
    return next((c for c in cols if PRODUCTION_COLUMN_REGEX in c.lower().strip()), None)


def _infer_time_id(df: pd.DataFrame, is_details: bool) -> t.List[int]:
Expand All @@ -112,6 +107,19 @@ def _infer_time_id(df: pd.DataFrame, is_details: bool) -> t.List[int]:
return list(range(1, len(df) + 1))


def _filtered_files_listing(
    folders_to_check: t.List[Path],
    query_file: str,
    frequency: str,
) -> t.Dict[str, t.MutableSequence[str]]:
    """
    List, folder by folder, the files whose stem is `<query_file>-<frequency>`.

    Args:
        folders_to_check: directories to scan (non-recursively); each one must exist since it is iterated
        query_file: first part of the expected file stem; an enum member is replaced by its `value`
        frequency: second part of the expected file stem; an enum member is replaced by its `value`
    Returns:
        a mapping from folder name to the names of its matching files; folders without any match are absent
    """
    # Callers of this module pass enum members: how an f-string renders such a member depends on the enum
    # mixins and on the Python version, so the underlying value is used explicitly to build the stem.
    query_part = getattr(query_file, "value", query_file)
    frequency_part = getattr(frequency, "value", frequency)
    # The expected stem does not depend on the scanned folder: compute it once.
    expected_stem = f"{query_part}-{frequency_part}"

    filtered_files: t.Dict[str, t.MutableSequence[str]] = {}
    for folder_path in folders_to_check:
        for file in folder_path.iterdir():
            if file.stem == expected_stem:
                filtered_files.setdefault(folder_path.name, []).append(file.name)
    return filtered_files


class AggregatorManager:
def __init__(
self,
Expand All @@ -123,13 +131,13 @@ def __init__(
columns_names: t.Sequence[str],
mc_years: t.Optional[t.Sequence[int]] = None,
):
self.study_path: Path = study_path
self.output_id: str = output_id
self.study_path = study_path
self.output_id = output_id
self.query_file = query_file
self.frequency: MatrixFrequency = frequency
self.mc_years: t.Optional[t.Sequence[int]] = mc_years
self.columns_names: t.Sequence[str] = columns_names
self.ids_to_consider: t.Sequence[str] = ids_to_consider
self.frequency = frequency
self.mc_years = mc_years
self.columns_names = columns_names
self.ids_to_consider = ids_to_consider
self.output_type = (
"areas"
if (isinstance(query_file, MCIndAreasQueryFile) or isinstance(query_file, MCAllAreasQueryFile))
Expand Down Expand Up @@ -191,56 +199,49 @@ def _filter_ids(self, folder_path: Path) -> t.List[str]:
return [link for link in links_ids if link in self.ids_to_consider]
return links_ids

def _gather_all_files_to_consider__ind(self) -> t.Sequence[Path]:
    """
    Collect the output files to aggregate below `self.mc_ind_path`.

    Returns:
        one path per kept Monte Carlo year, area/link id and matching file name;
        an empty list when no Monte Carlo year is kept
    """
    # Step 1 - Monte Carlo years: every entry of the folder, narrowed to `self.mc_years` when it is set.
    year_names = [entry.name for entry in self.mc_ind_path.iterdir()]
    if self.mc_years:
        year_names = list(filter(lambda name: int(name) in self.mc_years, year_names))
    if len(year_names) == 0:
        return []

    # Step 2 - areas / links ids: the list is the same whatever the MC year, so only the first
    # kept year is scanned, which avoids useless directory listings.
    reference_year = year_names[0]
    kept_ids = self._filter_ids(self.mc_ind_path / reference_year / self.output_type)

    # Step 3 - query file and frequency: keep the files whose stem is `<query file value>-<frequency>`.
    matches: t.Dict[str, t.MutableSequence[str]] = {}
    for item_id in kept_ids:
        folder = self.mc_ind_path / reference_year / self.output_type / item_id
        for candidate in folder.iterdir():
            if candidate.stem == f"{self.query_file.value}-{self.frequency}":
                matches.setdefault(folder.name, []).append(candidate.name)

    # Step 4 - replicate the matching files over every kept MC year.
    collected: t.List[Path] = []
    for year_name in year_names:
        for area_or_link, file_names in matches.items():
            for file_name in file_names:
                collected.append(self.mc_ind_path / year_name / self.output_type / area_or_link / file_name)
    return collected

def _gather_all_files_to_consider__all(self) -> t.Sequence[Path]:
# Links / Areas ids filtering
areas_or_links_ids = self._filter_ids(self.mc_all_path / self.output_type)

# Frequency and query file filtering
folders_to_check = [self.mc_all_path / self.output_type / id for id in areas_or_links_ids]
filtered_files: t.Dict[str, t.MutableSequence[str]] = {}
for folder_path in folders_to_check:
for file in folder_path.iterdir():
if file.stem == f"{self.query_file.value}-{self.frequency}":
filtered_files.setdefault(folder_path.name, []).append(file.name)

# Loop to return the whole list of files
all_output_files = [
self.mc_all_path / self.output_type / area_or_link / file
for area_or_link, files in filtered_files.items()
for file in files
]
def _gather_all_files_to_consider(self) -> t.Sequence[Path]:
    """
    Collect the output files to aggregate for the current Monte Carlo root.

    Returns:
        the paths of the files whose stem is `<query file value>-<frequency>`, restricted to the ids kept
        by `_filter_ids` (and, for `MCRoot.MC_IND`, to the kept Monte Carlo years); an empty list when
        no Monte Carlo year is kept
    Raises:
        MCRootNotHandled: when `self.mc_root` is neither `MCRoot.MC_IND` nor `MCRoot.MC_ALL`
    """
    if self.mc_root not in [MCRoot.MC_IND, MCRoot.MC_ALL]:
        raise MCRootNotHandled(f"Unknown Monte Carlo root: {self.mc_root}")

    # The file stem is built from the *value* of the query file enum: how an f-string renders an
    # enum member depends on the enum mixins and on the Python version, so `.value` is passed explicitly.
    query_file_value = self.query_file.value

    if self.mc_root == MCRoot.MC_ALL:
        # Links / Areas ids filtering
        ids_root = self.mc_all_path / self.output_type
        areas_or_links_ids = self._filter_ids(ids_root)

        # Frequency and query file filtering
        folders_to_check = [ids_root / item_id for item_id in areas_or_links_ids]
        filtered_files = _filtered_files_listing(folders_to_check, query_file_value, self.frequency)

        return [
            ids_root / area_or_link / file
            for area_or_link, files in filtered_files.items()
            for file in files
        ]

    # From here on, the Monte Carlo root is `MCRoot.MC_IND`.

    # Monte Carlo years filtering
    all_mc_years = [d.name for d in self.mc_ind_path.iterdir()]
    if self.mc_years:
        all_mc_years = [year for year in all_mc_years if int(year) in self.mc_years]
    if not all_mc_years:
        return []

    # Links / Areas ids filtering
    # The list of areas and links is the same whatever the MC year under consideration:
    # therefore the first year is used, which avoids useless directory scanning operations.
    first_mc_year = all_mc_years[0]
    ids_root = self.mc_ind_path / first_mc_year / self.output_type
    areas_or_links_ids = self._filter_ids(ids_root)

    # Frequency and query file filtering
    folders_to_check = [ids_root / item_id for item_id in areas_or_links_ids]
    filtered_files = _filtered_files_listing(folders_to_check, query_file_value, self.frequency)

    # Loop on MC years to return the whole list of files
    return [
        self.mc_ind_path / mc_year / self.output_type / area_or_link / file
        for mc_year in all_mc_years
        for area_or_link, files in filtered_files.items()
        for file in files
    ]

def columns_filtering(self, df: pd.DataFrame, is_details: bool) -> pd.DataFrame:
Expand All @@ -261,28 +262,53 @@ def columns_filtering(self, df: pd.DataFrame, is_details: bool) -> pd.DataFrame:
return df

def _process_df(self, file_path: Path, is_details: bool) -> pd.DataFrame:
"""
Process the output file to return a DataFrame with the correct columns and values.
- For a details file, each column name of the DataFrame has two parts: the cluster name and the actual column name.
- In other cases, each column name of the DataFrame only holds the actual column name.
Thus, the DataFrame is normalized to expose the actual column names in both cases, and a new column is added
for details files to record the cluster id.
Args:
file_path: the path of the file to extract the DataFrame from
is_details: whether the file is a details file or not
Returns:
the DataFrame with the correct columns and values
"""

if is_details:
# extract the data frame from the file without processing the columns
un_normalized_df = self._parse_output_file(file_path, normalize_column_name=False)
# number of rows in the data frame
df_len = len(un_normalized_df)
cluster_dummy_product_cols = sorted(
set([(x[CLUSTER_ID_COMPONENT], x[DUMMY_COMPONENT]) for x in un_normalized_df.columns])
)
# actual columns without the cluster id (NODU, production etc.)
actual_cols = sorted(set(un_normalized_df.columns.map(lambda x: x[ACTUAL_COLUMN_COMPONENT])))
new_obj: t.Dict[str, t.Any] = {k: [] for k in actual_cols}
new_obj[CLUSTER_ID_COL] = []
new_obj[TIME_ID_COL] = []

# using a dictionary to build the new data frame with the base columns (NODU, production etc.)
# and the cluster id and time id
new_obj: t.Dict[str, t.Any] = {k: [] for k in [CLUSTER_ID_COL, TIME_ID_COL] + actual_cols}

# loop over the cluster id to extract the values of the actual columns
for cluster_id, dummy_component in cluster_dummy_product_cols:
for actual_col in actual_cols:
col_values = un_normalized_df[(cluster_id, actual_col, dummy_component)].tolist() # type: ignore
new_obj[actual_col] += col_values
new_obj[CLUSTER_ID_COL] += [cluster_id for _ in range(df_len)]
new_obj[TIME_ID_COL] += list(range(1, df_len + 1))

# check if there is a production column to rename it to `PRODUCTION_COLUMN_NAME`
prod_col = _infer_production_column(actual_cols)
if prod_col is not None:
new_obj[PRODUCTION_COLUMN_NAME] = new_obj.pop(prod_col)
actual_cols.remove(prod_col)

# reorganize the data frame
# first the production column if it exists
add_prod = [PRODUCTION_COLUMN_NAME] if prod_col is not None else []
columns_order = [CLUSTER_ID_COL, TIME_ID_COL] + add_prod + list(actual_cols)
df = pd.DataFrame(new_obj).reindex(columns=columns_order).sort_values(by=[TIME_ID_COL, CLUSTER_ID_COL])
Expand All @@ -291,11 +317,12 @@ def _process_df(self, file_path: Path, is_details: bool) -> pd.DataFrame:
return df

else:
# simply extract the data frame from the file, merging the column components
return self._parse_output_file(file_path)

def _build_dataframe(self, files: t.Sequence[Path], horizon: int) -> pd.DataFrame:
if self.mc_root not in [MCRoot.MC_IND, MCRoot.MC_ALL]:
raise InvalidFieldForVersionError(f"Unknown Monte Carlo root: {self.mc_root}")
raise MCRootNotHandled(f"Unknown Monte Carlo root: {self.mc_root}")
is_details = self.query_file in [
MCIndAreasQueryFile.DETAILS,
MCAllAreasQueryFile.DETAILS,
Expand Down Expand Up @@ -352,32 +379,32 @@ def _build_dataframe(self, files: t.Sequence[Path], horizon: int) -> pd.DataFram

return final_df

def _check_mc_root_folder_exists(self) -> None:
    """
    Ensure that the folder of the current Monte Carlo root exists.

    Raises:
        OutputSubFolderNotFound: when the folder of the current Monte Carlo root does not exist
        MCRootNotHandled: when `self.mc_root` is neither `MCRoot.MC_IND` nor `MCRoot.MC_ALL`
    """
    # Select the folder to check together with the sub-folder label quoted in the error message.
    if self.mc_root == MCRoot.MC_IND:
        root_folder, sub_folder = self.mc_ind_path, f"economy/{MCRoot.MC_IND.value}"
    elif self.mc_root == MCRoot.MC_ALL:
        root_folder, sub_folder = self.mc_all_path, f"economy/{MCRoot.MC_ALL.value}"
    else:
        raise MCRootNotHandled(f"Unknown Monte Carlo root: {self.mc_root}")

    if root_folder.exists():
        return
    raise OutputSubFolderNotFound(self.output_id, sub_folder)

def aggregate_output_data(self) -> pd.DataFrame:
"""
Aggregates the output data of a study and returns it as a DataFrame
"""

if self.mc_root == MCRoot.MC_IND:
# check that the `mc_years` is Sequence[int]
assert self.mc_years is not None, "mc_years should be a `Sequence` of integers"
output_folder = (self.mc_ind_path or self.mc_all_path).parent.parent

# Checks if mc-ind results exist
if not self.mc_ind_path.exists():
raise OutputNotFound(self.output_id)
# checks if the output folder exists
if not output_folder.exists():
raise OutputNotFound(self.output_id)

# filters files to consider
all_output_files = sorted(self._gather_all_files_to_consider__ind())
# checks if the mc root folder exists
self._check_mc_root_folder_exists()

elif self.mc_root == MCRoot.MC_ALL:
# Checks if mc-all results exist
if not self.mc_all_path.exists():
raise OutputNotFound(self.output_id)

# filters files to consider
all_output_files = sorted(self._gather_all_files_to_consider__all())

else:
raise InvalidFieldForVersionError(f"Unknown Monte Carlo root: {self.mc_root}")
# filters files to consider
all_output_files = sorted(self._gather_all_files_to_consider())

# Retrieves the horizon from the study output
horizon_path = self.study_path / HORIZON_TEMPLATE.format(sim_id=self.output_id)
Expand Down

0 comments on commit 875045f

Please sign in to comment.