Commit

Merge pull request #93 from PanDAWMS/next
3.6.4.7
PalNilsson authored Jul 13, 2023
2 parents 6dac888 + 5b8c1e5 commit fc11bb0
Showing 5 changed files with 54 additions and 40 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
-3.6.3.8
+3.6.4.7
16 changes: 8 additions & 8 deletions pilot/control/data.py
@@ -584,13 +584,13 @@ def copytool_in(queues, traces, args):  # noqa: C901
             logger.warning(f'path does not exist: {path}')

         # stage-out log file
-        job.stageout = "log"
-        if not _stage_out_new(job, args):
-            logger.info(f"job {job.jobid} failed during stage-out of log, adding job object to failed_data_outs queue")
-            put_in_queue(job, queues.failed_data_out)
-        else:
-            logger.info(f"job {job.jobid} has finished")
-            put_in_queue(job, queues.finished_jobs)
+        #job.stageout = "log"
+        #if not _stage_out_new(job, args):
+        #    logger.info(f"job {job.jobid} failed during stage-out of log, adding job object to failed_data_outs queue")
+        #    put_in_queue(job, queues.failed_data_out)
+        #else:
+        #    logger.info(f"job {job.jobid} has finished")
+        #    put_in_queue(job, queues.finished_jobs)

     logger.info('stage-in thread is no longer needed - terminating')
     abort = True
@@ -619,7 +619,7 @@ def copytool_in(queues, traces, args):  # noqa: C901
         logger.debug('an abort was received - finishing stage-in thread')

     # proceed to set the job_aborted flag?
-    if threads_aborted(caller='copytool_in'):
+    if threads_aborted(caller='copytool_in') and args.workflow != 'stager':  # only finish this thread in stager mode
         logger.debug('will proceed to set job_aborted')
         args.job_aborted.set()

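The second hunk adds a workflow guard before the job_aborted flag is set. Below is a minimal, self-contained sketch of that pattern; `threads_aborted`, `finish_copytool_in` and the `args` namespace are simplified stand-ins for the pilot's own objects, not its actual API.

```python
import argparse
import logging
import threading

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


def threads_aborted(caller: str = '') -> bool:
    """Stand-in for the pilot helper: True when only the current thread is left."""
    return threading.active_count() <= 1


def finish_copytool_in(args: argparse.Namespace) -> None:
    """Set the job_aborted flag only when all threads are done and we are not in stager mode."""
    if threads_aborted(caller='copytool_in') and args.workflow != 'stager':
        logger.debug('will proceed to set job_aborted')
        args.job_aborted.set()
    else:
        logger.debug('not setting job_aborted (threads still alive or stager workflow)')


# usage sketch: in stager mode the flag is left unset
args = argparse.Namespace(workflow='stager', job_aborted=threading.Event())
finish_copytool_in(args)
```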
14 changes: 12 additions & 2 deletions pilot/control/job.py
@@ -2770,6 +2770,7 @@ def job_monitor(queues, traces, args):  # noqa: C901
 # overall loop counter (ignoring the fact that more than one job may be running)
 n = 0
 cont = True
+first = True
 while cont:

     # abort in case graceful_stop has been set, and less than 30 s has passed since MAXTIME was reached (if set)
@@ -2817,16 +2818,25 @@ def job_monitor(queues, traces, args):  # noqa: C901
elif not queues.finished_data_in.empty():
# stage-in has finished, or there were no input files to begin with, job object ends up in finished_data_in queue
if args.workflow == 'stager':
if first:
logger.debug('stage-in finished - waiting for lease time to finish')
first = False
if args.pod:
# wait maximum args.leasetime seconds, then abort
time.sleep(10)
time_now = int(time.time())
if time_now - start_time >= args.leasetime:
logger.warning(f'lease time is up: {time_now - start_time} s has passed since start - abort stager pilot')
jobs[i].stageout = 'log' # only stage-out log file
put_in_queue(jobs[i], queues.data_out)
#args.graceful_stop.set()
else:
continue
else:
continue
else:
logger.debug('stage-in has finished - no need for job_monitor to continue')

if args.workflow == 'stager':
logger.debug('stage-in has finished - no need for job_monitor to continue')
break

# peek at the jobs in the validated_jobs queue and send the running ones to the heartbeat function
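The new logic only applies to the stager workflow and, for pod deployments (args.pod), checks a lease time: a `first` flag keeps the wait message from being logged on every loop pass, and the job is sent to log stage-out once the lease expires. A simplified, self-contained sketch of that timing check follows; the names mirror the diff, while the function itself and the commented queue calls are illustrative only.

```python
import logging
import time

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


def wait_for_lease(start_time: int, leasetime: int) -> None:
    """Poll every 10 s and stop once the lease time has passed (illustrative only)."""
    first = True
    while True:
        if first:
            logger.debug('stage-in finished - waiting for lease time to finish')
            first = False
        time.sleep(10)
        time_now = int(time.time())
        if time_now - start_time >= leasetime:
            logger.warning(f'lease time is up: {time_now - start_time} s has passed since start - abort stager pilot')
            # the pilot would now queue the job for log stage-out:
            # jobs[i].stageout = 'log'; put_in_queue(jobs[i], queues.data_out)
            break


# usage sketch: a 30 s lease
wait_for_lease(start_time=int(time.time()), leasetime=30)
```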
4 changes: 2 additions & 2 deletions pilot/util/constants.py
@@ -13,8 +13,8 @@
 # Pilot version
 RELEASE = '3'  # released number should be fixed at 3 for Pilot 3
 VERSION = '6'  # version number is '1' for first release, '0' until then, increased for bigger updates
-REVISION = '3'  # revision number should be reset to '0' for every new version release, increased for small updates
-BUILD = '8'  # build number should be reset to '1' for every new development cycle
+REVISION = '4'  # revision number should be reset to '0' for every new version release, increased for small updates
+BUILD = '7'  # build number should be reset to '1' for every new development cycle

 SUCCESS = 0
 FAILURE = 1
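The version bump keeps constants.py in step with the PILOTVERSION file: the four constants combine into the dotted version string. The pilot composes this string elsewhere, so the helper below is only an illustrative sketch of that composition.

```python
# the four constants from pilot/util/constants.py after this commit
RELEASE = '3'
VERSION = '6'
REVISION = '4'
BUILD = '7'


def version_string() -> str:
    """Compose the dotted pilot version, e.g. '3.6.4.7' (illustrative helper)."""
    return '.'.join((RELEASE, VERSION, REVISION, BUILD))


assert version_string() == '3.6.4.7'
```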
58 changes: 31 additions & 27 deletions pilot/util/monitoring.py
@@ -28,6 +28,7 @@
     get_subprocesses
 from pilot.util.timing import get_time_since
 from pilot.util.workernode import get_local_disk_space, check_hz
+from pilot.info import infosys

 import logging
 logger = logging.getLogger(__name__)
@@ -652,36 +653,40 @@ def check_work_dir(job):
     if os.path.exists(job.workdir):
         # get the limit of the workdir
         maxwdirsize = get_max_allowed_work_dir_size()
-        workdirsize = get_disk_usage(job.workdir)
-
-        # is user dir within allowed size limit?
-        if workdirsize > maxwdirsize:
-            exit_code = errors.USERDIRTOOLARGE
-            diagnostics = f'work directory ({job.workdir}) is too large: {workdirsize} B (must be < {maxwdirsize} B)'
-            logger.fatal(diagnostics)
-
-            cmd = 'ls -altrR %s' % job.workdir
-            _ec, stdout, stderr = execute(cmd, mute=True)
-            logger.info(f'{cmd}:\n{stdout}')
-
-            # kill the job
-            set_pilot_state(job=job, state="failed")
-            job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code)
-            kill_processes(job.pid)
-
-            # remove any lingering input files from the work dir
-            lfns, guids = job.get_lfns_and_guids()
-            if lfns:
-                remove_files(lfns, workdir=job.workdir)
-
-            # re-measure the size of the workdir at this point since the value is stored below
-            workdirsize = get_disk_usage(job.workdir)
-        else:
-            logger.info(f'size of work directory {job.workdir}: {workdirsize} B (within {maxwdirsize} B limit)')
-
-        # Store the measured disk space (the max value will later be sent with the job metrics)
-        if workdirsize > 0:
-            job.add_workdir_size(workdirsize)
+        if os.path.exists(job.workdir):
+            workdirsize = get_disk_usage(job.workdir)
+
+            # is user dir within allowed size limit?
+            if workdirsize > maxwdirsize:
+                exit_code = errors.USERDIRTOOLARGE
+                diagnostics = f'work directory ({job.workdir}) is too large: {workdirsize} B (must be < {maxwdirsize} B)'
+                logger.fatal(diagnostics)
+
+                cmd = 'ls -altrR %s' % job.workdir
+                _ec, stdout, stderr = execute(cmd, mute=True)
+                logger.info(f'{cmd}:\n{stdout}')
+
+                # kill the job
+                set_pilot_state(job=job, state="failed")
+                job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code)
+                kill_processes(job.pid)
+
+                # remove any lingering input files from the work dir
+                lfns, guids = job.get_lfns_and_guids()
+                if lfns:
+                    remove_files(lfns, workdir=job.workdir)
+
+                # remeasure the size of the workdir at this point since the value is stored below
+                workdirsize = get_disk_usage(job.workdir)
+            else:
+                logger.info(f'size of work directory {job.workdir}: {workdirsize} B (within {maxwdirsize} B limit)')
+
+            # Store the measured disk space (the max value will later be sent with the job metrics)
+            if workdirsize > 0:
+                job.add_workdir_size(workdirsize)
+        else:
+            logger.warning(f'job work dir does not exist: {job.workdir}')
     else:
         logger.warning('skipping size check of workdir since it has not been created yet')

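The reworked check_work_dir re-tests os.path.exists(job.workdir) right before measuring disk usage, so a work directory that disappears while the job is being monitored is logged rather than tripping up the size check. Below is a stripped-down sketch of that defensive pattern; get_disk_usage, the limit constant and the return convention are stand-ins, not the pilot's real helpers.

```python
import logging
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MAX_WORK_DIR_SIZE = 16 * 1024 ** 3  # stand-in limit (16 GiB); the pilot derives this from queuedata


def get_disk_usage(path: str) -> int:
    """Stand-in for the pilot helper: total size of all files under path, in bytes."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            try:
                total += os.path.getsize(os.path.join(root, name))
            except OSError:  # a file may disappear while walking
                pass
    return total


def check_work_dir(workdir: str) -> bool:
    """Return False when the work dir exceeds the limit, True otherwise."""
    if not os.path.exists(workdir):
        logger.warning('skipping size check of workdir since it has not been created yet')
        return True

    maxwdirsize = MAX_WORK_DIR_SIZE

    # the directory can be removed while the job is being monitored, so re-check before measuring
    if not os.path.exists(workdir):
        logger.warning(f'job work dir does not exist: {workdir}')
        return True

    workdirsize = get_disk_usage(workdir)
    if workdirsize > maxwdirsize:
        logger.fatal(f'work directory ({workdir}) is too large: {workdirsize} B (must be < {maxwdirsize} B)')
        return False

    logger.info(f'size of work directory {workdir}: {workdirsize} B (within {maxwdirsize} B limit)')
    return True


# usage sketch
check_work_dir(os.getcwd())
```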
@@ -713,16 +718,15 @@ def get_max_allowed_work_dir_size():
     return maxwdirsize


-def get_max_input_size(queuedata, megabyte=False):
+def get_max_input_size(megabyte=False):
     """
     Return a proper maxinputsize value.
-    :param queuedata: job.infosys.queuedata object.
     :param megabyte: return results in MB (Boolean).
     :return: max input size (int).
     """

-    _maxinputsize = queuedata.maxwdir  # normally 14336+2000 MB
+    _maxinputsize = infosys.queuedata.maxwdir  # normally 14336+2000 MB
     max_input_file_sizes = 14 * 1024 * 1024 * 1024  # 14 GB, 14336 MB (pilot default)
     max_input_file_sizes_mb = 14 * 1024  # 14336 MB (pilot default)
     if _maxinputsize != "":
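get_max_input_size now reads the queue configuration from the global infosys object (imported from pilot.info above) instead of receiving a queuedata argument. A minimal sketch of the changed call pattern is below; the SimpleNamespace stand-in and the simplified body are illustrative, and only the maxwdir lookup mirrors the diff.

```python
from types import SimpleNamespace

# stand-in for pilot.info.infosys; the real object is populated at pilot start-up
infosys = SimpleNamespace(queuedata=SimpleNamespace(maxwdir=14336 + 2000))  # MB


def get_max_input_size(megabyte: bool = False) -> int:
    """Sketch: read maxwdir from the global infosys instead of a passed-in queuedata object."""
    _maxinputsize = int(infosys.queuedata.maxwdir)  # normally 14336+2000 MB
    return _maxinputsize if megabyte else _maxinputsize * 1024 * 1024  # MB or bytes


# callers no longer pass job.infosys.queuedata
print(get_max_input_size(megabyte=True))  # 16336
```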
