From 9f4c23176568e0aaaae7f62743e5dc77fe6486ea Mon Sep 17 00:00:00 2001 From: Aitthi Arsa Date: Sat, 9 Mar 2024 12:38:37 +0700 Subject: [PATCH 1/6] =?UTF-8?q?=E2=9C=A8=20feat:=20runtime=20config?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 3 ++ Cargo.toml | 3 ++ config/Cargo.toml | 5 ++- config/src/lib.rs | 39 +------------------ config/src/models.rs | 53 ------------------------- config/src/proxy.rs | 6 ++- config/src/runtime.rs | 90 +++++++++++++++++++++++++++++++++++++++++++ proxy/src/lib.rs | 2 +- runtime/src/main.rs | 12 +++++- 9 files changed, 116 insertions(+), 97 deletions(-) delete mode 100644 config/src/models.rs create mode 100644 config/src/runtime.rs diff --git a/Cargo.lock b/Cargo.lock index c72c5e6..f8da7f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -280,9 +280,12 @@ name = "config" version = "0.1.0" dependencies = [ "ahash", + "anyhow", "fnv", + "lazy_static", "matchit", "notify", + "once_cell", "pingora", "serde", "serde_yaml 0.9.32", diff --git a/Cargo.toml b/Cargo.toml index f9b36b9..809e37a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,9 @@ http = "1" notify = "6.1" matchit = "0.7" ahash = "0.8" +lazy_static = "1.4" +once_cell = "1.19" + [profile.release] strip = true diff --git a/config/Cargo.toml b/config/Cargo.toml index 9ab94b4..190ba32 100644 --- a/config/Cargo.toml +++ b/config/Cargo.toml @@ -13,4 +13,7 @@ fnv = { workspace = true } notify = { workspace = true } matchit = { workspace = true } tracing = { workspace = true } -ahash = { workspace = true } \ No newline at end of file +ahash = { workspace = true } +lazy_static = { workspace = true } +once_cell = { workspace = true } +anyhow = { workspace = true } \ No newline at end of file diff --git a/config/src/lib.rs b/config/src/lib.rs index f25b2cc..3253b8b 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -1,39 +1,2 @@ -pub mod models; pub mod proxy; - -// use -use std::sync::Once; -use std::{fs::File, io::BufReader}; - -// This is a global variable that is initialized once -static INIT_CONFIG: Once = Once::new(); -static mut GLOBAL_CONFIG: *const models::AppConfig = std::ptr::null(); - -pub fn app_config() -> &'static models::AppConfig { - INIT_CONFIG.call_once(|| { - // FROM ENV - let conf_path = std::env::var("EASY_PROXY_CONF"); - let conf_path = match conf_path { - Ok(val) => val, - Err(_e) => { - let conf_path = std::env::current_dir().expect("Unable to get current dir"); - conf_path - .join(".config/easy_proxy.yaml") - .to_str() - .expect("Unable to convert path") - .to_string() - } - }; - - let open_conf = File::open(conf_path).expect("Unable to open file"); - let read_conf = BufReader::new(open_conf); - let conf: models::AppConfig = - serde_yaml::from_reader(read_conf).expect("Unable to read conf file"); - - unsafe { - GLOBAL_CONFIG = Box::into_raw(Box::new(conf)); - } - }); - - unsafe { &*GLOBAL_CONFIG } -} +pub mod runtime; diff --git a/config/src/models.rs b/config/src/models.rs deleted file mode 100644 index eec012e..0000000 --- a/config/src/models.rs +++ /dev/null @@ -1,53 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, PartialEq, Serialize, Deserialize)] -pub struct AppConfig { - pub proxy: Proxy, - pub pingora: Pingora, - pub providers: Vec, -} - -#[derive(Debug, PartialEq, Serialize, Deserialize)] -pub struct Proxy { - pub addr: String, -} - -#[derive(Debug, PartialEq, Serialize, Deserialize)] -pub struct Pingora { - pub daemon: Option, - pub threads: Option, - pub work_stealing: Option, // default: true - pub error_log: Option, - pub pid_file: Option, // default: "/tmp/pingora.pid" - pub upgrade_sock: Option, // default: "/tmp/pingora_upgrade.sock" - pub user: Option, - pub group: Option, - pub ca_file: Option, -} - -#[derive(Debug, PartialEq, Serialize, Deserialize)] -pub struct Provider { - pub name: String, - pub path: Option, - pub watch: Option, -} - -impl From<&Provider> for ProviderFiles { - fn from(p: &Provider) -> Self { - ProviderFiles { - name: p.name.clone(), - path: p - .path - .clone() - .unwrap_or_else(|| "/etc/easy-proxy/dynamic".to_string()), - watch: p.watch.unwrap_or(true), - } - } -} - -#[derive(Debug, PartialEq, Serialize, Deserialize)] -pub struct ProviderFiles { - pub name: String, - pub path: String, - pub watch: bool, -} diff --git a/config/src/proxy.rs b/config/src/proxy.rs index 412eff5..00d4921 100644 --- a/config/src/proxy.rs +++ b/config/src/proxy.rs @@ -1,4 +1,3 @@ -use crate::models::ProviderFiles; use ahash::AHashMap; use notify::{self, Event, Watcher}; use pingora::{ @@ -22,6 +21,9 @@ use std::{ sync::{Arc, Once}, }; +// internal crate +use crate::runtime::ProviderFiles; + #[derive(Clone, Deserialize)] pub struct ProxyConfigFile { pub services: Option>, @@ -118,7 +120,7 @@ pub fn get_backends() -> Option<&'static ProxyConfig> { // read proxy config from file pub fn read_config() { - let app_config = super::app_config(); + let app_config = super::runtime::config(); let providers = &app_config.providers; for provider in providers { match provider.name.as_str() { diff --git a/config/src/runtime.rs b/config/src/runtime.rs new file mode 100644 index 0000000..66d10bf --- /dev/null +++ b/config/src/runtime.rs @@ -0,0 +1,90 @@ +use anyhow; +use lazy_static::lazy_static; +use once_cell::sync::OnceCell; +use serde::{Deserialize, Serialize}; +use std::{fs::File, io::BufReader}; + +// models +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RuntimeConfig { + pub proxy: Proxy, + pub pingora: Pingora, + pub providers: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Proxy { + pub addr: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Pingora { + pub daemon: Option, + pub threads: Option, + pub work_stealing: Option, // default: true + pub error_log: Option, + pub pid_file: Option, // default: "/tmp/pingora.pid" + pub upgrade_sock: Option, // default: "/tmp/pingora_upgrade.sock" + pub user: Option, + pub group: Option, + pub ca_file: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Provider { + pub name: String, + pub path: Option, + pub watch: Option, +} + +impl From<&Provider> for ProviderFiles { + fn from(p: &Provider) -> Self { + ProviderFiles { + name: p.name.clone(), + path: p + .path + .clone() + .unwrap_or_else(|| "/etc/easy-proxy/dynamic".to_string()), + watch: p.watch.unwrap_or(true), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProviderFiles { + pub name: String, + pub path: String, + pub watch: bool, +} + +// Initialize global configuration +lazy_static! { + static ref GLOBAL_CONFIG: OnceCell = OnceCell::new(); +} + +pub fn initialize() -> Result<(), anyhow::Error> { + let conf_path = std::env::var("EASY_PROXY_CONF"); + let conf_path = match conf_path { + Ok(val) => val, + Err(_e) => { + let conf_path = std::env::current_dir().expect("Unable to get current dir"); + conf_path + .join(".config/easy_proxy.yaml") + .to_str() + .expect("Unable to convert path") + .to_string() + } + }; + + let open_conf = File::open(conf_path).expect("Unable to open file"); + let read_conf = BufReader::new(open_conf); + let conf: RuntimeConfig = serde_yaml::from_reader(read_conf).expect("Unable to read conf file"); + GLOBAL_CONFIG + .set(conf) + .map_err(|_| anyhow::anyhow!("Unable to set global config")) +} + +pub fn config() -> &'static RuntimeConfig { + // SAFETY: This is safe because we are initializing the global config + GLOBAL_CONFIG.get().expect("Config not initialized") +} diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 6d4a120..290e45f 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -18,7 +18,7 @@ pub struct Proxy {} impl Proxy { pub fn new_proxy() -> Result { - let app_conf = &config::app_config(); + let app_conf = &config::runtime::config(); let proxy = Proxy {}; let mut opt = Opt::default(); if let Some(conf) = &app_conf.pingora.daemon { diff --git a/runtime/src/main.rs b/runtime/src/main.rs index ffd9958..822d793 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -9,12 +9,20 @@ fn main() { // initialize the logger tracing_subscriber::fmt::init(); // initialize the config - let _ = config::app_config(); + match config::runtime::initialize() { + Ok(_) => { + tracing::info!("✅ Config initialized"); + } + Err(e) => { + tracing::error!("❌ Error initializing config: {:?}", e); + std::process::exit(1); + } + } config::proxy::read_config(); // create a new proxy proxy::Proxy::new_proxy() - .map_err(|e| tracing::error!("Error starting proxy: {:?}", e)) + .map_err(|e| tracing::error!("❌ Error creating proxy: {:?}", e)) .unwrap() .run_forever(); } From 5b48bfb148cbd14ddc1bb76b1599351502722e94 Mon Sep 17 00:00:00 2001 From: Aitthi Arsa Date: Sat, 9 Mar 2024 12:48:11 +0700 Subject: [PATCH 2/6] =?UTF-8?q?=F0=9F=8E=A8=20update=20runtime=20config?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/src/runtime.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/config/src/runtime.rs b/config/src/runtime.rs index 66d10bf..3e26e62 100644 --- a/config/src/runtime.rs +++ b/config/src/runtime.rs @@ -59,32 +59,31 @@ pub struct ProviderFiles { // Initialize global configuration lazy_static! { - static ref GLOBAL_CONFIG: OnceCell = OnceCell::new(); + static ref GLOBAL_RUNTIME_CONFIG: OnceCell = OnceCell::new(); } pub fn initialize() -> Result<(), anyhow::Error> { - let conf_path = std::env::var("EASY_PROXY_CONF"); - let conf_path = match conf_path { + let conf_path = match std::env::var("EASY_PROXY_CONF") { Ok(val) => val, Err(_e) => { - let conf_path = std::env::current_dir().expect("Unable to get current dir"); + let conf_path = std::env::current_dir().map_err(|e| anyhow::anyhow!(e))?; conf_path .join(".config/easy_proxy.yaml") .to_str() - .expect("Unable to convert path") + .unwrap_or_default() .to_string() } }; - let open_conf = File::open(conf_path).expect("Unable to open file"); + let open_conf = File::open(conf_path).map_err(|e| anyhow::anyhow!(e))?; let read_conf = BufReader::new(open_conf); - let conf: RuntimeConfig = serde_yaml::from_reader(read_conf).expect("Unable to read conf file"); - GLOBAL_CONFIG + let conf: RuntimeConfig = serde_yaml::from_reader(read_conf).map_err(|e| anyhow::anyhow!(e))?; + GLOBAL_RUNTIME_CONFIG .set(conf) .map_err(|_| anyhow::anyhow!("Unable to set global config")) } pub fn config() -> &'static RuntimeConfig { // SAFETY: This is safe because we are initializing the global config - GLOBAL_CONFIG.get().expect("Config not initialized") + GLOBAL_RUNTIME_CONFIG.get().expect("Config not initialized") } From bca167ae7c48b6180b1b48a45f82f601f7cbffb1 Mon Sep 17 00:00:00 2001 From: Aitthi Arsa Date: Sat, 9 Mar 2024 18:01:51 +0700 Subject: [PATCH 3/6] feat: file conf --- .config/easy_proxy.yaml | 2 +- Cargo.lock | 3 + Cargo.toml | 3 +- config/Cargo.toml | 4 +- config/src/proxy.rs | 350 --------------------- config/src/proxy/mod.rs | 203 +++++++++++++ config/src/proxy/provider_files.rs | 472 +++++++++++++++++++++++++++++ config/src/runtime.rs | 6 +- examples/proxy.yaml | 13 +- proxy/src/lib.rs | 10 +- proxy/src/modify.rs | 61 ++-- proxy/src/services.rs | 26 +- runtime/src/main.rs | 10 +- 13 files changed, 754 insertions(+), 409 deletions(-) delete mode 100644 config/src/proxy.rs create mode 100644 config/src/proxy/mod.rs create mode 100644 config/src/proxy/provider_files.rs diff --git a/.config/easy_proxy.yaml b/.config/easy_proxy.yaml index 3d695b3..c9b74b7 100644 --- a/.config/easy_proxy.yaml +++ b/.config/easy_proxy.yaml @@ -8,7 +8,7 @@ pingora: # https://github.com/cloudflare/pingora/blob/main/docs/user_guide/daemon.md daemon: true # https://github.com/cloudflare/pingora/blob/main/docs/user_guide/conf.md - threads: 4 + threads: 6 # work_stealing: true # error_log: /var/log/pingora/error.log # pid_file: /run/pingora.pid diff --git a/Cargo.lock b/Cargo.lock index f8da7f7..b883389 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,7 @@ dependencies = [ "cfg-if", "getrandom", "once_cell", + "serde", "version_check", "zerocopy", ] @@ -282,6 +283,7 @@ dependencies = [ "ahash", "anyhow", "fnv", + "http 1.0.0", "lazy_static", "matchit", "notify", @@ -289,6 +291,7 @@ dependencies = [ "pingora", "serde", "serde_yaml 0.9.32", + "tokio", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 809e37a..289154b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,9 +22,10 @@ fnv = "1" http = "1" notify = "6.1" matchit = "0.7" -ahash = "0.8" +ahash = { version="0.8", features = ["serde"] } lazy_static = "1.4" once_cell = "1.19" +tokio = { version="1", features = ["rt-multi-thread"] } [profile.release] diff --git a/config/Cargo.toml b/config/Cargo.toml index 190ba32..7262d27 100644 --- a/config/Cargo.toml +++ b/config/Cargo.toml @@ -16,4 +16,6 @@ tracing = { workspace = true } ahash = { workspace = true } lazy_static = { workspace = true } once_cell = { workspace = true } -anyhow = { workspace = true } \ No newline at end of file +anyhow = { workspace = true } +tokio = { workspace = true } +http = { workspace = true } \ No newline at end of file diff --git a/config/src/proxy.rs b/config/src/proxy.rs deleted file mode 100644 index 00d4921..0000000 --- a/config/src/proxy.rs +++ /dev/null @@ -1,350 +0,0 @@ -use ahash::AHashMap; -use notify::{self, Event, Watcher}; -use pingora::{ - lb::{ - selection::{ - algorithms::{Random, RoundRobin}, - consistent::{KetamaHashing, OwnedNodeIterator}, - weighted::{Weighted, WeightedIterator}, - BackendSelection, - }, - Backend, - }, - protocols::l4::socket::SocketAddr, -}; -use serde::Deserialize; -use std::{ - collections::BTreeSet, - fs::File, - io::BufReader, - path::Path, - sync::{Arc, Once}, -}; - -// internal crate -use crate::runtime::ProviderFiles; - -#[derive(Clone, Deserialize)] -pub struct ProxyConfigFile { - pub services: Option>, - pub routes: Option>, - pub service_selector: Option, -} - -#[derive(Debug, Clone, Deserialize)] -pub struct Service { - pub name: String, - pub algorithm: String, - pub endpoints: Vec, -} - -#[derive(Debug, Clone, Deserialize)] -pub struct Endpoint { - pub ip: String, - pub port: u16, - pub weight: Option, -} - -#[derive(Clone, Deserialize)] -pub struct Route { - pub host: Option, - pub header: Option, - pub paths: Vec, - pub add_headers: Option>, - pub del_headers: Option>, -} - -#[derive(Debug, Clone, Deserialize)] -pub struct Header { - pub name: String, - pub value: String, -} - -#[derive(Debug, Clone, Deserialize)] -pub struct SvcPath { - #[serde(rename = "pathType")] - pub path_type: String, - pub path: String, - pub service: ServiceRef, -} - -#[derive(Debug, Clone, Deserialize)] -pub struct ServiceRef { - pub rewrite: Option, - pub name: String, -} - -#[derive(Deserialize, Debug)] -pub enum BackendType { - #[serde(skip)] - RoundRobin(*mut WeightedIterator), - #[serde(skip)] - Weighted(*mut WeightedIterator), - #[serde(skip)] - Consistent(*mut OwnedNodeIterator), - #[serde(skip)] - Random(*mut WeightedIterator), -} - -// service_selector: -// header: x-easy-proxy-svc # from header key "x-easy-proxy-svc" - -pub struct ProxyConfig { - pub routes: AHashMap, - pub service_selector: ServiceSelector, -} - -#[derive(Clone, Deserialize)] -pub struct ServiceSelector { - pub header: String, -} - -pub struct ProxyRoute { - pub route: Route, - pub paths: matchit::Router, - pub services: AHashMap, -} - -static INIT_BACKENDS: Once = Once::new(); -static mut GLOBAL_BACKENDS: *mut ProxyConfig = std::ptr::null_mut(); - -pub fn get_backends() -> Option<&'static ProxyConfig> { - INIT_BACKENDS.call_once(|| { - read_config(); - }); - if unsafe { GLOBAL_BACKENDS.is_null() } { - return None; - } - unsafe { Some(&*GLOBAL_BACKENDS) } -} - -// read proxy config from file -pub fn read_config() { - let app_config = super::runtime::config(); - let providers = &app_config.providers; - for provider in providers { - match provider.name.as_str() { - "files" => provider_files(&provider.into()), - _ => { - // do nothing - panic!("unknown provider: {}", provider.name); - } - } - } -} - -pub fn provider_files(file: &ProviderFiles) { - let mut path = file.path.clone(); - if !path.starts_with('/') { - if let Ok(cwd_path) = std::env::current_dir() { - let cwd_path = cwd_path.clone(); - let cwd_path = cwd_path.to_str().unwrap_or_default(); - path = format!("{}/{}", cwd_path, path); - } - } - read_file(path.clone()); - if file.watch { - std::thread::spawn(move || { - let path_ = path.clone(); - let mut watcher = notify::recommended_watcher(move |res: Result| { - // println!("watch event: {:?}", res); - match res { - Ok(e) => { - // println!("watch event: {:?}", e); - let kind = e.kind; - // println!("kind: {:?}", kind.is_modify()); - if !kind.is_modify() && !kind.is_create() { - return; - } - for path in e.paths { - // println!("config changed: {:?}", path); - tracing::info!("config changed: {:?}", path); - } - read_file(path_.clone()); - } - Err(e) => { - // println!("watch error: {:?}", e); - tracing::error!("watch error: {:?}", e); - } - } - }) - .expect("failed to create watcher"); - - watcher - .watch(Path::new(&path), notify::RecursiveMode::Recursive) - .expect("failed to watch path"); - // println!("watching: {}", path); - tracing::info!("watching: {}", path); - loop { - std::thread::sleep(std::time::Duration::from_secs(15)); - } - }); - } -} - -pub fn read_file(path: String) { - let mut proxy_config: Vec = vec![]; - let files = std::fs::read_dir(path).expect("Unable to read dir"); - for file in files { - let Ok(file) = file else { - continue; - }; - let file = file.path(); - let Some(path) = file.to_str() else { - continue; - }; - // println!("file: {}", path); - let Ok(open_conf) = File::open(path) else { - continue; - }; - let read_conf = BufReader::new(open_conf); - let conf = serde_yaml::from_reader(read_conf); - let conf: ProxyConfigFile = match conf { - Ok(val) => val, - Err(e) => { - // println!("Unable to read conf file: {:?}", e); - tracing::error!("Unable to read conf file: {:?}", e); - continue; - } - }; - proxy_config.push(conf); - } - let mut proxy_routes = AHashMap::new(); - let mut service_selector = ServiceSelector { - header: "x-easy-proxy-svc".to_string(), - }; - for conf in proxy_config { - if let Some(selector) = conf.service_selector { - service_selector = selector; - } - if let Some(routes) = conf.routes { - for route in routes { - let mut paths = matchit::Router::new(); - // println!("route.paths {:#?}", route.paths); - for path in route.paths.clone() { - if path.path_type == "Prefix" { - let match_path = format!("{}/:path", path.path); - match paths.insert(match_path.clone(), path.clone()) { - Ok(_) => {} - Err(e) => { - // println!("Unable to insert path: {:?}", e); - tracing::error!("Unable to insert path: {:?}", e); - } - } - if !route - .paths - .iter() - .any(|p| p.path_type == "Exact" && p.path == path.path) - { - match paths.insert(path.path.clone(), path.clone()) { - Ok(_) => {} - Err(e) => { - // println!("Unable to insert path: {:?}", e); - tracing::error!("Unable to insert path: {:?}", e); - } - } - } - } else { - match paths.insert(path.path.clone(), path) { - Ok(_) => {} - Err(e) => { - // println!("Unable to insert path: {:?}", e); - tracing::error!("Unable to insert path: {:?}", e); - } - } - } - } - let mut p_services: AHashMap = AHashMap::new(); - if let Some(services) = conf.services.clone() { - for service in services { - let mut backends: BTreeSet = BTreeSet::new(); - for e in service.endpoints { - let addr: SocketAddr = match format!("{}:{}", e.ip, e.port).parse() { - Ok(val) => val, - Err(e) => { - // println!("Unable to parse address: {:?}", e); - tracing::error!("Unable to parse address: {:?}", e); - continue; - } - }; - backends.insert(Backend { - addr, - weight: e.weight.unwrap_or(1) as usize, - }); - } - - match service.algorithm.as_str() { - "round_robin" => { - let hash: Arc> = - Arc::new(Weighted::build(&backends)); - p_services.insert( - service.name.clone(), - BackendType::RoundRobin(Box::into_raw(Box::new( - hash.iter(service.name.as_bytes()), - ))), - ); - } - "weighted" => { - let hash: Arc = Arc::new(Weighted::build(&backends)); - p_services.insert( - service.name.clone(), - BackendType::Weighted(Box::into_raw(Box::new( - hash.iter(service.name.as_bytes()), - ))), - ); - } - "consistent" => { - let hash = Arc::new(KetamaHashing::build(&backends)); - p_services.insert( - service.name.clone(), - BackendType::Consistent(Box::into_raw(Box::new( - hash.iter(service.name.as_bytes()), - ))), - ); - } - "random" => { - let hash: Arc> = - Arc::new(Weighted::build(&backends)); - p_services.insert( - service.name.clone(), - BackendType::Random(Box::into_raw(Box::new( - hash.iter(service.name.as_bytes()), - ))), - ); - } - _ => continue, - } - } - } - - let match_key = match route.host.clone() { - Some(val) => val, - None => match route.header.clone() { - Some(val) => val, - None => { - // println!("No match key found"); - tracing::error!("No match key found"); - continue; - } - }, - }; - // println!("match_key {}", match_key); - proxy_routes.insert( - match_key, - ProxyRoute { - route, - paths, - services: p_services, - }, - ); - } - } - } - - unsafe { - GLOBAL_BACKENDS = Box::into_raw(Box::new(ProxyConfig { - routes: proxy_routes, - service_selector, - })); - } -} diff --git a/config/src/proxy/mod.rs b/config/src/proxy/mod.rs new file mode 100644 index 0000000..7ae9482 --- /dev/null +++ b/config/src/proxy/mod.rs @@ -0,0 +1,203 @@ +// mods +pub mod provider_files; + +use ahash::AHashMap; +use pingora::lb::{ + selection::{ + algorithms::{Random, RoundRobin}, + consistent::OwnedNodeIterator, + weighted::{Weighted, WeightedIterator}, + BackendSelection, + }, + Backend, +}; +use serde::{Deserialize, Serialize}; +use std::{collections::BTreeSet, sync::Arc}; + +// internal crate +use crate::runtime; + +// model +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ProxyConfig { + pub routes: AHashMap, + pub service_selector: ServiceSelector, +} + +impl Default for ProxyConfig { + fn default() -> Self { + ProxyConfig { + routes: AHashMap::new(), + service_selector: ServiceSelector { + header: "x-easy-proxy-svc".to_string(), + }, + } + } +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Route { + #[serde(skip)] + pub paths: MatchitRouterWrapper, + pub host: Option, + pub header: Option, + pub add_headers: Option>, + pub del_headers: Option>, +} + +#[derive(Clone, Default)] +pub struct MatchitRouterWrapper(pub matchit::Router); +impl MatchitRouterWrapper { + pub fn new() -> Self { + MatchitRouterWrapper(matchit::Router::new()) + } + pub fn insert( + &mut self, + path: String, + service: ServicePath, + ) -> Result<(), matchit::InsertError> { + self.0.insert(path, service) + } +} +impl std::fmt::Debug for MatchitRouterWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "matchit::Router") + } +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Header { + pub name: String, + pub value: String, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ServicePath { + #[serde(rename = "pathType")] + pub path_type: PathType, + pub path: String, + pub service: ServiceRef, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub enum PathType { + Prefix, + Exact, +} + +impl PathType { + pub fn as_str(&self) -> &str { + match self { + PathType::Prefix => "Prefix", + PathType::Exact => "Exact", + } + } +} + +impl From<&str> for PathType { + fn from(s: &str) -> Self { + match s { + "Prefix" => PathType::Prefix, + "Exact" => PathType::Exact, + _ => PathType::Exact, + } + } +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ServiceSelector { + pub header: String, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ServiceRef { + pub rewrite: Option, + pub name: String, + #[serde(skip)] + pub backend: BackendType, +} + +#[derive(Clone)] +pub enum BackendType { + RoundRobin(*mut WeightedIterator), + Weighted(*mut WeightedIterator), + Consistent(*mut OwnedNodeIterator), + Random(*mut WeightedIterator), +} +impl Default for BackendType { + fn default() -> Self { + // SAFETY: This is safe because we are creating a default backend + let default = Backend::new("1.1.1.1:80").expect("Unable to create backend"); + let backends = BTreeSet::from_iter([default.clone()]); + let b: Arc> = Arc::new(Weighted::build(&backends)); + BackendType::RoundRobin(Box::into_raw(Box::new(b.iter("default".as_bytes())))) + } +} + +impl std::fmt::Debug for BackendType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "matchit::Router") + } +} + +// Initialize global configuration +static mut GLOBAL_PROXY_CONFIG: *mut ProxyConfig = std::ptr::null_mut(); + +pub fn initialize() -> Result<(), anyhow::Error> { + let proxy_config: ProxyConfig = ProxyConfig { + routes: AHashMap::new(), + service_selector: ServiceSelector { + header: "x-easy-proxy-svc".to_string(), + }, + }; + unsafe { + GLOBAL_PROXY_CONFIG = Box::into_raw(Box::new(proxy_config)); + } + let runtime_conf = runtime::config(); + let providers = &runtime_conf.providers; + for provider in providers { + match provider.name.as_str() { + "files" => provider_files::initialize(provider)?, + _ => { + // do nothing + tracing::warn!("Provider {} is not supported", provider.name); + } + } + } + Ok(()) +} + +pub fn proxy_config() -> Option<&'static ProxyConfig> { + unsafe { GLOBAL_PROXY_CONFIG.as_ref() } +} + +pub fn reload() -> Result<(), anyhow::Error> { + if proxy_config().is_none() { + return Err(anyhow::anyhow!("Proxy config is not initialized")); + } + let runtime_conf = runtime::config(); + let providers = &runtime_conf.providers; + let mut proxy_config: Vec = vec![]; + for provider in providers { + match provider.name.as_str() { + "files" => { + let files_conf = provider_files::get_config(provider)?; + for c in files_conf { + proxy_config.push(c.into()); + } + } + _ => { + // do nothing + tracing::warn!("Provider {} is not supported", provider.name); + } + } + } + let mut new_proxy_config = ProxyConfig::default(); + for c in proxy_config { + new_proxy_config.routes.extend(c.routes); + } + unsafe { + GLOBAL_PROXY_CONFIG = Box::into_raw(Box::new(new_proxy_config)); + } + Ok(()) +} diff --git a/config/src/proxy/provider_files.rs b/config/src/proxy/provider_files.rs new file mode 100644 index 0000000..4a5ab56 --- /dev/null +++ b/config/src/proxy/provider_files.rs @@ -0,0 +1,472 @@ +use crate::proxy::{BackendType, MatchitRouterWrapper}; +use ahash::AHashMap; +use notify::Watcher; +use pingora::{ + lb::{ + health_check::{HealthCheck, HttpHealthCheck}, + selection::{ + algorithms::{Random, RoundRobin}, + consistent::KetamaHashing, + weighted::Weighted, + BackendSelection, + }, + Backend, + }, + protocols::l4::socket::SocketAddr, +}; +use serde::{Deserialize, Serialize}; +use std::{ + collections::{BTreeSet, HashMap}, + fs::File, + io::BufReader, + path::Path, + sync::Arc, +}; +use tracing; + +// Internal crate imports +use super::{super::runtime::Provider, reload, Header, PathType, ProxyConfig, ServiceSelector}; + +// models +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ReadConfigFile { + pub services: Option>, + pub routes: Option>, + pub service_selector: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ConfigFile { + pub file: String, + pub services: Option>, + pub routes: Option>, + pub service_selector: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Service { + pub name: String, + pub algorithm: String, + pub endpoints: Vec, + pub health_check: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SvcHealthCheck { + pub path: String, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Endpoint { + pub ip: String, + pub port: u16, + pub weight: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ServicePath { + #[serde(rename = "pathType")] + pub path_type: PathType, + pub path: String, + pub service: ServiceRef, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ServiceRef { + pub rewrite: Option, + pub name: String, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Route { + pub host: Option, + pub header: Option, + pub paths: Vec, + pub add_headers: Option>, + pub del_headers: Option>, +} + +pub fn initialize(provider: &Provider) -> Result<(), anyhow::Error> { + let path = read_path(provider); + let proxy_config = match read_config(&path) { + Ok(conf) => conf, + Err(e) => { + tracing::error!("Unable to read config: {:?}", e); + vec![] + } + }; + if !proxy_config.is_empty() { + validator(&proxy_config)?; + } + match reload() { + Ok(_) => {} + Err(e) => { + tracing::error!("Unable to reload: {:?}", e); + } + } + let watch = provider.watch.unwrap_or(false); + if watch { + std::thread::spawn(move || { + let path_watcher = path.clone(); + let mut watcher = + notify::recommended_watcher(move |res: Result| match res { + Ok(e) => { + let kind = e.kind; + if !kind.is_modify() && !kind.is_create() { + return; + } + for path in e.paths { + tracing::info!("config changed: {:?}", path); + } + // let _ = read_config(&path_watcher).is_ok(); + match read_config(&path_watcher) { + Ok(_) => match reload() { + Ok(_) => {} + Err(e) => { + tracing::error!("Unable to reload: {:?}", e); + } + }, + Err(e) => { + tracing::error!("Unable to read config: {:?}", e); + } + } + } + Err(e) => { + tracing::error!("watch error: {:?}", e); + } + }) + .expect("failed to create watcher"); + watcher + .watch(Path::new(&path), notify::RecursiveMode::Recursive) + .expect("failed to watch path"); + // println!("watching: {}", path); + tracing::info!("watching: {}", path); + loop { + std::thread::sleep(std::time::Duration::from_secs(15)); + } + }); + } + Ok(()) +} + +pub fn get_config(provider: &Provider) -> Result, anyhow::Error> { + let path = read_path(provider); + let proxy_config = match read_config(&path) { + Ok(conf) => conf, + Err(e) => { + tracing::error!("Unable to read config: {:?}", e); + vec![] + } + }; + Ok(proxy_config) +} + +fn read_path(provider: &Provider) -> String { + let mut path = provider + .path + .clone() + .unwrap_or_else(|| "/etc/easy-proxy/dynamic".to_string()); + if !path.starts_with('/') { + if let Ok(cwd_path) = std::env::current_dir() { + let cwd_path = cwd_path.clone(); + let cwd_path = cwd_path.to_str().unwrap_or_default(); + path = format!("{}/{}", cwd_path, path); + } + } + path +} + +fn read_config(path: &str) -> Result, anyhow::Error> { + let mut proxy_config: Vec = vec![]; + let files = std::fs::read_dir(path).map_err(|e| anyhow::anyhow!(e))?; + for file in files { + let Ok(file) = file else { + continue; + }; + let file = file.path(); + let Some(path) = file.to_str() else { + continue; + }; + let Ok(open_conf) = File::open(path) else { + continue; + }; + let read_conf = BufReader::new(open_conf); + let conf = serde_yaml::from_reader(read_conf); + let conf: ReadConfigFile = match conf { + Ok(val) => val, + Err(e) => { + // println!("Unable to read conf file: {:?}", e); + tracing::error!("Unable to read conf file: {:?}", e); + continue; + } + }; + let conf = ConfigFile { + file: path.to_string(), + services: conf.services, + routes: conf.routes, + service_selector: conf.service_selector, + }; + proxy_config.push(conf); + } + Ok(proxy_config) +} + +// validator +pub fn validator(proxy_config: &Vec) -> Result<(), anyhow::Error> { + // println!("validator: {:#?}", proxy_config); + // todo!("validator") + let mut check_route: HashMap = HashMap::new(); + for conf in proxy_config { + println!("validator file: {}", conf.file); + if let Some(services) = &conf.services { + for service in services { + if service.endpoints.is_empty() { + return Err(anyhow::anyhow!("endpoints is empty")); + } + // round_robin, random, consistent, weighted + let algorithm = ["round_robin", "random", "consistent", "weighted"]; + if !algorithm.contains(&service.algorithm.as_str()) { + return Err(anyhow::anyhow!( + "algorithm should be one of {:?}", + algorithm + )); + } + // name + if service.name.is_empty() { + return Err(anyhow::anyhow!("name is empty")); + } + // endpoints + for endpoint in &service.endpoints { + if endpoint.ip.is_empty() { + return Err(anyhow::anyhow!("ip is empty")); + } + if endpoint.port == 0 { + return Err(anyhow::anyhow!("port is empty")); + } + // test connection + if let Some(health_check) = service.health_check.clone() { + let rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async { + let health_check = health_check.clone(); + let path = http::Uri::from_maybe_shared(health_check.path.clone()) + .map_err(|e| anyhow::anyhow!(e))?; + let mut http_check = HttpHealthCheck::new("localhost", false); + http_check.req.set_uri(path); + let backend = + Backend::new(format!("{}:{}", endpoint.ip, endpoint.port).as_str()) + .map_err(|e| anyhow::anyhow!(e))?; + match http_check.check(&backend).await { + Ok(_) => {} + Err(e) => { + // tracing::error!("{}:{} is unhealthy: {:?}", endpoint.ip, endpoint.port, e); + return Err(anyhow::anyhow!( + "{}:{} is unhealthy: {:?}", + endpoint.ip, + endpoint.port, + e + )); + } + } + Ok(()) + })?; + } + } + } + } + if let Some(routes) = &conf.routes { + for route in routes { + let mut key = String::new(); + if let Some(host) = route.host.clone() { + key = host; + } + if let Some(header) = route.header.clone() { + key = header; + } + if key.is_empty() { + return Err(anyhow::anyhow!("host or header is empty")); + } + if check_route.get(&key).is_some() { + return Err(anyhow::anyhow!("route {} already exists", key)); + } + check_route.insert(key, true); + if route.paths.is_empty() { + return Err(anyhow::anyhow!("paths is empty")); + } + for path in &route.paths { + if path.path.is_empty() { + return Err(anyhow::anyhow!("path is empty")); + } + if path.service.name.is_empty() { + return Err(anyhow::anyhow!("service name is empty")); + } + } + } + } + if let Some(service_selector) = &conf.service_selector { + if service_selector.header.is_empty() { + return Err(anyhow::anyhow!("selector is empty")); + } + } + } + Ok(()) +} + +impl From for ProxyConfig { + fn from(config: ConfigFile) -> Self { + let mut proxy_config = ProxyConfig { + routes: AHashMap::new(), + service_selector: match config.service_selector.clone() { + Some(selector) => selector, + None => ServiceSelector { + header: "x-easy-proxy".to_string(), + }, + }, + }; + + match validator(&vec![config.clone()]) { + Ok(_) => {} + Err(e) => { + tracing::error!("config validation failed: {:?}", e); + return proxy_config; + } + } + + let mut p_services: AHashMap = AHashMap::new(); + if let Some(services) = config.services.clone() { + for service in services { + let mut backends: BTreeSet = BTreeSet::new(); + for e in service.endpoints { + let addr: SocketAddr = match format!("{}:{}", e.ip, e.port).parse() { + Ok(val) => val, + Err(e) => { + // println!("Unable to parse address: {:?}", e); + tracing::error!("Unable to parse address: {:?}", e); + continue; + } + }; + backends.insert(Backend { + addr, + weight: e.weight.unwrap_or(1) as usize, + }); + } + + match service.algorithm.as_str() { + "round_robin" => { + let hash: Arc> = Arc::new(Weighted::build(&backends)); + p_services.insert( + service.name.clone(), + BackendType::RoundRobin(Box::into_raw(Box::new( + hash.iter(service.name.as_bytes()), + ))), + ); + } + "weighted" => { + let hash: Arc = Arc::new(Weighted::build(&backends)); + p_services.insert( + service.name.clone(), + BackendType::Weighted(Box::into_raw(Box::new( + hash.iter(service.name.as_bytes()), + ))), + ); + } + "consistent" => { + let hash = Arc::new(KetamaHashing::build(&backends)); + p_services.insert( + service.name.clone(), + BackendType::Consistent(Box::into_raw(Box::new( + hash.iter(service.name.as_bytes()), + ))), + ); + } + "random" => { + let hash: Arc> = Arc::new(Weighted::build(&backends)); + p_services.insert( + service.name.clone(), + BackendType::Random(Box::into_raw(Box::new( + hash.iter(service.name.as_bytes()), + ))), + ); + } + _ => continue, + } + } + } + + if let Some(routes) = config.routes { + for route in routes { + let key = match route.host.clone() { + Some(host) => host, + None => route.header.clone().unwrap_or_default(), + }; + if proxy_config.routes.get(&key).is_some() { + tracing::error!("route {} already exists", key); + continue; + } + let mut paths = MatchitRouterWrapper::new(); + for path in route.paths.clone() { + let backend = match p_services.get(&path.service.name) { + Some(val) => val, + None => continue, + }; + let service_path = super::ServicePath { + path_type: path.path_type.clone(), + path: path.path.clone(), + service: super::ServiceRef { + rewrite: path.service.rewrite, + name: path.service.name, + backend: backend.clone(), + }, + }; + // paths.insert(, service) + match path.path_type { + PathType::Prefix => { + let match_path = format!("{}/:path", path.path); + match paths.insert(match_path, service_path.clone()) { + Ok(_) => {} + Err(e) => { + // println!("Unable to insert path: {:?}", e); + tracing::error!("Unable to insert path: {:?}", e); + } + } + if !route + .paths + .iter() + .any(|p| p.path_type.as_str() == "Exact" && p.path == path.path) + { + match paths.insert(path.path.clone(), service_path) { + Ok(_) => {} + Err(e) => { + // println!("Unable to insert path: {:?}", e); + tracing::error!("Unable to insert path: {:?}", e); + } + } + } + } + PathType::Exact => { + match paths.insert(path.path.clone(), service_path) { + Ok(_) => {} + Err(e) => { + // println!("Unable to insert path: {:?}", e); + tracing::error!("Unable to insert path: {:?}", e); + } + } + } + } + } + + let route = crate::proxy::Route { + paths, + host: route.host, + header: route.header, + add_headers: route.add_headers, + del_headers: route.del_headers, + }; + proxy_config.routes.insert(key, route); + } + } + // println!("proxy_config: {:#?}", proxy_config); + // todo!("Implement From for ProxyConfig") + proxy_config + } +} diff --git a/config/src/runtime.rs b/config/src/runtime.rs index 3e26e62..93b7897 100644 --- a/config/src/runtime.rs +++ b/config/src/runtime.rs @@ -37,9 +37,9 @@ pub struct Provider { pub watch: Option, } -impl From<&Provider> for ProviderFiles { +impl From<&Provider> for ProviderFile { fn from(p: &Provider) -> Self { - ProviderFiles { + ProviderFile { name: p.name.clone(), path: p .path @@ -51,7 +51,7 @@ impl From<&Provider> for ProviderFiles { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ProviderFiles { +pub struct ProviderFile { pub name: String, pub path: String, pub watch: bool, diff --git a/examples/proxy.yaml b/examples/proxy.yaml index 271a4ed..adc74eb 100644 --- a/examples/proxy.yaml +++ b/examples/proxy.yaml @@ -6,17 +6,20 @@ service_selector: services: - name: backend_service algorithm: round_robin # round_robin, random, consistent, weighted + health_check: + path: /health endpoints: - - ip: 172.20.0.1 + - ip: 127.0.0.1 port: 3002 weight: 1 # Optional - - ip: 172.20.0.1 - port: 3003 - weight: 1 # Optional + # - ip: 172.20.0.1 + # port: 3003 + # weight: 1 # Optional # A list of routes to be proxied routes: - - host: mydomain.com + # - host: mydomain.com + - host: localhost:8088 del_headers: - accept add_headers: diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 290e45f..342c71f 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -122,9 +122,15 @@ impl ProxyHttp for Proxy { } }; // modify the request headers - modify::headers(session, &services.routes.route); + modify::headers( + session, + services.route.add_headers.clone().unwrap_or_default(), + services.route.del_headers.clone().unwrap_or_default(), + ); // rewrite the request - modify::rewrite(session, services.svc_path).await?; + if let Some(rewrite) = &services.svc_path.service.rewrite { + modify::rewrite(session, services.svc_path.path.clone(), rewrite.clone()).await?; + } // add the backend to the request headers session .req_header_mut() diff --git a/proxy/src/modify.rs b/proxy/src/modify.rs index b96927e..07a1896 100644 --- a/proxy/src/modify.rs +++ b/proxy/src/modify.rs @@ -1,45 +1,44 @@ -use config::proxy::{Route, SvcPath}; +use config::proxy::Header; use pingora::proxy::Session; // internal crate use crate::response; -pub fn headers(session: &mut Session, route: &'static Route) { - if let Some(headers) = &route.del_headers { - for header in headers.iter() { - let _ = session.req_header_mut().remove_header(header.as_str()); - } +pub fn headers(session: &mut Session, add_headers: Vec
, del_headers: Vec) { + for header in del_headers { + let _ = session.req_header_mut().remove_header(header.as_str()); } - if let Some(headers) = &route.add_headers { - for header in headers.iter() { - let _ = session - .req_header_mut() - .append_header(header.name.as_str(), header.value.as_str()) - .is_ok(); - } + for header in add_headers { + let name = header.name.clone(); + let _ = session + .req_header_mut() + .append_header(name, header.value.as_str()) + .is_ok(); } } -pub async fn rewrite(session: &mut Session, svc_path: &'static SvcPath) -> pingora::Result { +pub async fn rewrite( + session: &mut Session, + path: String, + rewrite: String, +) -> pingora::Result { let query = session.req_header().uri.query(); let old_path = session.req_header().uri.path(); - if let Some(rewrite) = svc_path.service.rewrite.clone() { - let rewrite = old_path.replace(svc_path.path.as_str(), rewrite.as_str()); - let mut uri = rewrite; - if let Some(q) = query { - uri.push('?'); - uri.push_str(q); - } - if !uri.is_empty() { - let rewrite = match http::uri::Uri::builder().path_and_query(uri).build() { - Ok(val) => val, - Err(e) => { - tracing::error!("Error building uri: {}", e); - return response::service_unavailable(session).await; - } - }; - session.req_header_mut().set_uri(rewrite.clone()); - } + let rewrite = old_path.replace(path.as_str(), rewrite.as_str()); + let mut uri = rewrite; + if let Some(q) = query { + uri.push('?'); + uri.push_str(q); + } + if !uri.is_empty() { + let rewrite = match http::uri::Uri::builder().path_and_query(uri).build() { + Ok(val) => val, + Err(e) => { + tracing::error!("Error building uri: {}", e); + return response::service_unavailable(session).await; + } + }; + session.req_header_mut().set_uri(rewrite.clone()); } // return false to continue processing the request Ok(false) diff --git a/proxy/src/services.rs b/proxy/src/services.rs index 7d02f5c..e7cbd45 100644 --- a/proxy/src/services.rs +++ b/proxy/src/services.rs @@ -1,9 +1,9 @@ -use config::proxy::{BackendType, ProxyConfig, ProxyRoute, SvcPath}; +use config::proxy::{BackendType, ProxyConfig, Route, ServicePath}; use pingora::proxy::Session; pub struct Service { - pub routes: &'static ProxyRoute, - pub svc_path: &'static SvcPath, + pub route: &'static Route, + pub svc_path: &'static ServicePath, pub backend: &'static BackendType, } @@ -14,14 +14,16 @@ pub fn find(session: &Session) -> Option { } // println!("Host: {:?}", host); let path = session.req_header().uri.path(); - let proxy_config = config::proxy::get_backends()?; - let routes = find_routes(host, proxy_config, session)?; - let svc_path = find_service_path(path, routes)?; - // Some(routes.services.get(&svc_path.service.name)?) + let proxy_config = config::proxy::proxy_config()?; + let route = find_routes(host, proxy_config, session)?; + let svc_path = match route.paths.0.at(path) { + Ok(val) => val.value, + Err(_) => return None, + }; let svc = Service { - routes, + route, svc_path, - backend: routes.services.get(&svc_path.service.name)?, + backend: &svc_path.service.backend, }; Some(svc) } @@ -30,7 +32,7 @@ fn find_routes( host: &str, proxy_config: &'static ProxyConfig, session: &Session, -) -> Option<&'static ProxyRoute> { +) -> Option<&'static Route> { match proxy_config.routes.get(host) { Some(val) => Some(val), None => { @@ -39,7 +41,3 @@ fn find_routes( } } } - -fn find_service_path(path: &str, routes: &'static ProxyRoute) -> Option<&'static SvcPath> { - routes.paths.at(path).ok().map(|v| v.value) -} diff --git a/runtime/src/main.rs b/runtime/src/main.rs index 822d793..d2761c8 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -18,7 +18,15 @@ fn main() { std::process::exit(1); } } - config::proxy::read_config(); + match config::proxy::initialize() { + Ok(_) => { + tracing::info!("✅ Proxy config initialized"); + } + Err(e) => { + tracing::error!("❌ Error initializing proxy config: {:?}", e); + std::process::exit(1); + } + } // create a new proxy proxy::Proxy::new_proxy() From 29a7fe6a6c1ad12888b59c551eb99996349378fa Mon Sep 17 00:00:00 2001 From: Aitthi Arsa Date: Sat, 9 Mar 2024 19:20:07 +0700 Subject: [PATCH 4/6] feat: reload --- .config/easy_proxy.yaml | 2 +- Cargo.lock | 119 ++++++++++++++++++++++++++++- Cargo.toml | 1 + config/src/proxy/mod.rs | 32 ++++++++ config/src/proxy/provider_files.rs | 11 +-- examples/proxy.yaml | 4 +- runtime/Cargo.toml | 3 +- runtime/src/main.rs | 84 ++++++++++++++++++++ 8 files changed, 242 insertions(+), 14 deletions(-) diff --git a/.config/easy_proxy.yaml b/.config/easy_proxy.yaml index c9b74b7..84e8e32 100644 --- a/.config/easy_proxy.yaml +++ b/.config/easy_proxy.yaml @@ -3,7 +3,7 @@ proxy: providers: - name: files path: examples - watch: true + watch: false pingora: # https://github.com/cloudflare/pingora/blob/main/docs/user_guide/daemon.md daemon: true diff --git a/Cargo.lock b/Cargo.lock index b883389..e50e8cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,6 +70,54 @@ dependencies = [ "winapi", ] +[[package]] +name = "anstream" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" + +[[package]] +name = "anstyle-parse" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "anyhow" version = "1.0.80" @@ -261,12 +309,52 @@ dependencies = [ "ansi_term", "atty", "bitflags 1.3.2", - "strsim", + "strsim 0.8.0", "textwrap", "unicode-width", "vec_map", ] +[[package]] +name = "clap" +version = "4.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b230ab84b0ffdf890d5a10abdbc8b83ae1c4918275daea1ab8801f71536b2651" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim 0.11.0", +] + +[[package]] +name = "clap_derive" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 2.0.52", +] + +[[package]] +name = "clap_lex" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" + [[package]] name = "cmake" version = "0.1.50" @@ -276,6 +364,12 @@ dependencies = [ "cc", ] +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "config" version = "0.1.0" @@ -655,6 +749,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1665,6 +1765,7 @@ dependencies = [ name = "runtime" version = "0.1.0" dependencies = [ + "clap 4.5.2", "config", "mimalloc", "proxy", @@ -2006,13 +2107,19 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +[[package]] +name = "strsim" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" + [[package]] name = "structopt" version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" dependencies = [ - "clap", + "clap 2.34.0", "lazy_static", "structopt-derive", ] @@ -2023,7 +2130,7 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro-error", "proc-macro2", "quote", @@ -2448,6 +2555,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "uuid" version = "1.7.0" diff --git a/Cargo.toml b/Cargo.toml index 289154b..5e4d8ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ ahash = { version="0.8", features = ["serde"] } lazy_static = "1.4" once_cell = "1.19" tokio = { version="1", features = ["rt-multi-thread"] } +clap = { version="4.5", features = ["derive"] } [profile.release] diff --git a/config/src/proxy/mod.rs b/config/src/proxy/mod.rs index 7ae9482..db35c4d 100644 --- a/config/src/proxy/mod.rs +++ b/config/src/proxy/mod.rs @@ -109,6 +109,11 @@ pub struct ServiceSelector { pub header: String, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SvcHealthCheck { + pub path: String, +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ServiceRef { pub rewrite: Option, @@ -201,3 +206,30 @@ pub fn reload() -> Result<(), anyhow::Error> { } Ok(()) } + +pub fn validate() -> Result<(), anyhow::Error> { + let runtime_conf = runtime::config(); + let providers = &runtime_conf.providers; + let mut errors: Vec = vec![]; + for provider in providers { + match provider.name.as_str() { + "files" => { + let files_conf = provider_files::get_config(provider)?; + match provider_files::validator(&files_conf) { + Ok(_) => continue, + Err(e) => { + errors.push(e); + } + } + } + _ => { + // do nothing + tracing::warn!("Provider {} is not supported", provider.name); + } + } + } + if !errors.is_empty() { + return Err(anyhow::anyhow!("Validation failed {:#?}", errors)); + } + Ok(()) +} diff --git a/config/src/proxy/provider_files.rs b/config/src/proxy/provider_files.rs index 4a5ab56..782e071 100644 --- a/config/src/proxy/provider_files.rs +++ b/config/src/proxy/provider_files.rs @@ -25,7 +25,10 @@ use std::{ use tracing; // Internal crate imports -use super::{super::runtime::Provider, reload, Header, PathType, ProxyConfig, ServiceSelector}; +use super::{ + super::runtime::Provider, reload, Header, PathType, ProxyConfig, ServiceSelector, + SvcHealthCheck, +}; // models #[derive(Debug, Clone, Deserialize, Serialize)] @@ -51,11 +54,6 @@ pub struct Service { pub health_check: Option, } -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct SvcHealthCheck { - pub path: String, -} - #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Endpoint { pub ip: String, @@ -350,7 +348,6 @@ impl From for ProxyConfig { weight: e.weight.unwrap_or(1) as usize, }); } - match service.algorithm.as_str() { "round_robin" => { let hash: Arc> = Arc::new(Weighted::build(&backends)); diff --git a/examples/proxy.yaml b/examples/proxy.yaml index adc74eb..0f4da37 100644 --- a/examples/proxy.yaml +++ b/examples/proxy.yaml @@ -18,8 +18,8 @@ services: # A list of routes to be proxied routes: - # - host: mydomain.com - - host: localhost:8088 + - host: mydomain.com + # - host: localhost:8088 del_headers: - accept add_headers: diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 8dfec5f..bed3867 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -10,4 +10,5 @@ proxy = { path = "../proxy" } config = { path = "../config" } tracing = { workspace = true } tracing-subscriber = { workspace = true } -mimalloc = { workspace = true } \ No newline at end of file +mimalloc = { workspace = true } +clap = { workspace = true } \ No newline at end of file diff --git a/runtime/src/main.rs b/runtime/src/main.rs index d2761c8..42eef04 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -1,3 +1,5 @@ +use std::os::unix::net::UnixStream; + #[cfg(not(debug_assertions))] use mimalloc::MiMalloc; @@ -5,6 +7,21 @@ use mimalloc::MiMalloc; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; +use clap::Parser; +use std::io::{Write, Read}; + +/// Simple program to greet a person +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Tries to check your configuration quite thoroughly. + #[arg(short = 't', long, default_value_t = false)] + test: bool, + // Reload the configuration + #[arg(short = 'r', long = "reload", default_value_t = false)] + reload: bool, +} + fn main() { // initialize the logger tracing_subscriber::fmt::init(); @@ -18,6 +35,44 @@ fn main() { std::process::exit(1); } } + + // parse the command line arguments + let args = Args::parse(); + if args.test { + match config::proxy::validate() { + Ok(_) => { + tracing::info!("✅ Proxy config is valid"); + } + Err(e) => { + tracing::error!("❌ {:?}", e); + std::process::exit(1); + } + } + // exit after testing the configuration + std::process::exit(0); + } + + if args.reload { + match config::proxy::validate() { + Ok(_) => { + let mut stream = UnixStream::connect("/tmp/easy-proxy.sock").expect("Failed to connect to socket"); + match stream.write_all(b"reload") { + Ok(_) => { + tracing::info!("✅ Proxy config reloaded"); + } + Err(e) => { + tracing::error!("❌ {:?}", e); + } + } + std::process::exit(0); + } + Err(e) => { + tracing::error!("❌ {:?}", e); + std::process::exit(1); + } + } + } + match config::proxy::initialize() { Ok(_) => { tracing::info!("✅ Proxy config initialized"); @@ -28,6 +83,35 @@ fn main() { } } + // Open unix socket for reload command + std::thread::spawn(|| { + // remove the socket if it already exists + let _ = std::fs::remove_file("/tmp/easy-proxy.sock"); + let listener = std::os::unix::net::UnixListener::bind("/tmp/easy-proxy.sock").unwrap(); + while let Ok((mut stream, _)) = listener.accept() { + let mut buffer = [0; 1024]; + // println!("Received a connection"); + match stream.read(&mut buffer) { + Ok(_) => { + let command = std::str::from_utf8(&buffer).unwrap(); + // println!("Command: {}", command); + if command.contains("reload") { + match config::proxy::reload() { + Ok(_) => { + tracing::info!("✅ Proxy config reloaded"); + } + Err(e) => { + tracing::error!("❌ Error reloading proxy config: {:?}", e); + } + } + } + } + Err(e) => { + tracing::error!("❌ {:?}", e); + } + } + } + }); // create a new proxy proxy::Proxy::new_proxy() .map_err(|e| tracing::error!("❌ Error creating proxy: {:?}", e)) From 48b5e68afd9abc53a36c206c4273fe5e6cb7b9a5 Mon Sep 17 00:00:00 2001 From: Aitthi Arsa Date: Sat, 9 Mar 2024 19:29:20 +0700 Subject: [PATCH 5/6] =?UTF-8?q?=E2=9C=A8=20feat:=20config?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .config/easy_proxy.yaml | 2 +- Cargo.lock | 24 ++++++++++++------------ Cargo.toml | 2 +- Dockerfile | 2 +- README.md | 6 ++++++ {runtime => easy-proxy}/Cargo.toml | 2 +- {runtime => easy-proxy}/src/main.rs | 0 examples/proxy.yaml | 4 ++-- 8 files changed, 24 insertions(+), 18 deletions(-) rename {runtime => easy-proxy}/Cargo.toml (94%) rename {runtime => easy-proxy}/src/main.rs (100%) diff --git a/.config/easy_proxy.yaml b/.config/easy_proxy.yaml index 84e8e32..c9b74b7 100644 --- a/.config/easy_proxy.yaml +++ b/.config/easy_proxy.yaml @@ -3,7 +3,7 @@ proxy: providers: - name: files path: examples - watch: false + watch: true pingora: # https://github.com/cloudflare/pingora/blob/main/docs/user_guide/daemon.md daemon: true diff --git a/Cargo.lock b/Cargo.lock index e50e8cb..d580d31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -493,6 +493,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "easy-proxy" +version = "0.1.0" +dependencies = [ + "clap 4.5.2", + "config", + "mimalloc", + "proxy", + "tracing", + "tracing-subscriber", +] + [[package]] name = "encoding_rs" version = "0.8.33" @@ -1761,18 +1773,6 @@ dependencies = [ "serde", ] -[[package]] -name = "runtime" -version = "0.1.0" -dependencies = [ - "clap 4.5.2", - "config", - "mimalloc", - "proxy", - "tracing", - "tracing-subscriber", -] - [[package]] name = "rust_decimal" version = "1.34.3" diff --git a/Cargo.toml b/Cargo.toml index 5e4d8ea..16a9517 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ resolver = "2" members = [ - "runtime", + "easy-proxy", "proxy", "config" ] diff --git a/Dockerfile b/Dockerfile index cdc3ec0..456b381 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,7 +29,7 @@ ENV TZ=Asia/Bangkok WORKDIR /app # copy app release -COPY --from=builder /app/target/release/runtime ./easy-proxy +COPY --from=builder /app/target/release/easy-proxy ./easy-proxy COPY .config .config # default run entrypoint diff --git a/README.md b/README.md index c0663e5..73f7ec4 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,12 @@ pingora: # ca_file: /etc/ssl/certs/ca-certificates.crt ``` +Can be tested and reloaded using the following commands: +```bash +$ easy-proxy -t # Test the configuration file +$ easy-proxy -r # Reload the configuration file +``` + ### Service and Route Configuration ```yaml # Select the service to be proxied diff --git a/runtime/Cargo.toml b/easy-proxy/Cargo.toml similarity index 94% rename from runtime/Cargo.toml rename to easy-proxy/Cargo.toml index bed3867..9a6a789 100644 --- a/runtime/Cargo.toml +++ b/easy-proxy/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "runtime" +name = "easy-proxy" version = "0.1.0" edition = "2021" diff --git a/runtime/src/main.rs b/easy-proxy/src/main.rs similarity index 100% rename from runtime/src/main.rs rename to easy-proxy/src/main.rs diff --git a/examples/proxy.yaml b/examples/proxy.yaml index 0f4da37..adc74eb 100644 --- a/examples/proxy.yaml +++ b/examples/proxy.yaml @@ -18,8 +18,8 @@ services: # A list of routes to be proxied routes: - - host: mydomain.com - # - host: localhost:8088 + # - host: mydomain.com + - host: localhost:8088 del_headers: - accept add_headers: From 0afae76af9170abecee6b27131b1cffddac4132d Mon Sep 17 00:00:00 2001 From: Aitthi Arsa Date: Sat, 9 Mar 2024 19:31:43 +0700 Subject: [PATCH 6/6] cargo fmt && cargo clippy --- easy-proxy/src/main.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/easy-proxy/src/main.rs b/easy-proxy/src/main.rs index 42eef04..92ac0bd 100644 --- a/easy-proxy/src/main.rs +++ b/easy-proxy/src/main.rs @@ -8,7 +8,7 @@ use mimalloc::MiMalloc; static GLOBAL: MiMalloc = MiMalloc; use clap::Parser; -use std::io::{Write, Read}; +use std::io::{Read, Write}; /// Simple program to greet a person #[derive(Parser, Debug)] @@ -55,7 +55,8 @@ fn main() { if args.reload { match config::proxy::validate() { Ok(_) => { - let mut stream = UnixStream::connect("/tmp/easy-proxy.sock").expect("Failed to connect to socket"); + let mut stream = UnixStream::connect("/tmp/easy-proxy.sock") + .expect("Failed to connect to socket"); match stream.write_all(b"reload") { Ok(_) => { tracing::info!("✅ Proxy config reloaded"); @@ -87,13 +88,22 @@ fn main() { std::thread::spawn(|| { // remove the socket if it already exists let _ = std::fs::remove_file("/tmp/easy-proxy.sock"); - let listener = std::os::unix::net::UnixListener::bind("/tmp/easy-proxy.sock").unwrap(); + let listener = match std::os::unix::net::UnixListener::bind("/tmp/easy-proxy.sock") { + Ok(listener) => { + tracing::info!("✅ Listening on /tmp/easy-proxy.sock"); + listener + } + Err(e) => { + tracing::error!("❌ {:?}", e); + std::process::exit(1); + } + }; while let Ok((mut stream, _)) = listener.accept() { let mut buffer = [0; 1024]; // println!("Received a connection"); match stream.read(&mut buffer) { Ok(_) => { - let command = std::str::from_utf8(&buffer).unwrap(); + let command = std::str::from_utf8(&buffer).unwrap_or_default(); // println!("Command: {}", command); if command.contains("reload") { match config::proxy::reload() {