Skip to content

Commit

Permalink
Adding returncode, stdout fields to run_subprocess_with_logging
Browse files Browse the repository at this point in the history
  • Loading branch information
favilo committed Feb 1, 2024
1 parent 21caafa commit a2ee76e
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 43 deletions.
2 changes: 1 addition & 1 deletion esrally/mechanic/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def start(self, node_configurations):
def _start_process(self, binary_path):
compose_cmd = self._docker_compose(binary_path, "up -d")

ret = process.run_subprocess_with_logging(compose_cmd)
ret = process.run_subprocess_with_logging(compose_cmd).returncode
if ret != 0:
msg = f"Docker daemon startup failed with exit code [{ret}]"
logging.error(msg)
Expand Down
2 changes: 1 addition & 1 deletion esrally/mechanic/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def install(self, es_home_path, plugin_url=None):
self.logger.info("Installing [%s] into [%s]", self.plugin_name, es_home_path)
install_cmd = '%s install --batch "%s"' % (installer_binary_path, self.plugin_name)

return_code = process.run_subprocess_with_logging(install_cmd, env=self.env())
return_code = process.run_subprocess_with_logging(install_cmd, env=self.env()).returncode
# see: https://www.elastic.co/guide/en/elasticsearch/plugins/current/_other_command_line_parameters.html
if return_code == 0:
self.logger.info("Successfully installed [%s].", self.plugin_name)
Expand Down
2 changes: 1 addition & 1 deletion esrally/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ def detach_from_node(self, node, running):
heap_dump_file = os.path.join(self.log_root, f"heap_at_exit_{node.pid}.hprof")
console.info(f"{self.human_name}: Writing heap dump to [{heap_dump_file}]", logger=self.logger)
cmd = f"jmap -dump:format=b,file={heap_dump_file} {node.pid}"
if process.run_subprocess_with_logging(cmd):
if process.run_subprocess_with_logging(cmd).returncode:
self.logger.warning("Could not write heap dump to [%s]", heap_dump_file)


Expand Down
23 changes: 12 additions & 11 deletions esrally/utils/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ def probed(f):
def probe(src, *args, **kwargs):
# Probe for -C
if not process.exit_status_as_bool(
lambda: process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} --version", level=logging.DEBUG), quiet=True
lambda: process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} --version", level=logging.DEBUG).returncode,
quiet=True,
):
version = process.run_subprocess_with_output("git --version")
if version:
Expand All @@ -51,14 +52,14 @@ def is_working_copy(src):
@probed
def is_branch(src_dir, identifier):
show_ref_cmd = f"git -C {src_dir} show-ref {identifier}"
completed_process = process.run_subprocess(show_ref_cmd)
completed = process.run_subprocess_with_logging(show_ref_cmd)

# if we get an non-zero exit code, we know that the identifier is not a branch (local or remote)
if not process.exit_status_as_bool(lambda: completed_process.returncode):
if not process.exit_status_as_bool(lambda: completed.returncode):
return False

# it's possible the identifier could be a tag, so we explicitly check that here
ref = completed_process.stdout.decode("utf-8").split("\n")
ref = completed.stdout.split("\n")
if "refs/tags" in ref[0]:
return False

Expand All @@ -68,32 +69,32 @@ def is_branch(src_dir, identifier):
def clone(src, *, remote):
io.ensure_dir(src)
# Don't swallow subprocess output, user might need to enter credentials...
if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src))):
if process.run_subprocess_with_logging("git clone %s %s" % (remote, io.escape_path(src))).returncode:
raise exceptions.SupplyError("Could not clone from [%s] to [%s]" % (remote, src))


@probed
def fetch(src, *, remote):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src)} fetch --prune --tags {remote}").returncode:
raise exceptions.SupplyError("Could not fetch source tree from [%s]" % remote)


@probed
def checkout(src_dir, *, branch):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {branch}").returncode:
raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch)


@probed
def checkout_branch(src_dir, remote, branch):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {remote}/{branch}").returncode:
raise exceptions.SupplyError("Could not checkout [%s]. Do you have uncommitted changes?" % branch)


@probed
def rebase(src_dir, *, remote, branch):
checkout(src_dir, branch=branch)
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} rebase {remote}/{branch}").returncode:
raise exceptions.SupplyError("Could not rebase on branch [%s]" % branch)


Expand All @@ -109,13 +110,13 @@ def pull_ts(src_dir, ts, *, remote, branch):
clean_src = io.escape_path(src_dir)
rev_list_command = f'git -C {clean_src} rev-list -n 1 --before="{ts}" --date=iso8601 {remote}/{branch}'
revision = process.run_subprocess_with_output(rev_list_command)[0].strip()
if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}"):
if process.run_subprocess_with_logging(f"git -C {clean_src} checkout {revision}").returncode:
raise exceptions.SupplyError("Could not checkout source tree for timestamped revision [%s]" % ts)


@probed
def checkout_revision(src_dir, *, revision):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}"):
if process.run_subprocess_with_logging(f"git -C {io.escape_path(src_dir)} checkout {revision}").returncode:
raise exceptions.SupplyError("Could not checkout source tree for revision [%s]" % revision)


Expand Down
2 changes: 1 addition & 1 deletion esrally/utils/jvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def supports_option(java_home, option):
:param option: The JVM option or combination of JVM options (separated by spaces) to check.
:return: True iff the provided ``option`` is supported on this JVM.
"""
return process.exit_status_as_bool(lambda: process.run_subprocess_with_logging(f"{_java(java_home)} {option} -version"))
return process.exit_status_as_bool(lambda: process.run_subprocess_with_logging(f"{_java(java_home)} {option} -version").returncode)


def system_property(java_home, system_property_name):
Expand Down
46 changes: 32 additions & 14 deletions esrally/utils/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,22 @@
import shlex
import subprocess
import time
from typing import Callable, Dict, List

import psutil


def run_subprocess(command_line):
return subprocess.run(command_line, shell=True, capture_output=True, check=False)
def run_subprocess(command_line: str) -> subprocess.CompletedProcess:
"""
Runs the provided command line in a subprocess. All output will be returned in the `CompletedProcess.stdout` field.
:param command_line: The command line of the subprocess to launch.
:return: The `CompletedProcess` object for the subprocess. `.returncode` contains the process' return code
"""
return subprocess.run(command_line, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=False)


def run_subprocess_with_output(command_line, env=None):
def run_subprocess_with_output(command_line: str, env: Dict[str, str] = None) -> List[str]:
logger = logging.getLogger(__name__)
logger.debug("Running subprocess [%s] with output.", command_line)
command_line_args = shlex.split(command_line)
Expand All @@ -44,7 +51,7 @@ def run_subprocess_with_output(command_line, env=None):
return lines


def exit_status_as_bool(runnable, quiet=False):
def exit_status_as_bool(runnable: Callable[[], int], quiet: bool = False) -> bool:
"""
:param runnable: A runnable returning an int as exit status assuming ``0`` is meaning success.
Expand All @@ -60,7 +67,18 @@ def exit_status_as_bool(runnable, quiet=False):
return False


def run_subprocess_with_logging(command_line, header=None, level=logging.INFO, stdin=None, env=None, detach=False):
LogLevel = int
FileId = int


def run_subprocess_with_logging(
command_line: str,
header: str = None,
level: LogLevel = logging.INFO,
stdin: FileId = None,
env: Dict[str, str] = None,
detach: bool = False,
) -> subprocess.Popen:
"""
Runs the provided command line in a subprocess. All output will be captured by a logger.
Expand All @@ -71,7 +89,7 @@ def run_subprocess_with_logging(command_line, header=None, level=logging.INFO, s
(default: None).
:param env: Use specific environment variables (default: None).
:param detach: Whether to detach this process from its parent process (default: False).
:return: The process exit code as an int.
:return: The `Popen` object for the subprocess. `.returncode` contains the process' return code
"""
logger = logging.getLogger(__name__)
logger.debug("Running subprocess [%s] with logging.", command_line)
Expand All @@ -95,10 +113,10 @@ def run_subprocess_with_logging(command_line, header=None, level=logging.INFO, s
logger.log(level=level, msg=stdout)

logger.debug("Subprocess [%s] finished with return code [%s].", command_line, str(command_line_process.returncode))
return command_line_process.returncode
return command_line_process


def is_rally_process(p):
def is_rally_process(p: psutil.Process) -> bool:
return (
p.name() == "esrally"
or p.name() == "rally"
Expand All @@ -110,14 +128,14 @@ def is_rally_process(p):
)


def find_all_other_rally_processes():
def find_all_other_rally_processes() -> List[psutil.Process]:
others = []
for_all_other_processes(is_rally_process, others.append)
return others


def kill_all(predicate):
def kill(p):
def kill_all(predicate: Callable[[psutil.Process], bool]) -> None:
def kill(p: psutil.Process):
logging.getLogger(__name__).info("Killing lingering process with PID [%s] and command line [%s].", p.pid, p.cmdline())
p.kill()
# wait until process has terminated, at most 3 seconds. Otherwise we might run into race conditions with actor system
Expand All @@ -132,7 +150,7 @@ def kill(p):
for_all_other_processes(predicate, kill)


def for_all_other_processes(predicate, action):
def for_all_other_processes(predicate: Callable[[psutil.Process], bool], action: Callable[[psutil.Process], None]) -> None:
# no harakiri please
my_pid = os.getpid()
for p in psutil.process_iter():
Expand All @@ -143,8 +161,8 @@ def for_all_other_processes(predicate, action):
pass


def kill_running_rally_instances():
def rally_process(p):
def kill_running_rally_instances() -> None:
def rally_process(p: psutil.Process) -> bool:
return (
p.name() == "esrally"
or p.name() == "rally"
Expand Down
6 changes: 3 additions & 3 deletions it/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ def wait_until_port_is_free(port_number=39200, timeout=120):


def check_prerequisites():
if process.run_subprocess_with_logging("docker ps") != 0:
if process.run_subprocess_with_logging("docker ps").returncode != 0:
raise AssertionError("Docker must be installed and the daemon must be up and running to run integration tests.")
if process.run_subprocess_with_logging("docker-compose --help") != 0:
if process.run_subprocess_with_logging("docker-compose --help").returncode != 0:
raise AssertionError("Docker Compose is required to run integration tests.")


Expand Down Expand Up @@ -253,7 +253,7 @@ def build_docker_image():
f"-f {ROOT_DIR}/docker/Dockerfiles/Dockerfile-dev {ROOT_DIR}"
)

if process.run_subprocess_with_logging(command, env=env_variables) != 0:
if process.run_subprocess_with_logging(command, env=env_variables).returncode != 0:
raise AssertionError("It was not possible to build the docker image from Dockerfile-dev")


Expand Down
4 changes: 2 additions & 2 deletions it/docker_dev_image_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ def run_docker_compose_up(test_command):

return process.run_subprocess_with_logging(
f"docker-compose -f {it.ROOT_DIR}/docker/docker-compose-tests.yml up --abort-on-container-exit", env=env_variables
)
).returncode


def run_docker_compose_down():
if process.run_subprocess_with_logging(f"docker-compose -f {it.ROOT_DIR}/docker/docker-compose-tests.yml down -v") != 0:
if process.run_subprocess_with_logging(f"docker-compose -f {it.ROOT_DIR}/docker/docker-compose-tests.yml down -v").returncode != 0:
raise AssertionError("Failed to stop running containers from docker-compose-tests.yml")
4 changes: 2 additions & 2 deletions tests/mechanic/launcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ class TestDockerLauncher:
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
@mock.patch("esrally.utils.process.run_subprocess_with_output")
def test_starts_container_successfully(self, run_subprocess_with_output, run_subprocess_with_logging):
run_subprocess_with_logging.return_value = 0
run_subprocess_with_logging.return_value.returncode = 0
# Docker container id (from docker-compose ps), Docker container id (from docker ps --filter ...)
run_subprocess_with_output.side_effect = [["de604d0d"], ["de604d0d"]]
cfg = config.Config()
Expand Down Expand Up @@ -385,7 +385,7 @@ def test_starts_container_successfully(self, run_subprocess_with_output, run_sub
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
@mock.patch("esrally.utils.process.run_subprocess_with_output")
def test_container_not_started(self, run_subprocess_with_output, run_subprocess_with_logging, sleep):
run_subprocess_with_logging.return_value = 0
run_subprocess_with_logging.return_value.returncode = 0
# Docker container id (from docker-compose ps), but NO Docker container id (from docker ps --filter...) twice
run_subprocess_with_output.side_effect = [["de604d0d"], [], []]
cfg = config.Config()
Expand Down
10 changes: 5 additions & 5 deletions tests/mechanic/provisioner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ def test_invokes_hook_no_java_home(self):
class TestPluginInstaller:
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_install_plugin_successfully(self, installer_subprocess):
installer_subprocess.return_value = 0
installer_subprocess.return_value.returncode = 0

plugin = team.PluginDescriptor(
name="unit-test-plugin",
Expand All @@ -453,7 +453,7 @@ def test_install_plugin_successfully(self, installer_subprocess):

@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_install_plugin_with_bundled_jdk(self, installer_subprocess):
installer_subprocess.return_value = 0
installer_subprocess.return_value.returncode = 0

plugin = team.PluginDescriptor(
name="unit-test-plugin",
Expand All @@ -479,7 +479,7 @@ def test_install_plugin_with_bundled_jdk(self, installer_subprocess):
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_install_unknown_plugin(self, installer_subprocess):
# unknown plugin
installer_subprocess.return_value = 64
installer_subprocess.return_value.returncode = 64

plugin = team.PluginDescriptor(name="unknown")
installer = provisioner.PluginInstaller(plugin, java_home="/usr/local/javas/java8", hook_handler_class=NoopHookHandler)
Expand All @@ -496,7 +496,7 @@ def test_install_unknown_plugin(self, installer_subprocess):
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_install_plugin_with_io_error(self, installer_subprocess):
# I/O error
installer_subprocess.return_value = 74
installer_subprocess.return_value.returncode = 74

plugin = team.PluginDescriptor(name="simple")
installer = provisioner.PluginInstaller(plugin, java_home="/usr/local/javas/java8", hook_handler_class=NoopHookHandler)
Expand All @@ -513,7 +513,7 @@ def test_install_plugin_with_io_error(self, installer_subprocess):
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_install_plugin_with_unknown_error(self, installer_subprocess):
# some other error
installer_subprocess.return_value = 12987
installer_subprocess.return_value.returncode = 12987

plugin = team.PluginDescriptor(name="simple")
installer = provisioner.PluginInstaller(plugin, java_home="/usr/local/javas/java8", hook_handler_class=NoopHookHandler)
Expand Down
2 changes: 1 addition & 1 deletion tests/telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ def test_can_override_options_for_java_9_or_above(self):
class TestHeapdump:
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_generates_heap_dump(self, run_subprocess_with_logging):
run_subprocess_with_logging.return_value = 0
run_subprocess_with_logging.return_value.returncode = 0
heapdump = telemetry.Heapdump("/var/log")
t = telemetry.Telemetry(enabled_devices=[heapdump.command], devices=[heapdump])
node = cluster.Node(pid="1234", binary_path="/bin", host_name="localhost", node_name="rally0", telemetry=t)
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/git_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def test_list_tags(self):
@mock.patch("esrally.utils.process.run_subprocess_with_logging")
def test_git_version_too_old(self, run_subprocess_with_logging, run_subprocess):
# any non-zero return value will do
run_subprocess_with_logging.return_value = 64
run_subprocess_with_logging.return_value.returncode = 64
run_subprocess.return_value = "1.0.0"

with pytest.raises(exceptions.SystemSetupError) as exc:
Expand Down
9 changes: 9 additions & 0 deletions tests/utils/process_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,12 @@ def test_kills_only_rally_processes(self, process_iter):
assert rally_process_mac.killed
assert not own_rally_process.killed
assert not night_rally_process.killed


def test_run_subprocess():
cmd = "ls . not-a-file"
completed_process = process.run_subprocess_with_logging(cmd)

assert completed_process.returncode != 0
assert completed_process.stdout != ""
assert completed_process.stderr is None

0 comments on commit a2ee76e

Please sign in to comment.