Skip to content

Commit

Permalink
fixed provider initialize racy
Browse files Browse the repository at this point in the history
closed #67
  • Loading branch information
ibigbug committed Sep 10, 2023
1 parent 44cae47 commit 39ec292
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 36 deletions.
3 changes: 2 additions & 1 deletion clash/tests/data/config/rules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,11 @@ proxy-providers:
file-provider:
type: file
path: ./ss.yaml
interval: 5
health-check:
enable: true
url: http://www.gstatic.com/generate_204
interval: 300
interval: 5

rules:
- DOMAIN,ipinfo.io,relay
Expand Down
3 changes: 3 additions & 0 deletions clash_lib/src/app/outbound/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tracing::debug;
use tracing::info;

use crate::app::dns::ThreadSafeDNSResolver;
Expand Down Expand Up @@ -246,6 +247,7 @@ impl OutboundManager {

Ok(pd)
}

match outbound_group {
OutboundGroupProtocol::Relay(proto) => {
if proto.proxies.as_ref().map(|x| x.len()).unwrap_or_default()
Expand Down Expand Up @@ -481,6 +483,7 @@ impl OutboundManager {
.get(provider_name)
.expect(format!("provider {} not found", provider_name).as_str())
.clone();

providers.push(provider);
}
}
Expand Down
26 changes: 14 additions & 12 deletions clash_lib/src/app/proxy_manager/providers/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use std::{
};

use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use tokio::{
sync::{Mutex, RwLock},
time::Instant,
};
use tracing::{debug, info};
use tracing::{info, trace, warn};

use crate::common::utils;

Expand All @@ -29,13 +30,13 @@ pub struct Fetcher<U, P> {
ticker: Option<tokio::time::Interval>,
inner: std::sync::Arc<tokio::sync::RwLock<Inner>>,
parser: Arc<Mutex<P>>,
pub on_update: Arc<Mutex<Option<U>>>,
pub on_update: Option<Arc<Mutex<U>>>,
}

impl<T, U, P> Fetcher<U, P>
where
T: Send + Sync + 'static,
U: Fn(T) + Send + Sync + 'static,
U: Fn(T) -> BoxFuture<'static, ()> + Send + Sync + 'static,
P: Fn(&[u8]) -> anyhow::Result<T> + Send + Sync + 'static,
{
pub fn new(
Expand All @@ -62,7 +63,7 @@ where
hash: [0; 16],
})),
parser: Arc::new(Mutex::new(parser)),
on_update: Arc::new(Mutex::new(on_update)),
on_update: on_update.map(|f| Arc::new(Mutex::new(f))),
}
}
pub fn name(&self) -> &str {
Expand Down Expand Up @@ -206,21 +207,19 @@ where
match Fetcher::<U, P>::update_inner(inner, vehicle, parser).await {
Ok((elm, same)) => (elm, same),
Err(e) => {
tracing::error!("{} update failed: {}", &name, e);
warn!("{} update failed: {}", &name, e);
return;
}
};

if same {
tracing::info!("provider {} no update", &name);
trace!("fetcher {} no update", &name);
return;
}

tracing::info!("provider {} updated", &name);

let on_update = on_update.lock().await.take();
if let Some(on_update) = on_update {
on_update(elm)
info!("fetcher {} updated", &name);
on_update.lock().await(elm).await;
}
};

Expand All @@ -240,6 +239,7 @@ where
mod tests {
use std::{path::Path, sync::Arc, time::Duration};

use futures::future::BoxFuture;
use tokio::time::sleep;

use crate::app::proxy_manager::providers::{MockProviderVehicle, ProviderVehicleType};
Expand Down Expand Up @@ -272,8 +272,10 @@ mod tests {
Ok("parsed".to_owned())
};

let updater = move |input: String| -> () {
assert_eq!(input, "parsed".to_owned());
let updater = move |input: String| -> BoxFuture<'static, ()> {
Box::pin(async move {
assert_eq!(input, "parsed".to_owned());
})
};

let mut f = Fetcher::new(
Expand Down
40 changes: 24 additions & 16 deletions clash_lib/src/app/proxy_manager/providers/proxy_set_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};

use async_trait::async_trait;
use erased_serde::Serialize as ESerialize;
use futures::TryFutureExt;
use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
use serde_yaml::Value;
use tracing::debug;
Expand Down Expand Up @@ -32,7 +32,7 @@ struct Inner {

pub struct ProxySetProvider {
fetcher: Fetcher<
Box<dyn Fn(Vec<AnyOutboundHandler>) + Send + Sync + 'static>,
Box<dyn Fn(Vec<AnyOutboundHandler>) -> BoxFuture<'static, ()> + Send + Sync + 'static>,
Box<dyn Fn(&[u8]) -> anyhow::Result<Vec<AnyOutboundHandler>> + Send + Sync + 'static>,
>,
inner: std::sync::Arc<tokio::sync::RwLock<Inner>>,
Expand All @@ -49,7 +49,7 @@ impl ProxySetProvider {

if hc.auto() {
let hc = hc.clone();
debug!("kicking off healthcheck for: {}", name);
debug!("kicking off healthcheck for: {}", &name);
tokio::spawn(async move {
hc.kick_off().await;
});
Expand All @@ -62,18 +62,26 @@ impl ProxySetProvider {

let inner_clone = inner.clone();

let updater: Box<dyn Fn(Vec<AnyOutboundHandler>) + Send + Sync + 'static> =
Box::new(move |input: Vec<AnyOutboundHandler>| -> () {
let n = name.clone();
let updater: Box<
dyn Fn(Vec<AnyOutboundHandler>) -> BoxFuture<'static, ()> + Send + Sync + 'static,
> = Box::new(
move |input: Vec<AnyOutboundHandler>| -> BoxFuture<'static, ()> {
let hc = hc.clone();
let inner = inner_clone.clone();
tokio::spawn(async move {
let n = n.clone();
let inner: Arc<tokio::sync::RwLock<Inner>> = inner_clone.clone();
Box::pin(async move {
let mut inner = inner.write().await;
debug!("updating {} proxies for: {}", n, input.len());
inner.proxies = input.clone();
hc.update(input).await;
// check once after update
hc.check().await;
});
});
tokio::spawn(async move {
hc.check().await;
});
})
},
);

let n = name.clone();
let parser: Box<
Expand Down Expand Up @@ -124,25 +132,25 @@ impl Provider for ProxySetProvider {
}

async fn initialize(&mut self) -> std::io::Result<()> {
let ele = self.fetcher.initial().map_err(map_io_error).await?;
let ele = self.fetcher.initial().await.map_err(map_io_error)?;
debug!("{} initialized with {} proxies", self.name(), ele.len());
if let Some(updater) = self.fetcher.on_update.clone().lock().await.as_ref() {
updater(ele);
if let Some(updater) = self.fetcher.on_update.as_ref() {
updater.lock().await(ele).await;
}
Ok(())
}

async fn update(&self) -> std::io::Result<()> {
let (ele, same) = self.fetcher.update().map_err(map_io_error).await?;
let (ele, same) = self.fetcher.update().await.map_err(map_io_error)?;
debug!(
"{} updated with {} proxies, same? {}",
self.name(),
ele.len(),
same
);
if !same {
if let Some(updater) = self.fetcher.on_update.clone().lock().await.as_ref() {
updater(ele);
if let Some(updater) = self.fetcher.on_update.as_ref() {
updater.lock().await(ele);
}
}
Ok(())
Expand Down
7 changes: 0 additions & 7 deletions clash_lib/src/proxy/selector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@ impl Handler {
pub async fn new(opts: HandlerOptions, providers: Vec<ThreadSafeProxyProvider>) -> Self {
let provider = providers.first().unwrap();
let proxies = provider.read().await.proxies().await;
debug!(
"provider {} with {} proxies, {}",
provider.read().await.name(),
proxies.len(),
opts.name
);

let current = proxies.first().unwrap().name().to_owned();

Self {
Expand Down

0 comments on commit 39ec292

Please sign in to comment.