Skip to content

Commit

Permalink
Merge pull request #77 from PanDAWMS/next
Browse files Browse the repository at this point in the history
3.5.1.17
  • Loading branch information
PalNilsson authored Mar 14, 2023
2 parents 23a777a + 0c469ee commit 1d6b69a
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 41 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.5.0.31
3.5.1.17
60 changes: 58 additions & 2 deletions pilot/control/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from pilot.util.config import config
from pilot.util.constants import MAX_KILL_WAIT_TIME
# from pilot.util.container import execute
from pilot.util.features import MachineFeatures
from pilot.util.queuehandling import get_queuedata_from_job, abort_jobs_in_queues
from pilot.util.timing import get_time_since_start

Expand All @@ -31,7 +32,7 @@

# Monitoring of threads functions

def control(queues, traces, args):
def control(queues, traces, args): # noqa: C901
"""
Main control function, run from the relevant workflow module.
Expand All @@ -50,6 +51,7 @@ def control(queues, traces, args):
# for CPU usage debugging
cpuchecktime = int(config.Pilot.cpu_check)
tcpu = t_0
last_minute_check = t_0

queuedata = get_queuedata_from_job(queues)
max_running_time = get_max_running_time(args.lifetime, queuedata)
Expand All @@ -59,7 +61,7 @@ def control(queues, traces, args):
niter = 0

while not args.graceful_stop.is_set():
# every seconds, run the monitoring checks
# every few seconds, run the monitoring checks
if args.graceful_stop.wait(1) or args.graceful_stop.is_set():
logger.warning('aborting monitor loop since graceful_stop has been set')
break
Expand All @@ -83,6 +85,16 @@ def control(queues, traces, args):
else:
if niter % 60 == 0:
logger.info(f'{time_since_start}s have passed since pilot start')

# every minute run the following check
if time.time() - last_minute_check > 60:
reached_maxtime = run_shutdowntime_minute_check(time_since_start)
if reached_maxtime:
reached_maxtime_abort(args)
break
last_minute_check = time.time()

# take a nap
time.sleep(1)

# time to check the CPU usage?
Expand Down Expand Up @@ -116,6 +128,50 @@ def control(queues, traces, args):
logger.info('[monitor] control thread has ended')


def run_shutdowntime_minute_check(time_since_start):
"""
Run checks on machine features shutdowntime once a minute.
:param time_since_start: how many seconds have lapsed since the pilot started (int).
:return: True if reached max time, False it not (or if shutdowntime not known) (Boolean).
"""

# check machine features if present for shutdowntime
machinefeatures = MachineFeatures().get()
if machinefeatures:
grace_time = 10 * 60
try:
now = int(time.time())
except (TypeError, ValueError) as exc:
logger.warning(f'failed to read current time: {exc}')
return False # will be ignored

# ignore shutdowntime if not known
try:
shutdowntime = int(machinefeatures.get('shutdowntime'))
except (TypeError, ValueError) as exc:
logger.debug(f'failed to convert shutdowntime: {exc}')
return False # will be ignored
logger.debug(f'machinefeatures shutdowntime={shutdowntime} - now={now}')
if not shutdowntime:
logger.debug('ignoring shutdowntime since it is not set')
return False # will be ignored

# ignore shutdowntime if in the past (= set before the pilot started)
if shutdowntime < (now - time_since_start):
logger.debug(f'shutdowntime ({shutdowntime}) was set before pilot started - ignore it '
f'(now - time since start = {now - time_since_start})')
return False # will be ignored

# did we pass, or are we close to the shutdowntime?
if now > shutdowntime - grace_time:
logger.fatal(f'now={now}s - shutdowntime ({shutdowntime}s) minus grace time ({grace_time}s) has been '
f'exceeded - time to abort pilot')
return True

return False


def reached_maxtime_abort(args):
"""
Max time has been reached, set REACHED_MAXTIME and graceful_stop, close any ActiveMQ connections.
Expand Down
13 changes: 7 additions & 6 deletions pilot/copytool/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2021
# - Paul Nilsson, [email protected], 2021-2023
# - Shuwei

import os
Expand All @@ -18,7 +18,10 @@
try:
from google.cloud import storage
except Exception:
pass
storage_client = storage.Client()
else:
storage_client = None

try:
import pathlib # Python 3
except Exception:
Expand Down Expand Up @@ -127,10 +130,9 @@ def download_file(path, surl, object_name=None):
object_name = os.path.basename(path)

try:
client = storage.Client()
target = pathlib.Path(object_name)
with target.open(mode="wb") as downloaded_file:
client.download_blob_to_file(surl, downloaded_file)
storage_client.download_blob_to_file(surl, downloaded_file)
except Exception as error:
diagnostics = 'exception caught in gs client: %s' % error
logger.critical(diagnostics)
Expand Down Expand Up @@ -234,8 +236,7 @@ def upload_file(file_name, bucket, object_name=None, content_type=None):

# upload the file
try:
client = storage.Client()
gs_bucket = client.get_bucket(bucket)
gs_bucket = storage_client.get_bucket(bucket)
# remove any leading slash(es) in object_name
object_name = object_name.lstrip('/')
logger.info('uploading a file to bucket=%s in full path=%s in content_type=%s', bucket, object_name, content_type)
Expand Down
14 changes: 13 additions & 1 deletion pilot/user/atlas/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def open_remote_files(indata, workdir, nthreads):
cmd = create_root_container_command(workdir, _cmd)

show_memory_usage()
timeout = len(indata) * 30 + 600
timeout = get_timeout_for_remoteio(indata)
logger.info('executing file open verification script (timeout=%d):\n\n\'%s\'\n\n', timeout, cmd)

exitcode, stdout, stderr = execute(cmd, usecontainer=False, timeout=timeout)
Expand Down Expand Up @@ -252,6 +252,18 @@ def open_remote_files(indata, workdir, nthreads):
return exitcode, diagnostics, not_opened


def get_timeout_for_remoteio(indata):
"""
Calculate a proper timeout to be used for remote i/o files.
:param indata: indata object.
:return: timeout in seconds (int).
"""

remote_io = [fspec.status == 'remote_io' for fspec in indata]
return len(remote_io) * 30 + 600


def parse_remotefileverification_dictionary(workdir):
"""
Verify that all files could be remotely opened.
Expand Down
2 changes: 1 addition & 1 deletion pilot/user/atlas/jobmetrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def add_sub_features(job_metrics, features_dic, add=[]):
# correct hs06 for corecount: hs06*perf_scale/total_cpu*corecount
hs06 = machinefeatures.get('hs06', 0)
total_cpu = machinefeatures.get('total_cpu', 0)
if hs06 and total_cpu:
if hs06 and total_cpu and (total_cpu != '0' or total_cpu != 0):
perf_scale = 1
try:
machinefeatures_hs06 = 1.0 * int(float(hs06)) * perf_scale * corecount / (1.0 * int(float(total_cpu)))
Expand Down
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '5' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '31' # build number should be reset to '1' for every new development cycle
REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '17' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
6 changes: 5 additions & 1 deletion pilot/util/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ def set(self, path, label):
if path and os.path.exists(path):
data_members = self.get_data_members()
for member in data_members:
# ignore if file doesn't exist
filename = os.path.join(path, member)
if not os.path.exists(filename):
continue
try:
value = read_file(os.path.join(path, member))
value = read_file(filename)
except FileHandlingFailure as exc:
logger.warning(f'failed to process {member}: {exc}')
value = None
Expand Down
4 changes: 2 additions & 2 deletions pilot/util/jobmetrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2018-2022
# - Paul Nilsson, [email protected], 2018-2023

from os import environ

Expand All @@ -16,7 +16,7 @@
def get_job_metrics_entry(name, value):
"""
Get a formatted job metrics entry.
Return a a job metrics substring with the format 'name=value ' (return empty entry if value is not set).
Return a job metrics substring with the format 'name=value ' (return empty entry if value is not set).
:param name: job metrics parameter name (string).
:param value: job metrics parameter value (string).
Expand Down
2 changes: 1 addition & 1 deletion pilot/util/timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def get_time_since(job_id, timing_constant, args):
if time_measurement_dictionary:
time_measurement = get_time_measurement(timing_constant, time_measurement_dictionary, args.timing)
if time_measurement:
diff = time.time() - time_measurement
diff = int(time.time() - time_measurement)
else:
logger.warning(f'failed to extract time measurement dictionary from {args.timing}')
else:
Expand Down
86 changes: 65 additions & 21 deletions pilot/util/workernode.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Paul Nilsson, [email protected], 2017-2022
# - Paul Nilsson, [email protected], 2017-2023

import os
import re
import logging
from shutil import which

#from subprocess import getoutput

Expand Down Expand Up @@ -253,6 +254,27 @@ def get_cpu_model():
return modelstring


def lscpu():
"""
Execute lscpu command.
:return: exit code (int), stdout (string).
"""

cmd = 'lscpu'
if not which(cmd):
logger.warning('command={cmd} does not exist - cannot check number of available cores')
return 1, ""

ec, stdout, _ = execute(cmd)
if isinstance(stdout, bytes):
stdout = stdout.decode("utf-8")

logger.debug(f'lscpu:\n{stdout}')

return ec, stdout


def get_cpu_cores(modelstring):
"""
Get core count from /proc/cpuinfo and update modelstring (CPU model).
Expand All @@ -263,29 +285,51 @@ def get_cpu_cores(modelstring):
"""

number_of_cores = 0
re_cores = re.compile(r'^cpu cores\s+:\s+(\d+)')

with open("/proc/cpuinfo", "r") as _fp:
ec, stdout = lscpu()
if ec:
return modelstring

# loop over all lines in cpuinfo
for line in _fp.readlines():
cores_per_socket = 0
sockets = 0
for line in stdout.split('\n'):

# try to grab core count from current line
cores = re_cores.search(line)
if cores:
# found core count
try:
number_of_cores += int(cores.group(1))
except Exception:
pass

if number_of_cores > 0 and '-Core' not in modelstring:
if 'Core Processor' in modelstring:
modelstring = modelstring.replace('Core', '%d-Core' % number_of_cores)
elif 'Processor' in modelstring:
modelstring = modelstring.replace('Processor', '%d-Core Processor' % number_of_cores)
else:
modelstring += ' %d-Core Processor'
try:
pattern = r'Core\(s\)\ per\ socket\:\ +(\d+)'
_cores = re.findall(pattern, line)
if _cores:
cores_per_socket = int(_cores[0])
continue
except Exception as exc:
logger.warning(f'exception caught: {exc}')

try:
pattern = r'Socket\(s\)\:\ +(\d+)'
_sockets = re.findall(pattern, line)
if _sockets:
sockets = int(_sockets[0])
break
except Exception as exc:
logger.warning(f'exception caught: {exc}')

if cores_per_socket and sockets:
number_of_cores = cores_per_socket * sockets
logger.info(f'found {number_of_cores} cores ({cores_per_socket} cores per socket, {sockets} sockets)')

logger.debug(f'current model string: {modelstring}')
if number_of_cores > 0 and '-Core' not in modelstring:
if '-Core Processor' in modelstring: # NN-Core info already in string - update it
pattern = r'(\d+)\-Core Processor'
_nn = re.findall(pattern, modelstring)
if _nn:
modelstring = modelstring.replace(f'{_nn[0]}-Core', f'{number_of_cores}-Core')
if 'Core Processor' in modelstring:
modelstring = modelstring.replace('Core', '%d-Core' % number_of_cores)
elif 'Processor' in modelstring:
modelstring = modelstring.replace('Processor', '%d-Core Processor' % number_of_cores)
else:
modelstring += ' %d-Core Processor' % number_of_cores
logger.debug(f'updated model string: {modelstring}')

return modelstring

Expand Down
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# config file for pypi
[metadata]
description-file = README.md
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#
# Authors:
# - Fernando Barreiro Megino, [email protected], 2019
# - Paul Nilsson, [email protected], 2019-2021
# - Paul Nilsson, [email protected], 2019-2023
import sys

from setuptools import setup, find_packages
Expand All @@ -26,10 +26,11 @@
author='PanDA Team',
author_email='[email protected]',
url='https://github.com/PanDAWMS/pilot3/wiki',
python_requires='>=2.7',
python_requires='>=3.6',
packages=find_packages(),
install_requires=[],
data_files=[],
package_data={'': ['PILOTVERSION']},
include_package_data=True,
scripts=['pilot.py']
scripts=[]
)

0 comments on commit 1d6b69a

Please sign in to comment.