From d8b75bc093fb3ea443056991c5e1d4fd04d0529f Mon Sep 17 00:00:00 2001 From: dev0 Date: Thu, 17 Aug 2023 23:04:38 +1000 Subject: [PATCH] Vmess WIP --- Cargo.Bazel.lock | 10 +- Cargo.lock | 8 +- clash_lib/Cargo.toml | 8 +- .../providers/proxy_set_provider.rs | 3 +- clash_lib/src/config/internal/proxy.rs | 20 +++ clash_lib/src/proxy/mod.rs | 2 +- clash_lib/src/proxy/shadowsocks/datagram.rs | 1 + clash_lib/src/proxy/vmess/mod.rs | 39 ++++- .../src/proxy/vmess/vmess_impl/datagram.rs | 162 ++++++++++++++++++ clash_lib/src/proxy/vmess/vmess_impl/mod.rs | 1 + .../src/proxy/vmess/vmess_impl/stream.rs | 12 +- 11 files changed, 246 insertions(+), 20 deletions(-) create mode 100644 clash_lib/src/proxy/vmess/vmess_impl/datagram.rs diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 1b7c13c7..ffc1a817 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "f267c28d6e0cbe45608f60b117a704c18e3155b38981d0866edb7f1ea5e9fa9b", + "checksum": "3e9cefe14ba34202af1b06aa8dc813b02fe4ac0fa0a3022682061695eb3ec7e8", "crates": { "addr2line 0.20.0": { "name": "addr2line", @@ -2044,7 +2044,7 @@ "Git": { "remote": "https://github.com/Watfaq/boring.git", "commitish": { - "Branch": "bazel" + "Rev": "a5c2442" }, "strip_prefix": "boring" } @@ -2102,7 +2102,7 @@ "Git": { "remote": "https://github.com/Watfaq/boring.git", "commitish": { - "Branch": "bazel" + "Rev": "a5c2442" }, "strip_prefix": "boring-sys" } @@ -7141,7 +7141,7 @@ "Git": { "remote": "https://github.com/Watfaq/boring.git", "commitish": { - "Branch": "bazel" + "Rev": "a5c2442" }, "strip_prefix": "hyper-boring" } @@ -14488,7 +14488,7 @@ "Git": { "remote": "https://github.com/Watfaq/boring.git", "commitish": { - "Branch": "bazel" + "Rev": "a5c2442" }, "strip_prefix": "tokio-boring" } diff --git a/Cargo.lock b/Cargo.lock index a0d87f99..724e6776 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -393,7 +393,7 @@ dependencies = [ [[package]] name = "boring" version = "2.1.0" -source = "git+https://github.com/Watfaq/boring.git?branch=bazel#d58d4367cb21a44d301d295b9259f755eb302041" +source = "git+https://github.com/Watfaq/boring.git?rev=a5c2442#a5c2442e894777f35b196dd99bbacd828aadc92b" dependencies = [ "bitflags 1.3.2", "boring-sys", @@ -405,7 +405,7 @@ dependencies = [ [[package]] name = "boring-sys" version = "2.1.0" -source = "git+https://github.com/Watfaq/boring.git?branch=bazel#d58d4367cb21a44d301d295b9259f755eb302041" +source = "git+https://github.com/Watfaq/boring.git?rev=a5c2442#a5c2442e894777f35b196dd99bbacd828aadc92b" dependencies = [ "bindgen", "cmake", @@ -1365,7 +1365,7 @@ dependencies = [ [[package]] name = "hyper-boring" version = "2.1.2" -source = "git+https://github.com/Watfaq/boring.git?branch=bazel#d58d4367cb21a44d301d295b9259f755eb302041" +source = "git+https://github.com/Watfaq/boring.git?rev=a5c2442#a5c2442e894777f35b196dd99bbacd828aadc92b" dependencies = [ "antidote", "boring", @@ -2748,7 +2748,7 @@ dependencies = [ [[package]] name = "tokio-boring" version = "2.1.5" -source = "git+https://github.com/Watfaq/boring.git?branch=bazel#d58d4367cb21a44d301d295b9259f755eb302041" +source = "git+https://github.com/Watfaq/boring.git?rev=a5c2442#a5c2442e894777f35b196dd99bbacd828aadc92b" dependencies = [ "boring", "boring-sys", diff --git a/clash_lib/Cargo.toml b/clash_lib/Cargo.toml index e7e591a9..fcc427ea 100644 --- a/clash_lib/Cargo.toml +++ b/clash_lib/Cargo.toml @@ -33,10 +33,10 @@ foreign-types-shared = "0.3.1" network-interface = "1.0.0" base64 = "0.21" uuid = { version = "1.2.1", features = ["v4", "fast-rng", "macro-diagnostics"] } -boring = { git = "https://github.com/Watfaq/boring.git", branch = "bazel" } -boring-sys = { git = "https://github.com/Watfaq/boring.git", branch = "bazel" } -hyper-boring = { git = "https://github.com/Watfaq/boring.git", branch = "bazel" } -tokio-boring = { git = "https://github.com/Watfaq/boring.git", branch = "bazel" } +boring = { git = "https://github.com/Watfaq/boring.git", rev = "a5c2442" } +boring-sys = { git = "https://github.com/Watfaq/boring.git", rev = "a5c2442" } +hyper-boring = { git = "https://github.com/Watfaq/boring.git", rev = "a5c2442" } +tokio-boring = { git = "https://github.com/Watfaq/boring.git", rev = "a5c2442" } crc32fast = "1.3.2" brotli = "3.3.4" hmac = "0.12.1" diff --git a/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs b/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs index 85fe1ae7..ba039f56 100644 --- a/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs +++ b/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; -use futures::{future, TryFutureExt}; +use futures::TryFutureExt; use serde::{Deserialize, Serialize}; use serde_yaml::Value; use tokio::sync::Mutex; @@ -82,6 +82,7 @@ impl ProxySetProvider { OutboundProxyProtocol::Ss(s) => s.try_into(), OutboundProxyProtocol::Socks5(_) => todo!(), OutboundProxyProtocol::Trojan(_) => todo!(), + OutboundProxyProtocol::Vmess(_) => todo!(), }) .collect::, _>>(); Ok(proxies?) diff --git a/clash_lib/src/config/internal/proxy.rs b/clash_lib/src/config/internal/proxy.rs index 879a5925..ab7db032 100644 --- a/clash_lib/src/config/internal/proxy.rs +++ b/clash_lib/src/config/internal/proxy.rs @@ -50,6 +50,8 @@ pub enum OutboundProxyProtocol { Socks5(OutboundSocks5), #[serde(rename = "trojan")] Trojan(OutboundTrojan), + #[serde(rename = "vmess")] + Vmess(OutboundVmess), } impl OutboundProxyProtocol { @@ -60,6 +62,7 @@ impl OutboundProxyProtocol { OutboundProxyProtocol::Ss(ss) => &ss.name, OutboundProxyProtocol::Socks5(socks5) => &socks5.name, OutboundProxyProtocol::Trojan(trojan) => &trojan.name, + OutboundProxyProtocol::Vmess(vmess) => &vmess.name, } } } @@ -81,6 +84,7 @@ impl Display for OutboundProxyProtocol { OutboundProxyProtocol::Direct => write!(f, "{}", PROXY_DIRECT), OutboundProxyProtocol::Reject => write!(f, "{}", PROXY_REJECT), OutboundProxyProtocol::Trojan(_) => write!(f, "{}", "Trojan"), + OutboundProxyProtocol::Vmess(_) => write!(f, "{}", "Vmess"), } } } @@ -137,6 +141,22 @@ pub struct OutboundTrojan { pub ws_opts: Option, } +#[derive(serde::Serialize, serde::Deserialize, Debug, Default)] +pub struct OutboundVmess { + pub name: String, + pub server: String, + pub port: u16, + pub uuid: String, + pub alter_id: u16, + pub cipher: String, + pub udp: bool, + pub tls: bool, + pub skip_cert_verify: bool, + pub server_name: Option, + pub network: Option, + pub ws_opts: Option, +} + #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] #[serde(tag = "type")] pub enum OutboundGroupProtocol { diff --git a/clash_lib/src/proxy/mod.rs b/clash_lib/src/proxy/mod.rs index f93042a6..f4e7d421 100644 --- a/clash_lib/src/proxy/mod.rs +++ b/clash_lib/src/proxy/mod.rs @@ -21,7 +21,7 @@ pub mod shadowsocks; pub mod socks; //pub mod trojan; pub mod utils; -//pub mod vmess; +pub mod vmess; pub mod converters; diff --git a/clash_lib/src/proxy/shadowsocks/datagram.rs b/clash_lib/src/proxy/shadowsocks/datagram.rs index 10ee7960..d846fc3e 100644 --- a/clash_lib/src/proxy/shadowsocks/datagram.rs +++ b/clash_lib/src/proxy/shadowsocks/datagram.rs @@ -142,6 +142,7 @@ impl Sink for OutboundDatagramShadowsocks { Poll::Ready(Ok(())) } } + impl Stream for OutboundDatagramShadowsocks { type Item = UdpPacket; diff --git a/clash_lib/src/proxy/vmess/mod.rs b/clash_lib/src/proxy/vmess/mod.rs index 1f778abb..0918c0b1 100644 --- a/clash_lib/src/proxy/vmess/mod.rs +++ b/clash_lib/src/proxy/vmess/mod.rs @@ -6,7 +6,11 @@ use http::Uri; mod vmess_impl; -use crate::{app::ThreadSafeDNSResolver, session::Session}; +use crate::{ + app::ThreadSafeDNSResolver, + config::internal::proxy::{OutboundProxy, OutboundProxyProtocol}, + session::{Session, SocksAddr}, +}; use super::{ transport::{self, Http2Config}, @@ -77,13 +81,29 @@ impl OutboundHandler for Handler { &self.opts.name } + /// The protocol of the outbound handler + /// only contains Type information, do not rely on the underlying value + fn proto(&self) -> OutboundProxy { + OutboundProxy::ProxyServer(OutboundProxyProtocol::Vmess(Default::default())) + } + + /// The proxy remote address + async fn remote_addr(&self) -> Option { + Some(SocksAddr::Domain(self.opts.server.clone(), self.opts.port)) + } + + /// whether the outbound handler support UDP + async fn support_udp(&self) -> bool { + self.opts.udp + } + async fn connect_stream( &self, sess: &Session, resolver: ThreadSafeDNSResolver, ) -> io::Result { - let mut stream = new_tcp_stream( - resolver, + let stream = new_tcp_stream( + resolver.clone(), self.opts.server.as_str(), self.opts.port, self.opts.common_opts.iface.as_ref(), @@ -99,6 +119,18 @@ impl OutboundHandler for Handler { }) .await?; + self.proxy_stream(stream, sess, resolver).await + } + + /// wraps a stream with outbound handler + async fn proxy_stream( + &self, + s: AnyStream, + sess: &Session, + resolver: ThreadSafeDNSResolver, + ) -> io::Result { + let mut stream = s; + let underlying = match self.opts.transport { Some(VmessTransport::Ws(ref opt)) => { let uri = format!("ws://{}:{}{}", self.opts.server, self.opts.port, opt.path) @@ -169,7 +201,6 @@ impl OutboundHandler for Handler { vmess_builder.proxy_stream(underlying).await } - async fn connect_datagram( &self, sess: &Session, diff --git a/clash_lib/src/proxy/vmess/vmess_impl/datagram.rs b/clash_lib/src/proxy/vmess/vmess_impl/datagram.rs new file mode 100644 index 00000000..71323c32 --- /dev/null +++ b/clash_lib/src/proxy/vmess/vmess_impl/datagram.rs @@ -0,0 +1,162 @@ +use std::{io, pin::Pin, task::Poll}; + +use futures::{ready, Sink, Stream}; +use tracing::{debug, instrument}; + +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +use crate::{ + proxy::{datagram::UdpPacket, AnyStream}, + session::SocksAddr, +}; + +pub struct OutboundDatagramVmess { + inner: AnyStream, + remote_addr: SocksAddr, + + flushed: bool, + pkt: Option, + buf: Vec, +} + +impl OutboundDatagramVmess { + pub fn new(inner: AnyStream, remote_addr: SocksAddr) -> Self { + Self { + inner, + remote_addr, + flushed: true, + pkt: None, + buf: vec![0u8; 65535], + } + } +} + +impl Sink for OutboundDatagramVmess { + type Error = std::io::Error; + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if !self.flushed { + match self.poll_flush(cx)? { + Poll::Ready(()) => {} + Poll::Pending => return Poll::Pending, + } + } + + Poll::Ready(Ok(())) + } + + fn start_send(self: std::pin::Pin<&mut Self>, item: UdpPacket) -> Result<(), Self::Error> { + let pin = self.get_mut(); + pin.pkt = Some(item); + pin.flushed = false; + Ok(()) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if self.flushed { + return Poll::Ready(Ok(())); + } + + let Self { + ref mut inner, + ref mut pkt, + ref remote_addr, + ref mut flushed, + .. + } = *self; + + let mut inner = Pin::new(inner); + + let pkt_container = pkt; + + if let Some(pkt) = pkt_container.take() { + if &pkt.dst_addr != remote_addr { + debug!( + "udp packet dst_addr not match, pkt.dst_addr: {}, remote_addr: {}", + pkt.dst_addr, remote_addr + ); + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::Other, + "udp packet dst_addr not match", + ))); + } + let data = pkt.data; + let addr: shadowsocks::relay::Address = + (pkt.dst_addr.host(), pkt.dst_addr.port()).into(); + + let n = ready!(inner.as_mut().poll_write(cx, data.as_ref()))?; + + debug!( + "send udp packet to remote ss server, len: {}, remote_addr: {}, dst_addr: {}", + n, remote_addr, addr + ); + + let wrote_all = n == data.len(); + *pkt_container = None; + *flushed = true; + + let res = if wrote_all { + Ok(()) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "failed to write entire datagram", + )) + }; + Poll::Ready(res) + } else { + debug!("no udp packet to send"); + Poll::Ready(Err(io::Error::new( + io::ErrorKind::Other, + "no packet to send", + ))) + } + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + ready!(self.poll_flush(cx))?; + Poll::Ready(Ok(())) + } +} + +impl Stream for OutboundDatagramVmess { + type Item = UdpPacket; + + #[instrument(skip(self, cx))] + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let Self { + ref mut buf, + ref mut inner, + ref remote_addr, + .. + } = *self; + + let inner = Pin::new(inner); + + let mut buf = ReadBuf::new(buf); + + let rv = ready!(inner.poll_read(cx, &mut buf)); + debug!("recv udp packet from remote ss server: {:?}", rv); + + match rv { + Ok(()) => Poll::Ready(Some(UdpPacket { + data: buf.filled().to_vec(), + src_addr: remote_addr.clone(), + dst_addr: SocksAddr::any_ipv4(), + })), + Err(_) => Poll::Ready(None), + } + } +} diff --git a/clash_lib/src/proxy/vmess/vmess_impl/mod.rs b/clash_lib/src/proxy/vmess/vmess_impl/mod.rs index 4390557c..43cc411b 100644 --- a/clash_lib/src/proxy/vmess/vmess_impl/mod.rs +++ b/clash_lib/src/proxy/vmess/vmess_impl/mod.rs @@ -3,6 +3,7 @@ mod chunk; mod client; mod header; //pub mod http; +mod datagram; mod kdf; mod stream; mod tls; diff --git a/clash_lib/src/proxy/vmess/vmess_impl/stream.rs b/clash_lib/src/proxy/vmess/vmess_impl/stream.rs index 686560a0..05d2867c 100644 --- a/clash_lib/src/proxy/vmess/vmess_impl/stream.rs +++ b/clash_lib/src/proxy/vmess/vmess_impl/stream.rs @@ -1,4 +1,4 @@ -use std::{pin::Pin, task::Poll, time::SystemTime}; +use std::{fmt::Debug, pin::Pin, task::Poll, time::SystemTime}; use aes_gcm::Aes128Gcm; use bytes::{BufMut, BytesMut}; @@ -56,6 +56,16 @@ pub struct VmessStream { is_udp: bool, } +impl Debug for VmessStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("VmessStream") + .field("dst", &self.dst) + .field("is_aead", &self.is_aead) + .field("is_udp", &self.is_udp) + .finish() + } +} + impl VmessStream where S: AsyncRead + AsyncWrite + Unpin,