This repository has been archived by the owner on Jan 30, 2024. It is now read-only.

Merge pull request #297 from PanDAWMS/next
2.8.4.35
PalNilsson authored Oct 29, 2020
2 parents 5999036 + a191e52 commit 0026537
Showing 30 changed files with 913 additions and 267 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
2.8.3.3
2.8.4.34
65 changes: 48 additions & 17 deletions pilot/api/data.py
@@ -25,7 +25,7 @@
from pilot.info import infosys
from pilot.common.exception import PilotException, ErrorCodes, SizeTooLarge, NoLocalSpace, ReplicasNotFound
from pilot.util.config import config
from pilot.util.filehandling import calculate_checksum
from pilot.util.filehandling import calculate_checksum, write_json
from pilot.util.math import convert_mb_to_b
from pilot.util.parameters import get_maximum_input_sizes
from pilot.util.workernode import get_local_disk_space
@@ -198,23 +198,21 @@ def sort_replicas(self, replicas, inputddms):

return replicas

def resolve_replicas(self, files): # noqa: C901
def resolve_replicas(self, files, use_vp=False): # noqa: C901
"""
Populates filespec.replicas for each entry from `files` list
:param files: list of `FileSpec` objects
Populates filespec.replicas for each entry from `files` list
fdat.replicas = [{'ddmendpoint':'ddmendpoint', 'pfn':'replica', 'domain':'domain value'}]
:return: `files`
:param files: list of `FileSpec` objects.
:param use_vp: True for VP jobs (boolean).
:return: `files`
"""

logger = self.logger
xfiles = []
#ddmconf = self.infosys.resolve_storage_data()

for fdat in files:
#ddmdat = ddmconf.get(fdat.ddmendpoint)
#if not ddmdat:
# raise Exception("Failed to resolve input ddmendpoint by name=%s (from PanDA), please check configuration. fdat=%s" % (fdat.ddmendpoint, fdat))

## skip fdat if need for further workflow (e.g. to properly handle OS ddms)
xfiles.append(fdat)

@@ -233,8 +231,12 @@ def resolve_replicas(self, files):  # noqa: C901
'schemes': ['srm', 'root', 'davs', 'gsiftp', 'https', 'storm'],
'dids': [dict(scope=e.scope, name=e.lfn) for e in xfiles],
}

query.update(sort='geoip', client_location=location)
# reset the schemas for VP jobs
if use_vp:
query['schemes'] = ['root']
query['rse_expression'] = 'istape=False\\type=SPECIAL'

logger.info('calling rucio.list_replicas() with query=%s' % query)

try:
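The hunk above narrows the replica query for Virtual Placement (VP) jobs: root-only schemes and non-tape SPECIAL RSEs. A minimal sketch of the resulting lookup, assuming the standard Rucio ReplicaClient API (the pilot builds and reuses its own client object):

from rucio.client.replicaclient import ReplicaClient

def list_vp_replicas(xfiles, location):
    # base query, as built in resolve_replicas() above
    query = {
        'schemes': ['srm', 'root', 'davs', 'gsiftp', 'https', 'storm'],
        'dids': [dict(scope=e.scope, name=e.lfn) for e in xfiles],
        'sort': 'geoip',
        'client_location': location,
    }
    # VP jobs: reset the schemes and restrict the RSE expression
    query['schemes'] = ['root']
    query['rse_expression'] = 'istape=False\\type=SPECIAL'
    return ReplicaClient().list_replicas(**query)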
@@ -447,6 +449,7 @@ def transfer(self, files, activity='default', **kwargs): # noqa: C901
continue

try:
self.logger.debug('kwargs=%s' % str(kwargs))
result = self.transfer_files(copytool, remain_files, activity, **kwargs)
self.logger.debug('transfer_files() using copytool=%s completed with result=%s' % (copytool, str(result)))
break
@@ -712,14 +715,19 @@ def transfer_files(self, copytool, files, activity=None, **kwargs): # noqa: C901

if getattr(copytool, 'require_replicas', False) and files:
if files[0].replicas is None: # look up replicas only once
files = self.resolve_replicas(files)
files = self.resolve_replicas(files, use_vp=kwargs['use_vp'])

allowed_schemas = getattr(copytool, 'allowed_schemas', None)

if self.infosys and self.infosys.queuedata:
copytool_name = copytool.__name__.rsplit('.', 1)[-1]
allowed_schemas = self.infosys.queuedata.resolve_allowed_schemas(activity, copytool_name) or allowed_schemas

# overwrite allowed_schemas for VP jobs
if kwargs['use_vp']:
allowed_schemas = ['root']
self.logger.debug('overwrote allowed_schemas for VP job: %s' % str(allowed_schemas))

for fspec in files:
resolve_replica = getattr(copytool, 'resolve_replica', None)
resolve_replica = self.resolve_replica if not callable(resolve_replica) else resolve_replica
@@ -772,7 +780,7 @@ def transfer_files(self, copytool, files, activity=None, **kwargs): # noqa: C901
self.require_protocols(files, copytool, activity, local_dir=kwargs['input_dir'])

# mark direct access files with status=remote_io
self.set_status_for_direct_access(files)
self.set_status_for_direct_access(files, kwargs.get('workdir', ''))

# get remain files that need to be transferred by copytool
remain_files = [e for e in files if e.status not in ['remote_io', 'transferred', 'no_transfer']]
@@ -806,12 +814,13 @@ def transfer_files(self, copytool, files, activity=None, **kwargs): # noqa: C901
# return copytool.copy_in_bulk(remain_files, **kwargs)
return copytool.copy_in(remain_files, **kwargs)

def set_status_for_direct_access(self, files):
def set_status_for_direct_access(self, files, workdir):
"""
Update the FileSpec status with 'remote_io' for direct access mode.
Should be called only once since the function sends traces
:param files: list of FileSpec objects.
:param workdir: work directory (string).
:return: None
"""

@@ -820,11 +829,19 @@ def set_status_for_direct_access(self, files):
fspec.is_directaccess(ensure_replica=True, allowed_replica_schemas=self.direct_localinput_allowed_schemas))
direct_wan = (fspec.domain == 'wan' and fspec.direct_access_wan and
fspec.is_directaccess(ensure_replica=True, allowed_replica_schemas=self.remoteinput_allowed_schemas))

if not direct_lan and not direct_wan:
self.logger.debug('direct lan/wan transfer will not be used for lfn=%s' % fspec.lfn)
self.logger.debug('lfn=%s, direct_lan=%s, direct_wan=%s, direct_access_lan=%s, direct_access_wan=%s, '
'direct_localinput_allowed_schemas=%s, remoteinput_allowed_schemas=%s' %
(fspec.lfn, direct_lan, direct_wan, fspec.direct_access_lan, fspec.direct_access_wan,
str(self.direct_localinput_allowed_schemas), str(self.remoteinput_allowed_schemas)))

if direct_lan or direct_wan:
fspec.status_code = 0
fspec.status = 'remote_io'

self.logger.info('stage-in: direct access (remoteio) will be used for lfn=%s (direct_lan=%s, direct_wan=%s), turl=%s' %
self.logger.info('stage-in: direct access (remote i/o) will be used for lfn=%s (direct_lan=%s, direct_wan=%s), turl=%s' %
(fspec.lfn, direct_lan, direct_wan, fspec.turl))
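Schematically, the decision above reduces to a predicate over the file's network domain, its per-file direct-access flags, and whether a suitable replica exists (a simplified stand-in for FileSpec.is_directaccess()):

def is_direct(domain, direct_access_lan, direct_access_wan, directaccess_ok):
    # directaccess_ok stands in for fspec.is_directaccess(ensure_replica=True, ...)
    direct_lan = domain == 'lan' and direct_access_lan and directaccess_ok
    direct_wan = domain == 'wan' and direct_access_wan and directaccess_ok
    return direct_lan or direct_wan

print(is_direct('lan', True, False, True))   # True -> status = 'remote_io'
print(is_direct('wan', False, True, False))  # False -> normal copytool stage-in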

# send trace
@@ -833,9 +850,23 @@ def set_status_for_direct_access(self, files):
self.trace_report.update(localSite=localsite, remoteSite=fspec.ddmendpoint, filesize=fspec.filesize)
self.trace_report.update(filename=fspec.lfn, guid=fspec.guid.replace('-', ''))
self.trace_report.update(scope=fspec.scope, dataset=fspec.dataset)

self.trace_report.update(url=fspec.turl, clientState='FOUND_ROOT', stateReason='direct_access')
self.trace_report.send()

# do not send the trace report at this point if remote file verification is to be done
# note also that we can't verify the files at this point since root will not be available from inside
# the rucio container
if config.Pilot.remotefileverification_log:
# store the trace report for later use (the trace report class inherits from dict, so just write it as JSON)
# outside of the container, it will be available in the normal work dir
# use the normal work dir if we are not in a container
_workdir = workdir if os.path.exists(workdir) else '.'
path = os.path.join(_workdir, config.Pilot.base_trace_report)
if not os.path.exists(_workdir):
path = os.path.join('/srv', config.Pilot.base_trace_report)
self.logger.debug('writing base trace report to: %s' % path)
write_json(path, self.trace_report)
else:
self.trace_report.send()
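A condensed sketch of the new deferral logic above, using the pilot's own config attributes and write_json(); the wrapper function itself is illustrative only:

import os
from pilot.util.config import config
from pilot.util.filehandling import write_json

def store_or_send_trace(trace_report, workdir):
    if config.Pilot.remotefileverification_log:
        # defer sending: root is unavailable inside the rucio container,
        # so store the report (a dict subclass) as JSON for later use
        _workdir = workdir if os.path.exists(workdir) else '.'
        path = os.path.join(_workdir, config.Pilot.base_trace_report)
        if not os.path.exists(_workdir):
            # inside the container the normal work dir is mounted at /srv
            path = os.path.join('/srv', config.Pilot.base_trace_report)
        write_json(path, trace_report)
    else:
        trace_report.send()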

def check_availablespace(self, files):
"""

4 changes: 3 additions & 1 deletion pilot/common/errorcodes.py
@@ -143,6 +143,7 @@ class ErrorCodes:
MISSINGRELEASEUNPACKED = 1358
PANDAQUEUENOTACTIVE = 1359
IMAGENOTFOUND = 1360
REMOTEFILECOULDNOTBEOPENED = 1361

_error_messages = {
GENERALERROR: "General pilot error, consult batch log",
@@ -264,7 +265,8 @@ class ErrorCodes:
POSTPROCESSFAILURE: "Post-process command failed",
MISSINGRELEASEUNPACKED: "Missing release setup in unpacked container",
PANDAQUEUENOTACTIVE: "PanDA queue is not active",
IMAGENOTFOUND: "Image not found"
IMAGENOTFOUND: "Image not found",
REMOTEFILECOULDNOTBEOPENED: "Remote file could not be opened"
}

put_error_codes = [1135, 1136, 1137, 1141, 1152, 1181]
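A hedged usage sketch for the new code; get_error_message() is assumed here as the class's accessor into the _error_messages mapping shown above:

from pilot.common.errorcodes import ErrorCodes

errors = ErrorCodes()
code = errors.REMOTEFILECOULDNOTBEOPENED  # 1361
# get_error_message() assumed as the lookup into _error_messages
print(errors.get_error_message(code))  # "Remote file could not be opened"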

9 changes: 6 additions & 3 deletions pilot/control/data.py
@@ -192,7 +192,8 @@ def _stage_in(args, job):
logger.info('stage-in will be done in a container')
try:
eventtype, localsite, remotesite = get_trace_report_variables(job, label=label)
pilot.util.middleware.containerise_middleware(job, job.indata, args.queue, eventtype, localsite, remotesite, label=label)
pilot.util.middleware.containerise_middleware(job, job.indata, args.queue, eventtype, localsite, remotesite,
job.infosys.queuedata.container_options, label=label)
except PilotException as e:
logger.warning('stage-in containerisation threw a pilot exception: %s' % e)
except Exception as e:
@@ -213,7 +214,8 @@ def _stage_in(args, job):
client = StageInClient(job.infosys, logger=log, trace_report=trace_report)
activity = 'pr'
use_pcache = job.infosys.queuedata.use_pcache
kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, use_pcache=use_pcache, use_bulk=False, input_dir=args.input_dir)
kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, use_pcache=use_pcache, use_bulk=False,
input_dir=args.input_dir, use_vp=job.use_vp)
client.prepare_sources(job.indata)
client.transfer(job.indata, activity=activity, **kwargs)
except PilotException as error:
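The new use_vp flag simply rides along in the stage-in kwargs; a minimal sketch with hypothetical stub values in place of the job and args objects:

# stub values; in the pilot these come from job and args objects
workdir, input_dir, use_pcache, use_vp = '/tmp/job', '', False, True

kwargs = dict(workdir=workdir, cwd=workdir, usecontainer=False,
              use_pcache=use_pcache, use_bulk=False,
              input_dir=input_dir, use_vp=use_vp)
assert kwargs['use_vp']  # later: resolve_replicas(files, use_vp=kwargs['use_vp'])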
@@ -754,7 +756,8 @@ def _do_stageout(job, xdata, activity, queue, title, output_dir=''):
logger.info('stage-out will be done in a container')
try:
eventtype, localsite, remotesite = get_trace_report_variables(job, label=label)
pilot.util.middleware.containerise_middleware(job, xdata, queue, eventtype, localsite, remotesite, label=label)
pilot.util.middleware.containerise_middleware(job, xdata, queue, eventtype, localsite, remotesite,
job.infosys.queuedata.container_options, label=label)
except PilotException as e:
logger.warning('stage-out containerisation threw a pilot exception: %s' % e)
except Exception as e:

22 changes: 22 additions & 0 deletions pilot/control/interceptor.py
@@ -15,6 +15,7 @@
import queue # Python 3

from pilot.common.exception import ExcThread
from pilot.util.processes import threads_aborted

import logging
logger = logging.getLogger(__name__)
@@ -57,6 +58,13 @@ def run(args):

time.sleep(0.5)

# proceed to set the job_aborted flag?
if threads_aborted():
logger.debug('will proceed to set job_aborted')
args.job_aborted.set()
else:
logger.debug('will not set job_aborted yet')

logger.debug('[interceptor] run thread has finished')


@@ -71,6 +79,13 @@ def receive(args):
while not args.graceful_stop.is_set():
time.sleep(0.5)

# proceed to set the job_aborted flag?
if threads_aborted():
logger.debug('will proceed to set job_aborted')
args.job_aborted.set()
else:
logger.debug('will not set job_aborted yet')

logger.debug('[interceptor] receive thread has finished')


@@ -85,4 +100,11 @@ def send(args):
while not args.graceful_stop.is_set():
time.sleep(0.5)

# proceed to set the job_aborted flag?
if threads_aborted():
logger.debug('will proceed to set job_aborted')
args.job_aborted.set()
else:
logger.debug('will not set job_aborted yet')

logger.debug('[interceptor] send thread has finished')
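All three interceptor threads (and failed_post() in payload.py below) now end with the same handshake: wait for graceful_stop, then set job_aborted only once every pilot thread has wound down. A self-contained sketch; threads_aborted() is the real helper, and a stand-in is used here so the snippet runs on its own:

import threading
import time

def threads_aborted():
    # stand-in for pilot.util.processes.threads_aborted()
    return threading.active_count() <= 2  # main thread + this worker

def run(graceful_stop, job_aborted):
    while not graceful_stop.is_set():
        time.sleep(0.5)
    if threads_aborted():
        job_aborted.set()  # signal that the job can be declared aborted

graceful_stop, job_aborted = threading.Event(), threading.Event()
worker = threading.Thread(target=run, args=(graceful_stop, job_aborted))
worker.start()
graceful_stop.set()
worker.join()
print('job_aborted set: %s' % job_aborted.is_set())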
11 changes: 9 additions & 2 deletions pilot/control/job.py
@@ -43,7 +43,7 @@
is_harvester_mode, get_worker_attributes_file, publish_job_report, publish_work_report, get_event_status_file, \
publish_stageout_files
from pilot.util.jobmetrics import get_job_metrics
# from pilot.util.math import mean
from pilot.util.math import mean
from pilot.util.monitoring import job_monitor_tasks, check_local_space
from pilot.util.monitoringtime import MonitoringTime
from pilot.util.processes import cleanup, threads_aborted
@@ -539,6 +539,10 @@ def get_data_structure(job, state, args, xml=None, metadata=None):
if job.corecount and job.corecount != 'null' and job.corecount != 'NULL':
data['coreCount'] = job.corecount
#data['coreCount'] = mean(job.corecounts) if job.corecounts else job.corecount
if job.corecounts:
_mean = mean(job.corecounts)
log.info('mean actualcorecount: %f' % _mean)
data['meanCoreCount'] = _mean

# get the number of events, should report in heartbeat in case of preempted.
if job.nevents != 0:
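The heartbeat change above adds a mean over the sampled actual core counts; in miniature, with hypothetical sample values:

def mean(values):
    # mirrors pilot.util.math.mean for this sketch
    return float(sum(values)) / max(len(values), 1)

data = {'coreCount': 8}            # static value from the job definition
corecounts = [7.2, 7.9, 8.0, 7.6]  # hypothetical monitoring samples
if corecounts:
    data['meanCoreCount'] = mean(corecounts)
print(data)  # {'coreCount': 8, 'meanCoreCount': 7.675}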
@@ -1472,6 +1476,9 @@ def retrieve(queues, traces, args): # noqa: C901
jobnumber += 1
while not args.graceful_stop.is_set():
if has_job_completed(queues, args):
#import signal
#os.kill(os.getpid(), signal.SIGTERM)

args.job_aborted.clear()
args.abort_job.clear()
logger.info('ready for new job')
@@ -1911,7 +1918,7 @@ def get_finished_or_failed_job(args, queues):
def get_heartbeat_period(debug=False):
"""
Return the proper heartbeat period, as determined by normal or debug mode.
In normal mode, the hearbeat period is 30*60 s, while in debug mode it is 5*60 s. Both values are defined in the
In normal mode, the heartbeat period is 30*60 s, while in debug mode it is 5*60 s. Both values are defined in the
config file.
:param debug: Boolean, True for debug mode. False otherwise.
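In miniature, with the two periods hard-coded (the real values live in the config file):

def get_heartbeat_period(debug=False):
    # 30*60 s in normal mode, 5*60 s in debug mode
    return 5 * 60 if debug else 30 * 60

assert get_heartbeat_period() == 1800
assert get_heartbeat_period(debug=True) == 300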

7 changes: 7 additions & 0 deletions pilot/control/payload.py
@@ -441,4 +441,11 @@ def failed_post(queues, traces, args):
set_pilot_state(job=job, state='stageout')
put_in_queue(job, queues.data_out)

# proceed to set the job_aborted flag?
if threads_aborted():
logger.debug('will proceed to set job_aborted')
args.job_aborted.set()
else:
logger.debug('will not set job_aborted yet')

logger.info('[payload] failed_post thread has finished')
9 changes: 5 additions & 4 deletions pilot/control/payloads/eventservice.py
@@ -26,13 +26,14 @@ class Executor(generic.Executor):
def __init__(self, args, job, out, err, traces):
super(Executor, self).__init__(args, job, out, err, traces)

def run_payload(self, job, out, err):
def run_payload(self, job, cmd, out, err):
"""
(add description)
:param job:
:param out:
:param err:
:param job: job object.
:param cmd: (unused in ES mode)
:param out: stdout file object.
:param err: stderr file object.
:return:
"""
log = get_logger(job.jobid, logger)

1 change: 1 addition & 0 deletions pilot/control/payloads/eventservicemerge.py
@@ -32,6 +32,7 @@ def untar_file(self, lfn, job):
def utility_before_payload(self, job):
"""
Functions to run before payload
Note: this function updates job.jobparams (process_writetofile() call)
:param job: job object
"""

(diffs for the remaining changed files not shown)
