fix(synthesis): prevent 500 error during study synthesis parsing (#2011)
laurent-laporte-pro authored and skamril committed Apr 19, 2024
1 parent 0c47610 commit 1a211fd
Showing 5 changed files with 291 additions and 139 deletions.
77 changes: 77 additions & 0 deletions antarest/study/storage/rawstudy/model/filesystem/config/field_validators.py
@@ -0,0 +1,77 @@
import typing as t

_ALL_FILTERING = ["hourly", "daily", "weekly", "monthly", "annual"]


def extract_filtering(v: t.Any) -> t.Sequence[str]:
    """
    Extract filtering values from a comma-separated list of values.
    """

    if v is None:
        values = set()
    elif isinstance(v, str):
        values = {x.strip() for x in v.lower().split(",")} if v else set()
    elif isinstance(v, (list, tuple)):
        values = set(x.strip().lower() for x in v)
    else:
        raise TypeError(f"Invalid type for filtering: {type(v)!r}")

    try:
        return sorted(values, key=lambda x: _ALL_FILTERING.index(x))
    except ValueError as e:
        raise ValueError(f"Invalid value for filtering: {e!s}") from None


def validate_filtering(v: t.Any) -> str:
    """
    Validate the filtering field and convert it to a comma-separated string.
    """

    return ", ".join(extract_filtering(v))
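
# Illustrative usage (editorial example, not part of the commit): filtering values are
# lower-cased, de-duplicated and returned in canonical frequency order.
# >>> extract_filtering("daily, hourly")
# ['hourly', 'daily']
# >>> extract_filtering(None)
# []
# >>> validate_filtering(["annual", "weekly"])
# 'weekly, annual'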


# noinspection SpellCheckingInspection
def validate_colors(values: t.MutableMapping[str, t.Any]) -> t.Mapping[str, t.Any]:
    """
    Validate ``color_rgb``, ``color_r``, ``color_g``, ``color_b`` and convert them to ``color_rgb``.
    """

    def _pop_any(dictionary: t.MutableMapping[str, t.Any], *keys: str) -> t.Any:
        """Same as `pop` but for multiple keys. Return the first found value."""
        return next((dictionary.pop(key, None) for key in keys if key in dictionary), None)

    color_r = _pop_any(values, "color_r", "colorr")
    color_g = _pop_any(values, "color_g", "colorg")
    color_b = _pop_any(values, "color_b", "colorb")
    if color_r is not None and color_g is not None and color_b is not None:
        values["color_rgb"] = color_r, color_g, color_b
    return values
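
# Illustrative usage (editorial example, not part of the commit): the legacy keys
# "colorr"/"colorg"/"colorb" are accepted as well, and the three channels are folded
# into a single ``color_rgb`` tuple.
# >>> validate_colors({"color_r": 230, "color_g": 108, "color_b": 44})
# {'color_rgb': (230, 108, 44)}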


def validate_color_rgb(v: t.Any) -> str:
    """
    Validate RGB color field and convert it to color code.

    Accepts:
    - a string in the format "#RRGGBB"
    - a string in the format "rgb(R, G, B)"
    - a string in the format "R, G, B"
    - a list or tuple of 3 integers
    """

    if isinstance(v, str):
        if v.startswith("#"):
            r = int(v[1:3], 16)
            g = int(v[3:5], 16)
            b = int(v[5:7], 16)
        elif v.startswith("rgb("):
            r, g, b = [int(c) for c in v[4:-1].split(",")]
        else:
            r, g, b = [int(c) for c in v.split(",")]
    elif isinstance(v, (list, tuple)):
        r, g, b = map(int, v)
    else:
        raise TypeError(f"Invalid type for 'color_rgb': {type(v)}")

    return f"#{r:02X}{g:02X}{b:02X}"
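
# Illustrative usage (editorial example, not part of the commit): all accepted input
# forms normalise to the same "#RRGGBB" code.
# >>> validate_color_rgb("230, 108, 44")
# '#E66C2C'
# >>> validate_color_rgb("rgb(230, 108, 44)")
# '#E66C2C'
# >>> validate_color_rgb((230, 108, 44))
# '#E66C2C'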
118 changes: 75 additions & 43 deletions antarest/study/storage/rawstudy/model/filesystem/config/files.py
@@ -18,6 +18,7 @@
    SimulationParsingError,
    XpansionParsingError,
)
from antarest.study.storage.rawstudy.model.filesystem.config.field_validators import extract_filtering
from antarest.study.storage.rawstudy.model.filesystem.config.model import (
    Area,
    DistrictSet,
@@ -83,6 +84,48 @@ def build(study_path: Path, study_id: str, output_path: t.Optional[Path] = None)
)


def _extract_text_from_zip(root: Path, posix_path: str) -> t.Sequence[str]:
    """
    Extracts text from a file inside a ZIP archive and returns it as a list of lines.

    Args:
        root: The path to the ZIP archive.
        posix_path: The relative path to the file inside the ZIP archive.

    Returns:
        A list of lines in the file. If the file is not found, an empty list is returned.
    """
    with zipfile.ZipFile(root) as zf:
        try:
            with zf.open(posix_path) as f:
                text = f.read().decode("utf-8")
            return text.splitlines(keepends=False)
        except KeyError:
            return []
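
# Illustrative behaviour (editorial example; the archive name and member path are hypothetical):
# >>> _extract_text_from_zip(Path("study.zip"), "settings/comments.txt")
# ['This is a demo study.']
# >>> _extract_text_from_zip(Path("study.zip"), "does/not/exist.txt")
# []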


def _extract_ini_from_zip(root: Path, posix_path: str, multi_ini_keys: t.Sequence[str] = ()) -> t.Mapping[str, t.Any]:
    """
    Extracts data from an INI file inside a ZIP archive and returns it as a dictionary.

    Args:
        root: The path to the ZIP archive.
        posix_path: The relative path to the file inside the ZIP archive.
        multi_ini_keys: List of keys to use for multi INI files.

    Returns:
        A dictionary of keys/values in the INI file. If the file is not found, an empty dictionary is returned.
    """
    reader = IniReader(multi_ini_keys)
    with zipfile.ZipFile(root) as zf:
        try:
            with zf.open(posix_path) as f:
                buffer = io.StringIO(f.read().decode("utf-8"))
            return reader.read(buffer)
        except KeyError:
            return {}
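
# Illustrative behaviour (editorial example; the archive layout and INI contents are hypothetical):
# >>> _extract_ini_from_zip(Path("study.zip"), "input/areas/fr/optimization.ini")
# {'filtering': {'filter-synthesis': 'hourly, annual', 'filter-year-by-year': 'hourly'}}
# >>> _extract_ini_from_zip(Path("study.zip"), "input/areas/missing/optimization.ini")
# {}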


def _extract_data_from_file(
    root: Path,
    inside_root_path: Path,
@@ -110,14 +153,7 @@ def _extract_data_from_file(
    if file_type == FileType.TXT:
        # Parse the file as a list of lines, return an empty list if missing.
        if is_zip_file:
-           with zipfile.ZipFile(root) as zf:
-               try:
-                   with zf.open(posix_path) as f:
-                       text = f.read().decode("utf-8")
-                   return text.splitlines(keepends=False)
-               except KeyError:
-                   # File not found in the ZIP archive
-                   return []
+           return _extract_text_from_zip(root, posix_path)
        else:
            output_data_path = root / inside_root_path
            try:
@@ -127,19 +163,12 @@

    elif file_type in {FileType.MULTI_INI, FileType.SIMPLE_INI}:
        # Parse the file as a dictionary of keys/values, return an empty dictionary if missing.
-       reader = IniReader(multi_ini_keys)
        if is_zip_file:
-           with zipfile.ZipFile(root) as zf:
-               try:
-                   with zf.open(posix_path) as f:
-                       buffer = io.StringIO(f.read().decode("utf-8"))
-                   return reader.read(buffer)
-               except KeyError:
-                   # File not found in the ZIP archive
-                   return {}
+           return _extract_ini_from_zip(root, posix_path, multi_ini_keys=multi_ini_keys)
        else:
            output_data_path = root / inside_root_path
            try:
+               reader = IniReader(multi_ini_keys)
                return reader.read(output_data_path)
            except FileNotFoundError:
                return {}
@@ -294,7 +323,7 @@ def _parse_xpansion_version(path: Path) -> str:
        raise XpansionParsingError(xpansion_json, f"key '{exc}' not found in JSON object") from exc


-_regex_eco_adq = re.compile("^([0-9]{8}-[0-9]{4})(eco|adq)-?(.*)")
+_regex_eco_adq = re.compile(r"^(\d{8}-\d{4})(eco|adq)-?(.*)")
match_eco_adq = _regex_eco_adq.match
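
# Illustrative match (editorial example; the timestamp and suffix are made up):
# >>> match_eco_adq("20240419-1230eco-my-simulation").groups()
# ('20240419-1230', 'eco', 'my-simulation')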


@@ -359,14 +388,36 @@ def get_playlist(config: JSON) -> t.Optional[t.Dict[int, float]]:


def parse_area(root: Path, area: str) -> "Area":
    """
    Parse an area configuration and extract its filtering configuration.

    Args:
        root: The root directory of the study.
        area: The name of the area to parse.

    Returns:
        The area configuration.
    """
    area_id = transform_name_to_id(area)

    # Parse the optimization INI file to extract the filtering configuration.
    # The file is optional, so we use a default value to avoid a parsing error.
    optimization = _extract_data_from_file(
        root=root,
        inside_root_path=Path(f"input/areas/{area_id}/optimization.ini"),
        file_type=FileType.SIMPLE_INI,
    )
    filtering = optimization.get("filtering", {})
    filter_synthesis = extract_filtering(filtering.get("filter-synthesis", ""))
    filter_year_by_year = extract_filtering(filtering.get("filter-year-by-year", ""))

    return Area(
        name=area,
-       links=_parse_links(root, area_id),
+       links=_parse_links_filtering(root, area_id),
        thermals=_parse_thermal(root, area_id),
        renewables=_parse_renewables(root, area_id),
-       filters_synthesis=_parse_filters_synthesis(root, area_id),
-       filters_year=_parse_filters_year(root, area_id),
+       filters_synthesis=filter_synthesis,
+       filters_year=filter_year_by_year,
        st_storages=_parse_st_storage(root, area_id),
    )
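
# Illustrative effect of the new defaults (editorial sketch; the study path is hypothetical):
# an area whose optimization.ini has no [filtering] section now yields empty filter lists
# instead of raising a KeyError that previously surfaced as an HTTP 500 during synthesis parsing.
# >>> parse_area(Path("studies/demo"), "FR").filters_synthesis
# []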

@@ -444,33 +495,14 @@ def _parse_st_storage(root: Path, area: str) -> t.List[STStorageConfigType]:
    return config_list


-def _parse_links(root: Path, area: str) -> t.Dict[str, Link]:
+def _parse_links_filtering(root: Path, area: str) -> t.Dict[str, Link]:
    properties_ini = _extract_data_from_file(
        root=root,
        inside_root_path=Path(f"input/links/{area}/properties.ini"),
        file_type=FileType.SIMPLE_INI,
    )
-   return {link: Link.from_json(properties_ini[link]) for link in list(properties_ini.keys())}
-
-
-def _parse_filters_synthesis(root: Path, area: str) -> t.List[str]:
-   optimization = _extract_data_from_file(
-       root=root,
-       inside_root_path=Path(f"input/areas/{area}/optimization.ini"),
-       file_type=FileType.SIMPLE_INI,
-   )
-   filters: str = optimization["filtering"]["filter-synthesis"]
-   return Link.split(filters)
-
-
-def _parse_filters_year(root: Path, area: str) -> t.List[str]:
-   optimization = _extract_data_from_file(
-       root=root,
-       inside_root_path=Path(f"input/areas/{area}/optimization.ini"),
-       file_type=FileType.SIMPLE_INI,
-   )
-   filters: str = optimization["filtering"]["filter-year-by-year"]
-   return Link.split(filters)
+   links_by_ids = {link_id: Link(**obj) for link_id, obj in properties_ini.items()}
+   return links_by_ids


def _check_build_on_solver_tests(test_dir: Path) -> None:
