From c14a16b7c30977a05fb635084e112ed0bf288be4 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Wed, 18 Dec 2024 10:36:30 +0100 Subject: [PATCH] New version of execute() --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/container.py | 105 +++++++++++++++++++++++++++++++--------- 3 files changed, 85 insertions(+), 24 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 6851997d..8d2739d3 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.4.9 \ No newline at end of file +3.9.4.15 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index f225b448..b4d5ed62 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '4' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '9' # build number should be reset to '1' for every new development cycle +BUILD = '15' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/container.py b/pilot/util/container.py index f65058c4..6ba77ad0 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -52,6 +52,86 @@ def execute(executable: Any, **kwargs: dict) -> Any: # noqa: C901 """ Executes the command with its options in the provided executable list using subprocess time-out handler. + :param executable: command to be executed (str or list) + :param kwargs: kwargs (dict) + :return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument). + """ + usecontainer = kwargs.get('usecontainer', False) + job = kwargs.get('job') + obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message + + # Convert executable to string if it is a list + if isinstance(executable, list): + executable = ' '.join(executable) + + if job and job.imagename != "" and "runcontainer" in executable: + usecontainer = False + job.usecontainer = usecontainer + + if usecontainer: + executable, diagnostics = containerise_executable(executable, **kwargs) + if not executable: + return None if kwargs.get('returnproc', False) else -1, "", diagnostics + + if not kwargs.get('mute', False): + print_executable(executable, obscure=obscure) + + timeout = get_timeout(kwargs.get('timeout', None)) + exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable] + + process = None + try: + with execute_lock: + process = subprocess.Popen( + exe, + bufsize=-1, + stdout=kwargs.get('stdout', subprocess.PIPE), + stderr=kwargs.get('stderr', subprocess.PIPE), + cwd=kwargs.get('cwd', getcwd()), + start_new_session=True, + encoding='utf-8', + errors='replace' + ) + if kwargs.get('returnproc', False): + return process + + # Use communicate() to read stdout and stderr reliably + try: + logger.debug(f'subprocess.communicate() will use timeout {timeout} s') + stdout, stderr = process.communicate(timeout=timeout) + except subprocess.TimeoutExpired as exc: + # Timeout handling + stderr = f'subprocess communicate sent TimeoutExpired: {exc}' + logger.warning(stderr) + exit_code = errors.COMMANDTIMEDOUT + stderr = kill_all(process, stderr) + return exit_code, "", stderr + except Exception as exc: + logger.warning(f'exception caused when executing command: {executable}: {exc}') + exit_code = errors.UNKNOWNEXCEPTION + stderr = kill_all(process, str(exc)) + return exit_code, "", stderr + + exit_code = process.poll() + if stdout and stdout.endswith('\n'): + stdout = stdout[:-1] + + return exit_code, stdout, stderr + + finally: + # Ensure the process is cleaned up + if process and not kwargs.get('returnproc', False): + try: + process.wait(timeout=60) + process.stdout.close() + process.stderr.close() + except Exception: + pass + + +def execute_old3(executable: Any, **kwargs: dict) -> Any: # noqa: C901 + """ + Executes the command with its options in the provided executable list using subprocess time-out handler. The function also determines whether the command should be executed within a container. @@ -125,35 +205,15 @@ def read_output(stream, queue): queue.put_nowait(line) except queue.Full: pass # Handle the case where the queue is full - except (AttributeError, ValueError): - break - except OSError as e: - if e.errno == errno.EBADF: - break else: - raise - - def read_output_old(stream, queue): - while True: - #sleep(1) - try: - line = stream.readline() - if not line: - break + sleep(0.01) # Sleep for a short interval to avoid busy waiting except (AttributeError, ValueError): - # Handle the case where stream is None (AttributeError) or closed (ValueError) break except OSError as e: if e.errno == errno.EBADF: - # Handle the case where the file descriptor is bad break else: raise - try: - queue.put_nowait(line) - except queue.Full: - pass - #sleep(0.01) # Sleep for a short interval to avoid busy waiting stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue)) stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue)) @@ -178,7 +238,8 @@ def read_output_old(stream, queue): exit_code = errors.UNKNOWNEXCEPTION stderr = kill_all(process, str(exc)) else: - exit_code = process.poll() + #exit_code = process.poll() + exit_code = process.returncode # Wait for the threads to finish reading try: