Skip to content

Commit

Permalink
Merge pull request #68 from Watfaq/rp
Browse files Browse the repository at this point in the history
Rp
  • Loading branch information
ibigbug authored Sep 14, 2023
2 parents f81574a + 62251df commit 61127c0
Show file tree
Hide file tree
Showing 39 changed files with 757 additions and 173 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions clash/tests/data/config/rule-set.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
payload:
- 'httpbin.yba.dev'
9 changes: 8 additions & 1 deletion clash/tests/data/config/rules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,16 @@ proxy-providers:
url: http://www.gstatic.com/generate_204
interval: 300

rule-providers:
file-provider:
type: file
path: ./rule-set.yaml
interval: 300
behavior: domain

rules:
- DOMAIN,ipinfo.io,relay
- DOMAIN-KEYWORD,httpbin,h2-vmess
- RULE-SET,file-provider,h2-vmess
- GEOIP,CN,relay
- DOMAIN-SUFFIX,facebook.com,REJECT
- DOMAIN-KEYWORD,google,select
Expand Down
1 change: 1 addition & 0 deletions clash_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ boring = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" }
boring-sys = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" }
hyper-boring = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" }
tokio-boring = { git = "https://github.com/Watfaq/boring.git", rev = "24c006f" }
ip_network_table-deps-treebitmap = "0.5.0"

crc32fast = "1.3.2"
brotli = "3.3.4"
Expand Down
8 changes: 4 additions & 4 deletions clash_lib/src/app/api/handlers/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use axum::{
};
use serde::Deserialize;

use crate::app::{api::AppState, outbound::manager::ThreadSafeOutboundManager};
use crate::{
app::proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider,
proxy::AnyOutboundHandler,
use crate::app::{
api::AppState, outbound::manager::ThreadSafeOutboundManager,
remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider,
};
use crate::proxy::AnyOutboundHandler;
#[derive(Clone)]
struct ProviderState {
outbound_manager: ThreadSafeOutboundManager,
Expand Down
4 changes: 2 additions & 2 deletions clash_lib/src/app/dispatcher/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Dispatcher {
};

let mode = self.mode.lock().await;
info!("dispatching {} with mode {}", sess, mode);
debug!("dispatching {} with mode {}", sess, mode);
let (outbound_name, rule) = match *mode {
RunMode::Global => (PROXY_GLOBAL, None),
RunMode::Rule => self.router.match_route(&sess).await,
Expand All @@ -120,7 +120,7 @@ impl Dispatcher {

match handler.connect_stream(&sess, self.resolver.clone()).await {
Ok(rhs) => {
info!("remote connection established {}", sess);
debug!("remote connection established {}", sess);
let mut rhs = Box::new(
TrackedStream::new(rhs, self.manager.clone(), sess.clone(), rule).await,
);
Expand Down
2 changes: 1 addition & 1 deletion clash_lib/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ pub mod inbound;
pub mod logging;
pub mod outbound;
pub mod profile;
pub mod proxy_manager;
pub mod remote_content_manager;
pub mod router;
41 changes: 26 additions & 15 deletions clash_lib/src/app/outbound/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tracing::debug;
use tracing::error;
use tracing::warn;

use tracing::info;

use crate::app::dns::ThreadSafeDNSResolver;
use crate::app::profile::ThreadSafeCacheFile;
use crate::app::proxy_manager::healthcheck::HealthCheck;
use crate::app::proxy_manager::providers::file_vehicle;
use crate::app::proxy_manager::providers::http_vehicle;
use crate::app::proxy_manager::providers::plain_provider::PlainProvider;
use crate::app::proxy_manager::providers::proxy_provider::ThreadSafeProxyProvider;
use crate::app::proxy_manager::providers::proxy_set_provider::ProxySetProvider;
use crate::app::proxy_manager::ProxyManager;

use crate::app::remote_content_manager::healthcheck::HealthCheck;
use crate::app::remote_content_manager::providers::file_vehicle;
use crate::app::remote_content_manager::providers::http_vehicle;
use crate::app::remote_content_manager::ProxyManager;

use crate::app::remote_content_manager::providers::proxy_provider::PlainProvider;
use crate::app::remote_content_manager::providers::proxy_provider::ProxySetProvider;
use crate::app::remote_content_manager::providers::proxy_provider::ThreadSafeProxyProvider;
use crate::config::internal::proxy::PROXY_GLOBAL;
use crate::config::internal::proxy::{OutboundProxyProvider, PROXY_DIRECT, PROXY_REJECT};
use crate::config::internal::proxy::{OutboundProxyProviderDef, PROXY_DIRECT, PROXY_REJECT};
use crate::proxy::fallback;
use crate::proxy::loadbalance;
use crate::proxy::selector;
Expand Down Expand Up @@ -52,7 +54,7 @@ impl OutboundManager {
pub async fn new(
outbounds: Vec<OutboundProxyProtocol>,
outbound_groups: Vec<OutboundGroupProtocol>,
proxy_providers: HashMap<String, OutboundProxyProvider>,
proxy_providers: HashMap<String, OutboundProxyProviderDef>,
proxy_names: Vec<String>,
dns_resolver: ThreadSafeDNSResolver,
cache_store: ThreadSafeCacheFile,
Expand Down Expand Up @@ -548,14 +550,14 @@ impl OutboundManager {
}

async fn load_proxy_providers(
proxy_providers: HashMap<String, OutboundProxyProvider>,
proxy_providers: HashMap<String, OutboundProxyProviderDef>,
proxy_manager: ProxyManager,
resolver: ThreadSafeDNSResolver,
provider_registry: &mut HashMap<String, ThreadSafeProxyProvider>,
) -> Result<(), Error> {
for (name, provider) in proxy_providers.into_iter() {
match provider {
OutboundProxyProvider::Http(http) => {
OutboundProxyProviderDef::Http(http) => {
let vehicle = http_vehicle::Vehicle::new(
http.url
.parse::<Uri>()
Expand All @@ -581,7 +583,7 @@ impl OutboundManager {

provider_registry.insert(name, Arc::new(RwLock::new(provider)));
}
OutboundProxyProvider::File(file) => {
OutboundProxyProviderDef::File(file) => {
let vehicle = file_vehicle::Vehicle::new(&file.path);
let hc = HealthCheck::new(
vec![],
Expand All @@ -607,7 +609,16 @@ impl OutboundManager {

for p in provider_registry.values() {
info!("initializing provider {}", p.read().await.name());
p.write().await.initialize().await?;
match p.write().await.initialize().await {
Ok(_) => {}
Err(err) => {
error!(
"failed to initialize proxy provider {}: {}",
p.read().await.name(),
err
);
}
}
}

Ok(())
Expand Down
1 change: 0 additions & 1 deletion clash_lib/src/app/proxy_manager/providers/rule_provider.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use boring::ssl::{SslConnector, SslMethod};

use chrono::{DateTime, Utc};
use futures::StreamExt;

use http::{Request, Version};
use hyper_boring::HttpsConnector;
use serde::Serialize;
Expand Down Expand Up @@ -246,11 +246,10 @@ impl ProxyManager {
mod tests {
use std::{net::Ipv4Addr, sync::Arc, time::Duration};

use anyhow::Chain;
use futures::TryFutureExt;

use crate::{
app::{dispatcher::ChainedStreamWrapper, dns::MockClashResolver},
app::{dispatcher::ChainedStreamWrapper, dns::MockClashResolver, remote_content_manager},
config::internal::proxy::PROXY_DIRECT,
proxy::mocks::MockDummyOutboundHandler,
};
Expand All @@ -262,7 +261,7 @@ mod tests {
.expect_resolve()
.returning(|_, _| Ok(Some(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))));

let manager = super::ProxyManager::new(Arc::new(mock_resolver));
let manager = remote_content_manager::ProxyManager::new(Arc::new(mock_resolver));

let mut mock_handler = MockDummyOutboundHandler::new();
mock_handler
Expand Down Expand Up @@ -317,7 +316,7 @@ mod tests {
.expect_resolve()
.returning(|_, _| Ok(Some(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))));

let manager = super::ProxyManager::new(Arc::new(mock_resolver));
let manager = remote_content_manager::ProxyManager::new(Arc::new(mock_resolver));

let mut mock_handler = MockDummyOutboundHandler::new();
mock_handler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ use std::{

use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use tokio::{
sync::{Mutex, RwLock},
time::Instant,
};
use serde::de;
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info, trace, warn};

use crate::common::utils;
Expand All @@ -20,14 +18,15 @@ use super::{ProviderVehicleType, ThreadSafeProviderVehicle};
struct Inner {
updated_at: SystemTime,
hash: [u8; 16],

thread_handle: Option<tokio::task::JoinHandle<()>>,
}

pub struct Fetcher<U, P> {
name: String,
interval: Duration,
vehicle: ThreadSafeProviderVehicle,
thread_handle: Option<tokio::task::JoinHandle<()>>,
ticker: Option<tokio::time::Interval>,
ticker_interval: Duration,
inner: std::sync::Arc<tokio::sync::RwLock<Inner>>,
parser: Arc<Mutex<P>>,
pub on_update: Option<Arc<Mutex<U>>>,
Expand All @@ -50,17 +49,11 @@ where
name,
interval,
vehicle,
thread_handle: None,
ticker: match interval.as_secs() {
0 => None,
_ => Some(tokio::time::interval_at(
Instant::now() + interval,
interval,
)),
},
ticker_interval: interval,
inner: Arc::new(tokio::sync::RwLock::new(Inner {
updated_at: SystemTime::UNIX_EPOCH,
hash: [0; 16],
thread_handle: None,
})),
parser: Arc::new(Mutex::new(parser)),
on_update: on_update.map(|f| Arc::new(Mutex::new(f))),
Expand All @@ -78,7 +71,7 @@ where
self.inner.read().await.updated_at.into()
}

pub async fn initial(&mut self) -> anyhow::Result<T> {
pub async fn initial(&self) -> anyhow::Result<T> {
let mut is_local = false;
let mut immediately_update = false;

Expand All @@ -100,14 +93,16 @@ where
Err(_) => self.vehicle.read().await?,
};

let proxies = match (self.parser.lock().await)(&content) {
let parser_guard = self.parser.lock().await;

let proxies = match (parser_guard)(&content) {
Ok(proxies) => proxies,
Err(e) => {
if !is_local {
return Err(e);
}
let content = self.vehicle.read().await?;
(self.parser.lock().await)(&content)?
(parser_guard)(&content)?
}
};

Expand All @@ -127,8 +122,12 @@ where

drop(inner);

if let Some(ticker) = self.ticker.take() {
self.pull_loop(immediately_update, ticker);
if !self.ticker_interval.is_zero() {
self.pull_loop(
immediately_update,
tokio::time::interval(self.ticker_interval),
)
.await;
}

Ok(proxies)
Expand Down Expand Up @@ -180,21 +179,21 @@ where
Ok((proxies, false))
}

pub fn destroy(&mut self) {
if let Some(handle) = self.thread_handle.take() {
pub async fn destroy(&mut self) {
if let Some(handle) = self.inner.write().await.thread_handle.take() {
handle.abort();
}
}

fn pull_loop(&mut self, immediately_update: bool, mut ticker: tokio::time::Interval) {
async fn pull_loop(&self, immediately_update: bool, mut ticker: tokio::time::Interval) {
let inner = self.inner.clone();
let vehicle = self.vehicle.clone();
let parser = self.parser.clone();
let on_update = self.on_update.clone();
let name = self.name.clone();
let fire_immediately = immediately_update;

self.thread_handle = Some(tokio::spawn(async move {
let thread_handle = Some(tokio::spawn(async move {
debug!("fetcher {} started", &name);
loop {
let inner = inner.clone();
Expand Down Expand Up @@ -232,6 +231,8 @@ where
}
}
}));

self.inner.write().await.thread_handle = thread_handle;
}
}

Expand All @@ -242,7 +243,7 @@ mod tests {
use futures::future::BoxFuture;
use tokio::time::sleep;

use crate::app::proxy_manager::providers::{MockProviderVehicle, ProviderVehicleType};
use crate::app::remote_content_manager::providers::{MockProviderVehicle, ProviderVehicleType};

use super::Fetcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl ProviderVehicle for Vehicle {
}

fn typ(&self) -> ProviderVehicleType {
ProviderVehicleType::HTTP
ProviderVehicleType::Http
}
}

Expand Down
Loading

0 comments on commit 61127c0

Please sign in to comment.