From 63a3d2ab708ff70c8e013f02ab9a2156c994963a Mon Sep 17 00:00:00 2001
From: Chase Engelbrecht
Date: Fri, 18 Mar 2022 13:07:15 -0500
Subject: [PATCH 1/6] Initial LocalProcessLauncher migration

Signed-off-by: Chase Engelbrecht
---
 .../launchers/local_process_launcher.py | 176 ++++++++++++++++++
 1 file changed, 176 insertions(+)
 create mode 100644 osbenchmark/builder/launchers/local_process_launcher.py

diff --git a/osbenchmark/builder/launchers/local_process_launcher.py b/osbenchmark/builder/launchers/local_process_launcher.py
new file mode 100644
index 000000000..845b7512e
--- /dev/null
+++ b/osbenchmark/builder/launchers/local_process_launcher.py
@@ -0,0 +1,176 @@
+import logging
+import os
+
+import psutil
+
+from osbenchmark import time, telemetry
+from osbenchmark.builder import java_resolver, cluster
+from osbenchmark.builder.launchers.launcher import Launcher
+from osbenchmark.utils import io, opts
+from osbenchmark.utils.periodic_waiter import PeriodicWaiter
+
+
+class LocalProcessLauncher(Launcher):
+    PROCESS_WAIT_TIMEOUT_SECONDS = 90
+    PROCESS_WAIT_INTERVAL_SECONDS = 0.5
+
+    def __init__(self, pci, shell_executor, metrics_store, clock=time.Clock):
+        super().__init__(shell_executor)
+        self.logger = logging.getLogger(__name__)
+        self.pci = pci
+        self.metrics_store = metrics_store
+        self.waiter = PeriodicWaiter(LocalProcessLauncher.PROCESS_WAIT_INTERVAL_SECONDS,
+                                     LocalProcessLauncher.PROCESS_WAIT_TIMEOUT_SECONDS, clock=clock)
+
+    def start(self, host, node_configurations):
+        node_count_on_host = len(node_configurations)
+        return [self._start_node(node_configuration, node_count_on_host) for node_configuration in node_configurations]
+
+    def _start_node(self, node_configuration, node_count_on_host):
+        host_name = node_configuration.ip
+        node_name = node_configuration.node_name
+        binary_path = node_configuration.binary_path
+        data_paths = node_configuration.data_paths
+        node_telemetry_dir = os.path.join(node_configuration.node_root_path, "telemetry")
+
+        java_major_version, java_home = java_resolver.java_home(node_configuration.provision_config_instance_runtime_jdks,
+                                                                self.pci.variables["builder"]["runtime"]["jdk"],
+                                                                node_configuration.provision_config_instance_provides_bundled_jdk)
+        self.logger.info("Java major version: %s", java_major_version)
+        self.logger.info("Java home: %s", java_home)
+
+        self.logger.info("Starting node [%s].", node_name)
+
+        enabled_devices = self.pci.variables["telemetry"]["devices"]
+        telemetry_params = self.pci.variables["telemetry"]["params"]
+        node_telemetry = [
+            telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version),
+            telemetry.JitCompiler(node_telemetry_dir),
+            telemetry.Gc(telemetry_params, node_telemetry_dir, java_major_version),
+            telemetry.Heapdump(node_telemetry_dir),
+            telemetry.DiskIo(node_count_on_host),
+            telemetry.IndexSize(data_paths),
+            telemetry.StartupTime(),
+        ]
+
+        t = telemetry.Telemetry(enabled_devices, devices=node_telemetry)
+        env = self._prepare_env(node_name, java_home, t)
+        t.on_pre_node_start(node_name)
+        node_pid = self._start_process(binary_path, env)
+        self.logger.info("Successfully started node [%s] with PID [%s].", node_name, node_pid)
+        node = cluster.Node(node_pid, binary_path, host_name, node_name, t)
+
+        self.logger.info("Attaching telemetry devices to node [%s].", node_name)
+        t.attach_to_node(node)
+
+        return node
+
+    def _prepare_env(self, node_name, java_home, t):
+        env = {k: v for k, v in os.environ.items() if k in opts.csv_to_list(self.pci.variables["system"]["env"]["passenv"])}
+        if java_home:
+            self._set_env(env, "PATH", os.path.join(java_home, "bin"), separator=os.pathsep, prepend=True)
+            # This property is the higher priority starting in ES 7.12.0, and is the only supported java home in >=8.0
+            env["OPENSEARCH_JAVA_HOME"] = java_home
+            # TODO remove this when ES <8.0 becomes unsupported by Benchmark
+            env["JAVA_HOME"] = java_home
+            self.logger.info("JAVA HOME: %s", env["JAVA_HOME"])
+        if not env.get("OPENSEARCH_JAVA_OPTS"):
+            env["OPENSEARCH_JAVA_OPTS"] = "-XX:+ExitOnOutOfMemoryError"
+
+        # we just blindly trust telemetry here...
+        for v in t.instrument_candidate_java_opts():
+            self._set_env(env, "OPENSEARCH_JAVA_OPTS", v)
+
+        self.logger.debug("env for [%s]: %s", node_name, str(env))
+        return env
+
+    def _set_env(self, env, k, v, separator=' ', prepend=False):
+        if v is not None:
+            if k not in env:
+                env[k] = v
+            elif prepend:
+                env[k] = v + separator + env[k]
+            else:
+                env[k] = env[k] + separator + v
+
+    @staticmethod
+    def _run_subprocess(command_line, env):
+        command_line_args = shlex.split(command_line)
+
+        with subprocess.Popen(command_line_args,
+                              stdout=subprocess.DEVNULL,
+                              stderr=subprocess.DEVNULL,
+                              env=env,
+                              start_new_session=True) as command_line_process:
+            # wait for it to finish
+            command_line_process.wait()
+
+        return command_line_process.returncode
+
+    def _start_process(self, binary_path, env):
+        if os.name == "posix" and os.geteuid() == 0:
+            raise exceptions.LaunchError("Cannot launch OpenSearch as root. Please run Benchmark as a non-root user.")
+        os.chdir(binary_path)
+        cmd = [io.escape_path(os.path.join(".", "bin", "opensearch"))]
+        cmd.extend(["-d", "-p", "pid"])
+        ret = ProcessLauncher._run_subprocess(command_line=" ".join(cmd), env=env)
+        if ret != 0:
+            msg = "Daemon startup failed with exit code [{}]".format(ret)
+            logging.error(msg)
+            raise exceptions.LaunchError(msg)
+
+        return self._wait_for_pidfile(io.escape_path(os.path.join(".", "pid")))
+
+    def _wait_for_pidfile(self, pidfilename, timeout=60, clock=time.Clock):
+        stop_watch = clock.stop_watch()
+        stop_watch.start()
+        while stop_watch.split_time() < timeout:
+            try:
+                with open(pidfilename, "rb") as f:
+                    buf = f.read()
+                    if not buf:
+                        raise EOFError
+                    return int(buf)
+            except (FileNotFoundError, EOFError):
+                time.sleep(0.5)
+
+        msg = "pid file not available after {} seconds!".format(timeout)
+        logging.error(msg)
+        raise exceptions.LaunchError(msg)
+
+    def stop(self, host, nodes):
+        self.logger.info("Shutting down [%d] nodes on this host.", len(nodes))
+        stopped_nodes = []
+        for node in nodes:
+            node_name = node.node_name
+            if self.metrics_store:
+                telemetry.add_metadata_for_node(self.metrics_store, node_name, node.host_name)
+            try:
+                opensearch = psutil.Process(pid=node.pid)
+                node.telemetry.detach_from_node(node, running=True)
+            except psutil.NoSuchProcess:
+                self.logger.warning("No process found with PID [%s] for node [%s].", node.pid, node_name)
+                opensearch = None
+
+            if opensearch:
+                try:
+                    opensearch.terminate()
+                    opensearch.wait(10.0)
+                    stopped_nodes.append(node)
+                except psutil.NoSuchProcess:
+                    self.logger.warning("No process found with PID [%s] for node [%s].", opensearch.pid, node_name)
+                except psutil.TimeoutExpired:
+                    self.logger.info("kill -KILL node [%s]", node_name)
+                    try:
+                        # kill -9
+                        opensearch.kill()
+                        stopped_nodes.append(node)
+                    except psutil.NoSuchProcess:
+                        self.logger.warning("No process found with PID [%s] for node [%s].", opensearch.pid, node_name)
+                self.logger.info("Done shutting down node [%s].", node_name)
+
+            node.telemetry.detach_from_node(node, running=False)
+            # store system metrics in any case (telemetry devices may derive system metrics while the node is running)
+            if self.metrics_store:
+                node.telemetry.store_system_metrics(node, self.metrics_store)
+        return stopped_nodes
\ No newline at end of file

From 0978def6f2463a4fbfca8776b93faa8c52b45f13 Mon Sep 17 00:00:00 2001
From: Chase Engelbrecht
Date: Fri, 18 Mar 2022 13:38:22 -0500
Subject: [PATCH 2/6] Refactor LocalProcessLauncher methods to be more modular

Signed-off-by: Chase Engelbrecht
---
 .../launchers/local_process_launcher.py | 200 ++++++++++--------
 1 file changed, 106 insertions(+), 94 deletions(-)

diff --git a/osbenchmark/builder/launchers/local_process_launcher.py b/osbenchmark/builder/launchers/local_process_launcher.py
index 845b7512e..d5c0df2aa 100644
--- a/osbenchmark/builder/launchers/local_process_launcher.py
+++ b/osbenchmark/builder/launchers/local_process_launcher.py
@@ -1,11 +1,13 @@
 import logging
 import os
+import subprocess
 
 import psutil
 
 from osbenchmark import time, telemetry
 from osbenchmark.builder import java_resolver, cluster
 from osbenchmark.builder.launchers.launcher import Launcher
+from osbenchmark.exceptions import LaunchError
 from osbenchmark.utils import io, opts
 from osbenchmark.utils.periodic_waiter import PeriodicWaiter
 
@@ -24,25 +26,40 @@ def __init__(self, pci, shell_executor, metrics_store, clock=time.Clock):
 
     def start(self, host, node_configurations):
         node_count_on_host = len(node_configurations)
-        return [self._start_node(node_configuration, node_count_on_host) for node_configuration in node_configurations]
+        return [self._start_node(host, node_configuration, node_count_on_host) for node_configuration in node_configurations]
 
-    def _start_node(self, node_configuration, node_count_on_host):
+    def _start_node(self, host, node_configuration, node_count_on_host):
         host_name = node_configuration.ip
         node_name = node_configuration.node_name
         binary_path = node_configuration.binary_path
-        data_paths = node_configuration.data_paths
-        node_telemetry_dir = os.path.join(node_configuration.node_root_path, "telemetry")
 
         java_major_version, java_home = java_resolver.java_home(node_configuration.provision_config_instance_runtime_jdks,
                                                                 self.pci.variables["builder"]["runtime"]["jdk"],
                                                                 node_configuration.provision_config_instance_provides_bundled_jdk)
         self.logger.info("Java major version: %s", java_major_version)
         self.logger.info("Java home: %s", java_home)
-
         self.logger.info("Starting node [%s].", node_name)
 
+        telemetry = self._prepare_telemetry(node_configuration, node_count_on_host, java_major_version)
+        env = self._prepare_env(node_name, java_home, telemetry)
+        telemetry.on_pre_node_start(node_name)
+
+        node_pid = self._start_process(host, binary_path, env)
+        self.logger.info("Successfully started node [%s] with PID [%s].", node_name, node_pid)
+        node = cluster.Node(node_pid, binary_path, host_name, node_name, telemetry)
+
+        self.logger.info("Attaching telemetry devices to node [%s].", node_name)
+        telemetry.attach_to_node(node)
+
+        return node
+
+    def _prepare_telemetry(self, node_configuration, node_count_on_host, java_major_version):
+        data_paths = node_configuration.data_paths
+        node_telemetry_dir = os.path.join(node_configuration.node_root_path, "telemetry")
+
         enabled_devices = self.pci.variables["telemetry"]["devices"]
         telemetry_params = self.pci.variables["telemetry"]["params"]
+
         node_telemetry = [
             telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version),
             telemetry.JitCompiler(node_telemetry_dir),
@@ -53,19 +70,9 @@ def _start_node(self, node_configuration, node_count_on_host):
         telemetry.StartupTime(),
         ]
 
-        t = telemetry.Telemetry(enabled_devices, devices=node_telemetry)
-        env = self._prepare_env(node_name, java_home, t)
-        t.on_pre_node_start(node_name)
-        node_pid = self._start_process(binary_path, env)
-        self.logger.info("Successfully started node [%s] with PID [%s].", node_name, node_pid)
-        node = cluster.Node(node_pid, binary_path, host_name, node_name, t)
+        return telemetry.Telemetry(enabled_devices, devices=node_telemetry)
 
-        self.logger.info("Attaching telemetry devices to node [%s].", node_name)
-        t.attach_to_node(node)
-
-        return node
-
-    def _prepare_env(self, node_name, java_home, t):
+    def _prepare_env(self, node_name, java_home, telemetry):
         env = {k: v for k, v in os.environ.items() if k in opts.csv_to_list(self.pci.variables["system"]["env"]["passenv"])}
         if java_home:
             self._set_env(env, "PATH", os.path.join(java_home, "bin"), separator=os.pathsep, prepend=True)
@@ -78,99 +85,104 @@ def _prepare_env(self, node_name, java_home, t):
             env["OPENSEARCH_JAVA_OPTS"] = "-XX:+ExitOnOutOfMemoryError"
 
         # we just blindly trust telemetry here...
-        for v in t.instrument_candidate_java_opts():
+        for v in telemetry.instrument_candidate_java_opts():
             self._set_env(env, "OPENSEARCH_JAVA_OPTS", v)
 
         self.logger.debug("env for [%s]: %s", node_name, str(env))
         return env
 
-    def _set_env(self, env, k, v, separator=' ', prepend=False):
-        if v is not None:
-            if k not in env:
-                env[k] = v
+    def _set_env(self, env, key, value, separator=' ', prepend=False):
+        if value is not None:
+            if key not in env:
+                env[key] = value
             elif prepend:
-                env[k] = v + separator + env[k]
+                env[key] = value + separator + env[key]
             else:
-                env[k] = env[k] + separator + v
-
-    @staticmethod
-    def _run_subprocess(command_line, env):
-        command_line_args = shlex.split(command_line)
+                env[key] = env[key] + separator + value
 
-        with subprocess.Popen(command_line_args,
-                              stdout=subprocess.DEVNULL,
-                              stderr=subprocess.DEVNULL,
-                              env=env,
-                              start_new_session=True) as command_line_process:
-            # wait for it to finish
-            command_line_process.wait()
-
-        return command_line_process.returncode
-
-    def _start_process(self, binary_path, env):
+    def _start_process(self, host, binary_path, env):
         if os.name == "posix" and os.geteuid() == 0:
-            raise exceptions.LaunchError("Cannot launch OpenSearch as root. Please run Benchmark as a non-root user.")
+            raise LaunchError("Cannot launch OpenSearch as root. Please run Benchmark as a non-root user.")
+
         os.chdir(binary_path)
         cmd = [io.escape_path(os.path.join(".", "bin", "opensearch"))]
         cmd.extend(["-d", "-p", "pid"])
-        ret = ProcessLauncher._run_subprocess(command_line=" ".join(cmd), env=env)
-        if ret != 0:
-            msg = "Daemon startup failed with exit code [{}]".format(ret)
-            logging.error(msg)
-            raise exceptions.LaunchError(msg)
-
-        return self._wait_for_pidfile(io.escape_path(os.path.join(".", "pid")))
-
-    def _wait_for_pidfile(self, pidfilename, timeout=60, clock=time.Clock):
-        stop_watch = clock.stop_watch()
-        stop_watch.start()
-        while stop_watch.split_time() < timeout:
-            try:
-                with open(pidfilename, "rb") as f:
-                    buf = f.read()
-                    if not buf:
-                        raise EOFError
-                    return int(buf)
-            except (FileNotFoundError, EOFError):
-                time.sleep(0.5)
-
-        msg = "pid file not available after {} seconds!".format(timeout)
-        logging.error(msg)
-        raise exceptions.LaunchError(msg)
+
+        self.shell_executor.execute(host, " ".join(cmd), env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, detach=True)
+
+        pid_file_name = io.escape_path(os.path.join(binary_path, "pid"))
+        self._wait_for_pid_file(pid_file_name)
+
+        return self._get_pid_from_file(pid_file_name)
+
+    def _wait_for_pid_file(self, pid_file_name):
+        self.waiter.wait(self._is_pid_file_available, pid_file_name)
+
+    def _is_pid_file_available(self, pid_file_name):
+        try:
+            pid = self._get_pid_from_file(pid_file_name)
+            return pid
+        except (FileNotFoundError, EOFError):
+            self.logger.info("PID file %s is not ready", pid_file_name)
+            return False
+
+    def _get_pid_from_file(self, pid_file_name):
+        with open(pid_file_name, "rb") as f:
+            buf = f.read()
+            if not buf:
+                raise EOFError
+            return int(buf)
 
     def stop(self, host, nodes):
         self.logger.info("Shutting down [%d] nodes on this host.", len(nodes))
         stopped_nodes = []
         for node in nodes:
-            node_name = node.node_name
-            if self.metrics_store:
-                telemetry.add_metadata_for_node(self.metrics_store, node_name, node.host_name)
+            node_stopped = self._stop_node(node)
+            if node_stopped:
+                stopped_nodes.append(node)
+
+        return stopped_nodes
+
+    def _stop_node(self, node):
+        node_stopped = False
+
+        if self.metrics_store:
+            telemetry.add_metadata_for_node(self.metrics_store, node.node_name, node.host_name)
+
+        opensearch_process = self._get_opensearch_process(node)
+        if opensearch_process:
+            node.telemetry.detach_from_node(node, running=True)
+            node_stopped = self._stop_process(opensearch_process)
+            node.telemetry.detach_from_node(node, running=False)
+            # store system metrics in any case (telemetry devices may derive system metrics while the node is running)
+            if self.metrics_store:
+                node.telemetry.store_system_metrics(node, self.metrics_store)
+
+        return node_stopped
+
+    def _get_opensearch_process(self, node):
+        try:
+            return psutil.Process(pid=node.pid)
+        except psutil.NoSuchProcess:
+            self.logger.warning("No process found with PID [%s] for node [%s].", node.pid, node.node_name)
+
+    def _stop_process(self, opensearch_process, node):
+        process_stopped = False
+
+        try:
+            opensearch_process.terminate()
+            opensearch_process.wait(10.0)
+            process_stopped = True
+        except psutil.NoSuchProcess:
+            self.logger.warning("No process found with PID [%s] for node [%s].", opensearch_process.pid, node.node_name)
+        except psutil.TimeoutExpired:
+            self.logger.info("kill -KILL node [%s]", node.node_name)
             try:
-                opensearch = psutil.Process(pid=node.pid)
-                node.telemetry.detach_from_node(node, running=True)
+                # kill -9
+                opensearch_process.kill()
+                process_stopped = True
             except psutil.NoSuchProcess:
-                self.logger.warning("No process found with PID [%s] for node [%s].", node.pid, node_name)
-                opensearch = None
-
-            if opensearch:
-                try:
-                    opensearch.terminate()
-                    opensearch.wait(10.0)
-                    stopped_nodes.append(node)
-                except psutil.NoSuchProcess:
-                    self.logger.warning("No process found with PID [%s] for node [%s].", opensearch.pid, node_name)
-                except psutil.TimeoutExpired:
-                    self.logger.info("kill -KILL node [%s]", node_name)
-                    try:
-                        # kill -9
-                        opensearch.kill()
-                        stopped_nodes.append(node)
-                    except psutil.NoSuchProcess:
-                        self.logger.warning("No process found with PID [%s] for node [%s].", opensearch.pid, node_name)
-                self.logger.info("Done shutting down node [%s].", node_name)
-
-            node.telemetry.detach_from_node(node, running=False)
-            # store system metrics in any case (telemetry devices may derive system metrics while the node is running)
-            if self.metrics_store:
-                node.telemetry.store_system_metrics(node, self.metrics_store)
-        return stopped_nodes
\ No newline at end of file
+                self.logger.warning("No process found with PID [%s] for node [%s].", opensearch_process.pid, node.node_name)
+        self.logger.info("Done shutting down node [%s].", node.node_name)
+
+        return process_stopped

From de97f990e2709b4f675ce31b846772135d78fa9f Mon Sep 17 00:00:00 2001
From: Chase Engelbrecht
Date: Fri, 18 Mar 2022 15:05:34 -0500
Subject: [PATCH 3/6] Migrate ProcessLauncher unit tests to LocalProcessLauncher

Signed-off-by: Chase Engelbrecht
---
 .../launchers/local_process_launcher.py      |   9 +-
 .../launchers/local_process_launcher_test.py | 159 ++++++++++++++++++
 2 files changed, 163 insertions(+), 5 deletions(-)
 create mode 100644 tests/builder/launchers/local_process_launcher_test.py

diff --git a/osbenchmark/builder/launchers/local_process_launcher.py b/osbenchmark/builder/launchers/local_process_launcher.py
index d5c0df2aa..010edfbed 100644
--- a/osbenchmark/builder/launchers/local_process_launcher.py
+++ b/osbenchmark/builder/launchers/local_process_launcher.py
@@ -34,7 +34,7 @@ def _start_node(self, host, node_configuration, node_count_on_host):
         binary_path = node_configuration.binary_path
 
         java_major_version, java_home = java_resolver.java_home(node_configuration.provision_config_instance_runtime_jdks,
-                                                                self.pci.variables["builder"]["runtime"]["jdk"],
+                                                                self.pci.variables["system"]["runtime"]["jdk"],
                                                                 node_configuration.provision_config_instance_provides_bundled_jdk)
         self.logger.info("Java major version: %s", java_major_version)
         self.logger.info("Java home: %s", java_home)
@@ -104,8 +104,7 @@ def _start_process(self, host, binary_path, env):
         if os.name == "posix" and os.geteuid() == 0:
             raise LaunchError("Cannot launch OpenSearch as root. Please run Benchmark as a non-root user.")
 
-        os.chdir(binary_path)
-        cmd = [io.escape_path(os.path.join(".", "bin", "opensearch"))]
+        cmd = [io.escape_path(os.path.join(binary_path, "bin", "opensearch"))]
         cmd.extend(["-d", "-p", "pid"])
 
         self.shell_executor.execute(host, " ".join(cmd), env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, detach=True)
@@ -121,7 +120,7 @@ def _wait_for_pid_file(self, pid_file_name):
     def _is_pid_file_available(self, pid_file_name):
         try:
             pid = self._get_pid_from_file(pid_file_name)
-            return pid
+            return pid != 0
         except (FileNotFoundError, EOFError):
             self.logger.info("PID file %s is not ready", pid_file_name)
             return False
@@ -152,7 +151,7 @@ def _stop_node(self, node):
         opensearch_process = self._get_opensearch_process(node)
         if opensearch_process:
             node.telemetry.detach_from_node(node, running=True)
-            node_stopped = self._stop_process(opensearch_process)
+            node_stopped = self._stop_process(opensearch_process, node)
             node.telemetry.detach_from_node(node, running=False)
             # store system metrics in any case (telemetry devices may derive system metrics while the node is running)
             if self.metrics_store:
diff --git a/tests/builder/launchers/local_process_launcher_test.py b/tests/builder/launchers/local_process_launcher_test.py
new file mode 100644
index 000000000..e0f728fa4
--- /dev/null
+++ b/tests/builder/launchers/local_process_launcher_test.py
@@ -0,0 +1,159 @@
+import os
+from unittest import TestCase, mock
+from unittest.mock import Mock, mock_open
+
+from psutil import NoSuchProcess
+
+from osbenchmark import telemetry
+from osbenchmark.builder import cluster
+from osbenchmark.builder.launchers.local_process_launcher import LocalProcessLauncher
+from osbenchmark.builder.provision_config import ProvisionConfigInstance
+from osbenchmark.builder.provisioner import NodeConfiguration
+
+
+class LocalProcessLauncherTests(TestCase):
+    def setUp(self):
+        self.shell_executor = Mock()
+        self.metrics_store = Mock()
+
+        self.variables = {
+            "system": {
+                "runtime": {
+                    "jdk": None
+                },
+                "env": {
+                    "passenv": "PATH"
+                }
+            },
+            "telemetry": {
+                "devices": [],
+                "params": None
+            }
+        }
+        self.pci = ProvisionConfigInstance("fake_pci", "/path/to/root", ["/path/to/config"], variables=self.variables)
+
+        self.launcher = LocalProcessLauncher(self.pci, self.shell_executor, self.metrics_store)
+        self.launcher.waiter = Mock()
+        self.host = None
+        self.path = "fake"
+
+    @mock.patch('osbenchmark.builder.java_resolver.java_home', return_value=(12, "/java_home/"))
+    @mock.patch('osbenchmark.utils.jvm.supports_option', return_value=True)
+    @mock.patch('osbenchmark.utils.io.get_size')
+    @mock.patch('osbenchmark.telemetry')
+    @mock.patch('psutil.Process')
+    def test_daemon_start_stop(self, process, telemetry, get_size, supports, java_home):
+        mo = mock_open(read_data="1234")
+
+        node_configs = []
+        for node in range(2):
+            node_configs.append(NodeConfiguration(build_type="tar",
+                                                  provision_config_instance_runtime_jdks="12,11",
+                                                  provision_config_instance_provides_bundled_jdk=True,
+                                                  ip="127.0.0.1",
+                                                  node_name="testnode-{}".format(node),
+                                                  node_root_path="/tmp",
+                                                  binary_path="/tmp",
+                                                  data_paths="/tmp"))
+
+        with mock.patch("builtins.open", mo):
+            nodes = self.launcher.start(self.host, node_configs)
+
+            self.assertEqual(len(nodes), 2)
+            self.assertEqual(nodes[0].pid, 1234)
+
+            stopped_nodes = self.launcher.stop(self.host, nodes)
+            # all nodes should be stopped
+            self.assertEqual(nodes, stopped_nodes)
+
+    @mock.patch('psutil.Process')
+    def test_daemon_stop_with_already_terminated_process(self, process):
+        process.side_effect = NoSuchProcess(123)
+
+        nodes = [
+            cluster.Node(pid=-1,
+                         binary_path="/bin",
+                         host_name="localhost",
+                         node_name="benchmark-0",
+                         telemetry=telemetry.Telemetry())
+        ]
+
+        stopped_nodes = self.launcher.stop(self.host, nodes)
+        # no nodes should have been stopped (they were already stopped)
+        self.assertEqual([], stopped_nodes)
+
+    # flight recorder shows a warning for several seconds before continuing
+    @mock.patch("osbenchmark.time.sleep")
+    def test_env_options_order(self, sleep):
+        node_telemetry = [
+            telemetry.FlightRecorder(telemetry_params={}, log_root="/tmp/telemetry", java_major_version=8)
+        ]
+        telem = telemetry.Telemetry(["jfr"], devices=node_telemetry)
+        env = self.launcher._prepare_env(node_name="node0", java_home="/java_home", telemetry=telem)
+
+        self.assertEqual("/java_home/bin" + os.pathsep + os.environ["PATH"], env["PATH"])
+        self.assertEqual("-XX:+ExitOnOutOfMemoryError -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints "
+                         "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder "
+                         "-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath=/tmp/telemetry/profile.jfr "  # pylint: disable=line-too-long
+                         "-XX:StartFlightRecording=defaultrecording=true", env["OPENSEARCH_JAVA_OPTS"])
+
+    def test_bundled_jdk_not_in_path(self):
+        os.environ["JAVA_HOME"] = "/path/to/java"
+
+        telem = telemetry.Telemetry()
+        # no JAVA_HOME -> use the bundled JDK
+        env = self.launcher._prepare_env(node_name="node0", java_home=None, telemetry=telem)
+
+        # unmodified
+        self.assertEqual(os.environ["PATH"], env["PATH"])
+        self.assertIsNone(env.get("JAVA_HOME"))
+
+    def test_pass_env_vars(self):
+        self.pci.variables["system"]["env"]["passenv"] = "JAVA_HOME,FOO1"
+
+        os.environ["JAVA_HOME"] = "/path/to/java"
+        os.environ["FOO1"] = "BAR1"
+
+        telem = telemetry.Telemetry()
+        # no JAVA_HOME -> use the bundled JDK
+        env = self.launcher._prepare_env(node_name="node0", java_home=None, telemetry=telem)
+
+        # unmodified
+        self.assertEqual(os.environ["JAVA_HOME"], env["JAVA_HOME"])
+        self.assertEqual(os.environ["FOO1"], env["FOO1"])
+        self.assertEqual(env["OPENSEARCH_JAVA_OPTS"], "-XX:+ExitOnOutOfMemoryError")
+
+    def test_pass_java_opts(self):
+        self.pci.variables["system"]["env"]["passenv"] = "OPENSEARCH_JAVA_OPTS"
+
+        os.environ["OPENSEARCH_JAVA_OPTS"] = "-XX:-someJunk"
+
+        telem = telemetry.Telemetry()
+        # no JAVA_HOME -> use the bundled JDK
+        env = self.launcher._prepare_env(node_name="node0", java_home=None, telemetry=telem)
+
+        # unmodified
+        self.assertEqual(os.environ["OPENSEARCH_JAVA_OPTS"], env["OPENSEARCH_JAVA_OPTS"])
+
+    def test_pid_file_not_created(self):
+        mo = mock_open()
+
+        with mock.patch("builtins.open", mo):
+            mo.side_effect = FileNotFoundError
+
+            is_pid_file_ready = self.launcher._is_pid_file_available("fake")
+            self.assertEqual(is_pid_file_ready, False)
+
+    def test_pid_file_empty(self):
+        mo = mock_open(read_data="")
+
+        with mock.patch("builtins.open", mo):
+            is_pid_file_ready = self.launcher._is_pid_file_available("fake")
+            self.assertEqual(is_pid_file_ready, False)
+
+    def test_pid_file_ready(self):
+        mo = mock_open(read_data="1234")
+
+        with mock.patch("builtins.open", mo):
+            is_pid_file_ready = self.launcher._is_pid_file_available("fake")
+            self.assertEqual(is_pid_file_ready, True)

From 6da1b3fbfb0f7576242d48f4d68ea607075dac4c Mon Sep 17 00:00:00 2001
From: Chase Engelbrecht
Date: Fri, 18 Mar 2022 15:17:47 -0500
Subject: [PATCH 4/6] Add ExceptionHandlingLauncher

Signed-off-by: Chase Engelbrecht
---
 .../launchers/exception_handling_launcher.py | 20 +++++++++++++++++++
 .../launchers/local_process_launcher_test.py |  2 ++
 2 files changed, 22 insertions(+)
 create mode 100644 osbenchmark/builder/launchers/exception_handling_launcher.py

diff --git a/osbenchmark/builder/launchers/exception_handling_launcher.py b/osbenchmark/builder/launchers/exception_handling_launcher.py
new file mode 100644
index 000000000..378248cfe
--- /dev/null
+++ b/osbenchmark/builder/launchers/exception_handling_launcher.py
@@ -0,0 +1,20 @@
+from osbenchmark.builder.launchers.launcher import Launcher
+from osbenchmark.exceptions import LaunchError
+
+
+class ExceptionHandlingLauncher(Launcher):
+    def __init__(self, launcher, shell_executor=None):
+        super().__init__(shell_executor)
+        self.launcher = launcher
+
+    def start(self, host, node_configurations):
+        try:
+            self.launcher.start(host, node_configurations)
+        except Exception as e:
+            raise LaunchError("Launching node(s) on host \"{}\" failed".format(host), e)
+
+    def stop(self, host, nodes):
+        try:
+            self.launcher.stop(host, nodes)
+        except Exception as e:
+            raise LaunchError("Launching node(s) on host \"{}\" failed".format(host), e)
diff --git a/tests/builder/launchers/local_process_launcher_test.py b/tests/builder/launchers/local_process_launcher_test.py
index e0f728fa4..bcefb2f69 100644
--- a/tests/builder/launchers/local_process_launcher_test.py
+++ b/tests/builder/launchers/local_process_launcher_test.py
@@ -1,3 +1,5 @@
+# pylint: disable=protected-access
+
 import os
 from unittest import TestCase, mock
 from unittest.mock import Mock, mock_open

From 74c333225ef5bc3f417e2a6a7166773d5bb5e55f Mon Sep 17 00:00:00 2001
From: Chase Engelbrecht
Date: Mon, 21 Mar 2022 11:12:30 -0500
Subject: [PATCH 5/6] Rename variable v to jvm_option

Signed-off-by: Chase Engelbrecht
---
 osbenchmark/builder/launchers/local_process_launcher.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/osbenchmark/builder/launchers/local_process_launcher.py b/osbenchmark/builder/launchers/local_process_launcher.py
index 010edfbed..48ce783bc 100644
--- a/osbenchmark/builder/launchers/local_process_launcher.py
+++ b/osbenchmark/builder/launchers/local_process_launcher.py
@@ -85,8 +85,8 @@ def _prepare_env(self, node_name, java_home, telemetry):
             env["OPENSEARCH_JAVA_OPTS"] = "-XX:+ExitOnOutOfMemoryError"
 
         # we just blindly trust telemetry here...
-        for v in telemetry.instrument_candidate_java_opts():
-            self._set_env(env, "OPENSEARCH_JAVA_OPTS", v)
+        for jvm_option in telemetry.instrument_candidate_java_opts():
+            self._set_env(env, "OPENSEARCH_JAVA_OPTS", jvm_option)
 
         self.logger.debug("env for [%s]: %s", node_name, str(env))
         return env

From ce1d2ca8306b5e7858b37ec049a900dfa28e866a Mon Sep 17 00:00:00 2001
From: Chase Engelbrecht
Date: Tue, 22 Mar 2022 09:05:49 -0500
Subject: [PATCH 6/6] Return values from ExceptionHandlingLauncher methods and
 update the log statements to be more specific

Signed-off-by: Chase Engelbrecht
---
 .../builder/launchers/exception_handling_launcher.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/osbenchmark/builder/launchers/exception_handling_launcher.py b/osbenchmark/builder/launchers/exception_handling_launcher.py
index 378248cfe..b4d9ca318 100644
--- a/osbenchmark/builder/launchers/exception_handling_launcher.py
+++ b/osbenchmark/builder/launchers/exception_handling_launcher.py
@@ -9,12 +9,12 @@ def __init__(self, launcher, shell_executor=None):
 
     def start(self, host, node_configurations):
         try:
-            self.launcher.start(host, node_configurations)
+            return self.launcher.start(host, node_configurations)
         except Exception as e:
-            raise LaunchError("Launching node(s) on host \"{}\" failed".format(host), e)
+            raise LaunchError("Starting node(s) on host \"{}\" failed".format(host), e)
 
     def stop(self, host, nodes):
         try:
-            self.launcher.stop(host, nodes)
+            return self.launcher.stop(host, nodes)
         except Exception as e:
-            raise LaunchError("Launching node(s) on host \"{}\" failed".format(host), e)
+            raise LaunchError("Stopping node(s) on host \"{}\" failed".format(host), e)
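
Reviewer sketch (not part of the series): a minimal example of how the two launchers introduced above are meant to compose, mirroring the fixtures in local_process_launcher_test.py. The Mock() stand-ins, the placeholder ProvisionConfigInstance values, and the commented-out host/node_configurations arguments are illustrative assumptions, not values defined by these patches.

    from unittest.mock import Mock

    from osbenchmark.builder.launchers.exception_handling_launcher import ExceptionHandlingLauncher
    from osbenchmark.builder.launchers.local_process_launcher import LocalProcessLauncher
    from osbenchmark.builder.provision_config import ProvisionConfigInstance

    # Variables layout copied from the test setUp(); the concrete values are placeholders.
    variables = {
        "system": {"runtime": {"jdk": None}, "env": {"passenv": "PATH"}},
        "telemetry": {"devices": [], "params": None},
    }
    pci = ProvisionConfigInstance("fake_pci", "/path/to/root", ["/path/to/config"], variables=variables)

    # LocalProcessLauncher does the actual start/stop work; ExceptionHandlingLauncher wraps it so
    # that any failure surfaces as a LaunchError naming the host (PATCH 4/6 and PATCH 6/6).
    local_launcher = LocalProcessLauncher(pci, shell_executor=Mock(), metrics_store=Mock())
    launcher = ExceptionHandlingLauncher(local_launcher)

    # With real host and node_configurations objects from the builder:
    # nodes = launcher.start(host, node_configurations)
    # stopped_nodes = launcher.stop(host, nodes)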