diff --git a/antarest/core/config.py b/antarest/core/config.py index 450b88fb12..02394efc55 100644 --- a/antarest/core/config.py +++ b/antarest/core/config.py @@ -264,6 +264,7 @@ class LocalConfig: nb_cores: NbCoresConfig = NbCoresConfig() time_limit: TimeLimitConfig = TimeLimitConfig() xpress_dir: Optional[str] = None + local_workspace: Path = Path("./local_workspace") @classmethod def from_dict(cls, data: JSON) -> "LocalConfig": @@ -280,11 +281,13 @@ def from_dict(cls, data: JSON) -> "LocalConfig": if enable_nb_cores_detection: nb_cores.update(cls._autodetect_nb_cores()) xpress_dir = data.get("xpress_dir", defaults.xpress_dir) + local_workspace = Path(data["local_workspace"]) if "local_workspace" in data else defaults.local_workspace return cls( binaries={str(v): Path(p) for v, p in binaries.items()}, enable_nb_cores_detection=enable_nb_cores_detection, nb_cores=NbCoresConfig(**nb_cores), xpress_dir=xpress_dir, + local_workspace=local_workspace, ) @classmethod diff --git a/antarest/launcher/adapters/abstractlauncher.py b/antarest/launcher/adapters/abstractlauncher.py index 8919bbc803..c25797eb66 100644 --- a/antarest/launcher/adapters/abstractlauncher.py +++ b/antarest/launcher/adapters/abstractlauncher.py @@ -69,17 +69,12 @@ def __init__( @abstractmethod def run_study( - self, - study_uuid: str, - job_id: str, - version: SolverVersion, - launcher_parameters: LauncherParametersDTO, - study_path: Path, + self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO ) -> None: raise NotImplementedError() @abstractmethod - def get_log(self, job_id: str, log_type: LogType, study_path: Path) -> Optional[str]: + def get_log(self, job_id: str, log_type: LogType) -> Optional[str]: raise NotImplementedError() @abstractmethod diff --git a/antarest/launcher/adapters/local_launcher/local_launcher.py b/antarest/launcher/adapters/local_launcher/local_launcher.py index ee0b860f19..7692403a0d 100644 --- a/antarest/launcher/adapters/local_launcher/local_launcher.py +++ b/antarest/launcher/adapters/local_launcher/local_launcher.py @@ -10,17 +10,15 @@ # # This file is part of the Antares project. -import io import logging import os import shutil import signal import subprocess -import tempfile import threading import time from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple, cast +from typing import Any, Callable, Dict, List, Optional, Tuple from antares.study.version import SolverVersion @@ -28,7 +26,7 @@ from antarest.core.interfaces.cache import ICache from antarest.core.interfaces.eventbus import IEventBus from antarest.launcher.adapters.abstractlauncher import AbstractLauncher, LauncherCallbacks, LauncherInitException -from antarest.launcher.adapters.log_manager import follow +from antarest.launcher.adapters.log_manager import LogTailManager from antarest.launcher.model import JobStatus, LauncherParametersDTO, LogType logger = logging.getLogger(__name__) @@ -49,7 +47,11 @@ def __init__( super().__init__(config, callbacks, event_bus, cache) if self.config.launcher.local is None: raise LauncherInitException("Missing parameter 'launcher.local'") - self.tmpdir = config.storage.tmp_dir + self.local_workspace = self.config.launcher.local.local_workspace + logs_path = self.local_workspace / "LOGS" + logs_path.mkdir(parents=True, exist_ok=True) + self.log_directory = logs_path + self.log_tail_manager = LogTailManager(self.local_workspace) self.job_id_to_study_id: Dict[str, Tuple[str, Path, subprocess.Popen]] = {} # type: ignore self.logs: Dict[str, str] = {} @@ -73,12 +75,7 @@ def _select_best_binary(self, version: str) -> Path: return antares_solver_path def run_study( - self, - study_uuid: str, - job_id: str, - version: SolverVersion, - launcher_parameters: LauncherParametersDTO, - study_path: Path, + self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO ) -> None: antares_solver_path = self._select_best_binary(f"{version:ddd}") @@ -88,7 +85,6 @@ def run_study( self, antares_solver_path, study_uuid, - study_path, job_id, launcher_parameters, ), @@ -100,64 +96,33 @@ def _compute( self, antares_solver_path: Path, study_uuid: str, - study_path: Path, job_id: str, launcher_parameters: LauncherParametersDTO, ) -> None: - end = False - - # create study logs - logs_path = study_path / "output" / "logs" - logs_path.mkdir(exist_ok=True, parents=True) - - def stop_reading_output() -> bool: - if end and job_id in self.logs: - with open(logs_path / f"{job_id}-out.log", "w") as log_file: - log_file.write(self.logs[job_id]) - del self.logs[job_id] - return end - - tmp_path = tempfile.mkdtemp(prefix="local_launch_", dir=str(self.tmpdir)) - export_path = Path(tmp_path) / "export" + export_path = self.local_workspace / job_id + logs_path = self.log_directory / job_id + logs_path.mkdir() try: self.callbacks.export_study(job_id, study_uuid, export_path, launcher_parameters) - simulator_args, environment_variables = self.parse_launcher_options(launcher_parameters) + simulator_args, environment_variables = self._parse_launcher_options(launcher_parameters) new_args = [str(antares_solver_path)] + simulator_args + [str(export_path)] - std_out_file = study_path / "output" / "logs" / f"{job_id}-err.log" - with open(std_out_file, "w") as err_file: + std_err_file = logs_path / f"{job_id}-err.log" + std_out_file = logs_path / f"{job_id}-out.log" + with open(std_err_file, "w") as err_file, open(std_out_file, "w") as out_file: process = subprocess.Popen( new_args, env=environment_variables, - stdout=subprocess.PIPE, + stdout=out_file, stderr=err_file, universal_newlines=True, encoding="utf-8", ) - self.job_id_to_study_id[job_id] = ( - study_uuid, - export_path, - process, - ) - self.callbacks.update_status( - job_id, - JobStatus.RUNNING, - None, - None, - ) + self.job_id_to_study_id[job_id] = (study_uuid, export_path, process) + self.callbacks.update_status(job_id, JobStatus.RUNNING, None, None) - thread = threading.Thread( - target=lambda: follow( - cast(io.StringIO, process.stdout), - self.create_update_log(job_id), - stop_reading_output, - None, - ), - name=f"{self.__class__.__name__}-LogsWatcher", - daemon=True, - ) - thread.start() + self.log_tail_manager.track(std_out_file, self.create_update_log(job_id)) while process.poll() is None: time.sleep(1) @@ -171,7 +136,8 @@ def stop_reading_output() -> bool: if process.returncode == 0: # The job succeed we need to import the output try: - output_id = self.callbacks.import_output(job_id, export_path / "output", {}) + launcher_logs = self._import_launcher_logs(job_id) + output_id = self.callbacks.import_output(job_id, export_path / "output", launcher_logs) except Exception as e: logger.error( f"Failed to import output for study {study_uuid} located at {export_path}", @@ -193,11 +159,17 @@ def stop_reading_output() -> bool: None, ) finally: - logger.info(f"Removing launch {job_id} export path at {tmp_path}") - end = True - shutil.rmtree(tmp_path) + logger.info(f"Removing launch {job_id} export path at {export_path}") + shutil.rmtree(export_path) + + def _import_launcher_logs(self, job_id: str) -> Dict[str, List[Path]]: + logs_path = self.log_directory / job_id + return { + "antares-out.log": [logs_path / f"{job_id}-out.log"], + "antares-err.log": [logs_path / f"{job_id}-err.log"], + } - def parse_launcher_options(self, launcher_parameters: LauncherParametersDTO) -> Tuple[List[str], Dict[str, Any]]: + def _parse_launcher_options(self, launcher_parameters: LauncherParametersDTO) -> Tuple[List[str], Dict[str, Any]]: simulator_args = [f"--force-parallel={launcher_parameters.nb_cpu}"] environment_variables = os.environ.copy() if launcher_parameters.other_options: @@ -225,10 +197,10 @@ def append_to_log(log_line: str) -> None: return append_to_log - def get_log(self, job_id: str, log_type: LogType, study_path: Path) -> Optional[str]: + def get_log(self, job_id: str, log_type: LogType) -> Optional[str]: if job_id in self.job_id_to_study_id and job_id in self.logs and log_type == LogType.STDOUT: return self.logs[job_id] - job_path = study_path / "output" / "logs" / f"{job_id}-{log_type.to_suffix()}" + job_path = self.log_directory / job_id / f"{job_id}-{log_type.to_suffix()}" if job_path.exists(): return job_path.read_text() return None diff --git a/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py b/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py index 0f884b5622..3c11b4670d 100644 --- a/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py +++ b/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py @@ -589,12 +589,7 @@ def _apply_params(self, launcher_params: LauncherParametersDTO) -> argparse.Name return self.launcher_args def run_study( - self, - study_uuid: str, - job_id: str, - version: SolverVersion, - launcher_parameters: LauncherParametersDTO, - study_path: Path, + self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO ) -> None: thread = threading.Thread( target=self._run_study, @@ -603,7 +598,7 @@ def run_study( ) thread.start() - def get_log(self, job_id: str, log_type: LogType, study_path: Path) -> t.Optional[str]: + def get_log(self, job_id: str, log_type: LogType) -> t.Optional[str]: log_path: t.Optional[Path] = None for study in self.data_repo_tinydb.get_list_of_studies(): if study.name == job_id: diff --git a/antarest/launcher/service.py b/antarest/launcher/service.py index 58613180c3..c919321a39 100644 --- a/antarest/launcher/service.py +++ b/antarest/launcher/service.py @@ -231,7 +231,6 @@ def run_study( logger.info(f"New study launch (study={study_uuid}, job_id={job_uuid})") study_info = self.study_service.get_study_information(uuid=study_uuid, params=params) solver_version = SolverVersion.parse(study_version or study_info.version) - study_path = self.study_service.get_study_path(uuid=study_uuid, params=params) self._assert_launcher_is_initialized(launcher) assert_permission( @@ -252,7 +251,7 @@ def run_study( ) self.job_result_repository.save(job_status) - self.launchers[launcher].run_study(study_uuid, job_uuid, solver_version, launcher_parameters, study_path) + self.launchers[launcher].run_study(study_uuid, job_uuid, solver_version, launcher_parameters) self.event_bus.push( Event( @@ -404,8 +403,7 @@ def get_log(self, job_id: str, log_type: LogType, params: RequestParameters) -> if job_result.launcher is None: raise ValueError(f"Job {job_id} has no launcher") self._assert_launcher_is_initialized(job_result.launcher) - study_path = self.study_service.get_study_path(job_result.study_id, params) - launcher_logs = str(self.launchers[job_result.launcher].get_log(job_id, log_type, study_path) or "") + launcher_logs = str(self.launchers[job_result.launcher].get_log(job_id, log_type) or "") if log_type == LogType.STDOUT: app_logs: Dict[JobLogType, List[str]] = functools.reduce( lambda logs, log: LauncherService.sort_log(log, logs), diff --git a/docs/developer-guide/install/1-CONFIG.md b/docs/developer-guide/install/1-CONFIG.md index 2a0b1e36cf..49c2b791ed 100644 --- a/docs/developer-guide/install/1-CONFIG.md +++ b/docs/developer-guide/install/1-CONFIG.md @@ -395,6 +395,12 @@ it is instantiated on shared servers using `slurm`. - **Description:** Path towards your xpress_dir. Needed if you want to launch a study with xpress. If the environment variables "XPRESS_DIR" and "XPRESS" are set on your local environment it should work without setting them. +### **local_workspace** + +- **Type:** Path +- **Default value:** `./local_workspace` +- **Description:** Antares Web uses this directory to run the simulations. + ## **slurm** SLURM (Simple Linux Utility for Resource Management) is used to interact with a remote environment (for Antares it's diff --git a/resources/deploy/config.yaml b/resources/deploy/config.yaml index 08831198ce..be61caeb29 100644 --- a/resources/deploy/config.yaml +++ b/resources/deploy/config.yaml @@ -22,6 +22,7 @@ launcher: local: binaries: VER: ANTARES_SOLVER_PATH + local_workspace: ./local_workspace # slurm: # local_workspace: /path/to/slurm_workspace # Path to the local SLURM workspace diff --git a/tests/launcher/test_local_launcher.py b/tests/launcher/test_local_launcher.py index 57c6a087d6..3ad04675da 100644 --- a/tests/launcher/test_local_launcher.py +++ b/tests/launcher/test_local_launcher.py @@ -97,7 +97,6 @@ def test_compute(tmp_path: Path, launcher_config: Config): local_launcher._compute( antares_solver_path=solver_path, study_uuid="study-id", - study_path=Path(tmp_path / "run"), job_id=study_id, launcher_parameters=launcher_parameters, ) @@ -115,23 +114,23 @@ def test_compute(tmp_path: Path, launcher_config: Config): def test_parse_launcher_arguments(launcher_config: Config): local_launcher = LocalLauncher(launcher_config, callbacks=Mock(), event_bus=Mock(), cache=Mock()) launcher_parameters = LauncherParametersDTO(nb_cpu=4) - sim_args, _ = local_launcher.parse_launcher_options(launcher_parameters) + sim_args, _ = local_launcher._parse_launcher_options(launcher_parameters) assert sim_args == ["--force-parallel=4"] launcher_parameters = LauncherParametersDTO(nb_cpu=8) - sim_args, _ = local_launcher.parse_launcher_options(launcher_parameters) + sim_args, _ = local_launcher._parse_launcher_options(launcher_parameters) assert sim_args == ["--force-parallel=8"] launcher_parameters.other_options = "coin" - sim_args, _ = local_launcher.parse_launcher_options(launcher_parameters) + sim_args, _ = local_launcher._parse_launcher_options(launcher_parameters) assert sim_args == ["--force-parallel=8", "--use-ortools", "--ortools-solver=coin"] launcher_parameters.other_options = "xpress" - sim_args, blabla = local_launcher.parse_launcher_options(launcher_parameters) + sim_args, blabla = local_launcher._parse_launcher_options(launcher_parameters) assert sim_args == ["--force-parallel=8", "--use-ortools", "--ortools-solver=xpress"] launcher_parameters.other_options = "xpress presolve" - sim_args, _ = local_launcher.parse_launcher_options(launcher_parameters) + sim_args, _ = local_launcher._parse_launcher_options(launcher_parameters) assert sim_args == [ "--force-parallel=8", "--use-ortools", @@ -142,7 +141,7 @@ def test_parse_launcher_arguments(launcher_config: Config): os.environ["XPRESS_DIR"] = "fake_path_for_test" launcher_parameters.other_options = "xpress presolve" - _, env_variables = local_launcher.parse_launcher_options(launcher_parameters) + _, env_variables = local_launcher._parse_launcher_options(launcher_parameters) assert env_variables["XPRESS_DIR"] == "fake_path_for_test" @@ -151,7 +150,7 @@ def test_parse_xpress_dir(tmp_path: Path): data = {"xpress_dir": "fake_path_for_test"} launcher_config = Config(launcher=LauncherConfig(local=LocalConfig.from_dict(data))) local_launcher = LocalLauncher(launcher_config, callbacks=Mock(), event_bus=Mock(), cache=Mock()) - _, env_variables = local_launcher.parse_launcher_options(LauncherParametersDTO()) + _, env_variables = local_launcher._parse_launcher_options(LauncherParametersDTO()) assert env_variables["XPRESS_DIR"] == "fake_path_for_test"