Merge pull request #97 from PanDAWMS/next
3.6.5.32
PalNilsson authored Sep 5, 2023
2 parents fc11bb0 + 6fff31f commit 70dff3a
Showing 31 changed files with 1,175 additions and 276 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yml
@@ -11,7 +11,7 @@ jobs:
continue-on-error: true
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10']
python-version: ['3.8', '3.9', '3.10', '3.11']
env:
FLAKE8_VERSION: "==3.8.4"
FLAKE8_CONFIG: ".flake8"
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
3.6.4.7
3.6.5.32
19 changes: 19 additions & 0 deletions pilot.py
@@ -54,6 +54,7 @@
https_setup,
send_update,
)
from pilot.util.processgroups import find_defunct_subprocesses
from pilot.util.loggingsupport import establish_logging
from pilot.util.timing import add_to_pilot_timing

@@ -666,6 +667,20 @@ def set_redirectall() -> None:
args.redirectall = redirectall


def list_zombies() -> None:
"""
Check that no defunct processes are still lingering.
Note: can be used to find zombies, but zombies cannot be killed (they can only be reaped by their parent).
"""

found = find_defunct_subprocesses(os.getpid())
if found:
logging.info(f'found these defunct processes: {found}')
else:
logging.info('no defunct processes were found')


if __name__ == '__main__':
# Main function of pilot module.

@@ -713,6 +728,10 @@ def set_redirectall() -> None:
# store final time stamp (cannot be placed later since the mainworkdir is about to be purged)
add_to_pilot_timing('0', PILOT_END_TIME, time.time(), args, store=False)

# make sure the pilot does not leave any lingering defunct child processes behind
if args.debug:
list_zombies()

# perform cleanup and terminate logging
exit_code = wrap_up()

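The list_zombies() helper above depends on find_defunct_subprocesses() from pilot.util.processgroups, which is not part of this diff. Below is a minimal sketch of what such a helper could look like, assuming it scans the ps process table for direct children in the Z (zombie) state; the real implementation may differ, e.g. by walking the whole process tree.

import subprocess

def find_defunct_subprocesses(pid: int) -> list:
    """Return the PIDs of defunct (zombie) children of the given process."""
    # 'ps' marks zombies with 'Z' in the STAT column; they occupy a process
    # table slot until the parent reaps them with wait(), so they cannot be
    # killed - which is why list_zombies() only reports them
    output = subprocess.run(['ps', '-eo', 'pid,ppid,stat'],
                            capture_output=True, text=True).stdout
    zombies = []
    for line in output.splitlines()[1:]:  # skip the ps header line
        fields = line.split()
        if len(fields) >= 3 and fields[1] == str(pid) and 'Z' in fields[2]:
            zombies.append(int(fields[0]))
    return zombies
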
2 changes: 1 addition & 1 deletion pilot/api/data.py
@@ -40,7 +40,7 @@
from pilot.util.math import convert_mb_to_b
from pilot.util.parameters import get_maximum_input_sizes
from pilot.util.workernode import get_local_disk_space
from pilot.util.timer import TimeoutException
from pilot.util.auxiliary import TimeoutException
from pilot.util.tracereport import TraceReport


6 changes: 5 additions & 1 deletion pilot/common/errorcodes.py
@@ -159,6 +159,8 @@ class ErrorCodes:
APPTAINERNOTINSTALLED = 1372
CERTIFICATEHASEXPIRED = 1373
REMOTEFILEDICTDOESNOTEXIST = 1374
LEASETIME = 1375
LOGCREATIONTIMEOUT = 1376

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
@@ -294,7 +296,9 @@ class ErrorCodes:
VOMSPROXYABOUTTOEXPIRE: "VOMS proxy is about to expire",
BADOUTPUTFILENAME: "Output file name contains illegal characters",
CERTIFICATEHASEXPIRED: "Certificate has expired",
REMOTEFILEDICTDOESNOTEXIST: "Remote file open dictionary does not exist"
REMOTEFILEDICTDOESNOTEXIST: "Remote file open dictionary does not exist",
LEASETIME: "Lease time is up", # internal use only
LOGCREATIONTIMEOUT: "Log file creation timed out"
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
155 changes: 121 additions & 34 deletions pilot/control/data.py
@@ -7,7 +7,7 @@
# Authors:
# - Mario Lassnig, [email protected], 2016-2017
# - Daniel Drizhuk, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2022
# - Paul Nilsson, [email protected], 2017-2023
# - Wen Guan, [email protected], 2018
# - Alexey Anisenkov, [email protected], 2018

@@ -18,21 +18,53 @@
import queue
from typing import Any

from pilot.api.data import StageInClient, StageOutClient
from pilot.api.data import (
StageInClient,
StageOutClient
)
from pilot.api.es_data import StageInESClient
from pilot.control.job import send_state
from pilot.common.errorcodes import ErrorCodes
from pilot.common.exception import ExcThread, PilotException, LogFileCreationFailure, NoSuchFile, FileHandlingFailure
#from pilot.util.config import config
from pilot.util.auxiliary import set_pilot_state, check_for_final_server_update #, abort_jobs_in_queues
from pilot.common.exception import (
ExcThread,
PilotException,
LogFileCreationFailure,
NoSuchFile,
FileHandlingFailure
)
from pilot.util.auxiliary import (
set_pilot_state,
check_for_final_server_update
)
from pilot.util.common import should_abort
from pilot.util.config import config
from pilot.util.constants import PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN, PILOT_PRE_STAGEOUT, PILOT_POST_STAGEOUT, LOG_TRANSFER_IN_PROGRESS,\
LOG_TRANSFER_DONE, LOG_TRANSFER_NOT_DONE, LOG_TRANSFER_FAILED, SERVER_UPDATE_RUNNING, MAX_KILL_WAIT_TIME, UTILITY_BEFORE_STAGEIN
from pilot.util.constants import (
PILOT_PRE_STAGEIN,
PILOT_POST_STAGEIN,
PILOT_PRE_STAGEOUT,
PILOT_POST_STAGEOUT,
PILOT_PRE_LOG_TAR,
PILOT_POST_LOG_TAR,
LOG_TRANSFER_IN_PROGRESS,
LOG_TRANSFER_DONE,
LOG_TRANSFER_NOT_DONE,
LOG_TRANSFER_FAILED,
SERVER_UPDATE_RUNNING,
MAX_KILL_WAIT_TIME,
UTILITY_BEFORE_STAGEIN
)
from pilot.util.container import execute
from pilot.util.filehandling import remove, write_file, copy
from pilot.util.filehandling import (
remove,
write_file,
copy,
get_directory_size
)
from pilot.util.processes import threads_aborted
from pilot.util.queuehandling import declare_failed_by_kill, put_in_queue
from pilot.util.queuehandling import (
declare_failed_by_kill,
put_in_queue
)
from pilot.util.timing import add_to_pilot_timing
from pilot.util.tracereport import TraceReport
import pilot.util.middleware
@@ -592,6 +624,9 @@ def copytool_in(queues, traces, args): # noqa: C901
# logger.info(f"job {job.jobid} has finished")
# put_in_queue(job, queues.finished_jobs)

# this job is now to be monitored, so add it to the monitored_payloads queue
put_in_queue(job, queues.monitored_payloads)

logger.info('stage-in thread is no longer needed - terminating')
abort = True
break
@@ -788,18 +823,18 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out
# perform special cleanup (user specific) prior to log file creation
if cleanup:
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3
user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0) # Python 2/3
user.remove_redundant_files(workdir, piloterrors=piloterrors, debugmode=debugmode)

# remove any present input/output files before tarring up workdir
for fname in input_files + output_files:
path = os.path.join(workdir, fname)
if os.path.exists(path):
logger.info('removing file: %s', path)
logger.info(f'removing file: {path}')
remove(path)

if logfile_name is None or len(logfile_name.strip('/ ')) == 0:
logger.info('Skipping tarball creation, since the logfile_name is empty')
logger.info('skipping tarball creation, since the logfile_name is empty')
return

# rename the workdir for the tarball creation
@@ -808,21 +843,34 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out
os.rename(workdir, newworkdir)
workdir = newworkdir

# get the size of the workdir
dirsize = get_directory_size(workdir)
timeout = get_tar_timeout(dirsize)

fullpath = os.path.join(current_dir, logfile_name) # /some/path/to/dirname/log.tgz
logger.info('will create archive %s', fullpath)
logger.info(f'will create archive {fullpath} using timeout={timeout} s for directory size={dirsize} MB')

try:
cmd = "pwd;tar cvfz %s %s --dereference --one-file-system; echo $?" % (fullpath, tarball_name)
_, stdout, _ = execute(cmd)
# add e.g. sleep 200; before tar command to test time-out
cmd = f"pwd;tar cvfz {fullpath} {tarball_name} --dereference --one-file-system; echo $?"
exit_code, stdout, stderr = execute(cmd, timeout=timeout)
except Exception as error:
raise LogFileCreationFailure(error)
else:
if pilot_home != current_dir:
os.chdir(pilot_home)
logger.debug('stdout = %s', stdout)
logger.debug(f'stdout: {stdout}')
try:
os.rename(workdir, orgworkdir)
except Exception as error:
logger.debug('exception caught when renaming workdir: %s', error)
except OSError as error:
logger.debug(f'exception caught when renaming workdir: {error} (ignore)')

if exit_code:
diagnostics = f'tarball creation failed with exit code: {exit_code}, stdout={stdout}, stderr={stderr}'
logger.warning(diagnostics)
if exit_code == errors.COMMANDTIMEDOUT:
exit_code = errors.LOGCREATIONTIMEOUT
raise PilotException(diagnostics, code=exit_code)

# final step, copy the log file into the workdir - otherwise containerized stage-out won't work
try:
@@ -831,6 +879,22 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out
logger.warning(f'caught exception when copying tarball: {exc}')
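
The timed-out tar call above relies on execute() from pilot.util.container accepting a timeout argument and returning a COMMANDTIMEDOUT exit code when the timer fires. A rough sketch of that contract under those assumptions (the real implementation is not shown in this diff, and the numeric error code below is illustrative only):

import subprocess

COMMANDTIMEDOUT = 1363  # illustrative value; the real code lives in ErrorCodes

def execute(cmd: str, timeout=None) -> tuple:
    """Execute a shell command and return (exit_code, stdout, stderr)."""
    try:
        proc = subprocess.run(cmd, shell=True, capture_output=True,
                              text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        # map the expired timer onto a dedicated exit code, which create_log()
        # above remaps to errors.LOGCREATIONTIMEOUT before raising
        return COMMANDTIMEDOUT, '', f'command timed out after {timeout} s'
    return proc.returncode, proc.stdout, proc.stderr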


def get_tar_timeout(dirsize: float) -> int:
"""
Get a proper time-out limit based on the directory size.
It should also handle the case dirsize=None and return the max timeout.
:param dirsize: directory size (float).
:return: time-out in seconds (int).
"""

timeout_max = 3 * 3600 # 3 hours
timeout_min = 30
timeout = timeout_min + int(60.0 + dirsize / 5.0) if dirsize else timeout_max

return min(timeout, timeout_max)
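
A few worked values of the formula above, assuming the function exactly as defined in this diff:

# worked examples for get_tar_timeout(); sizes in MB, timeouts in seconds
assert get_tar_timeout(1000) == 290        # 30 + int(60.0 + 1000 / 5.0)
assert get_tar_timeout(None) == 10800      # unknown size -> max timeout (3 h)
assert get_tar_timeout(1000000) == 10800   # very large directory -> capped at 3 h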


def _do_stageout(job, xdata, activity, queue, title, output_dir='', rucio_host='', ipv='IPv6'):
"""
Use the `StageOutClient` in the Data API to perform stage-out.
@@ -965,18 +1029,34 @@ def _stage_out_new(job: Any, args: Any) -> bool:
job.status['LOG_TRANSFER'] = LOG_TRANSFER_IN_PROGRESS
logfile = job.logdata[0]

# write time stamps to pilot timing file
current_time = time.time()
add_to_pilot_timing(job.jobid, PILOT_PRE_LOG_TAR, current_time, args)

try:
tarball_name = f'tarball_PandaJob_{job.jobid}_{job.infosys.pandaqueue}'
input_files = [fspec.lfn for fspec in job.indata]
output_files = [fspec.lfn for fspec in job.outdata]
create_log(job.workdir, logfile.lfn, tarball_name, args.cleanup,
input_files=input_files, output_files=output_files,
input_files=[fspec.lfn for fspec in job.indata],
output_files=[fspec.lfn for fspec in job.outdata],
piloterrors=job.piloterrorcodes, debugmode=job.debug)
except LogFileCreationFailure as error:
logger.warning(f'failed to create tar file: {error}')
set_pilot_state(job=job, state="failed")
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOGFILECREATIONFAILURE)
return False
except PilotException as error:
logger.warning(f'failed to create tar file: {error}')
set_pilot_state(job=job, state="failed")
if 'timed out' in error.get_detail():
delta = int(time.time() - current_time)
msg = f'tar command for log file creation timed out after {delta} s: {error.get_detail()}'
else:
msg = error.get_detail()
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code(), msg=msg)
return False

# write time stamps to pilot timing file
add_to_pilot_timing(job.jobid, PILOT_POST_LOG_TAR, time.time(), args)

if not _do_stageout(job, [logfile], ['pl', 'pw', 'w'], args.queue, title='log', output_dir=args.output_dir,
rucio_host=args.rucio_host, ipv=args.internet_protocol_version):
@@ -993,16 +1073,7 @@ def _stage_out_new(job: Any, args: Any) -> bool:
add_to_pilot_timing(job.jobid, PILOT_POST_STAGEOUT, time.time(), args)

# generate fileinfo details to be sent to Panda
fileinfo = {}
checksum_type = config.File.checksum_type if config.File.checksum_type == 'adler32' else 'md5sum'
for iofile in job.outdata + job.logdata:
if iofile.status in ['transferred']:
fileinfo[iofile.lfn] = {'guid': iofile.guid,
'fsize': iofile.filesize,
f'{checksum_type}': iofile.checksum.get(config.File.checksum_type),
'surl': iofile.turl}

job.fileinfo = fileinfo
job.fileinfo = generate_fileinfo(job)

# WARNING THE FOLLOWING RESETS ANY PREVIOUS STAGEOUT ERRORS
if not is_success:
@@ -1018,12 +1089,28 @@ def _stage_out_new(job: Any, args: Any) -> bool:
logger.debug(f'changing job state from {job.state} to finished')
set_pilot_state(job=job, state="finished")

# send final server update since all transfers have finished correctly
# send_state(job, args, 'finished', xml=dumps(fileinfodict))

return is_success
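
The new PilotException handler above assumes the exception exposes get_detail() and get_error_code() accessors. A minimal sketch of that interface (the real class is defined in pilot.common.exception and is not shown here, so details may differ):

class PilotException(Exception):
    """Hypothetical sketch of the exception interface used above."""

    def __init__(self, message: str, code: int = 0):
        super().__init__(message)
        self._message = message
        self._code = code

    def get_detail(self) -> str:
        """Return the human-readable error details."""
        return self._message

    def get_error_code(self) -> int:
        """Return the numeric pilot error code."""
        return self._code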


def generate_fileinfo(job: Any) -> dict:
"""
Generate fileinfo details to be sent to Panda.
:param job: job object (Any).
:return: fileinfo dictionary (dict).
"""

fileinfo = {}
checksum_type = config.File.checksum_type if config.File.checksum_type == 'adler32' else 'md5sum'
for iofile in job.outdata + job.logdata:
if iofile.status in ['transferred']:
fileinfo[iofile.lfn] = {'guid': iofile.guid,
'fsize': iofile.filesize,
f'{checksum_type}': iofile.checksum.get(config.File.checksum_type),
'surl': iofile.turl}

return fileinfo
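
For illustration, the dictionary returned by generate_fileinfo() has the following shape (values are made up; the checksum key is 'adler32' or 'md5sum' depending on config.File.checksum_type):

fileinfo = {
    'log.tgz': {
        'guid': '8a2e41d4-...',    # hypothetical GUID
        'fsize': 1048576,          # file size in bytes
        'adler32': '0a1b2c3d',     # checksum, keyed by the checksum type
        'surl': 'davs://se.example.org/some/path/log.tgz'  # from iofile.turl
    }
}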


def queue_monitoring(queues, traces, args):
"""
Monitoring of Data queues.
