diff --git a/Cargo.lock b/Cargo.lock index 2dbbc1db..d61d164e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2600,6 +2600,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" dependencies = [ "doc-comment", + "futures-core", + "pin-project", "snafu-derive 0.7.5", ] diff --git a/Cargo.toml b/Cargo.toml index deee7c5a..ca22e305 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.9" sha2 = "0.10" -snafu = "0.7" +snafu = { version = "0.7", features = ["futures"] } stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.61.0" } tera = "1.18" tokio = { version = "1.29.0", features = ["rt-multi-thread", "macros", "fs", "process"] } diff --git a/rust/stackablectl/src/cmds/debug.rs b/rust/stackablectl/src/cmds/debug.rs index 40b19da5..e103aec2 100644 --- a/rust/stackablectl/src/cmds/debug.rs +++ b/rust/stackablectl/src/cmds/debug.rs @@ -5,14 +5,18 @@ use std::{ }; use clap::Args; -use futures::{channel::mpsc::Sender, FutureExt, SinkExt, TryFutureExt}; +use futures::{ + channel::mpsc::{self, Sender}, + FutureExt, SinkExt, TryFutureExt, +}; use rand::Rng; -use snafu::{ResultExt, Snafu}; +use snafu::{futures::TryFutureExt as _, OptionExt, ResultExt, Snafu}; use stackable_operator::{ k8s_openapi::api::core::v1::{ContainerStatus, EphemeralContainer, Pod, PodSpec}, kube::{ self, api::{AttachParams, PatchParams, TerminalSize}, + runtime::reflector::ObjectRef, }, }; use termion::{raw::IntoRawMode, terminal_size}; @@ -26,8 +30,63 @@ use crate::cli::Cli; #[derive(Debug, Snafu)] pub enum CmdError { - Attach { source: kube::Error }, + #[snafu(display("failed to create Kubernetes client"))] + KubeClientCreate { source: kube::Error }, + #[snafu(display("failed to get {pod}"))] + GetPod { + source: kube::Error, + pod: ObjectRef, + }, + #[snafu(display("{pod} has no container {container:?}"))] + FindTemplateContainer { + pod: ObjectRef, + container: String, + }, + #[snafu(display("failed to create ephemeral debug container {container:?} on {pod}"))] + CreateDebugContainer { + source: kube::Error, + pod: ObjectRef, + container: String, + }, + #[snafu(display("debug container {container:?} on {pod} never became ready"))] + AwaitDebugContainerReadiness { + source: kube::runtime::wait::Error, + pod: ObjectRef, + container: String, + }, + #[snafu(display("failed to get status of debug container {container:?} on {pod}"))] + FindDebugContainerStatus { + pod: ObjectRef, + container: String, + }, + #[snafu(display("failed to attach to container {container:?} on {pod}"))] + AttachContainer { + source: kube::Error, + pod: ObjectRef, + container: String, + }, + #[snafu(display("failed to enable raw local TTY input"))] + SetRawTtyMode { source: std::io::Error }, + #[snafu(display("failed to turn stdin async"))] + AsyncifyStdin { source: std::io::Error }, + #[snafu(display("failed to initialize AsyncFd for stdin"))] + AsyncFdStdin { source: std::io::Error }, + #[snafu(display("container has no terminal size channel"))] + NoTerminalSizeChannel, + #[snafu(display("failed to read terminal size"))] + GetTerminalSize { source: std::io::Error }, + #[snafu(display("failed to update terminal size"))] + UpdateTerminalSize { source: mpsc::SendError }, + #[snafu(display("container has no stdin channel"))] + NoStdinChannel, + #[snafu(display("container has no stdout channel"))] + NoStdoutChannel, + #[snafu(display("failed to forward stdin to container"))] + ForwardStdin { source: std::io::Error }, + #[snafu(display("failed to forward stdout from container"))] + ForwardStdout { source: std::io::Error }, } +type Result = std::result::Result; #[derive(Debug, Args)] pub struct DebugArgs { @@ -44,26 +103,32 @@ pub struct DebugArgs { impl DebugArgs { pub async fn run(&self, _cli: &Cli) -> Result { - let kube = kube::Client::try_default().await.unwrap(); - let pods = match &self.namespace { - Some(ns) => kube::Api::::namespaced(kube, ns), - None => kube::Api::::default_namespaced(kube), - }; - let mut rng = rand::thread_rng(); - let mut debug_container_name = "sble-debug-".to_string(); - for _ in 0..5 { - debug_container_name.push(rng.gen_range('a'..='z')); - } + let kube = kube::Client::try_default() + .await + .context(KubeClientCreateSnafu)?; + let namespace = self + .namespace + .as_deref() + .unwrap_or_else(|| kube.default_namespace()); + let pods = kube::Api::::namespaced(kube.clone(), namespace); + let debug_container_name = generate_debug_container_name(); info!( container.name = debug_container_name, "Creating debug container" ); - let pod = pods.get(&self.pod).await.unwrap(); + let pod_ref = || ObjectRef::::new(&self.pod).within(namespace); + let pod = pods + .get(&self.pod) + .await + .with_context(|_| GetPodSnafu { pod: pod_ref() })?; let template_container = pod .spec .as_ref() .and_then(|spec| spec.containers.iter().find(|c| c.name == self.container)) - .unwrap(); + .with_context(|| FindTemplateContainerSnafu { + pod: pod_ref(), + container: &self.container, + })?; let pod_patch = Pod { spec: Some(PodSpec { ephemeral_containers: Some(vec![EphemeralContainer { @@ -93,7 +158,10 @@ impl DebugArgs { &kube::api::Patch::Strategic(pod_patch), ) .await - .unwrap(); + .with_context(|_| CreateDebugContainerSnafu { + pod: pod_ref(), + container: &self.container, + })?; info!( container.name = debug_container_name, "Waiting for container to start" @@ -109,11 +177,17 @@ impl DebugArgs { .is_some() }) .await - .unwrap(); + .with_context(|_| AwaitDebugContainerReadinessSnafu { + pod: pod_ref(), + container: &self.container, + })?; let debug_container_status = ready_pod .as_ref() .and_then(debug_container_status_of_pod(&debug_container_name)) - .unwrap(); + .with_context(|| FindDebugContainerStatusSnafu { + pod: pod_ref(), + container: &self.container, + })?; if let Some(termination) = debug_container_status .last_state .as_ref() @@ -135,29 +209,53 @@ impl DebugArgs { &AttachParams::interactive_tty().container(debug_container_name), ) .await - .context(AttachSnafu)?; + .with_context(|_| AttachContainerSnafu { + pod: pod_ref(), + container: &self.container, + })?; info!("Attached to container, if the shell line looks empty, press ENTER!"); { - let _raw = std::io::stdout().into_raw_mode().unwrap(); + let _raw = std::io::stdout() + .into_raw_mode() + .context(SetRawTtyModeSnafu)?; futures::future::select_all([ - update_terminal_size(attachment.terminal_size().unwrap()) - .map(Ok) - .boxed(), - tokio::io::copy(&mut attachment.stdout().unwrap(), &mut tokio::io::stdout()) - .map_ok(drop) - .boxed(), - tokio::io::copy(&mut AsyncStdin::new(), &mut attachment.stdin().unwrap()) - .map_ok(drop) - .boxed(), + update_terminal_size( + attachment + .terminal_size() + .context(NoTerminalSizeChannelSnafu)?, + ) + .boxed(), + tokio::io::copy( + &mut attachment.stdout().context(NoStdoutChannelSnafu)?, + &mut tokio::io::stdout(), + ) + .map_ok(drop) + .context(ForwardStdoutSnafu) + .boxed(), + tokio::io::copy( + &mut AsyncStdin::new()?, + &mut attachment.stdin().context(NoStdinChannelSnafu)?, + ) + .map_ok(drop) + .context(ForwardStdinSnafu) + .boxed(), ]) .await - .0 - .unwrap(); + .0?; } Ok(String::new()) } } +fn generate_debug_container_name() -> String { + let mut rng = rand::thread_rng(); + let mut name = "sble-debug-".to_string(); + for _ in 0..5 { + name.push(rng.gen_range('a'..='z')); + } + name +} + /// Does true non-blocking reads of stdin, so that we can cancel properly on shutdown. /// The compromise is that it does not handle having things piped into it very well, since their write sides /// will also be turned non-blocking. @@ -169,13 +267,13 @@ struct AsyncStdin { } impl AsyncStdin { - fn new() -> Self { + fn new() -> Result { let stdin = std::io::stdin(); // Make stdin non-blocking { let old_flags = unsafe { libc::fcntl(stdin.as_raw_fd(), libc::F_GETFL) }; if old_flags == -1 { - panic!("{:?}", std::io::Error::last_os_error()); + return Err(std::io::Error::last_os_error()).context(AsyncifyStdinSnafu); } let status = unsafe { libc::fcntl( @@ -185,12 +283,12 @@ impl AsyncStdin { ) }; if status == -1 { - panic!("{:?}", std::io::Error::last_os_error()); + return Err(std::io::Error::last_os_error()).context(AsyncifyStdinSnafu); } }; - Self { - fd: AsyncFd::new(stdin).unwrap(), - } + Ok(Self { + fd: AsyncFd::new(stdin).context(AsyncFdStdinSnafu)?, + }) } } @@ -217,10 +315,10 @@ impl AsyncRead for AsyncStdin { } } -async fn update_terminal_size(mut tx: Sender) { +async fn update_terminal_size(mut tx: Sender) -> Result<()> { let mut signal = tokio::signal::unix::signal(SignalKind::window_change()).unwrap(); { - let (width, height) = terminal_size().unwrap(); + let (width, height) = terminal_size().context(GetTerminalSizeSnafu)?; // Make TTY apps re-render by force-changing the terminal size // Start by sending an invalid size so that it's a change no matter // whether the size has actually changed. @@ -229,13 +327,18 @@ async fn update_terminal_size(mut tx: Sender) { height, }) .await - .unwrap(); - tx.send(TerminalSize { width, height }).await.unwrap(); + .context(UpdateTerminalSizeSnafu)?; + tx.send(TerminalSize { width, height }) + .await + .context(UpdateTerminalSizeSnafu)?; } while let Some(()) = signal.recv().await { - let (width, height) = terminal_size().unwrap(); - tx.send(TerminalSize { width, height }).await.unwrap(); + let (width, height) = terminal_size().context(GetTerminalSizeSnafu)?; + tx.send(TerminalSize { width, height }) + .await + .context(UpdateTerminalSizeSnafu)?; } + Ok(()) } fn debug_container_status_of_pod(