
Commit

Merge pull request #83 from PanDAWMS/next
3.6.0.108
PalNilsson authored May 24, 2023
2 parents 1d6b69a + 1d80246 commit 283157b
Showing 41 changed files with 2,262 additions and 431 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
-3.5.1.17
+3.6.0.108
20 changes: 16 additions & 4 deletions pilot.py
@@ -24,6 +24,7 @@

 from pilot.common.errorcodes import ErrorCodes
 from pilot.common.exception import PilotException
+from pilot.util.config import config
 from pilot.info import infosys
 from pilot.util.auxiliary import pilot_version_banner, shell_exit_code
 from pilot.util.constants import SUCCESS, FAILURE, ERRNO_NOJOBS, PILOT_START_TIME, PILOT_END_TIME, get_pilot_version, \
@@ -70,6 +71,9 @@ def main():
     if args.update_server:
         send_worker_status('started', args.queue, args.url, args.port, logger, 'IPv6')  # note: assuming IPv6, fallback in place

+    if not args.rucio_host:
+        args.rucio_host = config.Rucio.host
+
     # initialize InfoService
     try:
         infosys.init(args.queue)
@@ -308,6 +312,13 @@ def get_args():
                             required=True,
                             help='Pilot user (e.g. name of experiment corresponding to pilot plug-in)')

+    # Kubernetes (pilot running in a pod)
+    arg_parser.add_argument('--pod',
+                            dest='pod',
+                            action='store_true',
+                            default=False,
+                            help='Pilot running in a Kubernetes pod')
+
     # Harvester specific options (if any of the following options are used, args.harvester will be set to True)
     arg_parser.add_argument('--harvester-workdir',
                             dest='harvester_workdir',
@@ -448,7 +459,8 @@ def set_environment_variables():
     environ['PILOT_HOME'] = mainworkdir  # TODO: replace with singleton

     # pilot source directory (e.g. /cluster/home/usatlas1/gram_scratch_hHq4Ns/condorg_oqmHdWxz)
-    environ['PILOT_SOURCE_DIR'] = args.sourcedir  # TODO: replace with singleton
+    if not environ.get('PILOT_SOURCE_DIR', None):
+        environ['PILOT_SOURCE_DIR'] = args.sourcedir  # TODO: replace with singleton

     # set the pilot user (e.g. ATLAS)
     environ['PILOT_USER'] = args.pilot_user  # TODO: replace with singleton
@@ -540,10 +552,11 @@ def wrap_up():
         logging.warning(f'failed to convert exit code to int: {exitcode}, {exc}')
         exitcode = 1008

-    logging.info('pilot has finished')
+    sec = shell_exit_code(exitcode)
+    logging.info(f'pilot has finished (exit code={exitcode}, shell exit code={sec})')
     logging.shutdown()

-    return shell_exit_code(exitcode)
+    return sec


 def get_pilot_source_dir():
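For context: wrap_up() now derives the shell exit code once, logs both the internal and shell-level values, and returns that same value, so the log line and the actual process exit status can no longer diverge. The conversion itself is pilot.util.auxiliary.shell_exit_code(), which is not part of this diff; the sketch below only illustrates the underlying constraint (shell exit statuses must fit in 0-255, while pilot error codes such as 1008 do not) with an assumed fallback, not the real mapping.

    # Illustrative sketch only -- NOT pilot.util.auxiliary.shell_exit_code().
    # Pilot-internal error codes (e.g. 1008) exceed the 0-255 range a shell
    # can report, so wrap_up() folds them into a shell-safe value first.
    def shell_exit_code_sketch(exitcode: int) -> int:
        if 0 <= exitcode < 256:
            return exitcode  # already shell-safe
        return 1             # assumed generic failure fallback

    print(shell_exit_code_sketch(0))     # 0
    print(shell_exit_code_sketch(1008))  # 1 (under this sketch's assumption)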
@@ -632,7 +645,6 @@ def set_redirectall(args):
     # get the args from the arg parser
     args = get_args()
     args.last_heartbeat = time.time()
-    # args.rucio_host = 'https://voatlasrucio-server-prod.cern.ch:443'

     # Define and set the main harvester control boolean
     args.harvester = is_harvester_mode(args)
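Taken together, the pilot.py changes add a --pod flag for pilots running inside a Kubernetes pod and make the Rucio host fall back to the static configuration (config.Rucio.host) whenever --rucio-host is not given on the command line. A minimal, self-contained argparse sketch of that fallback pattern follows; the host URL is a placeholder, not the real configuration value.

    import argparse

    ASSUMED_DEFAULT_RUCIO_HOST = 'https://rucio.example.org:443'  # placeholder for config.Rucio.host

    parser = argparse.ArgumentParser()
    parser.add_argument('--rucio-host', dest='rucio_host', default='',
                        help='URL for the Rucio server (optional)')
    parser.add_argument('--pod', dest='pod', action='store_true', default=False,
                        help='Pilot running in a Kubernetes pod')
    args = parser.parse_args(['--pod'])  # e.g. a pilot launched inside a pod

    # same pattern as the new code in main(): an explicit CLI value wins,
    # otherwise fall back to the configured default
    if not args.rucio_host:
        args.rucio_host = ASSUMED_DEFAULT_RUCIO_HOST

    print(args.pod, args.rucio_host)  # True https://rucio.example.org:443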
101 changes: 67 additions & 34 deletions pilot/api/data.py
@@ -100,6 +100,16 @@ def __init__(self, infosys_instance=None, acopytools=None, logger=None, default_
             raise PilotException("failed to resolve acopytools settings")
         logger.info('configured copytools per activity: acopytools=%s', self.acopytools)

+    def allow_mvfinaldest(self, catchall):
+        """
+        Is there an override in catchall to allow mv to final destination?
+        :param catchall: catchall from queuedata (string)
+        :return: True if 'mv_final_destination' is present in catchall, otherwise False (Boolean)
+        """
+
+        return True if catchall and 'mv_final_destination' in catchall else False
+
     def set_acopytools(self):
         """
         Set the internal acopytools.
@@ -215,7 +225,7 @@ def resolve_replicas(self, files, use_vp=False):
         :param files: list of `FileSpec` objects.
         :param use_vp: True for VP jobs (boolean).
-        :return: `files`
+        :return: files object.
         """

         logger = self.logger
@@ -224,48 +234,19 @@
         show_memory_usage()

         for fdat in files:
-            ## skip fdat if need for further workflow (e.g. to properly handle OS ddms)
+            # skip fdat if need for further workflow (e.g. to properly handle OS ddms)
             xfiles.append(fdat)

         show_memory_usage()

         if not xfiles:  # no files for replica look-up
             return files

-        # load replicas from Rucio
-        from rucio.client import Client
-        c = Client()
-
-        show_memory_usage()
-
-        location = self.detect_client_location()
-        if not location:
-            raise PilotException("Failed to get client location for Rucio", code=ErrorCodes.RUCIOLOCATIONFAILED)
-
-        query = {
-            'schemes': ['srm', 'root', 'davs', 'gsiftp', 'https', 'storm', 'file'],
-            'dids': [dict(scope=e.scope, name=e.lfn) for e in xfiles],
-        }
-        query.update(sort='geoip', client_location=location)
-        # reset the schemas for VP jobs
-        if use_vp:
-            query['schemes'] = ['root']
-            query['rse_expression'] = 'istape=False\\type=SPECIAL'
-
-        # add signature lifetime for signed URL storages
-        query.update(signature_lifetime=24 * 3600)  # note: default is otherwise 1h
-
-        logger.info('calling rucio.list_replicas() with query=%s', query)
-
-        # get the list of replicas
         try:
-            replicas = c.list_replicas(**query)
+            replicas = self.list_replicas(xfiles, use_vp)
         except Exception as exc:
-            raise PilotException("Failed to get replicas from Rucio: %s" % exc, code=ErrorCodes.RUCIOLISTREPLICASFAILED)
-
-        show_memory_usage()
-
-        replicas = list(replicas)
-        logger.debug("replicas received from Rucio: %s", replicas)
+            raise exc

         files_lfn = dict(((e.scope, e.lfn), e) for e in xfiles)
         for replica in replicas:
@@ -310,6 +291,52 @@

         return files

+    def list_replicas(self, xfiles, use_vp):
+        """
+        Wrapper around rucio_client.list_replicas()
+        :param xfiles: files object.
+        :param use_vp: True for VP jobs (boolean).
+        :return: replicas (list).
+        """
+
+        # load replicas from Rucio
+        from rucio.client import Client
+        rucio_client = Client()
+
+        show_memory_usage()
+
+        location = self.detect_client_location()
+        if not location:
+            raise PilotException("Failed to get client location for Rucio", code=ErrorCodes.RUCIOLOCATIONFAILED)
+
+        query = {
+            'schemes': ['srm', 'root', 'davs', 'gsiftp', 'https', 'storm', 'file'],
+            'dids': [dict(scope=e.scope, name=e.lfn) for e in xfiles],
+        }
+        query.update(sort='geoip', client_location=location)
+        # reset the schemas for VP jobs
+        if use_vp:
+            query['schemes'] = ['root']
+            query['rse_expression'] = 'istape=False\\type=SPECIAL'
+
+        # add signature lifetime for signed URL storages
+        query.update(signature_lifetime=24 * 3600)  # note: default is otherwise 1h
+
+        self.logger.info(f'calling rucio.list_replicas() with query={query}')
+
+        try:
+            replicas = rucio_client.list_replicas(**query)
+        except Exception as exc:
+            raise PilotException(f"Failed to get replicas from Rucio: {exc}", code=ErrorCodes.RUCIOLISTREPLICASFAILED)
+
+        show_memory_usage()
+
+        replicas = list(replicas)
+        self.logger.debug(f"replicas received from Rucio: {replicas}")
+
+        return replicas
+
     def add_replicas(self, fdat, replica):
         """
         Add the replicas to the fdat structure.
@@ -806,6 +833,9 @@ def transfer_files(self, copytool, files, activity=None, **kwargs):  # noqa: C901
         kwargs['trace_report'] = self.trace_report
         self.logger.info('ready to transfer (stage-in) files: %s', remain_files)

+        # is there an override in catchall to allow mv to final destination (relevant for mv copytool only)
+        kwargs['mvfinaldest'] = self.allow_mvfinaldest(kwargs.get('catchall', ''))
+
         # use bulk downloads if necessary
         # if kwargs['use_bulk_transfer']
         # return copytool.copy_in_bulk(remain_files, **kwargs)
@@ -1092,6 +1122,9 @@ def transfer_files(self, copytool, files, activity, **kwargs):
         # add the trace report
         kwargs['trace_report'] = self.trace_report

+        # is there an override in catchall to allow mv to final destination (relevant for mv copytool only)
+        kwargs['mvfinaldest'] = self.allow_mvfinaldest(kwargs.get('catchall', ''))
+
         return copytool.copy_out(files, **kwargs)

 #class StageInClientAsync(object):
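The recurring stage-in/stage-out addition above gates the mv copytool's ability to write directly to the final destination on a keyword in the queuedata catchall field. The check itself is a plain substring test, reproduced here standalone with example inputs (the catchall strings are illustrative):

    def allow_mvfinaldest(catchall):
        """Return True if the queuedata catchall string enables mv to the final destination."""
        return True if catchall and 'mv_final_destination' in catchall else False

    # the keyword may appear anywhere in the free-form catchall string
    assert allow_mvfinaldest('mv_final_destination') is True
    assert allow_mvfinaldest('foo,mv_final_destination,bar') is True  # illustrative catchall
    assert allow_mvfinaldest('') is False
    assert allow_mvfinaldest(None) is False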
33 changes: 14 additions & 19 deletions pilot/control/data.py
@@ -81,11 +81,9 @@ def control(queues, traces, args):
         #abort_jobs_in_queues(queues, args.signal)

     # proceed to set the job_aborted flag?
-    if threads_aborted():
+    if threads_aborted(caller='control'):
         logger.debug('will proceed to set job_aborted')
         args.job_aborted.set()
-    else:
-        logger.debug('will not set job_aborted yet')

     logger.info('[data] control thread has finished')

@@ -539,14 +537,16 @@ def copytool_in(queues, traces, args):  # noqa: C901
                 traces.pilot['command'] = 'abort'
                 logger.warning('copytool_in detected a set abort_job pre stage-in (due to a kill signal)')
                 declare_failed_by_kill(job, queues.failed_data_in, args.signal)
+                break
+            if args.graceful_stop.is_set():
                 break

             if _stage_in(args, job):
                 if args.abort_job.is_set():
                     traces.pilot['command'] = 'abort'
                     logger.warning('copytool_in detected a set abort_job post stage-in (due to a kill signal)')
                     declare_failed_by_kill(job, queues.failed_data_in, args.signal)
+                    break
+                if args.graceful_stop.is_set():
                     break

                 put_in_queue(job, queues.finished_data_in)
                 # remove the job from the current stage-in queue
                 continue

     # proceed to set the job_aborted flag?
-    if threads_aborted():
+    if threads_aborted(caller='copytool_in'):
         logger.debug('will proceed to set job_aborted')
         args.job_aborted.set()
-    else:
-        logger.debug('will not set job_aborted yet')

     logger.info('[data] copytool_in thread has finished')


-def copytool_out(queues, traces, args):
+def copytool_out(queues, traces, args):  # noqa: C901
"""
Main stage-out thread.
Perform stage-out as soon as a job object can be extracted from the data_out queue.
Expand All @@ -604,9 +602,8 @@ def copytool_out(queues, traces, args):
"""

cont = True
logger.debug('entering copytool_out loop')
if args.graceful_stop.is_set():
logger.debug('graceful_stop already set')
logger.debug('graceful_stop already set - do not start copytool_out thread')

processed_jobs = []
while cont:
@@ -638,14 +635,16 @@
                 traces.pilot['command'] = 'abort'
                 logger.warning('copytool_out detected a set abort_job pre stage-out (due to a kill signal)')
                 declare_failed_by_kill(job, queues.failed_data_out, args.signal)
+                break
+            if abort:
                 break

             if _stage_out_new(job, args):
                 if args.abort_job.is_set():
                     traces.pilot['command'] = 'abort'
                     logger.warning('copytool_out detected a set abort_job post stage-out (due to a kill signal)')
                     #declare_failed_by_kill(job, queues.failed_data_out, args.signal)
+                    break
+                if args.graceful_stop.is_set():
                     break

                 #queues.finished_data_out.put(job)
                 processed_jobs.append(job.jobid)
@@ -668,11 +667,9 @@
             break

     # proceed to set the job_aborted flag?
-    if threads_aborted():
+    if threads_aborted(caller='copytool_out'):
         logger.debug('will proceed to set job_aborted')
         args.job_aborted.set()
-    else:
-        logger.debug('will not set job_aborted yet')

     logger.info('[data] copytool_out thread has finished')

@@ -1080,10 +1077,8 @@ def queue_monitoring(queues, traces, args):
             break

     # proceed to set the job_aborted flag?
-    if threads_aborted():
+    if threads_aborted(caller='queue_monitoring'):
         logger.debug('will proceed to set job_aborted')
         args.job_aborted.set()
-    else:
-        logger.debug('will not set job_aborted yet')

     logger.info('[data] queue_monitor thread has finished')
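Across pilot/control/data.py, every threads_aborted() call now identifies its caller, and the noisy 'will not set job_aborted yet' else branch is dropped. The real helper keeps its own thread bookkeeping and is not shown in this diff; assuming only its call signature, the sketch below illustrates how a caller argument makes the check traceable in the logs:

    import logging
    import threading

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)

    def threads_aborted_sketch(caller=''):
        """Illustrative stand-in: True when only the main thread remains."""
        aborted = threading.active_count() <= 1
        logger.debug('threads_aborted(caller=%s) -> %s', caller, aborted)
        return aborted

    # each control thread names itself, so the debug trail shows who
    # triggered args.job_aborted.set()
    if threads_aborted_sketch(caller='control'):
        pass  # the caller would set args.job_aborted here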