diff --git a/PILOTVERSION b/PILOTVERSION index 1f3166a1..8d2739d3 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.3.2 \ No newline at end of file +3.9.4.15 \ No newline at end of file diff --git a/pilot/api/data.py b/pilot/api/data.py index 61600cc2..7f734a66 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -1057,7 +1057,7 @@ def check_availablespace(self, files: list): else: if disk_space: available_space = convert_mb_to_b(disk_space) - self.logger.info("locally available space: {available_space} B") + self.logger.info(f"locally available space: {available_space} B") # are we within the limit? if totalsize > available_space: diff --git a/pilot/control/monitor.py b/pilot/control/monitor.py index 89ff1f5f..399502fc 100644 --- a/pilot/control/monitor.py +++ b/pilot/control/monitor.py @@ -31,7 +31,7 @@ import re from collections import namedtuple -from os import environ, getuid +from os import environ, getuid, getpid from subprocess import ( Popen, PIPE @@ -53,6 +53,7 @@ get_local_oidc_token_info, update_local_oidc_token_info ) +from pilot.util.psutils import get_process_info from pilot.util.queuehandling import ( abort_jobs_in_queues, get_maxwalltime_from_job, @@ -84,8 +85,8 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901 last_token_check = t_0 # for CPU usage debugging - # cpuchecktime = int(config.Pilot.cpu_check) - # tcpu = t_0 + cpuchecktime = int(config.Pilot.cpu_check) + tcpu = t_0 last_minute_check = t_0 queuedata = get_queuedata_from_job(queues) @@ -158,15 +159,15 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901 time.sleep(1) # time to check the CPU usage? - # if is_pilot_check(check='cpu_usage'): - # if int(time.time() - tcpu) > cpuchecktime: - # processes = get_process_info('python3 pilot3/pilot.py', pid=getpid()) - # if processes: - # logger.info(f'PID={getpid()} has CPU usage={processes[0]}% CMD={processes[2]}') - # nproc = processes[3] - # if nproc > 1: - # logger.info(f'.. there are {nproc} such processes running') - # tcpu = time.time() + if is_pilot_check(check='cpu_usage'): + if int(time.time() - tcpu) > cpuchecktime: + processes = get_process_info('python3 pilot3/pilot.py', pid=getpid()) + if processes: + logger.info(f'PID={getpid()} has CPU usage={processes[0]}% CMD={processes[2]}') + nproc = processes[3] + if nproc > 1: + logger.info(f'.. there are {nproc} such processes running') + tcpu = time.time() # proceed with running the other checks run_checks(queues, args) @@ -283,11 +284,7 @@ def reached_maxtime_abort(args: Any): args.graceful_stop.set() -#def log_lifetime(sig, frame, traces): -# logger.info('lifetime: %i used, %i maximum', int(time.time() - traces.pilot['lifetime_start']), traces.pilot['lifetime_max']) - - -def get_process_info(cmd: str, user: str = "", args: str = 'aufx', pid: int = 0) -> list: +def get_process_info_old(cmd: str, user: str = "", args: str = 'aufx', pid: int = 0) -> list: """ Return process info for given command. diff --git a/pilot/user/atlas/loopingjob_definitions.py b/pilot/user/atlas/loopingjob_definitions.py index c78f46e1..0178639d 100644 --- a/pilot/user/atlas/loopingjob_definitions.py +++ b/pilot/user/atlas/loopingjob_definitions.py @@ -61,7 +61,8 @@ def remove_unwanted_files(workdir: str, files: list[str]) -> list[str]: "memory_" in _file or "mem." in _file or "docs/" in _file or - "DBRelease-" in _file): + "DBRelease-" in _file or + _file == ""): _files.append(_file) return _files diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 30bab0af..b4d5ed62 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '2' # build number should be reset to '1' for every new development cycle +REVISION = '4' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '15' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/container.py b/pilot/util/container.py index d4b1c627..6ba77ad0 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -27,6 +27,7 @@ import logging import queue import re +import select import shlex import signal import threading @@ -51,6 +52,86 @@ def execute(executable: Any, **kwargs: dict) -> Any: # noqa: C901 """ Executes the command with its options in the provided executable list using subprocess time-out handler. + :param executable: command to be executed (str or list) + :param kwargs: kwargs (dict) + :return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument). + """ + usecontainer = kwargs.get('usecontainer', False) + job = kwargs.get('job') + obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message + + # Convert executable to string if it is a list + if isinstance(executable, list): + executable = ' '.join(executable) + + if job and job.imagename != "" and "runcontainer" in executable: + usecontainer = False + job.usecontainer = usecontainer + + if usecontainer: + executable, diagnostics = containerise_executable(executable, **kwargs) + if not executable: + return None if kwargs.get('returnproc', False) else -1, "", diagnostics + + if not kwargs.get('mute', False): + print_executable(executable, obscure=obscure) + + timeout = get_timeout(kwargs.get('timeout', None)) + exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable] + + process = None + try: + with execute_lock: + process = subprocess.Popen( + exe, + bufsize=-1, + stdout=kwargs.get('stdout', subprocess.PIPE), + stderr=kwargs.get('stderr', subprocess.PIPE), + cwd=kwargs.get('cwd', getcwd()), + start_new_session=True, + encoding='utf-8', + errors='replace' + ) + if kwargs.get('returnproc', False): + return process + + # Use communicate() to read stdout and stderr reliably + try: + logger.debug(f'subprocess.communicate() will use timeout {timeout} s') + stdout, stderr = process.communicate(timeout=timeout) + except subprocess.TimeoutExpired as exc: + # Timeout handling + stderr = f'subprocess communicate sent TimeoutExpired: {exc}' + logger.warning(stderr) + exit_code = errors.COMMANDTIMEDOUT + stderr = kill_all(process, stderr) + return exit_code, "", stderr + except Exception as exc: + logger.warning(f'exception caused when executing command: {executable}: {exc}') + exit_code = errors.UNKNOWNEXCEPTION + stderr = kill_all(process, str(exc)) + return exit_code, "", stderr + + exit_code = process.poll() + if stdout and stdout.endswith('\n'): + stdout = stdout[:-1] + + return exit_code, stdout, stderr + + finally: + # Ensure the process is cleaned up + if process and not kwargs.get('returnproc', False): + try: + process.wait(timeout=60) + process.stdout.close() + process.stderr.close() + except Exception: + pass + + +def execute_old3(executable: Any, **kwargs: dict) -> Any: # noqa: C901 + """ + Executes the command with its options in the provided executable list using subprocess time-out handler. The function also determines whether the command should be executed within a container. @@ -113,24 +194,26 @@ def execute(executable: Any, **kwargs: dict) -> Any: # noqa: C901 def read_output(stream, queue): while True: - sleep(0.01) try: - line = stream.readline() - if not line: - break + # Use select to wait for the stream to be ready for reading + ready, _, _ = select.select([stream], [], [], 1.0) + if ready: + line = stream.readline() + if not line: + break + try: + queue.put_nowait(line) + except queue.Full: + pass # Handle the case where the queue is full + else: + sleep(0.01) # Sleep for a short interval to avoid busy waiting except (AttributeError, ValueError): - # Handle the case where stream is None (AttributeError) or closed (ValueError) break except OSError as e: if e.errno == errno.EBADF: - # Handle the case where the file descriptor is bad break else: raise - try: - queue.put_nowait(line) - except queue.Full: - sleep(0.01) # Sleep for a short interval to avoid busy waiting stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue)) stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue)) @@ -155,7 +238,8 @@ def read_output(stream, queue): exit_code = errors.UNKNOWNEXCEPTION stderr = kill_all(process, str(exc)) else: - exit_code = process.poll() + #exit_code = process.poll() + exit_code = process.returncode # Wait for the threads to finish reading try: diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index 90eb6394..95c28b88 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -17,7 +17,7 @@ # under the License. # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2023 +# - Paul Nilsson, paul.nilsson@cern.ch, 2023-24 import logging import os @@ -373,3 +373,40 @@ def check_cpu_load(): else: logger.info("system load is normal") return False + + +def get_process_info(cmd: str, user: str = "", pid: int = 0) -> list: + """ + Return process info for given command. + + The function returns a list with format [cpu, mem, command, number of commands] for + a given command (e.g. python3 pilot3/pilot.py). + + :param cmd: command (str) + :param user: user (str) + :param pid: process id (int) + :return: list with process info (l[0]=cpu usage(%), l[1]=mem usage(%), l[2]=command(string)) (list). + """ + if not _is_psutil_available: + logger.warning('psutil not available, cannot check pilot CPU load') + return [] + + processes = [] + num = 0 + + for proc in psutil.process_iter(['pid', 'username', 'cpu_percent', 'memory_percent', 'cmdline']): + try: + if user and proc.info['username'] != user: + continue + cmdline = proc.info['cmdline'] + if cmdline and cmd in ' '.join(cmdline): + num += 1 + if proc.info['pid'] == pid: + processes = [proc.info['cpu_percent'], proc.info['memory_percent'], ' '.join(cmdline)] + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + continue + + if processes: + processes.append(num) + + return processes