From 3a368faf9d594ef107aaadb2af0f2bee6060ffd9 Mon Sep 17 00:00:00 2001 From: zzzzzzzzzy9 Date: Mon, 24 Jun 2024 16:54:33 +0800 Subject: [PATCH] shim: Implement the pause and resume interfaces --- crates/runc-shim/src/container.rs | 10 +++++++ crates/runc-shim/src/processes.rs | 13 +++++++++ crates/runc-shim/src/runc.rs | 48 +++++++++++++++++++++++++++++++ crates/runc-shim/src/task.rs | 30 +++++++++++++++++++ 4 files changed, 101 insertions(+) diff --git a/crates/runc-shim/src/container.rs b/crates/runc-shim/src/container.rs index 257e2c60..efb9753d 100644 --- a/crates/runc-shim/src/container.rs +++ b/crates/runc-shim/src/container.rs @@ -54,6 +54,8 @@ pub trait Container { async fn stats(&self) -> Result; async fn all_processes(&self) -> Result>; async fn close_io(&mut self, exec_id: Option<&str>) -> Result<()>; + async fn pause(&mut self) -> Result<()>; + async fn resume(&mut self) -> Result<()>; } #[async_trait] @@ -192,6 +194,14 @@ where let process = self.get_mut_process(exec_id)?; process.close_io().await } + + async fn pause(&mut self) -> Result<()> { + self.init.pause().await + } + + async fn resume(&mut self) -> Result<()> { + self.init.resume().await + } } impl ContainerTemplate diff --git a/crates/runc-shim/src/processes.rs b/crates/runc-shim/src/processes.rs index f0b982d0..0a7ce20a 100644 --- a/crates/runc-shim/src/processes.rs +++ b/crates/runc-shim/src/processes.rs @@ -55,6 +55,8 @@ pub trait Process { async fn stats(&self) -> Result; async fn ps(&self) -> Result>; async fn close_io(&mut self) -> Result<()>; + async fn pause(&mut self) -> Result<()>; + async fn resume(&mut self) -> Result<()>; } #[async_trait] @@ -65,9 +67,12 @@ pub trait ProcessLifecycle { async fn update(&self, p: &mut P, resources: &LinuxResources) -> Result<()>; async fn stats(&self, p: &P) -> Result; async fn ps(&self, p: &P) -> Result>; + async fn pause(&self, p: &mut P) -> Result<()>; + async fn resume(&self, p: &mut P) -> Result<()>; } pub struct ProcessTemplate { + // TODO: state should be Mutex pub state: Status, pub id: String, pub stdio: Stdio, @@ -198,4 +203,12 @@ where } Ok(()) } + + async fn pause(&mut self) -> Result<()> { + self.lifecycle.clone().pause(self).await + } + + async fn resume(&mut self) -> Result<()> { + self.lifecycle.clone().resume(self).await + } } diff --git a/crates/runc-shim/src/runc.rs b/crates/runc-shim/src/runc.rs index 92bb0f23..51ad5840 100644 --- a/crates/runc-shim/src/runc.rs +++ b/crates/runc-shim/src/runc.rs @@ -349,6 +349,46 @@ impl ProcessLifecycle for RuncInitLifecycle { }) .collect()) } + + #[cfg(target_os = "linux")] + async fn pause(&self, p: &mut InitProcess) -> Result<()> { + match p.state { + Status::RUNNING => { + p.state = Status::PAUSING; + if let Err(e) = self.runtime.pause(p.id.as_str()).await { + p.state = Status::RUNNING; + return Err(runtime_error(&self.bundle, e, "OCI runtime pause failed").await); + } + p.state = Status::PAUSED; + Ok(()) + } + _ => Err(other!("cannot pause when in {:?} state", p.state)), + } + } + + #[cfg(not(target_os = "linux"))] + async fn pause(&self, _p: &mut InitProcess) -> Result<()> { + Err(Error::Unimplemented("pause".to_string())) + } + + #[cfg(target_os = "linux")] + async fn resume(&self, p: &mut InitProcess) -> Result<()> { + match p.state { + Status::PAUSED => { + if let Err(e) = self.runtime.resume(p.id.as_str()).await { + return Err(runtime_error(&self.bundle, e, "OCI runtime pause failed").await); + } + p.state = Status::RUNNING; + Ok(()) + } + _ => Err(other!("cannot resume when in {:?} state", p.state)), + } + } + + #[cfg(not(target_os = "linux"))] + async fn resume(&self, _p: &mut InitProcess) -> Result<()> { + Err(Error::Unimplemented("resume".to_string())) + } } impl RuncInitLifecycle { @@ -473,6 +513,14 @@ impl ProcessLifecycle for RuncExecLifecycle { async fn ps(&self, _p: &ExecProcess) -> Result> { Err(Error::Unimplemented("exec ps".to_string())) } + + async fn pause(&self, _p: &mut ExecProcess) -> Result<()> { + Err(Error::Unimplemented("exec pause".to_string())) + } + + async fn resume(&self, _p: &mut ExecProcess) -> Result<()> { + Err(Error::Unimplemented("exec resume".to_string())) + } } async fn copy_console( diff --git a/crates/runc-shim/src/task.rs b/crates/runc-shim/src/task.rs index eec1237d..a2f8c737 100644 --- a/crates/runc-shim/src/task.rs +++ b/crates/runc-shim/src/task.rs @@ -50,6 +50,10 @@ use std::path::Path; #[cfg(target_os = "linux")] use cgroups_rs::hierarchies::is_cgroup2_unified_mode; +use containerd_shim::{ + api::{PauseRequest, ResumeRequest}, + protos::events::task::{TaskPaused, TaskResumed}, +}; #[cfg(target_os = "linux")] use containerd_shim::{ error::{Error, Result}, @@ -288,6 +292,32 @@ where }) } + async fn pause(&self, _ctx: &TtrpcContext, req: PauseRequest) -> TtrpcResult { + info!("pause request for {:?}", req); + let mut container = self.get_container(req.id()).await?; + container.pause().await?; + self.send_event(TaskPaused { + container_id: req.id.to_string(), + ..Default::default() + }) + .await; + info!("pause request for {:?} returns successfully", req); + Ok(Empty::new()) + } + + async fn resume(&self, _ctx: &TtrpcContext, req: ResumeRequest) -> TtrpcResult { + info!("resume request for {:?}", req); + let mut container = self.get_container(req.id()).await?; + container.resume().await?; + self.send_event(TaskResumed { + container_id: req.id.to_string(), + ..Default::default() + }) + .await; + info!("resume request for {:?} returns successfully", req); + Ok(Empty::new()) + } + async fn kill(&self, _ctx: &TtrpcContext, req: KillRequest) -> TtrpcResult { info!("Kill request for {:?}", req); let mut container = self.get_container(req.id()).await?;