Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
egafni committed Jul 12, 2016
2 parents 8f1f283 + c727819 commit b172124
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 28 deletions.
2 changes: 1 addition & 1 deletion cosmos/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.9
2.0.10
121 changes: 94 additions & 27 deletions cosmos/job/drm/drm_ge.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,60 @@ def submit_job(self, task):
return drm_jobID

def filter_is_done(self, tasks):
    """
    Yield a (task, metadata_dict) pair for each task that has completed.

    This method tries to be defensive against corrupt qstat and qacct output
    (see ASSAYD-153, PIPE-1978, PT-3987, etc.). If qstat reports that a job
    has finished, but qacct output looks suspicious, we try to give the job,
    and/or SGE, time to complete and/or recover.

    This method will only yield corrupt qacct data if every outstanding task
    has been affected by this SGE bug.

    :param tasks: tasks that have been submitted to SGE
    """
    if len(tasks):
        qjobs = _qstat_all()
        # Tasks whose qacct output looks corrupt, mapped to that (suspect) data.
        corrupt_data = {}

        for task in tasks:
            jid = str(task.drm_jobID)
            if jid not in qjobs or \
               any(finished_state in qjobs[jid]['state'] for finished_state in ['e', 'E']):
                #
                # If the job doesn't appear in qstat (or is tagged with 'e' or 'E'),
                # it *probably* has completed. However, SGE's qmaster may have
                # simply lost track of it for a little while, in which case qacct
                # will output corrupt data when it is interrogated.
                #
                data, data_are_corrupt = self._get_task_return_data(task)
                if data_are_corrupt:
                    task.workflow.log.warn(
                        'corrupt qstat/qacct output for %s means it may still be running' % task)
                    corrupt_data[task] = data
                else:
                    yield task, data

        num_cleanly_running_jobs = len(tasks) - len(corrupt_data)

        if num_cleanly_running_jobs > 0:
            # At least one other job is behaving normally, so give the
            # suspect jobs more time to settle: hold back their data for now.
            for task in corrupt_data.keys():
                task.workflow.log.info(
                    'temporarily masking corrupt SGE data for %s since %d other jobs are running cleanly' %
                    (task, num_cleanly_running_jobs))
        else:
            # Every outstanding task is affected by the SGE bug: stop
            # waiting and surface whatever (corrupt) data qacct gave us.
            for task, data in corrupt_data.items():
                task.workflow.log.error(
                    'all outstanding drm_ge tasks had corrupt SGE data: giving up on %s' % task)
                yield task, data


def drm_statuses(self, tasks):
"""
:param tasks: tasks that have been submitted to the job manager
:returns: (dict) task.drm_jobID -> drm_status
"""
if len(tasks):
qjobs = qstat_all()
qjobs = _qstat_all()

def f(task):
return qjobs.get(str(task.drm_jobID), dict()).get('state', '???')
Expand All @@ -57,10 +93,27 @@ def f(task):
return {}

def _get_task_return_data(self, task):
d = qacct(task)
failed = d['failed'][0] != '0'
return dict(
exit_status=int(d['exit_status']) if not failed else int(re.search('^(\d+)', d['failed']).group(1)),
"""
Convert raw qacct job data into Cosmos's more portable format.
Returns a 2-tuple comprising:
[0] a dictionary of job metadata,
[1] a boolean indicating whether the metadata in [0] are affected by an
SGE bug that causes qacct to occasionally return corrupt results.
"""
d = _qacct_raw(task)

job_failed = d['failed'][0] != '0'
data_are_corrupt = _is_corrupt(d)

if job_failed or data_are_corrupt:
task.workflow.log.warn('`qacct -j %s` (for task %s) shows %s:\n%s' %
(task.drm_jobID,
'corrupt data' if data_are_corrupt else 'job failure',
task, d))

processed_data = dict(
exit_status=int(d['exit_status']) if not job_failed else int(re.search('^(\d+)', d['failed']).group(1)),

percent_cpu=div(float(d['cpu']), float(d['ru_wallclock'])),
wall_time=float(d['ru_wallclock']),
Expand Down Expand Up @@ -92,6 +145,8 @@ def _get_task_return_data(self, task):
memory=float(d['mem']),
)

return processed_data, data_are_corrupt

def kill(self, task):
    """Terminate a single task's job (not implemented for this DRM)."""
    raise NotImplementedError
Expand All @@ -103,24 +158,37 @@ def kill_tasks(self, tasks):
sp.Popen(['qdel', pids], preexec_fn=preexec_function)


def is_garbage(qacct_dict):
def _is_corrupt(qacct_dict):
"""
qacct may return multiple records for a job. Yuk.
qacct may return multiple records for a job. They may all be corrupt. Yuk.
This was allegedly fixed in 6.0u10 but we've seen it in UGE 8.3.1.
http://osdir.com/ml/clustering.gridengine.users/2007-11/msg00397.html
When multiple records are returned, the first one(s) may have garbage data.
This function checks for three values whose presence means the entire block
is wrong.
When multiple records are returned, the first one(s) may have corrupt data.
UPDATE: this can happen even when only one block is returned, and we've also
seen cases where multiple blocks are returned and not one is reliable. This
function checks for values whose presence means an entire block is corrupt.
Note that qacct may return a date that precedes the Epoch (!), depending on
the $TZ env. variable. To be safe we check for dates within 24 hours of it.
"""
return qacct_dict.get('qsub_time', '').startswith('12/31/1969') or \
return \
qacct_dict.get('qsub_time', '').startswith('12/31/1969') or \
qacct_dict.get('qsub_time', '').startswith('01/01/1970') or \
qacct_dict.get('start_time', None) == '-/-' or \
qacct_dict.get('end_time', None) == '-/-'


def qacct(task, timeout=600):
def _qacct_raw(task, timeout=600):
"""
Parse qacct output into key/value pairs.
If qacct reports results in multiple blocks (separated by a row of ===='s),
the first block with valid data is returned. If no such block exists, then
the most recently-generated block of corrupt data is returned.
"""
start = time.time()
qacct_dict = None

Expand All @@ -140,29 +208,28 @@ def qacct(task, timeout=600):

for line in qacct_out.strip().split('\n'):
if line.startswith('='):
if not qacct_dict or is_garbage(qacct_dict):
if not qacct_dict or _is_corrupt(qacct_dict):
#
# Whether we haven't parsed any qacct data yet, or everything
# we've seen up to this point is unreliable garbage, when we see
# a stretch of ==='s, a new block of qacct data is beginning.
# we've seen up to this point appears corrupt, when we see a
# stretch of ==='s, a new block of qacct data is beginning.
#
qacct_dict = OrderedDict()
continue
else:
break
try:
k, v = re.split(r'\s+', line, maxsplit=1)
except ValueError:
raise EnvironmentError('%s with drm_jobID=%s has invalid qacct output: %s' %
raise EnvironmentError('%s with drm_jobID=%s has corrupt qacct output: %s' %
(task, task.drm_jobID, qacct_out))

qacct_dict[k] = v.strip()

if is_garbage(qacct_dict) or qacct_dict['failed'][0] != '0':
task.workflow.log.warn('`qacct -j %s` (for task %s) shows job failure:\n%s' %
(task.drm_jobID, task, qacct_out))
return qacct_dict


def qstat_all():
def _qstat_all():
"""
returns a dict keyed by lsf job ids, who's values are a dict of bjob
information about the job
Expand Down

0 comments on commit b172124

Please sign in to comment.