diff --git a/chaos-tproxy-controller/src/cmd/command_line.rs b/chaos-tproxy-controller/src/cmd/command_line.rs index 975ae94..15f9eef 100644 --- a/chaos-tproxy-controller/src/cmd/command_line.rs +++ b/chaos-tproxy-controller/src/cmd/command_line.rs @@ -17,10 +17,6 @@ pub struct Opt { #[structopt(name = "FILE", parse(from_os_str))] pub input: Option, - /// Allows applying json config by stdin/stdout - #[structopt(short, long)] - pub interactive: bool, - // The number of occurrences of the `v/verbose` flag /// Verbose mode (-v, -vv, -vvv, etc.) #[structopt(short, long, parse(from_occurrences))] @@ -33,6 +29,10 @@ pub struct Opt { /// ipc path for sub proxy. #[structopt(long)] pub ipc_path: Option, + + /// ipc path to communicate with chaos-mesh. + #[structopt(long = "interactive-path")] + pub interactive_path: Option, } impl Opt { @@ -50,7 +50,7 @@ impl Opt { } fn checked(self) -> Result { - if !self.interactive && !self.proxy && self.input.is_none() { + if self.interactive_path.is_none() && !self.proxy && self.input.is_none() { return Err(anyhow!("config file is required when interactive mode and daemon mode is all disabled, use `-h | --help` for more details")); } Ok(self) diff --git a/chaos-tproxy-controller/src/cmd/interactive/handler.rs b/chaos-tproxy-controller/src/cmd/interactive/handler.rs index f8891f7..a62f692 100644 --- a/chaos-tproxy-controller/src/cmd/interactive/handler.rs +++ b/chaos-tproxy-controller/src/cmd/interactive/handler.rs @@ -1,5 +1,6 @@ use std::convert::TryInto; use std::future::Future; +use std::path::PathBuf; use std::ops::DerefMut; use std::pin::Pin; use std::sync::Arc; @@ -8,16 +9,17 @@ use std::task::{Context, Poll}; use anyhow::Error; use futures::TryStreamExt; use http::{Method, Request, Response, StatusCode}; -use hyper::server::conn::{Connection, Http}; +use hyper::server::conn::Http; use hyper::service::Service; use hyper::Body; +use tokio::net::UnixListener; use tokio::select; use tokio::sync::oneshot::{channel, Receiver, Sender}; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::instrument; -use crate::cmd::interactive::stdio::StdStream; +#[cfg(unix)] use crate::proxy::config::Config; use crate::proxy::exec::Proxy; use crate::raw_config::RawConfig; @@ -41,24 +43,32 @@ impl ConfigServer { } } - pub fn serve_interactive(&mut self) { + pub fn serve_interactive(&mut self, interactive_path: PathBuf) { let mut rx = self.rx.take().unwrap(); - let mut service = ConfigService(self.proxy.clone()); - self.task = Some(tokio::spawn(async move { + let proxy = self.proxy.clone(); + self.task = Some(tokio::task::spawn(async move { let rx_mut = &mut rx; + tracing::info!("ConfigServer listener try binding {:?}", interactive_path); + let unix_listener = UnixListener::bind(interactive_path).unwrap(); + loop { - let stream = StdStream::default(); - let mut conn = Http::new().serve_connection(stream, &mut service); - let conn_mut = &mut conn; + let mut service = ConfigService(proxy.clone()); select! { _ = &mut *rx_mut => { tracing::trace!("catch signal in config server."); - Connection::graceful_shutdown(Pin::new(conn_mut)); return Ok(()); }, - ret = &mut *conn_mut => if let Err(e) = ret { - tracing::error!("{}",e); - } + stream = unix_listener.accept() => { + tokio::task::spawn(async move { + let (stream, _) = stream.unwrap(); + + let http = Http::new(); + let conn = http.serve_connection(stream, &mut service); + if let Err(e) = conn.await { + tracing::error!("{}",e); + } + }); + }, }; } })); diff --git a/chaos-tproxy-controller/src/main.rs b/chaos-tproxy-controller/src/main.rs index fa7a832..09e5e20 100644 --- a/chaos-tproxy-controller/src/main.rs +++ b/chaos-tproxy-controller/src/main.rs @@ -44,15 +44,18 @@ async fn main() -> anyhow::Result<()> { return Ok(()); } - if opt.interactive { + if let Some(path) = opt.interactive_path { let mut config_server = ConfigServer::new(Proxy::new(opt.verbose).await); - config_server.serve_interactive(); + config_server.serve_interactive(path.clone()); let mut signals = Signals::from_kinds(&[SignalKind::interrupt(), SignalKind::terminate()])?; signals.wait().await?; + // Currently we cannot graceful shutdown the config server. config_server.stop().await?; - // Currently we cannot graceful shutdown the config server. + // delete the unix socket file + std::fs::remove_file(path.clone())?; + exit(0); } Ok(())