Skip to content

Commit

Permalink
Add guest-profiler support to serve mode (fastly#301)
Browse files Browse the repository at this point in the history
* Add guest-profiler support to serve mode

* Fix trap-test

* Fix doc-tests
  • Loading branch information
jameysharp authored and cmckendry committed Feb 8, 2024
1 parent db4e0e0 commit c989a8e
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 51 deletions.
34 changes: 23 additions & 11 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#![cfg_attr(not(debug_assertions), doc(test(attr(allow(dead_code)))))]
#![cfg_attr(not(debug_assertions), doc(test(attr(allow(unused_variables)))))]

use std::path::PathBuf;
use std::process::ExitCode;

use wasi_common::I32Exit;
Expand All @@ -39,7 +40,16 @@ use {
/// Create a new server, bind it to an address, and serve responses until an error occurs.
pub async fn serve(serve_args: ServeArgs) -> Result<(), Error> {
// Load the wasm module into an execution context
let ctx = create_execution_context(serve_args.shared(), true).await?;
let ctx = create_execution_context(
serve_args.shared(),
true,
serve_args.profile_guest().cloned(),
)
.await?;

if let Some(guest_profile_path) = serve_args.profile_guest() {
std::fs::create_dir_all(guest_profile_path)?;
}

let addr = serve_args.addr();
ViceroyService::new(ctx).serve(addr).await?;
Expand Down Expand Up @@ -93,18 +103,14 @@ pub async fn main() -> ExitCode {
/// Execute a Wasm program in the Viceroy environment.
pub async fn run_wasm_main(run_args: RunArgs) -> Result<(), anyhow::Error> {
// Load the wasm module into an execution context
let ctx = create_execution_context(run_args.shared(), false).await?;
let ctx = create_execution_context(run_args.shared(), false, run_args.profile_guest().cloned())
.await?;
let input = run_args.shared().input();
let program_name = match input.file_stem() {
Some(stem) => stem.to_string_lossy(),
None => panic!("program cannot be a directory"),
};
ctx.run_main(
&program_name,
run_args.wasm_args(),
run_args.profile_guest(),
)
.await
ctx.run_main(&program_name, run_args.wasm_args()).await
}

fn install_tracing_subscriber(verbosity: u8) {
Expand Down Expand Up @@ -233,11 +239,17 @@ impl<'a> MakeWriter<'a> for StdWriter {
async fn create_execution_context(
args: &SharedArgs,
check_backends: bool,
guest_profile_path: Option<PathBuf>,
) -> Result<ExecuteCtx, anyhow::Error> {
let input = args.input();
let mut ctx = ExecuteCtx::new(input, args.profiling_strategy(), args.wasi_modules())?
.with_log_stderr(args.log_stderr())
.with_log_stdout(args.log_stdout());
let mut ctx = ExecuteCtx::new(
input,
args.profiling_strategy(),
args.wasi_modules(),
guest_profile_path,
)?
.with_log_stderr(args.log_stderr())
.with_log_stdout(args.log_stdout());

if let Some(config_path) = args.config_path() {
let config = FastlyConfig::from_file(config_path)?;
Expand Down
10 changes: 10 additions & 0 deletions cli/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ pub struct ServeArgs {
#[arg(long = "addr")]
socket_addr: Option<SocketAddr>,

/// Whether to profile the wasm guest. Takes an optional directory to save
/// per-request profiles to
#[arg(long, default_missing_value = "guest-profiles", num_args=0..=1, require_equals=true)]
profile_guest: Option<PathBuf>,

#[command(flatten)]
shared: SharedArgs,
}
Expand Down Expand Up @@ -99,6 +104,11 @@ impl ServeArgs {
.unwrap_or_else(|| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7676))
}

/// The path to write guest profiles to
pub fn profile_guest(&self) -> Option<&PathBuf> {
self.profile_guest.as_ref()
}

pub fn shared(&self) -> &SharedArgs {
&self.shared
}
Expand Down
23 changes: 14 additions & 9 deletions cli/tests/integration/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,20 @@ impl Test {
self.backends.start_servers().await;
}

let ctx = ExecuteCtx::new(&self.module_path, ProfilingStrategy::None, HashSet::new())
.expect("failed to set up execution context")
.with_backends(self.backends.backend_configs().await)
.with_dictionaries(self.dictionaries.clone())
.with_geolocation(self.geolocation.clone())
.with_object_stores(self.object_stores.clone())
.with_secret_stores(self.secret_stores.clone())
.with_log_stderr(self.log_stderr)
.with_log_stdout(self.log_stdout);
let ctx = ExecuteCtx::new(
&self.module_path,
ProfilingStrategy::None,
HashSet::new(),
None,
)
.expect("failed to set up execution context")
.with_backends(self.backends.backend_configs().await)
.with_dictionaries(self.dictionaries.clone())
.with_geolocation(self.geolocation.clone())
.with_object_stores(self.object_stores.clone())
.with_secret_stores(self.secret_stores.clone())
.with_log_stderr(self.log_stderr)
.with_log_stdout(self.log_stdout);

if self.via_hyper {
let svc = ViceroyService::new(ctx);
Expand Down
2 changes: 1 addition & 1 deletion cli/tests/trap-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub type TestResult = Result<(), Error>;
#[tokio::test(flavor = "multi_thread")]
async fn fatal_error_traps() -> TestResult {
let module_path = format!("{RUST_FIXTURE_PATH}/response.wasm");
let ctx = ExecuteCtx::new(module_path, ProfilingStrategy::None, HashSet::new())?;
let ctx = ExecuteCtx::new(module_path, ProfilingStrategy::None, HashSet::new(), None)?;
let req = Request::get("http://127.0.0.1:7676/").body(Body::from(""))?;
let resp = ctx
.handle_request_with_runtime_error(req, "127.0.0.1".parse().unwrap())
Expand Down
85 changes: 56 additions & 29 deletions lib/src/execute.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Guest code execution.

use std::time::SystemTime;

use wasmtime::GuestProfiler;

use {
Expand Down Expand Up @@ -68,6 +70,10 @@ pub struct ExecuteCtx {
// `Arc` for the two fields below because this struct must be `Clone`.
epoch_increment_thread: Option<Arc<JoinHandle<()>>>,
epoch_increment_stop: Arc<AtomicBool>,
/// Path to write profiling results from the guest. In serve mode,
/// this must refer to a directory, while in run mode it names
/// a file.
guest_profile_path: Arc<Option<PathBuf>>,
}

impl ExecuteCtx {
Expand All @@ -76,6 +82,7 @@ impl ExecuteCtx {
module_path: impl AsRef<Path>,
profiling_strategy: ProfilingStrategy,
wasi_modules: HashSet<ExperimentalModule>,
guest_profile_path: Option<PathBuf>,
) -> Result<Self, Error> {
let config = &configure_wasmtime(profiling_strategy);
let engine = Engine::new(config)?;
Expand Down Expand Up @@ -112,6 +119,7 @@ impl ExecuteCtx {
secret_stores: Arc::new(SecretStores::new()),
epoch_increment_thread,
epoch_increment_stop,
guest_profile_path: Arc::new(guest_profile_path),
})
}

Expand Down Expand Up @@ -217,7 +225,7 @@ impl ExecuteCtx {
/// # use viceroy_lib::{Error, ExecuteCtx, ProfilingStrategy, ViceroyService};
/// # async fn f() -> Result<(), Error> {
/// # let req = Request::new(Body::from(""));
/// let ctx = ExecuteCtx::new("path/to/a/file.wasm", ProfilingStrategy::None, HashSet::new())?;
/// let ctx = ExecuteCtx::new("path/to/a/file.wasm", ProfilingStrategy::None, HashSet::new(), None)?;
/// let resp = ctx.handle_request(req, "127.0.0.1".parse().unwrap()).await?;
/// # Ok(())
/// # }
Expand Down Expand Up @@ -310,11 +318,28 @@ impl ExecuteCtx {
self.object_store.clone(),
self.secret_stores.clone(),
);

let guest_profile_path = self.guest_profile_path.as_deref().map(|path| {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
path.join(format!("{}-{}.json", now, req_id))
});
let profiler = guest_profile_path.is_some().then(|| {
let program_name = "main";
GuestProfiler::new(
program_name,
EPOCH_INTERRUPTION_PERIOD,
vec![(program_name.to_string(), self.module.clone())],
)
});

// We currently have to postpone linking and instantiation to the guest task
// due to wasmtime limitations, in particular the fact that `Instance` is not `Send`.
// However, the fact that the module itself is created within `ExecuteCtx::new`
// means that the heavy lifting happens only once.
let mut store = create_store(&self, session, None).map_err(ExecutionError::Context)?;
let mut store = create_store(&self, session, profiler).map_err(ExecutionError::Context)?;

let instance = self
.instance_pre
Expand Down Expand Up @@ -346,6 +371,9 @@ impl ExecuteCtx {
}
};

// If we collected a profile, write it to the file
write_profile(&mut store, guest_profile_path.as_ref());

// Ensure the downstream response channel is closed, whether or not a response was
// sent during execution.
store.data_mut().close_downstream_response_sender();
Expand All @@ -364,12 +392,7 @@ impl ExecuteCtx {
outcome
}

pub async fn run_main(
self,
program_name: &str,
args: &[String],
guest_profile_path: Option<&PathBuf>,
) -> Result<(), anyhow::Error> {
pub async fn run_main(self, program_name: &str, args: &[String]) -> Result<(), anyhow::Error> {
// placeholders for request, result sender channel, and remote IP
let req = Request::get("http://example.com/").body(Body::empty())?;
let req_id = 0;
Expand All @@ -390,7 +413,7 @@ impl ExecuteCtx {
self.secret_stores.clone(),
);

let profiler = guest_profile_path.map(|_| {
let profiler = self.guest_profile_path.is_some().then(|| {
GuestProfiler::new(
program_name,
EPOCH_INTERRUPTION_PERIOD,
Expand Down Expand Up @@ -419,26 +442,7 @@ impl ExecuteCtx {
let result = main_func.call_async(&mut store, ()).await;

// If we collected a profile, write it to the file
if let (Some(profile), Some(path)) =
(store.data_mut().take_guest_profiler(), guest_profile_path)
{
if let Err(e) = std::fs::File::create(&path)
.map_err(anyhow::Error::new)
.and_then(|output| profile.finish(std::io::BufWriter::new(output)))
{
event!(
Level::ERROR,
"failed writing profile at {}: {e:#}",
path.display()
);
} else {
event!(
Level::INFO,
"\nProfile written to: {}\nView this profile at https://profiler.firefox.com/.",
path.display()
);
}
}
write_profile(&mut store, self.guest_profile_path.as_ref().as_ref());

// Ensure the downstream response channel is closed, whether or not a response was
// sent during execution.
Expand All @@ -453,6 +457,29 @@ impl ExecuteCtx {
}
}

fn write_profile(store: &mut wasmtime::Store<WasmCtx>, guest_profile_path: Option<&PathBuf>) {
if let (Some(profile), Some(path)) =
(store.data_mut().take_guest_profiler(), guest_profile_path)
{
if let Err(e) = std::fs::File::create(&path)
.map_err(anyhow::Error::new)
.and_then(|output| profile.finish(std::io::BufWriter::new(output)))
{
event!(
Level::ERROR,
"failed writing profile at {}: {e:#}",
path.display()
);
} else {
event!(
Level::INFO,
"\nProfile written to: {}\nView this profile at https://profiler.firefox.com/.",
path.display()
);
}
}
}

impl Drop for ExecuteCtx {
fn drop(&mut self) {
if let Some(arc) = self.epoch_increment_thread.take() {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl ViceroyService {
/// # use std::collections::HashSet;
/// use viceroy_lib::{Error, ExecuteCtx, ProfilingStrategy, ViceroyService};
/// # fn f() -> Result<(), Error> {
/// let ctx = ExecuteCtx::new("path/to/a/file.wasm", ProfilingStrategy::None, HashSet::new())?;
/// let ctx = ExecuteCtx::new("path/to/a/file.wasm", ProfilingStrategy::None, HashSet::new(), None)?;
/// let svc = ViceroyService::new(ctx);
/// # Ok(())
/// # }
Expand Down

0 comments on commit c989a8e

Please sign in to comment.