From db18767058c47395205a8852c567e2011a1125c5 Mon Sep 17 00:00:00 2001 From: Erik Gafni Date: Mon, 11 Jul 2016 16:16:19 -0700 Subject: [PATCH 1/8] pretty print params on failure --- cosmos/models/Task.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cosmos/models/Task.py b/cosmos/models/Task.py index efe8e93e..fb22402b 100644 --- a/cosmos/models/Task.py +++ b/cosmos/models/Task.py @@ -15,6 +15,7 @@ from .. import TaskStatus, StageStatus, signal_task_status_change from ..util.helpers import wait_for_file import datetime +import pprint opj = os.path.join @@ -33,9 +34,10 @@ class GetOutputError(Exception): pass task_failed_printout = u"""Failure Info: - - + + +{0.params_pformat} + {0.command_script_text} @@ -326,8 +328,12 @@ def url(self): def params_pretty(self): return '%s' % ', '.join('%s=%s' % (k, "'%s'" % v if isinstance(v, basestring) else v) for k, v in self.params.items()) + @property + def params_pformat(self): + return pprint.pformat(self.params, indent=2) + def __repr__(self): - return '' % (self.id or 'id_%s' % id(self), + return "" % (self.id or 'id_%s' % id(self), self.stage.name if self.stage else '', self.uid ) From 0bcceef51c5e65ab6943e4124a6d65db86fe2d7a Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Tue, 12 Jul 2016 06:04:22 -0700 Subject: [PATCH 2/8] Detect bad qacct data when $TZ is not UTC. --- cosmos/job/drm/drm_ge.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index f75eeee9..2d9199d0 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -114,8 +114,13 @@ def is_garbage(qacct_dict): When multiple records are returned, the first one(s) may have garbage data. This function checks for three values whose presence means the entire block is wrong. + + Note that qacct may return a date that precedes the Epoch (!), depending on + the $TZ env. variable. To be safe we check for dates within 24 hours of it. """ - return qacct_dict.get('qsub_time', '').startswith('12/31/1969') or \ + return \ + qacct_dict.get('qsub_time', '').startswith('12/31/1969') or \ + qacct_dict.get('qsub_time', '').startswith('01/01/1970') or \ qacct_dict.get('start_time', None) == '-/-' or \ qacct_dict.get('end_time', None) == '-/-' From a34f6718cddc82fc75d59a44b9f5d816d2646ab2 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Tue, 12 Jul 2016 06:07:41 -0700 Subject: [PATCH 3/8] Mask certain qstat/qacct errors when not all jobs are affected. If qstat reports that a job is finished, but qacct output looks suspicious, we try to give that job, and/or SGE, time to complete and/or recover. We fall back to the previous behavior (tearing down the workflow) only if every outstanding task has been affected by this SGE bug. --- cosmos/job/drm/drm_ge.py | 106 +++++++++++++++++++++++++++++++-------- 1 file changed, 84 insertions(+), 22 deletions(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index 2d9199d0..b110cf47 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -30,16 +30,52 @@ def submit_job(self, task): return drm_jobID def filter_is_done(self, tasks): + """ + Yield a dictionary of SGE job metadata for each task that has completed. + + This method tries to be defensive against incorrect qstat and qacct + output (see ASSAYD-153, PT-3987, etc.). If qstat reports that a job is + finished, but qacct output looks suspicious, we try to give that job, + and/or SGE, time to complete and/or recover. + + This method will only yield so-called 'garbage' qacct data if every + outstanding task has been affected by this SGE bug. + """ if len(tasks): - qjobs = qstat_all() + qjobs = _qstat_all() + garbage_data = {} + for task in tasks: jid = str(task.drm_jobID) - if jid not in qjobs: - # print 'missing %s %s' % (task, task.drm_jobID) - yield task, self._get_task_return_data(task) - else: - if any(finished_state in qjobs[jid]['state'] for finished_state in ['e', 'E']): - yield task, self._get_task_return_data(task) + if jid not in qjobs or \ + any(finished_state in qjobs[jid]['state'] for finished_state in ['e', 'E']): + # + # If the job doesn't appear in qstat (or is tagged with e or E), + # it *probably* has completed. However, SGE's qmaster may have + # simply lost track of it for a little while, in which case qacct + # will output unreliable garbage when it is interrogated. + # + data, data_are_garbage = self._get_task_return_data(task) + if data_are_garbage: + task.workflow.log.warn( + 'unreliable qstat/qacct output for %s means it may still be running' % task) + garbage_data[task] = data + else: + yield task, data + + num_cleanly_running_jobs = len(tasks) - len(garbage_data) + + if num_cleanly_running_jobs > 0: + for task in garbage_data.keys(): + task.workflow.log.info( + 'masking error condition for %s while %d other jobs are running cleanly' % + (task, num_cleanly_running_jobs)) + else: + for task, data in garbage_data.items(): + task.workflow.log.error( + 'all outstanding ge tasks show some kind of error: giving up on %s' % task) + yield task, data + def drm_statuses(self, tasks): """ @@ -47,7 +83,7 @@ def drm_statuses(self, tasks): :returns: (dict) task.drm_jobID -> drm_status """ if len(tasks): - qjobs = qstat_all() + qjobs = _qstat_all() def f(task): return qjobs.get(str(task.drm_jobID), dict()).get('state', '???') @@ -57,10 +93,29 @@ def f(task): return {} def _get_task_return_data(self, task): - d = qacct(task) - failed = d['failed'][0] != '0' - return dict( - exit_status=int(d['exit_status']) if not failed else int(re.search('^(\d+)', d['failed']).group(1)), + """ + Convert raw qacct job data into Cosmos's more portable format. + + Returns a 2-tuple comprising: + [0] a dictionary of job metadata, + [1] a boolean indicating whether the metadata in [0] are affected by an + SGE bug that causes qacct to occasionally return garbage results. + """ + d = _qacct_raw(task) + + job_failed = d['failed'][0] != '0' + data_are_garbage = False + + if _is_garbage(d): + task.workflow.log.warn('`qacct -j %s` (for task %s) is invalid:\n%s' % + (task.drm_jobID, task, d)) + data_are_garbage = True + elif job_failed: + task.workflow.log.warn('`qacct -j %s` (for task %s) shows job failure:\n%s' % + (task.drm_jobID, task, d)) + + processed_data = dict( + exit_status=int(d['exit_status']) if not job_failed else int(re.search('^(\d+)', d['failed']).group(1)), percent_cpu=div(float(d['cpu']), float(d['ru_wallclock'])), wall_time=float(d['ru_wallclock']), @@ -92,6 +147,8 @@ def _get_task_return_data(self, task): memory=float(d['mem']), ) + return processed_data, data_are_garbage + def kill(self, task): "Terminates a task" raise NotImplementedError @@ -103,17 +160,18 @@ def kill_tasks(self, tasks): sp.Popen(['qdel', pids], preexec_fn=preexec_function) -def is_garbage(qacct_dict): +def _is_garbage(qacct_dict): """ - qacct may return multiple records for a job. Yuk. + qacct may return multiple records for a job. They may all be corrupt. Yuk. This was allegedly fixed in 6.0u10 but we've seen it in UGE 8.3.1. http://osdir.com/ml/clustering.gridengine.users/2007-11/msg00397.html When multiple records are returned, the first one(s) may have garbage data. - This function checks for three values whose presence means the entire block - is wrong. + UPDATE: this can happen even when only one block is returned, and we've + also seen cases where multiple blocks are return and not one is reliable. + This function checks for values whose presence means an entire block is wrong. Note that qacct may return a date that precedes the Epoch (!), depending on the $TZ env. variable. To be safe we check for dates within 24 hours of it. @@ -125,7 +183,14 @@ def is_garbage(qacct_dict): qacct_dict.get('end_time', None) == '-/-' -def qacct(task, timeout=600): +def _qacct_raw(task, timeout=600): + """ + Parse qacct output into key/value pairs. + + If qacct reports results in multiple blocks (separated by a row of ===='s), + the first block with valid data is returned. If no such block exists, then + the last block of garbage data is returned. + """ start = time.time() qacct_dict = None @@ -145,7 +210,7 @@ def qacct(task, timeout=600): for line in qacct_out.strip().split('\n'): if line.startswith('='): - if not qacct_dict or is_garbage(qacct_dict): + if not qacct_dict or _is_garbage(qacct_dict): # Whether we haven't parsed any qacct data yet, or everything # we've seen up to this point is unreliable garbage, when we see # a stretch of ==='s, a new block of qacct data is beginning. @@ -161,13 +226,10 @@ def qacct(task, timeout=600): qacct_dict[k] = v.strip() - if is_garbage(qacct_dict) or qacct_dict['failed'][0] != '0': - task.workflow.log.warn('`qacct -j %s` (for task %s) shows job failure:\n%s' % - (task.drm_jobID, task, qacct_out)) return qacct_dict -def qstat_all(): +def _qstat_all(): """ returns a dict keyed by lsf job ids, who's values are a dict of bjob information about the job From 360d9b39d237fe589faedb54761bdaadc3a84a29 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Tue, 12 Jul 2016 06:30:03 -0700 Subject: [PATCH 4/8] Update comment with jira links. --- cosmos/job/drm/drm_ge.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index b110cf47..ab9299fd 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -34,9 +34,9 @@ def filter_is_done(self, tasks): Yield a dictionary of SGE job metadata for each task that has completed. This method tries to be defensive against incorrect qstat and qacct - output (see ASSAYD-153, PT-3987, etc.). If qstat reports that a job is - finished, but qacct output looks suspicious, we try to give that job, - and/or SGE, time to complete and/or recover. + output (see ASSAYD-153, PIPE-1978, PT-3987, etc.). If qstat reports that + a job is finished, but qacct output looks suspicious, we try to give the + job, and/or SGE, time to complete and/or recover. This method will only yield so-called 'garbage' qacct data if every outstanding task has been affected by this SGE bug. From 732bc2b3dc2fddf4437fc668316e22f29201033f Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Tue, 12 Jul 2016 10:55:49 -0700 Subject: [PATCH 5/8] Use consistent terminology for corrupt SGE data. While investigating this issue I used terms like 'error condition', 'garbage', 'incorrect', 'invalid', 'some kind of error', 'suspicious', and 'unreliable'. I want to save those terms for other SGE bugs, so I now consistently document the issue I'm trying to solve here with the term 'corrupt [data]'. --- cosmos/job/drm/drm_ge.py | 70 +++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index ab9299fd..09a08b2b 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -33,47 +33,47 @@ def filter_is_done(self, tasks): """ Yield a dictionary of SGE job metadata for each task that has completed. - This method tries to be defensive against incorrect qstat and qacct - output (see ASSAYD-153, PIPE-1978, PT-3987, etc.). If qstat reports that - a job is finished, but qacct output looks suspicious, we try to give the - job, and/or SGE, time to complete and/or recover. + This method tries to be defensive against corrupt qstat and qacct output + (see ASSAYD-153, PIPE-1978, PT-3987, etc.). If qstat reports that a job + has finished, but qacct output looks suspicious, we try to give the job, + and/or SGE, time to complete and/or recover. - This method will only yield so-called 'garbage' qacct data if every - outstanding task has been affected by this SGE bug. + This method will only yield corrupt qacct data if every outstanding task + has been affected by this SGE bug. """ if len(tasks): qjobs = _qstat_all() - garbage_data = {} + corrupt_data = {} for task in tasks: jid = str(task.drm_jobID) if jid not in qjobs or \ any(finished_state in qjobs[jid]['state'] for finished_state in ['e', 'E']): # - # If the job doesn't appear in qstat (or is tagged with e or E), + # If the job doesn't appear in qstat (or is tagged with 'e' or 'E'), # it *probably* has completed. However, SGE's qmaster may have # simply lost track of it for a little while, in which case qacct - # will output unreliable garbage when it is interrogated. + # will output corrupt data when it is interrogated. # - data, data_are_garbage = self._get_task_return_data(task) - if data_are_garbage: + data, data_are_corrupt = self._get_task_return_data(task) + if data_are_corrupt: task.workflow.log.warn( - 'unreliable qstat/qacct output for %s means it may still be running' % task) - garbage_data[task] = data + 'corrupt qstat/qacct output for %s means it may still be running' % task) + corrupt_data[task] = data else: yield task, data - num_cleanly_running_jobs = len(tasks) - len(garbage_data) + num_cleanly_running_jobs = len(tasks) - len(corrupt_data) if num_cleanly_running_jobs > 0: - for task in garbage_data.keys(): + for task in corrupt_data.keys(): task.workflow.log.info( - 'masking error condition for %s while %d other jobs are running cleanly' % + 'temporarily masking corrupt SGE data for %s since %d other jobs are running cleanly' % (task, num_cleanly_running_jobs)) else: - for task, data in garbage_data.items(): + for task, data in corrupt_data.items(): task.workflow.log.error( - 'all outstanding ge tasks show some kind of error: giving up on %s' % task) + 'all outstanding drm_ge tasks had corrupt SGE data: giving up on %s' % task) yield task, data @@ -99,17 +99,17 @@ def _get_task_return_data(self, task): Returns a 2-tuple comprising: [0] a dictionary of job metadata, [1] a boolean indicating whether the metadata in [0] are affected by an - SGE bug that causes qacct to occasionally return garbage results. + SGE bug that causes qacct to occasionally return corrupt results. """ d = _qacct_raw(task) job_failed = d['failed'][0] != '0' - data_are_garbage = False + data_are_corrupt = False - if _is_garbage(d): - task.workflow.log.warn('`qacct -j %s` (for task %s) is invalid:\n%s' % + if _is_corrupt(d): + task.workflow.log.warn('`qacct -j %s` (for task %s) returned corrupt data:\n%s' % (task.drm_jobID, task, d)) - data_are_garbage = True + data_are_corrupt = True elif job_failed: task.workflow.log.warn('`qacct -j %s` (for task %s) shows job failure:\n%s' % (task.drm_jobID, task, d)) @@ -147,7 +147,7 @@ def _get_task_return_data(self, task): memory=float(d['mem']), ) - return processed_data, data_are_garbage + return processed_data, data_are_corrupt def kill(self, task): "Terminates a task" @@ -160,7 +160,7 @@ def kill_tasks(self, tasks): sp.Popen(['qdel', pids], preexec_fn=preexec_function) -def _is_garbage(qacct_dict): +def _is_corrupt(qacct_dict): """ qacct may return multiple records for a job. They may all be corrupt. Yuk. @@ -168,10 +168,10 @@ def _is_garbage(qacct_dict): http://osdir.com/ml/clustering.gridengine.users/2007-11/msg00397.html - When multiple records are returned, the first one(s) may have garbage data. - UPDATE: this can happen even when only one block is returned, and we've - also seen cases where multiple blocks are return and not one is reliable. - This function checks for values whose presence means an entire block is wrong. + When multiple records are returned, the first one(s) may have corrupt data. + UPDATE: this can happen even when only one block is returned, and we've also + seen cases where multiple blocks are returned and not one is reliable. This + function checks for values whose presence means an entire block is corrupt. Note that qacct may return a date that precedes the Epoch (!), depending on the $TZ env. variable. To be safe we check for dates within 24 hours of it. @@ -189,7 +189,7 @@ def _qacct_raw(task, timeout=600): If qacct reports results in multiple blocks (separated by a row of ===='s), the first block with valid data is returned. If no such block exists, then - the last block of garbage data is returned. + the most recently-generated block of corrupt data is returned. """ start = time.time() qacct_dict = None @@ -210,10 +210,12 @@ def _qacct_raw(task, timeout=600): for line in qacct_out.strip().split('\n'): if line.startswith('='): - if not qacct_dict or _is_garbage(qacct_dict): + if not qacct_dict or _is_corrupt(qacct_dict): + # # Whether we haven't parsed any qacct data yet, or everything - # we've seen up to this point is unreliable garbage, when we see - # a stretch of ==='s, a new block of qacct data is beginning. + # we've seen up to this point appears corrupt, when we see a + # stretch of ==='s, a new block of qacct data is beginning. + # qacct_dict = OrderedDict() continue else: @@ -221,7 +223,7 @@ def _qacct_raw(task, timeout=600): try: k, v = re.split(r'\s+', line, maxsplit=1) except ValueError: - raise EnvironmentError('%s with drm_jobID=%s has invalid qacct output: %s' % + raise EnvironmentError('%s with drm_jobID=%s has corrupt qacct output: %s' % (task, task.drm_jobID, qacct_out)) qacct_dict[k] = v.strip() From b93bf24e3eb6582b3420daa96550e546011dae01 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Tue, 12 Jul 2016 11:02:58 -0700 Subject: [PATCH 6/8] Simplify log message. --- cosmos/job/drm/drm_ge.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index 09a08b2b..1d3cb672 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -104,15 +104,13 @@ def _get_task_return_data(self, task): d = _qacct_raw(task) job_failed = d['failed'][0] != '0' - data_are_corrupt = False - - if _is_corrupt(d): - task.workflow.log.warn('`qacct -j %s` (for task %s) returned corrupt data:\n%s' % - (task.drm_jobID, task, d)) - data_are_corrupt = True - elif job_failed: - task.workflow.log.warn('`qacct -j %s` (for task %s) shows job failure:\n%s' % - (task.drm_jobID, task, d)) + data_are_corrupt = _is_corrupt(d) + + if job_failed or data_are_corrupt: + task.workflow.log.warn('`qacct -j %s` (for task %s) shows %s:\n%s' % + (task.drm_jobID, + 'corrupt data' if data_are_corrupt else 'job failure', + task, d)) processed_data = dict( exit_status=int(d['exit_status']) if not job_failed else int(re.search('^(\d+)', d['failed']).group(1)), From c1c537b2fadf89eec3f45f08ca6a91789cb3afc2 Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Tue, 12 Jul 2016 11:15:04 -0700 Subject: [PATCH 7/8] Bump version to 2.0.10a3. --- cosmos/VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/VERSION b/cosmos/VERSION index 09843e3b..76e1c3ab 100644 --- a/cosmos/VERSION +++ b/cosmos/VERSION @@ -1 +1 @@ -2.0.9 +2.0.10a3 From 32b7af930b5e735d75a821ce90663d6f1578c27f Mon Sep 17 00:00:00 2001 From: Matthew Pearson Date: Tue, 12 Jul 2016 12:05:57 -0700 Subject: [PATCH 8/8] Bump version to 2.0.10. --- cosmos/VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/VERSION b/cosmos/VERSION index 76e1c3ab..0a692060 100644 --- a/cosmos/VERSION +++ b/cosmos/VERSION @@ -1 +1 @@ -2.0.10a3 +2.0.10