[Fault tolerance] Automatic resubmission #346

Merged
merged 45 commits (fault-tolerance-elastic into main) on Jul 5, 2024
Commits (45)
28f3e43
Added fault tolerance config for gpt3 126m
jbieniusiewi Oct 30, 2023
7a955dd
initial auto-resume impl - WIP
jbieniusiewi Nov 3, 2023
0b84305
Auto-resume loop-WIP
jbieniusiewi Nov 7, 2023
733ac4c
Cleaning dbg code
jbieniusiewi Nov 7, 2023
adc5b7b
Fixes after testing on EOS
Nov 9, 2023
cc4f54b
Updated fault tolerance-end of day version
Nov 15, 2023
2b0d228
Read fault tolerance config from exp_manager section
Nov 17, 2023
63dc927
Added autoresume after preemption
jbieniusiewi Nov 27, 2023
2334995
Updated auto-resume params reading
jbieniusiewi Nov 28, 2023
0aea955
Added FT to main branch
jbieniusiewi Dec 5, 2023
e50d388
Added SC2 config and run script
Dec 5, 2023
ca7ab16
Fixed launch cmd for Sc2
Dec 5, 2023
74b2e42
Updated launcher cmd
Dec 6, 2023
b8d8a58
increased timeouts and removed NVTE_APPLY_QK_LAYER_SCALING
Dec 6, 2023
0d9065e
Added back NVTE_APPLY_QK_LAYER_SCALING=1 as error happens without it
jbieniusiewi Dec 7, 2023
c6367c5
Added fault tolerance unit tests
jbieniusiewi Dec 11, 2023
f1dfe54
Use FT launcher, WIP
jbieniusiewi Jan 11, 2024
87e3386
Fix...
jbieniusiewi Jan 11, 2024
90fc006
Fix2
jbieniusiewi Jan 11, 2024
89362b3
Updating for FT launcher, wip...
jbieniusiewi Jan 16, 2024
b4ee9ed
Updated FT params reading
jbieniusiewi Jan 17, 2024
050ccf2
Fixes after testing on DlCluster...
jbieniusiewi Jan 17, 2024
147024b
Update after 'create_fault_tolerance_callback' param was added
jbieniusiewi Jan 18, 2024
5105a29
Improved auto-resume
jbieniusiewi Jan 19, 2024
58741ae
Improved auto-resume-cont
jbieniusiewi Jan 19, 2024
4442118
Use hostname to get the rendezvous host
jbieniusiewi Jan 22, 2024
25cc521
Added additional_ft_launcher_args
jbieniusiewi Jan 23, 2024
874b651
Restored --kill-on-bad-exit=0, --wait=3600
jbieniusiewi Jan 25, 2024
8316411
Fixed comment
jbieniusiewi Jan 25, 2024
fb80ec0
--kill-on-bad-exit=1
jbieniusiewi Jan 25, 2024
79b371f
Set FT work dir
jbieniusiewi Feb 5, 2024
ff6e939
Set FT work dir/fix
jbieniusiewi Feb 5, 2024
e5ade77
Added test script for DracoRNO/wip
jbieniusiewi Feb 5, 2024
c2f8cba
Working on test scripts/WIP
jbieniusiewi Mar 5, 2024
76f176a
Version for testing
jbieniusiewi Mar 8, 2024
1ee3eb3
Merge branch NeMo-Megatron-Launcher:master into fault-tolerance-elastic
jbieniusiewi Apr 29, 2024
8e0cf6f
Handle unknown job result as a failure, added some comments
jbieniusiewi Apr 30, 2024
b298524
Merge remote-tracking branch 'gh/main' into fault-tolerance-elastic
maanug-nv Jun 4, 2024
23d03cb
formatting
maanug-nv Jun 4, 2024
9a9accf
fix unit tests
maanug-nv Jun 4, 2024
eb48956
remove examples
maanug-nv Jun 5, 2024
7d6e2df
Merge branch 'main' into fault-tolerance-elastic
maanug-nv Jun 28, 2024
df42322
Final cleanup
jbieniusiewi Jul 1, 2024
f1085d5
Merge branch 'main' into fault-tolerance-elastic
jbieniusiewi Jul 5, 2024
5068143
Merge branch 'main' into fault-tolerance-elastic
jbieniusiewi Jul 5, 2024
305 changes: 297 additions & 8 deletions launcher_scripts/nemo_launcher/core/launchers.py
@@ -26,9 +26,9 @@

import nemo_launcher.utils.job_utils as job_utils
import yaml
from hera.workflows import Workflow
from nemo_launcher.core.logger import logger
from omegaconf import DictConfig, OmegaConf
from hera.workflows import Workflow

NEMO_LAUNCHER_CI = os.getenv("NEMO_LAUNCHER_CI", "False").lower() in ("true", "t", "1")
NEMO_LAUNCHER_DEBUG = os.getenv("NEMO_LAUNCHER_DEBUG", "False").lower() in (
@@ -360,15 +360,23 @@ class SlurmLauncher(Launcher):

:param Union[Path, str] folder: folder for storing job submission/output and logs.
:param str job_name: Name of the job, used as job folder name
:param bool use_fault_tolerance: Use fault tolerance launcher to run the job
:param Any **kwargs: See slurm documentation for most parameters.
Most useful parameters are: time, mem, gpus_per_node, cpus_per_task, partition
Below are the parameters that differ from slurm documentation:
setup: a list of commands to run in sbatch before running srun
"""

def __init__(self, folder: Union[Path, str], job_name: str, **kwargs: Any) -> None:
def __init__(
self,
folder: Union[Path, str],
job_name: str,
use_fault_tolerance: bool = False,
**kwargs: Any,
) -> None:
super().__init__(folder, job_name)
self.parameters = {}
self.use_fault_tolerance = use_fault_tolerance
self._update_parameters(job_name=job_name, **kwargs)

if shutil.which("srun") is None and not NEMO_LAUNCHER_DEBUG:
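
As a usage sketch of the new flag (all values below are illustrative assumptions, not taken from this PR; note the constructor raises if srun is absent unless NEMO_LAUNCHER_DEBUG is set):

from nemo_launcher.core.launchers import SlurmLauncher

# Hypothetical job settings; with use_fault_tolerance=True the launcher
# validates kwargs against the ft_launcher parameter set and later renders
# the ft_launcher-based sbatch template.
launcher = SlurmLauncher(
    folder="/results/gpt3_126m",    # assumed results folder
    job_name="gpt3_126m_ft",
    use_fault_tolerance=True,
    nodes=2,
    ntasks_per_node=8,
    time=240,                       # Slurm time limit in minutes
    max_subsequent_job_failures=3,  # accepted only in fault-tolerance mode
)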
@@ -389,9 +397,12 @@ def _equivalence_dict(cls):
}

@classmethod
def _valid_parameters(cls) -> Set[str]:
def _valid_parameters(cls, use_fault_tolerance) -> Set[str]:
"""Parameters that can be set through update_parameters"""
return set(_get_default_parameters())
if use_fault_tolerance:
return set(_get_default_parameters(_make_sbatch_string_ft_launcher))
else:
return set(_get_default_parameters(_make_sbatch_string))

def _convert_parameters(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""translate slurm parameter names"""
@@ -415,7 +426,11 @@ def _update_parameters(self, **kwargs: Any) -> None:
Below are the parameters that differ from slurm documentation:
setup: a list of commands to run in sbatch before running srun
"""
defaults = _get_default_parameters()

if self.use_fault_tolerance:
defaults = _get_default_parameters_ft_launcher()
else:
defaults = _get_default_parameters()
in_valid_parameters = sorted(set(kwargs) - set(defaults))
if in_valid_parameters:
string = "\n - ".join(
@@ -458,9 +473,14 @@ def _make_submission_file_text(self, command_groups: List[List[str]]) -> str:
:return: submission script file's text
:rtype: str
"""
return _make_sbatch_string(
command_groups=command_groups, folder=self.folder, **self.parameters
)
if self.use_fault_tolerance:
return _make_sbatch_string_ft_launcher(
command_groups=command_groups, folder=self.folder, **self.parameters
)
else:
return _make_sbatch_string(
command_groups=command_groups, folder=self.folder, **self.parameters
)

@staticmethod
def _make_submission_command(submission_file_path: Path) -> List[str]:
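
Continuing the sketch above (same assumed launcher object), rendering the submission script routes through the fault-tolerance template; the command group is again an assumption, and it must contain "python3 -u", which the ft template asserts before splicing in ft_launcher:

command_groups = [["python3 -u examples/train.py --config-path=conf"]]
sbatch_text = launcher._make_submission_file_text(command_groups)
print(sbatch_text.splitlines()[0])  # -> #!/bin/bash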
@@ -834,6 +854,275 @@ def _make_sbatch_string(
return "\n".join(lines)


@functools.lru_cache()
def _get_default_parameters_ft_launcher() -> Dict[str, Any]:
"""Parameters that can be set through update_parameters"""
specs = inspect.getfullargspec(_make_sbatch_string_ft_launcher)
zipped = zip(specs.args[-len(specs.defaults) :], specs.defaults) # type: ignore
return {key: val for key, val in zipped if key not in {"command_groups", "folder"}}
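
A self-contained illustration of the introspection pattern used here, zipping the tail of argspec.args with argspec.defaults to recover a function's keyword defaults:

import inspect

def demo(positional, flag=True, retries=3):
    pass

spec = inspect.getfullargspec(demo)
print(dict(zip(spec.args[-len(spec.defaults):], spec.defaults)))
# -> {'flag': True, 'retries': 3}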


# pylint: disable=too-many-arguments,unused-argument, too-many-locals
def _make_sbatch_string_ft_launcher(
command_groups: List[List[str]],
folder: Union[str, Path],
job_name: str = "nemo_launcher",
partition: Optional[str] = None,
time: int = 5,
nodes: Union[int, List[int]] = 1,
ntasks_per_node: Optional[Union[int, List[int]]] = None,
cpus_per_task: Optional[int] = None,
cpus_per_gpu: Optional[int] = None,
num_gpus: Optional[int] = None, # legacy
gpus_per_node: Optional[int] = None,
gpus_per_task: Optional[int] = None,
qos: Optional[str] = None, # quality of service
setup: Optional[List[str]] = None,
mem: Optional[str] = None,
mem_per_gpu: Optional[str] = None,
mem_per_cpu: Optional[str] = None,
dependency: Optional[str] = None,
comment: Optional[str] = None,
constraint: Optional[str] = None,
exclude: Optional[str] = None,
account: Optional[str] = None,
gres: Optional[str] = None,
exclusive: Optional[Union[bool, str]] = None,
array: Optional[str] = None,
stderr_to_stdout: bool = False,
container_image: Optional[str] = None,
container_mounts: Optional[str] = None,
additional_parameters: Optional[Dict[str, Any]] = None,
srun_args: Optional[Iterable[str]] = None,
heterogeneous: bool = False,
max_subsequent_job_failures: int = 0,
max_rank_restarts: int = 0,
additional_ft_launcher_args: str = "",
) -> str:

"""Creates the content of an sbatch file with provided parameters

Parameters
----------
See slurm sbatch documentation for most parameters:
https://slurm.schedmd.com/sbatch.html

Below are the parameters that differ from slurm documentation:

command_groups:
each command group will be assigned one srun
folder: str/Path
folder where print logs and error logs will be written
setup: list
a list of commands to run in sbatch before running srun
additional_parameters: dict
Forces any parameter to a given value in sbatch. This can be useful
to add parameters which are not currently available in nemo_launcher.
Eg: {"mail-user": "[email protected]", "mail-type": "BEGIN"}
srun_args: List[str]
Add each argument in the list to the srun call

Raises
------
ValueError
In case an erroneous keyword argument is added, a list of all eligible parameters
is printed, with their default values
"""
nonslurm = [
"nonslurm",
"folder",
"command_groups",
"additional_parameters",
"setup",
"stderr_to_stdout",
"container_image",
"container_mounts",
"srun_args",
"heterogeneous",
"max_subsequent_job_failures",
"max_rank_restarts",
"additional_ft_launcher_args",
]
parameters = {
k: v for k, v in locals().items() if v is not None and k not in nonslurm
}
# rename and reformat parameters

if num_gpus is not None:
warnings.warn(
'"num_gpus" is deprecated, please use "gpus_per_node" instead (overwritting with num_gpus)'
)
parameters["gpus_per_node"] = parameters.pop("num_gpus", 0)
if "cpus_per_gpu" in parameters and "gpus_per_task" not in parameters:
warnings.warn(
'"cpus_per_gpu" requires to set "gpus_per_task" to work (and not "gpus_per_node")'
)
# add necessary parameters
job_name = parameters.get("job_name")
paths = job_utils.JobPaths(folder=folder, job_name=job_name)
stdout = str(paths.stdout)
stderr = str(paths.stderr)

if array is not None:
stdout = stdout.replace("%j", "%A_%a")
stderr = stderr.replace("%j", "%A_%a")
parameters["output"] = stdout.replace("%t", "0")

if not stderr_to_stdout:
parameters["error"] = stderr.replace("%t", "0")

if NEMO_LAUNCHER_CI: # Override output file for slurm
parameters["output"] = parameters["error"] = str(paths.folder / "slurm_%j.out")
stdout = stderr = parameters["output"]

if additional_parameters is not None:
parameters.update(additional_parameters)
# now create
lines = ["#!/bin/bash", "", "# Parameters"]
if heterogeneous:
raise ValueError("Fault tolerance is not supported with heterogeneous jobs.")
else:
# run 1 FT launcher per node, it will spawn the actual tasks
parameters["ntasks_per_node"] = 1
for k in sorted(parameters):
lines.append(_as_sbatch_flag(k, parameters[k]))
parameters["ntasks_per_node"] = ntasks_per_node

# environment setup:
if setup is not None:
lines += ["", "# setup"] + setup

if srun_args is None:
srun_args = []

# Safety measures:
# let SLURM immediately kill all tasks if any FT launcher returns with a failure;
# let SLURM kill the job 1h after any task has ended without a failure.
srun_args += ["--kill-on-bad-exit=1", "--wait=3600"]

lines += [
"",
"# Fault tolerance related items",
f'export FAULT_TOL_CFG_PATH="{str(paths.config_file)}"',
f'export FAULT_TOL_FINISHED_FLAG_FILE="{str(paths.folder / "_finished_flag")}"',
"RDZV_HOST=$(hostname)",
"ANY_JOB_STEP_FAILED=0",
]

if max_subsequent_job_failures > 0:
lines += [
"",
"# Automatic job resubmission related items",
f'JOB_RESULTS_FILE="{str(paths.folder / "_job_results")}"',
f"MAX_JOB_FAILURES={max_subsequent_job_failures}",
"is_job_failures_limit_reached() {",
' tail -n $MAX_JOB_FAILURES "$JOB_RESULTS_FILE" | \\',
' awk "/^[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=$MAX_JOB_FAILURES)}"',
"}",
"is_training_finished() {",
' test -f "$FAULT_TOL_FINISHED_FLAG_FILE"',
"}",
"# Exit immediately if finished flag file exists and this job is a continuation",
'if [ "$FT_RESUMED" = "1" ] ; then',
' if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi',
' if is_job_failures_limit_reached ; then echo "Job failures limit reached ($MAX_JOB_FAILURES)" ; exit 1 ; fi',
"else",
' rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE"',
"fi",
"# Pre-schedule continuation job",
'CONT_SBATCH_OUT=$(FT_RESUMED=1 sbatch --parsable --dependency=afterany:"$SLURM_JOB_ID" "$0")',
'if [ $? -ne 0 ] ; then echo "Could not schedule continuation job. Check stderr for details." ; exit 1 ; fi',
'CONT_SLURM_JOB_ID=$(echo $CONT_SBATCH_OUT | cut -f1 -d",")',
"# Write unknown job status to the job log, we will fix it at the end",
'echo "$SLURM_JOB_ID X" >> "$JOB_RESULTS_FILE"',
]
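    # Illustration (assumed job IDs, not part of this diff): with
    # MAX_JOB_FAILURES=3, a "_job_results" file after two failed runs and one
    # still-running job could contain:
    #   1234501 F
    #   1234502 F
    #   1234503 X
    # is_job_failures_limit_reached() tails the last MAX_JOB_FAILURES lines and
    # counts X/F entries, so three such lines in a row stop the resubmission chain.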

# commandline (this will run the function and args specified in the file provided as argument)
# We pass --output and --error here, because the SBATCH command doesn't work as expected with a filename pattern
stderr_flags = [] if stderr_to_stdout else ["--error", stderr]
container_flags = ["--container-image", container_image] if container_image else []
container_flags += (
["--container-mounts", container_mounts] if container_mounts else []
)

if NEMO_LAUNCHER_MEMORY_MEASURE:
srun_args += ["--overlap"]

mem_stdout = stdout.replace("_%j", "_mem_%j")
mem_stdout = mem_stdout.replace("_%A_%a", "_mem_%A_%a")
mem_srun_cmd = shlex.join(
[
"srun",
"--ntasks=1",
"--ntasks-per-node=1",
"--output",
mem_stdout,
*container_flags,
*srun_args,
]
)
lines += [
"",
"# run memory measure",
f"{mem_srun_cmd} \\",
f" nvidia-smi --query-gpu=timestamp,index,,memory.total,memory.free,memory.used --format=csv -l 1 & ",
"",
]

ft_launcher_cmd_part = (
"ft_launcher "
+ f"--fault-tol-cfg-path=$FAULT_TOL_CFG_PATH --ignore-missing-fault-tol-cfg {additional_ft_launcher_args} "
+ "--rdzv_id=$SLURM_JOB_ID --rdzv_backend=c10d --rdzv_endpoint=$RDZV_HOST "
+ f"--nnodes={nodes} --nproc_per_node={ntasks_per_node} --max-restarts={max_rank_restarts}"
)
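
# For concreteness (assuming nodes=2, ntasks_per_node=8, max_rank_restarts=1,
# and no additional_ft_launcher_args), the spliced-in launcher prefix reads:
#   ft_launcher --fault-tol-cfg-path=$FAULT_TOL_CFG_PATH --ignore-missing-fault-tol-cfg \
#     --rdzv_id=$SLURM_JOB_ID --rdzv_backend=c10d --rdzv_endpoint=$RDZV_HOST \
#     --nnodes=2 --nproc_per_node=8 --max-restarts=1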

for group_ind, command_group in enumerate(command_groups):
if heterogeneous:
raise ValueError("This PoC does not support heterogeneous jobs")
else:
srun_cmd = shlex.join(
[
"srun",
"--output",
stdout,
*stderr_flags,
*container_flags,
*srun_args,
]
)
command = ";\n ".join(command_group)
assert "python3 -u" in command
command = command.replace("python3 -u", ft_launcher_cmd_part)
lines += [
"",
f"# command {group_ind + 1}",
f'{srun_cmd} bash -c "',
f' {command} "',
"",
]
lines += ["if [ $? -ne 0 ]; then ANY_JOB_STEP_FAILED=1 ; fi"]

if max_subsequent_job_failures > 0:
lines += [
"",
'# Fix the job log entry ("JOB_ID X" -> "JOB_ID S/F"), depending on the job result',
'if [ "$ANY_JOB_STEP_FAILED" = "0" ] ; then',
' sed -i "s/$SLURM_JOB_ID X/$SLURM_JOB_ID S/" "$JOB_RESULTS_FILE"',
"else",
' sed -i "s/$SLURM_JOB_ID X/$SLURM_JOB_ID F/" "$JOB_RESULTS_FILE"',
"fi",
"# Check if the continuation job can be cancelled",
"if is_training_finished ; then",
' echo "Training is finished" ; scancel $CONT_SLURM_JOB_ID ; exit 0',
"fi",
"if is_job_failures_limit_reached ; then",
' echo "Job failures limit reached ($MAX_JOB_FAILURES)" ; scancel $CONT_SLURM_JOB_ID ; exit 1',
"fi",
]

return "\n".join(lines)
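
The resubmission bookkeeping above lives in generated bash; a Python restatement of the is_job_failures_limit_reached shell helper (illustrative only, not part of the PR) may make the tail/awk logic easier to follow:

def is_job_failures_limit_reached_py(job_results_path: str, max_job_failures: int) -> bool:
    # True when the last max_job_failures entries are all F (failed) or X (unknown).
    with open(job_results_path) as f:
        statuses = [line.split()[1] for line in f if line.strip()]
    tail = statuses[-max_job_failures:]
    return len(tail) == max_job_failures and all(s in ("X", "F") for s in tail)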

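And a direct-call sketch of the generator itself (argument values are assumptions; the command group must contain "python3 -u", which the function asserts before splicing in ft_launcher):

script = _make_sbatch_string_ft_launcher(
    command_groups=[["python3 -u train.py"]],  # assumed training command
    folder="/results/demo",                    # assumed job folder
    job_name="demo_ft",
    nodes=2,
    ntasks_per_node=8,
    max_subsequent_job_failures=2,  # enables the pre-scheduled continuation chain
    max_rank_restarts=1,            # in-job restarts delegated to ft_launcher
)
print(script.splitlines()[0])  # -> #!/bin/bash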

def _convert_mem(mem_gb: float) -> str:
"""Convert non-integer mem_gb to unit MB"""
if mem_gb == int(mem_gb):