Skip to content

Commit

Permalink
Merge pull request #89 from PanDAWMS/next
Browse files Browse the repository at this point in the history
3.6.2.9
  • Loading branch information
PalNilsson authored Jul 11, 2023
2 parents 23235f3 + 3456bff commit f026dad
Show file tree
Hide file tree
Showing 13 changed files with 768 additions and 120 deletions.
572 changes: 572 additions & 0 deletions .pylintrc

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.6.1.31
3.6.2.9
60 changes: 37 additions & 23 deletions pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
errors = ErrorCodes()


def main():
def main() -> int:
"""
Main function of PanDA Pilot 2.
Prepare for and execute the requested workflow.
Expand Down Expand Up @@ -102,7 +102,7 @@ def main():
infosys.init(args.queue)
# check if queue is ACTIVE
if infosys.queuedata.state != 'ACTIVE':
logger.critical('specified queue is NOT ACTIVE: %s -- aborting', infosys.queuedata.name)
logger.critical(f'specified queue is NOT ACTIVE: {infosys.queuedata.name} -- aborting')
return errors.PANDAQUEUENOTACTIVE
except PilotException as error:
logger.fatal(error)
Expand All @@ -115,20 +115,20 @@ def main():
environ['PILOT_IP_VERSION'] = args.internet_protocol_version

# set the site name for rucio
environ['PILOT_RUCIO_SITENAME'] = os.environ.get('PILOT_RUCIO_SITENAME', None) or infosys.queuedata.site
environ['PILOT_RUCIO_SITENAME'] = os.environ.get('PILOT_RUCIO_SITENAME', '') or infosys.queuedata.site

# store the site name as set with a pilot option
environ['PILOT_SITENAME'] = infosys.queuedata.resource #args.site # TODO: replace with singleton

# set requested workflow
logger.info('pilot arguments: %s', str(args))
logger.info(f'pilot arguments: {args}')
workflow = __import__(f'pilot.workflow.{args.workflow}', globals(), locals(), [args.workflow], 0)

# execute workflow
try:
exitcode = workflow.run(args)
except Exception as exc:
logger.fatal('main pilot function caught exception: %s', exc)
logger.fatal(f'main pilot function caught exception: {exc}')
exitcode = None

# let the server know that the worker has finished
Expand All @@ -138,22 +138,28 @@ def main():
return exitcode


def str2bool(var: str) -> bool:
    """
    Helper function to convert string to bool.

    The comparison is case-insensitive and ignores surrounding whitespace.
    Accepted true values: 'yes', 'true', 't', 'y', '1'.
    Accepted false values: 'no', 'false', 'f', 'n', '0'.

    :param var: string to be converted to bool (str)
    :raises argparse.ArgumentTypeError: if var is not a recognised boolean string.
    :return: converted string (bool).
    """

    # a real bool is passed straight through
    if isinstance(var, bool):
        return var

    # anything else that is not a string cannot be interpreted; report it with the
    # same exception type as an unrecognised string instead of an AttributeError
    if not isinstance(var, str):
        raise argparse.ArgumentTypeError(f'boolean value expected (var={var})')

    value = var.strip().lower()
    if value in {'yes', 'true', 't', 'y', '1'}:
        return True
    if value in {'no', 'false', 'f', 'n', '0'}:
        return False

    raise argparse.ArgumentTypeError(f'boolean value expected (var={var})')


def get_args():
def get_args() -> Any:
"""
Return the args from the arg parser.
Expand Down Expand Up @@ -199,6 +205,12 @@ def get_args():
required=False,
type=int,
help='Pilot lifetime seconds (default: 324000 s)')
arg_parser.add_argument('-L', '--leasetime',
dest='leasetime',
default=3600,
required=False,
type=int,
help='Pilot leasetime seconds (default: 3600 s)')

# set the appropriate site, resource and queue
arg_parser.add_argument('-q', '--queue',
Expand Down Expand Up @@ -439,7 +451,7 @@ def get_args():
return arg_parser.parse_args()


def create_main_work_dir():
def create_main_work_dir() -> (int, str):
"""
Create and return the pilot's main work directory.
The function also sets args.mainworkdir and cd's into this directory.
Expand Down Expand Up @@ -468,7 +480,7 @@ def create_main_work_dir():
return exitcode, _mainworkdir


def set_environment_variables():
def set_environment_variables() -> None:
"""
Set environment variables. To be replaced with singleton implementation.
This function sets PILOT_WORK_DIR, PILOT_HOME, PILOT_SITENAME, PILOT_USER and PILOT_VERSION and others.
Expand Down Expand Up @@ -523,7 +535,7 @@ def set_environment_variables():
environ['STORAGEDATA_SERVER_URL'] = f'{args.storagedata_url}'


def wrap_up():
def wrap_up() -> int:
"""
Perform cleanup and terminate logging.
Note: args and mainworkdir, used in this function, are defined in outer scope.
Expand Down Expand Up @@ -551,6 +563,7 @@ def wrap_up():
exitcode = trace.pilot['error_code']
except KeyError:
exitcode = trace
logging.debug(f'trace was not a class, trace={trace}')
else:
logging.info(f'traces error code: {exitcode}')
if trace.pilot['nr_jobs'] <= 1:
Expand Down Expand Up @@ -581,7 +594,7 @@ def wrap_up():
return sec


def get_pilot_source_dir():
def get_pilot_source_dir() -> str:
"""
Return the pilot source directory.
Expand All @@ -594,7 +607,7 @@ def get_pilot_source_dir():
return cwd


def send_worker_status(status: str, queue: str, url: str, port: str, logger: Any, internet_protocol_version: str):
def send_worker_status(status: str, queue: str, url: str, port: str, logger: Any, internet_protocol_version: str) -> None:
"""
Send worker info to the server to let it know that the worker has started
Note: the function can fail, but if it does, it will be ignored.
Expand All @@ -621,22 +634,22 @@ def send_worker_status(status: str, queue: str, url: str, port: str, logger: Any
logger.warning('workerID/harvesterID not known, will not send worker status to server')


def set_lifetime() -> None:
    """
    Update the pilot lifetime if set by an environment variable (PANDAPILOT_LIFETIME) (in seconds).

    If the variable is unset or empty, args.lifetime is left untouched. If it is set
    but cannot be converted to an integer, a warning is logged and args.lifetime is
    likewise left untouched.

    Note: args, used in this function, is expected to be defined in the outer (module) scope.
    """

    lifetime = os.environ.get('PANDAPILOT_LIFETIME', None)
    if not lifetime:
        return

    try:
        _lifetime = int(lifetime)
    except (ValueError, TypeError):
        # use a named logger rather than logging.warning(): the module-level helper
        # would implicitly call basicConfig() if logging has not been set up yet
        logging.getLogger(__name__).warning(f'ignoring invalid PANDAPILOT_LIFETIME value: {lifetime}')
        return

    args.lifetime = _lifetime


def set_redirectall():
def set_redirectall() -> None:
"""
Set args redirectall field.
Currently not used.
Expand Down Expand Up @@ -694,6 +707,7 @@ def set_redirectall():

# execute main function
trace = main()
logging.debug(f'trace={trace}')

# store final time stamp (cannot be placed later since the mainworkdir is about to be purged)
add_to_pilot_timing('0', PILOT_END_TIME, time.time(), args, store=False)
Expand Down
54 changes: 48 additions & 6 deletions pilot/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
import hashlib
import logging
import time

from functools import reduce

try:
import requests
except ImportError:
pass

from pilot.info import infosys
from pilot.common.exception import (
PilotException,
Expand Down Expand Up @@ -306,7 +310,7 @@ def list_replicas(self, xfiles, use_vp):
# load replicas from Rucio
from rucio.client import Client
rucio_client = Client()
location = self.detect_client_location()
location = self.detect_client_location(use_vp=use_vp)
if not location:
raise PilotException("Failed to get client location for Rucio", code=ErrorCodes.RUCIOLOCATIONFAILED)

Expand Down Expand Up @@ -378,13 +382,51 @@ def add_replicas(self, fdat, replica):
return fdat

@classmethod
def detect_client_location(self, use_vp: bool = False) -> dict:
    """
    Build the client location dictionary (ip, site and, optionally, coordinates).

    The local IPv4 address is found by connecting a UDP socket towards a machine on
    the internet and reading back the local socket address; '0.0.0.0' is used if
    that fails. The site name is taken from PILOT_RUCIO_SITENAME ('unknown' if unset).

    When use_vp is set, the geographical position is added as well:
    - from RUCIO_LATITUDE / RUCIO_LONGITUDE if both are set and valid floats,
    - otherwise the location service is asked; a successful JSON reply replaces the
      locally built dictionary (the site name is put back afterwards).
    Any failure in this step is logged and otherwise ignored.

    :param use_vp: also try to determine latitude/longitude (bool)
    :return: client location dictionary with at least the 'site' key (dict).
    """

    import socket

    # resolve the site name first so that it can also be sent to the location service
    site = os.environ.get('PILOT_RUCIO_SITENAME', 'unknown')
    client_location = {}

    ip = '0.0.0.0'
    try:
        # the context manager makes sure the socket is closed again
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.connect(("8.8.8.8", 80))
            ip = sock.getsockname()[0]
    except Exception as exc:
        self.logger.warning(f'failed to get socket info: {exc}')
    client_location['ip'] = ip

    if use_vp:
        latitude = os.environ.get('RUCIO_LATITUDE')
        longitude = os.environ.get('RUCIO_LONGITUDE')
        if latitude and longitude:
            try:
                # convert both values before storing either, to avoid a half-filled position
                coordinates = float(latitude), float(longitude)
            except ValueError:
                self.logger.warning(f'client set latitude ("{latitude}") and longitude ("{longitude}") are not valid')
            else:
                client_location['latitude'], client_location['longitude'] = coordinates
        else:
            try:
                # NOTE: requests is imported optionally at module level; if it is missing,
                # the resulting NameError is caught and reported below
                response = requests.post('https://location.cern.workers.dev',
                                         json={"site": site},
                                         timeout=1)
                if response.status_code == 200 and 'application/json' in response.headers.get('Content-Type', ''):
                    payload = response.json()
                    if isinstance(payload, dict):
                        client_location = payload
                    else:
                        self.logger.warning(f'unexpected reply from location service: {payload}')
            except Exception as exc:
                self.logger.warning(f'failed to get client location from location service: {exc}')

    client_location['site'] = site
    return client_location

@classmethod
def detect_client_location_old(self):
"""
Open a UDP socket to a machine on the internet, to get the local IPv4 and IPv6
addresses of the requesting client.
Try to determine the sitename automatically from common environment variables,
in this order: SITE_NAME, ATLAS_SITE_NAME, OSG_SITE_NAME. If none of these exist
use the fixed string 'ROAMING'.
"""

ip = '0.0.0.0'
Expand Down
14 changes: 12 additions & 2 deletions pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2763,7 +2763,8 @@ def job_monitor(queues, traces, args): # noqa: C901

# peeking and current time; peeking_time gets updated if and when jobs are being monitored, update_time is only
# used for sending the heartbeat and is updated after a server update
peeking_time = int(time.time())
start_time = int(time.time())
peeking_time = start_time
update_time = peeking_time

# overall loop counter (ignoring the fact that more than one job may be running)
Expand Down Expand Up @@ -2815,7 +2816,16 @@ def job_monitor(queues, traces, args): # noqa: C901
continue

if args.workflow == 'stager':
logger.debug('stage-in has finished - no need for job_monitor to continue')
if args.pod:
# wait maximum args.leasetime, then abort
time.sleep(10)
time_now = int(time.time())
if time_now - start_time >= args.leasetime:
logger.warning(f'lease time is up: {time_now - start_time} s has passed since start - abort stager pilot')
else:
continue
else:
logger.debug('stage-in has finished - no need for job_monitor to continue')
break

# peek at the jobs in the validated_jobs queue and send the running ones to the heartbeat function
Expand Down
1 change: 1 addition & 0 deletions pilot/scripts/open_remote_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def interrupt(args, signum, frame):
args = get_args()
args.debug = True
args.nopilotlog = False
args.signal = None

try:
logname = config.Pilot.remotefileverification_log
Expand Down
7 changes: 4 additions & 3 deletions pilot/user/atlas/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,17 +878,18 @@ def filter_output(stdout):

# copy pilot source into container directory, unless it is already there
setup = get_asetup(asetup=False) + 'lsetup cpu_flags; '
script = 'cpu_arch.py --alg gcc'
# script = 'cpu_arch.py --alg gcc'
script = 'cpu_arch.py'
cmd = setup + script

# CPU arch script has now been copied, time to execute it
ec, stdout, stderr = execute(cmd)
if ec or stderr:
logger.debug(f'ec={ec}, stdout={stdout}, stderr={stderr}')
logger.warning(f'ec={ec}, stdout={stdout}, stderr={stderr}')
else:
logger.debug(stdout)
stdout = filter_output(stdout)
cpu_arch = stdout
logger.debug(f'CPU arch script returned: {cpu_arch}')
logger.info(f'CPU arch script returned: {cpu_arch}')

return cpu_arch
Loading

0 comments on commit f026dad

Please sign in to comment.