Merge pull request #25 from PanDAWMS/next
3.2.3.27
PalNilsson authored Mar 24, 2022
2 parents b46eb4d + 38ca471 commit 0a4d7f4
Showing 18 changed files with 178 additions and 119 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
@@ -1 +1 @@
3.2.2.22
3.2.3.27
5 changes: 5 additions & 0 deletions pilot.py
@@ -248,6 +248,10 @@ def get_args():
dest='queuedata_url',
default='',
help='Queuedata server URL')
arg_parser.add_argument('--storagedata-url',
dest='storagedata_url',
default='',
help='URL for downloading DDM end points data')

# Country group
arg_parser.add_argument('--country-group',
@@ -458,6 +462,7 @@ def set_environment_variables():
# keep track of the server urls
environ['PANDA_SERVER_URL'] = get_panda_server(args.url, args.port)
environ['QUEUEDATA_SERVER_URL'] = '%s' % args.queuedata_url
environ['STORAGEDATA_SERVER_URL'] = '%s' % args.storagedata_url


def wrap_up():
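For context, a minimal sketch (not pilot code) of how the new --storagedata-url option travels from the command line into the environment, where pilot/info/extinfo.py can later read it; the URL used here is invented for illustration:

import argparse
from os import environ

arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--storagedata-url',
                        dest='storagedata_url',
                        default='',
                        help='URL for downloading DDM end points data')

# invented URL, for illustration only
args = arg_parser.parse_args(['--storagedata-url', 'https://example.org/ddmendpoints.json'])
environ['STORAGEDATA_SERVER_URL'] = args.storagedata_url  # as in set_environment_variables()
print(environ['STORAGEDATA_SERVER_URL'])
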
8 changes: 8 additions & 0 deletions pilot/control/job.py
@@ -477,6 +477,12 @@ def handle_backchannel_command(res, job, args, test_tobekilled=False):
logger.info('faking a \'tobekilled\' command')
res['command'] = 'tobekilled'

if 'pilotSecrets' in res:
try:
job.pilotsecrets = res.get('pilotSecrets')
except Exception as exc:
logger.warning(f'failed to parse pilotSecrets: {exc}')

if 'command' in res and res.get('command') != 'NULL':
# warning: server might return comma-separated string, 'debug,tobekilled'
cmd = res.get('command')
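A hedged, self-contained sketch of the backchannel handling above; the response dictionary is invented, and in the real code the secrets end up on the job object:

# 'res' mimics a server response; values are invented for illustration
res = {'command': 'debug,tobekilled', 'pilotSecrets': {'token': 'abc'}}

pilotsecrets = None
if 'pilotSecrets' in res:
    pilotsecrets = res.get('pilotSecrets')  # job.pilotsecrets in the real code

if 'command' in res and res.get('command') != 'NULL':
    # the server may return a comma-separated string, e.g. 'debug,tobekilled'
    for cmd in res.get('command').split(','):
        print(f'server command: {cmd}')
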
@@ -1463,6 +1469,8 @@ def getjob_server_command(url, port):
url = 'https://' + url
logger.warning('detected missing protocol in server url (added)')

# randomize server name
url = https.get_panda_server(url, port)
return '{pandaserver}/server/panda/getJob'.format(pandaserver=url)


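The 'randomize server name' step spreads getJob requests across redundant PanDA frontends. The sketch below only illustrates the idea, with invented alias names; the actual selection logic lives in pilot.util.https.get_panda_server:

import random

def get_panda_server(url, port):
    """Pick one of several equivalent server aliases at random (sketch only)."""
    aliases = ['https://pandaserver01.example.org', 'https://pandaserver02.example.org']
    base = random.choice(aliases) if 'pandaserver' in url else url
    return f'{base}:{port}' if port else base

url = get_panda_server('https://pandaserver.example.org', 25443)
print('{pandaserver}/server/panda/getJob'.format(pandaserver=url))
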
65 changes: 39 additions & 26 deletions pilot/control/payload.py
@@ -22,7 +22,7 @@
from pilot.util.auxiliary import set_pilot_state
from pilot.util.processes import get_cpu_consumption_time
from pilot.util.config import config
from pilot.util.filehandling import read_file, remove_core_dumps, get_guid, extract_lines_from_file
from pilot.util.filehandling import read_file, remove_core_dumps, get_guid, extract_lines_from_file, find_file
from pilot.util.processes import threads_aborted
from pilot.util.queuehandling import put_in_queue
from pilot.common.errorcodes import ErrorCodes
@@ -321,33 +321,32 @@ def get_transport(catchall):
return transport


def get_logging_info(realtimelogging, catchall, realtime_logname, realtime_logging_server):
def get_logging_info(job, args):
"""
Extract the logging type/protocol/url/port from catchall if present, or from args fields.
Returns a dictionary with the format: {'logging_type': .., 'protocol': .., 'url': .., 'port': .., 'logname': ..}
If the provided debug_command contains a tail instruction ('tail log_file_name'), the pilot will locate
the log file and use that for RT logging (full path).
Note: the returned dictionary can be built with either args (has priority) or catchall info.
:param realtimelogging: True if real-time logging was activated by server/job definition (Boolean).
:param catchall: PQ.catchall field (string).
:param realtime_logname from pilot args: (string).
:param realtime_logging_server from pilot args: (string).
:param job: job object.
:param args: args object.
:return: info dictionary (logging_type (string), protocol (string), url (string), port (int)).
"""

info_dic = {}

if 'logging=' not in catchall and not realtimelogging:
#logger.debug(f'catchall={catchall}')
#logger.debug(f'realtimelogging={realtimelogging}')
if 'logging=' not in job.infosys.queuedata.catchall and not job.realtimelogging:
return {}

# args handling
info_dic['logname'] = realtime_logname if realtime_logname else "pilot-log"
logserver = realtime_logging_server if realtime_logging_server else ""
info_dic['logname'] = args.realtime_logname if args.realtime_logname else "pilot-log"
logserver = args.realtime_logging_server if args.realtime_logging_server else ""

pattern = r'logging\=(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)'
info = findall(pattern, catchall)
info = findall(pattern, job.infosys.queuedata.catchall)

if not logserver and not info:
logger.warning('not enough info available for activating real-time logging')
@@ -363,10 +362,26 @@ def get_logging_info(realtimelogging, catchall, realtime_logname, realtime_logging_server):
print(f'exception caught: {exc}')
info_dic = {}
else:
# experiment specific, move to relevant code

# ATLAS (testing; get info from debug parameter later)
info_dic['logfiles'] = [config.Payload.payloadstdout]
path = None
if 'tail' in job.debug_command:
filename = job.debug_command.split(' ')[-1]
logger.debug(f'filename={filename}')
counter = 0
path = None
maxwait = 5 * 60
while counter < maxwait and not args.graceful_stop.is_set():
path = find_file(filename, job.workdir)
if not path:
logger.debug(f'file {filename} not found, waiting for max {maxwait} s')
time.sleep(10)
else:
break
counter += 10
if not path:
logger.warning(f'file {filename} was not found for {maxwait} s, using default')
logf = path if path else config.Payload.payloadstdout
logger.info(f'using {logf} for real-time logging')
info_dic['logfiles'] = [logf]
else:
items = logserver.split(':')
info_dic['logging_type'] = items[0].lower()
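The tail handling added above polls for the requested log file for up to five minutes before falling back to the payload stdout. A standalone sketch of that polling pattern, with a simple stand-in for pilot.util.filehandling.find_file:

import os
import time

def find_file(filename, startdir):
    """Stand-in for pilot.util.filehandling.find_file: walk startdir for filename."""
    for root, _, files in os.walk(startdir):
        if filename in files:
            return os.path.join(root, filename)
    return None

def wait_for_file(filename, workdir, maxwait=5 * 60, step=10):
    """Poll until the file shows up or maxwait seconds have passed."""
    waited = 0
    path = find_file(filename, workdir)
    while not path and waited < maxwait:
        time.sleep(step)
        waited += step
        path = find_file(filename, workdir)
    return path  # None means: fall back to config.Payload.payloadstdout
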
@@ -414,14 +429,15 @@ def run_realtimelog(queues, traces, args):
except queue.Empty:
continue

# wait with proceeding until the job is running, or max 5 * 60 s
counter = 0
while counter < 5 * 60 and not args.graceful_stop.is_set():
# wait with proceeding until the job is running
while not args.graceful_stop.is_set():
if job.state == 'running':
logger.debug('job is running, time to start real-time logger [if needed]')
break
if job.state == 'stageout' or job.state == 'failed':
logger.debug(f'job is in state {job.state}, continue to next job')
continue
time.sleep(1)
counter += 1

if args.use_realtime_logging:
# always do real-time logging
@@ -431,8 +447,8 @@
logger.debug(f'debug={job.debug}')
logger.debug(f'debug_command={job.debug_command}')
logger.debug(f'args.use_realtime_logging={args.use_realtime_logging}')
if job.debug and (not job.debug_command or job.debug_command == 'debug') and not args.use_realtime_logging:
logger.info('turning on real-time logging since debug flag is true and debug_command is not set')
if job.debug and (not job.debug_command or job.debug_command == 'debug' or 'tail' in job.debug_command) and not args.use_realtime_logging:
logger.info('turning on real-time logging')
job.realtimelogging = True

# testing
@@ -441,10 +457,7 @@
if not job.realtimelogging:
info_dic = None
# only set info_dic once per job (the info will not change)
info_dic = get_logging_info(job.realtimelogging,
job.infosys.queuedata.catchall,
args.realtime_logname,
args.realtime_logging_server) if not info_dic and job.realtimelogging else info_dic
info_dic = get_logging_info(job, args) if not info_dic and job.realtimelogging else info_dic
logger.debug(f'info_dic={info_dic}')
if info_dic:
args.use_realtime_logging = True
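Condensing the control flow above: after this change, real-time logging is enabled when the pilot option forces it on, or when the server-side debug flag is set and the debug command is empty, 'debug', or a 'tail ...' instruction. A hedged sketch of that decision:

def should_enable_rtlogging(debug, debug_command, use_realtime_logging):
    """Sketch of the decision made in run_realtimelog() (arguments mirror the job/args fields)."""
    if use_realtime_logging:
        return True
    return bool(debug) and (not debug_command
                            or debug_command == 'debug'
                            or 'tail' in debug_command)

assert should_enable_rtlogging(True, 'tail payload.stdout', False)
assert should_enable_rtlogging(False, '', True)
assert not should_enable_rtlogging(False, 'debug', False)
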
9 changes: 5 additions & 4 deletions pilot/info/extinfo.py
@@ -5,7 +5,7 @@
#
# Authors:
# - Alexey Anisenkov, [email protected], 2018-2021
# - Paul Nilsson, [email protected], 2018-2019
# - Paul Nilsson, [email protected], 2018-2022

"""
Information provider from external source(s)
@@ -128,8 +128,7 @@ def jsonparser_panda(c):
return {pandaqueue: dat}

queuedata_url = (os.environ.get('QUEUEDATA_SERVER_URL') or getattr(config.Information, 'queuedata_url', '')).format(**{'pandaqueue': pandaqueues[0]})

cric_url = getattr(config.Information, 'queues_url', None) or 'https://atlas-cric.cern.ch/cache/schedconfig/{pandaqueue}.json'
cric_url = getattr(config.Information, 'queues_url', None)
cric_url = cric_url.format(pandaqueue=pandaqueues[0] if len(pandaqueues) == 1 else 'pandaqueues')
cvmfs_path = self.get_cvmfs_path(getattr(config.Information, 'queuedata_cvmfs', None), 'cric_pandaqueues.json')

@@ -178,11 +177,13 @@ def load_storage_data(self, ddmendpoints=[], priority=[], cache_time=60):
cache_dir = os.environ.get('PILOT_HOME', '.')

# list of sources to fetch ddmconf data from
_storagedata_url = os.environ.get('STORAGEDATA_SERVER_URL', '')
storagedata_url = _storagedata_url if _storagedata_url else getattr(config.Information, 'storages_url', None)
cvmfs_path = self.get_cvmfs_path(config.Information.storages_cvmfs, 'cric_ddmendpoints.json')
sources = {'CVMFS': {'url': cvmfs_path,
'nretry': 1,
'fname': os.path.join(cache_dir, getattr(config.Information, 'storages_cache', None) or 'agis_ddmendpoints.json')},
'CRIC': {'url': (getattr(config.Information, 'storages_url', None) or 'https://atlas-cric.cern.ch/cache/ddmendpoints.json'),
'CRIC': {'url': storagedata_url,
'nretry': 3,
'sleep_time': lambda: 15 + random.randint(0, 30), ## max sleep time 45 seconds between retries
'cache_time': 3 * 60 * 60, # 3 hours
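The source selection above follows a simple precedence: the STORAGEDATA_SERVER_URL environment variable (set from the new pilot option) wins, otherwise the static config value. A minimal sketch, with the CRIC default taken from the removed line:

import os

def get_storagedata_url(config_url='https://atlas-cric.cern.ch/cache/ddmendpoints.json'):
    """Prefer the env var set from --storagedata-url, fall back to the config URL."""
    env_url = os.environ.get('STORAGEDATA_SERVER_URL', '')
    return env_url if env_url else config_url

print(get_storagedata_url())  # CRIC default unless the env var is set

Per the sources dictionary above, CVMFS is still tried first (a single attempt against the cached JSON), with CRIC as the retried network fallback.
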
8 changes: 7 additions & 1 deletion pilot/user/atlas/common.py
@@ -1905,7 +1905,7 @@ def get_redundants():
"home",
"o..pacman..o",
"pacman-*",
"python",
"python*",
"runAthena*",
"share",
"sources.*",
@@ -1940,16 +1940,22 @@ def get_redundants():
"pandawnutil/*",
"src/*",
"singularity_cachedir",
"apptainer_cachedir",
"_joproxy15",
"HAHM_*",
"Process",
"merged_lhef._0.events-new",
"panda_secrets.json",
"singularity/*",
"apptainer/*",
"/cores",
"/panda_pilot*",
"/work",
"README*",
"MANIFEST*",
"*.part*",
"docs/",
"/venv/",
"/pilot3"]

return dir_list
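get_redundants() only returns glob patterns; a hedged sketch of how such a list can be applied with fnmatch to locate removable entries in a work directory (the pilot's actual cleanup code may differ):

import fnmatch
import os

def find_redundant(workdir, patterns):
    """Return entries in workdir matching any of the glob patterns."""
    matches = []
    for name in os.listdir(workdir):
        if any(fnmatch.fnmatch(name, pattern.lstrip('/')) for pattern in patterns):
            matches.append(os.path.join(workdir, name))
    return matches

print(find_redundant('.', ['python*', 'apptainer*', '*.part*']))
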
36 changes: 28 additions & 8 deletions pilot/user/atlas/cpu.py
@@ -12,7 +12,7 @@

# from .utilities import get_memory_values
from pilot.util.container import execute

from .utilities import get_memory_values
logger = logging.getLogger(__name__)


@@ -60,11 +60,11 @@ def add_core_count(corecount, core_counts=[]):
return core_counts


def set_core_counts(job):
def set_core_counts(**kwargs):
"""
Set the number of used cores.
:param job: job object.
:param kwargs: kwargs (dictionary).
:return:
"""

@@ -86,12 +86,32 @@
#else:
# logger.debug('no summary_dictionary')

if job.pgrp:
# for debugging
#cmd = "ps axo pgid,psr,comm,args | grep %d" % job.pgrp
#exit_code, stdout, stderr = execute(cmd, mute=True)
#logger.debug('%s:\n%s\n', cmd, stdout)
job = kwargs.get('job', None)
walltime = kwargs.get('walltime', None)

if job and walltime:
summary_dictionary = get_memory_values(job.workdir, name=job.memorymonitor)
if summary_dictionary:
time_dictionary = summary_dictionary.get('Time', None)
if time_dictionary:
stime = time_dictionary.get('stime', None)
utime = time_dictionary.get('utime', None)
if stime and utime:
logger.debug(f'stime={stime}')
logger.debug(f'utime={utime}')
logger.debug(f'walltime={walltime}')
cores = float(stime + utime) / float(walltime)
logger.debug(f'number of cores={cores}')
else:
logger.debug('no stime/utime')
else:
logger.debug('no time dictionary')
else:
logger.debug('no summary dictionary')
else:
logger.debug(f'failed to calculate number of cores (walltime={walltime})')

if job and job.pgrp:
# ps axo pgid,psr -> 154628 8 \n 154628 9 \n 1546280 1 ..
# sort is redundant; uniq removes any duplicate lines; wc -l gives the final count
# awk is added to get the pgrp list only and then grep -x makes sure that false positives are removed, e.g. 1546280
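The new branch above estimates core usage from the memory-monitor summary: total CPU time (system plus user) divided by elapsed walltime. A worked example with invented numbers:

# stime/utime come from the memory monitor's 'Time' dictionary (values invented)
stime, utime, walltime = 1200, 6000, 3600  # seconds
cores = float(stime + utime) / float(walltime)
print(f'number of cores={cores}')  # 2.0
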
11 changes: 8 additions & 3 deletions pilot/user/atlas/diagnose.py
@@ -610,7 +610,7 @@ def process_job_report(job):
job.piloterrordiag = diagnostics
else:
# extract Frontier errors
errmsg = get_more_details(job.metadata)
errmsg = get_frontier_details(job.metadata)
if errmsg:
msg = f'Frontier error: {errmsg}'
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.FRONTIER, msg=msg)
@@ -634,15 +634,20 @@
job.piloterrordiag = diagnostics


def get_more_details(job_report_dictionary):
def get_frontier_details(job_report_dictionary):
"""
Extract special Frontier related errors from the job report.
:param job_report_dictionary: job report (dictionary).
:return: extracted error message (string).
"""

error_details = job_report_dictionary['executor'][0]['logfileReport']['details']
try:
error_details = job_report_dictionary['executor'][0]['logfileReport']['details']
except KeyError as exc:
logger.warning(f'key error: {exc} (ignore detailed Frontier analysis)')
return ""

patterns = {'abnormalLines': r'Cannot\sfind\sa\svalid\sfrontier\sconnection(.*)',
'lastNormalLine': r'Using\sfrontier\sconnection\sfrontier(.*)'}
errmsg = ''
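A hedged, self-contained sketch of get_frontier_details() as shown above; how the real code walks 'details' is cut off by the diff, so this stand-in simply scans its string form, and the sample report is invented:

import logging
import re

logging.basicConfig()
logger = logging.getLogger(__name__)

def get_frontier_details(job_report_dictionary):
    """Guard the nested lookup with try/except, then scan for Frontier errors."""
    try:
        details = job_report_dictionary['executor'][0]['logfileReport']['details']
    except KeyError as exc:
        logger.warning(f'key error: {exc} (ignore detailed Frontier analysis)')
        return ""
    patterns = {'abnormalLines': r'Cannot\sfind\sa\svalid\sfrontier\sconnection(.*)',
                'lastNormalLine': r'Using\sfrontier\sconnection\sfrontier(.*)'}
    errmsg = ''
    for name, pattern in patterns.items():
        hits = re.findall(pattern, str(details))
        if hits:
            errmsg += f'{name}: {hits[0].strip()} '
    return errmsg.strip()

report = {'executor': [{'logfileReport': {'details': 'Cannot find a valid frontier connection to server X'}}]}
print(get_frontier_details(report))
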
(diffs for the remaining 10 changed files are not shown)
