Skip to content

Commit

Permalink
relay working
Browse files Browse the repository at this point in the history
  • Loading branch information
ibigbug committed Jul 31, 2023
1 parent d7d4801 commit 8941cbc
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 40 deletions.
15 changes: 5 additions & 10 deletions clash_lib/src/app/outbound/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tracing::debug;

use crate::app::proxy_manager::healthcheck::HealthCheck;
use crate::app::proxy_manager::providers::file_vehicle;
use crate::app::proxy_manager::providers::http_vehicle::{self, Vehicle};
use crate::app::proxy_manager::providers::http_vehicle;
use crate::app::proxy_manager::providers::plain_provider::PlainProvider;
use crate::app::proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider;
use crate::app::proxy_manager::providers::proxy_set_provider::ProxySetProvider;
Expand All @@ -27,6 +27,8 @@ pub struct OutboundManager {
proxy_manager: Arc<Mutex<ProxyManager>>,
}

static DEFAULT_LATENCY_TEST_URL: &str = "http://www.gstatic.com/generate_204";

pub type ThreadSafeOutboundManager = Arc<RwLock<OutboundManager>>;

impl OutboundManager {
Expand Down Expand Up @@ -108,19 +110,12 @@ impl OutboundManager {
.clone()
})
.collect::<Vec<_>>();
let hc = HealthCheck::new(
proxies.clone(),
proto.url.clone(),
proto.interval,
true,
proxy_manager.clone(),
)
.map_err(|x| Error::InvalidConfig(format!("invalid hc config: {}", x)))?;

let provider = PlainProvider::new(
proto.name.clone(),
proxies,
hc,
proxy_manager.clone(),
DEFAULT_LATENCY_TEST_URL.to_owned(),
)
.map_err(|x| {
Error::InvalidConfig(format!("invalid provider config: {}", x))
Expand Down
17 changes: 10 additions & 7 deletions clash_lib/src/app/proxy_manager/healthcheck.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use std::sync::Arc;

use tokio::{
sync::{Mutex, RwLock},
time::Instant,
};
use tokio::{sync::Mutex, time::Instant};

use crate::proxy::AnyOutboundHandler;

use super::ProxyManager;

pub type ThreadSafeHealthCheck = Arc<RwLock<HealthCheck>>;
pub type ThreadSafeHealthCheck = Arc<Mutex<HealthCheck>>;

struct HealCheckInner {
last_check: Instant,
Expand Down Expand Up @@ -53,21 +50,23 @@ impl HealthCheck {
let lazy = self.lazy;
let proxies = self.proxies.clone();

let url = self.url.clone();
tokio::spawn(async move {
latency_manager.blocking_lock().check(&proxies).await;
latency_manager.blocking_lock().check(&proxies, &url).await;
});

let inner = self.inner.clone();
let proxies = self.proxies.clone();
let latency_manager = self.latency_manager.clone();
let url = self.url.clone();
let task_handle = tokio::spawn(async move {
let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(interval));
loop {
tokio::select! {
_ = ticker.tick() => {
let now = tokio::time::Instant::now();
if !lazy || now.duration_since(inner.blocking_lock().last_check).as_secs() >= interval {
latency_manager.blocking_lock().check(&proxies).await;
latency_manager.lock().await.check(&proxies, &url).await;
inner.blocking_lock().last_check = now;
}
},
Expand Down Expand Up @@ -95,4 +94,8 @@ impl HealthCheck {
pub fn auto(&self) -> bool {
self.interval != 0
}

pub fn url(&self) -> &str {
&self.url
}
}
2 changes: 1 addition & 1 deletion clash_lib/src/app/proxy_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl ProxyManager {
Self::default()
}

pub async fn check(&mut self, _proxy: &Vec<AnyOutboundHandler>) {
pub async fn check(&mut self, _proxy: &Vec<AnyOutboundHandler>, url: &str) {
todo!("check latency for proxies")
}
}
18 changes: 9 additions & 9 deletions clash_lib/src/app/proxy_manager/providers/plain_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,26 @@ use super::{proxy_provider::ProxyProvider, Provider, ProviderType, ProviderVehic
pub struct PlainProvider {
name: String,
proxies: Vec<AnyOutboundHandler>,
healthcheck: HealthCheck,
proxy_registry: Arc<Mutex<ProxyManager>>,
latency_test_url: String,
}

impl PlainProvider {
pub fn new(
name: String,
proxies: Vec<AnyOutboundHandler>,
mut healthcheck: HealthCheck,
proxy_registry: Arc<Mutex<ProxyManager>>,
latency_test_url: String,
) -> anyhow::Result<Self> {
if proxies.is_empty() {
return Err(Error::InvalidConfig(format!("{}: proxies is empty", name)).into());
}

if healthcheck.auto() {
healthcheck.kick_off();
}

Ok(Self {
name,
proxies,
healthcheck,
proxy_registry,
latency_test_url,
})
}
}
Expand Down Expand Up @@ -67,9 +63,13 @@ impl ProxyProvider for PlainProvider {
self.proxies.clone()
}
async fn touch(&mut self) {
self.healthcheck.touch().await;
todo!("PlainProvider::touch");
}
async fn healthcheck(&self) {
self.proxy_registry.lock().await.check(&self.proxies).await;
self.proxy_registry
.lock()
.await
.check(&self.proxies, &self.latency_test_url)
.await;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct ProxySetProvider {
Box<dyn Fn(Vec<AnyOutboundHandler>) + Send + Sync + 'static>,
Box<dyn Fn(&[u8]) -> anyhow::Result<Vec<AnyOutboundHandler>> + Send + Sync + 'static>,
>,
healthcheck: HealthCheck,
hc: HealthCheck,
inner: std::sync::Arc<tokio::sync::Mutex<FileProviderInner>>,
proxy_registry: Arc<Mutex<ProxyManager>>,
}
Expand Down Expand Up @@ -94,7 +94,7 @@ impl ProxySetProvider {
let fetcher = Fetcher::new(name, interval, vehicle, parser, Some(updater.into()));
Ok(Self {
fetcher,
healthcheck: hc,
hc,
inner,
proxy_registry,
})
Expand Down Expand Up @@ -147,14 +147,14 @@ impl ProxyProvider for ProxySetProvider {
}

async fn touch(&mut self) {
self.healthcheck.touch().await;
self.hc.touch().await;
}

async fn healthcheck(&self) {
self.proxy_registry
.lock()
.await
.check(&self.proxies().await)
.check(&self.proxies().await, self.hc.url())
.await;
}
}
Expand Down
7 changes: 0 additions & 7 deletions clash_lib/src/config/internal/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,6 @@ pub struct OutboundGroupRelay {
pub proxies: Option<Vec<String>>,
#[serde(rename = "use")]
pub use_provider: Option<Vec<String>>,

// hc
pub url: String,
#[serde(deserialize_with = "utils::deserialize_u64")]
pub interval: u64,
pub tolerance: Option<i32>,
pub lazy: Option<bool>,
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
Expand Down
6 changes: 4 additions & 2 deletions clash_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ pub fn start(opts: Options) -> Result<(), Error> {
.unwrap()
.block_on(async {
match start_async(opts).await {
Err(e) => tracing::error!("failed to start: {}", e),
Ok(_) => tracing::info!("main program finished"),
Err(e) => {
eprintln!("start error: {}", e);
}
Ok(_) => {}
}
});
Ok(())
Expand Down

0 comments on commit 8941cbc

Please sign in to comment.