Skip to content

Commit

Permalink
v2.14.5
Browse files Browse the repository at this point in the history
v2.14.5
  • Loading branch information
laurent-laporte-pro authored Aug 11, 2023
2 parents ef52bce + b090f44 commit 52e7fae
Show file tree
Hide file tree
Showing 78 changed files with 271,931 additions and 1,375 deletions.
4 changes: 2 additions & 2 deletions antarest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

# Standard project metadata

__version__ = "2.14.4"
__version__ = "2.14.5"
__author__ = "RTE, Antares Web Team"
__date__ = "2023-06-28"
__date__ = "2023-08-11"
# noinspection SpellCheckingInspection
__credits__ = "(c) Réseau de Transport de l’Électricité (RTE)"

Expand Down
9 changes: 3 additions & 6 deletions antarest/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, List, Dict, Any
from typing import Any, Dict, List, Optional

import yaml

from antarest.core.model import JSON
from antarest.core.roles import RoleType

Expand Down Expand Up @@ -162,7 +161,7 @@ def from_dict(data: JSON) -> "StorageConfig":

@dataclass(frozen=True)
class LocalConfig:
binaries: Dict[str, Path] = field(default_factory=lambda: {})
binaries: Dict[str, Path] = field(default_factory=dict)

@staticmethod
def from_dict(data: JSON) -> Optional["LocalConfig"]:
Expand All @@ -186,9 +185,7 @@ class SlurmConfig:
default_json_db_name: str = ""
slurm_script_path: str = ""
max_cores: int = 64
antares_versions_on_remote_server: List[str] = field(
default_factory=lambda: []
)
antares_versions_on_remote_server: List[str] = field(default_factory=list)

@staticmethod
def from_dict(data: JSON) -> "SlurmConfig":
Expand Down
2 changes: 1 addition & 1 deletion antarest/core/tasks/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def to_dto(self, with_logs: bool = False) -> TaskDTO:
if self.completion_date
else None,
logs=sorted(
[log.to_dto() for log in self.logs], key=lambda l: l.id # type: ignore
[log.to_dto() for log in self.logs], key=lambda l: l.id
)
if with_logs
else None,
Expand Down
10 changes: 2 additions & 8 deletions antarest/launcher/adapters/abstractlauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
)
from antarest.core.model import PermissionInfo, PublicMode
from antarest.core.requests import RequestParameters
from antarest.launcher.adapters.log_parser import LaunchProgressDTO, LogParser
from antarest.launcher.adapters.log_parser import LaunchProgressDTO
from antarest.launcher.model import JobStatus, LauncherParametersDTO, LogType


Expand Down Expand Up @@ -102,13 +102,7 @@ def update_log(log_line: str) -> None:
launch_progress_dto = LaunchProgressDTO.parse_obj(
launch_progress_json
)
progress_updated = False
for line in log_line.split("\n"):
progress_updated = (
LogParser.update_progress(line, launch_progress_dto)
or progress_updated
)
if progress_updated:
if launch_progress_dto.parse_log_lines(log_line.splitlines()):
self.event_bus.push(
Event(
type=EventType.LAUNCH_PROGRESS,
Expand Down
131 changes: 89 additions & 42 deletions antarest/launcher/adapters/log_parser.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,102 @@
import logging
import functools
import re
import typing as t

from pydantic import BaseModel

_SearchFunc = t.Callable[[str], t.Optional[t.Match[str]]]

logger = logging.getLogger(__name__)
_compile = functools.partial(re.compile, flags=re.IGNORECASE | re.VERBOSE)

# Search for the line indicating the loading of areas (first line of data loading).
_loading_areas = t.cast(
_SearchFunc,
_compile(r"Loading \s+ the \s+ list \s+ of \s+ areas").search,
)

# Search for the total number of Monté-Carlo (MC) years.
_total_mc_years = t.cast(
_SearchFunc,
_compile(
r"""
MC-Years \s* : \s*
\[ \d+ \s* \.{2,3} \s* \d+ ], \s* total \s* : \s*
(?P<total_mc_years> \d+)
"""
).search,
)

# Search for the line indicating the export of annual results of a Monté-Carlo year.
_annual_results = t.cast(
_SearchFunc,
_compile(r"Exporting \s+ the \s+ annual \s+ results").search,
)

# Search for the line indicating the export of survey results.
_survey_results = t.cast(
_SearchFunc,
_compile(r"Exporting \s+ the \s+ survey \s+ results").search,
)

# Search for the line indicating the solver is quitting gracefully or an error
_quitting = t.cast(
_SearchFunc,
_compile(
r"""
Quitting \s+ the \s+ solver \s+ gracefully |
\[error] |
\[fatal]
"""
).search,
)


class LaunchProgressDTO(BaseModel):
coef: float = 0.8
"""
Measure the progress of a study simulation.
The progress percentage is calculated based on the number of Monté-Carlo
years completed relative to the total number of years.
Attributes:
progress:
The percentage of completion for the simulation, ranging from 0 to 100.
total_mc_years:
The total number of Monté-Carlo years for the simulation.
"""

progress: float = 0
N_ANNUAL_RESULT: int = 1
N_K: int = 1


class LogParser:
@staticmethod
def update_progress(
line: str, launch_progress_dto: LaunchProgressDTO
) -> bool:
if "MC-Years : [" in line:
regex = re.compile(
r".+?(?:\s\.\.\s)(\d+).+?(\d+)"
) # group 1 is the first number after " .. ", and group 2 is the las number of the line
mo = regex.search(line)
launch_progress_dto.N_K = int(mo.group(1)) # type:ignore
launch_progress_dto.N_ANNUAL_RESULT = int(
mo.group(2) # type:ignore
)
total_mc_years: int = 1

def _update_progress(self, line: str) -> bool:
"""Updates the progress based on the given log line."""
if _loading_areas(line):
self.progress = 1.0
return True
if mo := _total_mc_years(line):
self.progress = 2.0
self.total_mc_years = int(mo["total_mc_years"])
return True
if _annual_results(line):
self.progress += 96 / self.total_mc_years
return True
elif "parallel batch size : " in line:
mk = re.search(r"parallel batch size : (\d+)", line)
if mk:
K = int(mk.group(1))
launch_progress_dto.progress += (
launch_progress_dto.coef * 90 * K / launch_progress_dto.N_K
)
return True
else:
logger.warning(
f"Failed to extract log progress batch size on line : {line}"
)
elif "Exporting the annual results" in line:
launch_progress_dto.progress += (
launch_progress_dto.coef
* 9
* 1
/ launch_progress_dto.N_ANNUAL_RESULT
)
if _survey_results(line):
self.progress = 99.0
return True
elif "Exporting the survey results" in line:
launch_progress_dto.progress = launch_progress_dto.coef * 99
if _quitting(line):
self.progress = 100.0
return True
return False

def parse_log_lines(self, lines: t.Iterable[str]) -> bool:
"""
Parses a sequence of log lines and updates the progress accordingly.
Args:
lines (Iterable[str]): An iterable containing log lines to be parsed.
Returns:
bool: `True` if progress was updated at least once during the parsing,
`False` otherwise.
"""
return bool(sum(self._update_progress(line) for line in lines))
50 changes: 31 additions & 19 deletions antarest/launcher/service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import functools
import logging
import os
import shutil
from datetime import datetime, timedelta
from functools import reduce
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from pathlib import Path
from typing import Dict, List, Optional, cast
Expand Down Expand Up @@ -186,7 +186,7 @@ def update(
job_result.output_id = output_id
final_status = status in [JobStatus.SUCCESS, JobStatus.FAILED]
if final_status:
job_result.completion_date = datetime.utcnow()
job_result.completion_date = datetime.now(timezone.utc)
self.job_result_repository.save(job_result)
self.event_bus.push(
Event(
Expand Down Expand Up @@ -434,7 +434,7 @@ def get_log(
or ""
)
if log_type == LogType.STDOUT:
app_logs: Dict[JobLogType, List[str]] = reduce(
app_logs: Dict[JobLogType, List[str]] = functools.reduce(
lambda logs, log: LauncherService.sort_log(log, logs),
job_result.logs or [],
{JobLogType.BEFORE: [], JobLogType.AFTER: []},
Expand Down Expand Up @@ -704,7 +704,7 @@ def get_load(self, from_cluster: bool = False) -> Dict[str, float]:
if self.config.launcher.slurm:
if from_cluster:
raise NotImplementedError
slurm_used_cpus = reduce(
slurm_used_cpus = functools.reduce(
lambda count, j: count
+ (
LauncherParametersDTO.parse_raw(
Expand All @@ -719,7 +719,7 @@ def get_load(self, from_cluster: bool = False) -> Dict[str, float]:
float(slurm_used_cpus) / self.config.launcher.slurm.max_cores
)
if self.config.launcher.local:
local_used_cpus = reduce(
local_used_cpus = functools.reduce(
lambda count, j: count
+ (
LauncherParametersDTO.parse_raw(
Expand All @@ -733,19 +733,31 @@ def get_load(self, from_cluster: bool = False) -> Dict[str, float]:
load["local"] = float(local_used_cpus) / (os.cpu_count() or 1)
return load

def get_versions(self, params: RequestParameters) -> Dict[str, List[str]]:
version_dict = {}
if self.config.launcher.local:
version_dict["local"] = list(
self.config.launcher.local.binaries.keys()
)

if self.config.launcher.slurm:
version_dict[
"slurm"
] = self.config.launcher.slurm.antares_versions_on_remote_server

return version_dict
def get_solver_versions(self, solver: str) -> List[str]:
"""
Fetch the list of solver versions from the configuration.
Args:
solver: name of the configuration to read: "default", "slurm" or "local".
Returns:
The list of solver versions.
This list is empty if the configuration is not available.
Raises:
KeyError: if the configuration is not "default", "slurm" or "local".
"""
local_config = self.config.launcher.local
slurm_config = self.config.launcher.slurm
default_config = self.config.launcher.default
versions_map = {
"local": sorted(local_config.binaries) if local_config else [],
"slurm": sorted(slurm_config.antares_versions_on_remote_server)
if slurm_config
else [],
}
versions_map["default"] = versions_map[default_config]
return versions_map[solver]

def get_launch_progress(
self, job_id: str, params: RequestParameters
Expand Down
62 changes: 51 additions & 11 deletions antarest/launcher/web.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import http
import logging
from typing import Any, Optional, List, Dict
from typing import Any, Dict, List, Optional
from uuid import UUID

from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, Query
from fastapi.exceptions import HTTPException

from antarest.core.config import Config
from antarest.core.filetransfer.model import FileDownloadTaskDTO
Expand All @@ -24,6 +26,19 @@
DEFAULT_MAX_LATEST_JOBS = 200


class UnknownSolverConfig(HTTPException):
"""
Exception raised during solver versions retrieval when
the name of the launcher is not "default", "slurm" or "local".
"""

def __init__(self, solver: str) -> None:
super().__init__(
http.HTTPStatus.UNPROCESSABLE_ENTITY,
f"Unknown solver configuration: '{solver}'",
)


def create_launcher_api(service: LauncherService, config: Config) -> APIRouter:
bp = APIRouter(prefix="/v1")

Expand Down Expand Up @@ -201,16 +216,41 @@ def get_load(
return service.get_load(from_cluster)

@bp.get(
"/launcher/_versions",
"/launcher/versions",
tags=[APITag.launcher],
summary="Get list of supported study version for all configured launchers",
response_model=Dict[str, List[str]],
summary="Get list of supported solver versions",
response_model=List[str],
)
def get_versions(
current_user: JWTUser = Depends(auth.get_current_user),
) -> Any:
params = RequestParameters(user=current_user)
logger.info(f"Fetching version list")
return service.get_versions(params=params)
def get_solver_versions(
solver: str = Query(
"default",
examples={
"Default solver": {
"description": "Get the solver versions of the default configuration",
"value": "default",
},
"SLURM solver": {
"description": "Get the solver versions of the SLURM server if available",
"value": "slurm",
},
"Local solver": {
"description": "Get the solver versions of the Local server if available",
"value": "local",
},
},
)
) -> List[str]:
"""
Get list of supported solver versions defined in the configuration.
Args:
- `solver`: name of the configuration to read: "default", "slurm" or "local".
"""
logger.info(
f"Fetching the list of solver versions for the '{solver}' configuration"
)
if solver not in {"default", "slurm", "local"}:
raise UnknownSolverConfig(solver)
return service.get_solver_versions(solver)

return bp
Loading

0 comments on commit 52e7fae

Please sign in to comment.