Merge pull request #84 from LPM-HMS/qacct_access
Support qsub -jc, make qstat/qacct importable, fix summary units
mdpearson authored Jun 5, 2018
2 parents f9bea03 + 1146079 commit 04f3ae5
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 73 deletions.
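The headline refactor makes the GE helpers importable: the private `_qstat_all` and `_qacct_raw` become module-level `qstat()` and `qacct()` in `cosmos.job.drm.drm_ge`. A minimal sketch of what that enables, assuming an SGE/UGE environment (the job id is a placeholder):

```python
# Minimal sketch (assumes an SGE/UGE cluster); the job id below is a placeholder.
from cosmos.job.drm.drm_ge import is_corrupt, qacct, qstat

jobs = qstat()                   # {job_id: {column: value, ...}} for visible jobs
record = qacct("1234567")        # accounting record for a finished job
if is_corrupt(record):
    print("qacct returned corrupt data; see is_corrupt() for the heuristics")
else:
    print("exit_status:", record.get("exit_status"))
```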
5 changes: 4 additions & 1 deletion bin/cosmos
@@ -38,7 +38,8 @@ def shell(db_url_or_path_to_sqlite):
cosmos_app.shell()


def run(in_jobs, default_drm, default_queue, restart, max_cores):
def run(in_jobs, default_drm, default_queue, restart, max_cores,
default_job_class=None):
"""
Create an embarrassingly parallel workflow from all jobs in `in_jobs`. `in_jobs` should be a JSON-encoded dict
keyed by uid => command.
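For illustration, a hypothetical `in_jobs` payload in that format (uids and commands are made up):

```python
# Hypothetical in_jobs payload: a JSON object mapping uid => command.
import json

in_jobs = json.dumps({
    "sample_1": "echo hello from sample 1",
    "sample_2": "echo hello from sample 2",
})
```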
@@ -48,6 +49,7 @@ def run(in_jobs, default_drm, default_queue, restart, max_cores):

cosmos_app = Cosmos(database_url='sqlite:///cosmos.sqlite',
default_drm=default_drm,
default_job_class=default_job_class,
default_queue=default_queue,
get_submit_args=partial(default_get_submit_args, parallel_env='smp'))
cosmos_app.initdb()
@@ -77,6 +79,7 @@ if __name__ == '__main__':
sp = sps.add_parser('run')
sp.add_argument('in_jobs')
sp.add_argument('--default-drm', '-drm', default='ge')
sp.add_argument('--job-class', '-j')
sp.add_argument('--queue', '-q')

sp.add_argument('--max_cores', '--max-cores', '-c', type=int,
2 changes: 1 addition & 1 deletion cosmos/VERSION
@@ -1 +1 @@
2.6.23
2.6.24
171 changes: 115 additions & 56 deletions cosmos/job/drm/drm_ge.py
@@ -1,17 +1,19 @@
import json
import logging
import os
import re
import subprocess
import sys
import time

from collections import OrderedDict

from more_itertools import grouper

from cosmos import TaskStatus
from cosmos.job.drm.DRM_Base import DRM
from cosmos.job.drm.util import (check_output_and_stderr, convert_size_to_kb, div,
exit_process_group, DetailedCalledProcessError)
from cosmos.job.drm.util import (DetailedCalledProcessError,
check_output_and_stderr, convert_size_to_kb,
div, exit_process_group)
from cosmos.util.signal_handlers import sleep_through_signals


@@ -20,29 +22,15 @@ class DRM_GE(DRM):
poll_interval = 5

def submit_job(self, task):
for p in [task.output_stdout_path, task.output_stderr_path]:
if os.path.exists(p):
os.unlink(p)
task.drm_jobID, task.status = qsub(
cmd_fn=task.output_command_script_path,
stdout_fn=task.output_stdout_path,
stderr_fn=task.output_stderr_path,
addl_args=task.drm_native_specification,
drm_name=task.drm,
logger=task.log,
log_prefix=str(task))

ns = ' ' + task.drm_native_specification if task.drm_native_specification else ''
qsub = 'qsub -terse -o {stdout} -e {stderr} -b y -w e -cwd -S /bin/bash -V{ns} '.format(
stdout=task.output_stdout_path, stderr=task.output_stderr_path, ns=ns)

try:
out = subprocess.check_output(
'{qsub} "{cmd_str}"'.format(cmd_str=task.output_command_script_path, qsub=qsub),
env=os.environ, preexec_fn=exit_process_group, shell=True, stderr=subprocess.STDOUT).decode()

task.drm_jobID = unicode(int(out))
except subprocess.CalledProcessError as cpe:
task.log.error('%s submission to %s failed with error %s: %s' %
(task, task.drm, cpe.returncode, cpe.output.decode().strip()))
task.status = TaskStatus.failed
except ValueError:
task.log.error('%s submission to %s returned unexpected text: %s' % (task, task.drm, out))
task.status = TaskStatus.failed
else:
task.status = TaskStatus.submitted

def filter_is_done(self, tasks):
"""
@@ -57,7 +45,7 @@ def filter_is_done(self, tasks):
has been affected by this SGE bug.
"""
if tasks:
qjobs = _qstat_all()
qjobs = qstat()
corrupt_data = {}

for task in tasks:
@@ -97,7 +85,7 @@ def drm_statuses(self, tasks):
:returns: (dict) task.drm_jobID -> drm_status
"""
if tasks:
qjobs = _qstat_all()
qjobs = qstat()

def f(task):
return qjobs.get(unicode(task.drm_jobID), dict()).get('state', 'UNK_JOB_STATE')
@@ -115,10 +103,10 @@ def _get_task_return_data(self, task):
[1] a boolean indicating whether the metadata in [0] are affected by an
SGE bug that causes qacct to occasionally return corrupt results.
"""
d = _qacct_raw(task)
d = self.task_qacct(task)

job_failed = d['failed'][0] != '0'
data_are_corrupt = _is_corrupt(d)
data_are_corrupt = is_corrupt(d)

if job_failed or data_are_corrupt:
task.workflow.log.warn('%s SGE (qacct -j %s) reports %s:\n%s' %
@@ -137,15 +125,15 @@ def _get_task_return_data(self, task):
system_time=float(d['ru_stime']),

avg_rss_mem=d['ru_ixrss'],
max_rss_mem_kb=convert_size_to_kb(d['ru_maxrss']),
max_rss_mem_kb=convert_size_to_kb(d['maxrss']),
avg_vms_mem_kb=None,
max_vms_mem_kb=convert_size_to_kb(d['maxvmem']),

io_read_count=int(d['ru_inblock']),
io_write_count=int(d['ru_oublock']),
io_wait=float(d['iow']),
io_read_kb=float(d['io']),
io_write_kb=float(d['io']),
io_read_kb=convert_size_to_kb("%fG" % float(d['io'])),
io_write_kb=convert_size_to_kb("%fG" % float(d['io'])),

ctx_switch_voluntary=int(d['ru_nvcsw']),
ctx_switch_involuntary=int(d['ru_nivcsw']),
@@ -161,6 +149,14 @@

return processed_data, data_are_corrupt
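The summary-units fix is visible in the `io_read_kb`/`io_write_kb` lines above: qacct's `io` counter is interpreted as gigabytes, so the value is now tagged with a `G` suffix and routed through `convert_size_to_kb` instead of being stored as raw GB. A sketch, with a made-up value:

```python
# Sketch of the units fix: qacct's 'io' counter is treated as gigabytes,
# so "%fG" tags it before conversion (the 'G' suffix handling is shown in the diff).
from cosmos.job.drm.util import convert_size_to_kb

io_gb = 1.5                                # made-up qacct 'io' value
print(convert_size_to_kb("%fG" % io_gb))   # 1.5 GB ≈ 1572864 KB
```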

@staticmethod
def task_qacct(task, timeout=1200, quantum=15):
"""
Return qacct data for the specified task.
"""
return qacct(task.drm_jobID, timeout, quantum,
task.workflow.log, str(task))

def kill(self, task):
"""Terminate a task."""
raise NotImplementedError
@@ -172,8 +168,21 @@ def kill_tasks(self, tasks):
subprocess.call(['qdel', pids], preexec_fn=exit_process_group)


def _is_corrupt(qacct_dict):
def _get_null_logger():
"""
Return a logger that drops all messages passed to it.
"""
logger = logging.getLogger(
".".join([sys.modules[__name__].__name__, "null_logger"]))
# only initialize the null logger the first time we load it
if not logger.handlers:
logger.addHandler(logging.NullHandler())
return logger


def is_corrupt(qacct_dict):
"""
Return true if qacct returns bogus job data for a job id.
qacct may return multiple records for a job. They may all be corrupt. Yuk.
This was allegedly fixed in 6.0u10 but we've seen it in UGE 8.3.1.
@@ -195,14 +204,19 @@ def _is_corrupt(qacct_dict):
("before writing exit_status" not in qacct_dict.get('failed', ''))


def _qacct_raw(task, timeout=600, quantum=15):
def qacct(job_id, timeout=1200, quantum=15, logger=None, log_prefix=""):
"""
Parse qacct output into key/value pairs.
If qacct reports results in multiple blocks (separated by a row of ===='s),
the most recently-generated block with valid data is returned. If no such
block exists, then return the most recently-generated block of corrupt data.
the most recently-generated block with valid data is returned. If no block
with valid data exists, then return the most recently-generated block of
corrupt data. Call ``is_corrupt()`` on the output of this method to see if
the data are suitable for use.
"""
if not logger:
logger = _get_null_logger()

start = time.time()
curr_qacct_dict = None
good_qacct_dict = None
@@ -212,7 +226,7 @@ def _qacct_raw(task, timeout=600, quantum=15):
qacct_returncode = 0
try:
qacct_stdout_str, qacct_stderr_str = check_output_and_stderr(
['qacct', '-j', unicode(task.drm_jobID)],
['qacct', '-j', unicode(job_id)],
preexec_fn=exit_process_group)
if qacct_stdout_str.strip():
break
@@ -223,34 +237,34 @@

if qacct_stderr_str and re.match(r'error: job id \d+ not found', qacct_stderr_str):
if i > 0:
task.workflow.log.info('%s SGE (qacct -j %s) reports "not found"; this may mean '
'qacct is merely slow, or %s died in the \'qw\' state',
task, task.drm_jobID, task.drm_jobID)
logger.info('%s SGE (qacct -j %s) reports "not found"; this may mean '
'qacct is merely slow, or %s died in the \'qw\' state',
log_prefix, job_id, job_id)
else:
task.workflow.log.error('%s SGE (qacct -j %s) returned error code %d',
task, task.drm_jobID, qacct_returncode)
logger.error('%s SGE (qacct -j %s) returned error code %d',
log_prefix, job_id, qacct_returncode)
if qacct_stdout_str or qacct_stderr_str:
task.workflow.log.error('%s SGE (qacct -j %s) printed the following', task, task.drm_jobID)
logger.error('%s SGE (qacct -j %s) printed the following', log_prefix, job_id)
if qacct_stdout_str:
task.workflow.log.error('stdout: "%s"', qacct_stdout_str)
logger.error('stdout: "%s"', qacct_stdout_str)
if qacct_stderr_str:
task.workflow.log.error('stderr: "%s"', qacct_stderr_str)
logger.error('stderr: "%s"', qacct_stderr_str)

if i > 0:
task.workflow.log.info(
logger.info(
'%s SGE (qacct -j %s) attempt %d failed %d sec after first attempt%s',
task, task.drm_jobID, i + 1, time.time() - start,
log_prefix, job_id, i + 1, time.time() - start,
'. Will recheck job status after %d sec' % quantum if i + 1 < num_retries else '')
if i + 1 < num_retries:
sleep_through_signals(timeout=quantum)
else:
# fallthrough: all retries failed
raise ValueError('No valid `qacct -j %s` output after %d tries and %d sec' %
(task.drm_jobID, i, time.time() - start))
raise ValueError('%s No valid SGE (qacct -j %s) output after %d tries and %d sec' %
(log_prefix, job_id, i, time.time() - start))

for line in qacct_stdout_str.strip().split('\n'):
if line.startswith('='):
if curr_qacct_dict and not _is_corrupt(curr_qacct_dict):
if curr_qacct_dict and not is_corrupt(curr_qacct_dict):
#
# Cache this non-corrupt block of qacct data just
# in case all the more recent blocks are corrupt.
@@ -263,22 +277,24 @@
try:
k, v = re.split(r'\s+', line, maxsplit=1)
except ValueError:
raise EnvironmentError('%s with drm_jobID=%s has unparseable qacct output:\n%s' %
(task, task.drm_jobID, qacct_stdout_str))
raise EnvironmentError('%s SGE (qacct -j %s) output is unparseable:\n%s' %
(log_prefix, job_id, qacct_stdout_str))

curr_qacct_dict[k] = v.strip()

# if the last block of qacct data looks good, promote it
if curr_qacct_dict and not _is_corrupt(curr_qacct_dict):
if curr_qacct_dict and not is_corrupt(curr_qacct_dict):
good_qacct_dict = curr_qacct_dict

return good_qacct_dict if good_qacct_dict else curr_qacct_dict


def _qstat_all():
def qstat():
"""
returns a dict keyed by lsf job ids, who's values are a dict of bjob
information about the job
Return a mapping of job ids to a dict of GE information about each job.
The exact contents of the sub-dictionaries in the returned dictionary's
values() depend on the installed GE version.
"""
try:
lines = subprocess.check_output(['qstat'], preexec_fn=exit_process_group).decode().strip().split('\n')
Expand All @@ -290,3 +306,46 @@ def _qstat_all():
items = re.split(r"\s+", l.strip())
bjobs[items[0]] = dict(zip(keys, items))
return bjobs
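As the new docstring notes, the exact columns depend on the installed GE version, so consumers should treat the sub-dictionaries defensively. A small sketch that tallies jobs by state, assuming the stock `state` column:

```python
# Sketch: tally visible jobs by state (assumes the stock 'state' qstat column).
from collections import Counter

from cosmos.job.drm.drm_ge import qstat

states = Counter(info.get("state", "?") for info in qstat().values())
print(states)   # e.g. Counter({'r': 12, 'qw': 3})
```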


def qsub(cmd_fn, stdout_fn, stderr_fn, addl_args=None, drm_name="GE", logger=None, log_prefix=""):
"""
Submit the requested (bash-parseable) script stored in cmd_fn to GE.
The command is submitted relative to the current working directory. Callers should change
directory before calling if the job needs to run somewhere else.
Output will be written to two filenames, specified in stdout_fn and stderr_fn.
Additional arguments to SGE may be specified as a single string in addl_args.
Callers can optionally supply a logger object and a prefix to prepend to log messages.
"""
for p in [stdout_fn, stderr_fn]:
if os.path.exists(p):
os.unlink(p)

qsub_cli = 'qsub -terse -o {stdout_fn} -e {stderr_fn} -b y -w e -cwd -S /bin/bash -V'.format(
stdout_fn=stdout_fn, stderr_fn=stderr_fn)

if addl_args:
qsub_cli += ' %s' % addl_args

job_id = None
try:
out = subprocess.check_output(
'{qsub_cli} "{cmd_fn}"'.format(cmd_fn=cmd_fn, qsub_cli=qsub_cli),
env=os.environ, preexec_fn=exit_process_group, shell=True,
stderr=subprocess.STDOUT).decode()

job_id = unicode(int(out))
except subprocess.CalledProcessError as cpe:
logger.error('%s submission to %s (%s) failed with error %s: %s' %
(log_prefix, drm_name, qsub_cli, cpe.returncode, cpe.output.decode().strip()))
status = TaskStatus.failed
except ValueError:
logger.error('%s submission to %s returned unexpected text: %s' %
(log_prefix, drm_name, out))
status = TaskStatus.failed
else:
status = TaskStatus.submitted

return (job_id, status)
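With submission logic extracted from `DRM_GE.submit_job`, `qsub()` can be driven directly. A hedged sketch; the script paths and the `-jc` job class are made up:

```python
# Hypothetical standalone submission; paths and the job class are made up.
import logging

from cosmos.job.drm.drm_ge import qsub

logging.basicConfig()
job_id, status = qsub(
    cmd_fn="/tmp/run_me.sh",
    stdout_fn="/tmp/run_me.out",
    stderr_fn="/tmp/run_me.err",
    addl_args="-jc short_jobs",          # the new job-class support
    logger=logging.getLogger(__name__),
)
print(job_id, status)                    # (job id or None, TaskStatus.*)
```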
9 changes: 6 additions & 3 deletions cosmos/models/Cosmos.py
@@ -32,9 +32,10 @@ def default_get_submit_args(task, parallel_env='orte'):
time = ' -W 0:{0}'.format(task.time_req) if task.time_req else ''
return '-R "{rusage}span[hosts=1]" -n {task.core_req}{time}{queue} -J "{jobname}"'.format(**locals())
elif task.drm in ['ge', 'drmaa:ge']:
return '-cwd -pe {parallel_env} {core_req}{priority} -N "{jobname}"{queue}'.format(
return '-cwd -pe {parallel_env} {core_req}{priority} -N "{jobname}"{job_class}{queue}'.format(
priority=' -p %s' % default_job_priority if default_job_priority else '',
queue=' -q %s' % task.queue or '',
job_class=' -jc %s' % task.job_class if task.job_class else '',
queue=' -q %s' % task.queue if task.queue else '',
jobname=jobname,
core_req=task.core_req,
parallel_env=parallel_env)
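With made-up task attributes, the GE branch now renders a native specification like this sketch shows:

```python
# Sketch with made-up values (core_req=4, job_class='short_jobs', queue='all.q').
job_class, queue = 'short_jobs', 'all.q'
spec = '-cwd -pe {parallel_env} {core_req}{priority} -N "{jobname}"{job_class}{queue}'.format(
    parallel_env='smp', core_req=4, priority='', jobname='mytask',
    job_class=' -jc %s' % job_class if job_class else '',
    queue=' -q %s' % queue if queue else '')
print(spec)   # -cwd -pe smp 4 -N "mytask" -jc short_jobs -q all.q
```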
@@ -60,7 +61,8 @@ def __init__(self,
default_queue=None,
default_time_req=None,
default_max_attempts=1,
flask_app=None):
flask_app=None,
default_job_class=None):
"""
:param str database_url: A `sqlalchemy database url <http://docs.sqlalchemy.org/en/latest/core/engines.html>`_. ex: sqlite:///home/user/sqlite.db or
mysql://user:pass@localhost/database_name or postgresql+psycopg2://user:pass@localhost/database_name
@@ -109,6 +111,7 @@ def shutdown_session(exception=None):
self.session.remove()

self.default_drm = default_drm
self.default_job_class = default_job_class
self.default_queue = default_queue
self.default_max_attempts = default_max_attempts
self.default_time_req = default_time_req
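Tying the new constructor argument back to `bin/cosmos` above, a workflow that defaults every GE submission to a job class could be wired up like this (queue and class names are placeholders, and the `cosmos.api` import path is assumed):

```python
# Sketch: default_job_class threaded through the constructor; names are placeholders.
from functools import partial

from cosmos.api import Cosmos, default_get_submit_args

cosmos_app = Cosmos(
    database_url='sqlite:///cosmos.sqlite',
    default_drm='ge',
    default_job_class='short_jobs',
    default_queue='all.q',
    get_submit_args=partial(default_get_submit_args, parallel_env='smp'),
)
cosmos_app.initdb()
```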