feat(config): correct and improve the nb_cores configuration and endpoint
laurent-laporte-pro committed Oct 3, 2023
1 parent 1c9f746 commit fb8fc58
Showing 14 changed files with 1,765 additions and 1,286 deletions.
475 changes: 243 additions & 232 deletions antarest/core/config.py

Large diffs are not rendered by default.
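Since the config.py diff is collapsed, here is a minimal sketch of what the reworked NbCoresConfig could look like, inferred from how the launcher code below uses nb_cores.min, nb_cores.default, nb_cores.max and to_json(). The default values and the validation are assumptions, not the committed implementation.

from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class NbCoresConfig:
    """Assumed shape of the per-launcher core-count configuration (min <= default <= max)."""

    min: int = 1
    default: int = 22
    max: int = 24

    def __post_init__(self) -> None:
        # Hypothetical sanity check; the real class may validate differently.
        if not self.min <= self.default <= self.max:
            raise ValueError(f"Invalid nb_cores configuration: {self}")

    def to_json(self) -> Dict[str, int]:
        # Matches the JSON shape documented by the /launcher/nbcores endpoint below.
        return {"min": self.min, "defaultValue": self.default, "max": self.max}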

54 changes: 39 additions & 15 deletions antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
@@ -32,7 +32,6 @@
logger = logging.getLogger(__name__)
logging.getLogger("paramiko").setLevel("WARN")

MAX_NB_CPU = 24
MAX_TIME_LIMIT = 864000
MIN_TIME_LIMIT = 3600
WORKSPACE_LOCK_FILE_NAME = ".lock"
@@ -153,7 +152,7 @@ def _init_launcher_arguments(self, local_workspace: Optional[Path] = None) -> ar
main_options_parameters = ParserParameters(
default_wait_time=self.slurm_config.default_wait_time,
default_time_limit=self.slurm_config.default_time_limit,
default_n_cpu=self.slurm_config.default_n_cpu,
default_n_cpu=self.slurm_config.nb_cores.default,
studies_in_dir=str((Path(local_workspace or self.slurm_config.local_workspace) / STUDIES_INPUT_DIR_NAME)),
log_dir=str((Path(self.slurm_config.local_workspace) / LOG_DIR_NAME)),
finished_dir=str((Path(local_workspace or self.slurm_config.local_workspace) / STUDIES_OUTPUT_DIR_NAME)),
@@ -440,7 +439,7 @@ def _run_study(
_override_solver_version(study_path, version)

append_log(launch_uuid, "Submitting study to slurm launcher")
launcher_args = self._check_and_apply_launcher_params(launcher_params)
launcher_args = self._apply_params(launcher_params)
self._call_launcher(launcher_args, self.launcher_params)

launch_success = self._check_if_study_is_in_launcher_db(launch_uuid)
@@ -481,23 +480,40 @@ def _check_if_study_is_in_launcher_db(self, job_id: str) -> bool:
studies = self.data_repo_tinydb.get_list_of_studies()
return any(s.name == job_id for s in studies)

def _check_and_apply_launcher_params(self, launcher_params: LauncherParametersDTO) -> argparse.Namespace:
def _apply_params(self, launcher_params: LauncherParametersDTO) -> argparse.Namespace:
"""
Populate an `argparse.Namespace` object with the user parameters.
Args:
launcher_params:
Contains the launcher parameters selected by the user.
If a parameter is not provided (`None`), the default value should be retrieved
from the configuration.
Returns:
The `argparse.Namespace` object which is then passed to `antarestlauncher.main.run_with`,
to launch a simulation using Antares Launcher.
"""
if launcher_params:
launcher_args = deepcopy(self.launcher_args)
other_options = []

if launcher_params.other_options:
options = re.split("\\s+", launcher_params.other_options)
for opt in options:
other_options.append(re.sub("[^a-zA-Z0-9_,-]", "", opt))
if launcher_params.xpansion is not None:
launcher_args.xpansion_mode = "r" if launcher_params.xpansion_r_version else "cpp"
options = launcher_params.other_options.split()
other_options = [re.sub("[^a-zA-Z0-9_,-]", "", opt) for opt in options]
else:
other_options = []

# launcher_params.xpansion can be an `XpansionParametersDTO`, a bool or `None`
if launcher_params.xpansion: # not None and not False
launcher_args.xpansion_mode = {True: "r", False: "cpp"}[launcher_params.xpansion_r_version]
if (
isinstance(launcher_params.xpansion, XpansionParametersDTO)
and launcher_params.xpansion.sensitivity_mode
):
other_options.append("xpansion_sensitivity")

time_limit = launcher_params.time_limit
if time_limit and isinstance(time_limit, int):
if time_limit is not None:
if MIN_TIME_LIMIT > time_limit:
logger.warning(
f"Invalid slurm launcher time limit ({time_limit}),"
@@ -512,15 +528,23 @@ def _check_and_apply_launcher_params(self, launcher_params: LauncherParametersDT
launcher_args.time_limit = MAX_TIME_LIMIT - 3600
else:
launcher_args.time_limit = time_limit

post_processing = launcher_params.post_processing
if isinstance(post_processing, bool):
if post_processing is not None:
launcher_args.post_processing = post_processing

nb_cpu = launcher_params.nb_cpu
if nb_cpu and isinstance(nb_cpu, int):
if 0 < nb_cpu <= MAX_NB_CPU:
if nb_cpu is not None:
nb_cores = self.slurm_config.nb_cores
if nb_cores.min <= nb_cpu <= nb_cores.max:
launcher_args.n_cpu = nb_cpu
else:
logger.warning(f"Invalid slurm launcher nb_cpu ({nb_cpu}), should be between 1 and 24")
logger.warning(
f"Invalid slurm launcher nb_cpu ({nb_cpu}),"
f" should be between {nb_cores.min} and {nb_cores.max}"
)
launcher_args.n_cpu = nb_cores.default

if launcher_params.adequacy_patch is not None: # the adequacy patch can be an empty object
launcher_args.post_processing = True

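To make the new nb_cpu fallback in _apply_params easier to follow, here is a small standalone sketch; clamp_nb_cpu is a hypothetical helper written only for illustration, assuming the min/default/max fields used above.

import logging
from typing import Optional

logger = logging.getLogger(__name__)


def clamp_nb_cpu(nb_cpu: Optional[int], nb_min: int, nb_default: int, nb_max: int) -> int:
    """Return a core count within [nb_min, nb_max], falling back to the configured default."""
    if nb_cpu is None:
        return nb_default
    if nb_min <= nb_cpu <= nb_max:
        return nb_cpu
    logger.warning(f"Invalid slurm launcher nb_cpu ({nb_cpu}), should be between {nb_min} and {nb_max}")
    return nb_default


# For instance, clamp_nb_cpu(48, 1, 22, 24) logs a warning and returns 22.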
6 changes: 3 additions & 3 deletions antarest/launcher/model.py
@@ -17,14 +17,14 @@ class XpansionParametersDTO(BaseModel):


class LauncherParametersDTO(BaseModel):
# Warning ! This class must be retrocompatible (that's the reason for the weird bool/XpansionParametersDTO union)
# Warning ! This class must be retro-compatible (that's the reason for the weird bool/XpansionParametersDTO union)
# The reason is that it's stored in json format in database and deserialized using the latest class version
# If compatibility is to be broken, an (alembic) data migration script should be added
adequacy_patch: Optional[Dict[str, Any]] = None
nb_cpu: Optional[int] = None
post_processing: bool = False
time_limit: Optional[int] = None
xpansion: Union[bool, Optional[XpansionParametersDTO]] = None
time_limit: Optional[int] = None # 3600 <= time_limit < 864000 (10 days)
xpansion: Union[XpansionParametersDTO, bool, None] = None
xpansion_r_version: bool = False
archive_output: bool = True
auto_unzip: bool = True
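Because old jobs stored xpansion as a plain boolean, the widened Union keeps both shapes deserializable. A short sketch, assuming pydantic v1's parse_raw (already used by the launcher service); the payload values are made up.

from antarest.launcher.model import LauncherParametersDTO

# Legacy payload: xpansion stored as a plain boolean is still accepted.
legacy = LauncherParametersDTO.parse_raw('{"nb_cpu": 8, "xpansion": true}')
assert legacy.xpansion is True and legacy.nb_cpu == 8

# Fields absent from very old records fall back to their declared defaults.
empty = LauncherParametersDTO.parse_raw("{}")
assert empty.nb_cpu is None and empty.post_processing is False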
61 changes: 36 additions & 25 deletions antarest/launcher/service.py
@@ -1,4 +1,5 @@
import functools
import json
import logging
import os
import shutil
@@ -10,7 +11,7 @@

from fastapi import HTTPException

from antarest.core.config import Config, LauncherConfig
from antarest.core.config import Config, NbCoresConfig
from antarest.core.exceptions import StudyNotFoundError
from antarest.core.filetransfer.model import FileDownloadTaskDTO
from antarest.core.filetransfer.service import FileTransferManager
@@ -99,14 +100,20 @@ def _init_extensions(self) -> Dict[str, ILauncherExtension]:
def get_launchers(self) -> List[str]:
return list(self.launchers.keys())

@staticmethod
def get_nb_cores(launcher: str) -> Dict[str, int]:
def get_nb_cores(self, launcher: str) -> NbCoresConfig:
"""
Retrieving Min, Default, and Max Core Count.
Retrieve the configuration of the launcher's nb of cores.
Args:
launcher: name of the configuration : "default", "slurm" or "local".
launcher: name of the launcher: "default", "slurm" or "local".
Returns:
Number of cores of the launcher
Raises:
InvalidConfigurationError: if the launcher configuration is not available
"""
return LauncherConfig().get_nb_cores(launcher).to_json()
return self.config.launcher.get_nb_cores(launcher)

def _after_export_flat_hooks(
self,
@@ -595,27 +602,31 @@ def get_load(self, from_cluster: bool = False) -> Dict[str, float]:
local_running_jobs.append(job)
else:
logger.warning(f"Unknown job launcher {job.launcher}")

load = {}
if self.config.launcher.slurm:

slurm_config = self.config.launcher.slurm
if slurm_config is not None:
if from_cluster:
raise NotImplementedError
slurm_used_cpus = functools.reduce(
lambda count, j: count
+ (
LauncherParametersDTO.parse_raw(j.launcher_params or "{}").nb_cpu
or self.config.launcher.slurm.default_n_cpu # type: ignore
),
slurm_running_jobs,
0,
)
load["slurm"] = float(slurm_used_cpus) / self.config.launcher.slurm.max_cores
if self.config.launcher.local:
local_used_cpus = functools.reduce(
lambda count, j: count + (LauncherParametersDTO.parse_raw(j.launcher_params or "{}").nb_cpu or 1),
local_running_jobs,
0,
)
load["local"] = float(local_used_cpus) / (os.cpu_count() or 1)
raise NotImplementedError("Cluster load not implemented yet")
default_cpu = slurm_config.nb_cores.default
slurm_used_cpus = 0
for job in slurm_running_jobs:
obj = json.loads(job.launcher_params) if job.launcher_params else {}
launch_params = LauncherParametersDTO(**obj)
slurm_used_cpus += launch_params.nb_cpu or default_cpu
load["slurm"] = slurm_used_cpus / slurm_config.max_cores

local_config = self.config.launcher.local
if local_config is not None:
default_cpu = local_config.nb_cores.default
local_used_cpus = 0
for job in local_running_jobs:
obj = json.loads(job.launcher_params) if job.launcher_params else {}
launch_params = LauncherParametersDTO(**obj)
local_used_cpus += launch_params.nb_cpu or default_cpu
load["local"] = local_used_cpus / local_config.nb_cores.max

return load

def get_solver_versions(self, solver: str) -> List[str]:
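As a worked example of the rewritten get_load computation (values are illustrative): with a SLURM nb_cores.default of 20 and max_cores of 64, two running SLURM jobs whose stored parameters are {"nb_cpu": 8} and {} contribute 8 + 20 = 28 cores, so load["slurm"] = 28 / 64 = 0.4375.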
18 changes: 12 additions & 6 deletions antarest/launcher/web.py
@@ -230,8 +230,9 @@ def get_solver_versions(
raise UnknownSolverConfig(solver)
return service.get_solver_versions(solver)

# noinspection SpellCheckingInspection
@bp.get(
"/launcher/nbcores",
"/launcher/nbcores", # We avoid "nb_cores" and "nb-cores" in endpoints
tags=[APITag.launcher],
summary="Retrieving Min, Default, and Max Core Count",
response_model=Dict[str, int],
@@ -256,13 +257,18 @@ def get_nb_cores(
)
) -> Dict[str, int]:
"""
Retrieving Min, Default, and Max Core Count.
Retrieve the number of cores of the launcher.
Args:
- `launcher`: name of the configuration to read: "default", "slurm" or "local".
- `launcher`: name of the configuration to read: "slurm" or "local".
If "default" is specified, retrieve the configuration of the default launcher.
Returns:
- "min": min number of cores
- "defaultValue": default number of cores
- "max": max number of cores
"""
logger.info(f"Fetching the number of cpu for the '{launcher}' configuration")
if launcher not in {"default", "slurm", "local"}:
raise UnknownSolverConfig(launcher)
logger.info(f"Fetching the number of cores for the '{launcher}' configuration")
try:
return service.config.launcher.get_nb_cores(launcher).to_json()
except InvalidConfigurationError:
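A hypothetical client call against the reworked endpoint, assuming `launcher` is passed as a query parameter, a local instance on port 8080, and the to_json() shape documented above (the /v1 prefix and the absence of authentication are assumptions):

import requests

resp = requests.get("http://localhost:8080/v1/launcher/nbcores", params={"launcher": "slurm"})
resp.raise_for_status()
print(resp.json())  # e.g. {"min": 1, "defaultValue": 22, "max": 24}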
9 changes: 5 additions & 4 deletions tests/conftest_db.py
@@ -3,7 +3,8 @@

import pytest
from sqlalchemy import create_engine # type: ignore
from sqlalchemy.orm import sessionmaker
from sqlalchemy.engine.base import Engine # type: ignore
from sqlalchemy.orm import Session, sessionmaker # type: ignore

from antarest.core.utils.fastapi_sqlalchemy import DBSessionMiddleware
from antarest.dbmodel import Base
@@ -12,7 +13,7 @@


@pytest.fixture(name="db_engine")
def db_engine_fixture() -> Generator[Any, None, None]:
def db_engine_fixture() -> Generator[Engine, None, None]:
"""
Fixture that creates an in-memory SQLite database engine for testing.
@@ -26,7 +27,7 @@


@pytest.fixture(name="db_session")
def db_session_fixture(db_engine) -> Generator:
def db_session_fixture(db_engine: Engine) -> Generator[Session, None, None]:
"""
Fixture that creates a database session for testing purposes.
@@ -46,7 +47,7 @@

@pytest.fixture(name="db_middleware", autouse=True)
def db_middleware_fixture(
db_engine: Any,
db_engine: Engine,
) -> Generator[DBSessionMiddleware, None, None]:
"""
Fixture that sets up a database session middleware with custom engine settings.
3 changes: 3 additions & 0 deletions tests/core/assets/__init__.py
@@ -0,0 +1,3 @@
from pathlib import Path

ASSETS_DIR = Path(__file__).parent.resolve()
61 changes: 61 additions & 0 deletions tests/core/assets/config/application-2.14.yaml
@@ -0,0 +1,61 @@
security:
disabled: false
jwt:
key: super-secret
login:
admin:
pwd: admin

db:
url: "sqlite:////home/john/antares_data/database.db"

storage:
tmp_dir: /tmp
matrixstore: /home/john/antares_data/matrices
archive_dir: /home/john/antares_data/archives
allow_deletion: false
workspaces:
default:
path: /home/john/antares_data/internal_studies/
studies:
path: /home/john/antares_data/studies/

launcher:
default: slurm
local:
binaries:
850: /home/john/opt/antares-8.5.0-Ubuntu-20.04/antares-solver
860: /home/john/opt/antares-8.6.0-Ubuntu-20.04/antares-8.6-solver

slurm:
local_workspace: /home/john/antares_data/slurm_workspace

username: antares
hostname: slurm-prod-01

port: 22
private_key_file: /home/john/.ssh/id_rsa
key_password:
default_wait_time: 900
default_time_limit: 172800
default_n_cpu: 20
default_json_db_name: launcher_db.json
slurm_script_path: /applis/antares/launchAntares.sh
db_primary_key: name
antares_versions_on_remote_server:
- '850' # 8.5.1/antares-8.5-solver
- '860' # 8.6.2/antares-8.6-solver
- '870' # 8.7.0/antares-8.7-solver

debug: false

root_path: ""

server:
worker_threadpool_size: 12
services:
- watcher
- matrix_gc

logging:
level: INFO
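The application-2.14.yaml asset above still uses the legacy `default_n_cpu` key; the reworked configuration presumably replaces it with a structured block mirroring NbCoresConfig. The key names below are assumptions inferred from the code, not the committed schema:

launcher:
  default: slurm
  slurm:
    # Assumed replacement for the legacy `default_n_cpu: 20` entry (illustrative only):
    nb_cores:
      min: 1
      default: 20
      max: 24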
