From 51b41480b3de36a3a1bcca90aa9b6ec5d89632b2 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Thu, 24 May 2018 10:02:13 -0700 Subject: [PATCH 01/14] Double UGE timeout to 20 min and clarify docstrings. --- cosmos/job/drm/drm_ge.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index 74f2c542..58c52d29 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -174,6 +174,8 @@ def kill_tasks(self, tasks): def _is_corrupt(qacct_dict): """ + Return true if qacct returns bogus job data for a job id. + qacct may return multiple records for a job. They may all be corrupt. Yuk. This was allegedly fixed in 6.0u10 but we've seen it in UGE 8.3.1. @@ -195,13 +197,14 @@ def _is_corrupt(qacct_dict): ("before writing exit_status" not in qacct_dict.get('failed', '')) -def _qacct_raw(task, timeout=600, quantum=15): +def _qacct_raw(task, timeout=1200, quantum=15): """ Parse qacct output into key/value pairs. If qacct reports results in multiple blocks (separated by a row of ===='s), - the most recently-generated block with valid data is returned. If no such - block exists, then return the most recently-generated block of corrupt data. + the most recently-generated block with valid data is returned. If no block + with valid data exists, then return the most recently-generated block of + corrupt data. """ start = time.time() curr_qacct_dict = None From f87ace7371d700f9fea24213fa45fc2b8c202d73 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Thu, 24 May 2018 12:18:21 -0700 Subject: [PATCH 02/14] =?UTF-8?q?Use=20correct=20units=20for=20GE=E2=80=99?= =?UTF-8?q?s=20max=5Frss=5Fmem=5Fkb,=20io=5Fread=5Fkb,=20and=20io=5Fwrite?= =?UTF-8?q?=5Fkb.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/job/drm/drm_ge.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index 58c52d29..4764b04a 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -137,15 +137,15 @@ def _get_task_return_data(self, task): system_time=float(d['ru_stime']), avg_rss_mem=d['ru_ixrss'], - max_rss_mem_kb=convert_size_to_kb(d['ru_maxrss']), + max_rss_mem_kb=convert_size_to_kb(d['maxrss']), avg_vms_mem_kb=None, max_vms_mem_kb=convert_size_to_kb(d['maxvmem']), io_read_count=int(d['ru_inblock']), io_write_count=int(d['ru_oublock']), io_wait=float(d['iow']), - io_read_kb=float(d['io']), - io_write_kb=float(d['io']), + io_read_kb="%fG" % float(d['io']), + io_write_kb="%fG" % float(d['io']), ctx_switch_voluntary=int(d['ru_nvcsw']), ctx_switch_involuntary=int(d['ru_nivcsw']), From 4d40af0dcd385059fedd708cb975792a708a3de0 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Thu, 24 May 2018 12:22:34 -0700 Subject: [PATCH 03/14] Refactor and create simple qacct and qstat methods in cosmos.job.drm.drm_ge. --- cosmos/job/drm/drm_ge.py | 81 +++++++++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index 4764b04a..5f471742 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -1,17 +1,19 @@ import json +import logging import os import re import subprocess +import sys import time - from collections import OrderedDict from more_itertools import grouper from cosmos import TaskStatus from cosmos.job.drm.DRM_Base import DRM -from cosmos.job.drm.util import (check_output_and_stderr, convert_size_to_kb, div, - exit_process_group, DetailedCalledProcessError) +from cosmos.job.drm.util import (DetailedCalledProcessError, + check_output_and_stderr, convert_size_to_kb, + div, exit_process_group) from cosmos.util.signal_handlers import sleep_through_signals @@ -57,7 +59,7 @@ def filter_is_done(self, tasks): has been affected by this SGE bug. """ if tasks: - qjobs = _qstat_all() + qjobs = qstat() corrupt_data = {} for task in tasks: @@ -97,7 +99,7 @@ def drm_statuses(self, tasks): :returns: (dict) task.drm_jobID -> drm_status """ if tasks: - qjobs = _qstat_all() + qjobs = qstat() def f(task): return qjobs.get(unicode(task.drm_jobID), dict()).get('state', 'UNK_JOB_STATE') @@ -115,10 +117,10 @@ def _get_task_return_data(self, task): [1] a boolean indicating whether the metadata in [0] are affected by an SGE bug that causes qacct to occasionally return corrupt results. """ - d = _qacct_raw(task) + d = self.task_qacct(task) job_failed = d['failed'][0] != '0' - data_are_corrupt = _is_corrupt(d) + data_are_corrupt = is_corrupt(d) if job_failed or data_are_corrupt: task.workflow.log.warn('%s SGE (qacct -j %s) reports %s:\n%s' % @@ -161,6 +163,14 @@ def _get_task_return_data(self, task): return processed_data, data_are_corrupt + @staticmethod + def task_qacct(task, timeout=1200, quantum=15): + """ + Return qacct data for the specified task. + """ + return qacct(task.job_id, timeout, quantum, + task.workflow.log, str(task)) + def kill(self, task): """Terminate a task.""" raise NotImplementedError @@ -172,7 +182,7 @@ def kill_tasks(self, tasks): subprocess.call(['qdel', pids], preexec_fn=exit_process_group) -def _is_corrupt(qacct_dict): +def is_corrupt(qacct_dict): """ Return true if qacct returns bogus job data for a job id. @@ -197,15 +207,24 @@ def _is_corrupt(qacct_dict): ("before writing exit_status" not in qacct_dict.get('failed', '')) -def _qacct_raw(task, timeout=1200, quantum=15): +def qacct(job_id, timeout=1200, quantum=15, logger=None, log_prefix=""): """ Parse qacct output into key/value pairs. If qacct reports results in multiple blocks (separated by a row of ===='s), the most recently-generated block with valid data is returned. If no block with valid data exists, then return the most recently-generated block of - corrupt data. + corrupt data. Call ``is_corrupt()`` on the output of this method to see if + the data are suitable for use. """ + # create a dummy logger with a distinct name if one is not supplied + if not logger: + logger = logging.getLogger( + ".".join([sys.modules[__name__].__name__, "qacct"])) + # only initialize the dummy logger the first time we load it + if not logger.handlers: + logger.addHandler(logging.NullHandler()) + start = time.time() curr_qacct_dict = None good_qacct_dict = None @@ -215,7 +234,7 @@ def _qacct_raw(task, timeout=1200, quantum=15): qacct_returncode = 0 try: qacct_stdout_str, qacct_stderr_str = check_output_and_stderr( - ['qacct', '-j', unicode(task.drm_jobID)], + ['qacct', '-j', unicode(job_id)], preexec_fn=exit_process_group) if qacct_stdout_str.strip(): break @@ -226,34 +245,34 @@ def _qacct_raw(task, timeout=1200, quantum=15): if qacct_stderr_str and re.match(r'error: job id \d+ not found', qacct_stderr_str): if i > 0: - task.workflow.log.info('%s SGE (qacct -j %s) reports "not found"; this may mean ' - 'qacct is merely slow, or %s died in the \'qw\' state', - task, task.drm_jobID, task.drm_jobID) + logger.info('%s SGE (qacct -j %s) reports "not found"; this may mean ' + 'qacct is merely slow, or %s died in the \'qw\' state', + log_prefix, job_id, job_id) else: - task.workflow.log.error('%s SGE (qacct -j %s) returned error code %d', - task, task.drm_jobID, qacct_returncode) + logger.error('%s SGE (qacct -j %s) returned error code %d', + log_prefix, job_id, qacct_returncode) if qacct_stdout_str or qacct_stderr_str: - task.workflow.log.error('%s SGE (qacct -j %s) printed the following', task, task.drm_jobID) + logger.error('%s SGE (qacct -j %s) printed the following', log_prefix, job_id) if qacct_stdout_str: - task.workflow.log.error('stdout: "%s"', qacct_stdout_str) + logger.error('stdout: "%s"', qacct_stdout_str) if qacct_stderr_str: - task.workflow.log.error('stderr: "%s"', qacct_stderr_str) + logger.error('stderr: "%s"', qacct_stderr_str) if i > 0: - task.workflow.log.info( + logger.info( '%s SGE (qacct -j %s) attempt %d failed %d sec after first attempt%s', - task, task.drm_jobID, i + 1, time.time() - start, + log_prefix, job_id, i + 1, time.time() - start, '. Will recheck job status after %d sec' % quantum if i + 1 < num_retries else '') if i + 1 < num_retries: sleep_through_signals(timeout=quantum) else: # fallthrough: all retries failed - raise ValueError('No valid `qacct -j %s` output after %d tries and %d sec' % - (task.drm_jobID, i, time.time() - start)) + raise ValueError('%s No valid SGE (qacct -j %s) output after %d tries and %d sec' % + (log_prefix, job_id, i, time.time() - start)) for line in qacct_stdout_str.strip().split('\n'): if line.startswith('='): - if curr_qacct_dict and not _is_corrupt(curr_qacct_dict): + if curr_qacct_dict and not is_corrupt(curr_qacct_dict): # # Cache this non-corrupt block of qacct data just # in case all the more recent blocks are corrupt. @@ -266,22 +285,24 @@ def _qacct_raw(task, timeout=1200, quantum=15): try: k, v = re.split(r'\s+', line, maxsplit=1) except ValueError: - raise EnvironmentError('%s with drm_jobID=%s has unparseable qacct output:\n%s' % - (task, task.drm_jobID, qacct_stdout_str)) + raise EnvironmentError('%s SGE (qacct -j %s) output is unparseable:\n%s' % + (log_prefix, job_id, qacct_stdout_str)) curr_qacct_dict[k] = v.strip() # if the last block of qacct data looks good, promote it - if curr_qacct_dict and not _is_corrupt(curr_qacct_dict): + if curr_qacct_dict and not is_corrupt(curr_qacct_dict): good_qacct_dict = curr_qacct_dict return good_qacct_dict if good_qacct_dict else curr_qacct_dict -def _qstat_all(): +def qstat(): """ - returns a dict keyed by lsf job ids, who's values are a dict of bjob - information about the job + Return a mapping of job ids to a dict of GE information about each job. + + The exact contents of the sub-dictionaries in the returned dictionary's + values() depend on the installed GE version. """ try: lines = subprocess.check_output(['qstat'], preexec_fn=exit_process_group).decode().strip().split('\n') From e0267102487963fcec13dcbba78aa5ce78c846f3 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Fri, 25 May 2018 15:40:00 -0700 Subject: [PATCH 04/14] Fix an incorrect variable name. --- cosmos/job/drm/drm_ge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index 5f471742..5ad64598 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -168,7 +168,7 @@ def task_qacct(task, timeout=1200, quantum=15): """ Return qacct data for the specified task. """ - return qacct(task.job_id, timeout, quantum, + return qacct(task.drm_jobID, timeout, quantum, task.workflow.log, str(task)) def kill(self, task): From 4dbb0a3905ff256179b75d8adc7725fdaf8b1460 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Thu, 24 May 2018 14:04:44 -0700 Subject: [PATCH 05/14] Support GE job classes. --- bin/cosmos | 5 ++++- cosmos/models/Cosmos.py | 5 ++++- cosmos/models/Task.py | 7 +++++-- cosmos/models/Workflow.py | 4 +++- examples/ex2.py | 4 +++- examples_py2/ex2.py | 4 +++- 6 files changed, 22 insertions(+), 7 deletions(-) diff --git a/bin/cosmos b/bin/cosmos index 457899fa..da24ea59 100755 --- a/bin/cosmos +++ b/bin/cosmos @@ -38,7 +38,8 @@ def shell(db_url_or_path_to_sqlite): cosmos_app.shell() -def run(in_jobs, default_drm, default_queue, restart, max_cores): +def run(in_jobs, default_drm, default_job_class, default_queue, + restart, max_cores): """ Create an embarassingly parallel workflow from all jobs in `in_jobs`. `in_jobs` should be a json of a dict keyed by uid => command. @@ -48,6 +49,7 @@ def run(in_jobs, default_drm, default_queue, restart, max_cores): cosmos_app = Cosmos(database_url='sqlite:///cosmos.sqlite', default_drm=default_drm, + default_job_class=default_job_class, default_queue=default_queue, get_submit_args=partial(default_get_submit_args, parallel_env='smp')) cosmos_app.initdb() @@ -77,6 +79,7 @@ if __name__ == '__main__': sp = sps.add_parser('run') sp.add_argument('in_jobs') sp.add_argument('--default-drm', '-drm', default='ge') + sp.add_argument('--job-class', '-j') sp.add_argument('--queue', '-q') sp.add_argument('--max_cores', '--max-cores', '-c', type=int, diff --git a/cosmos/models/Cosmos.py b/cosmos/models/Cosmos.py index 84ed0283..a12b28f4 100644 --- a/cosmos/models/Cosmos.py +++ b/cosmos/models/Cosmos.py @@ -31,8 +31,9 @@ def default_get_submit_args(task, parallel_env='orte'): time = ' -W 0:{0}'.format(task.time_req) if task.time_req else '' return '-R "{rusage}span[hosts=1]" -n {task.core_req}{time}{queue} -J "{jobname}"'.format(**locals()) elif task.drm in ['ge', 'drmaa:ge']: - return '-cwd -pe {parallel_env} {core_req}{priority} -N "{jobname}"{queue}'.format( + return '-cwd -pe {parallel_env} {core_req}{priority} -N "{jobname}"{job_class}{queue}'.format( priority=' -p %s' % default_job_priority if default_job_priority else '', + job_class=' -jc %s' % task.job_class or '', queue=' -q %s' % task.queue or '', jobname=jobname, core_req=task.core_req, @@ -56,6 +57,7 @@ def __init__(self, database_url='sqlite:///:memory:', get_submit_args=default_get_submit_args, default_drm='local', + default_job_class=None, default_queue=None, default_time_req=None, default_max_attempts=1, @@ -108,6 +110,7 @@ def shutdown_session(exception=None): self.session.remove() self.default_drm = default_drm + self.default_job_class = default_job_class self.default_queue = default_queue self.default_max_attempts = default_max_attempts self.default_time_req = default_time_req diff --git a/cosmos/models/Task.py b/cosmos/models/Task.py index 08ce75f3..ef9cb501 100644 --- a/cosmos/models/Task.py +++ b/cosmos/models/Task.py @@ -57,8 +57,10 @@ def task_status_changed(task): elif task.status == TaskStatus.submitted: task.stage.status = StageStatus.running if not task.NOOP: - task.log.info('%s %s. drm=%s; drm_jobid=%s; queue=%s' % (task, task.status, repr(task.drm), - repr(task.drm_jobID), repr(task.queue))) + task.log.info( + '%s %s. drm=%s; drm_jobid=%s; job_class=%s; queue=%s' % + (task, task.status, repr(task.drm), repr(task.drm_jobID), + repr(task.job_class), repr(task.queue))) task.submitted_on = datetime.datetime.now() elif task.status == TaskStatus.failed: @@ -172,6 +174,7 @@ class Task(Base): attempt = Column(Integer, nullable=False) must_succeed = Column(Boolean, nullable=False) drm = Column(String(255)) + job_class = Column(String(255)) queue = Column(String(255)) max_attempts = Column(Integer) parents = relationship("Task", diff --git a/cosmos/models/Workflow.py b/cosmos/models/Workflow.py index 06ba194c..e504f3c4 100644 --- a/cosmos/models/Workflow.py +++ b/cosmos/models/Workflow.py @@ -143,7 +143,7 @@ def make_output_dirs(self): def add_task(self, func, params=None, parents=None, stage_name=None, uid=None, drm=None, queue=None, must_succeed=True, time_req=None, core_req=None, mem_req=None, - max_attempts=None, noop=False): + max_attempts=None, noop=False, job_class=None): """ Adds a new Task to the Workflow. If the Task already exists (and was successful), return the successful Task stored in the database @@ -157,6 +157,7 @@ def add_task(self, func, params=None, parents=None, stage_name=None, uid=None, d database version will be returned and a new one will not be created. :param str stage_name: The name of the Stage to add this Task to. Defaults to `func.__name__`. :param str drm: The drm to use for this Task (example 'local', 'ge' or 'drmaa:lsf'). Defaults to the `default_drm` parameter of :meth:`Cosmos.start` + :param job_class: The name of a job_class to submit to; defaults to the `default_job_class` parameter of :meth:`Cosmos.start` :param queue: The name of a queue to submit to; defaults to the `default_queue` parameter of :meth:`Cosmos.start` :param bool must_succeed: Default True. If False, the Workflow will not fail if this Task does not succeed. Dependent Jobs will not be executed. :param bool time_req: The time requirement; will set the Task.time_req attribute which is intended to be used by :func:`get_submit_args` to request resources. @@ -259,6 +260,7 @@ def params_or_signature_default_or(name, default): output_map=output_map, uid=uid, drm=drm if drm is not None else self.cosmos_app.default_drm, + job_class=job_class if job_class is not None else self.cosmos_app.default_job_class, queue=queue if queue is not None else self.cosmos_app.default_queue, must_succeed=must_succeed, core_req=core_req if core_req is not None else params_or_signature_default_or('core_req', 1), diff --git a/examples/ex2.py b/examples/ex2.py index 83a9d60e..527d09f4 100644 --- a/examples/ex2.py +++ b/examples/ex2.py @@ -78,7 +78,8 @@ def recipe(workflow): if __name__ == '__main__': p = argparse.ArgumentParser() p.add_argument('-drm', default='local', help='', choices=('local', 'drmaa:ge', 'ge', 'slurm')) - p.add_argument('-q', '--queue', help='Submit to this queue of the DRM supports it') + p.add_argument('-j', '--job-class', help='Submit to this job class if the DRM supports it') + p.add_argument('-q', '--queue', help='Submit to this queue if the DRM supports it') args = p.parse_args() @@ -87,6 +88,7 @@ def recipe(workflow): get_submit_args=partial(default_get_submit_args, parallel_env='smp'), default_drm=args.drm, default_max_attempts=2, + default_job_class=args.job_class, default_queue=args.queue) cosmos.initdb() diff --git a/examples_py2/ex2.py b/examples_py2/ex2.py index 9100fde3..a4f26471 100644 --- a/examples_py2/ex2.py +++ b/examples_py2/ex2.py @@ -54,7 +54,8 @@ def recipe(workflow): p = argparse.ArgumentParser() p.add_argument('-drm', default='local', help='', choices=('local', 'drmaa:ge', 'ge', 'slurm')) - p.add_argument('-q', '--queue', help='Submit to this queue of the DRM supports it') + p.add_argument('-j', '--job-class', help='Submit to this job class if the DRM supports it') + p.add_argument('-q', '--queue', help='Submit to this queue if the DRM supports it') args = p.parse_args() @@ -62,6 +63,7 @@ def recipe(workflow): # example of how to change arguments if you're NOT using default_drm='local' get_submit_args=partial(default_get_submit_args, parallel_env='smp'), default_drm=args.drm, + default_job_class=args.job_class, default_queue=args.queue) cosmos.initdb() From 8ad51055252376067b1df338cc765684877e5089 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Thu, 24 May 2018 14:06:51 -0700 Subject: [PATCH 06/14] Remove unused imports from Task and lint the rest. --- cosmos/models/Task.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/cosmos/models/Task.py b/cosmos/models/Task.py index ef9cb501..f2a1343b 100644 --- a/cosmos/models/Task.py +++ b/cosmos/models/Task.py @@ -1,23 +1,20 @@ -import os import codecs -import networkx as nx +import datetime +import os +import pprint import subprocess as sp -from sqlalchemy.orm import relationship, synonym, backref + +import networkx as nx +from flask import url_for from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy.orm import relationship, synonym from sqlalchemy.schema import Column, ForeignKey, UniqueConstraint -from sqlalchemy.types import Boolean, Integer, String, DateTime, BigInteger -from flask import url_for +from sqlalchemy.types import Boolean, DateTime, Integer, String +from cosmos import StageStatus, TaskStatus, signal_task_status_change from cosmos.db import Base -from cosmos.util.sqla import Enum_ColumnType, MutableDict, JSONEncodedDict, ListOfStrings, MutableList -from cosmos import TaskStatus, StageStatus, signal_task_status_change from cosmos.util.helpers import wait_for_file - - -import datetime -import pprint - -opj = os.path.join +from cosmos.util.sqla import Enum_ColumnType, JSONEncodedDict, MutableDict class ExpectedError(Exception): pass @@ -112,7 +109,8 @@ def task_status_changed(task): def logplus(filename): prefix, suffix = os.path.splitext(filename) - return property(lambda self: opj(self.log_dir, "{0}_attempt{1}{2}".format(prefix, self.attempt, suffix))) + return property(lambda self: os.path.join( + self.log_dir, "{0}_attempt{1}{2}".format(prefix, self.attempt, suffix))) def readfile(path): From 377ea37cb801719750ffb5bd624074564f18d5b4 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Fri, 25 May 2018 15:40:51 -0700 Subject: [PATCH 07/14] Log qsub CLI strings if SGE refuses to execute them. --- cosmos/job/drm/drm_ge.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index 5ad64598..2395a074 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -37,8 +37,8 @@ def submit_job(self, task): task.drm_jobID = unicode(int(out)) except subprocess.CalledProcessError as cpe: - task.log.error('%s submission to %s failed with error %s: %s' % - (task, task.drm, cpe.returncode, cpe.output.decode().strip())) + task.log.error('%s submission to %s (%s) failed with error %s: %s' % + (task, task.drm, qsub, cpe.returncode, cpe.output.decode().strip())) task.status = TaskStatus.failed except ValueError: task.log.error('%s submission to %s returned unexpected text: %s' % (task, task.drm, out)) From 3226bc3166bae85db9ada853392f7bf90a4fbcf4 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Fri, 25 May 2018 15:42:06 -0700 Subject: [PATCH 08/14] Don't pass -jc or -q flags to GE if the corresponding Task field is unset. --- cosmos/models/Cosmos.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/models/Cosmos.py b/cosmos/models/Cosmos.py index a12b28f4..620b3aeb 100644 --- a/cosmos/models/Cosmos.py +++ b/cosmos/models/Cosmos.py @@ -33,8 +33,8 @@ def default_get_submit_args(task, parallel_env='orte'): elif task.drm in ['ge', 'drmaa:ge']: return '-cwd -pe {parallel_env} {core_req}{priority} -N "{jobname}"{job_class}{queue}'.format( priority=' -p %s' % default_job_priority if default_job_priority else '', - job_class=' -jc %s' % task.job_class or '', - queue=' -q %s' % task.queue or '', + job_class=' -jc %s' % task.job_class if task.job_class else '', + queue=' -q %s' % task.queue if task.queue else '', jobname=jobname, core_req=task.core_req, parallel_env=parallel_env) From ebfc62e056a46be2de1ac89742fffe8298c00a36 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Wed, 30 May 2018 12:20:53 -0700 Subject: [PATCH 09/14] Use K, not G, for io_read_kb and io_write_kb. Oops! --- cosmos/job/drm/drm_ge.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index 2395a074..7f521012 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -146,8 +146,8 @@ def _get_task_return_data(self, task): io_read_count=int(d['ru_inblock']), io_write_count=int(d['ru_oublock']), io_wait=float(d['iow']), - io_read_kb="%fG" % float(d['io']), - io_write_kb="%fG" % float(d['io']), + io_read_kb=convert_size_to_kb("%fG" % float(d['io'])), + io_write_kb=convert_size_to_kb("%fG" % float(d['io'])), ctx_switch_voluntary=int(d['ru_nvcsw']), ctx_switch_involuntary=int(d['ru_nivcsw']), From 1dc0b02f89ceba790faab78c6df67b28dcc001bd Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Sat, 2 Jun 2018 20:35:36 -0700 Subject: [PATCH 10/14] Don't persist job_class info in the Task schema. Code review. --- cosmos/models/Task.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cosmos/models/Task.py b/cosmos/models/Task.py index f2a1343b..3b809648 100644 --- a/cosmos/models/Task.py +++ b/cosmos/models/Task.py @@ -172,7 +172,8 @@ class Task(Base): attempt = Column(Integer, nullable=False) must_succeed = Column(Boolean, nullable=False) drm = Column(String(255)) - job_class = Column(String(255)) + # FIXME consider making this a full-fledged DB entry next time the schema changes + # job_class = Column(String(255)) queue = Column(String(255)) max_attempts = Column(Integer) parents = relationship("Task", From 918a7b03af4dfaff63444f9fcf3d85d18acb5b1a Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Sat, 2 Jun 2018 22:16:30 -0700 Subject: [PATCH 11/14] Move new function arguments to end of param list with default=None. --- bin/cosmos | 4 ++-- cosmos/models/Cosmos.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bin/cosmos b/bin/cosmos index da24ea59..4ccc1059 100755 --- a/bin/cosmos +++ b/bin/cosmos @@ -38,8 +38,8 @@ def shell(db_url_or_path_to_sqlite): cosmos_app.shell() -def run(in_jobs, default_drm, default_job_class, default_queue, - restart, max_cores): +def run(in_jobs, default_drm, default_queue, restart, max_cores, + default_job_class=None): """ Create an embarassingly parallel workflow from all jobs in `in_jobs`. `in_jobs` should be a json of a dict keyed by uid => command. diff --git a/cosmos/models/Cosmos.py b/cosmos/models/Cosmos.py index 620b3aeb..834206fc 100644 --- a/cosmos/models/Cosmos.py +++ b/cosmos/models/Cosmos.py @@ -57,11 +57,11 @@ def __init__(self, database_url='sqlite:///:memory:', get_submit_args=default_get_submit_args, default_drm='local', - default_job_class=None, default_queue=None, default_time_req=None, default_max_attempts=1, - flask_app=None): + flask_app=None, + default_job_class=None): """ :param str database_url: A `sqlalchemy database url `_. ex: sqlite:///home/user/sqlite.db or mysql://user:pass@localhost/database_name or postgresql+psycopg2://user:pass@localhost/database_name From 91a81d07a4a6beda61ac72c5192549414a392578 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Sun, 3 Jun 2018 09:31:40 -0700 Subject: [PATCH 12/14] Make job_class a valid constructor argument. --- cosmos/models/Task.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/cosmos/models/Task.py b/cosmos/models/Task.py index 3b809648..956ad60c 100644 --- a/cosmos/models/Task.py +++ b/cosmos/models/Task.py @@ -7,7 +7,8 @@ import networkx as nx from flask import url_for from sqlalchemy.ext.declarative import declared_attr -from sqlalchemy.orm import relationship, synonym +from sqlalchemy.ext.declarative.base import _declarative_constructor +from sqlalchemy.orm import reconstructor, relationship, synonym from sqlalchemy.schema import Column, ForeignKey, UniqueConstraint from sqlalchemy.types import Boolean, DateTime, Integer, String @@ -172,7 +173,7 @@ class Task(Base): attempt = Column(Integer, nullable=False) must_succeed = Column(Boolean, nullable=False) drm = Column(String(255)) - # FIXME consider making this a full-fledged DB entry next time the schema changes + # FIXME consider making job_class a proper field next time the schema changes # job_class = Column(String(255)) queue = Column(String(255)) max_attempts = Column(Integer) @@ -347,3 +348,12 @@ def __repr__(self): def __str__(self): return self.__repr__() + + # FIXME consider making job_class a proper field next time the schema changes + def __init__(self, **kwargs): + self.job_class = kwargs.pop('job_class', None) + _declarative_constructor(self, **kwargs) + + @reconstructor + def init_on_load(self): + self.job_class = None From 61837195549a0f555b9045e97c440660869167f0 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Sun, 3 Jun 2018 21:48:51 -0700 Subject: [PATCH 13/14] Refactor cosmos.job.drm.drm_ge to expose a qsub command. --- cosmos/job/drm/drm_ge.py | 91 +++++++++++++++++++++++++++------------- 1 file changed, 63 insertions(+), 28 deletions(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index 7f521012..f067e60a 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -22,29 +22,15 @@ class DRM_GE(DRM): poll_interval = 5 def submit_job(self, task): - for p in [task.output_stdout_path, task.output_stderr_path]: - if os.path.exists(p): - os.unlink(p) + task.drm_jobID, task.status = qsub( + cmd_fn=task.output_command_script_path, + stdout_fn=task.output_stdout_path, + stderr_fn=task.output_stderr_path, + addl_args=task.drm_native_specification, + drm_name=task.drm, + logger=task.log, + log_prefix=str(task)) - ns = ' ' + task.drm_native_specification if task.drm_native_specification else '' - qsub = 'qsub -terse -o {stdout} -e {stderr} -b y -w e -cwd -S /bin/bash -V{ns} '.format( - stdout=task.output_stdout_path, stderr=task.output_stderr_path, ns=ns) - - try: - out = subprocess.check_output( - '{qsub} "{cmd_str}"'.format(cmd_str=task.output_command_script_path, qsub=qsub), - env=os.environ, preexec_fn=exit_process_group, shell=True, stderr=subprocess.STDOUT).decode() - - task.drm_jobID = unicode(int(out)) - except subprocess.CalledProcessError as cpe: - task.log.error('%s submission to %s (%s) failed with error %s: %s' % - (task, task.drm, qsub, cpe.returncode, cpe.output.decode().strip())) - task.status = TaskStatus.failed - except ValueError: - task.log.error('%s submission to %s returned unexpected text: %s' % (task, task.drm, out)) - task.status = TaskStatus.failed - else: - task.status = TaskStatus.submitted def filter_is_done(self, tasks): """ @@ -182,6 +168,17 @@ def kill_tasks(self, tasks): subprocess.call(['qdel', pids], preexec_fn=exit_process_group) +def _get_null_logger(): + """ + Return a logger that drops all messages passed to it. + """ + logger = logging.getLogger( + ".".join([sys.modules[__name__].__name__, "null_logger"])) + # only initialize the null logger the first time we load it + if not logger.handlers: + logger.addHandler(logging.NullHandler()) + + def is_corrupt(qacct_dict): """ Return true if qacct returns bogus job data for a job id. @@ -217,13 +214,8 @@ def qacct(job_id, timeout=1200, quantum=15, logger=None, log_prefix=""): corrupt data. Call ``is_corrupt()`` on the output of this method to see if the data are suitable for use. """ - # create a dummy logger with a distinct name if one is not supplied if not logger: - logger = logging.getLogger( - ".".join([sys.modules[__name__].__name__, "qacct"])) - # only initialize the dummy logger the first time we load it - if not logger.handlers: - logger.addHandler(logging.NullHandler()) + logger = _get_null_logger() start = time.time() curr_qacct_dict = None @@ -314,3 +306,46 @@ def qstat(): items = re.split(r"\s+", l.strip()) bjobs[items[0]] = dict(zip(keys, items)) return bjobs + + +def qsub(cmd_fn, stdout_fn, stderr_fn, addl_args=None, drm_name="GE", logger=None, log_prefix=""): + """ + Submit the requested (bash-parseable) script stored in cmd_fn to GE. + + The command is submitted relatove to the current CWD. Callers should change + this before calling if they need to run in a particular directory. + + Output will be written to two filenames, specified in stdout_fn and stderr_fn. + Additional arguments to SGE may be specified as a single string in addl_args. + Callers can optionally supply a logger object and a prefix to prepend to log messages. + """ + for p in [stdout_fn, stderr_fn]: + if os.path.exists(p): + os.unlink(p) + + qsub_cli = 'qsub -terse -o {stdout_fn} -e {stderr_fn} -b y -w e -cwd -S /bin/bash -V'.format( + stdout_fn=stdout_fn, stderr_fn=stderr_fn) + + if addl_args: + qsub_cli += ' %s' % addl_args + + job_id = None + try: + out = subprocess.check_output( + '{qsub_cli} "{cmd_fn}"'.format(cmd_fn=cmd_fn, qsub_cli=qsub_cli), + env=os.environ, preexec_fn=exit_process_group, shell=True, + stderr=subprocess.STDOUT).decode() + + job_id = unicode(int(out)) + except subprocess.CalledProcessError as cpe: + logger.error('%s submission to %s (%s) failed with error %s: %s' % + (log_prefix, drm_name, qsub, cpe.returncode, cpe.output.decode().strip())) + status = TaskStatus.failed + except ValueError: + logger.error('%s submission to %s returned unexpected text: %s' % + (log_prefix, drm_name, out)) + status = TaskStatus.failed + else: + status = TaskStatus.submitted + + return (job_id, status) From 11460794938c308422116b454ea6bdbd79909fd5 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Tue, 5 Jun 2018 11:39:50 -0700 Subject: [PATCH 14/14] Update VERSION to 2.6.24. --- cosmos/VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/VERSION b/cosmos/VERSION index fef250dd..848abb76 100644 --- a/cosmos/VERSION +++ b/cosmos/VERSION @@ -1 +1 @@ -2.6.23 +2.6.24