Merge branch 'master' of github.com:DataBiosphere/toil into issues/5004-wdl-virtualize-only-at-task-boundaries
stxue1 committed Aug 23, 2024
2 parents f60d475 + fd7f86d commit 552ac74
Showing 18 changed files with 820 additions and 190 deletions.
18 changes: 17 additions & 1 deletion .gitlab-ci.yml
@@ -477,6 +477,22 @@ mesos:
    - make test threads="${TEST_THREADS}" tests=src/toil/test/src/promisedRequirementTest.py::MesosPromisedRequirementsTest
    - make test threads="${TEST_THREADS}" tests="src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSAutoscaleTest src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSStaticAutoscaleTest src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSAutoscaleTestMultipleNodeTypes src/toil/test/provisioners/aws/awsProvisionerTest.py::AWSRestartTest::testAutoScaledCluster"

+batchsystem:
+  rules:
+    - if: $CI_PIPELINE_SOURCE == "schedule"
+    - if: $CI_COMMIT_TAG
+    - if: $CI_COMMIT_BRANCH =~ /.*-fix-ci/
+    - if: $CI_COMMIT_BRANCH
+      changes:
+        compare_to: 'refs/heads/master'
+        paths:
+          - 'src/toil/test/batchSystems/test_gridengine.py'
+          - 'src/toil/batchSystems/gridengine.py'
+  stage: integration
+  script:
+    - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[all]
+    - make test threads="${TEST_THREADS}" tests=src/toil/test/batchSystems/test_gridengine.py::GridEngineTest

# Cactus-on-Kubernetes integration (as a script and not a pytest test)
cactus_integration:
rules:
@@ -490,7 +506,7 @@ cactus_integration:
          - 'src/toil/test/cactus/test_cactus_integration.py'
  stage: integration
  script:
-    - export CACTUS_COMMIT_SHA=f5adf4013326322ae58ef1eccb8409b71d761583
+    - export CACTUS_COMMIT_SHA=2d706d9c6637c85116533477693e6c73ed5b8a53
    - set -e
    - ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && make prepare && make develop extras=[aws]
    - python setup_gitlab_docker.py # login to increase the docker.io rate limit
1 change: 1 addition & 0 deletions contrib/admin/mypy-with-ignore.py
@@ -47,6 +47,7 @@ def main():
'src/toil/batchSystems/lsf.py',
'src/toil/batchSystems/__init__.py',
'src/toil/batchSystems/abstractGridEngineBatchSystem.py',
+'src/toil/batchSystems/awsBatch.py',
'src/toil/batchSystems/lsfHelper.py',
'src/toil/batchSystems/htcondor.py',
'src/toil/batchSystems/mesos/batchSystem.py',
7 changes: 7 additions & 0 deletions docs/running/cliOptions.rst
@@ -286,6 +286,13 @@ Allows configuring Toil's data storage.
to use a batch system that does not support
cleanup. Set to "true" if caching
is desired.
+    --symlinkJobStoreReads BOOL
+                        Allow reads and container mounts from a JobStore's
+                        shared filesystem directly via symlink. Can be turned
+                        off if the shared filesystem can't support the IO load
+                        of all the jobs reading from it at once, and you want
+                        to use ``--caching=True`` to make jobs on each node
+                        read from node-local cache storage. (Default=True)
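
As an editorial aside, not part of the commit: the combination this option text describes (disabling direct symlink reads from the job store and leaning on node-local caching) could also be set from a Python workflow, roughly as below. The attribute names "caching" and "symlink_job_store_reads" are assumptions about how the parsed flags are stored on the options object, so check toil --help or the generated config for the real names.

# Editorial sketch (not from this commit): disable symlink reads from the job
# store and rely on node-local caching. The attribute names "caching" and
# "symlink_job_store_reads" are assumptions, not confirmed by this diff.
from toil.common import Toil
from toil.job import Job

def hello(job, name):
    return "Hello, %s!" % name

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("file:my-jobstore")
    options.caching = True                   # jobs read from node-local cache storage
    options.symlink_job_store_reads = False  # assumed dest for --symlinkJobStoreReads
    with Toil(options) as workflow:
        print(workflow.start(Job.wrapJobFn(hello, "world")))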

**Autoscaling Options**
Allows the specification of the minimum and maximum number of nodes in an
10 changes: 5 additions & 5 deletions requirements-dev.txt
@@ -2,16 +2,16 @@ pytest>=6.2.1,<9
pytest-cov>=2.12.1,<6
pytest-timeout>=1.4.2,<3
stubserver>=1.1,<2
-setuptools>=65.5.1,<72
-sphinx>=7,<8
-sphinx-autoapi>=3,<4
+setuptools>=65.5.1,<74
+sphinx>=7,<9
+sphinx-autoapi>=3.2.1,<4
astroid>=3,<4
sphinx-autodoc-typehints>=1.24.0,<3
sphinxcontrib-autoprogram==0.1.9
cwltest>=2.2.20211116163652
-mypy==1.10.1
+mypy==1.11.1
types-aws-xray-sdk
-types-boto<2.49.18.20240205
+types-boto<2.49.18.20240807
types-Flask-Cors
types-requests
types-psutil
4 changes: 2 additions & 2 deletions requirements-server.txt
@@ -1,8 +1,8 @@
connexion[swagger-ui]>=2.10.0, <3
flask>=2.0,<3
-werkzeug>=2.0,<3
+werkzeug>=2.0,<4
flask-cors==4.0.1
-gunicorn==22.0.0
+gunicorn==23.0.0
celery>=5.1.0, <6
wes-service>=4.0.0, <5
ruamel.yaml>=0.15,<0.19
2 changes: 1 addition & 1 deletion requirements-wdl.txt
@@ -1,3 +1,3 @@
-miniwdl==1.12.0
+miniwdl==1.12.1
wdlparse==0.1.0
graphlib-backport==1.0 ; python_version < '3.9'
45 changes: 23 additions & 22 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
@@ -26,7 +26,7 @@
from toil.bus import ExternalBatchIdMessage, get_job_kind
from toil.job import AcceleratorRequirement
from toil.lib.misc import CalledProcessErrorStderr
-from toil.lib.retry import old_retry, DEFAULT_DELAYS
+from toil.lib.retry import old_retry, DEFAULT_DELAYS, retry

logger = logging.getLogger(__name__)

@@ -41,6 +41,10 @@
# Accelerator requirements for the job
JobTuple = Tuple[int, float, int, str, str, Dict[str, str], List[AcceleratorRequirement]]

+class ExceededRetryAttempts(Exception):
+    def __init__(self):
+        super().__init__("Exceeded retry attempts talking to scheduler.")

class AbstractGridEngineBatchSystem(BatchSystemCleanupSupport):
"""
A partial implementation of BatchSystemSupport for batch systems run on a
@@ -208,24 +212,15 @@ def checkOnJobs(self):
            running_job_list = list(self.runningJobs)
            batch_job_id_list = [self.getBatchSystemID(j) for j in running_job_list]
            if batch_job_id_list:
-                try:
-                    # Get the statuses as a batch
-                    statuses = self.boss.with_retries(
-                        self.coalesce_job_exit_codes, batch_job_id_list
-                    )
-                except NotImplementedError:
-                    # We have to get the statuses individually
-                    for running_job_id, batch_job_id in zip(running_job_list, batch_job_id_list):
-                        status = self.boss.with_retries(self.getJobExitCode, batch_job_id)
-                        activity = self._handle_job_status(
-                            running_job_id, status, activity
-                        )
-                else:
-                    # We got the statuses as a batch
-                    for running_job_id, status in zip(running_job_list, statuses):
-                        activity = self._handle_job_status(
-                            running_job_id, status, activity
-                        )
+                # Get the statuses as a batch
+                statuses = self.boss.with_retries(
+                    self.coalesce_job_exit_codes, batch_job_id_list
+                )
+                # We got the statuses as a batch
+                for running_job_id, status in zip(running_job_list, statuses):
+                    activity = self._handle_job_status(
+                        running_job_id, status, activity
+                    )

            self._checkOnJobsCache = activity
            self._checkOnJobsTimestamp = datetime.now()
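
The point of this hunk is that checkOnJobs() now always makes the batched call and no longer needs a try/except NotImplementedError, because coalesce_job_exit_codes() gains a per-job fallback in the base class (next hunk). A self-contained toy sketch of that pattern, using made-up names rather than Toil's real classes:

# Toy illustration of "always call the batched method; the base class falls back
# to per-job polling". None of these names are Toil's actual API.
from typing import Dict, List, Optional

class BaseScheduler:
    def get_job_exit_code(self, batch_job_id: str) -> Optional[int]:
        """Poll one job; None means it is still running."""
        raise NotImplementedError

    def coalesce_job_exit_codes(self, batch_job_ids: List[str]) -> List[Optional[int]]:
        # Default: poll each job individually, so callers never need to catch
        # NotImplementedError around the batched call.
        return [self.get_job_exit_code(job_id) for job_id in batch_job_ids]

    def check_on_jobs(self, batch_job_ids: List[str]) -> bool:
        statuses = self.coalesce_job_exit_codes(batch_job_ids)
        # "Activity" here just means at least one job is still running.
        return any(status is None for status in statuses)

class FakeGridEngine(BaseScheduler):
    """Answers per-job queries from a fixed table instead of a real scheduler."""
    def __init__(self, table: Dict[str, Optional[int]]):
        self.table = table

    def get_job_exit_code(self, batch_job_id: str) -> Optional[int]:
        return self.table[batch_job_id]

if __name__ == "__main__":
    scheduler = FakeGridEngine({"101": 0, "102": None, "103": 1})
    print(scheduler.check_on_jobs(["101", "102", "103"]))  # True: job 102 still running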
@@ -292,13 +287,19 @@ def coalesce_job_exit_codes(self, batch_job_id_list: list) -> List[Union[int, Tu
            Called by GridEngineThread.checkOnJobs().

-            This is an optional part of the interface. It should raise
-            NotImplementedError if not actually implemented for a particular
-            scheduler.
+            The default implementation falls back on self.getJobExitCode and polls each job individually

            :param string batch_job_id_list: List of batch system job ID
            """
-            raise NotImplementedError()
+            statuses = []
+            try:
+                for batch_job_id in batch_job_id_list:
+                    statuses.append(self.boss.with_retries(self.getJobExitCode, batch_job_id))
+            except CalledProcessErrorStderr as err:
+                # This avoids the nested retry issue where we could issue n^2 retries when the backing scheduler somehow disappears
+                # We catch the internal retry exception and raise something else so the outer retry doesn't retry the entire function again
+                raise ExceededRetryAttempts() from err
+            return statuses

        @abstractmethod
        def prepareSubmission(self,
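
Why the translation to ExceededRetryAttempts matters: the per-job calls inside this default implementation and the batched call made from checkOnJobs() both go through boss.with_retries(). If the inner exhaustion surfaced as the same retriable exception, a scheduler outage would cost retries-times-retries attempts; raising a different exception stops the outer loop on the first pass. A standalone sketch of that effect, with toy stand-ins for the retry helper and the scheduler error:

# Standalone sketch of the nested-retry problem the new default implementation
# avoids. SchedulerError stands in for CalledProcessErrorStderr and with_retries
# for boss.with_retries; the numbers are illustrative only.
from typing import Callable, List, Optional

class SchedulerError(Exception):
    """Stand-in for CalledProcessErrorStderr."""

class ExceededRetryAttempts(Exception):
    def __init__(self) -> None:
        super().__init__("Exceeded retry attempts talking to scheduler.")

def with_retries(operation: Callable, *args, attempts: int = 3):
    """Retry only on SchedulerError, the way a generic retry helper would."""
    for attempt in range(attempts):
        try:
            return operation(*args)
        except SchedulerError:
            if attempt == attempts - 1:
                raise

calls = 0

def get_job_exit_code(batch_job_id: str) -> Optional[int]:
    global calls
    calls += 1
    raise SchedulerError("scheduler unreachable while polling %s" % batch_job_id)

def coalesce_job_exit_codes(batch_job_id_list: List[str]) -> List[Optional[int]]:
    statuses: List[Optional[int]] = []
    try:
        for batch_job_id in batch_job_id_list:
            statuses.append(with_retries(get_job_exit_code, batch_job_id))
    except SchedulerError as err:
        # Translate the exhausted inner retries into a non-retriable error so the
        # outer with_retries() does not multiply the attempts again.
        raise ExceededRetryAttempts() from err
    return statuses

if __name__ == "__main__":
    try:
        with_retries(coalesce_job_exit_codes, ["1", "2", "3"])
    except ExceededRetryAttempts as e:
        # 3 scheduler calls total; without the translation the outer retry loop
        # would re-run the whole batch and roughly square the attempt count.
        print(e, "-", calls, "scheduler calls")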
