From 1a211fde88463131ea158202029af852a7e0cbe1 Mon Sep 17 00:00:00 2001
From: Laurent LAPORTE <43534797+laurent-laporte-pro@users.noreply.github.com>
Date: Fri, 19 Apr 2024 16:12:56 +0200
Subject: [PATCH] fix(synthesis): prevent 500 error during study synthesis
parsing (#2011)
---
.../filesystem/config/field_validators.py | 77 +++++++++++
.../rawstudy/model/filesystem/config/files.py | 118 ++++++++++-------
.../rawstudy/model/filesystem/config/model.py | 114 +++++++++--------
docs/CHANGELOG.md | 1 +
.../filesystem/config/test_config_files.py | 120 ++++++++++++------
5 files changed, 291 insertions(+), 139 deletions(-)
create mode 100644 antarest/study/storage/rawstudy/model/filesystem/config/field_validators.py
diff --git a/antarest/study/storage/rawstudy/model/filesystem/config/field_validators.py b/antarest/study/storage/rawstudy/model/filesystem/config/field_validators.py
new file mode 100644
index 0000000000..74f93f5c46
--- /dev/null
+++ b/antarest/study/storage/rawstudy/model/filesystem/config/field_validators.py
@@ -0,0 +1,77 @@
+import typing as t
+
+_ALL_FILTERING = ["hourly", "daily", "weekly", "monthly", "annual"]
+
+
+def extract_filtering(v: t.Any) -> t.Sequence[str]:
+ """
+ Extract filtering values from a comma-separated list of values.
+ """
+
+ if v is None:
+ values = set()
+ elif isinstance(v, str):
+ values = {x.strip() for x in v.lower().split(",")} if v else set()
+ elif isinstance(v, (list, tuple)):
+ values = set(x.strip().lower() for x in v)
+ else:
+ raise TypeError(f"Invalid type for filtering: {type(v)!r}")
+
+ try:
+ return sorted(values, key=lambda x: _ALL_FILTERING.index(x))
+ except ValueError as e:
+ raise ValueError(f"Invalid value for filtering: {e!s}") from None
+
+
+def validate_filtering(v: t.Any) -> str:
+ """
+ Validate the filtering field and convert it to a comma separated string.
+ """
+
+ return ", ".join(extract_filtering(v))
+
+
+# noinspection SpellCheckingInspection
+def validate_colors(values: t.MutableMapping[str, t.Any]) -> t.Mapping[str, t.Any]:
+ """
+ Validate ``color_rgb``, ``color_r``, ``color_g``, ``color_b`` and convert them to ``color_rgb``.
+ """
+
+ def _pop_any(dictionary: t.MutableMapping[str, t.Any], *keys: str) -> t.Any:
+ """Save as `pop` but for multiple keys. Return the first found value."""
+ return next((dictionary.pop(key, None) for key in keys if key in dictionary), None)
+
+ color_r = _pop_any(values, "color_r", "colorr")
+ color_g = _pop_any(values, "color_g", "colorg")
+ color_b = _pop_any(values, "color_b", "colorb")
+ if color_r is not None and color_g is not None and color_b is not None:
+ values["color_rgb"] = color_r, color_g, color_b
+ return values
+
+
+def validate_color_rgb(v: t.Any) -> str:
+ """
+ Validate RGB color field and convert it to color code.
+
+ Accepts:
+ - a string in the format "#RRGGBB"
+ - a string in the format "rgb(R, G, B)"
+ - a string in the format "R, G, B"
+ - a list or tuple of 3 integers
+ """
+
+ if isinstance(v, str):
+ if v.startswith("#"):
+ r = int(v[1:3], 16)
+ g = int(v[3:5], 16)
+ b = int(v[5:7], 16)
+ elif v.startswith("rgb("):
+ r, g, b = [int(c) for c in v[4:-1].split(",")]
+ else:
+ r, g, b = [int(c) for c in v.split(",")]
+ elif isinstance(v, (list, tuple)):
+ r, g, b = map(int, v)
+ else:
+ raise TypeError(f"Invalid type for 'color_rgb': {type(v)}")
+
+ return f"#{r:02X}{g:02X}{b:02X}"
diff --git a/antarest/study/storage/rawstudy/model/filesystem/config/files.py b/antarest/study/storage/rawstudy/model/filesystem/config/files.py
index 3248b6560a..cafc901644 100644
--- a/antarest/study/storage/rawstudy/model/filesystem/config/files.py
+++ b/antarest/study/storage/rawstudy/model/filesystem/config/files.py
@@ -18,6 +18,7 @@
SimulationParsingError,
XpansionParsingError,
)
+from antarest.study.storage.rawstudy.model.filesystem.config.field_validators import extract_filtering
from antarest.study.storage.rawstudy.model.filesystem.config.model import (
Area,
DistrictSet,
@@ -83,6 +84,48 @@ def build(study_path: Path, study_id: str, output_path: t.Optional[Path] = None)
)
+def _extract_text_from_zip(root: Path, posix_path: str) -> t.Sequence[str]:
+ """
+ Extracts text from a file inside a ZIP archive and returns it as a list of lines.
+
+ Args:
+ root: The path to the ZIP archive.
+ posix_path: The relative path to the file inside the ZIP archive.
+
+ Returns:
+ A list of lines in the file. If the file is not found, an empty list is returned.
+ """
+ with zipfile.ZipFile(root) as zf:
+ try:
+ with zf.open(posix_path) as f:
+ text = f.read().decode("utf-8")
+ return text.splitlines(keepends=False)
+ except KeyError:
+ return []
+
+
+def _extract_ini_from_zip(root: Path, posix_path: str, multi_ini_keys: t.Sequence[str] = ()) -> t.Mapping[str, t.Any]:
+ """
+ Extracts data from an INI file inside a ZIP archive and returns it as a dictionary.
+
+ Args:
+ root: The path to the ZIP archive.
+ posix_path: The relative path to the file inside the ZIP archive.
+ multi_ini_keys: List of keys to use for multi INI files.
+
+ Returns:
+ A dictionary of keys/values in the INI file. If the file is not found, an empty dictionary is returned.
+ """
+ reader = IniReader(multi_ini_keys)
+ with zipfile.ZipFile(root) as zf:
+ try:
+ with zf.open(posix_path) as f:
+ buffer = io.StringIO(f.read().decode("utf-8"))
+ return reader.read(buffer)
+ except KeyError:
+ return {}
+
+
def _extract_data_from_file(
root: Path,
inside_root_path: Path,
@@ -110,14 +153,7 @@ def _extract_data_from_file(
if file_type == FileType.TXT:
# Parse the file as a list of lines, return an empty list if missing.
if is_zip_file:
- with zipfile.ZipFile(root) as zf:
- try:
- with zf.open(posix_path) as f:
- text = f.read().decode("utf-8")
- return text.splitlines(keepends=False)
- except KeyError:
- # File not found in the ZIP archive
- return []
+ return _extract_text_from_zip(root, posix_path)
else:
output_data_path = root / inside_root_path
try:
@@ -127,19 +163,12 @@ def _extract_data_from_file(
elif file_type in {FileType.MULTI_INI, FileType.SIMPLE_INI}:
# Parse the file as a dictionary of keys/values, return an empty dictionary if missing.
- reader = IniReader(multi_ini_keys)
if is_zip_file:
- with zipfile.ZipFile(root) as zf:
- try:
- with zf.open(posix_path) as f:
- buffer = io.StringIO(f.read().decode("utf-8"))
- return reader.read(buffer)
- except KeyError:
- # File not found in the ZIP archive
- return {}
+ return _extract_ini_from_zip(root, posix_path, multi_ini_keys=multi_ini_keys)
else:
output_data_path = root / inside_root_path
try:
+ reader = IniReader(multi_ini_keys)
return reader.read(output_data_path)
except FileNotFoundError:
return {}
@@ -294,7 +323,7 @@ def _parse_xpansion_version(path: Path) -> str:
raise XpansionParsingError(xpansion_json, f"key '{exc}' not found in JSON object") from exc
-_regex_eco_adq = re.compile("^([0-9]{8}-[0-9]{4})(eco|adq)-?(.*)")
+_regex_eco_adq = re.compile(r"^(\d{8}-\d{4})(eco|adq)-?(.*)")
match_eco_adq = _regex_eco_adq.match
@@ -359,14 +388,36 @@ def get_playlist(config: JSON) -> t.Optional[t.Dict[int, float]]:
def parse_area(root: Path, area: str) -> "Area":
+ """
+ Parse an area configuration and extract its filtering configuration.
+
+ Args:
+ root: The root directory of the study.
+ area: The name of the area to parse.
+
+ Returns:
+ The area configuration.
+ """
area_id = transform_name_to_id(area)
+
+ # Parse the optimization INI file to extract the filtering configuration.
+ # The file is optional, so we use a default value to avoid a parsing error.
+ optimization = _extract_data_from_file(
+ root=root,
+ inside_root_path=Path(f"input/areas/{area_id}/optimization.ini"),
+ file_type=FileType.SIMPLE_INI,
+ )
+ filtering = optimization.get("filtering", {})
+ filter_synthesis = extract_filtering(filtering.get("filter-synthesis", ""))
+ filter_year_by_year = extract_filtering(filtering.get("filter-year-by-year", ""))
+
return Area(
name=area,
- links=_parse_links(root, area_id),
+ links=_parse_links_filtering(root, area_id),
thermals=_parse_thermal(root, area_id),
renewables=_parse_renewables(root, area_id),
- filters_synthesis=_parse_filters_synthesis(root, area_id),
- filters_year=_parse_filters_year(root, area_id),
+ filters_synthesis=filter_synthesis,
+ filters_year=filter_year_by_year,
st_storages=_parse_st_storage(root, area_id),
)
@@ -444,33 +495,14 @@ def _parse_st_storage(root: Path, area: str) -> t.List[STStorageConfigType]:
return config_list
-def _parse_links(root: Path, area: str) -> t.Dict[str, Link]:
+def _parse_links_filtering(root: Path, area: str) -> t.Dict[str, Link]:
properties_ini = _extract_data_from_file(
root=root,
inside_root_path=Path(f"input/links/{area}/properties.ini"),
file_type=FileType.SIMPLE_INI,
)
- return {link: Link.from_json(properties_ini[link]) for link in list(properties_ini.keys())}
-
-
-def _parse_filters_synthesis(root: Path, area: str) -> t.List[str]:
- optimization = _extract_data_from_file(
- root=root,
- inside_root_path=Path(f"input/areas/{area}/optimization.ini"),
- file_type=FileType.SIMPLE_INI,
- )
- filters: str = optimization["filtering"]["filter-synthesis"]
- return Link.split(filters)
-
-
-def _parse_filters_year(root: Path, area: str) -> t.List[str]:
- optimization = _extract_data_from_file(
- root=root,
- inside_root_path=Path(f"input/areas/{area}/optimization.ini"),
- file_type=FileType.SIMPLE_INI,
- )
- filters: str = optimization["filtering"]["filter-year-by-year"]
- return Link.split(filters)
+ links_by_ids = {link_id: Link(**obj) for link_id, obj in properties_ini.items()}
+ return links_by_ids
def _check_build_on_solver_tests(test_dir: Path) -> None:
diff --git a/antarest/study/storage/rawstudy/model/filesystem/config/model.py b/antarest/study/storage/rawstudy/model/filesystem/config/model.py
index 79400d8165..18e9702571 100644
--- a/antarest/study/storage/rawstudy/model/filesystem/config/model.py
+++ b/antarest/study/storage/rawstudy/model/filesystem/config/model.py
@@ -1,15 +1,15 @@
import re
+import typing as t
from enum import Enum
from pathlib import Path
-from typing import Dict, List, Optional
-from pydantic import Extra
+from pydantic import Field, root_validator
from pydantic.main import BaseModel
-from antarest.core.model import JSON
from antarest.core.utils.utils import DTO
from .binding_constraint import BindingConstraintDTO
+from .field_validators import extract_filtering
from .renewable import RenewableConfigType
from .st_storage import STStorageConfigType
from .thermal import ThermalConfigType
@@ -20,42 +20,44 @@ class ENR_MODELLING(Enum):
CLUSTERS = "clusters"
-class Link(BaseModel):
+class Link(BaseModel, extra="ignore"):
"""
Object linked to /input/links//properties.ini information
- """
- filters_synthesis: List[str]
- filters_year: List[str]
+ Attributes:
+ filters_synthesis: list of filters for synthesis data
+ filters_year: list of filters for year-by-year data
- @staticmethod
- def from_json(properties: JSON) -> "Link":
- return Link(
- filters_year=Link.split(properties["filter-year-by-year"]),
- filters_synthesis=Link.split(properties["filter-synthesis"]),
- )
+ Notes:
+ Ignore extra fields, because we only need `filter-synthesis` and `filter-year-by-year`.
+ """
- @staticmethod
- def split(line: str) -> List[str]:
- return [token.strip() for token in line.split(",") if token.strip() != ""]
+ filters_synthesis: t.List[str] = Field(default_factory=list)
+ filters_year: t.List[str] = Field(default_factory=list)
+
+ @root_validator(pre=True)
+ def validation(cls, values: t.MutableMapping[str, t.Any]) -> t.MutableMapping[str, t.Any]:
+ # note: field names are in kebab-case in the INI file
+ filters_synthesis = values.pop("filter-synthesis", values.pop("filters_synthesis", ""))
+ filters_year = values.pop("filter-year-by-year", values.pop("filters_year", ""))
+ values["filters_synthesis"] = extract_filtering(filters_synthesis)
+ values["filters_year"] = extract_filtering(filters_year)
+ return values
-class Area(BaseModel):
+class Area(BaseModel, extra="forbid"):
"""
Object linked to /input//optimization.ini information
"""
- class Config:
- extra = Extra.forbid
-
name: str
- links: Dict[str, Link]
- thermals: List[ThermalConfigType]
- renewables: List[RenewableConfigType]
- filters_synthesis: List[str]
- filters_year: List[str]
+ links: t.Dict[str, Link]
+ thermals: t.List[ThermalConfigType]
+ renewables: t.List[RenewableConfigType]
+ filters_synthesis: t.List[str]
+ filters_year: t.List[str]
# since v8.6
- st_storages: List[STStorageConfigType] = []
+ st_storages: t.List[STStorageConfigType] = []
class DistrictSet(BaseModel):
@@ -64,14 +66,14 @@ class DistrictSet(BaseModel):
"""
ALL = ["hourly", "daily", "weekly", "monthly", "annual"]
- name: Optional[str] = None
+ name: t.Optional[str] = None
inverted_set: bool = False
- areas: Optional[List[str]] = None
+ areas: t.Optional[t.List[str]] = None
output: bool = True
- filters_synthesis: List[str] = ALL
- filters_year: List[str] = ALL
+ filters_synthesis: t.List[str] = ALL
+ filters_year: t.List[str] = ALL
- def get_areas(self, all_areas: List[str]) -> List[str]:
+ def get_areas(self, all_areas: t.List[str]) -> t.List[str]:
if self.inverted_set:
return list(set(all_areas).difference(set(self.areas or [])))
return self.areas or []
@@ -89,7 +91,7 @@ class Simulation(BaseModel):
synthesis: bool
by_year: bool
error: bool
- playlist: Optional[List[int]]
+ playlist: t.Optional[t.List[int]]
archived: bool = False
xpansion: str
@@ -110,16 +112,16 @@ def __init__(
path: Path,
study_id: str,
version: int,
- output_path: Optional[Path] = None,
- areas: Optional[Dict[str, Area]] = None,
- sets: Optional[Dict[str, DistrictSet]] = None,
- outputs: Optional[Dict[str, Simulation]] = None,
- bindings: Optional[List[BindingConstraintDTO]] = None,
+ output_path: t.Optional[Path] = None,
+ areas: t.Optional[t.Dict[str, Area]] = None,
+ sets: t.Optional[t.Dict[str, DistrictSet]] = None,
+ outputs: t.Optional[t.Dict[str, Simulation]] = None,
+ bindings: t.Optional[t.List[BindingConstraintDTO]] = None,
store_new_set: bool = False,
- archive_input_series: Optional[List[str]] = None,
+ archive_input_series: t.Optional[t.List[str]] = None,
enr_modelling: str = ENR_MODELLING.AGGREGATED.value,
- cache: Optional[Dict[str, List[str]]] = None,
- zip_path: Optional[Path] = None,
+ cache: t.Optional[t.Dict[str, t.List[str]]] = None,
+ zip_path: t.Optional[Path] = None,
):
self.study_path = study_path
self.path = path
@@ -138,7 +140,7 @@ def __init__(
def next_file(self, name: str, is_output: bool = False) -> "FileStudyTreeConfig":
if is_output and name in self.outputs and self.outputs[name].archived:
- zip_path: Optional[Path] = self.path / f"{name}.zip"
+ zip_path: t.Optional[Path] = self.path / f"{name}.zip"
else:
zip_path = self.zip_path
@@ -176,43 +178,43 @@ def at_file(self, filepath: Path) -> "FileStudyTreeConfig":
cache=self.cache,
)
- def area_names(self) -> List[str]:
+ def area_names(self) -> t.List[str]:
return self.cache.get("%areas", list(self.areas.keys()))
- def set_names(self, only_output: bool = True) -> List[str]:
+ def set_names(self, only_output: bool = True) -> t.List[str]:
return self.cache.get(
f"%districts%{only_output}",
[k for k, v in self.sets.items() if v.output or not only_output],
)
- def get_thermal_ids(self, area: str) -> List[str]:
+ def get_thermal_ids(self, area: str) -> t.List[str]:
"""
Returns a list of thermal cluster IDs for a given area.
Note that IDs may not be in lower case (but series IDs are).
"""
return self.cache.get(f"%thermal%{area}%{area}", [th.id for th in self.areas[area].thermals])
- def get_renewable_ids(self, area: str) -> List[str]:
+ def get_renewable_ids(self, area: str) -> t.List[str]:
"""
Returns a list of renewable cluster IDs for a given area.
Note that IDs may not be in lower case (but series IDs are).
"""
return self.cache.get(f"%renewable%{area}", [r.id for r in self.areas[area].renewables])
- def get_st_storage_ids(self, area: str) -> List[str]:
+ def get_st_storage_ids(self, area: str) -> t.List[str]:
return self.cache.get(f"%st-storage%{area}", [s.id for s in self.areas[area].st_storages])
- def get_links(self, area: str) -> List[str]:
+ def get_links(self, area: str) -> t.List[str]:
return self.cache.get(f"%links%{area}", list(self.areas[area].links.keys()))
- def get_filters_synthesis(self, area: str, link: Optional[str] = None) -> List[str]:
+ def get_filters_synthesis(self, area: str, link: t.Optional[str] = None) -> t.List[str]:
if link:
return self.areas[area].links[link].filters_synthesis
if area in self.sets and self.sets[area].output:
return self.sets[area].filters_synthesis
return self.areas[area].filters_synthesis
- def get_filters_year(self, area: str, link: Optional[str] = None) -> List[str]:
+ def get_filters_year(self, area: str, link: t.Optional[str] = None) -> t.List[str]:
if link:
return self.areas[area].links[link].filters_year
if area in self.sets and self.sets[area].output:
@@ -245,15 +247,15 @@ class FileStudyTreeConfigDTO(BaseModel):
path: Path
study_id: str
version: int
- output_path: Optional[Path] = None
- areas: Dict[str, Area] = dict()
- sets: Dict[str, DistrictSet] = dict()
- outputs: Dict[str, Simulation] = dict()
- bindings: List[BindingConstraintDTO] = list()
+ output_path: t.Optional[Path] = None
+ areas: t.Dict[str, Area] = dict()
+ sets: t.Dict[str, DistrictSet] = dict()
+ outputs: t.Dict[str, Simulation] = dict()
+ bindings: t.List[BindingConstraintDTO] = list()
store_new_set: bool = False
- archive_input_series: List[str] = list()
+ archive_input_series: t.List[str] = list()
enr_modelling: str = ENR_MODELLING.AGGREGATED.value
- zip_path: Optional[Path] = None
+ zip_path: t.Optional[Path] = None
@staticmethod
def from_build_config(
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index aa70beb064..1f38c7dd7e 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -27,6 +27,7 @@ v2.16.8 (2024-04-19)
* **launcher:** upgrade the project dependencies to use Antares-Launcher v1.3.2
- **ssh:** add retry loop around SSH Exceptions [`#68`](https://github.com/AntaresSimulatorTeam/antares-launcher/pull/68)
- **retriever:** avoid infinite loop when `sbatch` command fails [`#69`](https://github.com/AntaresSimulatorTeam/antares-launcher/pull/69)
+* **synthesis:** prevent 500 error during study synthesis parsing [`#2011`](https://github.com/AntaresSimulatorTeam/AntaREST/pull/2011)
v2.16.7 (2024-03-05)
diff --git a/tests/storage/repository/filesystem/config/test_config_files.py b/tests/storage/repository/filesystem/config/test_config_files.py
index a8d8d2fecc..ce29f1a446 100644
--- a/tests/storage/repository/filesystem/config/test_config_files.py
+++ b/tests/storage/repository/filesystem/config/test_config_files.py
@@ -1,4 +1,5 @@
import logging
+import textwrap
from pathlib import Path
from typing import Any, Dict
from zipfile import ZipFile
@@ -10,7 +11,7 @@
BindingConstraintFrequency,
)
from antarest.study.storage.rawstudy.model.filesystem.config.files import (
- _parse_links,
+ _parse_links_filtering,
_parse_renewables,
_parse_sets,
_parse_st_storage,
@@ -31,8 +32,12 @@
from tests.storage.business.assets import ASSETS_DIR
-def build_empty_files(tmp: Path) -> Path:
- study_path = tmp / "my-study"
+@pytest.fixture(name="study_path")
+def study_path_fixture(tmp_path: Path) -> Path:
+ """
+ Create a study directory with the minimal structure required to build the configuration.
+ """
+ study_path = tmp_path / "my-study"
(study_path / "input/bindingconstraints/").mkdir(parents=True)
(study_path / "input/bindingconstraints/bindingconstraints.ini").touch()
@@ -49,31 +54,29 @@ def build_empty_files(tmp: Path) -> Path:
return study_path
-def test_parse_output_parameters(tmp_path: Path) -> None:
- study = build_empty_files(tmp_path)
+def test_parse_output_parameters(study_path: Path) -> None:
content = """
[output]
synthesis = true
storenewset = true
archives =
"""
- (study / "settings/generaldata.ini").write_text(content)
+ (study_path / "settings/generaldata.ini").write_text(content)
config = FileStudyTreeConfig(
- study_path=study,
- path=study,
+ study_path=study_path,
+ path=study_path,
version=-1,
store_new_set=True,
study_id="id",
- output_path=study / "output",
+ output_path=study_path / "output",
)
- assert build(study, "id") == config
+ assert build(study_path, "id") == config
-def test_parse_bindings(tmp_path: Path) -> None:
+def test_parse_bindings(study_path: Path) -> None:
# Setup files
- study_path = build_empty_files(tmp_path)
- content = """
+ content = """\
[bindA]
id = bindA
@@ -81,7 +84,7 @@ def test_parse_bindings(tmp_path: Path) -> None:
id = bindB
type = weekly
"""
- (study_path / "input/bindingconstraints/bindingconstraints.ini").write_text(content)
+ (study_path / "input/bindingconstraints/bindingconstraints.ini").write_text(textwrap.dedent(content))
config = FileStudyTreeConfig(
study_path=study_path,
@@ -108,14 +111,13 @@ def test_parse_bindings(tmp_path: Path) -> None:
assert build(study_path, "id") == config
-def test_parse_outputs(tmp_path: Path) -> None:
- study_path = build_empty_files(tmp_path)
+def test_parse_outputs(study_path: Path) -> None:
output_path = study_path / "output/20201220-1456eco-hello/"
output_path.mkdir(parents=True)
(output_path / "about-the-study").mkdir()
file = output_path / "about-the-study/parameters.ini"
- content = """
+ content = """\
[general]
nbyears = 1
year-by-year = true
@@ -127,7 +129,7 @@ def test_parse_outputs(tmp_path: Path) -> None:
[playlist]
playlist_year + = 0
"""
- file.write_text(content)
+ file.write_text(textwrap.dedent(content))
(output_path / "checkIntegrity.txt").touch()
@@ -226,21 +228,19 @@ def test_parse_outputs__nominal(tmp_path: Path, assets_name: str, expected: Dict
assert actual == expected
-def test_parse_sets(tmp_path: Path) -> None:
- study_path = build_empty_files(tmp_path)
- content = """
-[hello]
-output = true
-+ = a
-+ = b
-"""
- (study_path / "input/areas/sets.ini").write_text(content)
+def test_parse_sets(study_path: Path) -> None:
+ content = """\
+ [hello]
+ output = true
+ + = a
+ + = b
+ """
+ (study_path / "input/areas/sets.ini").write_text(textwrap.dedent(content))
assert _parse_sets(study_path) == {"hello": DistrictSet(areas=["a", "b"], output=True, inverted_set=False)}
-def test_parse_area(tmp_path: Path) -> None:
- study_path = build_empty_files(tmp_path)
+def test_parse_area(study_path: Path) -> None:
(study_path / "input/areas/list.txt").write_text("FR\n")
(study_path / "input/areas/fr").mkdir(parents=True)
content = """
@@ -270,6 +270,51 @@ def test_parse_area(tmp_path: Path) -> None:
assert build(study_path, "id") == config
+def test_parse_area__extra_area(study_path: Path) -> None:
+ """
+ Test the case where an extra area is present in the `list.txt` file.
+
+ The extra area should be taken into account with default values to avoid any parsing error.
+ """
+
+ (study_path / "input/areas/list.txt").write_text("FR\nDE\n")
+ (study_path / "input/areas/fr").mkdir(parents=True)
+ content = """
+ [filtering]
+ filter-synthesis = daily, monthly
+ filter-year-by-year = hourly, weekly, annual
+ """
+ (study_path / "input/areas/fr/optimization.ini").write_text(content)
+
+ config = FileStudyTreeConfig(
+ study_path=study_path,
+ path=study_path,
+ study_id="id",
+ version=-1,
+ output_path=study_path / "output",
+ areas={
+ "fr": Area(
+ name="FR",
+ thermals=[],
+ renewables=[],
+ links={},
+ filters_year=["hourly", "weekly", "annual"],
+ filters_synthesis=["daily", "monthly"],
+ ),
+ "de": Area(
+ name="DE",
+ links={},
+ thermals=[],
+ renewables=[],
+ filters_synthesis=[],
+ filters_year=[],
+ st_storages=[],
+ ),
+ },
+ )
+ assert build(study_path, "id") == config
+
+
# noinspection SpellCheckingInspection
THERMAL_LIST_INI = """\
[t1]
@@ -286,8 +331,7 @@ def test_parse_area(tmp_path: Path) -> None:
"""
-def test_parse_thermal(tmp_path: Path) -> None:
- study_path = build_empty_files(tmp_path)
+def test_parse_thermal(study_path: Path) -> None:
study_path.joinpath("study.antares").write_text("[antares] \n version = 700")
ini_path = study_path.joinpath("input/thermal/clusters/fr/list.ini")
@@ -325,8 +369,7 @@ def test_parse_thermal(tmp_path: Path) -> None:
@pytest.mark.parametrize("version", [850, 860, 870])
-def test_parse_thermal_860(tmp_path: Path, version, caplog) -> None:
- study_path = build_empty_files(tmp_path)
+def test_parse_thermal_860(study_path: Path, version, caplog) -> None:
study_path.joinpath("study.antares").write_text(f"[antares] \n version = {version}")
ini_path = study_path.joinpath("input/thermal/clusters/fr/list.ini")
ini_path.parent.mkdir(parents=True)
@@ -361,8 +404,7 @@ def test_parse_thermal_860(tmp_path: Path, version, caplog) -> None:
"""
-def test_parse_renewables(tmp_path: Path) -> None:
- study_path = build_empty_files(tmp_path)
+def test_parse_renewables(study_path: Path) -> None:
study_path.joinpath("study.antares").write_text("[antares] \n version = 810")
ini_path = study_path.joinpath("input/renewables/clusters/fr/list.ini")
@@ -411,8 +453,7 @@ def test_parse_renewables(tmp_path: Path) -> None:
"""
-def test_parse_st_storage(tmp_path: Path) -> None:
- study_path = build_empty_files(tmp_path)
+def test_parse_st_storage(study_path: Path) -> None:
study_path.joinpath("study.antares").write_text("[antares] \n version = 860")
config_dir = study_path.joinpath("input", "st-storage", "clusters", "fr")
config_dir.mkdir(parents=True)
@@ -452,8 +493,7 @@ def test_parse_st_storage_with_no_file(tmp_path: Path) -> None:
assert _parse_st_storage(tmp_path, "") == []
-def test_parse_links(tmp_path: Path) -> None:
- study_path = build_empty_files(tmp_path)
+def test_parse_links(study_path: Path) -> None:
(study_path / "input/links/fr").mkdir(parents=True)
content = """
[l1]
@@ -463,4 +503,4 @@ def test_parse_links(tmp_path: Path) -> None:
(study_path / "input/links/fr/properties.ini").write_text(content)
link = Link(filters_synthesis=["annual"], filters_year=["hourly"])
- assert _parse_links(study_path, "fr") == {"l1": link}
+ assert _parse_links_filtering(study_path, "fr") == {"l1": link}