Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create LocalProcessLauncher and ExceptionHandlingLauncher #164

Merged
merged 6 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions osbenchmark/builder/launchers/exception_handling_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from osbenchmark.builder.launchers.launcher import Launcher
from osbenchmark.exceptions import LaunchError


class ExceptionHandlingLauncher(Launcher):
def __init__(self, launcher, shell_executor=None):
super().__init__(shell_executor)
self.launcher = launcher

def start(self, host, node_configurations):
try:
self.launcher.start(host, node_configurations)
except Exception as e:
raise LaunchError("Launching node(s) on host \"{}\" failed".format(host), e)

def stop(self, host, nodes):
try:
self.launcher.stop(host, nodes)
except Exception as e:
raise LaunchError("Launching node(s) on host \"{}\" failed".format(host), e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to change the log statement here to suggest its a failure during stop process

187 changes: 187 additions & 0 deletions osbenchmark/builder/launchers/local_process_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import logging
import os
import subprocess

import psutil

from osbenchmark import time, telemetry
from osbenchmark.builder import java_resolver, cluster
from osbenchmark.builder.launchers.launcher import Launcher
from osbenchmark.exceptions import LaunchError
from osbenchmark.utils import io, opts
from osbenchmark.utils.periodic_waiter import PeriodicWaiter


class LocalProcessLauncher(Launcher):
PROCESS_WAIT_TIMEOUT_SECONDS = 90
PROCESS_WAIT_INTERVAL_SECONDS = 0.5

def __init__(self, pci, shell_executor, metrics_store, clock=time.Clock):
super().__init__(shell_executor)
self.logger = logging.getLogger(__name__)
self.pci = pci
Copy link
Collaborator

@IanHoang IanHoang Mar 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you chose pci over provision_config_instance? I think the latter is more descriptive

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've gone back and forth on this in my mind a couple times. I settled on pci for brevity since many of the accesses to this object will be something like self.pci.variables["abcd"]["efgh"].

But ultimately clarity > brevity so I will refactor. I have used pci in a few of the other PRs as well so I will create a followup PR after this one to replace all of the references in 1 go

self.metrics_store = metrics_store
self.waiter = PeriodicWaiter(LocalProcessLauncher.PROCESS_WAIT_INTERVAL_SECONDS,
LocalProcessLauncher.PROCESS_WAIT_TIMEOUT_SECONDS, clock=clock)

def start(self, host, node_configurations):
node_count_on_host = len(node_configurations)
return [self._start_node(host, node_configuration, node_count_on_host) for node_configuration in node_configurations]

def _start_node(self, host, node_configuration, node_count_on_host):
host_name = node_configuration.ip
node_name = node_configuration.node_name
binary_path = node_configuration.binary_path

java_major_version, java_home = java_resolver.java_home(node_configuration.provision_config_instance_runtime_jdks,
self.pci.variables["system"]["runtime"]["jdk"],
node_configuration.provision_config_instance_provides_bundled_jdk)
self.logger.info("Java major version: %s", java_major_version)
self.logger.info("Java home: %s", java_home)
self.logger.info("Starting node [%s].", node_name)

telemetry = self._prepare_telemetry(node_configuration, node_count_on_host, java_major_version)
env = self._prepare_env(node_name, java_home, telemetry)
telemetry.on_pre_node_start(node_name)

node_pid = self._start_process(host, binary_path, env)
self.logger.info("Successfully started node [%s] with PID [%s].", node_name, node_pid)
node = cluster.Node(node_pid, binary_path, host_name, node_name, telemetry)

self.logger.info("Attaching telemetry devices to node [%s].", node_name)
telemetry.attach_to_node(node)

return node

def _prepare_telemetry(self, node_configuration, node_count_on_host, java_major_version):
data_paths = node_configuration.data_paths
node_telemetry_dir = os.path.join(node_configuration.node_root_path, "telemetry")

enabled_devices = self.pci.variables["telemetry"]["devices"]
telemetry_params = self.pci.variables["telemetry"]["params"]

node_telemetry = [
telemetry.FlightRecorder(telemetry_params, node_telemetry_dir, java_major_version),
telemetry.JitCompiler(node_telemetry_dir),
telemetry.Gc(telemetry_params, node_telemetry_dir, java_major_version),
telemetry.Heapdump(node_telemetry_dir),
telemetry.DiskIo(node_count_on_host),
telemetry.IndexSize(data_paths),
telemetry.StartupTime(),
]

return telemetry.Telemetry(enabled_devices, devices=node_telemetry)

def _prepare_env(self, node_name, java_home, telemetry):
env = {k: v for k, v in os.environ.items() if k in opts.csv_to_list(self.pci.variables["system"]["env"]["passenv"])}
if java_home:
self._set_env(env, "PATH", os.path.join(java_home, "bin"), separator=os.pathsep, prepend=True)
# This property is the higher priority starting in ES 7.12.0, and is the only supported java home in >=8.0
env["OPENSEARCH_JAVA_HOME"] = java_home
# TODO remove this when ES <8.0 becomes unsupported by Benchmark
env["JAVA_HOME"] = java_home
self.logger.info("JAVA HOME: %s", env["JAVA_HOME"])
if not env.get("OPENSEARCH_JAVA_OPTS"):
env["OPENSEARCH_JAVA_OPTS"] = "-XX:+ExitOnOutOfMemoryError"

# we just blindly trust telemetry here...
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a note or will this be a TODO to refactor this portion so that we don't blindly trust telemetry in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an inherited comment. I don't think it's a TODO so much as calling out that the telemetry devices can set environment variables that may break the launching of OpenSearch. Essentially loosely passing the burden of validating the env vars that a telemetry device specifies to the telemetry device itself.

A better solution would probably be an allowlist, but I don't have enough familiarity with the scope of telemetry environment variables to propose one.

for v in telemetry.instrument_candidate_java_opts():
engechas marked this conversation as resolved.
Show resolved Hide resolved
self._set_env(env, "OPENSEARCH_JAVA_OPTS", v)

self.logger.debug("env for [%s]: %s", node_name, str(env))
return env

def _set_env(self, env, key, value, separator=' ', prepend=False):
if value is not None:
if key not in env:
env[key] = value
elif prepend:
env[key] = value + separator + env[key]
else:
env[key] = env[key] + separator + value

def _start_process(self, host, binary_path, env):
if os.name == "posix" and os.geteuid() == 0:
raise LaunchError("Cannot launch OpenSearch as root. Please run Benchmark as a non-root user.")

cmd = [io.escape_path(os.path.join(binary_path, "bin", "opensearch"))]
cmd.extend(["-d", "-p", "pid"])

self.shell_executor.execute(host, " ".join(cmd), env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, detach=True)

pid_file_name = io.escape_path(os.path.join(binary_path, "pid"))
self._wait_for_pid_file(pid_file_name)

return self._get_pid_from_file(pid_file_name)

def _wait_for_pid_file(self, pid_file_name):
self.waiter.wait(self._is_pid_file_available, pid_file_name)

def _is_pid_file_available(self, pid_file_name):
try:
pid = self._get_pid_from_file(pid_file_name)
return pid != 0
except (FileNotFoundError, EOFError):
self.logger.info("PID file %s is not ready", pid_file_name)
return False

def _get_pid_from_file(self, pid_file_name):
with open(pid_file_name, "rb") as f:
buf = f.read()
Copy link
Collaborator

@IanHoang IanHoang Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: what does buf represent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the byte buffer read from the file

if not buf:
raise EOFError
return int(buf)

def stop(self, host, nodes):
self.logger.info("Shutting down [%d] nodes on this host.", len(nodes))
stopped_nodes = []
for node in nodes:
node_stopped = self._stop_node(node)
if node_stopped:
stopped_nodes.append(node)

return stopped_nodes

def _stop_node(self, node):
node_stopped = False

if self.metrics_store:
telemetry.add_metadata_for_node(self.metrics_store, node.node_name, node.host_name)

opensearch_process = self._get_opensearch_process(node)
if opensearch_process:
node.telemetry.detach_from_node(node, running=True)
node_stopped = self._stop_process(opensearch_process, node)
node.telemetry.detach_from_node(node, running=False)
# store system metrics in any case (telemetry devices may derive system metrics while the node is running)
if self.metrics_store:
node.telemetry.store_system_metrics(node, self.metrics_store)

return node_stopped

def _get_opensearch_process(self, node):
try:
return psutil.Process(pid=node.pid)
except psutil.NoSuchProcess:
self.logger.warning("No process found with PID [%s] for node [%s].", node.pid, node.node_name)

def _stop_process(self, opensearch_process, node):
process_stopped = False

try:
opensearch_process.terminate()
opensearch_process.wait(10.0)
process_stopped = True
except psutil.NoSuchProcess:
self.logger.warning("No process found with PID [%s] for node [%s].", opensearch_process.pid, node.node_name)
except psutil.TimeoutExpired:
self.logger.info("kill -KILL node [%s]", node.node_name)
try:
# kill -9
opensearch_process.kill()
process_stopped = True
except psutil.NoSuchProcess:
self.logger.warning("No process found with PID [%s] for node [%s].", opensearch_process.pid, node.node_name)
self.logger.info("Done shutting down node [%s].", node.node_name)

return process_stopped
161 changes: 161 additions & 0 deletions tests/builder/launchers/local_process_launcher_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# pylint: disable=protected-access

import os
from unittest import TestCase, mock
from unittest.mock import Mock, mock_open

from psutil import NoSuchProcess

from osbenchmark import telemetry
from osbenchmark.builder import cluster
from osbenchmark.builder.launchers.local_process_launcher import LocalProcessLauncher
from osbenchmark.builder.provision_config import ProvisionConfigInstance
from osbenchmark.builder.provisioner import NodeConfiguration


class LocalProcessLauncherTests(TestCase):
def setUp(self):
self.shell_executor = Mock()
self.metrics_store = Mock()

self.variables = {
"system": {
"runtime": {
"jdk": None
},
"env": {
"passenv": "PATH"
}
},
"telemetry": {
"devices": [],
"params": None
}
}
self.pci = ProvisionConfigInstance("fake_pci", "/path/to/root", ["/path/to/config"], variables=self.variables)

self.launcher = LocalProcessLauncher(self.pci, self.shell_executor, self.metrics_store)
self.launcher.waiter = Mock()
self.host = None
self.path = "fake"

@mock.patch('osbenchmark.builder.java_resolver.java_home', return_value=(12, "/java_home/"))
@mock.patch('osbenchmark.utils.jvm.supports_option', return_value=True)
@mock.patch('osbenchmark.utils.io.get_size')
@mock.patch('osbenchmark.telemetry')
@mock.patch('psutil.Process')
def test_daemon_start_stop(self, process, telemetry, get_size, supports, java_home):
mo = mock_open(read_data="1234")

node_configs = []
for node in range(2):
node_configs.append(NodeConfiguration(build_type="tar",
provision_config_instance_runtime_jdks="12,11",
provision_config_instance_provides_bundled_jdk=True,
ip="127.0.0.1",
node_name="testnode-{}".format(node),
node_root_path="/tmp",
binary_path="/tmp",
data_paths="/tmp"))

with mock.patch("builtins.open", mo):
nodes = self.launcher.start(self.host, node_configs)

self.assertEqual(len(nodes), 2)
self.assertEqual(nodes[0].pid, 1234)

stopped_nodes = self.launcher.stop(self.host, nodes)
# all nodes should be stopped
self.assertEqual(nodes, stopped_nodes)

@mock.patch('psutil.Process')
def test_daemon_stop_with_already_terminated_process(self, process):
process.side_effect = NoSuchProcess(123)

nodes = [
cluster.Node(pid=-1,
binary_path="/bin",
host_name="localhost",
node_name="benchmark-0",
telemetry=telemetry.Telemetry())
]

stopped_nodes = self.launcher.stop(self.host, nodes)
# no nodes should have been stopped (they were already stopped)
self.assertEqual([], stopped_nodes)

# flight recorder shows a warning for several seconds before continuing
@mock.patch("osbenchmark.time.sleep")
def test_env_options_order(self, sleep):
node_telemetry = [
telemetry.FlightRecorder(telemetry_params={}, log_root="/tmp/telemetry", java_major_version=8)
]
telem = telemetry.Telemetry(["jfr"], devices=node_telemetry)
env = self.launcher._prepare_env(node_name="node0", java_home="/java_home", telemetry=telem)

self.assertEqual("/java_home/bin" + os.pathsep + os.environ["PATH"], env["PATH"])
self.assertEqual("-XX:+ExitOnOutOfMemoryError -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints "
"-XX:+UnlockCommercialFeatures -XX:+FlightRecorder "
"-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath=/tmp/telemetry/profile.jfr " # pylint: disable=line-too-long
"-XX:StartFlightRecording=defaultrecording=true", env["OPENSEARCH_JAVA_OPTS"])

def test_bundled_jdk_not_in_path(self):
os.environ["JAVA_HOME"] = "/path/to/java"

telem = telemetry.Telemetry()
# no JAVA_HOME -> use the bundled JDK
env = self.launcher._prepare_env(node_name="node0", java_home=None, telemetry=telem)

# unmodified
self.assertEqual(os.environ["PATH"], env["PATH"])
self.assertIsNone(env.get("JAVA_HOME"))

def test_pass_env_vars(self):
self.pci.variables["system"]["env"]["passenv"] = "JAVA_HOME,FOO1"

os.environ["JAVA_HOME"] = "/path/to/java"
os.environ["FOO1"] = "BAR1"

telem = telemetry.Telemetry()
# no JAVA_HOME -> use the bundled JDK
env = self.launcher._prepare_env(node_name="node0", java_home=None, telemetry=telem)

# unmodified
self.assertEqual(os.environ["JAVA_HOME"], env["JAVA_HOME"])
self.assertEqual(os.environ["FOO1"], env["FOO1"])
self.assertEqual(env["OPENSEARCH_JAVA_OPTS"], "-XX:+ExitOnOutOfMemoryError")

def test_pass_java_opts(self):
self.pci.variables["system"]["env"]["passenv"] = "OPENSEARCH_JAVA_OPTS"

os.environ["OPENSEARCH_JAVA_OPTS"] = "-XX:-someJunk"

telem = telemetry.Telemetry()
# no JAVA_HOME -> use the bundled JDK
env = self.launcher._prepare_env(node_name="node0", java_home=None, telemetry=telem)

# unmodified
self.assertEqual(os.environ["OPENSEARCH_JAVA_OPTS"], env["OPENSEARCH_JAVA_OPTS"])

def test_pid_file_not_created(self):
mo = mock_open()

with mock.patch("builtins.open", mo):
mo.side_effect = FileNotFoundError

is_pid_file_ready = self.launcher._is_pid_file_available("fake")
self.assertEqual(is_pid_file_ready, False)

def test_pid_file_empty(self):
mo = mock_open(read_data="")

with mock.patch("builtins.open", mo):
is_pid_file_ready = self.launcher._is_pid_file_available("fake")
self.assertEqual(is_pid_file_ready, False)

def test_pid_file_ready(self):
mo = mock_open(read_data="1234")

with mock.patch("builtins.open", mo):
is_pid_file_ready = self.launcher._is_pid_file_available("fake")
self.assertEqual(is_pid_file_ready, True)