Skip to content

Commit

Permalink
Add support for leases
Browse files Browse the repository at this point in the history
Signed-off-by: James Sturtevant <[email protected]>
  • Loading branch information
jsturtevant committed Dec 12, 2023
1 parent 6b03aba commit 96dfd6b
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 42 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion crates/containerd-shim-wasm/src/container/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ use super::Source;
use crate::container::{PathResolve, RuntimeContext};
use crate::sandbox::Stdio;

pub struct RuntimeInfo {
pub name: &'static str,
pub version: &'static str,
}

pub trait Engine: Clone + Send + Sync + 'static {
/// The name to use for this engine
fn name() -> &'static str;
fn info() -> &'static RuntimeInfo;

/// Run a WebAssembly container
fn run_wasi(&self, ctx: &impl RuntimeContext, stdio: Stdio) -> Result<i32>;
Expand Down
2 changes: 1 addition & 1 deletion crates/containerd-shim-wasm/src/container/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod path;

pub(crate) use context::WasiContext;
pub use context::{Entrypoint, RuntimeContext, Source};
pub use engine::Engine;
pub use engine::{Engine, RuntimeInfo};
pub use instance::Instance;
pub use path::PathResolve;

Expand Down
8 changes: 6 additions & 2 deletions crates/containerd-shim-wasm/src/container/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::bail;

use super::engine::RuntimeInfo;
use crate::container::{Engine, RuntimeContext, Stdio};
use crate::sys::container::instance::Instance;
use crate::testing::WasiTest;
Expand All @@ -8,8 +9,11 @@ use crate::testing::WasiTest;
struct EngineFailingValidation;

impl Engine for EngineFailingValidation {
fn name() -> &'static str {
"wasi_instance"
fn info() -> &'static RuntimeInfo {
&RuntimeInfo {
name: "wasi_instance",
version: "0.0.0",
}
}
fn can_handle(&self, _ctx: &impl RuntimeContext) -> anyhow::Result<()> {
bail!("can't handle");
Expand Down
182 changes: 159 additions & 23 deletions crates/containerd-shim-wasm/src/sandbox/containerd.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#![cfg(unix)]

use std::collections::HashMap;
use std::path::Path;

use containerd_client;
use containerd_client::services::v1::containers_client::ContainersClient;
use containerd_client::services::v1::content_client::ContentClient;
use containerd_client::services::v1::images_client::ImagesClient;
use containerd_client::services::v1::leases_client::LeasesClient;
use containerd_client::services::v1::{
Container, DeleteContentRequest, GetContainerRequest, GetImageRequest, Image, Info,
InfoRequest, ReadContentRequest, UpdateImageRequest, UpdateRequest, WriteAction,
Expand All @@ -23,20 +23,76 @@ use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Code, Request};

use crate::container::Engine;
use crate::container::{Engine, RuntimeInfo};
use crate::sandbox::error::{Error as ShimError, Result};
use crate::sandbox::oci::{self, WasmLayer};

pub(crate) struct Client {
inner: Channel,
rt: Runtime,
namespace: String,
address: String,
}

// Adds lease info to grpc header
// https://github.com/containerd/containerd/blob/8459273f806e068e1a6bacfaf1355bbbad738d5e/docs/garbage-collection.md#using-grpc
#[macro_export]
macro_rules! with_lease {
($req : ident, $ns: expr, $lease_id: expr) => {{
let mut req = Request::new($req);
let md = req.metadata_mut();
// https://github.com/containerd/containerd/blob/main/namespaces/grpc.go#L27
md.insert("containerd-namespace", $ns.parse().unwrap());
md.insert("containerd-lease", $lease_id.parse().unwrap());
req
}};
}

struct LeaseGuard {
lease_id: String,
namespace: String,
address: String,
}

pub(crate) struct WriteContent {
_lease: LeaseGuard,
pub digest: String,
}

// Provides a best effort for dropping a lease of the content. If the lease cannot be dropped, it will log a warning
impl Drop for LeaseGuard {
fn drop(&mut self) {
let id = self.lease_id.clone();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let client = rt.block_on(containerd_client::connect(self.address.clone()));

if client.is_err() {
log::warn!("failed to connect to containerd. lease may not be deleted");
return;
}

let mut client = LeasesClient::new(client.unwrap());

rt.block_on(async {
let req = containerd_client::services::v1::DeleteRequest { id, sync: false };
let req = with_namespace!(req, self.namespace);
let result = client.delete(req).await;

if result.is_err() {
log::warn!("failed to remove lease.");
}
});
}
}

// sync wrapper implementation from https://tokio.rs/tokio/topics/bridging
impl Client {
// wrapper around connection that will establish a connection and create a client
pub fn connect(address: impl AsRef<Path>, namespace: impl ToString) -> Result<Client> {
pub fn connect(address: &str, namespace: impl ToString) -> Result<Client> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
Expand All @@ -49,6 +105,7 @@ impl Client {
inner,
rt,
namespace: namespace.to_string(),
address: address.to_string(),
})
}

Expand Down Expand Up @@ -88,18 +145,57 @@ impl Client {
})
}

pub fn save_content(&self, data: Vec<u8>) -> Result<String> {
// wrapper around lease that will create a lease and return a guard that will delete the lease when dropped
fn lease(&self, r#ref: String) -> Result<LeaseGuard> {
self.rt.block_on(async {
let mut lease_labels = HashMap::new();
let expire = chrono::Utc::now() + chrono::Duration::hours(24);
lease_labels.insert("containerd.io/gc.expire".to_string(), expire.to_rfc3339());
let lease_request = containerd_client::services::v1::CreateRequest {
id: r#ref.clone(),
labels: lease_labels,
};

let mut leases_client = LeasesClient::new(self.inner.clone());

let lease = leases_client
.create(with_namespace!(lease_request, self.namespace))
.await
.map_err(|e| ShimError::Containerd(e.to_string()))?
.into_inner()
.lease
.ok_or_else(|| {
ShimError::Containerd(format!("unable to create lease for {}", r#ref.clone()))
})?;

Ok(LeaseGuard {
lease_id: lease.id.clone(),
address: self.address.clone(),
namespace: self.namespace.clone(),
})
})
}

pub fn save_content(
&self,
data: Vec<u8>,
original_digest: String,
info: &RuntimeInfo,
) -> Result<WriteContent> {
let expected = digest(data.clone());
let expected = format!("sha256:{}", expected);
let r#ref = format!("precompile-{}-{}-{}", info.name, info.version, expected);
let lease = self.lease(r#ref.clone())?;

let digest = self.rt.block_on(async {
// create a channel to feed the stream; only sending one message at a time so we can set this to one
let (tx, rx) = mpsc::channel(1);

let len = data.len() as i64;
let expected = digest(data.clone());
let expected = format!("sha256:{}", expected);

let mut client = ContentClient::new(self.inner.clone());
let r#ref = "test".to_string();

// Send Stat action to containerd to let it know that we are going to write content
// Send write request with Stat action to containerd to let it know that we are going to write content
// if the content is already there, it will return early with AlreadyExists
let req = WriteContentRequest {
r#ref: r#ref.clone(),
Expand All @@ -112,7 +208,8 @@ impl Client {
.await
.map_err(|err| ShimError::Containerd(err.to_string()))?;
let request_stream = ReceiverStream::new(rx);
let request_stream = with_namespace!(request_stream, self.namespace);
let request_stream =
with_lease!(request_stream, self.namespace, lease.lease_id.clone());
let mut response_stream = match client.write(request_stream).await {
Ok(response_stream) => response_stream.into_inner(),
Err(e) if e.code() == Code::AlreadyExists => {
Expand All @@ -134,7 +231,10 @@ impl Client {

// Write and commit at same time
let mut labels = HashMap::new();
labels.insert("runwasi.io/precompiled".to_string(), "".to_string());
labels.insert(
"runwasi.io/precompiled".to_string(),
original_digest.clone(),
);
let commit_request = WriteContentRequest {
action: WriteAction::Commit.into(),
total: len,
Expand Down Expand Up @@ -172,6 +272,11 @@ impl Client {
)));
}
Ok(response.digest)
})?;

Ok(WriteContent {
_lease: lease,
digest: digest.clone(),
})
}

Expand Down Expand Up @@ -311,8 +416,8 @@ impl Client {
) -> Result<(Vec<oci::WasmLayer>, Platform)> {
let container = self.get_container(containerd_id.to_string())?;
let mut image = self.get_image(container.image)?;
let digest = self.extract_image_content_sha(&image)?;
let manifest = self.read_content(digest.clone())?;
let image_digest = self.extract_image_content_sha(&image)?;
let manifest = self.read_content(image_digest.clone())?;
let manifest = manifest.as_slice();
let manifest = ImageManifest::from_reader(manifest)?;

Expand All @@ -328,7 +433,13 @@ impl Client {
};

log::info!("found manifest with WASM OCI image format.");
let label = format!("runwasi.io/precompiled/{}", T::name());
// This label is unique across runtimes and versions
// a precompiled component/module will not work across different runtimes or versions
let label = format!(
"runwasi.io/precompiled/{}/{}",
T::info().name,
T::info().version
);
match image.labels.get(&label) {
Some(precompile_digest) if T::can_precompile() => {
log::info!("found precompiled image");
Expand All @@ -353,20 +464,24 @@ impl Client {

log::debug!("precompile complete and saving content");
let precompiled = engine.precompile(layers.as_slice())?;
let precompile_digest = self.save_content(precompiled.clone())?;
let precompiled_content =
self.save_content(precompiled.clone(), image_digest.clone(), T::info())?;

log::debug!("updating image with compiled content digest");
image.labels.insert(
"runwasi.io/precompiled".to_string(),
precompile_digest.clone(),
precompiled_content.digest.clone(),
);
self.update_image(image)?;

// The original image is considered a root object, by adding a ref to the new compiled content
// We tell to containerd to not garbage collect the new content until this image is removed from the system
// this ensures that we keep the content around after the lease is dropped
log::debug!("updating content with precompile digest to avoid garbage collection");
let mut image_content = self.get_info(digest.clone())?;
let mut image_content = self.get_info(image_digest.clone())?;
image_content.labels.insert(
"containerd.io/gc.ref.content.precompile".to_string(),
precompile_digest.clone(),
precompiled_content.digest.clone(),
);
self.update_info(image_content)?;

Expand Down Expand Up @@ -419,15 +534,36 @@ mod tests {
let expected = digest(data.clone());
let expected = format!("sha256:{}", expected);

let returned = client.save_content(data).unwrap();
assert_eq!(expected, returned);

let data = client.read_content(returned).unwrap();
let returned = client
.save_content(
data,
"original".to_string(),
&RuntimeInfo {
name: "test",
version: "0.0.0",
},
)
.unwrap();
assert_eq!(expected, returned.digest.clone());

let data = client.read_content(returned.digest.clone()).unwrap();
assert_eq!(data, b"hello world");

// need to drop the lease to be able to create a second one
drop(returned);

// a second call should be successful since it already exists
let returned = client.save_content(data).unwrap();
assert_eq!(expected, returned);
let returned = client
.save_content(
data,
"original".to_string(),
&RuntimeInfo {
name: "test",
version: "0.0.0",
},
)
.unwrap();
assert_eq!(expected, returned.digest);

client.delete_content(expected.clone()).unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<E: Engine> LibcontainerExecutor for Executor<E> {
fn validate(&self, spec: &Spec) -> Result<(), ExecutorValidationError> {
// We can handle linux container. We delegate wasm container to the engine.
match self.inner(spec) {
InnerExecutor::CantHandle => Err(ExecutorValidationError::CantHandle(E::name())),
InnerExecutor::CantHandle => Err(ExecutorValidationError::CantHandle(E::info().name)),
_ => Ok(()),
}
}
Expand All @@ -45,7 +45,7 @@ impl<E: Engine> LibcontainerExecutor for Executor<E> {
// If it looks like a linux container, run it as a linux container.
// Otherwise, run it as a wasm container
match self.inner(spec) {
InnerExecutor::CantHandle => Err(LibcontainerExecutorError::CantHandle(E::name())),
InnerExecutor::CantHandle => Err(LibcontainerExecutorError::CantHandle(E::info().name)),
InnerExecutor::Linux => {
log::info!("executing linux container");
self.stdio.take().redirect().unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ impl<E: Engine> SandboxInstance for Instance<E> {
let engine = cfg.get_engine();
let bundle = cfg.get_bundle().to_path_buf();
let namespace = cfg.get_namespace();
let rootdir = Path::new(DEFAULT_CONTAINER_ROOT_DIR).join(E::name());
let rootdir = Path::new(DEFAULT_CONTAINER_ROOT_DIR).join(E::info().name);
let rootdir = determine_rootdir(&bundle, &namespace, rootdir)?;
let stdio = Stdio::init_from_cfg(cfg)?;

// check if container is OCI image with wasm layers and attempt to read the module
let (modules, platform) = containerd::Client::connect(cfg.get_containerd_address(), &namespace)?
let (modules, platform) = containerd::Client::connect(cfg.get_containerd_address().as_str(), &namespace)?
.load_modules(&id, engine.clone())
.unwrap_or_else(|e| {
log::warn!("Error obtaining wasm layers for container {id}. Will attempt to use files inside container image. Error: {e}");
Expand Down
Loading

0 comments on commit 96dfd6b

Please sign in to comment.