Skip to content

Commit

Permalink
Change fifo to pipe in shim like go shim
Browse files Browse the repository at this point in the history
Signed-off-by: jokemanfire <[email protected]>
  • Loading branch information
jokemanfire committed Jun 6, 2024
1 parent e69120e commit c3e1288
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 21 deletions.
37 changes: 24 additions & 13 deletions crates/runc-shim/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ use std::{

use containerd_shim::{
api::{ExecProcessRequest, Options},
io_error, other, other_error,
util::IntoOption,
Error,
io_error, other, other_error, Error,
};
use log::{debug, warn};
use log::{debug, info, warn};
use nix::{
cmsg_space,
sys::{
Expand All @@ -43,7 +41,7 @@ use nix::{
};
use oci_spec::runtime::{LinuxNamespaceType, Spec};
use runc::{
io::{Io, NullIo, FIFO},
io::{IOOption, Io, NullIo, PipedIo},
options::GlobalOpts,
Runc, Spawner,
};
Expand Down Expand Up @@ -76,8 +74,8 @@ pub struct ProcessIO {

pub fn create_io(
id: &str,
_io_uid: u32,
_io_gid: u32,
io_uid: u32,
io_gid: u32,
stdio: &Stdio,
) -> containerd_shim::Result<ProcessIO> {
let mut pio = ProcessIO::default();
Expand All @@ -100,19 +98,32 @@ pub fn create_io(

if scheme == FIFO_SCHEME {
debug!(
"create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
"create pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
id,
stdio.stdin.as_str(),
stdio.stdout.as_str(),
stdio.stderr.as_str()
);
let io = FIFO {
stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),

// let io = FIFO {
// stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
// stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
// stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),
// };
// pio.copy = false;

if stdio.stdin.is_empty() {
debug!("stdin is empty");
}
let opts = IOOption {
open_stdin: !stdio.stdin.is_empty(),
open_stdout: !stdio.stdout.is_empty(),
open_stderr: !stdio.stderr.is_empty(),
};
let io = PipedIo::new(io_uid, io_gid, &opts).unwrap();
pio.copy = true;

pio.io = Some(Arc::new(io));
pio.copy = false;
}
Ok(pio)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/runc-shim/src/processes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tokio::{
sync::oneshot::{channel, Receiver, Sender},
};

use crate::io::Stdio;
use crate::{common::ProcessIO, io::Stdio};

#[async_trait]
pub trait Process {
Expand Down Expand Up @@ -71,6 +71,7 @@ pub struct ProcessTemplate<S> {
pub state: Status,
pub id: String,
pub stdio: Stdio,
pub io: Option<Arc<ProcessIO>>,
pub pid: i32,
pub exit_code: i32,
pub exited_at: Option<OffsetDateTime>,
Expand All @@ -86,6 +87,7 @@ impl<S> ProcessTemplate<S> {
state: Status::CREATED,
id: id.to_string(),
stdio,
io: None,
pid: 0,
exit_code: 0,
exited_at: None,
Expand Down
43 changes: 38 additions & 5 deletions crates/runc-shim/src/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ impl RuncFactory {
(Some(s), None)
} else {
let pio = create_io(&id, opts.io_uid, opts.io_gid, stdio)?;
create_opts.io = pio.io.as_ref().cloned();
(None, Some(pio))
let ref_pio = Arc::new(pio);
create_opts.io = ref_pio.io.clone();
init.io = Some(ref_pio.clone());
(None, Some(ref_pio))
};

let resp = init
Expand All @@ -178,6 +180,22 @@ impl RuncFactory {
}
return Err(runtime_error(bundle, e, "OCI runtime create failed").await);
}
if !init.stdio.stdin.is_empty() {
let stdin_clone = init.stdio.stdin.clone();
let stdin_w = init.stdin.clone();
// Open the write side in advance to make sure read side will not block,
// open it in another thread otherwise it will block too.
tokio::spawn(async move {
if let Ok(stdin_w_file) = OpenOptions::new()
.write(true)
.open(stdin_clone.as_str())
.await
{
let mut lock_guard = stdin_w.lock().unwrap();
*lock_guard = Some(stdin_w_file);
}
});
}
copy_io_or_console(init, socket, pio, init.lifecycle.exit_signal.clone()).await?;
let pid = read_file_to_str(pid_path).await?.parse::<i32>()?;
init.pid = pid;
Expand Down Expand Up @@ -232,6 +250,7 @@ impl ProcessFactory<ExecProcess> for RuncExecFactory {
stderr: req.stderr.to_string(),
terminal: req.terminal,
},
io: None,
pid: 0,
exit_code: 0,
exited_at: None,
Expand Down Expand Up @@ -394,8 +413,10 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {
(Some(s), None)
} else {
let pio = create_io(&p.id, self.io_uid, self.io_gid, &p.stdio)?;
exec_opts.io = pio.io.as_ref().cloned();
(None, Some(pio))
let ref_pio = Arc::new(pio);
exec_opts.io = ref_pio.io.clone();
p.io = Some(ref_pio.clone());
(None, Some(ref_pio))
};
//TODO checkpoint support
let exec_result = self
Expand Down Expand Up @@ -457,6 +478,15 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {

async fn delete(&self, p: &mut ExecProcess) -> Result<()> {
self.exit_signal.signal();
//close pipe read
if !p.stdio.is_null() {
if let Some(c) = p.io.clone() {
if let Some(io) = c.io.clone() {
io.close_all_sid();
}
}
}
debug!("Do close io complete");
let exec_pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", p.id));
remove_file(exec_pid_path).await.unwrap_or_default();
Ok(())
Expand Down Expand Up @@ -568,6 +598,7 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
stdout,
exit_signal.clone(),
Some(move || {
debug!("stdout exit.....................");
drop(stdout_r);
}),
);
Expand All @@ -594,6 +625,7 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
stderr,
exit_signal,
Some(move || {
debug!("stderr exit.....................");
drop(stderr_r);
}),
);
Expand Down Expand Up @@ -632,7 +664,7 @@ where
async fn copy_io_or_console<P>(
p: &mut ProcessTemplate<P>,
socket: Option<ConsoleSocket>,
pio: Option<ProcessIO>,
pio: Option<Arc<ProcessIO>>,
exit_signal: Arc<ExitSignal>,
) -> Result<()> {
if p.stdio.terminal {
Expand Down Expand Up @@ -670,6 +702,7 @@ impl Spawner for ShimExecutor {
}
};
let pid = child.id().unwrap();

let (stdout, stderr, exit_code) = tokio::join!(
read_std(child.stdout),
read_std(child.stderr),
Expand Down
24 changes: 23 additions & 1 deletion crates/runc/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub trait Io: Debug + Send + Sync {

/// Only close write side (should be stdout/err "from" runc process)
fn close_after_start(&self);

/// Close read side
fn close_all_sid(&self);
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -227,7 +230,7 @@ impl Io for PipedIo {

if let Some(p) = self.stderr.as_ref() {
let pw = p.wr.try_clone()?;
cmd.stdout(pw);
cmd.stderr(pw);
}

Ok(())
Expand All @@ -242,6 +245,17 @@ impl Io for PipedIo {
nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stderr: {}", e));
}
}

fn close_all_sid(&self) {
if let Some(p) = self.stdout.as_ref() {
debug!("close pipe read from stdout");
nix::unistd::close(p.rd.as_raw_fd()).unwrap_or_else(|e| debug!("close stdout: {}", e));
}
if let Some(p) = self.stderr.as_ref() {
debug!("close pipe read from stderr");
nix::unistd::close(p.rd.as_raw_fd()).unwrap_or_else(|e| debug!("close stderr: {}", e));
}
}
}

/// IO driver to direct output/error messages to /dev/null.
Expand Down Expand Up @@ -273,6 +287,8 @@ impl Io for NullIo {
let mut m = self.dev_null.lock().unwrap();
let _ = m.take();
}

fn close_all_sid(&self) {}
}

/// Io driver based on Stdio::inherited(), to direct outputs/errors to stdio.
Expand All @@ -296,6 +312,8 @@ impl Io for InheritedStdIo {
}

fn close_after_start(&self) {}

fn close_all_sid(&self) {}
}

/// Io driver based on Stdio::piped(), to capture outputs/errors from runC.
Expand All @@ -319,6 +337,8 @@ impl Io for PipedStdIo {
}

fn close_after_start(&self) {}

fn close_all_sid(&self) {}
}

/// FIFO for the scenario that set FIFO for command Io.
Expand Down Expand Up @@ -353,6 +373,8 @@ impl Io for FIFO {
}

fn close_after_start(&self) {}

fn close_all_sid(&self) {}
}

#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion crates/runc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ impl Runc {
Ok(())
});
}

let (status, pid, stdout, stderr) = self.spawner.execute(cmd).await?;
if status.success() {
let output = if combined_output {
Expand Down Expand Up @@ -425,6 +424,7 @@ impl Runc {
}
args.push(id.to_string());
let mut cmd = self.command(&args)?;

match opts {
Some(CreateOpts { io: Some(io), .. }) => {
io.set(&mut cmd).map_err(Error::UnavailableIO)?;
Expand Down Expand Up @@ -618,6 +618,7 @@ impl Spawner for DefaultExecutor {
let mut cmd = cmd;
let child = cmd.spawn().map_err(Error::ProcessSpawnFailed)?;
let pid = child.id().unwrap();

let result = child
.wait_with_output()
.await
Expand Down

0 comments on commit c3e1288

Please sign in to comment.