Skip to content

Commit

Permalink
v2.15.2
Browse files Browse the repository at this point in the history
Release on `master`.
  • Loading branch information
laurent-laporte-pro authored Oct 11, 2023
2 parents 45ee182 + b7ba8b4 commit d00699d
Show file tree
Hide file tree
Showing 71 changed files with 2,815 additions and 1,704 deletions.
4 changes: 2 additions & 2 deletions antarest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

# Standard project metadata

# Project metadata for the 2.15.2 release (diff residue removed: the pre-commit
# 2.15.1 / 2023-10-05 values were duplicated alongside the new ones).
__version__ = "2.15.2"
__author__ = "RTE, Antares Web Team"
__date__ = "2023-10-11"
# noinspection SpellCheckingInspection
__credits__ = "(c) Réseau de Transport de l’Électricité (RTE)"

Expand Down
1 change: 1 addition & 0 deletions antarest/core/cache/business/redis_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def __init__(self, redis_client: Redis): # type: ignore
self.redis = redis_client

def start(self) -> None:
    """No-op startup hook: the Redis service is expected to be running already."""

def put(self, id: str, data: JSON, duration: int = 3600) -> None:
Expand Down
469 changes: 307 additions & 162 deletions antarest/core/config.py

Large diffs are not rendered by default.

13 changes: 10 additions & 3 deletions antarest/core/filetransfer/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ class FileDownloadNotFound(HTTPException):
def __init__(self) -> None:
    """Build a 404 Not Found error for a download file that no longer exists.

    The detail message notes that the file has most likely expired.
    (Diff residue removed: the superseded `f"..."` literal without placeholders
    was duplicated alongside the corrected plain string.)
    """
    super().__init__(
        HTTPStatus.NOT_FOUND,
        "Requested download file was not found. It must have expired",
    )


class FileDownloadNotReady(HTTPException):
    """HTTP 406 error raised when a download is requested before the file is ready."""

    def __init__(self) -> None:
        # Diff residue removed: the superseded `f"..."` literal (no placeholders)
        # was duplicated alongside the corrected plain string.
        super().__init__(
            HTTPStatus.NOT_ACCEPTABLE,
            "Requested file is not ready for download.",
        )


Expand Down Expand Up @@ -70,4 +70,11 @@ def to_dto(self) -> FileDownloadDTO:
)

def __repr__(self) -> str:
    """Return a concise, single-line description of this file-download record.

    (Diff residue removed: the superseded one-line return statement was
    duplicated alongside the reformatted multi-line version; only the
    post-commit form is kept. Both render the same string.)
    """
    return (
        f"(id={self.id},"
        f" name={self.name},"
        f" filename={self.filename},"
        f" path={self.path},"
        f" ready={self.ready},"
        f" expiration_date={self.expiration_date})"
    )
8 changes: 5 additions & 3 deletions antarest/core/logging/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,14 @@ def configure_logger(config: Config, handler_cls: str = "logging.FileHandler") -
"filters": ["context"],
}
elif handler_cls == "logging.handlers.TimedRotatingFileHandler":
# 90 days = 3 months
# keep only 1 backup (0 means keep all)
logging_config["handlers"]["default"] = {
"class": handler_cls,
"filename": config.logging.logfile,
"when": "D", # D = day
"interval": 90, # 90 days = 3 months
"backupCount": 1, # keep only 1 backup (0 means keep all)
"when": "D",
"interval": 90,
"backupCount": 1,
"encoding": "utf-8",
"delay": False,
"utc": False,
Expand Down
16 changes: 10 additions & 6 deletions antarest/core/tasks/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def await_task(self, task_id: str, timeout_sec: Optional[int] = None) -> None:

# noinspection PyUnusedLocal
def noop_notifier(message: str) -> None:
    """Do-nothing task notifier, used when no progress notification is required.

    Args:
        message: The notification message (ignored).
    """
    # Diff residue removed: the superseded `pass` body was duplicated alongside
    # the docstring-only body; a docstring alone is a valid function body.


DEFAULT_AWAIT_MAX_TIMEOUT = 172800
Expand Down Expand Up @@ -121,7 +121,7 @@ async def _await_task_end(event: Event) -> None:

return _await_task_end

# todo: Is `logger_` parameter required? (consider refactoring)
# noinspection PyUnusedLocal
def _send_worker_task(logger_: TaskUpdateNotifier) -> TaskResult:
listener_id = self.event_bus.add_listener(
_create_awaiter(task_result_wrapper),
Expand Down Expand Up @@ -338,14 +338,18 @@ def _run_task(
result.message,
result.return_value,
)
event_type = {True: EventType.TASK_COMPLETED, False: EventType.TASK_FAILED}[result.success]
event_msg = {True: "completed", False: "failed"}[result.success]
self.event_bus.push(
Event(
type=EventType.TASK_COMPLETED if result.success else EventType.TASK_FAILED,
type=event_type,
payload=TaskEventPayload(
id=task_id,
message=custom_event_messages.end
if custom_event_messages is not None
else f'Task {task_id} {"completed" if result.success else "failed"}',
message=(
custom_event_messages.end
if custom_event_messages is not None
else f"Task {task_id} {event_msg}"
),
).dict(),
permissions=PermissionInfo(public_mode=PublicMode.READ),
channel=EventChannelDirectory.TASK + task_id,
Expand Down
9 changes: 5 additions & 4 deletions antarest/launcher/adapters/local_launcher/local_launcher.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import logging
import shutil
import signal
Expand All @@ -6,15 +7,15 @@
import threading
import time
from pathlib import Path
from typing import IO, Callable, Dict, Optional, Tuple, cast
from typing import Callable, Dict, Optional, Tuple, cast
from uuid import UUID

from antarest.core.config import Config
from antarest.core.interfaces.cache import ICache
from antarest.core.interfaces.eventbus import IEventBus
from antarest.core.requests import RequestParameters
from antarest.launcher.adapters.abstractlauncher import AbstractLauncher, LauncherCallbacks, LauncherInitException
from antarest.launcher.adapters.log_manager import LogTailManager
from antarest.launcher.adapters.log_manager import follow
from antarest.launcher.model import JobStatus, LauncherParametersDTO, LogType

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -133,8 +134,8 @@ def stop_reading_output() -> bool:
)

thread = threading.Thread(
target=lambda: LogTailManager.follow(
cast(IO[str], process.stdout),
target=lambda: follow(
cast(io.StringIO, process.stdout),
self.create_update_log(str(uuid)),
stop_reading_output,
None,
Expand Down
78 changes: 38 additions & 40 deletions antarest/launcher/adapters/log_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import contextlib
import io
import logging
import time
from pathlib import Path
from threading import Thread
from typing import IO, Callable, Dict, Optional
from typing import Callable, Dict, Optional, cast

logger = logging.getLogger(__name__)

Expand All @@ -11,7 +13,7 @@ class LogTailManager:
BATCH_SIZE = 10

def __init__(self, log_base_dir: Path) -> None:
logger.info(f"Initiating Log manager")
logger.info("Initiating Log manager")
self.log_base_dir = log_base_dir
self.tracked_logs: Dict[str, Thread] = {}

Expand Down Expand Up @@ -47,43 +49,6 @@ def stop_tracking(self, log_path: Optional[Path]) -> None:
if log_path_key in self.tracked_logs:
del self.tracked_logs[log_path_key]

@staticmethod
def follow(
io: IO[str],
handler: Callable[[str], None],
stop: Callable[[], bool],
log_file: Optional[str],
) -> None:
line = ""
line_count = 0

while True:
if stop():
break
tmp = io.readline()
if not tmp:
if line:
logger.debug(f"Calling handler for {log_file}")
try:
handler(line)
except Exception as e:
logger.error("Could not handle this log line", exc_info=e)
line = ""
line_count = 0
time.sleep(0.1)
else:
line += tmp
if line.endswith("\n"):
line_count += 1
if line_count >= LogTailManager.BATCH_SIZE:
logger.debug(f"Calling handler for {log_file}")
try:
handler(line)
except Exception as e:
logger.error("Could not handle this log line", exc_info=e)
line = ""
line_count = 0

def _follow(
self,
log_file: Optional[Path],
Expand All @@ -97,4 +62,37 @@ def _follow(

with open(log_file, "r") as fh:
logger.info(f"Scanning {log_file}")
LogTailManager.follow(fh, handler, stop, str(log_file))
follow(cast(io.StringIO, fh), handler, stop, str(log_file))


def follow(
    file: io.StringIO,
    handler: Callable[[str], None],
    stop: Callable[[], bool],
    log_file: Optional[str],
) -> None:
    """Tail *file*, forwarding buffered lines to *handler* until *stop* is true.

    Lines are accumulated and flushed to ``handler`` once ``LogTailManager.BATCH_SIZE``
    complete lines have been read, or whenever the stream runs dry while data is
    still buffered. Handler errors are deliberately swallowed so a faulty
    consumer cannot kill the tailing loop.

    Args:
        file: Text stream to read from.
        handler: Callback invoked with each batch of buffered text.
        stop: Polled each iteration; tailing ends when it returns True.
        log_file: Name used only in debug log messages (may be None).
    """
    buffered = ""
    complete_lines = 0

    def _flush() -> None:
        # Hand the buffer to the consumer, ignoring any error it raises.
        logger.debug(f"Calling handler for {log_file}")
        with contextlib.suppress(Exception):
            handler(buffered)

    while not stop():
        chunk = file.readline()
        if not chunk:
            # Stream exhausted for now: flush pending data, then back off briefly.
            if buffered:
                _flush()
                buffered = ""
                complete_lines = 0
            time.sleep(0.1)
            continue
        buffered += chunk
        if buffered.endswith("\n"):
            complete_lines += 1
            if complete_lines >= LogTailManager.BATCH_SIZE:
                _flush()
                buffered = ""
                complete_lines = 0
54 changes: 39 additions & 15 deletions antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
logger = logging.getLogger(__name__)
logging.getLogger("paramiko").setLevel("WARN")

MAX_NB_CPU = 24
MAX_TIME_LIMIT = 864000
MIN_TIME_LIMIT = 3600
WORKSPACE_LOCK_FILE_NAME = ".lock"
Expand Down Expand Up @@ -153,7 +152,7 @@ def _init_launcher_arguments(self, local_workspace: Optional[Path] = None) -> ar
main_options_parameters = ParserParameters(
default_wait_time=self.slurm_config.default_wait_time,
default_time_limit=self.slurm_config.default_time_limit,
default_n_cpu=self.slurm_config.default_n_cpu,
default_n_cpu=self.slurm_config.nb_cores.default,
studies_in_dir=str((Path(local_workspace or self.slurm_config.local_workspace) / STUDIES_INPUT_DIR_NAME)),
log_dir=str((Path(self.slurm_config.local_workspace) / LOG_DIR_NAME)),
finished_dir=str((Path(local_workspace or self.slurm_config.local_workspace) / STUDIES_OUTPUT_DIR_NAME)),
Expand Down Expand Up @@ -440,7 +439,7 @@ def _run_study(
_override_solver_version(study_path, version)

append_log(launch_uuid, "Submitting study to slurm launcher")
launcher_args = self._check_and_apply_launcher_params(launcher_params)
launcher_args = self._apply_params(launcher_params)
self._call_launcher(launcher_args, self.launcher_params)

launch_success = self._check_if_study_is_in_launcher_db(launch_uuid)
Expand Down Expand Up @@ -481,23 +480,40 @@ def _check_if_study_is_in_launcher_db(self, job_id: str) -> bool:
studies = self.data_repo_tinydb.get_list_of_studies()
return any(s.name == job_id for s in studies)

def _check_and_apply_launcher_params(self, launcher_params: LauncherParametersDTO) -> argparse.Namespace:
def _apply_params(self, launcher_params: LauncherParametersDTO) -> argparse.Namespace:
"""
Populate a `argparse.Namespace` object with the user parameters.
Args:
launcher_params:
Contains the launcher parameters selected by the user.
If a parameter is not provided (`None`), the default value should be retrieved
from the configuration.
Returns:
The `argparse.Namespace` object which is then passed to `antarestlauncher.main.run_with`,
to launch a simulation using Antares Launcher.
"""
if launcher_params:
launcher_args = deepcopy(self.launcher_args)
other_options = []

if launcher_params.other_options:
options = re.split("\\s+", launcher_params.other_options)
for opt in options:
other_options.append(re.sub("[^a-zA-Z0-9_,-]", "", opt))
if launcher_params.xpansion is not None:
launcher_args.xpansion_mode = "r" if launcher_params.xpansion_r_version else "cpp"
options = launcher_params.other_options.split()
other_options = [re.sub("[^a-zA-Z0-9_,-]", "", opt) for opt in options]
else:
other_options = []

# launcher_params.xpansion can be an `XpansionParametersDTO`, a bool or `None`
if launcher_params.xpansion: # not None and not False
launcher_args.xpansion_mode = {True: "r", False: "cpp"}[launcher_params.xpansion_r_version]
if (
isinstance(launcher_params.xpansion, XpansionParametersDTO)
and launcher_params.xpansion.sensitivity_mode
):
other_options.append("xpansion_sensitivity")

time_limit = launcher_params.time_limit
if time_limit and isinstance(time_limit, int):
if time_limit is not None:
if MIN_TIME_LIMIT > time_limit:
logger.warning(
f"Invalid slurm launcher time limit ({time_limit}),"
Expand All @@ -512,15 +528,23 @@ def _check_and_apply_launcher_params(self, launcher_params: LauncherParametersDT
launcher_args.time_limit = MAX_TIME_LIMIT - 3600
else:
launcher_args.time_limit = time_limit

post_processing = launcher_params.post_processing
if isinstance(post_processing, bool):
if post_processing is not None:
launcher_args.post_processing = post_processing

nb_cpu = launcher_params.nb_cpu
if nb_cpu and isinstance(nb_cpu, int):
if 0 < nb_cpu <= MAX_NB_CPU:
if nb_cpu is not None:
nb_cores = self.slurm_config.nb_cores
if nb_cores.min <= nb_cpu <= nb_cores.max:
launcher_args.n_cpu = nb_cpu
else:
logger.warning(f"Invalid slurm launcher nb_cpu ({nb_cpu}), should be between 1 and 24")
logger.warning(
f"Invalid slurm launcher nb_cpu ({nb_cpu}),"
f" should be between {nb_cores.min} and {nb_cores.max}"
)
launcher_args.n_cpu = nb_cores.default

if launcher_params.adequacy_patch is not None: # the adequacy patch can be an empty object
launcher_args.post_processing = True

Expand Down
6 changes: 3 additions & 3 deletions antarest/launcher/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ class XpansionParametersDTO(BaseModel):


class LauncherParametersDTO(BaseModel):
# Warning ! This class must be retrocompatible (that's the reason for the weird bool/XpansionParametersDTO union)
# Warning ! This class must be retro-compatible (that's the reason for the weird bool/XpansionParametersDTO union)
# The reason is that it's stored in json format in database and deserialized using the latest class version
# If compatibility is to be broken, an (alembic) data migration script should be added
adequacy_patch: Optional[Dict[str, Any]] = None
nb_cpu: Optional[int] = None
post_processing: bool = False
time_limit: Optional[int] = None
xpansion: Union[bool, Optional[XpansionParametersDTO]] = None
time_limit: Optional[int] = None # 3600 <= time_limit < 864000 (10 days)
xpansion: Union[XpansionParametersDTO, bool, None] = None
xpansion_r_version: bool = False
archive_output: bool = True
auto_unzip: bool = True
Expand Down
Loading

0 comments on commit d00699d

Please sign in to comment.