diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py index 7fc6863a90..4c59fd4c25 100644 --- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py +++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py @@ -258,6 +258,8 @@ def _runStep(self): newJob = self.newJobsQueue.get() if newJob is None: logger.debug('Received queue sentinel.') + # Send out kill signals before stopping + self.killJobs() return False if self.killJobs(): activity = True @@ -423,7 +425,14 @@ def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment: gpus = self.count_needed_gpus(job_desc) job_id = self.getNextJobID() self.currentJobs.add(job_id) - + gpus = 0 + if isinstance(jobDesc.accelerators, list): + for accelerator in jobDesc.accelerators: + if accelerator['kind'] == 'gpu': + gpus = accelerator['count'] + else: + gpus = jobDesc.accelerators + self.newJobsQueue.put((job_id, job_desc.cores, job_desc.memory, command, get_job_kind(job_desc.get_names()), job_environment, gpus)) logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(job_id), @@ -506,6 +515,11 @@ def shutdown(self) -> None: """ Signals thread to shutdown (via sentinel) then cleanly joins the thread """ + + for jobID in self.getIssuedBatchJobIDs(): + # Send kill signals to any jobs that might be running + self.killQueue.put(jobID) + self.shutdownLocal() newJobsQueue = self.newJobsQueue self.newJobsQueue = None diff --git a/src/toil/test/wdl/testfiles/wait.wdl b/src/toil/test/wdl/testfiles/wait.wdl new file mode 100644 index 0000000000..08024fffc5 --- /dev/null +++ b/src/toil/test/wdl/testfiles/wait.wdl @@ -0,0 +1,34 @@ +version 1.0 + +workflow wait { + input { + } + + call waiter_task { + input: + } + + output { + String result = read_lines(waiter_task.out)[0] + } +} + +task waiter_task { + input { + } + + command <<< + sleep 10 & + sleep 2 & + wait + echo "waited" + >>> + + output { + File out = stdout() + } + + runtime { + docker: "ubuntu:22.04" + } +} diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py index 4873f5a7bf..6de389b11c 100644 --- a/src/toil/test/wdl/wdltoil_test.py +++ b/src/toil/test/wdl/wdltoil_test.py @@ -17,6 +17,7 @@ from toil.fileStores import FileID from toil.provisioners import cluster_factory from toil.test import (ToilTest, + needs_docker, needs_docker_cuda, needs_google_storage, needs_singularity_or_docker, @@ -165,6 +166,21 @@ def test_url_to_file(self): assert 'url_to_file.first_line' in result assert isinstance(result['url_to_file.first_line'], str) self.assertEqual(result['url_to_file.first_line'], 'chr1\t248387328') + + @needs_docker + def test_wait(self): + """ + Test if Bash "wait" works in WDL scripts. + """ + wdl = os.path.abspath('src/toil/test/wdl/testfiles/wait.wdl') + + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--wdlContainer=docker']) + result = json.loads(result_json) + + assert 'wait.result' in result + assert isinstance(result['wait.result'], str) + self.assertEqual(result['wait.result'], 'waited') def test_url_to_optional_file(self): """ diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 12094a42d6..e19f596cb0 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -505,36 +505,26 @@ def log_bindings(log_function: Callable[..., None], message: str, all_bindings: elif isinstance(bindings, Promise): log_function("") -def get_supertype(types: Sequence[Optional[WDL.Type.Base]]) -> WDL.Type.Base: +def get_supertype(types: Sequence[WDL.Type.Base]) -> WDL.Type.Base: """ Get the supertype that can hold values of all the given types. """ - - if None in types: - # Need to allow optional values - if len(types) == 1: - # Only None is here - return WDL.Type.Any(optional=True) - if len(types) == 2: - # None and something else - for item in types: - if item is not None: - # Return the type that's actually there, but make optional if not already. - return item.copy(optional=True) - raise RuntimeError("Expected non-None in types could not be found") - else: - # Multiple types, and some nulls, so we need an optional Any. - return WDL.Type.Any(optional=True) - else: - if len(types) == 1: - # Only one type. It isn't None. - the_type = types[0] - if the_type is None: - raise RuntimeError("The supertype cannot be None.") - return the_type + supertype = None + optional = False + for typ in types: + if isinstance(typ, WDL.Type.Any): + # ignore an Any type, as we represent a bottom type as Any. See https://miniwdl.readthedocs.io/en/latest/WDL.html#WDL.Type.Any + # and https://github.com/openwdl/wdl/blob/e43e042104b728df1f1ad6e6145945d2b32331a6/SPEC.md?plain=1#L1484 + optional = optional or typ.optional + elif supertype is None: + supertype = typ + optional = optional or typ.optional else: - # Multiple types (or none). Assume Any - return WDL.Type.Any() + # We have conflicting types + raise RuntimeError(f"Cannot generate a supertype from conflicting types: {types}") + if supertype is None: + return WDL.Type.Any(null=optional) # optional flag isn't used in Any + return supertype.copy(optional=optional) def for_each_node(root: WDL.Tree.WorkflowNode) -> Iterator[WDL.Tree.WorkflowNode]: @@ -2089,7 +2079,8 @@ def add_injections(self, command_string: str, task_container: TaskContainer) -> } """) parts.append(script) - parts.append(f"_toil_resource_monitor {self.INJECTED_MESSAGE_DIR} &") + # Launch in a subshell so that it doesn't interfere with Bash "wait" in the main shell + parts.append(f"(_toil_resource_monitor {self.INJECTED_MESSAGE_DIR} &)") if isinstance(task_container, SwarmContainer) and platform.system() == "Darwin": # With gRPC FUSE file sharing, files immediately downloaded before @@ -3226,7 +3217,7 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: # Problem: the WDL type types are not hashable, so we need to do bad N^2 deduplication observed_types = [] for env in new_bindings: - binding_type = env.resolve(name).type if env.has_binding(name) else None + binding_type = env.resolve(name).type if env.has_binding(name) else WDL.Type.Any() if binding_type not in observed_types: observed_types.append(binding_type) # Get the supertype of those types