From a2ee76efc5e19c4f6fab6aa0be3dbbb8f388f6ae Mon Sep 17 00:00:00 2001 From: favilo Date: Thu, 1 Feb 2024 12:24:33 -0800 Subject: [PATCH] Adding returncode, stdout fields to `run_subprocess_with_logging` --- esrally/mechanic/launcher.py | 2 +- esrally/mechanic/provisioner.py | 2 +- esrally/telemetry.py | 2 +- esrally/utils/git.py | 23 ++++++++------- esrally/utils/jvm.py | 2 +- esrally/utils/process.py | 46 +++++++++++++++++++++--------- it/__init__.py | 6 ++-- it/docker_dev_image_test.py | 4 +-- tests/mechanic/launcher_test.py | 4 +-- tests/mechanic/provisioner_test.py | 10 +++---- tests/telemetry_test.py | 2 +- tests/utils/git_test.py | 2 +- tests/utils/process_test.py | 9 ++++++ 13 files changed, 71 insertions(+), 43 deletions(-) diff --git a/esrally/mechanic/launcher.py b/esrally/mechanic/launcher.py index e905d61f7..bfffab0c3 100644 --- a/esrally/mechanic/launcher.py +++ b/esrally/mechanic/launcher.py @@ -55,7 +55,7 @@ def start(self, node_configurations): def _start_process(self, binary_path): compose_cmd = self._docker_compose(binary_path, "up -d") - ret = process.run_subprocess_with_logging(compose_cmd) + ret = process.run_subprocess_with_logging(compose_cmd).returncode if ret != 0: msg = f"Docker daemon startup failed with exit code [{ret}]" logging.error(msg) diff --git a/esrally/mechanic/provisioner.py b/esrally/mechanic/provisioner.py index a805def9c..fb07dd4d9 100644 --- a/esrally/mechanic/provisioner.py +++ b/esrally/mechanic/provisioner.py @@ -363,7 +363,7 @@ def install(self, es_home_path, plugin_url=None): self.logger.info("Installing [%s] into [%s]", self.plugin_name, es_home_path) install_cmd = '%s install --batch "%s"' % (installer_binary_path, self.plugin_name) - return_code = process.run_subprocess_with_logging(install_cmd, env=self.env()) + return_code = process.run_subprocess_with_logging(install_cmd, env=self.env()).returncode # see: https://www.elastic.co/guide/en/elasticsearch/plugins/current/_other_command_line_parameters.html if return_code == 0: self.logger.info("Successfully installed [%s].", self.plugin_name) diff --git a/esrally/telemetry.py b/esrally/telemetry.py index 0ce294339..9b159b643 100644 --- a/esrally/telemetry.py +++ b/esrally/telemetry.py @@ -351,7 +351,7 @@ def detach_from_node(self, node, running): heap_dump_file = os.path.join(self.log_root, f"heap_at_exit_{node.pid}.hprof") console.info(f"{self.human_name}: Writing heap dump to [{heap_dump_file}]", logger=self.logger) cmd = f"jmap -dump:format=b,file={heap_dump_file} {node.pid}" - if process.run_subprocess_with_logging(cmd): + if process.run_subprocess_with_logging(cmd).returncode: self.logger.warning("Could not write heap dump to [%s]", heap_dump_file) diff --git a/esrally/utils/git.py b/esrally/utils/git.py index ca4ff8c79..b6295c811 100644 --- a/esrally/utils/git.py +++ b/esrally/utils/git.py @@ -26,7 +26,8 @@ def probed(f): def probe(src, *args, **kwargs): # Probe for -C if not process.exit_status_as_bool( - lambda: process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} --version", level=logging.DEBUG), quiet=True + lambda: process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} --version", level=logging.DEBUG).returncode, + quiet=True, ): version = process.run_subprocess_with_output("git --version") if version: @@ -51,14 +52,14 @@ def is_working_copy(src): @probed def is_branch(src_dir, identifier): show_ref_cmd = f"git -C {src_dir} show-ref {identifier}" - completed_process = process.run_subprocess(show_ref_cmd) + completed = process.run_subprocess_with_logging(show_ref_cmd) # if we get an non-zero exit code, we know that the identifier is not a branch (local or remote) - if not process.exit_status_as_bool(lambda: completed_process.returncode): + if not process.exit_status_as_bool(lambda: completed.returncode): return False # it's possible the identifier could be a tag, so we explicitly check that here - ref = completed_process.stdout.decode("utf-8").split("\n") + ref = completed.stdout.split("\n") if "refs/tags" in ref[0]: return False @@ -68,32 +69,32 @@ def is_branch(src_dir, identifier): def clone(src, *, remote): io.ensure_dir(src) # Don't swallow subprocess output, user might need to enter credentials... - if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src))): + if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src))).returncode: raise exceptions.SupplyError("Could not clone from [%s] to [%s]" % (remote, src)) @probed def fetch(src, *, remote): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}").returncode: raise exceptions.SupplyError("Could not fetch source tree from [%s]" % remote) @probed def checkout(src_dir, *, branch): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}").returncode: raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch) @probed def checkout_branch(src_dir, remote, branch): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}").returncode: raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch) @probed def rebase(src_dir, *, remote, branch): checkout(src_dir, branch=branch) - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}").returncode: raise exceptions.SupplyError("Could not rebase on branch [%s]" % branch) @@ -109,13 +110,13 @@ def pull_ts(src_dir, ts, *, remote, branch): clean_src = io.escape_path(src_dir) rev_list_command = f'git -C {clean_src} rev-list -n 1 --before="{ts}" --date=iso8601 {remote}/{branch}' revision = process.run_subprocess_with_output(rev_list_command)[0].strip() - if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}"): + if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}").returncode: raise exceptions.SupplyError("Could not checkout source tree for timestamped revision [%s]" % ts) @probed def checkout_revision(src_dir, *, revision): - if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}"): + if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}").returncode: raise exceptions.SupplyError("Could not checkout source tree for revision [%s]" % revision) diff --git a/esrally/utils/jvm.py b/esrally/utils/jvm.py index 3bfc2a168..cd3ee5630 100644 --- a/esrally/utils/jvm.py +++ b/esrally/utils/jvm.py @@ -34,7 +34,7 @@ def supports_option(java_home, option): :param option: The JVM option or combination of JVM options (separated by spaces) to check. :return: True iff the provided ``option`` is supported on this JVM. """ - return process.exit_status_as_bool(lambda: process.run_subprocess_with_logging(f"{_java(java_home)} {option} -version")) + return process.exit_status_as_bool(lambda: process.run_subprocess_with_logging(f"{_java(java_home)} {option} -version").returncode) def system_property(java_home, system_property_name): diff --git a/esrally/utils/process.py b/esrally/utils/process.py index 7ae41dd33..d46de65b6 100644 --- a/esrally/utils/process.py +++ b/esrally/utils/process.py @@ -20,15 +20,22 @@ import shlex import subprocess import time +from typing import Callable, Dict, List import psutil -def run_subprocess(command_line): - return subprocess.run(command_line, shell=True, capture_output=True, check=False) +def run_subprocess(command_line: str) -> subprocess.CompletedProcess: + """ + Runs the provided command line in a subprocess. All output will be returned in the `CompletedProcess.stdout` field. + + :param command_line: The command line of the subprocess to launch. + :return: The `CompletedProcess` object for the subprocess. `.returncode` contains the process' return code + """ + return subprocess.run(command_line, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=False) -def run_subprocess_with_output(command_line, env=None): +def run_subprocess_with_output(command_line: str, env: Dict[str, str] = None) -> List[str]: logger = logging.getLogger(__name__) logger.debug("Running subprocess [%s] with output.", command_line) command_line_args = shlex.split(command_line) @@ -44,7 +51,7 @@ def run_subprocess_with_output(command_line, env=None): return lines -def exit_status_as_bool(runnable, quiet=False): +def exit_status_as_bool(runnable: Callable[[], int], quiet: bool = False) -> bool: """ :param runnable: A runnable returning an int as exit status assuming ``0`` is meaning success. @@ -60,7 +67,18 @@ def exit_status_as_bool(runnable, quiet=False): return False -def run_subprocess_with_logging(command_line, header=None, level=logging.INFO, stdin=None, env=None, detach=False): +LogLevel = int +FileId = int + + +def run_subprocess_with_logging( + command_line: str, + header: str = None, + level: LogLevel = logging.INFO, + stdin: FileId = None, + env: Dict[str, str] = None, + detach: bool = False, +) -> subprocess.Popen: """ Runs the provided command line in a subprocess. All output will be captured by a logger. @@ -71,7 +89,7 @@ def run_subprocess_with_logging(command_line, header=None, level=logging.INFO, s (default: None). :param env: Use specific environment variables (default: None). :param detach: Whether to detach this process from its parent process (default: False). - :return: The process exit code as an int. + :return: The `Popen` object for the subprocess. `.returncode` contains the process' return code """ logger = logging.getLogger(__name__) logger.debug("Running subprocess [%s] with logging.", command_line) @@ -95,10 +113,10 @@ def run_subprocess_with_logging(command_line, header=None, level=logging.INFO, s logger.log(level=level, msg=stdout) logger.debug("Subprocess [%s] finished with return code [%s].", command_line, str(command_line_process.returncode)) - return command_line_process.returncode + return command_line_process -def is_rally_process(p): +def is_rally_process(p: psutil.Process) -> bool: return ( p.name() == "esrally" or p.name() == "rally" @@ -110,14 +128,14 @@ def is_rally_process(p): ) -def find_all_other_rally_processes(): +def find_all_other_rally_processes() -> List[psutil.Process]: others = [] for_all_other_processes(is_rally_process, others.append) return others -def kill_all(predicate): - def kill(p): +def kill_all(predicate: Callable[[psutil.Process], bool]) -> None: + def kill(p: psutil.Process): logging.getLogger(__name__).info("Killing lingering process with PID [%s] and command line [%s].", p.pid, p.cmdline()) p.kill() # wait until process has terminated, at most 3 seconds. Otherwise we might run into race conditions with actor system @@ -132,7 +150,7 @@ def kill(p): for_all_other_processes(predicate, kill) -def for_all_other_processes(predicate, action): +def for_all_other_processes(predicate: Callable[[psutil.Process], bool], action: Callable[[psutil.Process], None]) -> None: # no harakiri please my_pid = os.getpid() for p in psutil.process_iter(): @@ -143,8 +161,8 @@ def for_all_other_processes(predicate, action): pass -def kill_running_rally_instances(): - def rally_process(p): +def kill_running_rally_instances() -> None: + def rally_process(p: psutil.Process) -> bool: return ( p.name() == "esrally" or p.name() == "rally" diff --git a/it/__init__.py b/it/__init__.py index 9e6d5b4c8..e466adccf 100644 --- a/it/__init__.py +++ b/it/__init__.py @@ -138,9 +138,9 @@ def wait_until_port_is_free(port_number=39200, timeout=120): def check_prerequisites(): - if process.run_subprocess_with_logging("docker ps") != 0: + if process.run_subprocess_with_logging("docker ps").returncode != 0: raise AssertionError("Docker must be installed and the daemon must be up and running to run integration tests.") - if process.run_subprocess_with_logging("docker-compose --help") != 0: + if process.run_subprocess_with_logging("docker-compose --help").returncode != 0: raise AssertionError("Docker Compose is required to run integration tests.") @@ -253,7 +253,7 @@ def build_docker_image(): f"-f {ROOT_DIR}/docker/Dockerfiles/Dockerfile-dev {ROOT_DIR}" ) - if process.run_subprocess_with_logging(command, env=env_variables) != 0: + if process.run_subprocess_with_logging(command, env=env_variables).returncode != 0: raise AssertionError("It was not possible to build the docker image from Dockerfile-dev") diff --git a/it/docker_dev_image_test.py b/it/docker_dev_image_test.py index 4830a33dd..43b1fd31b 100644 --- a/it/docker_dev_image_test.py +++ b/it/docker_dev_image_test.py @@ -63,9 +63,9 @@ def run_docker_compose_up(test_command): return process.run_subprocess_with_logging( f"docker-compose -f {it.ROOT_DIR}/docker/docker-compose-tests.yml up --abort-on-container-exit", env=env_variables - ) + ).returncode def run_docker_compose_down(): - if process.run_subprocess_with_logging(f"docker-compose -f {it.ROOT_DIR}/docker/docker-compose-tests.yml down -v") != 0: + if process.run_subprocess_with_logging(f"docker-compose -f {it.ROOT_DIR}/docker/docker-compose-tests.yml down -v").returncode != 0: raise AssertionError("Failed to stop running containers from docker-compose-tests.yml") diff --git a/tests/mechanic/launcher_test.py b/tests/mechanic/launcher_test.py index c6758880e..40ca61594 100644 --- a/tests/mechanic/launcher_test.py +++ b/tests/mechanic/launcher_test.py @@ -346,7 +346,7 @@ class TestDockerLauncher: @mock.patch("esrally.utils.process.run_subprocess_with_logging") @mock.patch("esrally.utils.process.run_subprocess_with_output") def test_starts_container_successfully(self, run_subprocess_with_output, run_subprocess_with_logging): - run_subprocess_with_logging.return_value = 0 + run_subprocess_with_logging.return_value.returncode = 0 # Docker container id (from docker-compose ps), Docker container id (from docker ps --filter ...) run_subprocess_with_output.side_effect = [["de604d0d"], ["de604d0d"]] cfg = config.Config() @@ -385,7 +385,7 @@ def test_starts_container_successfully(self, run_subprocess_with_output, run_sub @mock.patch("esrally.utils.process.run_subprocess_with_logging") @mock.patch("esrally.utils.process.run_subprocess_with_output") def test_container_not_started(self, run_subprocess_with_output, run_subprocess_with_logging, sleep): - run_subprocess_with_logging.return_value = 0 + run_subprocess_with_logging.return_value.returncode = 0 # Docker container id (from docker-compose ps), but NO Docker container id (from docker ps --filter...) twice run_subprocess_with_output.side_effect = [["de604d0d"], [], []] cfg = config.Config() diff --git a/tests/mechanic/provisioner_test.py b/tests/mechanic/provisioner_test.py index fe6aeae4b..ea9bd1d74 100644 --- a/tests/mechanic/provisioner_test.py +++ b/tests/mechanic/provisioner_test.py @@ -433,7 +433,7 @@ def test_invokes_hook_no_java_home(self): class TestPluginInstaller: @mock.patch("esrally.utils.process.run_subprocess_with_logging") def test_install_plugin_successfully(self, installer_subprocess): - installer_subprocess.return_value = 0 + installer_subprocess.return_value.returncode = 0 plugin = team.PluginDescriptor( name="unit-test-plugin", @@ -453,7 +453,7 @@ def test_install_plugin_successfully(self, installer_subprocess): @mock.patch("esrally.utils.process.run_subprocess_with_logging") def test_install_plugin_with_bundled_jdk(self, installer_subprocess): - installer_subprocess.return_value = 0 + installer_subprocess.return_value.returncode = 0 plugin = team.PluginDescriptor( name="unit-test-plugin", @@ -479,7 +479,7 @@ def test_install_plugin_with_bundled_jdk(self, installer_subprocess): @mock.patch("esrally.utils.process.run_subprocess_with_logging") def test_install_unknown_plugin(self, installer_subprocess): # unknown plugin - installer_subprocess.return_value = 64 + installer_subprocess.return_value.returncode = 64 plugin = team.PluginDescriptor(name="unknown") installer = provisioner.PluginInstaller(plugin, java_home="/usr/local/javas/java8", hook_handler_class=NoopHookHandler) @@ -496,7 +496,7 @@ def test_install_unknown_plugin(self, installer_subprocess): @mock.patch("esrally.utils.process.run_subprocess_with_logging") def test_install_plugin_with_io_error(self, installer_subprocess): # I/O error - installer_subprocess.return_value = 74 + installer_subprocess.return_value.returncode = 74 plugin = team.PluginDescriptor(name="simple") installer = provisioner.PluginInstaller(plugin, java_home="/usr/local/javas/java8", hook_handler_class=NoopHookHandler) @@ -513,7 +513,7 @@ def test_install_plugin_with_io_error(self, installer_subprocess): @mock.patch("esrally.utils.process.run_subprocess_with_logging") def test_install_plugin_with_unknown_error(self, installer_subprocess): # some other error - installer_subprocess.return_value = 12987 + installer_subprocess.return_value.returncode = 12987 plugin = team.PluginDescriptor(name="simple") installer = provisioner.PluginInstaller(plugin, java_home="/usr/local/javas/java8", hook_handler_class=NoopHookHandler) diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py index 47d5a0d6c..79f11a7c5 100644 --- a/tests/telemetry_test.py +++ b/tests/telemetry_test.py @@ -418,7 +418,7 @@ def test_can_override_options_for_java_9_or_above(self): class TestHeapdump: @mock.patch("esrally.utils.process.run_subprocess_with_logging") def test_generates_heap_dump(self, run_subprocess_with_logging): - run_subprocess_with_logging.return_value = 0 + run_subprocess_with_logging.return_value.returncode = 0 heapdump = telemetry.Heapdump("/var/log") t = telemetry.Telemetry(enabled_devices=[heapdump.command], devices=[heapdump]) node = cluster.Node(pid="1234", binary_path="/bin", host_name="localhost", node_name="rally0", telemetry=t) diff --git a/tests/utils/git_test.py b/tests/utils/git_test.py index 57bc152b2..fc09aef48 100644 --- a/tests/utils/git_test.py +++ b/tests/utils/git_test.py @@ -133,7 +133,7 @@ def test_list_tags(self): @mock.patch("esrally.utils.process.run_subprocess_with_logging") def test_git_version_too_old(self, run_subprocess_with_logging, run_subprocess): # any non-zero return value will do - run_subprocess_with_logging.return_value = 64 + run_subprocess_with_logging.return_value.returncode = 64 run_subprocess.return_value = "1.0.0" with pytest.raises(exceptions.SystemSetupError) as exc: diff --git a/tests/utils/process_test.py b/tests/utils/process_test.py index b54916d71..71c4b7879 100644 --- a/tests/utils/process_test.py +++ b/tests/utils/process_test.py @@ -198,3 +198,12 @@ def test_kills_only_rally_processes(self, process_iter): assert rally_process_mac.killed assert not own_rally_process.killed assert not night_rally_process.killed + + +def test_run_subprocess(): + cmd = "ls . not-a-file" + completed_process = process.run_subprocess_with_logging(cmd) + + assert completed_process.returncode != 0 + assert completed_process.stdout != "" + assert completed_process.stderr is None