Skip to content

Commit

Permalink
Resolve review comments by handling the local launcher's workspace the same way as the SLURM launcher
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinBelthle committed Dec 18, 2024
1 parent 9e927f5 commit 4662ffb
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 87 deletions.
3 changes: 3 additions & 0 deletions antarest/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ class LocalConfig:
nb_cores: NbCoresConfig = NbCoresConfig()
time_limit: TimeLimitConfig = TimeLimitConfig()
xpress_dir: Optional[str] = None
local_workspace: Path = Path("./local_workspace")

@classmethod
def from_dict(cls, data: JSON) -> "LocalConfig":
Expand All @@ -280,11 +281,13 @@ def from_dict(cls, data: JSON) -> "LocalConfig":
if enable_nb_cores_detection:
nb_cores.update(cls._autodetect_nb_cores())
xpress_dir = data.get("xpress_dir", defaults.xpress_dir)
local_workspace = Path(data["local_workspace"]) if "local_workspace" in data else defaults.local_workspace
return cls(
binaries={str(v): Path(p) for v, p in binaries.items()},
enable_nb_cores_detection=enable_nb_cores_detection,
nb_cores=NbCoresConfig(**nb_cores),
xpress_dir=xpress_dir,
local_workspace=local_workspace,
)

@classmethod
Expand Down
9 changes: 2 additions & 7 deletions antarest/launcher/adapters/abstractlauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,12 @@ def __init__(

@abstractmethod
def run_study(
self,
study_uuid: str,
job_id: str,
version: SolverVersion,
launcher_parameters: LauncherParametersDTO,
study_path: Path,
self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO
) -> None:
raise NotImplementedError()

@abstractmethod
def get_log(self, job_id: str, log_type: LogType, study_path: Path) -> Optional[str]:
def get_log(self, job_id: str, log_type: LogType) -> Optional[str]:
raise NotImplementedError()

@abstractmethod
Expand Down
94 changes: 33 additions & 61 deletions antarest/launcher/adapters/local_launcher/local_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,23 @@
#
# This file is part of the Antares project.

import io
import logging
import os
import shutil
import signal
import subprocess
import tempfile
import threading
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, cast
from typing import Any, Callable, Dict, List, Optional, Tuple

from antares.study.version import SolverVersion

from antarest.core.config import Config
from antarest.core.interfaces.cache import ICache
from antarest.core.interfaces.eventbus import IEventBus
from antarest.launcher.adapters.abstractlauncher import AbstractLauncher, LauncherCallbacks, LauncherInitException
from antarest.launcher.adapters.log_manager import follow
from antarest.launcher.adapters.log_manager import LogTailManager
from antarest.launcher.model import JobStatus, LauncherParametersDTO, LogType

logger = logging.getLogger(__name__)
Expand All @@ -49,7 +47,11 @@ def __init__(
super().__init__(config, callbacks, event_bus, cache)
if self.config.launcher.local is None:
raise LauncherInitException("Missing parameter 'launcher.local'")
self.tmpdir = config.storage.tmp_dir
self.local_workspace = self.config.launcher.local.local_workspace
logs_path = self.local_workspace / "LOGS"
logs_path.mkdir(parents=True, exist_ok=True)
self.log_directory = logs_path
self.log_tail_manager = LogTailManager(self.local_workspace)
self.job_id_to_study_id: Dict[str, Tuple[str, Path, subprocess.Popen]] = {} # type: ignore
self.logs: Dict[str, str] = {}

Expand All @@ -73,12 +75,7 @@ def _select_best_binary(self, version: str) -> Path:
return antares_solver_path

def run_study(
self,
study_uuid: str,
job_id: str,
version: SolverVersion,
launcher_parameters: LauncherParametersDTO,
study_path: Path,
self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO
) -> None:
antares_solver_path = self._select_best_binary(f"{version:ddd}")

Expand All @@ -88,7 +85,6 @@ def run_study(
self,
antares_solver_path,
study_uuid,
study_path,
job_id,
launcher_parameters,
),
Expand All @@ -100,64 +96,33 @@ def _compute(
self,
antares_solver_path: Path,
study_uuid: str,
study_path: Path,
job_id: str,
launcher_parameters: LauncherParametersDTO,
) -> None:
end = False

# create study logs
logs_path = study_path / "output" / "logs"
logs_path.mkdir(exist_ok=True, parents=True)

def stop_reading_output() -> bool:
if end and job_id in self.logs:
with open(logs_path / f"{job_id}-out.log", "w") as log_file:
log_file.write(self.logs[job_id])
del self.logs[job_id]
return end

tmp_path = tempfile.mkdtemp(prefix="local_launch_", dir=str(self.tmpdir))
export_path = Path(tmp_path) / "export"
export_path = self.local_workspace / job_id
logs_path = self.log_directory / job_id
logs_path.mkdir()
try:
self.callbacks.export_study(job_id, study_uuid, export_path, launcher_parameters)

simulator_args, environment_variables = self.parse_launcher_options(launcher_parameters)
simulator_args, environment_variables = self._parse_launcher_options(launcher_parameters)
new_args = [str(antares_solver_path)] + simulator_args + [str(export_path)]

std_out_file = study_path / "output" / "logs" / f"{job_id}-err.log"
with open(std_out_file, "w") as err_file:
std_err_file = logs_path / f"{job_id}-err.log"
std_out_file = logs_path / f"{job_id}-out.log"
with open(std_err_file, "w") as err_file, open(std_out_file, "w") as out_file:
process = subprocess.Popen(
new_args,
env=environment_variables,
stdout=subprocess.PIPE,
stdout=out_file,
stderr=err_file,
universal_newlines=True,
encoding="utf-8",
)
self.job_id_to_study_id[job_id] = (
study_uuid,
export_path,
process,
)
self.callbacks.update_status(
job_id,
JobStatus.RUNNING,
None,
None,
)
self.job_id_to_study_id[job_id] = (study_uuid, export_path, process)
self.callbacks.update_status(job_id, JobStatus.RUNNING, None, None)

thread = threading.Thread(
target=lambda: follow(
cast(io.StringIO, process.stdout),
self.create_update_log(job_id),
stop_reading_output,
None,
),
name=f"{self.__class__.__name__}-LogsWatcher",
daemon=True,
)
thread.start()
self.log_tail_manager.track(std_out_file, self.create_update_log(job_id))

while process.poll() is None:
time.sleep(1)
Expand All @@ -171,7 +136,8 @@ def stop_reading_output() -> bool:
if process.returncode == 0:
# The job succeeded; we need to import the output
try:
output_id = self.callbacks.import_output(job_id, export_path / "output", {})
launcher_logs = self._import_launcher_logs(job_id)
output_id = self.callbacks.import_output(job_id, export_path / "output", launcher_logs)
except Exception as e:
logger.error(
f"Failed to import output for study {study_uuid} located at {export_path}",
Expand All @@ -193,11 +159,17 @@ def stop_reading_output() -> bool:
None,
)
finally:
logger.info(f"Removing launch {job_id} export path at {tmp_path}")
end = True
shutil.rmtree(tmp_path)
logger.info(f"Removing launch {job_id} export path at {export_path}")
shutil.rmtree(export_path)

def _import_launcher_logs(self, job_id: str) -> Dict[str, List[Path]]:
    """Collect the launcher log files produced for *job_id*.

    Returns a mapping from the output log name to the list of file paths
    that hold it, in the shape expected by ``callbacks.import_output``.
    The files are the ``{job_id}-out.log`` / ``{job_id}-err.log`` pair that
    ``_compute`` writes under ``self.log_directory / job_id``.
    NOTE(review): the paths are returned without checking existence —
    confirm the import callback tolerates missing files.
    """
    # Per-job log folder created by _compute before the solver is started.
    logs_path = self.log_directory / job_id
    return {
        "antares-out.log": [logs_path / f"{job_id}-out.log"],
        "antares-err.log": [logs_path / f"{job_id}-err.log"],
    }

def parse_launcher_options(self, launcher_parameters: LauncherParametersDTO) -> Tuple[List[str], Dict[str, Any]]:
def _parse_launcher_options(self, launcher_parameters: LauncherParametersDTO) -> Tuple[List[str], Dict[str, Any]]:
simulator_args = [f"--force-parallel={launcher_parameters.nb_cpu}"]
environment_variables = os.environ.copy()
if launcher_parameters.other_options:
Expand Down Expand Up @@ -225,10 +197,10 @@ def append_to_log(log_line: str) -> None:

return append_to_log

def get_log(self, job_id: str, log_type: LogType, study_path: Path) -> Optional[str]:
def get_log(self, job_id: str, log_type: LogType) -> Optional[str]:
if job_id in self.job_id_to_study_id and job_id in self.logs and log_type == LogType.STDOUT:
return self.logs[job_id]
job_path = study_path / "output" / "logs" / f"{job_id}-{log_type.to_suffix()}"
job_path = self.log_directory / job_id / f"{job_id}-{log_type.to_suffix()}"
if job_path.exists():
return job_path.read_text()
return None
Expand Down
9 changes: 2 additions & 7 deletions antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,12 +589,7 @@ def _apply_params(self, launcher_params: LauncherParametersDTO) -> argparse.Name
return self.launcher_args

def run_study(
self,
study_uuid: str,
job_id: str,
version: SolverVersion,
launcher_parameters: LauncherParametersDTO,
study_path: Path,
self, study_uuid: str, job_id: str, version: SolverVersion, launcher_parameters: LauncherParametersDTO
) -> None:
thread = threading.Thread(
target=self._run_study,
Expand All @@ -603,7 +598,7 @@ def run_study(
)
thread.start()

def get_log(self, job_id: str, log_type: LogType, study_path: Path) -> t.Optional[str]:
def get_log(self, job_id: str, log_type: LogType) -> t.Optional[str]:
log_path: t.Optional[Path] = None
for study in self.data_repo_tinydb.get_list_of_studies():
if study.name == job_id:
Expand Down
6 changes: 2 additions & 4 deletions antarest/launcher/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ def run_study(
logger.info(f"New study launch (study={study_uuid}, job_id={job_uuid})")
study_info = self.study_service.get_study_information(uuid=study_uuid, params=params)
solver_version = SolverVersion.parse(study_version or study_info.version)
study_path = self.study_service.get_study_path(uuid=study_uuid, params=params)

self._assert_launcher_is_initialized(launcher)
assert_permission(
Expand All @@ -252,7 +251,7 @@ def run_study(
)
self.job_result_repository.save(job_status)

self.launchers[launcher].run_study(study_uuid, job_uuid, solver_version, launcher_parameters, study_path)
self.launchers[launcher].run_study(study_uuid, job_uuid, solver_version, launcher_parameters)

self.event_bus.push(
Event(
Expand Down Expand Up @@ -404,8 +403,7 @@ def get_log(self, job_id: str, log_type: LogType, params: RequestParameters) ->
if job_result.launcher is None:
raise ValueError(f"Job {job_id} has no launcher")
self._assert_launcher_is_initialized(job_result.launcher)
study_path = self.study_service.get_study_path(job_result.study_id, params)
launcher_logs = str(self.launchers[job_result.launcher].get_log(job_id, log_type, study_path) or "")
launcher_logs = str(self.launchers[job_result.launcher].get_log(job_id, log_type) or "")
if log_type == LogType.STDOUT:
app_logs: Dict[JobLogType, List[str]] = functools.reduce(
lambda logs, log: LauncherService.sort_log(log, logs),
Expand Down
6 changes: 6 additions & 0 deletions docs/developer-guide/install/1-CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,12 @@ it is instantiated on shared servers using `slurm`.
- **Description:** Path towards your xpress_dir. Needed if you want to launch a study with xpress. If the environment
variables "XPRESS_DIR" and "XPRESS" are set on your local environment it should work without setting them.

### **local_workspace**

- **Type:** Path
- **Default value:** `./local_workspace`
- **Description:** Directory in which Antares Web exports studies and runs the simulations launched with the local launcher (solver logs are kept under its `LOGS` subfolder).

## **slurm**

SLURM (Simple Linux Utility for Resource Management) is used to interact with a remote environment (for Antares it's
Expand Down
1 change: 1 addition & 0 deletions resources/deploy/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ launcher:
local:
binaries:
VER: ANTARES_SOLVER_PATH
local_workspace: ./local_workspace

# slurm:
# local_workspace: /path/to/slurm_workspace # Path to the local SLURM workspace
Expand Down
15 changes: 7 additions & 8 deletions tests/launcher/test_local_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ def test_compute(tmp_path: Path, launcher_config: Config):
local_launcher._compute(
antares_solver_path=solver_path,
study_uuid="study-id",
study_path=Path(tmp_path / "run"),
job_id=study_id,
launcher_parameters=launcher_parameters,
)
Expand All @@ -115,23 +114,23 @@ def test_compute(tmp_path: Path, launcher_config: Config):
def test_parse_launcher_arguments(launcher_config: Config):
local_launcher = LocalLauncher(launcher_config, callbacks=Mock(), event_bus=Mock(), cache=Mock())
launcher_parameters = LauncherParametersDTO(nb_cpu=4)
sim_args, _ = local_launcher.parse_launcher_options(launcher_parameters)
sim_args, _ = local_launcher._parse_launcher_options(launcher_parameters)
assert sim_args == ["--force-parallel=4"]

launcher_parameters = LauncherParametersDTO(nb_cpu=8)
sim_args, _ = local_launcher.parse_launcher_options(launcher_parameters)
sim_args, _ = local_launcher._parse_launcher_options(launcher_parameters)
assert sim_args == ["--force-parallel=8"]

launcher_parameters.other_options = "coin"
sim_args, _ = local_launcher.parse_launcher_options(launcher_parameters)
sim_args, _ = local_launcher._parse_launcher_options(launcher_parameters)
assert sim_args == ["--force-parallel=8", "--use-ortools", "--ortools-solver=coin"]

launcher_parameters.other_options = "xpress"
sim_args, blabla = local_launcher.parse_launcher_options(launcher_parameters)
sim_args, blabla = local_launcher._parse_launcher_options(launcher_parameters)
assert sim_args == ["--force-parallel=8", "--use-ortools", "--ortools-solver=xpress"]

launcher_parameters.other_options = "xpress presolve"
sim_args, _ = local_launcher.parse_launcher_options(launcher_parameters)
sim_args, _ = local_launcher._parse_launcher_options(launcher_parameters)
assert sim_args == [
"--force-parallel=8",
"--use-ortools",
Expand All @@ -142,7 +141,7 @@ def test_parse_launcher_arguments(launcher_config: Config):

os.environ["XPRESS_DIR"] = "fake_path_for_test"
launcher_parameters.other_options = "xpress presolve"
_, env_variables = local_launcher.parse_launcher_options(launcher_parameters)
_, env_variables = local_launcher._parse_launcher_options(launcher_parameters)
assert env_variables["XPRESS_DIR"] == "fake_path_for_test"


Expand All @@ -151,7 +150,7 @@ def test_parse_xpress_dir(tmp_path: Path):
data = {"xpress_dir": "fake_path_for_test"}
launcher_config = Config(launcher=LauncherConfig(local=LocalConfig.from_dict(data)))
local_launcher = LocalLauncher(launcher_config, callbacks=Mock(), event_bus=Mock(), cache=Mock())
_, env_variables = local_launcher.parse_launcher_options(LauncherParametersDTO())
_, env_variables = local_launcher._parse_launcher_options(LauncherParametersDTO())
assert env_variables["XPRESS_DIR"] == "fake_path_for_test"


Expand Down

0 comments on commit 4662ffb

Please sign in to comment.