Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat] Change to unix domain socket #52

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
28 changes: 18 additions & 10 deletions chaos-tproxy-controller/src/cmd/interactive/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::task::{Context, Poll};
use anyhow::Error;
use futures::TryStreamExt;
use http::{Method, Request, Response, StatusCode};
use hyper::server::conn::{Connection, Http};
use hyper::server::conn::{Http};
use hyper::service::Service;
use hyper::Body;
use tokio::select;
Expand All @@ -16,7 +16,9 @@ use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::instrument;

use crate::cmd::interactive::stdio::StdStream;
use tokio::net::{UnixListener};
#[cfg(unix)]
use std::os::unix::io::{FromRawFd};
use crate::proxy::config::Config;
use crate::proxy::exec::Proxy;
use crate::raw_config::RawConfig;
Expand All @@ -43,21 +45,27 @@ impl ConfigServer {
pub fn serve_interactive(&mut self) {
let mut rx = self.rx.take().unwrap();
let mut service = ConfigService(self.proxy.clone());

self.task = Some(tokio::spawn(async move {
let rx_mut = &mut rx;
let rx_mut = &mut rx;
let unix_listener = UnixListener::from_std(unsafe {std::os::unix::net::UnixListener::from_raw_fd(3)}).unwrap();
RandyLambert marked this conversation as resolved.
Show resolved Hide resolved

loop {
let stream = StdStream::default();
let mut conn = Http::new().serve_connection(stream, &mut service);
let conn_mut = &mut conn;
select! {
_ = &mut *rx_mut => {
tracing::trace!("catch signal in config server.");
Connection::graceful_shutdown(Pin::new(conn_mut));
return Ok(());
},
ret = &mut *conn_mut => if let Err(e) = ret {
tracing::error!("{}",e);
}
stream = unix_listener.accept() => {
let (stream, _) = stream.unwrap();

let http = Http::new();
let conn = http.serve_connection(stream, &mut service);
if let Err(e) = conn.await {
tracing::error!("{}",e);
return Err(anyhow::anyhow!("{}",e));
}
},
};
}
}));
Expand Down