diff --git a/esrally/mechanic/supplier.py b/esrally/mechanic/supplier.py index f3ce39e68..60a014721 100644 --- a/esrally/mechanic/supplier.py +++ b/esrally/mechanic/supplier.py @@ -808,7 +808,7 @@ def run(self, command, override_src_dir=None): console.info("Creating installable binary from source files") self.logger.info("Running build command [%s]", build_cmd) - if process.run_subprocess(build_cmd): + if process.run_subprocess(build_cmd) != 0: msg = f"Executing '{command}' failed. The last 20 lines in the build log file are:\n" msg += "=========================================================================================================\n" with open(log_file, encoding="utf-8") as f: diff --git a/esrally/utils/git.py b/esrally/utils/git.py index 7d2da0295..c55e3aee3 100644 --- a/esrally/utils/git.py +++ b/esrally/utils/git.py @@ -26,7 +26,8 @@ def probed(f): def probe(src, *args, **kwargs): # Probe for -C if not process.exit_status_as_bool( - lambda: process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} --version", level=logging.DEBUG), quiet=True + lambda: process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} --version", level=logging.DEBUG), + quiet=True, ): version = process.run_subprocess_with_output("git --version") if version: @@ -51,13 +52,14 @@ def is_working_copy(src): @probed def is_branch(src_dir, identifier): show_ref_cmd = f"git -C {src_dir} show-ref {identifier}" + completed_process = process.run_subprocess_with_logging_and_output(show_ref_cmd) # if we get an non-zero exit code, we know that the identifier is not a branch (local or remote) - if not process.exit_status_as_bool(lambda: process.run_subprocess_with_logging(show_ref_cmd)): + if not process.exit_status_as_bool(lambda: completed_process.returncode): return False # it's possible the identifier could be a tag, so we explicitly check that here - ref = process.run_subprocess_with_output(show_ref_cmd) + ref = completed_process.stdout.split("\n") if "refs/tags" in ref[0]: return False diff --git a/esrally/utils/process.py b/esrally/utils/process.py index 5d9e3bb41..18eb22c43 100644 --- a/esrally/utils/process.py +++ b/esrally/utils/process.py @@ -20,15 +20,25 @@ import shlex import subprocess import time +from typing import Callable, Dict, List import psutil +LogLevel = int +FileId = int -def run_subprocess(command_line): + +def run_subprocess(command_line: str) -> int: + """ + Runs the provided command line in a subprocess. + + :param command_line: The command line of the subprocess to launch. + :return: The process' return code + """ return subprocess.call(command_line, shell=True) -def run_subprocess_with_output(command_line, env=None): +def run_subprocess_with_output(command_line: str, env: Dict[str, str] = None) -> List[str]: logger = logging.getLogger(__name__) logger.debug("Running subprocess [%s] with output.", command_line) command_line_args = shlex.split(command_line) @@ -44,7 +54,7 @@ def run_subprocess_with_output(command_line, env=None): return lines -def exit_status_as_bool(runnable, quiet=False): +def exit_status_as_bool(runnable: Callable[[], int], quiet: bool = False) -> bool: """ :param runnable: A runnable returning an int as exit status assuming ``0`` is meaning success. @@ -60,7 +70,14 @@ def exit_status_as_bool(runnable, quiet=False): return False -def run_subprocess_with_logging(command_line, header=None, level=logging.INFO, stdin=None, env=None, detach=False): +def run_subprocess_with_logging( + command_line: str, + header: str = None, + level: LogLevel = logging.INFO, + stdin: FileId = None, + env: Dict[str, str] = None, + detach: bool = False, +) -> int: """ Runs the provided command line in a subprocess. All output will be captured by a logger. @@ -98,7 +115,53 @@ def run_subprocess_with_logging(command_line, header=None, level=logging.INFO, s return command_line_process.returncode -def is_rally_process(p): +def run_subprocess_with_logging_and_output( + command_line: str, + header: str = None, + level: LogLevel = logging.INFO, + stdin: FileId = None, + env: Dict[str, str] = None, + detach: bool = False, +) -> subprocess.CompletedProcess: + """ + Runs the provided command line in a subprocess. All output will be captured by a logger. + + :param command_line: The command line of the subprocess to launch. + :param header: An optional header line that should be logged (this will be logged on info level, regardless of the defined log level). + :param level: The log level to use for output (default: logging.INFO). + :param stdin: The stdout object returned by subprocess.Popen(stdout=PIPE) allowing chaining of shell operations with pipes + (default: None). + :param env: Use specific environment variables (default: None). + :param detach: Whether to detach this process from its parent process (default: False). + :return: The process exit code as an int. + """ + logger = logging.getLogger(__name__) + logger.debug("Running subprocess [%s] with logging.", command_line) + command_line_args = shlex.split(command_line) + pre_exec = os.setpgrp if detach else None + if header is not None: + logger.info(header) + + # pylint: disable=subprocess-popen-preexec-fn + completed = subprocess.run( + command_line_args, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + env=env, + check=False, + stdin=stdin if stdin else None, + preexec_fn=pre_exec, + ) + + for stdout in completed.stdout.splitlines(): + logger.log(level=level, msg=stdout) + + logger.debug("Subprocess [%s] finished with return code [%s].", command_line, str(completed.returncode)) + return completed + + +def is_rally_process(p: psutil.Process) -> bool: return ( p.name() == "esrally" or p.name() == "rally" @@ -110,14 +173,14 @@ def is_rally_process(p): ) -def find_all_other_rally_processes(): +def find_all_other_rally_processes() -> List[psutil.Process]: others = [] for_all_other_processes(is_rally_process, others.append) return others -def kill_all(predicate): - def kill(p): +def kill_all(predicate: Callable[[psutil.Process], bool]) -> None: + def kill(p: psutil.Process): logging.getLogger(__name__).info("Killing lingering process with PID [%s] and command line [%s].", p.pid, p.cmdline()) p.kill() # wait until process has terminated, at most 3 seconds. Otherwise we might run into race conditions with actor system @@ -132,7 +195,7 @@ def kill(p): for_all_other_processes(predicate, kill) -def for_all_other_processes(predicate, action): +def for_all_other_processes(predicate: Callable[[psutil.Process], bool], action: Callable[[psutil.Process], None]) -> None: # no harakiri please my_pid = os.getpid() for p in psutil.process_iter(): @@ -143,8 +206,8 @@ def for_all_other_processes(predicate, action): pass -def kill_running_rally_instances(): - def rally_process(p): +def kill_running_rally_instances() -> None: + def rally_process(p: psutil.Process) -> bool: return ( p.name() == "esrally" or p.name() == "rally" diff --git a/tests/utils/process_test.py b/tests/utils/process_test.py index b54916d71..15c350c83 100644 --- a/tests/utils/process_test.py +++ b/tests/utils/process_test.py @@ -198,3 +198,12 @@ def test_kills_only_rally_processes(self, process_iter): assert rally_process_mac.killed assert not own_rally_process.killed assert not night_rally_process.killed + + +def test_run_subprocess(): + cmd = "ls . not-a-file" + completed_process = process.run_subprocess_with_logging_and_output(cmd) + + assert completed_process.returncode != 0 + assert completed_process.stdout != "" + assert completed_process.stderr is None