Skip to content

Commit

Permalink
feat(api): add endpoint get_nb_cores (#1727)
Browse files Browse the repository at this point in the history
Co-authored-by: LAIDI Takfarinas (Externe) <[email protected]>
Co-authored-by: Laurent LAPORTE <[email protected]>
Co-authored-by: belthlemar <[email protected]>
Co-authored-by: Laurent LAPORTE <[email protected]>
  • Loading branch information
5 people authored Oct 11, 2023
1 parent e435573 commit 8b04512
Show file tree
Hide file tree
Showing 20 changed files with 1,950 additions and 1,096 deletions.
469 changes: 307 additions & 162 deletions antarest/core/config.py

Large diffs are not rendered by default.

54 changes: 39 additions & 15 deletions antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
logger = logging.getLogger(__name__)
logging.getLogger("paramiko").setLevel("WARN")

MAX_NB_CPU = 24
MAX_TIME_LIMIT = 864000
MIN_TIME_LIMIT = 3600
WORKSPACE_LOCK_FILE_NAME = ".lock"
Expand Down Expand Up @@ -153,7 +152,7 @@ def _init_launcher_arguments(self, local_workspace: Optional[Path] = None) -> ar
main_options_parameters = ParserParameters(
default_wait_time=self.slurm_config.default_wait_time,
default_time_limit=self.slurm_config.default_time_limit,
default_n_cpu=self.slurm_config.default_n_cpu,
default_n_cpu=self.slurm_config.nb_cores.default,
studies_in_dir=str((Path(local_workspace or self.slurm_config.local_workspace) / STUDIES_INPUT_DIR_NAME)),
log_dir=str((Path(self.slurm_config.local_workspace) / LOG_DIR_NAME)),
finished_dir=str((Path(local_workspace or self.slurm_config.local_workspace) / STUDIES_OUTPUT_DIR_NAME)),
Expand Down Expand Up @@ -440,7 +439,7 @@ def _run_study(
_override_solver_version(study_path, version)

append_log(launch_uuid, "Submitting study to slurm launcher")
launcher_args = self._check_and_apply_launcher_params(launcher_params)
launcher_args = self._apply_params(launcher_params)
self._call_launcher(launcher_args, self.launcher_params)

launch_success = self._check_if_study_is_in_launcher_db(launch_uuid)
Expand Down Expand Up @@ -481,23 +480,40 @@ def _check_if_study_is_in_launcher_db(self, job_id: str) -> bool:
studies = self.data_repo_tinydb.get_list_of_studies()
return any(s.name == job_id for s in studies)

def _check_and_apply_launcher_params(self, launcher_params: LauncherParametersDTO) -> argparse.Namespace:
def _apply_params(self, launcher_params: LauncherParametersDTO) -> argparse.Namespace:
"""
Populate an `argparse.Namespace` object with the user parameters.
Args:
launcher_params:
Contains the launcher parameters selected by the user.
If a parameter is not provided (`None`), the default value should be retrieved
from the configuration.
Returns:
The `argparse.Namespace` object which is then passed to `antarestlauncher.main.run_with`,
to launch a simulation using Antares Launcher.
"""
if launcher_params:
launcher_args = deepcopy(self.launcher_args)
other_options = []

if launcher_params.other_options:
options = re.split("\\s+", launcher_params.other_options)
for opt in options:
other_options.append(re.sub("[^a-zA-Z0-9_,-]", "", opt))
if launcher_params.xpansion is not None:
launcher_args.xpansion_mode = "r" if launcher_params.xpansion_r_version else "cpp"
options = launcher_params.other_options.split()
other_options = [re.sub("[^a-zA-Z0-9_,-]", "", opt) for opt in options]
else:
other_options = []

# launcher_params.xpansion can be an `XpansionParametersDTO`, a bool or `None`
if launcher_params.xpansion: # not None and not False
launcher_args.xpansion_mode = {True: "r", False: "cpp"}[launcher_params.xpansion_r_version]
if (
isinstance(launcher_params.xpansion, XpansionParametersDTO)
and launcher_params.xpansion.sensitivity_mode
):
other_options.append("xpansion_sensitivity")

time_limit = launcher_params.time_limit
if time_limit and isinstance(time_limit, int):
if time_limit is not None:
if MIN_TIME_LIMIT > time_limit:
logger.warning(
f"Invalid slurm launcher time limit ({time_limit}),"
Expand All @@ -512,15 +528,23 @@ def _check_and_apply_launcher_params(self, launcher_params: LauncherParametersDT
launcher_args.time_limit = MAX_TIME_LIMIT - 3600
else:
launcher_args.time_limit = time_limit

post_processing = launcher_params.post_processing
if isinstance(post_processing, bool):
if post_processing is not None:
launcher_args.post_processing = post_processing

nb_cpu = launcher_params.nb_cpu
if nb_cpu and isinstance(nb_cpu, int):
if 0 < nb_cpu <= MAX_NB_CPU:
if nb_cpu is not None:
nb_cores = self.slurm_config.nb_cores
if nb_cores.min <= nb_cpu <= nb_cores.max:
launcher_args.n_cpu = nb_cpu
else:
logger.warning(f"Invalid slurm launcher nb_cpu ({nb_cpu}), should be between 1 and 24")
logger.warning(
f"Invalid slurm launcher nb_cpu ({nb_cpu}),"
f" should be between {nb_cores.min} and {nb_cores.max}"
)
launcher_args.n_cpu = nb_cores.default

if launcher_params.adequacy_patch is not None: # the adequacy patch can be an empty object
launcher_args.post_processing = True

Expand Down
6 changes: 3 additions & 3 deletions antarest/launcher/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ class XpansionParametersDTO(BaseModel):


class LauncherParametersDTO(BaseModel):
# Warning ! This class must be retrocompatible (that's the reason for the weird bool/XpansionParametersDTO union)
# Warning ! This class must be retro-compatible (that's the reason for the weird bool/XpansionParametersDTO union)
# The reason is that it's stored in json format in database and deserialized using the latest class version
# If compatibility is to be broken, an (alembic) data migration script should be added
adequacy_patch: Optional[Dict[str, Any]] = None
nb_cpu: Optional[int] = None
post_processing: bool = False
time_limit: Optional[int] = None
xpansion: Union[bool, Optional[XpansionParametersDTO]] = None
time_limit: Optional[int] = None # 3600 <= time_limit < 864000 (10 days)
xpansion: Union[XpansionParametersDTO, bool, None] = None
xpansion_r_version: bool = False
archive_output: bool = True
auto_unzip: bool = True
Expand Down
60 changes: 40 additions & 20 deletions antarest/launcher/service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import functools
import json
import logging
import os
import shutil
Expand All @@ -10,7 +11,7 @@

from fastapi import HTTPException

from antarest.core.config import Config
from antarest.core.config import Config, NbCoresConfig
from antarest.core.exceptions import StudyNotFoundError
from antarest.core.filetransfer.model import FileDownloadTaskDTO
from antarest.core.filetransfer.service import FileTransferManager
Expand Down Expand Up @@ -99,6 +100,21 @@ def _init_extensions(self) -> Dict[str, ILauncherExtension]:
def get_launchers(self) -> List[str]:
return list(self.launchers.keys())

def get_nb_cores(self, launcher: str) -> NbCoresConfig:
    """
    Retrieve the number-of-cores configuration of a launcher.

    Args:
        launcher: name of the launcher: "default", "slurm" or "local".
            "default" resolves to the launcher configured as the default one.

    Returns:
        The `NbCoresConfig` of the launcher (min, default and max core counts),
        not a plain integer count.

    Raises:
        InvalidConfigurationError: if the launcher configuration is not available
    """
    # Delegate to the config object, which knows how to resolve "default".
    return self.config.launcher.get_nb_cores(launcher)

def _after_export_flat_hooks(
self,
job_id: str,
Expand Down Expand Up @@ -586,27 +602,31 @@ def get_load(self, from_cluster: bool = False) -> Dict[str, float]:
local_running_jobs.append(job)
else:
logger.warning(f"Unknown job launcher {job.launcher}")

load = {}
if self.config.launcher.slurm:

slurm_config = self.config.launcher.slurm
if slurm_config is not None:
if from_cluster:
raise NotImplementedError
slurm_used_cpus = functools.reduce(
lambda count, j: count
+ (
LauncherParametersDTO.parse_raw(j.launcher_params or "{}").nb_cpu
or self.config.launcher.slurm.default_n_cpu # type: ignore
),
slurm_running_jobs,
0,
)
load["slurm"] = float(slurm_used_cpus) / self.config.launcher.slurm.max_cores
if self.config.launcher.local:
local_used_cpus = functools.reduce(
lambda count, j: count + (LauncherParametersDTO.parse_raw(j.launcher_params or "{}").nb_cpu or 1),
local_running_jobs,
0,
)
load["local"] = float(local_used_cpus) / (os.cpu_count() or 1)
raise NotImplementedError("Cluster load not implemented yet")
default_cpu = slurm_config.nb_cores.default
slurm_used_cpus = 0
for job in slurm_running_jobs:
obj = json.loads(job.launcher_params) if job.launcher_params else {}
launch_params = LauncherParametersDTO(**obj)
slurm_used_cpus += launch_params.nb_cpu or default_cpu
load["slurm"] = slurm_used_cpus / slurm_config.max_cores

local_config = self.config.launcher.local
if local_config is not None:
default_cpu = local_config.nb_cores.default
local_used_cpus = 0
for job in local_running_jobs:
obj = json.loads(job.launcher_params) if job.launcher_params else {}
launch_params = LauncherParametersDTO(**obj)
local_used_cpus += launch_params.nb_cpu or default_cpu
load["local"] = local_used_cpus / local_config.nb_cores.max

return load

def get_solver_versions(self, solver: str) -> List[str]:
Expand Down
46 changes: 45 additions & 1 deletion antarest/launcher/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from fastapi import APIRouter, Depends, Query
from fastapi.exceptions import HTTPException

from antarest.core.config import Config
from antarest.core.config import Config, InvalidConfigurationError
from antarest.core.filetransfer.model import FileDownloadTaskDTO
from antarest.core.jwt import JWTUser
from antarest.core.requests import RequestParameters
Expand Down Expand Up @@ -230,4 +230,48 @@ def get_solver_versions(
raise UnknownSolverConfig(solver)
return service.get_solver_versions(solver)

# noinspection SpellCheckingInspection
@bp.get(
    "/launcher/nbcores",  # We avoid "nb_cores" and "nb-cores" in endpoints
    tags=[APITag.launcher],
    summary="Retrieving Min, Default, and Max Core Count",
    response_model=Dict[str, int],
)
def get_nb_cores(
    launcher: str = Query(
        "default",
        examples={
            "Default launcher": {
                "description": "Min, Default, and Max Core Count",
                "value": "default",
            },
            "SLURM launcher": {
                "description": "Min, Default, and Max Core Count",
                "value": "slurm",
            },
            "Local launcher": {
                "description": "Min, Default, and Max Core Count",
                "value": "local",
            },
        },
    )
) -> Dict[str, int]:
    """
    Retrieve the number of cores of the launcher.

    Args:
    - `launcher`: name of the configuration to read: "slurm" or "local".
      If "default" is specified, retrieve the configuration of the default launcher.

    Returns:
    - "min": min number of cores
    - "defaultValue": default number of cores
    - "max": max number of cores
    """
    # Lazy %-style args: the message is only formatted if the record is emitted.
    logger.info("Fetching the number of cores for the '%s' configuration", launcher)
    try:
        return service.config.launcher.get_nb_cores(launcher).to_json()
    except InvalidConfigurationError as e:
        # Chain the cause so the original configuration error is preserved
        # in the traceback instead of being silently discarded.
        raise UnknownSolverConfig(launcher) from e

return bp
19 changes: 13 additions & 6 deletions resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ db:
#pool_recycle:

storage:
tmp_dir: /tmp
tmp_dir: ./tmp
matrixstore: ./matrices
archive_dir: examples/archives
archive_dir: ./examples/archives
allow_deletion: false # indicate if studies found in non default workspace can be deleted by the application
#matrix_gc_sleeping_time: 3600 # time in seconds to sleep between two garbage collection
#matrix_gc_dry_run: False # Skip matrix effective deletion
Expand All @@ -32,20 +32,23 @@ storage:
#auto_archive_max_parallel: 5 # max auto archival tasks in parallel
workspaces:
default: # required, no filters applied, this folder is not watched
path: examples/internal_studies/
path: ./examples/internal_studies/
# other workspaces can be added
# if a directory is to be ignored by the watcher, place a file named AW_NO_SCAN inside
tmp:
path: examples/studies/
path: ./examples/studies/
# filter_in: ['.*'] # default to '.*'
# filter_out: [] # default to empty
# groups: [] # default empty

launcher:
default: local

local:
binaries:
700: path/to/700
enable_nb_cores_detection: true

# slurm:
# local_workspace: path/to/workspace
# username: username
Expand All @@ -56,7 +59,11 @@ launcher:
# password: password_is_optional_but_necessary_if_key_is_absent
# default_wait_time: 900
# default_time_limit: 172800
# default_n_cpu: 12
# enable_nb_cores_detection: False
# nb_cores:
# min: 1
# default: 22
# max: 24
# default_json_db_name: launcher_db.json
# slurm_script_path: /path/to/launchantares_v1.1.3.sh
# db_primary_key: name
Expand All @@ -70,7 +77,7 @@ launcher:

debug: true

root_path: ""
root_path: "api"

#tasks:
# max_workers: 5
Expand Down
11 changes: 9 additions & 2 deletions resources/deploy/config.prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ storage:

launcher:
default: local

local:
binaries:
800: /antares_simulator/antares-8.2-solver
enable_nb_cores_detection: true

# slurm:
# local_workspace: path/to/workspace
# username: username
Expand All @@ -45,7 +48,11 @@ launcher:
# password: password_is_optional_but_necessary_if_key_is_absent
# default_wait_time: 900
# default_time_limit: 172800
# default_n_cpu: 12
# enable_nb_cores_detection: False
# nb_cores:
# min: 1
# default: 22
# max: 24
# default_json_db_name: launcher_db.json
# slurm_script_path: /path/to/launchantares_v1.1.3.sh
# db_primary_key: name
Expand All @@ -59,7 +66,7 @@ launcher:

debug: false

root_path: "/api"
root_path: "api"

#tasks:
# max_workers: 5
Expand Down
9 changes: 8 additions & 1 deletion resources/deploy/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ storage:

launcher:
default: local

local:
binaries:
700: path/to/700
enable_nb_cores_detection: true

# slurm:
# local_workspace: path/to/workspace
# username: username
Expand All @@ -42,7 +45,11 @@ launcher:
# password: password_is_optional_but_necessary_if_key_is_absent
# default_wait_time: 900
# default_time_limit: 172800
# default_n_cpu: 12
# enable_nb_cores_detection: False
# nb_cores:
# min: 1
# default: 22
# max: 24
# default_json_db_name: launcher_db.json
# slurm_script_path: /path/to/launchantares_v1.1.3.sh
# db_primary_key: name
Expand Down
Loading

0 comments on commit 8b04512

Please sign in to comment.