Skip to content

Commit

Permalink
Merge pull request #22 from PanDAWMS/next
Browse files Browse the repository at this point in the history
3.2.2.22
  • Loading branch information
PalNilsson authored Mar 17, 2022
2 parents 2f43636 + 09a4817 commit b46eb4d
Show file tree
Hide file tree
Showing 17 changed files with 686 additions and 66 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.2.1.1
3.2.2.22
18 changes: 10 additions & 8 deletions pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
SERVER_UPDATE_NOT_DONE, PILOT_MULTIJOB_START_TIME
from pilot.util.filehandling import get_pilot_work_dir, mkdirs, establish_logging
from pilot.util.harvester import is_harvester_mode
from pilot.util.https import https_setup, send_update
from pilot.util.https import get_panda_server, https_setup, send_update
from pilot.util.timing import add_to_pilot_timing

errors = ErrorCodes()
Expand Down Expand Up @@ -153,7 +153,7 @@ def get_args():
choices=['generic', 'generic_hpc',
'production', 'production_hpc',
'analysis', 'analysis_hpc',
'eventservice_hpc', 'stagein', 'payload_stageout'],
'eventservice_hpc', 'stager', 'payload_stageout'],
help='Pilot workflow (default: generic)')

# graciously stop pilot process after hard limit
Expand Down Expand Up @@ -342,6 +342,10 @@ def get_args():
dest='input_dir',
default='',
help='Input directory')
arg_parser.add_argument('--input-destination-dir',
dest='input_destination_dir',
default='',
help='Input destination directory')
arg_parser.add_argument('--output-dir',
dest='output_dir',
default='',
Expand Down Expand Up @@ -452,9 +456,7 @@ def set_environment_variables():
environ['PILOT_OUTPUT_DIR'] = args.output_dir

# keep track of the server urls
_port = ":%s" % args.port
url = args.url if _port in args.url else args.url + _port
environ['PANDA_SERVER_URL'] = url
environ['PANDA_SERVER_URL'] = get_panda_server(args.url, args.port)
environ['QUEUEDATA_SERVER_URL'] = '%s' % args.queuedata_url


Expand Down Expand Up @@ -582,12 +584,12 @@ def send_worker_status(status, queue, url, port, logger):
if exit_code != 0:
sys.exit(exit_code)

# set environment variables (to be replaced with singleton implementation)
set_environment_variables()

# setup and establish standard logging
establish_logging(debug=args.debug, nopilotlog=args.nopilotlog)

# set environment variables (to be replaced with singleton implementation)
set_environment_variables()

# execute main function
trace = main()

Expand Down
3 changes: 2 additions & 1 deletion pilot/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,8 @@ def transfer_files(self, copytool, files, activity, **kwargs):
self.logger.error(msg)
raise PilotException(msg, code=ErrorCodes.NOSTORAGE, state='NO_OUTPUTSTORAGE_DEFINED')

pfn = fspec.surl or getattr(fspec, 'pfn', None) or os.path.join(kwargs.get('workdir', ''), fspec.lfn)
pfn = fspec.surl or getattr(fspec, 'pfn', None) or os.path.join(kwargs.get('workdir', ''), fspec.lfn) or \
os.path.join(os.path.join(kwargs.get('workdir', ''), '..'), fspec.lfn)
if not os.path.exists(pfn) or not os.access(pfn, os.R_OK):
msg = "output pfn file/directory does not exist: %s" % pfn
self.logger.error(msg)
Expand Down
4 changes: 3 additions & 1 deletion pilot/common/errorcodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class ErrorCodes:
CHECKSUMCALCFAILURE = 1366
COMMANDTIMEDOUT = 1367
REMOTEFILEOPENTIMEDOUT = 1368
FRONTIER = 1369

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
Expand Down Expand Up @@ -280,7 +281,8 @@ class ErrorCodes:
NOCTYPES: "Python module ctypes not available on worker node",
CHECKSUMCALCFAILURE: "Failure during checksum calculation",
COMMANDTIMEDOUT: "Command timed out",
REMOTEFILEOPENTIMEDOUT: "Remote file open timed out"
REMOTEFILEOPENTIMEDOUT: "Remote file open timed out",
FRONTIER: "Frontier error"
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
Expand Down
49 changes: 42 additions & 7 deletions pilot/control/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
from pilot.api.es_data import StageInESClient
from pilot.control.job import send_state
from pilot.common.errorcodes import ErrorCodes
from pilot.common.exception import ExcThread, PilotException, LogFileCreationFailure
from pilot.common.exception import ExcThread, PilotException, LogFileCreationFailure, NoSuchFile, FileHandlingFailure
#from pilot.util.config import config
from pilot.util.auxiliary import set_pilot_state, check_for_final_server_update #, abort_jobs_in_queues
from pilot.util.common import should_abort
from pilot.util.constants import PILOT_PRE_STAGEIN, PILOT_POST_STAGEIN, PILOT_PRE_STAGEOUT, PILOT_POST_STAGEOUT, LOG_TRANSFER_IN_PROGRESS,\
LOG_TRANSFER_DONE, LOG_TRANSFER_NOT_DONE, LOG_TRANSFER_FAILED, SERVER_UPDATE_RUNNING, MAX_KILL_WAIT_TIME, UTILITY_BEFORE_STAGEIN
from pilot.util.container import execute
from pilot.util.filehandling import remove, write_file
from pilot.util.filehandling import remove, write_file, copy
from pilot.util.processes import threads_aborted
from pilot.util.queuehandling import declare_failed_by_kill, put_in_queue
from pilot.util.timing import add_to_pilot_timing
Expand Down Expand Up @@ -209,7 +209,9 @@ def _stage_in(args, job):
client = StageInClient(job.infosys, logger=logger, trace_report=trace_report)
activity = 'pr'
use_pcache = job.infosys.queuedata.use_pcache
kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, use_pcache=use_pcache, use_bulk=False,
# get the proper input file destination (normally job.workdir unless stager workflow)
workdir = get_proper_input_destination(job.workdir, args.input_destination_dir)
kwargs = dict(workdir=workdir, cwd=job.workdir, usecontainer=False, use_pcache=use_pcache, use_bulk=False,
input_dir=args.input_dir, use_vp=job.use_vp, catchall=job.infosys.queuedata.catchall, checkinputsize=True)
client.prepare_sources(job.indata)
client.transfer(job.indata, activity=activity, **kwargs)
Expand All @@ -235,10 +237,37 @@ def _stage_in(args, job):

remain_files = [infile for infile in job.indata if infile.status not in ['remote_io', 'transferred', 'no_transfer']]
logger.info("stage-in finished") if not remain_files else logger.info("stage-in failed")
os.environ['PILOT_JOB_STATE'] = 'stageincompleted'

return not remain_files


def get_proper_input_destination(workdir, input_destination_dir):
    """
    Return the proper input file destination directory.

    Normally this is the job work directory, unless an input file destination
    has been set with the pilot option --input-destination-dir (which should be
    set for the stager workflow). If the requested destination directory does
    not exist, the job work directory is used as a fallback.

    :param workdir: job work directory (string).
    :param input_destination_dir: optional input file destination (string).
    :return: input file destination (string).
    """

    # default to the job work directory; only override with an existing destination
    destination = workdir
    if input_destination_dir:
        if os.path.exists(input_destination_dir):
            destination = input_destination_dir
        else:
            logger.warning(f'input file destination does not exist: {input_destination_dir} (defaulting to {workdir})')

    logger.info(f'will use input file destination: {destination}')

    return destination


def get_rse(data, lfn=""):
"""
Return the ddmEndPoint corresponding to the given lfn.
Expand Down Expand Up @@ -366,7 +395,7 @@ def stage_out_auto(files):

tmp_executable = objectcopy.deepcopy(executable)

tmp_executable += ['--rses', _file['rse']]
tmp_executable += ['--rse', _file['rse']]

if 'no_register' in list(_file.keys()) and _file['no_register']: # Python 2/3
tmp_executable += ['--no-register']
Expand Down Expand Up @@ -733,7 +762,7 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out
:return:
"""

logger.debug('preparing to create log file (debug mode=%s)', str(debugmode))
logger.debug(f'preparing to create log file (debug mode={debugmode})')

# PILOT_HOME is the launch directory of the pilot (or the one specified in pilot options as pilot workdir)
pilot_home = os.environ.get('PILOT_HOME', os.getcwd())
Expand Down Expand Up @@ -764,7 +793,7 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out
os.rename(workdir, newworkdir)
workdir = newworkdir

fullpath = os.path.join(workdir, logfile_name) # /some/path/to/dirname/log.tgz
fullpath = os.path.join(current_dir, logfile_name) # /some/path/to/dirname/log.tgz
logger.info('will create archive %s', fullpath)
try:
cmd = "pwd;tar cvfz %s %s --dereference --one-file-system; echo $?" % (fullpath, tarball_name)
Expand All @@ -778,7 +807,13 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out
try:
os.rename(workdir, orgworkdir)
except Exception as error:
logger.debug('exception caught: %s', error)
logger.debug('exception caught when renaming workdir: %s', error)

# final step, copy the log file into the workdir - otherwise containerized stage-out won't work
try:
copy(fullpath, orgworkdir)
except (NoSuchFile, FileHandlingFailure) as exc:
logger.warning(f'caught exception when copying tarball: {exc}')


def _do_stageout(job, xdata, activity, queue, title, output_dir=''):
Expand Down
19 changes: 13 additions & 6 deletions pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1923,9 +1923,16 @@ def dump_job_definition(res):
res['secrets'] = '********'
else:
_pandasecrets = ''
if 'pilotSecrets' in res:
_pilotsecrets = res['pilotSecrets']
res['pilotSecrets'] = '********'
else:
_pilotsecrets = ''
logger.info(f'job definition = {res}')
if _pandasecrets:
res['secrets'] = _pandasecrets
if _pilotsecrets:
res['pilotSecrets'] = _pilotsecrets


def print_node_info():
Expand Down Expand Up @@ -2223,18 +2230,17 @@ def queue_monitor(queues, traces, args): # noqa: C901
# we can now stop monitoring this job, so remove it from the monitored_payloads queue and add it to the
# completed_jobs queue which will tell retrieve() that it can download another job
try:
_job = queues.monitored_payloads.get(block=True, timeout=1)
_job = queues.monitored_payloads.get(block=True, timeout=1) if args.workflow != 'stager' else None
except queue.Empty:
logger.warning('failed to dequeue job: queue is empty (did job fail before job monitor started?)')
make_job_report(job)
else:
logger.debug('job %s was dequeued from the monitored payloads queue', _job.jobid)
# now ready for the next job (or quit)
put_in_queue(job.jobid, queues.completed_jobids)

put_in_queue(job, queues.completed_jobs)
del _job
logger.debug('tmp job object deleted')
if _job:
del _job

if abort_thread:
break
Expand Down Expand Up @@ -2559,7 +2565,7 @@ def job_monitor(queues, traces, args): # noqa: C901
time.sleep(60)

# peek at the jobs in the validated_jobs queue and send the running ones to the heartbeat function
jobs = queues.monitored_payloads.queue
jobs = queues.monitored_payloads.queue if args.workflow != 'stager' else None
if jobs:
# update the peeking time
peeking_time = int(time.time())
Expand Down Expand Up @@ -2615,7 +2621,8 @@ def job_monitor(queues, traces, args): # noqa: C901
logger.info('job monitoring is waiting for stage-in to finish')
else:
# check the waiting time in the job monitor. set global graceful_stop if necessary
check_job_monitor_waiting_time(args, peeking_time, abort_override=abort_job)
if args.workflow != 'stager':
check_job_monitor_waiting_time(args, peeking_time, abort_override=abort_job)

n += 1

Expand Down
15 changes: 12 additions & 3 deletions pilot/control/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,15 @@ def run_realtimelog(queues, traces, args):
except queue.Empty:
continue

# wait with proceeding until the job is running, or max 5 * 60 s
counter = 0
while counter < 5 * 60 and not args.graceful_stop.is_set():
if job.state == 'running':
logger.debug('job is running, time to start real-time logger [if needed]')
break
time.sleep(1)
counter += 1

if args.use_realtime_logging:
# always do real-time logging
job.realtimelogging = True
Expand All @@ -422,7 +431,7 @@ def run_realtimelog(queues, traces, args):
logger.debug(f'debug={job.debug}')
logger.debug(f'debug_command={job.debug_command}')
logger.debug(f'args.use_realtime_logging={args.use_realtime_logging}')
if job.debug and not job.debug_command and not args.use_realtime_logging:
if job.debug and (not job.debug_command or job.debug_command == 'debug') and not args.use_realtime_logging:
logger.info('turning on real-time logging since debug flag is true and debug_command is not set')
job.realtimelogging = True

Expand All @@ -436,10 +445,10 @@ def run_realtimelog(queues, traces, args):
job.infosys.queuedata.catchall,
args.realtime_logname,
args.realtime_logging_server) if not info_dic and job.realtimelogging else info_dic
# logger.debug(f'info_dic={info_dic}')
logger.debug(f'info_dic={info_dic}')
if info_dic:
args.use_realtime_logging = True
realtime_logger = get_realtime_logger(args, info_dic)
realtime_logger = get_realtime_logger(args, info_dic, job.workdir, job.pilotsecrets)
else:
logger.debug('real-time logging not needed at this point')
realtime_logger = None
Expand Down
16 changes: 14 additions & 2 deletions pilot/copytool/objectstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,20 @@ def copy_in(files, **kwargs):
# if ddm_special_setup:
# cmd += [ddm_special_setup]

# temporary hack
rses_option = '--rses' if is_new_rucio_version() else '--rse'

dst = fspec.workdir or kwargs.get('workdir') or '.'
cmd += ['/usr/bin/env', 'rucio', '-v', 'download', '--no-subdir', '--dir', dst]
if require_replicas:
cmd += ['--rses', fspec.replicas[0]['ddmendpoint']]
cmd += [rses_option, fspec.replicas[0]['ddmendpoint']]

# a copytool module should consider fspec.turl for transfers, and could failback to fspec.surl,
# but normally fspec.turl (transfer url) is mandatory and already populated by the top workflow
turl = fspec.turl or fspec.surl
if turl:
if fspec.ddmendpoint:
cmd.extend(['--rses', fspec.ddmendpoint])
cmd.extend([rses_option, fspec.ddmendpoint])
cmd.extend(['--pfn', turl])
cmd += ['%s:%s' % (fspec.scope, fspec.lfn)]

Expand All @@ -156,6 +159,15 @@ def copy_in(files, **kwargs):
return files


def is_new_rucio_version():
    """
    Check whether the local rucio client uses the new --rses download option.

    Newer rucio versions renamed the `rucio download` option --rse to --rses;
    the presence of '--rses RSES' in the download help output identifies them.
    Used to pick the correct option name when building the download command.

    :return: True if the client supports --rses, False otherwise (Boolean).
    """

    # probe the installed client's help text rather than parsing version strings
    _, stdout, _ = execute('rucio download -h')
    return '--rses RSES' in stdout


def copy_out(files, **kwargs):
"""
Upload given files using rucio copytool.
Expand Down
6 changes: 4 additions & 2 deletions pilot/info/jobdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class JobData(BaseData):
allownooutput = "" # used to disregard empty files from job report
realtimelogging = False # True for real-time logging (set by server/job definition/args)
pandasecrets = "" # User defined secrets
pilotsecrets = {} # Real-time logging secrets

# set by the pilot (not from job definition)
workdir = "" # working directory for this job
Expand Down Expand Up @@ -165,7 +166,7 @@ class JobData(BaseData):
'infilesguids', 'memorymonitor', 'allownooutput', 'pandasecrets'],
list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts'],
dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes', 'preprocess',
'postprocess', 'coprocess', 'containeroptions'],
'postprocess', 'coprocess', 'containeroptions', 'pilotsecrets'],
bool: ['is_eventservice', 'is_eventservicemerge', 'is_hpo', 'noexecstrcnv', 'debug', 'usecontainer',
'use_vp', 'looping_check']
}
Expand Down Expand Up @@ -472,7 +473,8 @@ def load(self, data, use_kmap=True):
'imagename_jobdef': 'container_name',
'containeroptions': 'containerOptions',
'looping_check': 'loopingCheck',
'pandasecrets': 'secrets'
'pandasecrets': 'secrets',
'pilotsecrets': 'pilotSecrets'
} if use_kmap else {}

self._load_data(data, kmap)
Expand Down
Loading

0 comments on commit b46eb4d

Please sign in to comment.