Skip to content

Commit

Permalink
Merge branch 'master' into issues/4971-slurm-node-memory
Browse files Browse the repository at this point in the history
  • Loading branch information
adamnovak authored Sep 24, 2024
2 parents 9939309 + 027e89d commit 93c2dbf
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 29 deletions.
16 changes: 15 additions & 1 deletion src/toil/batchSystems/abstractGridEngineBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ def _runStep(self):
newJob = self.newJobsQueue.get()
if newJob is None:
logger.debug('Received queue sentinel.')
# Send out kill signals before stopping
self.killJobs()
return False
if self.killJobs():
activity = True
Expand Down Expand Up @@ -423,7 +425,14 @@ def issueBatchJob(self, command: str, job_desc: JobDescription, job_environment:
gpus = self.count_needed_gpus(job_desc)
job_id = self.getNextJobID()
self.currentJobs.add(job_id)

gpus = 0
if isinstance(jobDesc.accelerators, list):
for accelerator in jobDesc.accelerators:
if accelerator['kind'] == 'gpu':
gpus = accelerator['count']
else:
gpus = jobDesc.accelerators

self.newJobsQueue.put((job_id, job_desc.cores, job_desc.memory, command, get_job_kind(job_desc.get_names()),
job_environment, gpus))
logger.debug("Issued the job command: %s with job id: %s and job name %s", command, str(job_id),
Expand Down Expand Up @@ -506,6 +515,11 @@ def shutdown(self) -> None:
"""
Signals thread to shutdown (via sentinel) then cleanly joins the thread
"""

for jobID in self.getIssuedBatchJobIDs():
# Send kill signals to any jobs that might be running
self.killQueue.put(jobID)

self.shutdownLocal()
newJobsQueue = self.newJobsQueue
self.newJobsQueue = None
Expand Down
34 changes: 34 additions & 0 deletions src/toil/test/wdl/testfiles/wait.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
version 1.0

workflow wait {
input {
}

call waiter_task {
input:
}

output {
String result = read_lines(waiter_task.out)[0]
}
}

task waiter_task {
input {
}

command <<<
sleep 10 &
sleep 2 &
wait
echo "waited"
>>>

output {
File out = stdout()
}

runtime {
docker: "ubuntu:22.04"
}
}
16 changes: 16 additions & 0 deletions src/toil/test/wdl/wdltoil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from toil.fileStores import FileID
from toil.provisioners import cluster_factory
from toil.test import (ToilTest,
needs_docker,
needs_docker_cuda,
needs_google_storage,
needs_singularity_or_docker,
Expand Down Expand Up @@ -165,6 +166,21 @@ def test_url_to_file(self):
assert 'url_to_file.first_line' in result
assert isinstance(result['url_to_file.first_line'], str)
self.assertEqual(result['url_to_file.first_line'], 'chr1\t248387328')

@needs_docker
def test_wait(self):
"""
Test if Bash "wait" works in WDL scripts.
"""
wdl = os.path.abspath('src/toil/test/wdl/testfiles/wait.wdl')

result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--wdlContainer=docker'])
result = json.loads(result_json)

assert 'wait.result' in result
assert isinstance(result['wait.result'], str)
self.assertEqual(result['wait.result'], 'waited')

def test_url_to_optional_file(self):
"""
Expand Down
47 changes: 19 additions & 28 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,36 +505,26 @@ def log_bindings(log_function: Callable[..., None], message: str, all_bindings:
elif isinstance(bindings, Promise):
log_function("<Unfulfilled promise for bindings>")

def get_supertype(types: Sequence[Optional[WDL.Type.Base]]) -> WDL.Type.Base:
def get_supertype(types: Sequence[WDL.Type.Base]) -> WDL.Type.Base:
"""
Get the supertype that can hold values of all the given types.
"""

if None in types:
# Need to allow optional values
if len(types) == 1:
# Only None is here
return WDL.Type.Any(optional=True)
if len(types) == 2:
# None and something else
for item in types:
if item is not None:
# Return the type that's actually there, but make optional if not already.
return item.copy(optional=True)
raise RuntimeError("Expected non-None in types could not be found")
else:
# Multiple types, and some nulls, so we need an optional Any.
return WDL.Type.Any(optional=True)
else:
if len(types) == 1:
# Only one type. It isn't None.
the_type = types[0]
if the_type is None:
raise RuntimeError("The supertype cannot be None.")
return the_type
supertype = None
optional = False
for typ in types:
if isinstance(typ, WDL.Type.Any):
# ignore an Any type, as we represent a bottom type as Any. See https://miniwdl.readthedocs.io/en/latest/WDL.html#WDL.Type.Any
# and https://github.com/openwdl/wdl/blob/e43e042104b728df1f1ad6e6145945d2b32331a6/SPEC.md?plain=1#L1484
optional = optional or typ.optional
elif supertype is None:
supertype = typ
optional = optional or typ.optional
else:
# Multiple types (or none). Assume Any
return WDL.Type.Any()
# We have conflicting types
raise RuntimeError(f"Cannot generate a supertype from conflicting types: {types}")
if supertype is None:
return WDL.Type.Any(null=optional) # optional flag isn't used in Any
return supertype.copy(optional=optional)


def for_each_node(root: WDL.Tree.WorkflowNode) -> Iterator[WDL.Tree.WorkflowNode]:
Expand Down Expand Up @@ -2089,7 +2079,8 @@ def add_injections(self, command_string: str, task_container: TaskContainer) ->
}
""")
parts.append(script)
parts.append(f"_toil_resource_monitor {self.INJECTED_MESSAGE_DIR} &")
# Launch in a subshell so that it doesn't interfere with Bash "wait" in the main shell
parts.append(f"(_toil_resource_monitor {self.INJECTED_MESSAGE_DIR} &)")

if isinstance(task_container, SwarmContainer) and platform.system() == "Darwin":
# With gRPC FUSE file sharing, files immediately downloaded before
Expand Down Expand Up @@ -3226,7 +3217,7 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings:
# Problem: the WDL type types are not hashable, so we need to do bad N^2 deduplication
observed_types = []
for env in new_bindings:
binding_type = env.resolve(name).type if env.has_binding(name) else None
binding_type = env.resolve(name).type if env.has_binding(name) else WDL.Type.Any()
if binding_type not in observed_types:
observed_types.append(binding_type)
# Get the supertype of those types
Expand Down

0 comments on commit 93c2dbf

Please sign in to comment.