Merge pull request #37 from PanDAWMS/next
3.3.1.9
PalNilsson authored May 5, 2022
2 parents c3f18bf + 03633e9 commit 4aba8b7
Showing 19 changed files with 290 additions and 215 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
3.3.0.39
3.3.1.9
56 changes: 21 additions & 35 deletions pilot/control/job.py
@@ -7,7 +7,7 @@
# Authors:
# - Mario Lassnig, [email protected], 2016-2017
# - Daniel Drizhuk, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2021
# - Paul Nilsson, [email protected], 2017-2022
# - Wen Guan, [email protected], 2018

from __future__ import print_function # Python 2
@@ -300,8 +300,17 @@ def send_state(job, args, state, xml=None, metadata=None, test_tobekilled=False)
:return: boolean (True if successful, False otherwise).
"""

state = get_proper_state(job, state)
# insert out of batch time error code if MAXTIME has been reached
logger.debug(f"REACHED_MAXTIME={os.environ.get('REACHED_MAXTIME', None)}")
if os.environ.get('REACHED_MAXTIME', None):
msg = 'the max batch system time limit has been reached'
logger.warning(msg)
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.REACHEDMAXTIME, msg=msg)
state = 'failed'
job.state = state

state = get_proper_state(job, state)
logger.debug(f'state={state}')
# should the pilot make any server updates?
if not args.update_server:
logger.info('pilot will not update the server (heartbeat message will be written to file)')
@@ -311,7 +320,7 @@ def send_state(job, args, state, xml=None, metadata=None, test_tobekilled=False)

# build the data structure needed for updateJob
data = get_data_structure(job, state, args, xml=xml, metadata=metadata)

logger.debug(f'data={data}')
# write the heartbeat message to file if the server is not to be updated by the pilot (Nordugrid mode)
if not args.update_server:
# if in harvester mode write to files required by harvester
@@ -578,6 +587,7 @@ def get_data_structure(job, state, args, xml=None, metadata=None):
:return: data structure (dictionary).
"""

logger.debug(f'state={state}')
data = {'jobId': job.jobid,
'state': state,
'timestamp': time_stamp(),
@@ -642,7 +652,7 @@ def get_data_structure(job, state, args, xml=None, metadata=None):
add_memory_info(data, job.workdir, name=job.memorymonitor)
if state == 'finished' or state == 'failed':
add_timing_and_extracts(data, job, state, args)
add_error_codes(data, job)
https.add_error_codes(data, job)

return data

@@ -805,37 +815,6 @@ def get_requested_log_tail(debug_command, workdir):
return _tail


def add_error_codes(data, job):
"""
Add error codes to data structure.
:param data: data dictionary.
:param job: job object.
:return:
"""

# error codes
pilot_error_code = job.piloterrorcode
pilot_error_codes = job.piloterrorcodes
if pilot_error_codes != []:
logger.warning(f'pilotErrorCodes = {pilot_error_codes} (will report primary/first error code)')
data['pilotErrorCode'] = pilot_error_codes[0]
else:
data['pilotErrorCode'] = pilot_error_code

# add error info
pilot_error_diag = job.piloterrordiag
pilot_error_diags = job.piloterrordiags
if pilot_error_diags != []:
logger.warning(f'pilotErrorDiags = {pilot_error_diags} (will report primary/first error diag)')
data['pilotErrorDiag'] = pilot_error_diags[0]
else:
data['pilotErrorDiag'] = pilot_error_diag
data['transExitCode'] = job.transexitcode
data['exeErrorCode'] = job.exeerrorcode
data['exeErrorDiag'] = job.exeerrordiag


def get_cpu_consumption_time(cpuconsumptiontime):
"""
Get the CPU consumption time.
@@ -1277,6 +1256,8 @@ def get_job_label(args):
elif status == 'test' and args.job_label != 'ptest':
logger.warning('PQ status set to test - will use job label / prodSourceLabel test')
job_label = 'test'
elif infosys.queuedata.type == 'unified':
job_label = 'unified'
else:
job_label = args.job_label

@@ -2587,6 +2568,11 @@ def job_monitor(queues, traces, args):  # noqa: C901
peeking_time = int(time.time())
for i in range(len(jobs)):
current_id = jobs[i].jobid

if os.environ.get('REACHED_MAXTIME', None):
# the batch system max time has been reached, time to abort (in the next step)
jobs[i].state = 'failed'

logger.info('monitor loop #%d: job %d:%s is in state \'%s\'', n, i, current_id, jobs[i].state)
if jobs[i].state == 'finished' or jobs[i].state == 'failed':
logger.info('will abort job monitoring soon since job state=%s (job is still in queue)', jobs[i].state)
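Taken together, the job.py changes make send_state() and job_monitor() honour a REACHED_MAXTIME environment variable that the monitor thread sets when the batch system time limit is about to be hit. The standalone sketch below is not part of this commit and only illustrates the flow; the Job class, the numeric error code and the helper name are stand-ins for the pilot's real objects.

import os

REACHEDMAXTIME = 1372  # illustrative value; the real code uses errors.REACHEDMAXTIME

class Job:
    """Minimal stand-in for the pilot's job object."""
    def __init__(self):
        self.state = 'running'
        self.piloterrorcodes = []
        self.piloterrordiags = []

def fail_if_maxtime_reached(job):
    """Mirror of the new check in send_state(): fail the job when REACHED_MAXTIME is set."""
    if os.environ.get('REACHED_MAXTIME', None):
        msg = 'the max batch system time limit has been reached'
        job.piloterrorcodes.append(REACHEDMAXTIME)
        job.piloterrordiags.append(msg)
        job.state = 'failed'
    return job.state

# the monitor thread sets the flag once max_running_time minus grace_time is exceeded
os.environ['REACHED_MAXTIME'] = 'REACHED_MAXTIME'

job = Job()
print(fail_if_maxtime_reached(job))  # -> 'failed'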
21 changes: 11 additions & 10 deletions pilot/control/monitor.py
@@ -6,7 +6,7 @@
#
# Authors:
# - Daniel Drizhuk, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2021
# - Paul Nilsson, [email protected], 2017-2022

# NOTE: this module should deal with non-job related monitoring, such as thread monitoring. Job monitoring is
# a task for the job_monitor thread in the Job component.
@@ -77,6 +77,7 @@ def control(queues, traces, args):
logger.fatal(f'max running time ({max_running_time}s) minus grace time ({grace_time}s) has been exceeded - must abort pilot')
logger.info('setting REACHED_MAXTIME and graceful stop')
environ['REACHED_MAXTIME'] = 'REACHED_MAXTIME' # TODO: use singleton instead
logger.debug(f"REACHED_MAXTIME={environ.get('REACHED_MAXTIME', None)}")
# do not set graceful stop if pilot has not finished sending the final job update
# i.e. wait until SERVER_UPDATE is FINAL_DONE
check_for_final_server_update(args.update_server)
Expand Down Expand Up @@ -197,7 +198,7 @@ def run_checks(queues, args):

t_max = 2 * 60
logger.warning('pilot monitor received instruction that abort_job has been requested')
logger.warning('will wait for a maximum of %d seconds for threads to finish', t_max)
logger.warning(f'will wait for a maximum of {t_max} s for threads to finish')
t_0 = time.time()
while time.time() - t_0 < t_max:
if args.job_aborted.is_set():
@@ -211,7 +212,7 @@
args.graceful_stop.set()

if not args.job_aborted.is_set():
logger.warning('will wait for a maximum of %d seconds for graceful_stop to take effect', t_max)
logger.warning(f'will wait for a maximum of {t_max} s for graceful_stop to take effect')
t_max = 10
t_0 = time.time()
while time.time() - t_0 < t_max:
@@ -241,21 +242,21 @@ def get_max_running_time(lifetime, queuedata):

# use the schedconfig value if set, otherwise use the pilot option lifetime value
if not queuedata:
logger.warning('queuedata could not be extracted from queues, will use default for max running time '
'(%d s)', max_running_time)
logger.warning(f'queuedata could not be extracted from queues, will use default for max running time '
f'({max_running_time} s)')
else:
if queuedata.maxtime:
try:
max_running_time = int(queuedata.maxtime)
except Exception as error:
logger.warning('exception caught: %s', error)
logger.warning('failed to convert maxtime from queuedata, will use default value for max running time '
'(%d s)', max_running_time)
logger.warning(f'exception caught: {error}')
logger.warning(f'failed to convert maxtime from queuedata, will use default value for max running time '
f'({max_running_time} s)')
else:
if max_running_time == 0:
max_running_time = lifetime # fallback to default value
logger.info('will use default value for max running time: %d s', max_running_time)
logger.info(f'will use default value for max running time: {max_running_time} s')
else:
logger.info('will use queuedata.maxtime value for max running time: %d s', max_running_time)
logger.info(f'will use queuedata.maxtime value for max running time: {max_running_time} s')

return max_running_time
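
The reworked get_max_running_time() keeps the same fallback order as before: use queuedata.maxtime when it is set and convertible to a non-zero integer, otherwise fall back to the lifetime pilot option. A condensed restatement is sketched below; it assumes the initial default equals the lifetime argument, whereas in the real module the default comes from a constant defined above the shown hunk.

def get_max_running_time(lifetime, queuedata):
    """Condensed sketch of the fallback logic (assumption: default == lifetime)."""
    if not queuedata or not getattr(queuedata, 'maxtime', None):
        return lifetime  # no schedconfig value - use the pilot option

    try:
        max_running_time = int(queuedata.maxtime)
    except (TypeError, ValueError):
        return lifetime  # maxtime could not be converted - use the pilot option

    return max_running_time or lifetime  # maxtime == 0 also falls back

class FakeQueuedata:
    maxtime = '86400'

print(get_max_running_time(3600, None))             # -> 3600
print(get_max_running_time(3600, FakeQueuedata()))  # -> 86400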
61 changes: 47 additions & 14 deletions pilot/control/payload.py
@@ -20,6 +20,7 @@
from pilot.control.payloads import generic, eventservice, eventservicemerge
from pilot.control.job import send_state
from pilot.util.auxiliary import set_pilot_state
from pilot.util.container import execute
from pilot.util.processes import get_cpu_consumption_time
from pilot.util.config import config
from pilot.util.filehandling import read_file, remove_core_dumps, get_guid, extract_lines_from_file, find_file
@@ -62,7 +63,7 @@ def control(queues, traces, args):
pass
else:
exc_type, exc_obj, exc_trace = exc
logger.warning("thread \'%s\' received an exception from bucket: %s", thread.name, exc_obj)
logger.warning(f"thread \'{thread.name}\' received an exception from bucket: {exc_obj}")

# deal with the exception
# ..
@@ -204,13 +205,13 @@ def execute_payloads(queues, traces, args):  # noqa: C901
# this job is now to be monitored, so add it to the monitored_payloads queue
put_in_queue(job, queues.monitored_payloads)

logger.info('job %s added to monitored payloads queue', job.jobid)
logger.debug(f'job {job.jobid} added to monitored payloads queue')

try:
out = open(os.path.join(job.workdir, config.Payload.payloadstdout), 'wb')
err = open(os.path.join(job.workdir, config.Payload.payloadstderr), 'wb')
except Exception as error:
logger.warning('failed to open payload stdout/err: %s', error)
logger.warning(f'failed to open payload stdout/err: {error}')
out = None
err = None
send_state(job, args, 'starting')
@@ -222,7 +223,7 @@
continue

payload_executor = get_payload_executor(args, job, out, err, traces)
logger.info("will use payload executor: %s", payload_executor)
logger.info(f"will use payload executor: {payload_executor}")

# run the payload and measure the execution time
job.t0 = os.times()
@@ -238,17 +239,16 @@

# some HPO jobs will produce new output files (following lfn name pattern), discover those and replace the job.outdata list
if job.is_hpo:
user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user],
0) # Python 2/3
user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
try:
user.update_output_for_hpo(job)
except Exception as error:
logger.warning('exception caught by update_output_for_hpo(): %s', error)
logger.warning(f'exception caught by update_output_for_hpo(): {error}')
else:
for dat in job.outdata:
if not dat.guid:
dat.guid = get_guid()
logger.warning('guid not set: generated guid=%s for lfn=%s', dat.guid, dat.lfn)
logger.warning(f'guid not set: generated guid={dat.guid} for lfn={dat.lfn}')

#if traces.pilot['nr_jobs'] == 1:
# logger.debug('faking job failure in first multi-job')
@@ -262,11 +262,11 @@
#if job.piloterrorcodes:
# exit_code_interpret = 1
#else:
user = __import__('pilot.user.%s.diagnose' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3
user = __import__('pilot.user.%s.diagnose' % pilot_user, globals(), locals(), [pilot_user], 0)
try:
exit_code_interpret = user.interpret(job)
except Exception as error:
logger.warning('exception caught: %s', error)
logger.warning(f'exception caught: {error}')
#exit_code_interpret = -1
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.INTERNALPILOTPROBLEM)

@@ -289,7 +289,7 @@ def execute_payloads(queues, traces, args): # noqa: C901
except queue.Empty:
continue
except Exception as error:
logger.fatal('execute payloads caught an exception (cannot recover): %s, %s', error, traceback.format_exc())
logger.fatal(f'execute payloads caught an exception (cannot recover): {error}, {traceback.format_exc()}')
if job:
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PAYLOADEXECUTIONEXCEPTION)
#queues.failed_payloads.put(job)
@@ -528,7 +528,7 @@ def set_cpu_consumption_time(job):
job.cpuconsumptiontime = int(round(cpuconsumptiontime))
job.cpuconsumptionunit = "s"
job.cpuconversionfactor = 1.0
logger.info('CPU consumption time: %f %s (rounded to %d %s)', cpuconsumptiontime, job.cpuconsumptionunit, job.cpuconsumptiontime, job.cpuconsumptionunit)
logger.info(f'CPU consumption time: {cpuconsumptiontime} {job.cpuconsumptionunit} (rounded to {job.cpuconsumptiontime} {job.cpuconsumptionunit})')


def perform_initial_payload_error_analysis(job, exit_code):
@@ -542,7 +542,7 @@
"""

if exit_code != 0:
logger.warning('main payload execution returned non-zero exit code: %d', exit_code)
logger.warning(f'main payload execution returned non-zero exit code: {exit_code}')

# look for singularity errors (the exit code can be zero in this case)
path = os.path.join(job.workdir, config.Payload.payloadstderr)
@@ -553,7 +553,13 @@
stderr = ''
logger.info(f'file does not exist: {path}')

if exit_code != 0:
# check for memory errors first
if exit_code != 0 and job.subprocesses:
# scan for memory errors in dmesg messages
msg = scan_for_memory_errors(job.subprocesses)
if msg:
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PAYLOADOUTOFMEMORY, msg=msg)
elif exit_code != 0:
msg = ""

# are there any critical errors in the stdout?
@@ -611,6 +617,33 @@ def perform_initial_payload_error_analysis(job, exit_code):
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.COREDUMP)


def scan_for_memory_errors(subprocesses):
"""
Scan for memory errors in dmesg messages.
:param subprocesses: list of payload subprocesses.
:return: error diagnostics (string).
"""

diagnostics = ""
search_str = 'Memory cgroup out of memory'
for pid in subprocesses:
logger.info(f'scanning dmesg message for subprocess={pid} for memory errors')
cmd = f'dmesg|grep {pid}'
_, out, _ = execute(cmd)
if search_str in out:
for line in out.split('\n'):
if search_str in line:
diagnostics = line[line.find(search_str):]
logger.warning(f'found memory error: {diagnostics}')
break

if diagnostics:
break

return diagnostics


def set_error_code_from_stderr(msg, fatal):
"""
Identify specific errors in stderr and set the corresponding error code.
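
The new scan_for_memory_errors() helper greps dmesg for 'Memory cgroup out of memory' lines that mention a payload subprocess pid, and perform_initial_payload_error_analysis() maps a hit to PAYLOADOUTOFMEMORY. A standalone version is sketched below; it substitutes subprocess.run() for the pilot's pilot.util.container.execute() helper, and reading dmesg may require elevated privileges on some kernels.

import subprocess

def scan_for_memory_errors(pids):
    """Return the OOM diagnostic for the first payload subprocess found in dmesg, else ''."""
    search_str = 'Memory cgroup out of memory'
    for pid in pids:
        # grep the kernel log for lines mentioning this payload subprocess
        proc = subprocess.run(f'dmesg | grep {pid}', shell=True,
                              capture_output=True, text=True)
        for line in proc.stdout.split('\n'):
            if search_str in line:
                return line[line.find(search_str):]  # keep only the diagnostic part
    return ""

diagnostics = scan_for_memory_errors([12345])  # pass the collected payload subprocess pids
print(diagnostics or 'no memory errors found')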
10 changes: 5 additions & 5 deletions pilot/user/atlas/proxy.py
@@ -139,14 +139,14 @@ def verify_arcproxy(envsetup, limit, proxy_id="pilot", test=False):
seconds_left = validity_end - tnow
logger.info("cache: check %s proxy validity: wanted=%dh left=%.2fh (now=%d validity_end=%d left=%d)",
proxy_id, limit, float(seconds_left) / 3600, tnow, validity_end, seconds_left)
if seconds_left < limit * 3600: # REMOVE THIS, FAVOUR THE NEXT
diagnostics = f"{proxy_id} proxy validity time is too short: %.2fh" % (float(seconds_left) / 3600)
logger.warning(diagnostics)
exit_code = errors.NOVOMSPROXY
elif seconds_left < limit * 3600 - 20 * 60: # FAVOUR THIS, IE NEVER SET THE PREVIOUS
if seconds_left < limit * 3600 - 20 * 60: # FAVOUR THIS, IE NEVER SET THE NEXT
diagnostics = f'{proxy_id} proxy is about to expire: %.2fh' % (float(seconds_left) / 3600)
logger.warning(diagnostics)
exit_code = errors.VOMSPROXYABOUTTOEXPIRE
#elif seconds_left < limit * 3600: # REMOVE THIS, FAVOUR THE NEXT
# diagnostics = f"{proxy_id} proxy validity time is too short: %.2fh" % (float(seconds_left) / 3600)
# logger.warning(diagnostics)
# exit_code = errors.NOVOMSPROXY
else:
logger.info("%s proxy validity time is verified", proxy_id)
return exit_code, diagnostics
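
In verify_arcproxy() the stricter 'validity time is too short' branch (NOVOMSPROXY) is now commented out, leaving only the softer 20-minute early-warning check (VOMSPROXYABOUTTOEXPIRE). The sketch below restates that single remaining condition; the error identifier is an illustrative stand-in for the pilot's ErrorCodes value.

VOMSPROXYABOUTTOEXPIRE = 'VOMSPROXYABOUTTOEXPIRE'  # placeholder for errors.VOMSPROXYABOUTTOEXPIRE

def check_proxy_time_left(seconds_left, limit_hours):
    """Flag the proxy when fewer than limit_hours minus 20 minutes of validity remain."""
    exit_code, diagnostics = 0, ""
    if seconds_left < limit_hours * 3600 - 20 * 60:
        diagnostics = f'proxy is about to expire: {seconds_left / 3600:.2f}h'
        exit_code = VOMSPROXYABOUTTOEXPIRE
    return exit_code, diagnostics

print(check_proxy_time_left(30 * 60, 1))   # 30 min left on a 1 h limit -> flagged
print(check_proxy_time_left(5 * 3600, 1))  # plenty of time -> (0, '')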
3 changes: 2 additions & 1 deletion pilot/util/auxiliary.py
@@ -5,7 +5,7 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2017-2021
# - Paul Nilsson, [email protected], 2017-2022

import os
import re
@@ -83,6 +83,7 @@ def display_architecture_info():
Display OS/architecture information.
The function attempts to use the lsb_release -a command if available. If that is not available,
it will dump the contents of
WARNING: lsb_release will not be available on CentOS Stream 9
:return:
"""
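
The updated docstring warns that lsb_release is not shipped with CentOS Stream 9, so display_architecture_info() has to fall back to dumping an OS release file. A minimal sketch of that pattern follows; the /etc/os-release path is an assumption for illustration and is not taken from the diff.

import subprocess

def display_architecture_info():
    """Try 'lsb_release -a' first, then fall back to reading /etc/os-release (assumed path)."""
    try:
        out = subprocess.run(['lsb_release', '-a'], capture_output=True,
                             text=True, check=True).stdout
    except (FileNotFoundError, subprocess.CalledProcessError):
        try:
            with open('/etc/os-release') as fh:  # e.g. on CentOS Stream 9
                out = fh.read()
        except OSError:
            out = 'could not determine OS information'
    print(out)

display_architecture_info()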