Skip to content

Commit

Permalink
Add options for instance reuse to wasmtime serve
Browse files Browse the repository at this point in the history
Add two new flags to the `serve` subcommand:

* `--requests-per-instance`
* `--max-cached-instances`

These can be used to reuse instances across requests, although by default
each request still receives a fresh instance.
  • Loading branch information
alexcrichton committed Nov 2, 2024
1 parent 38845a0 commit 5e3ed23
Showing 1 changed file with 98 additions and 25 deletions.
123 changes: 98 additions & 25 deletions src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use std::{
path::PathBuf,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
Arc, Mutex,
},
};
use wasmtime::component::Linker;
use wasmtime::{Config, Engine, Memory, MemoryType, Store, StoreLimits};
use wasmtime_wasi::{StreamError, StreamResult, WasiCtx, WasiCtxBuilder, WasiView};
use wasmtime_wasi_http::bindings::http::types::Scheme;
use wasmtime_wasi_http::bindings::ProxyPre;
use wasmtime_wasi_http::bindings::{Proxy, ProxyPre};
use wasmtime_wasi_http::io::TokioIo;
use wasmtime_wasi_http::{body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView};

Expand Down Expand Up @@ -79,6 +79,21 @@ pub struct ServeCommand {
/// The WebAssembly component to run.
#[arg(value_name = "WASM", required = true)]
component: PathBuf,

/// The number of requests each wasm instance can handle.
///
/// By default this value is 1 meaning that each request receives a new
/// instance. This can be increased to handle multiple requests per
/// instance.
#[arg(long, default_value_t = 1, value_name = "N")]
requests_per_instance: u32,

/// Maximum number of instances to retain to handle future requests.
///
/// This option is only applicable if `--requests-per-instance` is larger
/// than one as otherwise instances are never retained.
#[arg(long, default_value_t = 1000, value_name = "N")]
max_cached_instances: usize,
}

impl ServeCommand {
Expand Down Expand Up @@ -132,25 +147,10 @@ impl ServeCommand {
Ok(())
}

fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
let mut builder = WasiCtxBuilder::new();
self.run.configure_wasip2(&mut builder)?;

builder.env("REQUEST_ID", req_id.to_string());

builder.stdout(LogStream::new(
format!("stdout [{req_id}] :: "),
Output::Stdout,
));

builder.stderr(LogStream::new(
format!("stderr [{req_id}] :: "),
Output::Stderr,
));

fn new_store(&self, engine: &Engine) -> Result<Store<Host>> {
let mut host = Host {
table: wasmtime::component::ResourceTable::new(),
ctx: builder.build(),
ctx: WasiCtxBuilder::new().build(),
http: WasiHttpCtx::new(),

limits: StoreLimits::default(),
Expand Down Expand Up @@ -444,12 +444,79 @@ struct ProxyHandlerInner {
engine: Engine,
instance_pre: ProxyPre<Host>,
next_id: AtomicU64,
instances: Mutex<Vec<Instance>>,
}

/// A single instantiated component paired with the store that owns it.
///
/// When `--requests-per-instance` is greater than one these are retained in
/// `ProxyHandlerInner::instances` between requests instead of being dropped.
struct Instance {
    // Store holding all host state (WASI ctx, limits, etc.) for this instance.
    store: Store<Host>,
    // How many requests this instance has handled so far; compared against
    // `--requests-per-instance` to decide when to retire it.
    handled: u32,
    // The instantiated proxy world whose `wasi:http` handler serves requests.
    proxy: Proxy,
}

impl ProxyHandlerInner {
    /// Allocates the next unique request identifier.
    fn next_req_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }

    /// Acquires a previously created `Instance` or creates a new one if none
    /// are available.
    ///
    /// The returned `Instance` will have WASI configured and logging configured
    /// for the `req_id` passed in.
    async fn pop_instance(&self, req_id: u64) -> Result<Instance> {
        let mut instance = self.acquire_or_instantiate().await?;

        // Build a fresh WASI context for this request so it gets its own
        // `REQUEST_ID` environment variable and per-request log prefixes,
        // regardless of whether the instance itself is new or reused.
        let mut builder = WasiCtxBuilder::new();
        self.cmd.run.configure_wasip2(&mut builder)?;
        builder.env("REQUEST_ID", req_id.to_string());
        builder.stdout(LogStream::new(
            format!("stdout [{req_id}] :: "),
            Output::Stdout,
        ));
        builder.stderr(LogStream::new(
            format!("stderr [{req_id}] :: "),
            Output::Stderr,
        ));
        instance.store.data_mut().ctx = builder.build();

        Ok(instance)
    }

    /// Internal helper to return a cached instance or instantiate a new one if
    /// none are available.
    async fn acquire_or_instantiate(&self) -> Result<Instance> {
        // Only consult the cache when reuse is enabled. Note that the mutex
        // guard is a temporary here and is dropped before the `.await` below.
        if self.cmd.requests_per_instance > 1 {
            if let Some(cached) = self.instances.lock().unwrap().pop() {
                return Ok(cached);
            }
        }

        // Cache miss (or reuse disabled): instantiate from scratch.
        let mut store = self.cmd.new_store(&self.engine)?;
        let proxy = self.instance_pre.instantiate_async(&mut store).await?;
        Ok(Instance {
            store,
            handled: 0,
            proxy,
        })
    }

    /// Return a completed `Instance` to this cache.
    fn push_instance(&self, mut instance: Instance) {
        instance.handled += 1;

        // Retire instances that have reached their request budget.
        if instance.handled >= self.cmd.requests_per_instance {
            return;
        }

        // Replace the request-specific WASI context with an empty one so any
        // allocations it holds are freed while the instance sits idle.
        instance.store.data_mut().ctx = WasiCtxBuilder::new().build();

        // Keep the instance around only if the cache has room left.
        let mut cache = self.instances.lock().unwrap();
        if cache.len() < self.cmd.max_cached_instances {
            cache.push(instance);
        }
    }
}

#[derive(Clone)]
Expand All @@ -462,6 +529,7 @@ impl ProxyHandler {
engine,
instance_pre,
next_id: AtomicU64::from(0),
instances: Mutex::default(),
}))
}
}
Expand All @@ -482,22 +550,27 @@ async fn handle_request(
req.uri()
);

let mut store = inner.cmd.new_store(&inner.engine, req_id)?;
let mut instance = inner.pop_instance(req_id).await?;

let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
let out = store.data_mut().new_response_outparam(sender)?;
let proxy = inner.instance_pre.instantiate_async(&mut store).await?;
let req = instance
.store
.data_mut()
.new_incoming_request(Scheme::Http, req)?;
let out = instance.store.data_mut().new_response_outparam(sender)?;

let task = tokio::task::spawn(async move {
if let Err(e) = proxy
if let Err(e) = instance
.proxy
.wasi_http_incoming_handler()
.call_handle(store, req, out)
.call_handle(&mut instance.store, req, out)
.await
{
log::error!("[{req_id}] :: {:#?}", e);
return Err(e);
}

inner.push_instance(instance);

Ok(())
});

Expand Down

0 comments on commit 5e3ed23

Please sign in to comment.