diff --git a/clash/tests/data/config/rules.yaml b/clash/tests/data/config/rules.yaml index 6cd8f9ad..0cce5f67 100644 --- a/clash/tests/data/config/rules.yaml +++ b/clash/tests/data/config/rules.yaml @@ -147,10 +147,11 @@ proxy-providers: file-provider: type: file path: ./ss.yaml + interval: 5 health-check: enable: true url: http://www.gstatic.com/generate_204 - interval: 300 + interval: 5 rules: - DOMAIN,ipinfo.io,relay diff --git a/clash_lib/src/app/outbound/manager.rs b/clash_lib/src/app/outbound/manager.rs index 01f34bba..14c61745 100644 --- a/clash_lib/src/app/outbound/manager.rs +++ b/clash_lib/src/app/outbound/manager.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock}; +use tracing::debug; use tracing::info; use crate::app::dns::ThreadSafeDNSResolver; @@ -246,6 +247,7 @@ impl OutboundManager { Ok(pd) } + match outbound_group { OutboundGroupProtocol::Relay(proto) => { if proto.proxies.as_ref().map(|x| x.len()).unwrap_or_default() @@ -481,6 +483,7 @@ impl OutboundManager { .get(provider_name) .expect(format!("provider {} not found", provider_name).as_str()) .clone(); + providers.push(provider); } } diff --git a/clash_lib/src/app/proxy_manager/providers/fetcher.rs b/clash_lib/src/app/proxy_manager/providers/fetcher.rs index b369e735..e6d85901 100644 --- a/clash_lib/src/app/proxy_manager/providers/fetcher.rs +++ b/clash_lib/src/app/proxy_manager/providers/fetcher.rs @@ -6,11 +6,12 @@ use std::{ }; use chrono::{DateTime, Utc}; +use futures::future::BoxFuture; use tokio::{ sync::{Mutex, RwLock}, time::Instant, }; -use tracing::{debug, info}; +use tracing::{info, trace, warn}; use crate::common::utils; @@ -29,13 +30,13 @@ pub struct Fetcher { ticker: Option, inner: std::sync::Arc>, parser: Arc>, - pub on_update: Arc>>, + pub on_update: Option>>, } impl Fetcher where T: Send + Sync + 'static, - U: Fn(T) + Send + Sync + 'static, + U: Fn(T) -> BoxFuture<'static, ()> + Send + Sync + 'static, P: Fn(&[u8]) -> anyhow::Result + Send + Sync + 'static, { pub fn new( @@ -62,7 +63,7 @@ where hash: [0; 16], })), parser: Arc::new(Mutex::new(parser)), - on_update: Arc::new(Mutex::new(on_update)), + on_update: on_update.map(|f| Arc::new(Mutex::new(f))), } } pub fn name(&self) -> &str { @@ -206,21 +207,19 @@ where match Fetcher::::update_inner(inner, vehicle, parser).await { Ok((elm, same)) => (elm, same), Err(e) => { - tracing::error!("{} update failed: {}", &name, e); + warn!("{} update failed: {}", &name, e); return; } }; if same { - tracing::info!("provider {} no update", &name); + trace!("fetcher {} no update", &name); return; } - tracing::info!("provider {} updated", &name); - - let on_update = on_update.lock().await.take(); if let Some(on_update) = on_update { - on_update(elm) + info!("fetcher {} updated", &name); + on_update.lock().await(elm).await; } }; @@ -240,6 +239,7 @@ where mod tests { use std::{path::Path, sync::Arc, time::Duration}; + use futures::future::BoxFuture; use tokio::time::sleep; use crate::app::proxy_manager::providers::{MockProviderVehicle, ProviderVehicleType}; @@ -272,8 +272,10 @@ mod tests { Ok("parsed".to_owned()) }; - let updater = move |input: String| -> () { - assert_eq!(input, "parsed".to_owned()); + let updater = move |input: String| -> BoxFuture<'static, ()> { + Box::pin(async move { + assert_eq!(input, "parsed".to_owned()); + }) }; let mut f = Fetcher::new( diff --git a/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs b/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs index a2cdfbb4..05550f7a 100644 --- a/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs +++ b/clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; use erased_serde::Serialize as ESerialize; -use futures::TryFutureExt; +use futures::future::BoxFuture; use serde::{Deserialize, Serialize}; use serde_yaml::Value; use tracing::debug; @@ -32,7 +32,7 @@ struct Inner { pub struct ProxySetProvider { fetcher: Fetcher< - Box) + Send + Sync + 'static>, + Box) -> BoxFuture<'static, ()> + Send + Sync + 'static>, Box anyhow::Result> + Send + Sync + 'static>, >, inner: std::sync::Arc>, @@ -49,7 +49,7 @@ impl ProxySetProvider { if hc.auto() { let hc = hc.clone(); - debug!("kicking off healthcheck for: {}", name); + debug!("kicking off healthcheck for: {}", &name); tokio::spawn(async move { hc.kick_off().await; }); @@ -62,18 +62,26 @@ impl ProxySetProvider { let inner_clone = inner.clone(); - let updater: Box) + Send + Sync + 'static> = - Box::new(move |input: Vec| -> () { + let n = name.clone(); + let updater: Box< + dyn Fn(Vec) -> BoxFuture<'static, ()> + Send + Sync + 'static, + > = Box::new( + move |input: Vec| -> BoxFuture<'static, ()> { let hc = hc.clone(); - let inner = inner_clone.clone(); - tokio::spawn(async move { + let n = n.clone(); + let inner: Arc> = inner_clone.clone(); + Box::pin(async move { let mut inner = inner.write().await; + debug!("updating {} proxies for: {}", n, input.len()); inner.proxies = input.clone(); hc.update(input).await; // check once after update - hc.check().await; - }); - }); + tokio::spawn(async move { + hc.check().await; + }); + }) + }, + ); let n = name.clone(); let parser: Box< @@ -124,16 +132,16 @@ impl Provider for ProxySetProvider { } async fn initialize(&mut self) -> std::io::Result<()> { - let ele = self.fetcher.initial().map_err(map_io_error).await?; + let ele = self.fetcher.initial().await.map_err(map_io_error)?; debug!("{} initialized with {} proxies", self.name(), ele.len()); - if let Some(updater) = self.fetcher.on_update.clone().lock().await.as_ref() { - updater(ele); + if let Some(updater) = self.fetcher.on_update.as_ref() { + updater.lock().await(ele).await; } Ok(()) } async fn update(&self) -> std::io::Result<()> { - let (ele, same) = self.fetcher.update().map_err(map_io_error).await?; + let (ele, same) = self.fetcher.update().await.map_err(map_io_error)?; debug!( "{} updated with {} proxies, same? {}", self.name(), @@ -141,8 +149,8 @@ impl Provider for ProxySetProvider { same ); if !same { - if let Some(updater) = self.fetcher.on_update.clone().lock().await.as_ref() { - updater(ele); + if let Some(updater) = self.fetcher.on_update.as_ref() { + updater.lock().await(ele); } } Ok(()) diff --git a/clash_lib/src/proxy/selector/mod.rs b/clash_lib/src/proxy/selector/mod.rs index 75000891..6e98aba6 100644 --- a/clash_lib/src/proxy/selector/mod.rs +++ b/clash_lib/src/proxy/selector/mod.rs @@ -51,13 +51,6 @@ impl Handler { pub async fn new(opts: HandlerOptions, providers: Vec) -> Self { let provider = providers.first().unwrap(); let proxies = provider.read().await.proxies().await; - debug!( - "provider {} with {} proxies, {}", - provider.read().await.name(), - proxies.len(), - opts.name - ); - let current = proxies.first().unwrap().name().to_owned(); Self {