Skip to content

Commit

Permalink
Revert "refactor bytes transfering and log output"
Browse files Browse the repository at this point in the history
This reverts commit 3b4ff94.

This commit causes unexpected connection reset, which needs to be
resolved to be merged back.
  • Loading branch information
neevek committed May 24, 2024
1 parent 15009d8 commit f829194
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 183 deletions.
2 changes: 1 addition & 1 deletion src/access_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl AccessServer {
continue;
}

debug!("received conn : {addr}");
debug!("received new local connection, addr: {addr}");
match tcp_sender
.send_timeout(
Some(ChannelMessage::Request(socket)),
Expand Down
6 changes: 3 additions & 3 deletions src/bin/rstund.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() {
run(args)
.await
.map_err(|e| {
error!("{e}");
error!("{}", e);
})
.ok();
})
Expand All @@ -53,13 +53,13 @@ async fn run(mut args: RstundArgs) -> Result<()> {
}

if !d.contains(':') {
*d = format!("127.0.0.1:{d}");
*d = format!("127.0.0.1:{}", d);
}

if let Ok(addr) = d.parse() {
upstreams.push(addr);
} else {
log_and_bail!("invalid upstreams address: {d}");
log_and_bail!("invalid upstreams address: {}", d);
}
}

Expand Down
47 changes: 36 additions & 11 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,22 @@ impl Client {
}

Err(e) => {
error!("connect failed, err: {e}");
error!("connect failed, err: {}", e);
if connect_max_retry > 0 {
connect_retry_count += 1;
if connect_retry_count >= connect_max_retry {
info!("quit after having retried for {connect_retry_count} times");
info!(
"quit after having retried for {} times",
connect_retry_count
);
break;
}
}

debug!("will wait for {wait_before_retry_ms}ms before retrying...");
debug!(
"will wait for {}ms before retrying...",
wait_before_retry_ms
);
tokio::time::sleep(Duration::from_millis(wait_before_retry_ms)).await;
}
}
Expand Down Expand Up @@ -262,7 +268,7 @@ impl Client {
let connection = endpoint.connect(remote_addr, domain.as_str())?.await?;

self.set_and_post_tunnel_state(ClientState::Connected);
self.post_tunnel_log(format!("connected to server: {remote_addr:?}").as_str());
self.post_tunnel_log(format!("connected to server: {:?}", remote_addr).as_str());

let (mut quic_send, mut quic_recv) = connection
.open_bi()
Expand Down Expand Up @@ -301,7 +307,16 @@ impl Client {
// accept local connections and build a tunnel to remote
while let Some(ChannelMessage::Request(tcp_stream)) = access_server.recv().await {
match remote_conn.open_bi().await {
Ok(quic_stream) => Tunnel::new().start(true, tcp_stream, quic_stream),
Ok(quic_stream) => {
debug!(
"[TunnelOut] open stream for conn, {} -> {}",
quic_stream.0.id().index(),
remote_conn.remote_address(),
);

let tcp_stream = tcp_stream.into_split();
Tunnel::new().start(tcp_stream, quic_stream).await;
}
Err(e) => {
error!("failed to open_bi on remote connection: {e}");
self.post_tunnel_log(
Expand Down Expand Up @@ -338,7 +353,16 @@ impl Client {
let remote_conn = remote_conn.read().await;
while let Ok(quic_stream) = remote_conn.accept_bi().await {
match TcpStream::connect(self.config.local_access_server_addr.unwrap()).await {
Ok(tcp_stream) => Tunnel::new().start(false, tcp_stream, quic_stream),
Ok(tcp_stream) => {
debug!(
"[TunnelIn] open stream for conn, {} <- {}",
quic_stream.0.id().index(),
remote_conn.remote_address(),
);

let tcp_stream = tcp_stream.into_split();
Tunnel::new().start(tcp_stream, quic_stream).await;
}
Err(e) => {
error!(
"failed to connect to access server: {e}, {}",
Expand Down Expand Up @@ -455,7 +479,7 @@ impl Client {
let cert = certs.first().context("certificate is not in PEM format")?;

let mut roots = RootCertStore::empty();
roots.add(cert).context(format!(
roots.add(&cert).context(format!(
"failed to add certificate: {}",
self.config.cert_path
))?;
Expand Down Expand Up @@ -560,7 +584,7 @@ impl Client {
return Ok(SocketAddr::new(ip, port));
}

bail!("failed to resolve domain: {domain}");
bail!("failed to resolve domain: {}", domain);
}

async fn lookup_server_ip(
Expand All @@ -583,7 +607,7 @@ impl Client {
};

let ip = resolver.await.lookup_first(domain).await?;
info!("resolved {domain} to {ip}");
info!("resolved {} to {}", domain, ip);
Ok(ip)
}

Expand All @@ -592,8 +616,9 @@ impl Client {
self.post_tunnel_info(TunnelInfo::new(
TunnelInfoType::TunnelLog,
Box::new(format!(
"{} {log}",
chrono::Local::now().format(TIME_FORMAT)
"{} {}",
chrono::Local::now().format(TIME_FORMAT),
log
)),
));
}
Expand Down
26 changes: 19 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ pub struct ServerConfig {
pub dashboard_server_credential: String,
}

pub(crate) enum ReadResult {
Succeeded,
Eof,
}

impl ClientConfig {
pub fn create(
mode: &str,
Expand All @@ -156,7 +161,7 @@ impl ClientConfig {
let addr_mapping_str = addr_mapping;
let addr_mapping: Vec<&str> = addr_mapping_str.split('^').collect();
if addr_mapping.len() != 2 {
log_and_bail!("invalid address mapping: {addr_mapping_str}");
log_and_bail!("invalid address mapping: {}", addr_mapping_str);
}

let mut addr_mapping: Vec<String> =
Expand All @@ -168,12 +173,12 @@ impl ClientConfig {
sock_addr_mapping.push(None);
} else {
if !addr.contains(':') {
*addr = format!("127.0.0.1:{addr}");
*addr = format!("127.0.0.1:{}", addr);
}
sock_addr_mapping
.push(Some(addr.parse::<SocketAddr>().context(format!(
"invalid address mapping:[{addr_mapping_str}]"
))?));
sock_addr_mapping.push(Some(
addr.parse::<SocketAddr>()
.context(format!("invalid address mapping:[{}]", addr_mapping_str))?,
));
}
}

Expand Down Expand Up @@ -201,7 +206,7 @@ impl ClientConfig {
access_server_addr: sock_addr_mapping[0],
}))
} else {
if sock_addr_mapping[0].is_none() {
if sock_addr_mapping[0] == None {
log_and_bail!("'ANY' is not allowed as local access server for OUT tunneling");
}
config.local_access_server_addr = sock_addr_mapping[0];
Expand All @@ -215,6 +220,13 @@ impl ClientConfig {
}
}

impl ReadResult {
#![allow(dead_code)]
pub fn is_eof(&self) -> bool {
matches!(self, Self::Eof)
}
}

pub fn socket_addr_with_unspecified_ip_port(ipv6: bool) -> SocketAddr {
if ipv6 {
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
Expand Down
Loading

0 comments on commit f829194

Please sign in to comment.