diff --git a/docs/appendices/environment_vars.rst b/docs/appendices/environment_vars.rst
index e214d3125b..b6b2889789 100644
--- a/docs/appendices/environment_vars.rst
+++ b/docs/appendices/environment_vars.rst
@@ -131,6 +131,9 @@ There are several environment variables that affect the way Toil runs.
 | TOIL_GOOGLE_PROJECTID            | The Google project ID to use when generating       |
 |                                  | Google job store names for tests or CWL workflows. |
 +----------------------------------+----------------------------------------------------+
+| TOIL_SLURM_ALLOCATE_MEM          | Whether to allocate memory in Slurm with --mem.    |
+|                                  | True by default.                                   |
++----------------------------------+----------------------------------------------------+
 | TOIL_SLURM_ARGS                  | Arguments for sbatch for the slurm batch system.   |
 |                                  | Do not pass CPU or memory specifications here.     |
 |                                  | Instead, define resource requirements for the job. |
@@ -140,9 +143,15 @@ There are several environment variables that affect the way Toil runs.
 |                                  | provided.                                          |
 +----------------------------------+----------------------------------------------------+
 | TOIL_SLURM_PE                    | Name of the slurm partition to use for parallel    |
-|                                  | jobs.                                              |
+|                                  | jobs. Useful for Slurm clusters that do not offer  |
+|                                  | a partition accepting both single-core and         |
+|                                  | multi-core jobs.                                   |
 |                                  | There is no default value for this variable.       |
 +----------------------------------+----------------------------------------------------+
+| TOIL_SLURM_TIME                  | Slurm job time limit, in [DD-]HH:MM:SS format. For |
+|                                  | example, ``2-07:15:30`` for 2 days, 7 hours, 15    |
+|                                  | minutes and 30 seconds, or ``4:00:00`` for 4 hours.|
++----------------------------------+----------------------------------------------------+
 | TOIL_GRIDENGINE_ARGS             | Arguments for qsub for the gridengine batch        |
 |                                  | system. Do not pass CPU or memory specifications   |
 |                                  | here. Instead, define resource requirements for    |
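
The ``[DD-]HH:MM:SS`` strings accepted by ``TOIL_SLURM_TIME`` (and by ``--slurmTime`` below) decompose as an optional day count followed by hours, minutes, and seconds. A minimal sketch of the arithmetic, as an illustration only and not necessarily how Toil's ``parse_slurm_time`` implements it:

    def slurm_time_to_seconds(value: str) -> int:
        # rpartition leaves days empty when there is no "DD-" prefix.
        days, _, clock = value.rpartition("-")
        hours, minutes, seconds = (int(part) for part in clock.split(":"))
        return int(days or 0) * 86400 + hours * 3600 + minutes * 60 + seconds

    assert slurm_time_to_seconds("2-07:15:30") == 2 * 86400 + 7 * 3600 + 15 * 60 + 30
    assert slurm_time_to_seconds("4:00:00") == 4 * 3600
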
diff --git a/docs/cwl/running.rst b/docs/cwl/running.rst
index 9ff839194f..ddd2eb4c8b 100644
--- a/docs/cwl/running.rst
+++ b/docs/cwl/running.rst
@@ -91,9 +91,17 @@ printed to the stdout stream after workflow execution.
 ``--stats``: Save resources usages in json files that can be collected with the
 ``toil stats`` command after the workflow is done.
 
-``--disable-streaming``: Does not allow streaming of input files. This is enabled
-by default for files marked with ``streamable`` flag True and only for remote files
-when the jobStore is not on local machine.
+Extra Toil CWL Options
++++++++++++++++++++++++
+
+Besides the normal Toil options and the options supported by cwltool,
+toil-cwl-runner adds some of its own options:
+
+--bypass-file-store    Do not use Toil's file store system and assume all
+                       paths are accessible in place from all nodes. This can
+                       avoid possibly-redundant file copies into Toil's job
+                       store storage, and is required for CWL's
+                       ``InplaceUpdateRequirement``. However, it allows a
+                       failed job execution to leave behind a
+                       partially-modified state, which means that a restarted
+                       workflow might not work correctly.
+--reference-inputs     Do not copy remote inputs into Toil's file store and
+                       assume they are accessible in place from all nodes.
+--disable-streaming    Do not allow streaming of job input files. By default,
+                       files marked with ``streamable`` True are streamed
+                       from remote job stores.
+--cwl-default-ram      Apply the CWL specification default ``ramMin``.
+--no-cwl-default-ram   Do not apply the CWL specification default ``ramMin``,
+                       so that Toil ``--defaultMemory`` applies.
+
 
 Running CWL in the Cloud
 ------------------------
diff --git a/docs/running/cliOptions.rst b/docs/running/cliOptions.rst
index 5a55f61493..e974f7e5cd 100644
--- a/docs/running/cliOptions.rst
+++ b/docs/running/cliOptions.rst
@@ -237,6 +237,23 @@ levels in toil are based on priority from the logging module:
                         Use a Mesos role.
   --mesosName MESOSNAME
                         The Mesos name to use. (default: toil)
+  --scale SCALE         A scaling factor to change the value of all submitted
+                        tasks' submitted cores. Used in single_machine batch
+                        system. Useful for running workflows on smaller
+                        machines than they were designed for, by setting a
+                        value less than 1. (default: 1)
+  --slurmAllocateMem SLURM_ALLOCATE_MEM
+                        If False, do not use --mem. Used as a workaround for
+                        Slurm clusters that reject jobs with memory
+                        allocations.
+  --slurmTime SLURM_TIME
+                        Slurm job time limit, in [DD-]HH:MM:SS format.
+  --slurmPE SLURM_PE    Special partition to send Slurm jobs to if they ask
+                        for more than 1 CPU. Useful for Slurm clusters that do
+                        not offer a partition accepting both single-core and
+                        multi-core jobs.
+  --slurmArgs SLURM_ARGS
+                        Extra arguments to pass to Slurm.
   --kubernetesHostPath KUBERNETES_HOST_PATH
                         Path on Kubernetes hosts to use as shared inter-pod
                         temp directory.
@@ -261,11 +278,6 @@ levels in toil are based on priority from the logging module:
                         The ARN of an IAM role to run AWS Batch jobs as, so
                         they can e.g. access a job store. Must be assumable by
                         ecs-tasks.amazonaws.com
-  --scale SCALE         A scaling factor to change the value of all submitted
-                        tasks' submitted cores. Used in single_machine batch
-                        system. Useful for running workflows on smaller
-                        machines than they were designed for, by setting a
-                        value less than 1. (default: 1)
 
 **Data Storage Options**
     Allows configuring Toil's data storage.
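
As an illustration of ``--scale`` (the numbers are hypothetical, and exact rounding is up to the batch system implementation): with a scale of 0.5, a task that asks for 4 cores is submitted requesting roughly half of that.

    scale = 0.5
    requested_cores = 4
    # A value less than 1 shrinks every task's core request.
    effective_cores = max(1, int(requested_cores * scale))
    assert effective_cores == 2
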
+ """ + gpus = 0 + if isinstance(job_desc.accelerators, list): + for accelerator in job_desc.accelerators: + if accelerator['kind'] == 'gpu': + gpus += accelerator['count'] + else: + gpus = job_desc.accelerators + + return gpus + + def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None): # Avoid submitting internal jobs to the batch queue, handle locally - localID = self.handleLocalJob(command, jobDesc) - if localID is not None: - return localID + local_id = self.handleLocalJob(command, job_desc) + if local_id is not None: + return local_id else: - self.check_resource_request(jobDesc) - jobID = self.getNextJobID() - self.currentJobs.add(jobID) - gpus = 0 - if isinstance(jobDesc.accelerators, list): - for accelerator in jobDesc.accelerators: - if accelerator['kind'] == 'gpu': - gpus = accelerator['count'] - else: - gpus = jobDesc.accelerators + self.check_resource_request(job_desc) + gpus = self.count_needed_gpus(job_desc) + job_id = self.getNextJobID() + self.currentJobs.add(job_id) - self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory, command, get_job_kind(jobDesc.get_names()), + self.newJobsQueue.put((job_id, job_desc.cores, job_desc.memory, command, get_job_kind(job_desc.get_names()), job_environment, gpus)) - logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(jobID), - get_job_kind(jobDesc.get_names())) - return jobID + logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(job_id), + get_job_kind(job_desc.get_names())) + return job_id def killBatchJobs(self, jobIDs): """ @@ -511,6 +520,13 @@ def shutdown(self) -> None: newJobsQueue.put(None) self.background_thread.join() + # Now in one thread, kill all the jobs + if len(self.background_thread.runningJobs) > 0: + logger.warning("Cleaning up %s jobs still running at shutdown", len(self.background_thread.runningJobs)) + for job in self.background_thread.runningJobs: + self.killQueue.put(job) + self.background_thread.killJobs() + def setEnv(self, name, value=None): if value and ',' in value: raise ValueError(type(self).__name__ + " does not support commata in environment variable values") diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 5ecc597fdb..b516a90921 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -17,16 +17,18 @@ import math import os import sys -from argparse import ArgumentParser, _ArgumentGroup +from argparse import ArgumentParser, _ArgumentGroup, SUPPRESS from shlex import quote from typing import Dict, List, Optional, Set, Tuple, TypeVar, Union, NamedTuple +from toil.bus import get_job_kind from toil.common import Config from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE, InsufficientSystemResources from toil.batchSystems.abstractGridEngineBatchSystem import \ AbstractGridEngineBatchSystem from toil.batchSystems.options import OptionSetter -from toil.job import Requirer +from toil.job import JobDescription, Requirer +from toil.lib.conversions import strtobool from toil.lib.misc import CalledProcessErrorStderr, call_command from toil.statsAndLogging import TRACE @@ -566,7 +568,7 @@ def prepareSbatch(self, if cpu and cpu > 1 and parallel_env: sbatch_line.append(f'--partition={parallel_env}') - if mem is not None and self.boss.config.allocate_mem: # type: ignore[attr-defined] + if mem is not None and self.boss.config.slurm_allocate_mem: # type: ignore[attr-defined] # memory 
diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py
index 5ecc597fdb..b516a90921 100644
--- a/src/toil/batchSystems/slurm.py
+++ b/src/toil/batchSystems/slurm.py
@@ -17,16 +17,18 @@
 import math
 import os
 import sys
-from argparse import ArgumentParser, _ArgumentGroup
+from argparse import ArgumentParser, _ArgumentGroup, SUPPRESS
 from shlex import quote
 from typing import Dict, List, Optional, Set, Tuple, TypeVar, Union, NamedTuple
 
+from toil.bus import get_job_kind
 from toil.common import Config
 from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE, InsufficientSystemResources
 from toil.batchSystems.abstractGridEngineBatchSystem import \
     AbstractGridEngineBatchSystem
 from toil.batchSystems.options import OptionSetter
-from toil.job import Requirer
+from toil.job import JobDescription, Requirer
+from toil.lib.conversions import strtobool
 from toil.lib.misc import CalledProcessErrorStderr, call_command
 from toil.statsAndLogging import TRACE
 
@@ -566,7 +568,7 @@ def prepareSbatch(self,
         if cpu and cpu > 1 and parallel_env:
             sbatch_line.append(f'--partition={parallel_env}')
 
-        if mem is not None and self.boss.config.allocate_mem:  # type: ignore[attr-defined]
+        if mem is not None and self.boss.config.slurm_allocate_mem:  # type: ignore[attr-defined]
             # memory passed in is in bytes, but slurm expects megabytes
             sbatch_line.append(f'--mem={math.ceil(mem / 2 ** 20)}')
         if cpu is not None:
@@ -625,6 +627,33 @@ def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int
         super().__init__(config, maxCores, maxMemory, maxDisk)
         self.partitions = SlurmBatchSystem.PartitionSet()
 
+    # Override issuing jobs so we can check if we need to use Slurm's magic
+    # whole-node-memory feature.
+    def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment: Optional[Dict[str, str]] = None) -> int:
+        # Avoid submitting internal jobs to the batch queue, handle locally
+        local_id = self.handleLocalJob(command, job_desc)
+        if local_id is not None:
+            return local_id
+        else:
+            self.check_resource_request(job_desc)
+            gpus = self.count_needed_gpus(job_desc)
+            job_id = self.getNextJobID()
+            self.currentJobs.add(job_id)
+
+            if "memory" not in job_desc.requirements and self.config.slurm_default_all_mem:  # type: ignore[attr-defined]
+                # The job doesn't have its own memory requirement, and we are
+                # defaulting to whole node memory. Use Slurm's 0-memory sentinel.
+                memory = 0
+            else:
+                # Use the memory actually on the job, or the Toil default memory
+                memory = job_desc.memory
+
+            self.newJobsQueue.put((job_id, job_desc.cores, memory, command, get_job_kind(job_desc.get_names()),
+                                   job_environment, gpus))
+            logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(job_id),
+                         get_job_kind(job_desc.get_names()))
+            return job_id
+
     def _check_accelerator_request(self, requirer: Requirer) -> None:
         for accelerator in requirer.accelerators:
             if accelerator['kind'] != 'gpu':
@@ -647,26 +676,30 @@ def _check_accelerator_request(self, requirer: Requirer) -> None:
 
     @classmethod
     def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None:
-        allocate_mem = parser.add_mutually_exclusive_group()
-        allocate_mem_help = ("A flag that can block allocating memory with '--mem' for job submissions "
-                             "on SLURM since some system servers may reject any job request that "
-                             "explicitly specifies the memory allocation. The default is to always allocate memory.")
-        allocate_mem.add_argument("--dont_allocate_mem", action='store_false', dest="allocate_mem", help=allocate_mem_help)
-        allocate_mem.add_argument("--allocate_mem", action='store_true', dest="allocate_mem", help=allocate_mem_help)
-        allocate_mem.set_defaults(allocate_mem=True)
+        parser.add_argument("--slurmAllocateMem", dest="slurm_allocate_mem", type=strtobool, default=True, env_var="TOIL_SLURM_ALLOCATE_MEM",
+                            help="If False, do not use --mem. Used as a workaround for Slurm clusters that reject jobs "
+                                 "with memory allocations.")
+        # Keep these deprecated options for backward compatibility
+        parser.add_argument("--dont_allocate_mem", action='store_false', dest="slurm_allocate_mem", help=SUPPRESS)
+        parser.add_argument("--allocate_mem", action='store_true', dest="slurm_allocate_mem", help=SUPPRESS)
+
+        parser.add_argument("--slurmDefaultAllMem", dest="slurm_default_all_mem", type=strtobool, default=False, env_var="TOIL_SLURM_DEFAULT_ALL_MEM",
+                            help="If True, assign Toil jobs without their own memory requirements all available "
+                                 "memory on a Slurm node (via Slurm --mem=0).")
         parser.add_argument("--slurmTime", dest="slurm_time", type=parse_slurm_time, default=None, env_var="TOIL_SLURM_TIME",
-                            help="Slurm job time limit, in [DD-]HH:MM:SS format")
+                            help="Slurm job time limit, in [DD-]HH:MM:SS format.")
         parser.add_argument("--slurmPE", dest="slurm_pe", default=None, env_var="TOIL_SLURM_PE",
-                            help="Special partition to send Slurm jobs to if they ask for more than 1 CPU")
+                            help="Special partition to send Slurm jobs to if they ask for more than 1 CPU.")
         parser.add_argument("--slurmArgs", dest="slurm_args", default="", env_var="TOIL_SLURM_ARGS",
-                            help="Extra arguments to pass to Slurm")
+                            help="Extra arguments to pass to Slurm.")
 
     OptionType = TypeVar('OptionType')
     @classmethod
     def setOptions(cls, setOption: OptionSetter) -> None:
-        setOption("allocate_mem")
+        setOption("slurm_allocate_mem")
+        setOption("slurm_default_all_mem")
         setOption("slurm_time")
         setOption("slurm_pe")
         setOption("slurm_args")
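
Condensing the memory handling above into one sketch: Toil tracks memory in bytes, ``sbatch --mem`` takes mebibytes, and ``--mem=0`` is Slurm's sentinel for "all memory on the node", which is what ``--slurmDefaultAllMem`` relies on.

    import math

    def mem_arg(mem_bytes: int) -> str:
        # Mirrors the conversion prepareSbatch applies to the queued value.
        return f'--mem={math.ceil(mem_bytes / 2 ** 20)}'

    assert mem_arg(0) == '--mem=0'                     # whole-node sentinel
    assert mem_arg(2 * 1024 ** 3) == '--mem=2048'      # 2 GiB -> 2048 MiB
    assert mem_arg(2 * 1024 ** 3 + 1) == '--mem=2049'  # partial MiB rounds up
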
diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py
index 473fc374e7..0e61eb476b 100644
--- a/src/toil/cwl/cwltoil.py
+++ b/src/toil/cwl/cwltoil.py
@@ -801,7 +801,7 @@ def visit(
             # wherever else we would stage it.
             # TODO: why would we do that?
             stagedir = cast(Optional[str], obj.get("dirname")) or stagedir
-        
+
         if obj["class"] not in ("File", "Directory"):
             # We only handle files and directories; only they have locations.
             return
@@ -990,7 +990,7 @@ def visit(
         # reference, we just pass that along.
 
         """Link or copy files to their targets. Create them as needed."""
-        
+
         logger.debug(
             "ToilPathMapper adding file mapping %s -> %s", deref, tgt
         )
@@ -2473,6 +2473,20 @@ def __init__(
 
         req = tool.evalResources(self.builder, runtime_context)
 
+        tool_own_resources = tool.get_requirement("ResourceRequirement")[0] or {}
+        if "ramMin" in tool_own_resources or "ramMax" in tool_own_resources:
+            # The tool is actually asking for memory.
+            memory = int(req["ram"] * (2**20))
+        else:
+            # The tool is getting a default ram allocation.
+            if getattr(runtime_context, "cwl_default_ram"):
+                # We will respect the CWL spec and apply the default cwltool
+                # computed, which might be different from Toil's default.
+                memory = int(req["ram"] * (2**20))
+            else:
+                # We use a None requirement and the Toil default applies.
+                memory = None
+
         accelerators: Optional[List[AcceleratorRequirement]] = None
         if req.get("cudaDeviceCount", 0) > 0:
             # There's a CUDARequirement, which cwltool processed for us
@@ -2537,7 +2551,7 @@ def __init__(
 
         super().__init__(
             cores=req["cores"],
-            memory=int(req["ram"] * (2**20)),
+            memory=memory,
             disk=int(total_disk),
             accelerators=accelerators,
             preemptible=preemptible,
@@ -3837,6 +3851,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
         )
         runtime_context.workdir = workdir  # type: ignore[attr-defined]
         runtime_context.outdir = outdir
+        setattr(runtime_context, "cwl_default_ram", options.cwl_default_ram)
         runtime_context.move_outputs = "leave"
         runtime_context.rm_tmpdir = False
         runtime_context.streaming_allowed = not options.disable_streaming
@@ -3875,11 +3890,14 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
             if options.restart:
                 outobj = toil.restart()
             else:
+                # TODO: why are we doing this? Does this get applied to all
+                # tools as a default or something?
                 loading_context.hints = [
                     {
                         "class": "ResourceRequirement",
                         "coresMin": toil.config.defaultCores,
-                        "ramMin": toil.config.defaultMemory / (2**20),
+                        # Don't include any RAM requirement because we want to
+                        # know when tools don't manually ask for RAM.
                         "outdirMin": toil.config.defaultDisk / (2**20),
                         "tmpdirMin": 0,
                     }
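
The new default-RAM selection, restated as a standalone sketch (``effective_memory`` is a hypothetical helper, not part of the patch): an explicit ``ramMin`` or ``ramMax`` on the tool always wins; otherwise ``--cwl-default-ram`` keeps the cwltool-computed spec default, and ``--no-cwl-default-ram`` leaves the requirement unset so Toil's ``--defaultMemory`` applies downstream.

    def effective_memory(tool_own_resources: dict, ram_mib: float, cwl_default_ram: bool):
        if "ramMin" in tool_own_resources or "ramMax" in tool_own_resources:
            return int(ram_mib * (2**20))  # the tool actually asked for memory
        if cwl_default_ram:
            return int(ram_mib * (2**20))  # CWL spec default, as computed by cwltool
        return None                        # fall through to Toil's --defaultMemory

    assert effective_memory({"ramMin": 512}, 512, False) == 512 * 2**20
    assert effective_memory({}, 256, True) == 256 * 2**20
    assert effective_memory({}, 256, False) is None
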
" "When sharing a cache between containers on a host, this directory must be " "shared between the containers.") core_options.add_argument("--noStdOutErr", dest="noStdOutErr", default=False, action="store_true", @@ -325,7 +325,7 @@ def is_within(x: Union[int, float]) -> bool: # Restarting the workflow options restart_options = parser.add_argument_group( - title="Toil options for restarting an existing workflow.", + title="Toil options for restarting an existing workflow", description="Allows the restart of an existing workflow" ) restart_options.add_argument("--restart", dest="restart", default=False, action="store_true", @@ -335,14 +335,14 @@ def is_within(x: Union[int, float]) -> bool: # Batch system options batchsystem_options = parser.add_argument_group( - title="Toil options for specifying the batch system.", + title="Toil options for specifying the batch system", description="Allows the specification of the batch system." ) add_all_batchsystem_options(batchsystem_options) # File store options file_store_options = parser.add_argument_group( - title="Toil options for configuring storage.", + title="Toil options for configuring storage", description="Allows configuring Toil's data storage." ) link_imports = file_store_options.add_mutually_exclusive_group() @@ -375,7 +375,7 @@ def is_within(x: Union[int, float]) -> bool: # Auto scaling options autoscaling_options = parser.add_argument_group( - title="Toil options for autoscaling the cluster of worker nodes.", + title="Toil options for autoscaling the cluster of worker nodes", description="Allows the specification of the minimum and maximum number of nodes in an autoscaled cluster, " "as well as parameters to control the level of provisioning." ) @@ -485,8 +485,9 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any # Parameters to limit service jobs / detect service deadlocks service_options = parser.add_argument_group( title="Toil options for limiting the number of service jobs and detecting service deadlocks", - description="Allows the specification of the maximum number of service jobs in a cluster. By keeping " - "this limited we can avoid nodes occupied with services causing deadlocks." + description=SUPPRESS if cwl else "Allows the specification of the maximum number of service jobs in a cluster. " + "By keeping this limited we can avoid nodes occupied with services causing " + "deadlocks." ) service_options.add_argument("--maxServiceJobs", dest="maxServiceJobs", default=SYS_MAX_SIZE, type=int, metavar="INT", @@ -515,7 +516,7 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any # Resource requirements resource_options = parser.add_argument_group( - title="Toil options for cores/memory requirements.", + title="Toil options for cores/memory requirements", description="The options to specify default cores/memory requirements (if not specified by the jobs " "themselves), and to limit the total amount of memory/cores requested from the batch system." ) @@ -565,7 +566,7 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any # Retrying/rescuing jobs job_options = parser.add_argument_group( - title="Toil options for rescuing/killing/restarting jobs.", + title="Toil options for rescuing/killing/restarting jobs", description="The options for jobs that either run too long/fail or get lost (some batch systems have issues!)." 
@@ -599,7 +600,7 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any
 
     # Log management options
     log_options = parser.add_argument_group(
-        title="Toil log management options.",
+        title="Toil log management options",
         description="Options for how Toil should manage its logs."
     )
     log_options.add_argument("--maxLogFileSize", dest="maxLogFileSize", default=100 * 1024 * 1024, type=h2b,
@@ -633,7 +634,7 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any
 
     # Misc options
     misc_options = parser.add_argument_group(
-        title="Toil miscellaneous options.",
+        title="Toil miscellaneous options",
         description="Everything else."
     )
     misc_options.add_argument('--disableChaining', dest='disableChaining', type=strtobool, default=False,
@@ -704,7 +705,7 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any
 
     # Debug options
     debug_options = parser.add_argument_group(
-        title="Toil debug options.",
+        title="Toil debug options",
        description="Debug options for finding problems or helping with testing."
     )
     debug_options.add_argument("--debugWorker", dest="debugWorker", default=False, action="store_true",
diff --git a/src/toil/options/cwl.py b/src/toil/options/cwl.py
index 0db2c80889..1e5ee480b0 100644
--- a/src/toil/options/cwl.py
+++ b/src/toil/options/cwl.py
@@ -15,6 +15,9 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
     :return: None
     """
     suppress_help = SUPPRESS if suppress else None
+
+    # These are options that we provide to match cwltool
+    # TODO: Are there still any Toil-specific options in here?
     parser.add_argument("--not-strict", action="store_true", help=suppress_help)
     parser.add_argument(
         "--enable-dev",
@@ -75,8 +78,7 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
         help=suppress_help or "Do not delete Docker container used by jobs after they exit",
         dest="rm_container",
     )
-    extra_dockergroup = parser.add_argument_group()
-    extra_dockergroup.add_argument(
+    parser.add_argument(
         "--custom-net",
         help=suppress_help or "Specify docker network name to pass to docker run command",
     )
@@ -134,11 +136,6 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
         default=False,
         dest="preserve_entire_environment",
     )
-    parser.add_argument(
-        "--destBucket",
-        type=str,
-        help=suppress_help or "Specify a cloud bucket endpoint for output files.",
-    )
     parser.add_argument("--beta-dependency-resolvers-configuration", default=None, help=suppress_help)
     parser.add_argument("--beta-dependencies-directory", default=None, help=suppress_help)
     parser.add_argument("--beta-use-biocontainers", default=None, action="store_true", help=suppress_help)
@@ -257,30 +254,6 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
         "section 'Running MPI-based tools' for details of the format: "
         "https://github.com/common-workflow-language/cwltool#running-mpi-based-tools-that-need-to-be-launched",
     )
-    parser.add_argument(
-        "--bypass-file-store",
-        action="store_true",
-        default=False,
-        help=suppress_help or "Do not use Toil's file store and assume all "
-        "paths are accessible in place from all nodes.",
-        dest="bypass_file_store",
-    )
-    parser.add_argument(
-        "--reference-inputs",
-        action="store_true",
-        default=False,
-        help=suppress_help or "Do not copy remote inputs into Toil's file "
-        "store and assume they are accessible in place from "
-        "all nodes.",
-        dest="reference_inputs",
-    )
-    parser.add_argument(
-        "--disable-streaming",
-        action="store_true",
-        default=False,
-        help=suppress_help or "Disable file streaming for files that have 'streamable' flag True",
-        dest="disable_streaming",
-    )
 
     provgroup = parser.add_argument_group(
         "Options for recording provenance information of the execution"
@@ -341,3 +314,48 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
         default=os.environ.get("CWL_FULL_NAME", ""),
         type=str,
     )
+
+    # These are Toil-specific options
+    parser.add_argument(
+        "--bypass-file-store",
+        action="store_true",
+        default=False,
+        help=suppress_help or "Do not use Toil's file store and assume all "
+        "paths are accessible in place from all nodes.",
+        dest="bypass_file_store",
+    )
+    parser.add_argument(
+        "--reference-inputs",
+        action="store_true",
+        default=False,
+        help=suppress_help or "Do not copy remote inputs into Toil's file "
+        "store and assume they are accessible in place from "
+        "all nodes.",
+        dest="reference_inputs",
+    )
+    parser.add_argument(
+        "--disable-streaming",
+        action="store_true",
+        default=False,
+        help=suppress_help or "Disable file streaming for files that have 'streamable' flag True.",
+        dest="disable_streaming",
+    )
+    ram_group = parser.add_mutually_exclusive_group() if not suppress_help else parser.add_argument_group()
+    ram_group.add_argument(
+        "--cwl-default-ram",
+        action="store_true",
+        default=True,
+        help=suppress_help or "Apply CWL specification default ramMin.",
+        dest="cwl_default_ram",
+    )
+    ram_group.add_argument(
+        "--no-cwl-default-ram",
+        action="store_false",
+        help=suppress_help or "Do not apply CWL specification default ramMin, so that Toil --defaultMemory applies.",
+        dest="cwl_default_ram",
+    )
+    parser.add_argument(
+        "--destBucket",
+        type=str,
+        help=suppress_help or "Specify a cloud bucket endpoint for output files.",
+    )
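
The ``--cwl-default-ram``/``--no-cwl-default-ram`` pair follows the standard argparse pattern of two flags writing to one destination; a self-contained demonstration with a hypothetical parser (not Toil's):

    from argparse import ArgumentParser

    parser = ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--cwl-default-ram", action="store_true", default=True, dest="cwl_default_ram")
    group.add_argument("--no-cwl-default-ram", action="store_false", dest="cwl_default_ram")

    assert parser.parse_args([]).cwl_default_ram is True
    assert parser.parse_args(["--no-cwl-default-ram"]).cwl_default_ram is False
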
diff --git a/src/toil/test/cwl/cwlTest.py b/src/toil/test/cwl/cwlTest.py
index 1186c42d8e..6ae98ccf6a 100644
--- a/src/toil/test/cwl/cwlTest.py
+++ b/src/toil/test/cwl/cwlTest.py
@@ -238,11 +238,14 @@ def setUp(self) -> None:
         self.outDir = f"/tmp/toil-cwl-test-{str(uuid.uuid4())}"
         os.makedirs(self.outDir)
         self.rootDir = self._projectRootPath()
+        self.jobStoreDir = f"./jobstore-{str(uuid.uuid4())}"
 
     def tearDown(self) -> None:
         """Clean up outputs."""
         if os.path.exists(self.outDir):
             shutil.rmtree(self.outDir)
+        if os.path.exists(self.jobStoreDir):
+            shutil.rmtree(self.jobStoreDir)
         unittest.TestCase.tearDown(self)
 
     def test_cwl_cmdline_input(self) -> None:
@@ -451,6 +454,55 @@ def test_glob_dir_bypass_file_store(self) -> None:
         except FileNotFoundError:
             pass
 
+    @needs_slurm
+    def test_slurm_node_memory(self) -> None:
+        from toil.cwl import cwltoil
+
+        # Run the workflow. This will either finish quickly and tell us the
+        # memory we got, or take a long time because it requested a whole
+        # node's worth of memory and no nodes are free right now. We need to
+        # support both.
+
+        # And if we run out of time we need to stop the workflow gracefully and
+        # cancel the Slurm jobs.
+
+        main_args = [
+            f"--jobStore={self.jobStoreDir}",
+            # Avoid racing to toil kill before the jobstore is removed
+            "--clean=never",
+            "--batchSystem=slurm",
+            "--no-cwl-default-ram",
+            "--slurmDefaultAllMem=True",
+            "--outdir",
+            self.outDir,
+            os.path.join(self.rootDir, "src/toil/test/cwl/measure_default_memory.cwl"),
+            os.path.join(self.rootDir, "src/toil/test/cwl/empty.json"),
+        ]
+        try:
+            log.debug("Start test workflow")
+            child = subprocess.Popen(["toil-cwl-runner"] + main_args, stdout=subprocess.PIPE)
+            output, _ = child.communicate(timeout=60)
+        except subprocess.TimeoutExpired:
+            # The job didn't finish quickly; presumably waiting for a full node.
+            # Stop the workflow
+            log.debug("Workflow might be waiting for a full node. Stop it.")
+            subprocess.check_call(["toil", "kill", self.jobStoreDir])
+            # Wait another little bit for it to clean up, making sure to
+            # collect output in case it is blocked on writing
+            child.communicate(timeout=20)
+            # Kill it off in case it is still running
+            child.kill()
+            # Reap it
+            child.wait()
+            # The test passes
+        else:
+            out = json.loads(output)
+            log.debug("Workflow output: %s", out)
+            memory_string = open(out["memory"]["location"][len("file://"):]).read()
+            log.debug("Observed memory: %s", memory_string)
+            result = int(memory_string)
+            # We should see more than the CWL default or the Toil default,
+            # assuming Slurm nodes of reasonable size (more than 3 GiB).
+            self.assertGreater(result, 3 * 1024 * 1024)
+
     @needs_aws_s3
     def test_download_s3(self) -> None:
         self.download("download_s3.json", self._tester)
diff --git a/src/toil/test/cwl/measure_default_memory.cwl b/src/toil/test/cwl/measure_default_memory.cwl
new file mode 100644
index 0000000000..bb73f2cfdd
--- /dev/null
+++ b/src/toil/test/cwl/measure_default_memory.cwl
@@ -0,0 +1,23 @@
+cwlVersion: v1.2
+class: Workflow
+
+inputs: []
+
+steps:
+  measure:
+    run:
+      class: CommandLineTool
+      inputs: []
+      baseCommand: ["bash", "-c", "ulimit -m"]
+      outputs:
+        memory: stdout
+    in: []
+    out:
+      # There's no good way to go back from a command output to a CWL value
+      # without bringing in a bunch of JS.
+      - id: memory
+
+outputs:
+  - id: memory
+    type: File
+    outputSource: measure/memory
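
For interpreting the test's threshold: bash's ``ulimit -m`` reports the resident-set-size limit in 1024-byte blocks, so the asserted floor of ``3 * 1024 * 1024`` corresponds to 3 GiB of node memory. A quick check of the arithmetic (the 64 GiB node is a hypothetical example):

    observed_kib = 64 * 1024 * 1024   # what a 64 GiB node might report
    threshold_kib = 3 * 1024 * 1024   # the test's lower bound, i.e. 3 GiB
    assert observed_kib > threshold_kib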