From 280d47f9eb8dd8eaf0ed860f35eb7354fdd14f51 Mon Sep 17 00:00:00 2001
From: Paul Nilsson
Date: Tue, 19 Nov 2024 15:13:03 +0100
Subject: [PATCH 01/37] Added function to find lingering processes

---
 PILOTVERSION            |  2 +-
 pilot/util/constants.py |  2 +-
 pilot/util/psutils.py   | 20 ++++++++++++++++++++
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/PILOTVERSION b/PILOTVERSION
index 2fb403f4..c0387ef0 100644
--- a/PILOTVERSION
+++ b/PILOTVERSION
@@ -1 +1 @@
-3.9.2.23b
\ No newline at end of file
+3.9.2.23
\ No newline at end of file

diff --git a/pilot/util/constants.py b/pilot/util/constants.py
index 1334b93b..2f0ae675 100644
--- a/pilot/util/constants.py
+++ b/pilot/util/constants.py
@@ -28,7 +28,7 @@
 RELEASE = '3'  # released number should be fixed at 3 for Pilot 3
 VERSION = '9'  # version number is '1' for first release, '0' until then, increased for bigger updates
 REVISION = '2'  # revision number should be reset to '0' for every new version release, increased for small updates
-BUILD = '23'  # build number should be reset to '1' for every new development cycle
+BUILD = '24'  # build number should be reset to '1' for every new development cycle

 SUCCESS = 0
 FAILURE = 1

diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py
index ba18e1b1..dbd4fbbd 100644
--- a/pilot/util/psutils.py
+++ b/pilot/util/psutils.py
@@ -321,3 +321,23 @@ def find_actual_payload_pid(bash_pid: int, payload_cmd: str) -> int or None:
         logger.warning(f'could not find payload PID for bash PID {bash_pid}')

     return None
+
+
+def find_lingering_processes(parent_pid: int) -> list:
+    """
+    Find processes that are still running after the specified parent process has terminated.
+
+    :param parent_pid: The PID of the parent process (int)
+    :return: A list of lingering process PIDs (list).
+    """
+    if not _is_psutil_available:
+        logger.warning('psutil not available, cannot find lingering processes - aborting')
+        return []
+
+    lingering_processes = []
+    parent_process = psutil.Process(parent_pid)
+    for child in parent_process.children(recursive=True):
+        if child.status() != psutil.STATUS_ZOMBIE:
+            lingering_processes.append(child.pid)
+
+    return lingering_processes

From e36e57a6d6529b921fe592e3c8a1da613e281110 Mon Sep 17 00:00:00 2001
From: PalNilsson
Date: Tue, 19 Nov 2024 15:27:15 +0100
Subject: [PATCH 02/37] Added list_items()

---
 pilot/util/auxiliary.py | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/pilot/util/auxiliary.py b/pilot/util/auxiliary.py
index ddcd8547..9f687e79 100644
--- a/pilot/util/auxiliary.py
+++ b/pilot/util/auxiliary.py
@@ -839,3 +839,13 @@ def uuidgen_t() -> str:
     :return: A UUID in the format "00000000-0000-0000-0000-000000000000" (str).
     """
     return str(uuid4())
+
+
+def list_items(items: list):
+    """
+    List the items in the given list as a numbered list.
+ + :param items: list of items (list) + """ + for i, item in enumerate(items): + logger.info(f'{i + 1}: {item}') From df8ed021a5ab778d8e80a9418ec341a5dbc1a86a Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Tue, 19 Nov 2024 15:34:49 +0100 Subject: [PATCH 03/37] Looking for lingering processes after payload has finished --- pilot/control/payloads/generic.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index 056d2c32..2cde9668 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -36,7 +36,10 @@ from pilot.common.errorcodes import ErrorCodes from pilot.control.job import send_state from pilot.info import JobData -from pilot.util.auxiliary import set_pilot_state # , show_memory_usage +from pilot.util.auxiliary import ( + set_pilot_state, # , show_memory_usage + list_items +) from pilot.util.config import config from pilot.util.container import execute from pilot.util.constants import ( @@ -56,6 +59,7 @@ read_file ) from pilot.util.processes import kill_processes +from pilot.util.psutils import find_lingering_processes from pilot.util.timing import ( add_to_pilot_timing, get_time_measurement @@ -982,6 +986,16 @@ def run(self) -> tuple[int, str]: # noqa: C901 if self.__job.utilities != {}: self.stop_utilities() + # make sure there are no lingering processes + items = find_lingering_processes() + if items: + list_items() + #logger.warning(f"found lingering processes: {items}") + #for item in items: + # kill_processes(item) + else: + logger.info("found no lingering processes") + if self.__job.is_hpo and state != "failed": # in case there are more hyper-parameter points, move away the previous log files # self.rename_log_files(iteration) From 1c685e95ef2513e8628547adcfe76298af37b4d3 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Tue, 19 Nov 2024 16:03:12 +0100 Subject: [PATCH 04/37] Looking for lingering processes after payload has finished --- PILOTVERSION | 2 +- pilot/control/payloads/generic.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index c0387ef0..8ecb8303 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.2.23 \ No newline at end of file +3.9.2.24 \ No newline at end of file diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index 2cde9668..cc75891d 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -987,7 +987,7 @@ def run(self) -> tuple[int, str]: # noqa: C901 self.stop_utilities() # make sure there are no lingering processes - items = find_lingering_processes() + items = find_lingering_processes(os.getpid()) if items: list_items() #logger.warning(f"found lingering processes: {items}") From c54f6c1303a6b146637f56e15dd64b2aa9b0b290 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 19 Nov 2024 18:38:13 +0100 Subject: [PATCH 05/37] Testing lingering processes --- PILOTVERSION | 2 +- pilot/control/payloads/generic.py | 14 +++++++++++++- pilot/util/constants.py | 2 +- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 8ecb8303..e8234382 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.2.24 \ No newline at end of file +3.9.2.25 \ No newline at end of file diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index cc75891d..b20f621f 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ 
-956,6 +956,17 @@ def run(self) -> tuple[int, str]: # noqa: C901 f"\n\nfinished pid={proc.pid} exit_code={exit_code} state={self.__job.state}\n" ) + # make sure there are no lingering processes + items = find_lingering_processes(os.getpid()) + if items: + logger.warning("found lingering processes") + list_items() + #logger.warning(f"found lingering processes: {items}") + #for item in items: + # kill_processes(item) + else: + logger.info("(1/2) found no lingering processes") + # stop the utility command (e.g. a coprocess if necessary if proc_co: logger.debug(f"stopping utility command: {utility_cmd}") @@ -989,12 +1000,13 @@ def run(self) -> tuple[int, str]: # noqa: C901 # make sure there are no lingering processes items = find_lingering_processes(os.getpid()) if items: + logger.warning("found lingering processes") list_items() #logger.warning(f"found lingering processes: {items}") #for item in items: # kill_processes(item) else: - logger.info("found no lingering processes") + logger.info("(2/2) found no lingering processes") if self.__job.is_hpo and state != "failed": # in case there are more hyper-parameter points, move away the previous log files diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 2f0ae675..480c137a 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '24' # build number should be reset to '1' for every new development cycle +BUILD = '25' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From b98e6857f71c14055ef6259961c6a04c36f822cc Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Wed, 20 Nov 2024 10:04:28 +0100 Subject: [PATCH 06/37] Added support for curlgetjob catchall --- pilot/control/job.py | 9 ++++++--- pilot/util/constants.py | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pilot/control/job.py b/pilot/control/job.py index 6c8c4c28..527815ae 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -1684,10 +1684,13 @@ def get_job_definition_from_server(args: Any, taskid: str = "") -> str: cmd = https.get_server_command(args.url, args.port) if cmd != "": logger.info(f'executing server command: {cmd}') - res = https.request2(cmd, data=data, panda=True) # will be a dictionary - logger.debug(f"request2 response: {res}") # should be StatusCode=0 if all is ok - if not res: # fallback to curl solution + if "curlgetjob" in infosys.queuedata.catchall: res = https.request(cmd, data=data) + else: + res = https.request2(cmd, data=data, panda=True) # will be a dictionary + logger.debug(f"request2 response: {res}") # should be StatusCode=0 if all is ok + if not res: # fallback to curl solution + res = https.request(cmd, data=data) return res diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 480c137a..0b997dc5 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '25' # build number should be reset to '1' for every new development cycle 
+BUILD = '26' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 14a4c67957eca4dce184ca7e93a51fcee060aeea Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Wed, 20 Nov 2024 10:17:00 +0100 Subject: [PATCH 07/37] Added missing items --- PILOTVERSION | 2 +- pilot/control/payloads/generic.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index e8234382..8bde6e70 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.2.25 \ No newline at end of file +3.9.2.26 \ No newline at end of file diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index b20f621f..df62bf6c 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -960,7 +960,7 @@ def run(self) -> tuple[int, str]: # noqa: C901 items = find_lingering_processes(os.getpid()) if items: logger.warning("found lingering processes") - list_items() + list_items(items) #logger.warning(f"found lingering processes: {items}") #for item in items: # kill_processes(item) @@ -1001,7 +1001,7 @@ def run(self) -> tuple[int, str]: # noqa: C901 items = find_lingering_processes(os.getpid()) if items: logger.warning("found lingering processes") - list_items() + list_items(items) #logger.warning(f"found lingering processes: {items}") #for item in items: # kill_processes(item) From 1ddfc3d588a8ef3768db1a8355d9effe72949d96 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Thu, 21 Nov 2024 15:24:17 +0100 Subject: [PATCH 08/37] Skipping sending X509 for tokens --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/https.py | 9 +++------ 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 8bde6e70..ae44b459 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.2.26 \ No newline at end of file +3.9.2.27 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 0b997dc5..fc2e22d3 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '26' # build number should be reset to '1' for every new development cycle +BUILD = '27' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/https.py b/pilot/util/https.py index 9b483e9a..16b25499 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -799,11 +799,7 @@ def _create_connection(self, host, port=None, timeout=socket._GLOBAL_DEFAULT_TIM return socket.create_connection((host, port), timeout, source_address, family=socket.AF_INET) -def request2(url: str = "", - data: dict = None, - secure: bool = True, - compressed: bool = True, - panda: bool = False) -> str or dict: +def request2(url: str = "", data: dict = None, secure: bool = True, compressed: bool = True, panda: bool = False) -> str or dict: # noqa: C901 """ Send a request using HTTPS (using urllib module). 
@@ -851,7 +847,8 @@ def request2(url: str = "",

     # create a context with certificate verification
     ssl_context = get_ssl_context()
     #ssl_context.verify_mode = ssl.CERT_REQUIRED
-    ssl_context.load_cert_chain(certfile=_ctx.cacert, keyfile=_ctx.cacert)
+    if not use_oidc_token:
+        ssl_context.load_cert_chain(certfile=_ctx.cacert, keyfile=_ctx.cacert)

     if not secure:
         ssl_context.verify_mode = False

From aaee5195b0c47fbb59a25f5d84a9e012c4fd48a7 Mon Sep 17 00:00:00 2001
From: Paul Nilsson
Date: Fri, 22 Nov 2024 11:52:13 +0100
Subject: [PATCH 09/37] Added threshold to CPU consumption time

---
 pilot/util/processes.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pilot/util/processes.py b/pilot/util/processes.py
index b1c16536..3bf4bdfb 100644
--- a/pilot/util/processes.py
+++ b/pilot/util/processes.py
@@ -582,6 +582,10 @@ def get_instant_cpu_consumption_time(pid: int) -> float:
     if utime and stime and cutime and cstime:
         # sum up all the user+system times for both the main process (pid) and the child processes
         cpu_consumption_time = utime + stime + cutime + cstime
+        max_threshold = 1e6
+        if cpu_consumption_time > max_threshold:
+            logger.warning(f'CPU consumption time={cpu_consumption_time} exceeds sanity threshold={max_threshold}')
+            cpu_consumption_time = 0.0
     else:
         cpu_consumption_time = 0.0

From e2935c1052b0cae4930b5822bb3b2ffc3a3d974b Mon Sep 17 00:00:00 2001
From: Paul Nilsson
Date: Fri, 22 Nov 2024 11:55:07 +0100
Subject: [PATCH 10/37] Added threshold to CPU consumption time, plus debug info in case of trouble

---
 pilot/util/processes.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pilot/util/processes.py b/pilot/util/processes.py
index 3bf4bdfb..495f0507 100644
--- a/pilot/util/processes.py
+++ b/pilot/util/processes.py
@@ -574,7 +574,8 @@ def get_instant_cpu_consumption_time(pid: int) -> float:
     if os.path.exists(path):
         try:
             with open(path, "r", encoding="utf-8") as fp:
-                fields = fp.read().split(' ')[13:17]
+                _read = fp.read()
+                fields = _read.split(' ')[13:17]
                 utime, stime, cutime, cstime = [(float(f) / hz) for f in fields]
         except IOError as exc:
             logger.warning(f'exception caught: {exc} (ignored)')
@@ -585,6 +586,8 @@ def get_instant_cpu_consumption_time(pid: int) -> float:
         cpu_consumption_time = utime + stime + cutime + cstime
         max_threshold = 1e6
         if cpu_consumption_time > max_threshold:
             logger.warning(f'CPU consumption time={cpu_consumption_time} exceeds sanity threshold={max_threshold}')
+            logger.warning(f"utime={utime} stime={stime} cutime={cutime} cstime={cstime} hz={hz}")
+            logger.warning(f"fp.read()={_read}")
             cpu_consumption_time = 0.0
     else:
         cpu_consumption_time = 0.0

From 321d90f19f13c188bd2674ca509b44d009a3c771 Mon Sep 17 00:00:00 2001
From: Paul Nilsson
Date: Fri, 22 Nov 2024 12:06:59 +0100
Subject: [PATCH 11/37] Now aborting monitoring loop if CPU consumption time is 0

---
 pilot/util/monitoring.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py
index bbd64286..1f11ee14 100644
--- a/pilot/util/monitoring.py
+++ b/pilot/util/monitoring.py
@@ -121,6 +121,9 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i
             logger.info(f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})')
         elif _cpuconsumptiontime == -1:
             logger.warning('could not get CPU consumption time')
+        elif _cpuconsumptiontime == 0.0:
+            logger.warning(f'process {job.pid} can no longer be monitored (due to stat problems) - aborting')
+            return 0, ""
         else:
             logger.warning(f'process {job.pid} is no longer using CPU 
- aborting') return 0, "" From a1fe2c0a14a870a5c7f5b5326cee4de95bc9d758 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Fri, 22 Nov 2024 14:07:02 +0100 Subject: [PATCH 12/37] Fixed time-outs --- PILOTVERSION | 2 +- pilot/user/atlas/container.py | 112 ++++++++++++++++++++++++++++++++++ pilot/util/constants.py | 2 +- 3 files changed, 114 insertions(+), 2 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index ae44b459..c59294e0 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.2.27 \ No newline at end of file +3.9.2.29 \ No newline at end of file diff --git a/pilot/user/atlas/container.py b/pilot/user/atlas/container.py index adc7de77..9e48760e 100644 --- a/pilot/user/atlas/container.py +++ b/pilot/user/atlas/container.py @@ -27,6 +27,7 @@ import logging import os import re +import select import shlex import subprocess import time @@ -838,6 +839,117 @@ def execute_remote_file_open(path: str, python_script_timeout: int) -> tuple[int python_completed = False # Flag to track completion of 'python3' process lsetup_completed_at = None + with open(new_path, "w", encoding='utf-8') as file: + while True: + # Check for timeout (once per second) + if time.time() - start_time > lsetup_timeout and not lsetup_completed: + logger.warning("timeout for 'lsetup' exceeded - killing script") + exit_code = 2 # 'lsetup' timeout + process.kill() + break + + # Use select to check if there is data to read (to byspass any blocking operation that will prevent time-out checks) + ready, _, _ = select.select([process.stdout], [], [], 1.0) + if ready: + output = process.stdout.readline() # Read bytes directly + if output: + output = output.decode().strip() + file.write(output + "\n") + stdout += output + "\n" + + # Check for LSETUP_COMPLETED message + if output == "LSETUP_COMPLETED": + lsetup_completed = True + lsetup_completed_at = time.time() + start_time = time.time() # Reset start time for 'python3' timeout + + # Check for PYTHON_COMPLETED message + if "PYTHON_COMPLETED" in output: + python_completed = True + match = re.search(r"\d+$", output) + if match: + exit_code = int(match.group()) + logger.info(f"python remote open command has completed with exit code {exit_code}") + else: + logger.info("python remote open command has completed without any exit code") + break + + # Timeout for python script after LSETUP_COMPLETED + if lsetup_completed and ((time.time() - lsetup_completed_at) > python_script_timeout): + logger.warning(f"(1) timeout for 'python3' subscript exceeded - killing script " + f"({time.time()} - {lsetup_completed_at} > {python_script_timeout})") + exit_code = 3 + process.kill() + break + + # Timeout for python script after LSETUP_COMPLETED + if lsetup_completed and ((time.time() - start_time) > python_script_timeout): + logger.warning(f"(2) timeout for 'python3' subscript exceeded - killing script " + f"({time.time()} - {start_time} > {python_script_timeout})") + exit_code = 3 + process.kill() + break + + if python_completed: + logger.info('aborting since python command has finished') + return_code = process.poll() + if return_code: + logger.warning(f"script execution completed with return code: {return_code}") + # exit_code = return_code + break + + # Check if script has completed normally + return_code = process.poll() + if return_code is not None: + pass + # logger.info(f"script execution completed with return code: {return_code}") + # exit_code = return_code + # break + + time.sleep(0.5) + + # Ensure process is terminated + if process.poll() is None: + process.terminate() + + # 
Check if 'lsetup' was completed + lsetup_time = int(lsetup_completed_at - lsetup_start_time) if lsetup_completed_at else 0 + + return exit_code, stdout, lsetup_time + + +def execute_remote_file_open_old(path: str, python_script_timeout: int) -> tuple[int, str, int]: # noqa: C901 + """ + Execute the remote file open script. + + :param path: path to container script (str) + :param python_script_timeout: timeout (int) + :return: exit code (int), stdout (str), lsetup time (int) (tuple). + """ + lsetup_timeout = 600 # Timeout for 'lsetup' step + exit_code = 1 + stdout = "" + + # Start the Bash script process with non-blocking I/O + try: + process = subprocess.Popen(["bash", path], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0) + fcntl.fcntl(process.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) # Set non-blocking + except OSError as e: + logger.warning(f"error starting subprocess: {e}") + return exit_code, "", 0 + + # Split the path at the last dot + filename, _ = path.rsplit(".", 1) + + # Create the new path with the desired suffix + new_path = f"{filename}.stdout" + + start_time = time.time() # Track execution start time + lsetup_start_time = start_time + lsetup_completed = False # Flag to track completion of 'lsetup' process + python_completed = False # Flag to track completion of 'python3' process + lsetup_completed_at = None + with open(new_path, "w", encoding='utf-8') as file: while True: # Check for timeout (once per second) diff --git a/pilot/util/constants.py b/pilot/util/constants.py index fc2e22d3..9987fbc8 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '27' # build number should be reset to '1' for every new development cycle +BUILD = '29' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 4c6a70f3a13895068b771978f3caefe74c04d6c1 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Fri, 22 Nov 2024 14:08:07 +0100 Subject: [PATCH 13/37] CPU consumption time reset to 1.0 in case of stat problem --- pilot/util/processes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pilot/util/processes.py b/pilot/util/processes.py index 495f0507..07dd2880 100644 --- a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -585,10 +585,10 @@ def get_instant_cpu_consumption_time(pid: int) -> float: cpu_consumption_time = utime + stime + cutime + cstime max_threshold = 1e6 if cpu_consumption_time > max_threshold: - logger.warning(f'CPU consumption time={cpu_consumption_time} exceeds sanity threshold={max_threshold}') + logger.warning(f'CPU consumption time={cpu_consumption_time} exceeds sanity threshold={max_threshold} (reset to 1.0)') logger.warning(f"utime={utime} stime={stime} cutime={cutime} cstime={cstime} hz={hz}") logger.warning(f"fp.read()={_read}") - cpu_consumption_time = 0.0 + cpu_consumption_time = 1.0 else: cpu_consumption_time = 0.0 From 20ac67c58cfb6b2845c1d6dbe8b5aa68ded0cbf0 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Fri, 22 Nov 2024 18:07:25 +0100 Subject: [PATCH 14/37] Added more debug info --- pilot/util/processes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pilot/util/processes.py b/pilot/util/processes.py index 07dd2880..01d9aab6 100644 --- 
a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -46,7 +46,7 @@ def find_processes_in_group(cpids: list, pid: int, ps_cache: str = ""): """ Find all processes that belong to the same group using the given ps command output. - Recursively search for the children processes belonging to pid and return their pid's. + Search for the children processes belonging to pid and return their pid's. pid is the parent pid and cpids is a list that has to be initialized before calling this function and it contains the pids of the children AND the parent. @@ -585,7 +585,7 @@ def get_instant_cpu_consumption_time(pid: int) -> float: cpu_consumption_time = utime + stime + cutime + cstime max_threshold = 1e6 if cpu_consumption_time > max_threshold: - logger.warning(f'CPU consumption time={cpu_consumption_time} exceeds sanity threshold={max_threshold} (reset to 1.0)') + logger.warning(f'CPU consumption time={cpu_consumption_time} for pid={pid} exceeds sanity threshold={max_threshold} (reset to 1.0)') logger.warning(f"utime={utime} stime={stime} cutime={cutime} cstime={cstime} hz={hz}") logger.warning(f"fp.read()={_read}") cpu_consumption_time = 1.0 From 5113a065f0aae1282c3a39c209ba85d885073d0a Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Fri, 22 Nov 2024 18:07:41 +0100 Subject: [PATCH 15/37] Bypassed pylint complaint --- pilot/api/data.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pilot/api/data.py b/pilot/api/data.py index dc0e8764..61600cc2 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -1072,7 +1072,7 @@ class StageOutClient(StagingClient): mode = "stage-out" - def prepare_destinations(self, files: list, activities: list or str, alt_exclude: list = []) -> list: + def prepare_destinations(self, files: list, activities: list or str, alt_exclude: list = None) -> list: """ Resolve destination RSE (filespec.ddmendpoint) for each entry from `files` according to requested `activities`. @@ -1083,6 +1083,9 @@ def prepare_destinations(self, files: list, activities: list or str, alt_exclude :param alt_exclude: global list of destinations that should be excluded / not used for alternative stage-out :return: updated fspec entries (list). 
""" + if alt_exclude is None: # to bypass pylint complaint if declared as [] above + alt_exclude = [] + if not self.infosys.queuedata: # infosys is not initialized: not able to fix destination if need, nothing to do return files From 6503d2e084154cb2b907893fda208b302758c33e Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 25 Nov 2024 11:28:57 +0100 Subject: [PATCH 16/37] Reverted cpu monitoring, removed threshold which is useless --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/monitoring.py | 4 +++- pilot/util/processes.py | 10 ++-------- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index c59294e0..13e05fd4 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.2.29 \ No newline at end of file +3.9.2.30 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 9987fbc8..6baeea60 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '29' # build number should be reset to '1' for every new development cycle +BUILD = '30' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 1f11ee14..56271db0 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -116,12 +116,14 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i else: _cpuconsumptiontime = int(round(cpuconsumptiontime)) if _cpuconsumptiontime > 0: + # make sure there are no sudden jumps in the cpuconsumptiontime + #factor = _cpuconsumptiontime / job.cpuconsumptiontime job.cpuconsumptiontime = int(round(cpuconsumptiontime)) job.cpuconversionfactor = 1.0 logger.info(f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})') elif _cpuconsumptiontime == -1: logger.warning('could not get CPU consumption time') - elif _cpuconsumptiontime == 0.0: + elif _cpuconsumptiontime == 0: logger.warning(f'process {job.pid} can no longer be monitored (due to stat problems) - aborting') return 0, "" else: diff --git a/pilot/util/processes.py b/pilot/util/processes.py index 01d9aab6..0d2fc252 100644 --- a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -552,6 +552,7 @@ def get_cpu_consumption_time(t0: tuple) -> float: def get_instant_cpu_consumption_time(pid: int) -> float: """ Return the CPU consumption time (system+user time) for a given process, by parsing /prod/pid/stat. + Note 1: the function returns 0.0 if the pid is not set. Note 2: the function must sum up all the user+system times for both the main process (pid) and the child processes, since the main process is most likely spawning new processes. 
@@ -574,8 +575,7 @@ def get_instant_cpu_consumption_time(pid: int) -> float: if os.path.exists(path): try: with open(path, "r", encoding="utf-8") as fp: - _read = fp.read() - fields = _read.split(' ')[13:17] + fields = fp.read().split(' ')[13:17] utime, stime, cutime, cstime = [(float(f) / hz) for f in fields] except IOError as exc: logger.warning(f'exception caught: {exc} (ignored)') @@ -583,12 +583,6 @@ def get_instant_cpu_consumption_time(pid: int) -> float: if utime and stime and cutime and cstime: # sum up all the user+system times for both the main process (pid) and the child processes cpu_consumption_time = utime + stime + cutime + cstime - max_threshold = 1e6 - if cpu_consumption_time > max_threshold: - logger.warning(f'CPU consumption time={cpu_consumption_time} for pid={pid} exceeds sanity threshold={max_threshold} (reset to 1.0)') - logger.warning(f"utime={utime} stime={stime} cutime={cutime} cstime={cstime} hz={hz}") - logger.warning(f"fp.read()={_read}") - cpu_consumption_time = 1.0 else: cpu_consumption_time = 0.0 From 8e571ff832d33f6d82e187187550c0d7f8744d39 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 25 Nov 2024 13:17:53 +0100 Subject: [PATCH 17/37] Added function for checking CPU load --- pilot/util/psutils.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index dbd4fbbd..837f2e94 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -341,3 +341,24 @@ def find_lingering_processes(parent_pid: int) -> list: lingering_processes.append(child.pid) return lingering_processes + + +def check_cpu_load(): + """ + Check if the system is under heavy CPU load. + + High CPU load is here defined to be above 80%. + + :return: True (system is under heavy CPU load), False (system load is normal). 
+ """ + if not _is_psutil_available: + logger.warning('psutil not available, cannot check CPU load (pretending it is normal)') + return False + + cpu_percent = psutil.cpu_percent(interval=1) + if cpu_percent > 80: + logger.info("system is under heavy CPU load") + return True + else: + logger.info("system load is normal") + return False From 8a5c87204fcee04ef2717276b9090ae976d24cf7 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 25 Nov 2024 13:54:55 +0100 Subject: [PATCH 18/37] Verifying the increase of CPU consumption --- pilot/util/monitoring.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 56271db0..9d1a3944 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -61,6 +61,7 @@ ) from pilot.util.psutils import ( is_process_running, + check_cpu_load, get_pid, get_subprocesses, find_actual_payload_pid @@ -116,9 +117,19 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i else: _cpuconsumptiontime = int(round(cpuconsumptiontime)) if _cpuconsumptiontime > 0: - # make sure there are no sudden jumps in the cpuconsumptiontime - #factor = _cpuconsumptiontime / job.cpuconsumptiontime - job.cpuconsumptiontime = int(round(cpuconsumptiontime)) + if job.cpuconsumptiontime == -1: # no time set yet so just proceed + job.cpuconsumptiontime = _cpuconsumptiontime + else: + # make sure there are no sudden jumps in the cpuconsumptiontime + increase_factor = _cpuconsumptiontime / job.cpuconsumptiontime if job.cpuconsumptiontime > 0 else 1 + high_cpu_load = check_cpu_load() + factor = 10 if high_cpu_load else 5 + if increase_factor > factor: + logger.warning(f'CPU consumption time increased by a factor of {increase_factor} (over the limit of {factor})') + logger.warning(f"will not consider the new value: {_cpuconsumptiontime}") + else: + logger.debug(f'CPU consumption time increased by a factor of {increase_factor} (below the limit of {factor})') + job.cpuconsumptiontime = _cpuconsumptiontime job.cpuconversionfactor = 1.0 logger.info(f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})') elif _cpuconsumptiontime == -1: From 3b915af96901ad22c14be22be684bf1937e0907d Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 25 Nov 2024 14:11:31 +0100 Subject: [PATCH 19/37] Refarctored job_monitoring_tasks() --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/monitoring.py | 80 ++++++++++++++++++++++++---------------- 3 files changed, 50 insertions(+), 34 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 13e05fd4..46be5578 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.2.30 \ No newline at end of file +3.9.2.31 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 6baeea60..4f302811 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '30' # build number should be reset to '1' for every new development cycle +BUILD = '31' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 9d1a3944..2ca15413 100644 --- 
a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -107,39 +107,10 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i # confirm that the worker node has a proper SC_CLK_TCK (problems seen on MPPMU) check_hz() - try: - cpuconsumptiontime = get_current_cpu_consumption_time(job.pid) - except Exception as error: - diagnostics = f"Exception caught: {error}" - logger.warning(diagnostics) - exit_code = get_exception_error_code(diagnostics) + # set the CPU consumption time for the job + exit_code, diagnostics = set_cpu_consumption_time(job) + if exit_code: return exit_code, diagnostics - else: - _cpuconsumptiontime = int(round(cpuconsumptiontime)) - if _cpuconsumptiontime > 0: - if job.cpuconsumptiontime == -1: # no time set yet so just proceed - job.cpuconsumptiontime = _cpuconsumptiontime - else: - # make sure there are no sudden jumps in the cpuconsumptiontime - increase_factor = _cpuconsumptiontime / job.cpuconsumptiontime if job.cpuconsumptiontime > 0 else 1 - high_cpu_load = check_cpu_load() - factor = 10 if high_cpu_load else 5 - if increase_factor > factor: - logger.warning(f'CPU consumption time increased by a factor of {increase_factor} (over the limit of {factor})') - logger.warning(f"will not consider the new value: {_cpuconsumptiontime}") - else: - logger.debug(f'CPU consumption time increased by a factor of {increase_factor} (below the limit of {factor})') - job.cpuconsumptiontime = _cpuconsumptiontime - job.cpuconversionfactor = 1.0 - logger.info(f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})') - elif _cpuconsumptiontime == -1: - logger.warning('could not get CPU consumption time') - elif _cpuconsumptiontime == 0: - logger.warning(f'process {job.pid} can no longer be monitored (due to stat problems) - aborting') - return 0, "" - else: - logger.warning(f'process {job.pid} is no longer using CPU - aborting') - return 0, "" # keep track of the subprocesses running (store payload subprocess PIDs) store_subprocess_pids(job) @@ -199,6 +170,51 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i return exit_code, diagnostics +def set_cpu_consumption_time(job: JobData) -> tuple[int, str]: + """ + Set the CPU consumption time for the job. + + :param job: job object (JobData) + :return: exit code (int), diagnostics (string). 
+ """ + try: + cpuconsumptiontime = get_current_cpu_consumption_time(job.pid) + except Exception as error: + diagnostics = f"Exception caught: {error}" + logger.warning(diagnostics) + exit_code = get_exception_error_code(diagnostics) + return exit_code, diagnostics + else: + _cpuconsumptiontime = int(round(cpuconsumptiontime)) + if _cpuconsumptiontime > 0: + if job.cpuconsumptiontime == -1: # no time set yet so just proceed + job.cpuconsumptiontime = _cpuconsumptiontime + else: + # make sure there are no sudden jumps in the cpuconsumptiontime + increase_factor = _cpuconsumptiontime / job.cpuconsumptiontime if job.cpuconsumptiontime > 0 else 1 + high_cpu_load = check_cpu_load() + factor = 10 if high_cpu_load else 5 + if increase_factor > factor: + logger.warning( + f'CPU consumption time increased by a factor of {increase_factor} (over the limit of {factor})') + logger.warning(f"will not consider the new value: {_cpuconsumptiontime}") + else: + logger.debug( + f'CPU consumption time increased by a factor of {increase_factor} (below the limit of {factor})') + job.cpuconsumptiontime = _cpuconsumptiontime + job.cpuconversionfactor = 1.0 + logger.info( + f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})') + elif _cpuconsumptiontime == -1: + logger.warning('could not get CPU consumption time') + elif _cpuconsumptiontime == 0: + logger.warning(f'process {job.pid} can no longer be monitored (due to stat problems) - aborting') + else: + logger.warning(f'process {job.pid} is no longer using CPU - aborting') + + return 0, "" + + def still_running(pid): # verify that the process is still alive From 2c9290b75d05ba85673fc4d11a072fc4f98953d0 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 25 Nov 2024 15:43:15 +0100 Subject: [PATCH 20/37] Protection against failures --- pilot/util/psutils.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index 837f2e94..b25a473f 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -355,7 +355,12 @@ def check_cpu_load(): logger.warning('psutil not available, cannot check CPU load (pretending it is normal)') return False - cpu_percent = psutil.cpu_percent(interval=1) + try: + cpu_percent = psutil.cpu_percent(interval=0.5) + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e: + logger.warning(f"Failed to read CPU percent: {e}") + logger.info("system is under heavy CPU load (assumed)") + return True if cpu_percent > 80: logger.info("system is under heavy CPU load") return True From a1930657199b931e996bcf924319d5fdecddd36b Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 26 Nov 2024 07:03:22 +0100 Subject: [PATCH 21/37] Cleanup and comments --- PILOTVERSION | 2 +- pilot/control/data.py | 2 ++ pilot/control/payloads/generic.py | 2 -- pilot/util/constants.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 46be5578..9f922ddf 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.2.31 \ No newline at end of file +3.9.2.32 \ No newline at end of file diff --git a/pilot/control/data.py b/pilot/control/data.py index a71e9e53..641cc31b 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -328,6 +328,8 @@ def _stage_in(args: object, job: JobData) -> bool: logger.info(" -- lfn=%s, status_code=%s, status=%s", infile.lfn, infile.status_code, status) # write time stamps to pilot timing file + + # MOVE THIS TO AFTER REMOTE FILE OPEN 
HAS BEEN VERIFIED (actually just before the payload starts) add_to_pilot_timing(job.jobid, PILOT_POST_STAGEIN, time.time(), args) remain_files = [infile for infile in job.indata if infile.status not in ['remote_io', 'transferred', 'no_transfer']] diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index df62bf6c..48aab21f 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -492,8 +492,6 @@ def pre_payload(self, job: JobData): """ # write time stamps to pilot timing file update_time = time.time() - logger.debug(f"setting pre-payload time to {update_time} s") - logger.debug(f"gmtime is {time.gmtime(update_time)}") add_to_pilot_timing(job.jobid, PILOT_PRE_PAYLOAD, update_time, self.__args) def post_payload(self, job: JobData): diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 4f302811..c9f9f022 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '31' # build number should be reset to '1' for every new development cycle +BUILD = '32' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 0864ecb153c87c29a7c1d19904b0c874af9f0c99 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Tue, 26 Nov 2024 09:41:49 +0100 Subject: [PATCH 22/37] Lingering processes will now be killed --- pilot/control/payloads/generic.py | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index 48aab21f..e33b893c 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -58,7 +58,10 @@ write_file, read_file ) -from pilot.util.processes import kill_processes +from pilot.util.processes import ( + kill_process, + kill_processes, +) from pilot.util.psutils import find_lingering_processes from pilot.util.timing import ( add_to_pilot_timing, @@ -954,17 +957,6 @@ def run(self) -> tuple[int, str]: # noqa: C901 f"\n\nfinished pid={proc.pid} exit_code={exit_code} state={self.__job.state}\n" ) - # make sure there are no lingering processes - items = find_lingering_processes(os.getpid()) - if items: - logger.warning("found lingering processes") - list_items(items) - #logger.warning(f"found lingering processes: {items}") - #for item in items: - # kill_processes(item) - else: - logger.info("(1/2) found no lingering processes") - # stop the utility command (e.g. 
a coprocess if necessary if proc_co: logger.debug(f"stopping utility command: {utility_cmd}") @@ -998,13 +990,12 @@ def run(self) -> tuple[int, str]: # noqa: C901 # make sure there are no lingering processes items = find_lingering_processes(os.getpid()) if items: - logger.warning("found lingering processes") + logger.warning("found lingering processes - will now be removed") list_items(items) - #logger.warning(f"found lingering processes: {items}") - #for item in items: - # kill_processes(item) + for item in items: + kill_process(item, hardkillonly=True) else: - logger.info("(2/2) found no lingering processes") + logger.info("found no lingering processes") if self.__job.is_hpo and state != "failed": # in case there are more hyper-parameter points, move away the previous log files From 5b8790521eafa4460ccba073565f033f195710e8 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Tue, 26 Nov 2024 09:48:26 +0100 Subject: [PATCH 23/37] Added exception handling --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/psutils.py | 14 ++++++++++---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 9f922ddf..a1c45371 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.2.32 \ No newline at end of file +3.9.2.33 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index c9f9f022..e1af55a8 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '32' # build number should be reset to '1' for every new development cycle +BUILD = '33' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index b25a473f..90eb6394 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -335,10 +335,16 @@ def find_lingering_processes(parent_pid: int) -> list: return [] lingering_processes = [] - parent_process = psutil.Process(parent_pid) - for child in parent_process.children(recursive=True): - if child.status() != psutil.STATUS_ZOMBIE: - lingering_processes.append(child.pid) + try: + parent_process = psutil.Process(parent_pid) + for child in parent_process.children(recursive=True): + try: + if child.status() != psutil.STATUS_ZOMBIE: + lingering_processes.append(child.pid) + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e: + logger.warning(f"[harmless] failed to get status for child process {child.pid}: {e}") + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess, psutil.FileNotFoundError) as e: + logger.warning(f"[harmless] failed to get parent process {parent_pid}: {e}") return lingering_processes From a0f114be80158fadb1372eae69132e7d54b027b9 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Tue, 26 Nov 2024 09:57:36 +0100 Subject: [PATCH 24/37] Improved log messages --- pilot/util/processes.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pilot/util/processes.py b/pilot/util/processes.py index 0d2fc252..95d4eb8e 100644 --- a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -578,7 +578,9 @@ def get_instant_cpu_consumption_time(pid: int) -> float: fields = fp.read().split(' ')[13:17] utime, stime, cutime, cstime = [(float(f) / 
hz) for f in fields] except IOError as exc: - logger.warning(f'exception caught: {exc} (ignored)') + logger.warning(f'exception caught: {exc} (ignoring process {pid})') + else: + logger.debug(f"{path} no longer exist (ignoring terminated process {pid})") if utime and stime and cutime and cstime: # sum up all the user+system times for both the main process (pid) and the child processes From 9141f27cb56261f774dc08acbc251e5c851fda87 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Tue, 26 Nov 2024 10:15:59 +0100 Subject: [PATCH 25/37] Added verification that /proc/self/statm can be accessed --- pilot/util/monitoring.py | 17 +++++++++++------ pilot/util/processes.py | 15 +++++++++++++++ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 2ca15413..42dde4d8 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -37,9 +37,9 @@ from pilot.util.container import execute from pilot.util.filehandling import ( get_disk_usage, - remove_files, get_local_file_size, read_file, + remove_files, zip_files, #write_file ) @@ -54,22 +54,23 @@ get_maximum_input_sizes ) from pilot.util.processes import ( + check_proc_access, get_current_cpu_consumption_time, - kill_processes, get_number_of_child_processes, + kill_processes, reap_zombies ) from pilot.util.psutils import ( is_process_running, check_cpu_load, + find_actual_payload_pid, get_pid, get_subprocesses, - find_actual_payload_pid ) from pilot.util.timing import get_time_since from pilot.util.workernode import ( + check_hz, get_local_disk_space, - check_hz ) from pilot.info import infosys, JobData @@ -201,10 +202,14 @@ def set_cpu_consumption_time(job: JobData) -> tuple[int, str]: else: logger.debug( f'CPU consumption time increased by a factor of {increase_factor} (below the limit of {factor})') - job.cpuconsumptiontime = _cpuconsumptiontime + + # make sure that /proc/self/statm still exists, otherwise the job is no longer using CPU, ie discard the info + if check_proc_access(): + logger.debug("/proc/self/statm exists - will update the CPU consumption time") + job.cpuconsumptiontime = _cpuconsumptiontime job.cpuconversionfactor = 1.0 logger.info( - f'(instant) CPU consumption time for pid={job.pid}: {cpuconsumptiontime} (rounded to {job.cpuconsumptiontime})') + f'(instant) CPU consumption time for pid={job.pid}: {job.cpuconsumptiontime})') elif _cpuconsumptiontime == -1: logger.warning('could not get CPU consumption time') elif _cpuconsumptiontime == 0: diff --git a/pilot/util/processes.py b/pilot/util/processes.py index 95d4eb8e..d6167e63 100644 --- a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -946,3 +946,18 @@ def waitpid(pid: int = -1): pass logger.info(f'reaping zombies for max {max_timeout} seconds') waitpid(pid) + + +def check_proc_access() -> bool: + """ + Verify that /proc/self/statm can be accessed. + + :return: True if /proc/self/statm can be accessed, False otherwise (bool). 
+ """ + try: + with open('/proc/self/statm', 'r') as f: + _ = f.read() + return True + except (FileNotFoundError, PermissionError) as e: + logger.warning(f"error accessing /proc/self/statm: {e} (CPU consumption time will be discarded)") + return False From ed52d3f76166e7ded5747749c4f0e7d3d3019035 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Tue, 26 Nov 2024 10:37:35 +0100 Subject: [PATCH 26/37] Aborting OOM update if the process has terminated --- pilot/util/monitoring.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 42dde4d8..38db1602 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -253,6 +253,11 @@ def update_oom_info(bash_pid, payload_cmd): return fname = f"/proc/{payload_pid}/oom_score" + # abort if the file does not exist + if not os.path.exists(fname): + logger.warning(f'oom_score file does not exist: {fname} (abort)') + return + fname_adj = fname + "_adj" payload_score = get_score(payload_pid) if payload_pid else 'UNKNOWN' pilot_score = get_score(os.getpid()) From d302aadc40597e2e8325af7b0ea89649ac8f7e56 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Tue, 26 Nov 2024 10:40:47 +0100 Subject: [PATCH 27/37] Updated log message --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/monitoring.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index a1c45371..397bca28 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.2.33 \ No newline at end of file +3.9.2.34 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index e1af55a8..6959dafa 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '33' # build number should be reset to '1' for every new development cycle +BUILD = '34' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 38db1602..8471904b 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -200,8 +200,8 @@ def set_cpu_consumption_time(job: JobData) -> tuple[int, str]: f'CPU consumption time increased by a factor of {increase_factor} (over the limit of {factor})') logger.warning(f"will not consider the new value: {_cpuconsumptiontime}") else: - logger.debug( - f'CPU consumption time increased by a factor of {increase_factor} (below the limit of {factor})') + logger.info( + f'CPU consumption time changed by a factor of {increase_factor} (below the limit of {factor})') # make sure that /proc/self/statm still exists, otherwise the job is no longer using CPU, ie discard the info if check_proc_access(): From 2167118f6a797fb662e60fc3c47f9a9588aa5653 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 26 Nov 2024 12:06:45 +0100 Subject: [PATCH 28/37] Cleanup --- pilot/control/payloads/generic.py | 2 -- pilot/util/constants.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index e33b893c..9e6e4ae0 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -115,8 +115,6 @@ def pre_setup(self, job: JobData): """ # write 
time stamps to pilot timing file update_time = time.time() - logger.debug(f"setting pre-setup time to {update_time} s") - logger.debug(f"gmtime is {time.gmtime(update_time)}") add_to_pilot_timing(job.jobid, PILOT_PRE_SETUP, update_time, self.__args) def post_setup(self, job: JobData, update_time: bool = None): diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 6959dafa..58835e2c 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '34' # build number should be reset to '1' for every new development cycle +BUILD = '35' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From b8393859479cf6f41497ac55c072ca7fba0ad3b6 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 26 Nov 2024 12:30:57 +0100 Subject: [PATCH 29/37] Added pilot args object to get_payload_command() --- pilot/user/atlas/common.py | 13 +++++++++++-- pilot/user/generic/common.py | 6 +++++- pilot/user/rubin/common.py | 6 +++++- pilot/user/sphenix/common.py | 6 +++++- pilot/util/constants.py | 2 ++ 5 files changed, 28 insertions(+), 5 deletions(-) diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index 4f114911..853b90ab 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -26,6 +26,7 @@ import logging import os import re +import time from collections import defaultdict from functools import reduce @@ -487,14 +488,17 @@ def get_nthreads(catchall: str) -> int: return _nthreads if _nthreads else 1 -def get_payload_command(job: JobData) -> str: +def get_payload_command(job: JobData, args: object = None) -> str: """ Return the full command for executing the payload, including the sourcing of all setup files and setting of environment variables. :param job: job object (JobData) - :return: command (str). + :param args: pilot arguments (object) + :return: command (str) :raises TrfDownloadFailure: in case of download failure. """ + if not args: # bypass pylint complaint + pass # Should the pilot do the setup or does jobPars already contain the information? 
preparesetup = should_pilot_prepare_setup(job.noexecstrcnv, job.jobparams) @@ -515,6 +519,7 @@ def get_payload_command(job: JobData) -> str: exitcode = 0 diagnostics = "" + t0 = time.time() try: exitcode, diagnostics, not_opened_turls, lsetup_time = open_remote_files(job.indata, job.workdir, get_nthreads(catchall)) except Exception as exc: @@ -532,6 +537,10 @@ def get_payload_command(job: JobData) -> str: else: process_remote_file_traces(path, job, not_opened_turls) # ignore PyCharm warning, path is str + t1 = time.time() + dt = int(t1 - t0) + logger.info(f'remote file verification finished in {dt} s') + # fail the job if the remote files could not be verified if exitcode != 0: job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exitcode, msg=diagnostics) diff --git a/pilot/user/generic/common.py b/pilot/user/generic/common.py index 3e2f312e..4c819e7e 100644 --- a/pilot/user/generic/common.py +++ b/pilot/user/generic/common.py @@ -27,6 +27,7 @@ from signal import SIGTERM from pilot.common.exception import TrfDownloadFailure +from pilot.info.jobdata import JobData from pilot.util.config import config from pilot.util.constants import ( UTILITY_BEFORE_PAYLOAD, @@ -64,7 +65,7 @@ def validate(job: object) -> bool: return True -def get_payload_command(job: object) -> str: +def get_payload_command(job: JobData, args: object = None) -> str: """ Return the full command for executing the payload. @@ -73,12 +74,15 @@ def get_payload_command(job: object) -> str: By default, the full payload command is assumed to be in the job.jobparams. :param job: job object (object) + :param args: pilot arguments (object) :return: command (str). """ # Try to download the trf # if job.imagename != "" or "--containerImage" in job.jobparams: # job.transformation = os.path.join(os.path.dirname(job.transformation), "runcontainer") # logger.warning('overwrote job.transformation, now set to: %s' % job.transformation) + if not args: # bypass pylint complaint + pass ec, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir) if ec != 0: raise TrfDownloadFailure(diagnostics) diff --git a/pilot/user/rubin/common.py b/pilot/user/rubin/common.py index d83bed7d..0a853656 100644 --- a/pilot/user/rubin/common.py +++ b/pilot/user/rubin/common.py @@ -28,6 +28,7 @@ from typing import Any from pilot.common.exception import TrfDownloadFailure +from pilot.info.jobdata import JobData from pilot.util.config import config from pilot.util.constants import UTILITY_BEFORE_PAYLOAD, UTILITY_AFTER_PAYLOAD_STARTED from pilot.util.filehandling import read_file @@ -60,7 +61,7 @@ def validate(job: Any) -> bool: return True -def get_payload_command(job: object): +def get_payload_command(job: JobData, args: object = None): """ Return the full command for executing the payload. @@ -68,12 +69,15 @@ def get_payload_command(job: object): By default, the full payload command is assumed to be in the job.jobparams. :param job: job object (object) + :param args: pilot arguments (object) :return: command (str). 
""" # Try to download the trf # if job.imagename != "" or "--containerImage" in job.jobparams: # job.transformation = os.path.join(os.path.dirname(job.transformation), "runcontainer") # logger.warning('overwrote job.transformation, now set to: %s' % job.transformation) + if not args: # bypass pylint complaint + pass ec, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir) if ec != 0: raise TrfDownloadFailure(diagnostics) diff --git a/pilot/user/sphenix/common.py b/pilot/user/sphenix/common.py index b4aa30ee..fff836bc 100644 --- a/pilot/user/sphenix/common.py +++ b/pilot/user/sphenix/common.py @@ -30,6 +30,7 @@ FileHandlingFailure ) from pilot.info import FileSpec +from pilot.info.jobdata import JobData from pilot.util.config import config from pilot.util.constants import ( UTILITY_AFTER_PAYLOAD_FINISHED, @@ -76,7 +77,7 @@ def validate(job: object) -> bool: return True -def get_payload_command(job: object) -> str: +def get_payload_command(job: JobData, args: object = None) -> str: """ Return the full command for executing the payload. @@ -84,12 +85,15 @@ def get_payload_command(job: object) -> str: By default, the full payload command is assumed to be in the job.jobparams. :param job: job object (object) + :param args: pilot arguments (object) :return: command (str). """ # Try to download the trf # if job.imagename != "" or "--containerImage" in job.jobparams: # job.transformation = os.path.join(os.path.dirname(job.transformation), "runcontainer") # logger.warning('overwrote job.transformation, now set to: %s' % job.transformation) + if not args: # to bypass pylint complaint + pass ec, diagnostics, trf_name = get_analysis_trf(job.transformation, job.workdir) if ec != 0: raise TrfDownloadFailure(diagnostics) diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 58835e2c..9a1ed683 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -65,6 +65,8 @@ PILOT_POST_FINAL_UPDATE = 'PILOT_POST_FINAL_UPDATE' PILOT_END_TIME = 'PILOT_END_TIME' PILOT_KILL_SIGNAL = 'PILOT_KILL_SIGNAL' +PILOT_PRE_REMOTEIO = 'PILOT_PRE_REMOTEIO' +PILOT_POST_REMOTEIO = 'PILOT_POST_REMOTEIO' # Keep track of log transfers LOG_TRANSFER_NOT_DONE = 'NOT_DONE' From 8ebb97b0dff5c3dde01dde89a98ef082807655d3 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 26 Nov 2024 12:34:05 +0100 Subject: [PATCH 30/37] Added pilot args object to get_payload_command(). Some cleanup --- pilot/control/payloads/generic.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index 9e6e4ae0..a320dd1c 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -505,8 +505,6 @@ def post_payload(self, job: JobData): """ # write time stamps to pilot timing file update_time = time.time() - logger.debug(f"setting post-payload time to {update_time} s") - logger.debug(f"gmtime is {time.gmtime(update_time)}") add_to_pilot_timing(job.jobid, PILOT_POST_PAYLOAD, update_time, self.__args) def run_command(self, cmd: str, label: str = "") -> Any: @@ -694,11 +692,10 @@ def wait_graceful(self, args: object, proc: Any) -> int: return exit_code - def get_payload_command(self, job: JobData) -> str: + def get_payload_command(self) -> str: """ Return the payload command string. - :param job: job object (JobData) :return: command (str). 
""" cmd = "" @@ -708,14 +705,14 @@ def get_payload_command(self, job: JobData) -> str: user = __import__( f"pilot.user.{pilot_user}.common", globals(), locals(), [pilot_user], 0 ) - cmd = user.get_payload_command(job) # + 'sleep 900' # to test looping jobs + cmd = user.get_payload_command(self.__job, args=self.__args) # + 'sleep 900' # to test looping jobs except PilotException as error: - self.post_setup(job) + self.post_setup(self.__job) logger.error(traceback.format_exc()) - job.piloterrorcodes, job.piloterrordiags = errors.add_error_code( + self.__job.piloterrorcodes, self.__job.piloterrordiags = errors.add_error_code( error.get_error_code() ) - self.__traces.pilot["error_code"] = job.piloterrorcodes[0] + self.__traces.pilot["error_code"] = self.__job.piloterrorcodes[0] logger.fatal( f"could not define payload command (traces error set to: {self.__traces.pilot['error_code']})" ) @@ -802,7 +799,7 @@ def run(self) -> tuple[int, str]: # noqa: C901 self.pre_setup(self.__job) # get the user defined payload command - cmd = self.get_payload_command(self.__job) + cmd = self.get_payload_command() if not cmd: logger.warning("aborting run() since payload command could not be defined") return errors.UNKNOWNPAYLOADFAILURE, "undefined payload command" From 63c70aefe9911be68b7edb07448a0c1ecc431740 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 26 Nov 2024 12:38:15 +0100 Subject: [PATCH 31/37] Recording time to verify remote files --- pilot/user/atlas/common.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index 853b90ab..1533f369 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -57,7 +57,9 @@ UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_STARTED2, UTILITY_BEFORE_STAGEIN, - UTILITY_AFTER_PAYLOAD_FINISHED2 + UTILITY_AFTER_PAYLOAD_FINISHED2, + PILOT_PRE_REMOTEIO, + PILOT_POST_REMOTEIO ) from pilot.util.container import execute from pilot.util.filehandling import ( @@ -82,6 +84,7 @@ get_trimmed_dictionary, is_child ) +from pilot.util.timing import add_to_pilot_timing from pilot.util.tracereport import TraceReport from .container import ( create_root_container_command, @@ -519,7 +522,8 @@ def get_payload_command(job: JobData, args: object = None) -> str: exitcode = 0 diagnostics = "" - t0 = time.time() + t0 = int(time.time()) + add_to_pilot_timing(job.jobid, PILOT_PRE_REMOTEIO, t0, args) try: exitcode, diagnostics, not_opened_turls, lsetup_time = open_remote_files(job.indata, job.workdir, get_nthreads(catchall)) except Exception as exc: @@ -537,8 +541,9 @@ def get_payload_command(job: JobData, args: object = None) -> str: else: process_remote_file_traces(path, job, not_opened_turls) # ignore PyCharm warning, path is str - t1 = time.time() - dt = int(t1 - t0) + t1 = int(time.time()) + add_to_pilot_timing(job.jobid, PILOT_POST_REMOTEIO, t1, args) + dt = t1 - t0 logger.info(f'remote file verification finished in {dt} s') # fail the job if the remote files could not be verified From dfaaa074398ce94c45633aa57f60e3619aa82b91 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 26 Nov 2024 14:42:15 +0100 Subject: [PATCH 32/37] Recording time to verify remote files --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/timing.py | 26 ++++++++++++++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 397bca28..ea783d7a 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.2.34 \ No newline at end of file 
+3.9.2.37
\ No newline at end of file
diff --git a/pilot/util/constants.py b/pilot/util/constants.py
index 9a1ed683..a803cd25 100644
--- a/pilot/util/constants.py
+++ b/pilot/util/constants.py
@@ -28,7 +28,7 @@
 RELEASE = '3'  # released number should be fixed at 3 for Pilot 3
 VERSION = '9'  # version number is '1' for first release, '0' until then, increased for bigger updates
 REVISION = '2'  # revision number should be reset to '0' for every new version release, increased for small updates
-BUILD = '35'  # build number should be reset to '1' for every new development cycle
+BUILD = '37'  # build number should be reset to '1' for every new development cycle

 SUCCESS = 0
 FAILURE = 1
diff --git a/pilot/util/timing.py b/pilot/util/timing.py
index ee823058..c87afd6d 100644
--- a/pilot/util/timing.py
+++ b/pilot/util/timing.py
@@ -52,6 +52,8 @@
     PILOT_PRE_STAGEOUT,
     PILOT_PRE_FINAL_UPDATE,
     PILOT_START_TIME,
+    PILOT_PRE_REMOTEIO,
+    PILOT_POST_REMOTEIO
 )
 from pilot.util.filehandling import (
     read_json,
@@ -234,6 +236,19 @@ def get_total_pilot_time(job_id: str, args: object) -> int:
     return get_time_difference(job_id, PILOT_START_TIME, PILOT_END_TIME, args)


+def get_total_remoteio_time(job_id: str, args: object) -> int:
+    """
+    Return the total time to verify remote i/o files for the given job_id.
+
+    High level function that returns the total remote i/o verification time for the given job_id.
+
+    :param job_id: PanDA job id (str)
+    :param args: pilot arguments (object)
+    :return: time in seconds (int).
+    """
+    return get_time_difference(job_id, PILOT_PRE_REMOTEIO, PILOT_POST_REMOTEIO, args)
+
+
 def get_postgetjob_time(job_id: str, args: object) -> int or None:
     """
     Return the post getjob time.
@@ -394,6 +409,17 @@ def timing_report(job_id: str, args: object) -> tuple[int, int, int, int, int, i
     time_payload = get_payload_execution_time(job_id, args)
     time_stageout = get_stageout_time(job_id, args)
     time_log_creation = get_log_creation_time(job_id, args)
+    time_remoteio = get_total_remoteio_time(job_id, args)
+
+    # correct the setup and stagein times if remote i/o verification was done
+    if time_remoteio > 0:
+        logger.info("correcting setup and stagein times since remote i/o verification was done")
+        logger.debug(f"original setup time: {time_setup} s")
+        logger.debug(f"original stagein time: {time_stagein} s")
+        time_setup -= time_remoteio
+        time_stagein += time_remoteio
+        logger.debug(f"corrected setup time: {time_setup} s")
+        logger.debug(f"corrected stagein time: {time_stagein} s")

     logger.info('.' * 30)
     logger.info('. Timing measurements:')

From e35fcaa5294f44d5501d535e55b327f918e0a375 Mon Sep 17 00:00:00 2001
From: Paul Nilsson
Date: Mon, 2 Dec 2024 10:50:01 +0100
Subject: [PATCH 33/37] Improved exception handling for socket related errors

---
 pilot/util/tracereport.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pilot/util/tracereport.py b/pilot/util/tracereport.py
index d6b25ca5..b3607be4 100644
--- a/pilot/util/tracereport.py
+++ b/pilot/util/tracereport.py
@@ -114,7 +114,7 @@ def init(self, job):

         try:
             hostname = os.environ.get('PANDA_HOSTNAME', socket.gethostname())
-        except socket.herror as exc:
+        except (socket.gaierror, socket.herror) as exc:
             logger.warning(f'unable to detect hostname for trace report: {exc}')
             hostname = os.environ.get('PANDA_HOSTNAME', 'unknown')

@@ -126,7 +126,7 @@ def init(self, job):

         try:
             self['ip'] = socket.gethostbyname(hostname)
-        except socket.herror as exc:
+        except (socket.gaierror, socket.herror) as exc:
             logger.debug(f"unable to detect host IP for trace report: {exc}")
             self['ip'] = '0.0.0.0'

From 94681339cbb592983e5b9f55fe0f9dc0e4b484db Mon Sep 17 00:00:00 2001
From: Paul Nilsson
Date: Mon, 2 Dec 2024 11:24:09 +0100
Subject: [PATCH 34/37] Improved extraction of dmesg memory error

---
 pilot/control/payload.py | 25 ++++++++++++++++++++++++-
 pilot/util/constants.py  |  2 +-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/pilot/control/payload.py b/pilot/control/payload.py
index 7d091073..9607e218 100644
--- a/pilot/control/payload.py
+++ b/pilot/control/payload.py
@@ -744,7 +744,30 @@ def scan_for_memory_errors(subprocesses: list) -> str:
             if search_str in line:
                 diagnostics = line[line.find(search_str):]
                 logger.warning(f'found memory error: {diagnostics}')
-                break
+
+                # make sure that this message is for a true subprocess of the pilot
+                # extract the pid from the message and compare it to the subprocesses list
+                match = search(r'Killed process (\d+)', diagnostics)
+                if match:
+                    try:
+                        found_pid = int(match.group(1))
+                        logger.info(f"extracted PID: {found_pid}")

+                        # is it a known subprocess?
+                        if found_pid in subprocesses:
+                            logger.info("PID found in the list of subprocesses")
+                            break
+                        else:
+                            logger.warning("the extracted PID is not a known subprocess of the payload")
+                            diagnostics = ""
+                            # is the extracted PID a subprocess of the main pilot process itself?
+
+                    except (ValueError, TypeError, AttributeError) as e:
+                        logger.warning(f"failed to extract PID from the message: {e}")
+                        diagnostics = ""
+                else:
+                    logger.warning("PID could not be extracted from the message")
+                    diagnostics = ""

         if diagnostics:
             break
diff --git a/pilot/util/constants.py b/pilot/util/constants.py
index a803cd25..e7454438 100644
--- a/pilot/util/constants.py
+++ b/pilot/util/constants.py
@@ -28,7 +28,7 @@
 RELEASE = '3'  # released number should be fixed at 3 for Pilot 3
 VERSION = '9'  # version number is '1' for first release, '0' until then, increased for bigger updates
 REVISION = '2'  # revision number should be reset to '0' for every new version release, increased for small updates
-BUILD = '37'  # build number should be reset to '1' for every new development cycle
+BUILD = '38'  # build number should be reset to '1' for every new development cycle

 SUCCESS = 0
 FAILURE = 1

From e8d5b8a32f8d634f6ab2860ab55c96531c8bdfec Mon Sep 17 00:00:00 2001
From: Paul Nilsson
Date: Mon, 2 Dec 2024 17:54:09 +0100
Subject: [PATCH 35/37] Improved error message displayed on the monitor for remote i/o errors

---
 PILOTVERSION               | 2 +-
 pilot/user/atlas/common.py | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/PILOTVERSION b/PILOTVERSION
index ea783d7a..4fb479ab 100644
--- a/PILOTVERSION
+++ b/PILOTVERSION
@@ -1 +1 @@
-3.9.2.37
\ No newline at end of file
+3.9.2.38
\ No newline at end of file
diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py
index 1533f369..660e206b 100644
--- a/pilot/user/atlas/common.py
+++ b/pilot/user/atlas/common.py
@@ -548,6 +548,8 @@ def get_payload_command(job: JobData, args: object = None) -> str:

     # fail the job if the remote files could not be verified
     if exitcode != 0:
+        # improve the error diagnostics
+        diagnostics = errors.format_diagnostics(exitcode, diagnostics)
         job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exitcode, msg=diagnostics)
         raise PilotException(diagnostics, code=exitcode)
     else:

From 61670c5b4a8a7b8316b4da1dc49fdc8f2782fdcd Mon Sep 17 00:00:00 2001
From: PalNilsson
Date: Wed, 4 Dec 2024 09:44:46 +0100
Subject: [PATCH 36/37] Delayed first CPU consumption time

---
 PILOTVERSION             |  2 +-
 pilot/info/jobdata.py    |  1 +
 pilot/util/constants.py  |  2 +-
 pilot/util/monitoring.py | 15 +++++++++++----
 4 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/PILOTVERSION b/PILOTVERSION
index 4fb479ab..8c5d7627 100644
--- a/PILOTVERSION
+++ b/PILOTVERSION
@@ -1 +1 @@
-3.9.2.38
\ No newline at end of file
+3.9.2.39
\ No newline at end of file
diff --git a/pilot/info/jobdata.py b/pilot/info/jobdata.py
index b769f831..c9e41ad6 100644
--- a/pilot/info/jobdata.py
+++ b/pilot/info/jobdata.py
@@ -133,6 +133,7 @@ class JobData(BaseData):
     prodproxy = ""  # to keep track of production proxy on unified queues
     completed = False  # True when job has finished or failed, used by https::send_update()
     lsetuptime = 0  # payload setup time (lsetup)
+    runningstart = None  # time when the payload started running (only for internal monitoring purposes, not the actual start time)

     # time variable used for on-the-fly cpu consumption time measurements done by job monitoring
     t0 = None  # payload startup time
diff --git a/pilot/util/constants.py b/pilot/util/constants.py
index e7454438..4cc40074 100644
--- a/pilot/util/constants.py
+++ b/pilot/util/constants.py
@@ -28,7 +28,7 @@
 RELEASE = '3'  # released number should be fixed at 3 for Pilot 3
 VERSION = '9'  # version number is '1' for first release, '0' until then, increased for bigger updates
 REVISION = '2'  # revision number should be reset to '0' for every new version release, increased for small updates
-BUILD = '38'  # build number should be reset to '1' for every new development cycle
+BUILD = '39'  # build number should be reset to '1' for every new development cycle

 SUCCESS = 0
 FAILURE = 1
diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py
index 8471904b..87a131ee 100644
--- a/pilot/util/monitoring.py
+++ b/pilot/util/monitoring.py
@@ -100,7 +100,11 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i
     # update timing info for running jobs (to avoid an update after the job has finished)
     if job.state == 'running':
+        # keep track of the time since the job started running (approximate since it is set here, move later)
+        if not job.runningstart:
+            job.runningstart = current_time
+
+        # check the disk space
         # make sure that any utility commands are still running (and determine pid of memory monitor- as early as possible)
         if job.utilities != {}:
             utility_monitor(job)
@@ -108,10 +112,13 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i
         # confirm that the worker node has a proper SC_CLK_TCK (problems seen on MPPMU)
         check_hz()

-        # set the CPU consumption time for the job
-        exit_code, diagnostics = set_cpu_consumption_time(job)
-        if exit_code:
-            return exit_code, diagnostics
+        # set the CPU consumption time for the job (if it has been running for > 10s)
+        if job.runningstart and (current_time - job.runningstart) > 10:
+            exit_code, diagnostics = set_cpu_consumption_time(job)
+            if exit_code:
+                return exit_code, diagnostics
+        else:
+            logger.debug('skipping CPU consumption time check since job has not been running for long enough')

         # keep track of the subprocesses running (store payload subprocess PIDs)
         store_subprocess_pids(job)

From 2d4e5b4647197067ec03f28601559b4fcc21c637 Mon Sep 17 00:00:00 2001
From: Paul Nilsson
Date: Thu, 5 Dec 2024 09:42:18 +0100
Subject: [PATCH 37/37] Corrected core reporting

---
 PILOTVERSION             | 2 +-
 pilot/util/constants.py  | 2 +-
 pilot/util/workernode.py | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/PILOTVERSION b/PILOTVERSION
index 8c5d7627..aeb659e0 100644
--- a/PILOTVERSION
+++ b/PILOTVERSION
@@ -1 +1 @@
-3.9.2.39
\ No newline at end of file
+3.9.2.41
\ No newline at end of file
diff --git a/pilot/util/constants.py b/pilot/util/constants.py
index 4cc40074..2b5785b8 100644
--- a/pilot/util/constants.py
+++ b/pilot/util/constants.py
@@ -28,7 +28,7 @@
 RELEASE = '3'  # released number should be fixed at 3 for Pilot 3
 VERSION = '9'  # version number is '1' for first release, '0' until then, increased for bigger updates
 REVISION = '2'  # revision number should be reset to '0' for every new version release, increased for small updates
-BUILD = '39'  # build number should be reset to '1' for every new development cycle
+BUILD = '41'  # build number should be reset to '1' for every new development cycle

 SUCCESS = 0
 FAILURE = 1
diff --git a/pilot/util/workernode.py b/pilot/util/workernode.py
index aeeba599..df7e7227 100644
--- a/pilot/util/workernode.py
+++ b/pilot/util/workernode.py
@@ -381,13 +381,13 @@ def get_cpu_cores(modelstring):
     logger.info(f'found {number_of_cores} cores ({cores_per_socket} cores per socket, {sockets} sockets)')
     logger.debug(f'current model string: {modelstring}')

-    if number_of_cores > 0 and '-Core' not in modelstring:
+    if number_of_cores > 0:
         if '-Core Processor' in modelstring:  # NN-Core info already in string - update it
             pattern = r'(\d+)\-Core Processor'
             _nn = re.findall(pattern, modelstring)
             if _nn:
                 modelstring = modelstring.replace(f'{_nn[0]}-Core', f'{number_of_cores}-Core')
-        if 'Core Processor' in modelstring:
+        elif 'Core Processor' in modelstring:
             modelstring = modelstring.replace('Core', '%d-Core' % number_of_cores)
         elif 'Processor' in modelstring:
             modelstring = modelstring.replace('Processor', '%d-Core Processor' % number_of_cores)