-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create LocalProcessLauncher and ExceptionHandlingLauncher #164
Changes from all commits
63a3d2a
0978def
de97f99
6da1b3f
74c3332
ce1d2ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from osbenchmark.builder.launchers.launcher import Launcher | ||
from osbenchmark.exceptions import LaunchError | ||
|
||
|
||
class ExceptionHandlingLauncher(Launcher):
    """Decorator around another Launcher that converts any failure raised while
    starting or stopping nodes into a LaunchError carrying the host context.
    """

    def __init__(self, launcher, shell_executor=None):
        super().__init__(shell_executor)
        # The wrapped launcher that performs the actual start/stop work.
        self.launcher = launcher

    def start(self, host, node_configurations):
        """Start nodes on the host via the wrapped launcher.

        :raises LaunchError: wrapping any exception the delegate raises.
        """
        try:
            return self.launcher.start(host, node_configurations)
        except Exception as e:
            raise LaunchError("Starting node(s) on host \"{}\" failed".format(host), e)

    def stop(self, host, nodes):
        """Stop nodes on the host via the wrapped launcher.

        :raises LaunchError: wrapping any exception the delegate raises.
        """
        try:
            return self.launcher.stop(host, nodes)
        except Exception as e:
            raise LaunchError("Stopping node(s) on host \"{}\" failed".format(host), e)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
import logging | ||
import os | ||
import subprocess | ||
|
||
import psutil | ||
|
||
from osbenchmark import time, telemetry | ||
from osbenchmark.builder import java_resolver, cluster | ||
from osbenchmark.builder.launchers.launcher import Launcher | ||
from osbenchmark.exceptions import LaunchError | ||
from osbenchmark.utils import io, opts | ||
from osbenchmark.utils.periodic_waiter import PeriodicWaiter | ||
|
||
|
||
class LocalProcessLauncher(Launcher):
    """Starts and stops OpenSearch nodes as local processes.

    Starting a node resolves the JDK to use, prepares telemetry devices and
    the process environment, daemonizes OpenSearch via the shell executor,
    and waits for the node's PID file to appear. Stopping a node terminates
    its process (escalating from terminate to kill on timeout) and detaches
    telemetry devices.
    """

    # Polling parameters for waiting on a freshly started node's PID file.
    PROCESS_WAIT_TIMEOUT_SECONDS = 90
    PROCESS_WAIT_INTERVAL_SECONDS = 0.5

    def __init__(self, pci, shell_executor, metrics_store, clock=time.Clock):
        """
        :param pci: Provision config instance; its variables drive JDK
            selection, telemetry devices/params and env var passthrough.
        :param shell_executor: Executes shell commands on the target host.
        :param metrics_store: Optional metrics store; when set, node metadata
            and system metrics are recorded during stop.
        :param clock: Clock used by the PID file waiter (injectable for tests).
        """
        super().__init__(shell_executor)
        self.logger = logging.getLogger(__name__)
        self.pci = pci
        self.metrics_store = metrics_store
        self.waiter = PeriodicWaiter(LocalProcessLauncher.PROCESS_WAIT_INTERVAL_SECONDS,
                                     LocalProcessLauncher.PROCESS_WAIT_TIMEOUT_SECONDS, clock=clock)

    def start(self, host, node_configurations):
        """Start one node per configuration on the host and return the started nodes."""
        node_count_on_host = len(node_configurations)
        return [self._start_node(host, node_configuration, node_count_on_host) for node_configuration in node_configurations]

    def _start_node(self, host, node_configuration, node_count_on_host):
        """Start a single node and return the corresponding ``cluster.Node``."""
        host_name = node_configuration.ip
        node_name = node_configuration.node_name
        binary_path = node_configuration.binary_path

        java_major_version, java_home = java_resolver.java_home(node_configuration.provision_config_instance_runtime_jdks,
                                                                self.pci.variables["system"]["runtime"]["jdk"],
                                                                node_configuration.provision_config_instance_provides_bundled_jdk)
        self.logger.info("Java major version: %s", java_major_version)
        self.logger.info("Java home: %s", java_home)
        self.logger.info("Starting node [%s].", node_name)

        # Named node_telemetry so the imported telemetry module is not shadowed.
        node_telemetry = self._prepare_telemetry(node_configuration, node_count_on_host, java_major_version)
        env = self._prepare_env(node_name, java_home, node_telemetry)
        node_telemetry.on_pre_node_start(node_name)

        node_pid = self._start_process(host, binary_path, env)
        self.logger.info("Successfully started node [%s] with PID [%s].", node_name, node_pid)
        node = cluster.Node(node_pid, binary_path, host_name, node_name, node_telemetry)

        self.logger.info("Attaching telemetry devices to node [%s].", node_name)
        node_telemetry.attach_to_node(node)

        return node

    def _prepare_telemetry(self, node_configuration, node_count_on_host, java_major_version):
        """Build the Telemetry instance for a node from the configured devices."""
        data_paths = node_configuration.data_paths
        node_telemetry_dir = os.path.join(node_configuration.node_root_path, "telemetry")

        enabled_devices = self.pci.variables["telemetry"]["devices"]
        telemetry_params = self.pci.variables["telemetry"]["params"]

        node_telemetry = [
            telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version),
            telemetry.JitCompiler(node_telemetry_dir),
            telemetry.Gc(telemetry_params, node_telemetry_dir, java_major_version),
            telemetry.Heapdump(node_telemetry_dir),
            telemetry.DiskIo(node_count_on_host),
            telemetry.IndexSize(data_paths),
            telemetry.StartupTime(),
        ]

        return telemetry.Telemetry(enabled_devices, devices=node_telemetry)

    def _prepare_env(self, node_name, java_home, telemetry):
        """Build the process environment for a node.

        Only env vars listed in the "passenv" variable are inherited from the
        current process. NOTE: the ``telemetry`` parameter shadows the module
        of the same name inside this method (kept for keyword-arg callers).
        """
        env = {k: v for k, v in os.environ.items() if k in opts.csv_to_list(self.pci.variables["system"]["env"]["passenv"])}
        if java_home:
            self._set_env(env, "PATH", os.path.join(java_home, "bin"), separator=os.pathsep, prepend=True)
            # This property is the higher priority starting in ES 7.12.0, and is the only supported java home in >=8.0
            env["OPENSEARCH_JAVA_HOME"] = java_home
            # TODO remove this when ES <8.0 becomes unsupported by Benchmark
            env["JAVA_HOME"] = java_home
            self.logger.info("JAVA HOME: %s", env["JAVA_HOME"])
        if not env.get("OPENSEARCH_JAVA_OPTS"):
            env["OPENSEARCH_JAVA_OPTS"] = "-XX:+ExitOnOutOfMemoryError"

        # we just blindly trust telemetry here...
        # (telemetry devices may inject arbitrary JVM options; validating them
        # is left to the device itself)
        for jvm_option in telemetry.instrument_candidate_java_opts():
            self._set_env(env, "OPENSEARCH_JAVA_OPTS", jvm_option)

        self.logger.debug("env for [%s]: %s", node_name, str(env))
        return env

    def _set_env(self, env, key, value, separator=' ', prepend=False):
        """Set env[key] to value, or extend an existing value using separator.

        With ``prepend=True`` the new value goes first; otherwise it is appended.
        ``None`` values are ignored.
        """
        if value is not None:
            if key not in env:
                env[key] = value
            elif prepend:
                env[key] = value + separator + env[key]
            else:
                env[key] = env[key] + separator + value

    def _start_process(self, host, binary_path, env):
        """Daemonize OpenSearch and return its PID once the PID file is written.

        :raises LaunchError: when running as root (OpenSearch refuses to run as root).
        """
        if os.name == "posix" and os.geteuid() == 0:
            raise LaunchError("Cannot launch OpenSearch as root. Please run Benchmark as a non-root user.")

        # "-d" daemonizes; "-p pid" makes OpenSearch write its PID file.
        cmd = [io.escape_path(os.path.join(binary_path, "bin", "opensearch"))]
        cmd.extend(["-d", "-p", "pid"])

        self.shell_executor.execute(host, " ".join(cmd), env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, detach=True)

        pid_file_name = io.escape_path(os.path.join(binary_path, "pid"))
        self._wait_for_pid_file(pid_file_name)

        return self._get_pid_from_file(pid_file_name)

    def _wait_for_pid_file(self, pid_file_name):
        """Block until the PID file exists and contains a non-zero PID."""
        self.waiter.wait(self._is_pid_file_available, pid_file_name)

    def _is_pid_file_available(self, pid_file_name):
        """Return True when the PID file exists and holds a usable (non-zero) PID."""
        try:
            pid = self._get_pid_from_file(pid_file_name)
            return pid != 0
        except (FileNotFoundError, EOFError):
            self.logger.info("PID file %s is not ready", pid_file_name)
            return False

    def _get_pid_from_file(self, pid_file_name):
        """Read and return the integer PID stored in pid_file_name.

        :raises EOFError: when the file exists but is still empty (the node
            has not finished writing it yet).
        :raises FileNotFoundError: when the file does not exist.
        """
        with open(pid_file_name, "rb") as f:
            buf = f.read()  # raw byte buffer read from the PID file

        if not buf:
            raise EOFError
        return int(buf)

    def stop(self, host, nodes):
        """Stop the given nodes; returns the subset that was actually stopped."""
        self.logger.info("Shutting down [%d] nodes on this host.", len(nodes))
        stopped_nodes = []
        for node in nodes:
            node_stopped = self._stop_node(node)
            if node_stopped:
                stopped_nodes.append(node)

        return stopped_nodes

    def _stop_node(self, node):
        """Stop a single node's process; returns True if it was stopped by this call."""
        node_stopped = False

        if self.metrics_store:
            telemetry.add_metadata_for_node(self.metrics_store, node.node_name, node.host_name)

        opensearch_process = self._get_opensearch_process(node)
        if opensearch_process:
            node.telemetry.detach_from_node(node, running=True)
            node_stopped = self._stop_process(opensearch_process, node)
            node.telemetry.detach_from_node(node, running=False)
        # store system metrics in any case (telemetry devices may derive system metrics while the node is running)
        if self.metrics_store:
            node.telemetry.store_system_metrics(node, self.metrics_store)

        return node_stopped

    def _get_opensearch_process(self, node):
        """Return the psutil.Process for node.pid, or None if no such process exists."""
        try:
            return psutil.Process(pid=node.pid)
        except psutil.NoSuchProcess:
            self.logger.warning("No process found with PID [%s] for node [%s].", node.pid, node.node_name)

    def _stop_process(self, opensearch_process, node):
        """Terminate the process, escalating to SIGKILL after a 10s grace period.

        Returns True when terminate or kill succeeded; False when the process
        was already gone.
        """
        process_stopped = False

        try:
            opensearch_process.terminate()
            opensearch_process.wait(10.0)
            process_stopped = True
        except psutil.NoSuchProcess:
            self.logger.warning("No process found with PID [%s] for node [%s].", opensearch_process.pid, node.node_name)
        except psutil.TimeoutExpired:
            self.logger.info("kill -KILL node [%s]", node.node_name)
            try:
                # kill -9
                opensearch_process.kill()
                process_stopped = True
            except psutil.NoSuchProcess:
                self.logger.warning("No process found with PID [%s] for node [%s].", opensearch_process.pid, node.node_name)
        self.logger.info("Done shutting down node [%s].", node.node_name)

        return process_stopped
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
# pylint: disable=protected-access | ||
|
||
import os | ||
from unittest import TestCase, mock | ||
from unittest.mock import Mock, mock_open | ||
|
||
from psutil import NoSuchProcess | ||
|
||
from osbenchmark import telemetry | ||
from osbenchmark.builder import cluster | ||
from osbenchmark.builder.launchers.local_process_launcher import LocalProcessLauncher | ||
from osbenchmark.builder.provision_config import ProvisionConfigInstance | ||
from osbenchmark.builder.provisioner import NodeConfiguration | ||
|
||
|
||
class LocalProcessLauncherTests(TestCase):
    """Unit tests for LocalProcessLauncher with a mocked shell executor,
    metrics store and PID-file waiter."""

    def setUp(self):
        self.shell_executor = Mock()
        self.metrics_store = Mock()

        self.variables = {
            "system": {
                "runtime": {
                    "jdk": None
                },
                "env": {
                    "passenv": "PATH"
                }
            },
            "telemetry": {
                "devices": [],
                "params": None
            }
        }
        self.pci = ProvisionConfigInstance("fake_pci", "/path/to/root", ["/path/to/config"], variables=self.variables)

        self.launcher = LocalProcessLauncher(self.pci, self.shell_executor, self.metrics_store)
        # Replace the periodic waiter so tests never actually poll or sleep.
        self.launcher.waiter = Mock()
        self.host = None
        self.path = "fake"

    @mock.patch('osbenchmark.builder.java_resolver.java_home', return_value=(12, "/java_home/"))
    @mock.patch('osbenchmark.utils.jvm.supports_option', return_value=True)
    @mock.patch('osbenchmark.utils.io.get_size')
    @mock.patch('osbenchmark.telemetry')
    @mock.patch('psutil.Process')
    def test_daemon_start_stop(self, process, telemetry, get_size, supports, java_home):
        """Starting two nodes yields two Node objects with the PID read from the PID file; stopping stops both."""
        mo = mock_open(read_data="1234")

        node_configs = []
        for node in range(2):
            node_configs.append(NodeConfiguration(build_type="tar",
                                                  provision_config_instance_runtime_jdks="12,11",
                                                  provision_config_instance_provides_bundled_jdk=True,
                                                  ip="127.0.0.1",
                                                  node_name="testnode-{}".format(node),
                                                  node_root_path="/tmp",
                                                  binary_path="/tmp",
                                                  data_paths="/tmp"))

        with mock.patch("builtins.open", mo):
            nodes = self.launcher.start(self.host, node_configs)

            self.assertEqual(len(nodes), 2)
            self.assertEqual(nodes[0].pid, 1234)

            stopped_nodes = self.launcher.stop(self.host, nodes)
            # all nodes should be stopped
            self.assertEqual(nodes, stopped_nodes)

    @mock.patch('psutil.Process')
    def test_daemon_stop_with_already_terminated_process(self, process):
        """Stopping a node whose process no longer exists reports no stopped nodes."""
        process.side_effect = NoSuchProcess(123)

        nodes = [
            cluster.Node(pid=-1,
                         binary_path="/bin",
                         host_name="localhost",
                         node_name="benchmark-0",
                         telemetry=telemetry.Telemetry())
        ]

        stopped_nodes = self.launcher.stop(self.host, nodes)
        # no nodes should have been stopped (they were already stopped)
        self.assertEqual([], stopped_nodes)

    # flight recorder shows a warning for several seconds before continuing
    @mock.patch("osbenchmark.time.sleep")
    def test_env_options_order(self, sleep):
        """Telemetry-provided JVM options are appended after the default opts; PATH is prepended."""
        node_telemetry = [
            telemetry.FlightRecorder(telemetry_params={}, log_root="/tmp/telemetry", java_major_version=8)
        ]
        telem = telemetry.Telemetry(["jfr"], devices=node_telemetry)
        env = self.launcher._prepare_env(node_name="node0", java_home="/java_home", telemetry=telem)

        self.assertEqual("/java_home/bin" + os.pathsep + os.environ["PATH"], env["PATH"])
        self.assertEqual("-XX:+ExitOnOutOfMemoryError -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints "
                         "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder "
                         "-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath=/tmp/telemetry/profile.jfr "  # pylint: disable=line-too-long
                         "-XX:StartFlightRecording=defaultrecording=true", env["OPENSEARCH_JAVA_OPTS"])

    def test_bundled_jdk_not_in_path(self):
        """Without a resolved java_home, neither PATH nor JAVA_HOME is modified."""
        os.environ["JAVA_HOME"] = "/path/to/java"

        telem = telemetry.Telemetry()
        # no JAVA_HOME -> use the bundled JDK
        env = self.launcher._prepare_env(node_name="node0", java_home=None, telemetry=telem)

        # unmodified
        self.assertEqual(os.environ["PATH"], env["PATH"])
        self.assertIsNone(env.get("JAVA_HOME"))

    def test_pass_env_vars(self):
        """Only env vars named in "passenv" are inherited into the node environment."""
        self.pci.variables["system"]["env"]["passenv"] = "JAVA_HOME,FOO1"

        os.environ["JAVA_HOME"] = "/path/to/java"
        os.environ["FOO1"] = "BAR1"

        telem = telemetry.Telemetry()
        # no JAVA_HOME -> use the bundled JDK
        env = self.launcher._prepare_env(node_name="node0", java_home=None, telemetry=telem)

        # unmodified
        self.assertEqual(os.environ["JAVA_HOME"], env["JAVA_HOME"])
        self.assertEqual(os.environ["FOO1"], env["FOO1"])
        self.assertEqual(env["OPENSEARCH_JAVA_OPTS"], "-XX:+ExitOnOutOfMemoryError")

    def test_pass_java_opts(self):
        """A passed-through OPENSEARCH_JAVA_OPTS value is kept instead of the default."""
        self.pci.variables["system"]["env"]["passenv"] = "OPENSEARCH_JAVA_OPTS"

        os.environ["OPENSEARCH_JAVA_OPTS"] = "-XX:-someJunk"

        telem = telemetry.Telemetry()
        # no JAVA_HOME -> use the bundled JDK
        env = self.launcher._prepare_env(node_name="node0", java_home=None, telemetry=telem)

        # unmodified
        self.assertEqual(os.environ["OPENSEARCH_JAVA_OPTS"], env["OPENSEARCH_JAVA_OPTS"])

    def test_pid_file_not_created(self):
        """A missing PID file means the node is not ready."""
        mo = mock_open()

        with mock.patch("builtins.open", mo):
            mo.side_effect = FileNotFoundError

            is_pid_file_ready = self.launcher._is_pid_file_available("fake")
            self.assertEqual(is_pid_file_ready, False)

    def test_pid_file_empty(self):
        """An empty PID file means the node is not ready."""
        mo = mock_open(read_data="")

        with mock.patch("builtins.open", mo):
            is_pid_file_ready = self.launcher._is_pid_file_available("fake")
            self.assertEqual(is_pid_file_ready, False)

    def test_pid_file_ready(self):
        """A PID file with a non-zero PID means the node is ready."""
        mo = mock_open(read_data="1234")

        with mock.patch("builtins.open", mo):
            is_pid_file_ready = self.launcher._is_pid_file_available("fake")
            self.assertEqual(is_pid_file_ready, True)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why you chose
pci
overprovision_config_instance
? I think the latter is more descriptive.

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've gone back and forth on this in my mind a couple times. I settled on
pci
for brevity, since many of the accesses to this object will be something like `self.pci.variables["abcd"]["efgh"]`.
But ultimately clarity > brevity, so I will refactor. I have used
pci
in a few of the other PRs as well so I will create a followup PR after this one to replace all of the references in 1 go