Skip to content

Commit

Permalink
Merge pull request #30 from PanDAWMS/next
Browse files Browse the repository at this point in the history
3.2.4.16
  • Loading branch information
PalNilsson authored Apr 4, 2022
2 parents 0a4d7f4 + 6ca25de commit ab2d317
Show file tree
Hide file tree
Showing 16 changed files with 206 additions and 125 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.2.3.27
3.2.4.16
5 changes: 5 additions & 0 deletions pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ def get_args():
dest='storagedata_url',
default='',
help='URL for downloading DDM end points data')
arg_parser.add_argument('--rucio-host',
dest='rucio_host',
default='',
help='URL for the Rucio host (optional)')

# Country group
arg_parser.add_argument('--country-group',
Expand Down Expand Up @@ -568,6 +572,7 @@ def send_worker_status(status, queue, url, port, logger):
# get the args from the arg parser
args = get_args()
args.last_heartbeat = time.time()
# args.rucio_host = 'https://voatlasrucio-server-prod.cern.ch:443'

# Define and set the main harvester control boolean
args.harvester = is_harvester_mode(args)
Expand Down
29 changes: 19 additions & 10 deletions pilot/control/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,13 @@ def _stage_in(args, job):
# should stage-in be done by a script (for containerisation) or by invoking the API (ie classic mode)?
use_container = pilot.util.middleware.use_middleware_script(job.infosys.queuedata.container_type.get("middleware"))
if use_container:
logger.info('stage-in will be done by a script')
logger.info('stage-in will be done in a container')
try:
eventtype, localsite, remotesite = get_trace_report_variables(job, label=label)
pilot.util.middleware.containerise_middleware(job, job.indata, args.queue, eventtype, localsite, remotesite,
job.infosys.queuedata.container_options, args.input_dir,
label=label, container_type=job.infosys.queuedata.container_type.get("middleware"))
label=label, container_type=job.infosys.queuedata.container_type.get("middleware"),
rucio_host=args.rucio_host)
except PilotException as error:
logger.warning('stage-in containerisation threw a pilot exception: %s', error)
except Exception as error:
Expand All @@ -212,7 +213,8 @@ def _stage_in(args, job):
# get the proper input file destination (normally job.workdir unless stager workflow)
workdir = get_proper_input_destination(job.workdir, args.input_destination_dir)
kwargs = dict(workdir=workdir, cwd=job.workdir, usecontainer=False, use_pcache=use_pcache, use_bulk=False,
input_dir=args.input_dir, use_vp=job.use_vp, catchall=job.infosys.queuedata.catchall, checkinputsize=True)
input_dir=args.input_dir, use_vp=job.use_vp, catchall=job.infosys.queuedata.catchall,
checkinputsize=True, rucio_host=args.rucio_host)
client.prepare_sources(job.indata)
client.transfer(job.indata, activity=activity, **kwargs)
except PilotException as error:
Expand Down Expand Up @@ -816,15 +818,20 @@ def create_log(workdir, logfile_name, tarball_name, cleanup, input_files=[], out
logger.warning(f'caught exception when copying tarball: {exc}')


def _do_stageout(job, xdata, activity, queue, title, output_dir=''):
def _do_stageout(job, xdata, activity, queue, title, output_dir='', rucio_host=''):
"""
Use the `StageOutClient` in the Data API to perform stage-out.
The rucio host is internally set by Rucio via the client config file. This can be set directly as a pilot option
--rucio-host.
:param job: job object.
:param xdata: list of FileSpec objects.
:param activity: copytool activity or preferred list of activities to resolve copytools
:param queue: PanDA queue (string).
:param title: type of stage-out (output, log) (string).
:param output_dir: optional output directory (string).
:param rucio_host: optional rucio host (string).
:return: True in case of successful transfers
"""

Expand All @@ -834,12 +841,14 @@ def _do_stageout(job, xdata, activity, queue, title, output_dir=''):
# should stage-in be done by a script (for containerisation) or by invoking the API (ie classic mode)?
use_container = pilot.util.middleware.use_middleware_script(job.infosys.queuedata.container_type.get("middleware"))
if use_container:
logger.info('stage-out will be done by a script')
logger.info('stage-out will be done in a container')
try:
eventtype, localsite, remotesite = get_trace_report_variables(job, label=label)
pilot.util.middleware.containerise_middleware(job, xdata, queue, eventtype, localsite, remotesite,
job.infosys.queuedata.container_options, output_dir,
label=label, container_type=job.infosys.queuedata.container_type.get("middleware"))
label=label,
container_type=job.infosys.queuedata.container_type.get("middleware"),
rucio_host=rucio_host)
except PilotException as error:
logger.warning('stage-out containerisation threw a pilot exception: %s', error)
except Exception as error:
Expand All @@ -853,7 +862,7 @@ def _do_stageout(job, xdata, activity, queue, title, output_dir=''):

client = StageOutClient(job.infosys, logger=logger, trace_report=trace_report)
kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job, output_dir=output_dir,
catchall=job.infosys.queuedata.catchall) #, mode='stage-out')
catchall=job.infosys.queuedata.catchall, rucio_host=rucio_host) #, mode='stage-out')
# prod analy unification: use destination preferences from PanDA server for unified queues
if job.infosys.queuedata.type != 'unified':
client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
Expand Down Expand Up @@ -909,7 +918,7 @@ def _stage_out_new(job, args):
job.stageout = 'log'

if job.stageout != 'log': ## do stage-out output files
if not _do_stageout(job, job.outdata, ['pw', 'w'], args.queue, title='output', output_dir=args.output_dir):
if not _do_stageout(job, job.outdata, ['pw', 'w'], args.queue, title='output', output_dir=args.output_dir, rucio_host=args.rucio_host):
is_success = False
logger.warning('transfer of output file(s) failed')

Expand All @@ -936,7 +945,7 @@ def _stage_out_new(job, args):
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.LOGFILECREATIONFAILURE)
return False

if not _do_stageout(job, [logfile], ['pl', 'pw', 'w'], args.queue, title='log', output_dir=args.output_dir):
if not _do_stageout(job, [logfile], ['pl', 'pw', 'w'], args.queue, title='log', output_dir=args.output_dir, rucio_host=args.rucio_host):
is_success = False
logger.warning('log transfer failed')
job.status['LOG_TRANSFER'] = LOG_TRANSFER_FAILED
Expand Down
4 changes: 4 additions & 0 deletions pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from pilot.util.processes import cleanup, threads_aborted, kill_process, kill_processes
from pilot.util.proxy import get_distinguished_name
from pilot.util.queuehandling import scan_for_jobs, put_in_queue, queue_report, purge_queue
from pilot.util.realtimelogger import cleanup as rtcleanup
from pilot.util.timing import add_to_pilot_timing, timing_report, get_postgetjob_time, get_time_since, time_stamp
from pilot.util.workernode import get_disk_space, collect_workernode_info, get_node_name, get_cpu_model

Expand Down Expand Up @@ -2006,6 +2007,9 @@ def has_job_completed(queues, args):
job.reset_errors()
logger.info(f"job {job.jobid} has completed (purged errors)")

# reset any running real-time logger
rtcleanup()

# cleanup of any remaining processes
if job.pid:
job.zombies.append(job.pid)
Expand Down
67 changes: 37 additions & 30 deletions pilot/control/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,15 +338,15 @@ def get_logging_info(job, args):

info_dic = {}

if 'logging=' not in job.infosys.queuedata.catchall and not job.realtimelogging:
if not job.realtimelogging:
return {}

# args handling
info_dic['logname'] = args.realtime_logname if args.realtime_logname else "pilot-log"
logserver = args.realtime_logging_server if args.realtime_logging_server else ""

pattern = r'logging\=(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)'
info = findall(pattern, job.infosys.queuedata.catchall)
pattern = r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)'
info = findall(pattern, config.Pilot.rtlogging)

if not logserver and not info:
logger.warning('not enough info available for activating real-time logging')
Expand Down Expand Up @@ -430,34 +430,41 @@ def run_realtimelog(queues, traces, args):
continue

# wait with proceeding until the job is running
abort_loops = False
while not args.graceful_stop.is_set():
if job.state == 'running':
logger.debug('job is running, time to start real-time logger [if needed]')

# note: in multi-job mode, the real-time logging will be switched off at the end of the job
first = True
while not args.graceful_stop.is_set():
if job.state == 'running':
logger.debug('job is running, check if real-time logger is needed')
break
if job.state == 'stageout' or job.state == 'failed' or job.state == 'holding':
if first:
logger.debug(f'job is in state {job.state}, continue to next job or abort (wait for graceful stop)')
first = False
time.sleep(10)
continue
time.sleep(1)

if args.use_realtime_logging:
# always do real-time logging
job.realtimelogging = True
job.debug = True
abort_loops = True
elif job.debug and \
(not job.debug_command or job.debug_command == 'debug' or 'tail' in job.debug_command) and \
not args.use_realtime_logging:
job.realtimelogging = True
abort_loops = True

if abort_loops:
logger.info('time to start real-time logger')
break
if job.state == 'stageout' or job.state == 'failed':
logger.debug(f'job is in state {job.state}, continue to next job')
continue
time.sleep(1)

if args.use_realtime_logging:
# always do real-time logging
job.realtimelogging = True
job.debug = True

logger.debug(f'debug={job.debug}')
logger.debug(f'debug_command={job.debug_command}')
logger.debug(f'args.use_realtime_logging={args.use_realtime_logging}')
if job.debug and (not job.debug_command or job.debug_command == 'debug' or 'tail' in job.debug_command) and not args.use_realtime_logging:
logger.info('turning on real-time logging')
job.realtimelogging = True

# testing
# job.realtimelogging = True
# reset info_dic if real-time logging is not wanted by the job
if not job.realtimelogging:
info_dic = None
time.sleep(10)

# only set info_dic once per job (the info will not change)
info_dic = get_logging_info(job, args) if not info_dic and job.realtimelogging else info_dic
info_dic = get_logging_info(job, args)
logger.debug(f'info_dic={info_dic}')
if info_dic:
args.use_realtime_logging = True
Expand All @@ -468,8 +475,8 @@ def run_realtimelog(queues, traces, args):

# If no realtime logger is found, do nothing and exit
if realtime_logger is None:
logger.debug('realtime logger was not found, exiting')
break
logger.debug('realtime logger was not found, waiting ..')
continue
else:
logger.debug('realtime logger was found')

Expand Down
19 changes: 4 additions & 15 deletions pilot/copytool/mv.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
# Authors:
# - Paul Nilsson, [email protected], 2017-2021
# - Tobias Wegner, [email protected], 2018
# - David Cameron, [email protected], 2018-2019
# - David Cameron, [email protected], 2018-2022

import os
import re

from pilot.common.exception import StageInFailure, StageOutFailure, ErrorCodes, PilotException
from pilot.common.exception import StageInFailure, StageOutFailure, ErrorCodes
from pilot.util.container import execute

import logging
Expand All @@ -22,16 +22,11 @@
check_availablespace = False # indicate whether space check should be applied before stage-in transfers using given copytool


def create_output_list(files, init_dir, ddmconf):
def create_output_list(files, init_dir):
"""
Add files to the output list which tells ARC CE which files to upload
"""

if not ddmconf:
raise PilotException("copy_out() failed to resolve ddmconf from function arguments",
code=ErrorCodes.STAGEOUTFAILED,
state='COPY_ERROR')

for fspec in files:
arcturl = fspec.turl
if arcturl.startswith('s3://'):
Expand All @@ -45,12 +40,6 @@ def create_output_list(files, init_dir, ddmconf):
else:
# Add ARC options to TURL
checksumtype, checksum = list(fspec.checksum.items())[0] # Python 2/3
# resolve token value from fspec.ddmendpoint
token = ddmconf.get(fspec.ddmendpoint).token
if not token:
logger.info('No space token info for %s', fspec.ddmendpoint)
else:
arcturl = re.sub(r'((:\d+)/)', r'\2;autodir=no;spacetoken=%s/' % token, arcturl)
arcturl += ':checksumtype=%s:checksumvalue=%s' % (checksumtype, checksum)

logger.info('Adding to output.list: %s %s', fspec.lfn, arcturl)
Expand Down Expand Up @@ -127,7 +116,7 @@ def copy_out(files, copy_type="mv", **kwargs):
logger.debug('init_dir for output.list=%s', os.path.dirname(kwargs.get('workdir')))
output_dir = kwargs.get('output_dir', '')
if not output_dir:
create_output_list(files, os.path.dirname(kwargs.get('workdir')), kwargs.get('ddmconf', None))
create_output_list(files, os.path.dirname(kwargs.get('workdir')))

return files

Expand Down
Loading

0 comments on commit ab2d317

Please sign in to comment.