Skip to content

Commit

Permalink
Add debug error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
nightkr committed Feb 19, 2024
1 parent b6ad59f commit ce59f16
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 44 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
sha2 = "0.10"
snafu = "0.7"
snafu = { version = "0.7", features = ["futures"] }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.61.0" }
tera = "1.18"
tokio = { version = "1.29.0", features = ["rt-multi-thread", "macros", "fs", "process"] }
Expand Down
189 changes: 146 additions & 43 deletions rust/stackablectl/src/cmds/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ use std::{
};

use clap::Args;
use futures::{channel::mpsc::Sender, FutureExt, SinkExt, TryFutureExt};
use futures::{
channel::mpsc::{self, Sender},
FutureExt, SinkExt, TryFutureExt,
};
use rand::Rng;
use snafu::{ResultExt, Snafu};
use snafu::{futures::TryFutureExt as _, OptionExt, ResultExt, Snafu};
use stackable_operator::{
k8s_openapi::api::core::v1::{ContainerStatus, EphemeralContainer, Pod, PodSpec},
kube::{
self,
api::{AttachParams, PatchParams, TerminalSize},
runtime::reflector::ObjectRef,
},
};
use termion::{raw::IntoRawMode, terminal_size};
Expand All @@ -26,8 +30,63 @@ use crate::cli::Cli;

#[derive(Debug, Snafu)]
pub enum CmdError {
Attach { source: kube::Error },
#[snafu(display("failed to create Kubernetes client"))]
KubeClientCreate { source: kube::Error },
#[snafu(display("failed to get {pod}"))]
GetPod {
source: kube::Error,
pod: ObjectRef<Pod>,
},
#[snafu(display("{pod} has no container {container:?}"))]
FindTemplateContainer {
pod: ObjectRef<Pod>,
container: String,
},
#[snafu(display("failed to create ephemeral debug container {container:?} on {pod}"))]
CreateDebugContainer {
source: kube::Error,
pod: ObjectRef<Pod>,
container: String,
},
#[snafu(display("debug container {container:?} on {pod} never became ready"))]
AwaitDebugContainerReadiness {
source: kube::runtime::wait::Error,
pod: ObjectRef<Pod>,
container: String,
},
#[snafu(display("failed to get status of debug container {container:?} on {pod}"))]
FindDebugContainerStatus {
pod: ObjectRef<Pod>,
container: String,
},
#[snafu(display("failed to attach to container {container:?} on {pod}"))]
AttachContainer {
source: kube::Error,
pod: ObjectRef<Pod>,
container: String,
},
#[snafu(display("failed to enable raw local TTY input"))]
SetRawTtyMode { source: std::io::Error },
#[snafu(display("failed to turn stdin async"))]
AsyncifyStdin { source: std::io::Error },
#[snafu(display("failed to initialize AsyncFd for stdin"))]
AsyncFdStdin { source: std::io::Error },
#[snafu(display("container has no terminal size channel"))]
NoTerminalSizeChannel,
#[snafu(display("failed to read terminal size"))]
GetTerminalSize { source: std::io::Error },
#[snafu(display("failed to update terminal size"))]
UpdateTerminalSize { source: mpsc::SendError },
#[snafu(display("container has no stdin channel"))]
NoStdinChannel,
#[snafu(display("container has no stdout channel"))]
NoStdoutChannel,
#[snafu(display("failed to forward stdin to container"))]
ForwardStdin { source: std::io::Error },
#[snafu(display("failed to forward stdout from container"))]
ForwardStdout { source: std::io::Error },
}
type Result<T, E = CmdError> = std::result::Result<T, E>;

#[derive(Debug, Args)]
pub struct DebugArgs {
Expand All @@ -44,26 +103,32 @@ pub struct DebugArgs {

impl DebugArgs {
pub async fn run(&self, _cli: &Cli) -> Result<String, CmdError> {
let kube = kube::Client::try_default().await.unwrap();
let pods = match &self.namespace {
Some(ns) => kube::Api::<Pod>::namespaced(kube, ns),
None => kube::Api::<Pod>::default_namespaced(kube),
};
let mut rng = rand::thread_rng();
let mut debug_container_name = "sble-debug-".to_string();
for _ in 0..5 {
debug_container_name.push(rng.gen_range('a'..='z'));
}
let kube = kube::Client::try_default()
.await
.context(KubeClientCreateSnafu)?;
let namespace = self
.namespace
.as_deref()
.unwrap_or_else(|| kube.default_namespace());
let pods = kube::Api::<Pod>::namespaced(kube.clone(), namespace);
let debug_container_name = generate_debug_container_name();
info!(
container.name = debug_container_name,
"Creating debug container"
);
let pod = pods.get(&self.pod).await.unwrap();
let pod_ref = || ObjectRef::<Pod>::new(&self.pod).within(namespace);
let pod = pods
.get(&self.pod)
.await
.with_context(|_| GetPodSnafu { pod: pod_ref() })?;
let template_container = pod
.spec
.as_ref()
.and_then(|spec| spec.containers.iter().find(|c| c.name == self.container))
.unwrap();
.with_context(|| FindTemplateContainerSnafu {
pod: pod_ref(),
container: &self.container,
})?;
let pod_patch = Pod {
spec: Some(PodSpec {
ephemeral_containers: Some(vec![EphemeralContainer {
Expand Down Expand Up @@ -93,7 +158,10 @@ impl DebugArgs {
&kube::api::Patch::Strategic(pod_patch),
)
.await
.unwrap();
.with_context(|_| CreateDebugContainerSnafu {
pod: pod_ref(),
container: &self.container,
})?;
info!(
container.name = debug_container_name,
"Waiting for container to start"
Expand All @@ -109,11 +177,17 @@ impl DebugArgs {
.is_some()
})
.await
.unwrap();
.with_context(|_| AwaitDebugContainerReadinessSnafu {
pod: pod_ref(),
container: &self.container,
})?;
let debug_container_status = ready_pod
.as_ref()
.and_then(debug_container_status_of_pod(&debug_container_name))
.unwrap();
.with_context(|| FindDebugContainerStatusSnafu {
pod: pod_ref(),
container: &self.container,
})?;
if let Some(termination) = debug_container_status
.last_state
.as_ref()
Expand All @@ -135,29 +209,53 @@ impl DebugArgs {
&AttachParams::interactive_tty().container(debug_container_name),
)
.await
.context(AttachSnafu)?;
.with_context(|_| AttachContainerSnafu {
pod: pod_ref(),
container: &self.container,
})?;
info!("Attached to container, if the shell line looks empty, press ENTER!");
{
let _raw = std::io::stdout().into_raw_mode().unwrap();
let _raw = std::io::stdout()
.into_raw_mode()
.context(SetRawTtyModeSnafu)?;
futures::future::select_all([
update_terminal_size(attachment.terminal_size().unwrap())
.map(Ok)
.boxed(),
tokio::io::copy(&mut attachment.stdout().unwrap(), &mut tokio::io::stdout())
.map_ok(drop)
.boxed(),
tokio::io::copy(&mut AsyncStdin::new(), &mut attachment.stdin().unwrap())
.map_ok(drop)
.boxed(),
update_terminal_size(
attachment
.terminal_size()
.context(NoTerminalSizeChannelSnafu)?,
)
.boxed(),
tokio::io::copy(
&mut attachment.stdout().context(NoStdoutChannelSnafu)?,
&mut tokio::io::stdout(),
)
.map_ok(drop)
.context(ForwardStdoutSnafu)
.boxed(),
tokio::io::copy(
&mut AsyncStdin::new()?,
&mut attachment.stdin().context(NoStdinChannelSnafu)?,
)
.map_ok(drop)
.context(ForwardStdinSnafu)
.boxed(),
])
.await
.0
.unwrap();
.0?;
}
Ok(String::new())
}
}

fn generate_debug_container_name() -> String {
let mut rng = rand::thread_rng();
let mut name = "sble-debug-".to_string();
for _ in 0..5 {
name.push(rng.gen_range('a'..='z'));
}
name
}

/// Does true non-blocking reads of stdin, so that we can cancel properly on shutdown.
/// The compromise is that it does not handle having things piped into it very well, since their write sides
/// will also be turned non-blocking.
Expand All @@ -169,13 +267,13 @@ struct AsyncStdin {
}

impl AsyncStdin {
fn new() -> Self {
fn new() -> Result<Self> {
let stdin = std::io::stdin();
// Make stdin non-blocking
{
let old_flags = unsafe { libc::fcntl(stdin.as_raw_fd(), libc::F_GETFL) };
if old_flags == -1 {
panic!("{:?}", std::io::Error::last_os_error());
return Err(std::io::Error::last_os_error()).context(AsyncifyStdinSnafu);
}
let status = unsafe {
libc::fcntl(
Expand All @@ -185,12 +283,12 @@ impl AsyncStdin {
)
};
if status == -1 {
panic!("{:?}", std::io::Error::last_os_error());
return Err(std::io::Error::last_os_error()).context(AsyncifyStdinSnafu);
}
};
Self {
fd: AsyncFd::new(stdin).unwrap(),
}
Ok(Self {
fd: AsyncFd::new(stdin).context(AsyncFdStdinSnafu)?,
})
}
}

Expand All @@ -217,10 +315,10 @@ impl AsyncRead for AsyncStdin {
}
}

async fn update_terminal_size(mut tx: Sender<TerminalSize>) {
async fn update_terminal_size(mut tx: Sender<TerminalSize>) -> Result<()> {
let mut signal = tokio::signal::unix::signal(SignalKind::window_change()).unwrap();
{
let (width, height) = terminal_size().unwrap();
let (width, height) = terminal_size().context(GetTerminalSizeSnafu)?;
// Make TTY apps re-render by force-changing the terminal size
// Start by sending an invalid size so that it's a change no matter
// whether the size has actually changed.
Expand All @@ -229,13 +327,18 @@ async fn update_terminal_size(mut tx: Sender<TerminalSize>) {
height,
})
.await
.unwrap();
tx.send(TerminalSize { width, height }).await.unwrap();
.context(UpdateTerminalSizeSnafu)?;
tx.send(TerminalSize { width, height })
.await
.context(UpdateTerminalSizeSnafu)?;
}
while let Some(()) = signal.recv().await {
let (width, height) = terminal_size().unwrap();
tx.send(TerminalSize { width, height }).await.unwrap();
let (width, height) = terminal_size().context(GetTerminalSizeSnafu)?;
tx.send(TerminalSize { width, height })
.await
.context(UpdateTerminalSizeSnafu)?;
}
Ok(())
}

fn debug_container_status_of_pod(
Expand Down

0 comments on commit ce59f16

Please sign in to comment.