diff --git a/.env b/.env index 702af24..cee90d4 100644 --- a/.env +++ b/.env @@ -7,4 +7,6 @@ IP_LIMIT_PER_SECOND=1 IP_LIMIT_BURST_SIZE=10 # default 500 CONCURRENCY_LIMIT=500 +# default 10 +RESPONSE_TIMEOUT=10 RUST_LOG=info \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 36ebb4e..aa23e99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -252,7 +252,7 @@ checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" [[package]] name = "elex-proxy" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "axum", diff --git a/src/main.rs b/src/main.rs index 7eb28e0..4e4278d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,10 @@ +#![feature(lazy_cell)] + use std::any::Any; use std::collections::HashMap; use std::env; use std::net::SocketAddr; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; @@ -38,23 +40,23 @@ use tower_governor::GovernorLayer; use tower_http::catch_panic::CatchPanicLayer; use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; -use tracing::{error, info}; +use tracing::{error, info, warn}; -#[derive(Serialize, Deserialize)] +#[derive(Serialize)] struct JsonRpcRequest { method: String, params: Vec, id: u64, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Deserialize)] struct JsonRpcResponse { result: Option, error: Option, id: u64, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize)] struct R { success: bool, #[serde(skip_serializing_if = "Option::is_none")] @@ -63,6 +65,8 @@ struct R { code: Option, #[serde(skip_serializing_if = "Option::is_none")] message: Option, + #[serde(skip_serializing_if = "Option::is_none")] + health: Option, } impl R { @@ -72,6 +76,7 @@ impl R { response: Some(payload), code: None, message: None, + health: None, } } fn error(code: i32, message: String) -> Self { @@ -80,12 +85,46 @@ impl R { response: None, code: Some(Value::Number(Number::from(code))), message: Some(Value::String(message)), + health: None, + } + } + fn health(health: bool) -> Self { + Self { + success: true, + response: None, + code: None, + message: None, + health: Some(health), } } } static ID_COUNTER: Lazy = Lazy::new(|| AtomicU64::new(0)); +static IP_LIMIT_PER_SECOND: LazyLock = LazyLock::new(|| { + env::var("IP_LIMIT_PER_SECOND").unwrap_or("1".to_string()).parse().unwrap() +}); + +static IP_LIMIT_BURST_SIZE: LazyLock = LazyLock::new(|| { + env::var("IP_LIMIT_BURST_SIZE").unwrap_or("10".to_string()).parse().unwrap() +}); + +static CONCURRENCY_LIMIT: LazyLock = LazyLock::new(|| { + env::var("CONCURRENCY_LIMIT").unwrap_or("500".to_string()).parse().unwrap() +}); + +static ELECTRUMX_WSS: LazyLock = LazyLock::new(|| { + env::var("ELECTRUMX_WSS").unwrap_or("wss://electrumx.atomicals.xyz:50012".to_string()) +}); + +static PROXY_HOST: LazyLock = LazyLock::new(|| { + env::var("PROXY_HOST").unwrap_or("0.0.0.0:12321".into()) +}); + +static RESPONSE_TIMEOUT: LazyLock = LazyLock::new(|| { + env::var("RESPONSE_TIMEOUT").unwrap_or("10".to_string()).parse().unwrap() +}); + fn get_next_id() -> u64 { ID_COUNTER.fetch_add(1, Ordering::SeqCst) } @@ -101,6 +140,7 @@ impl IntoResponse for AppError { code: None, message: Some(Value::String(self.0.to_string())), response: None, + health: None, }; Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) @@ -177,7 +217,7 @@ async fn handle_request( }; let request_text = serde_json::to_string(&request).unwrap(); ws_tx.send(Message::Text(request_text)).unwrap(); - match tokio::time::timeout(Duration::from_secs(10), response_rx).await { + match tokio::time::timeout(Duration::from_secs(*RESPONSE_TIMEOUT), response_rx).await { Ok(Ok(rep)) => { if let Some(result) = rep.result { R::ok(result) @@ -188,23 +228,55 @@ async fn handle_request( code: err.get("code").cloned(), message: err.get("message").cloned(), response: None, + health: None, } } else { R::error(-1, "No response".into()) } } Ok(Err(_)) | Err(_) => { + warn!("<= id: {}, no response received after {} seconds", &id, *RESPONSE_TIMEOUT); { callbacks.write().await.remove(&id); } - R::error(-1, "Read timeout".into()) + R::error(-1, "Response timeout".into()) } } } -async fn handle_proxy() -> impl IntoResponse { - Json(json!( +async fn handle_health( + Extension(callbacks): Extension, + Extension(ws_tx): Extension>, +) -> impl IntoResponse { + let id = get_next_id(); + info!("=> id: {}, check health", &id); + + let (response_tx, response_rx) = oneshot::channel(); { + callbacks.write().await.insert(id, response_tx); + } + let request = JsonRpcRequest { + id, + method: "blockchain.atomicals.get_global".into(), + params: vec![], + }; + let request_text = serde_json::to_string(&request).unwrap(); + ws_tx.send(Message::Text(request_text)).unwrap(); + match tokio::time::timeout(Duration::from_secs(5), response_rx).await { + Ok(Ok(rep)) => R::health(rep.result.is_some()), + Ok(Err(_)) | Err(_) => { + warn!("<= id: {}, check health timeout, no response received after 5 seconds", &id); + { + callbacks.write().await.remove(&id); + } + R::health(false) + } + } +} + + +async fn handle_proxy() -> impl IntoResponse { + Json(json!({ "success": true, "info": { "note": "Atomicals ElectrumX Digital Object Proxy Online", @@ -214,11 +286,10 @@ async fn handle_proxy() -> impl IntoResponse { "GET": "GET /proxy/:method?params=[\"value1\"] with string encoded array in the query argument \"params\" in the URL." }, "healthCheck": "GET /proxy/health", - "github": "https://github.com/atomicals/electrumx-proxy", + "github": "https://github.com/AstroxNetwork/elex-proxy", "license": "MIT" } - } - )) + })) } @@ -247,19 +318,30 @@ async fn main() { dotenv().ok(); tracing_subscriber::fmt::init(); let (ws_tx, ws_rx) = mpsc::unbounded_channel::(); - let callbacks: Arc>>> = - Arc::new(RwLock::new(HashMap::new())); + let callbacks: Callbacks = Arc::new(RwLock::new(HashMap::new())); let ws_rx_stream = Arc::new(Mutex::new(UnboundedReceiverStream::new(ws_rx))); let governor_conf = Box::new( GovernorConfigBuilder::default() - .per_second(env::var("IP_LIMIT_PER_SECOND").unwrap_or("1".to_string()).parse().unwrap()) - .burst_size(env::var("IP_LIMIT_BURST_SIZE").unwrap_or("10".to_string()).parse().unwrap()) + .per_second(*IP_LIMIT_PER_SECOND) + .burst_size(*IP_LIMIT_BURST_SIZE) .finish() .unwrap(), ); let app = Router::new() + .fallback( + |uri: http::Uri| async move { + let body = R::error(-1, format!("No route {}", &uri)); + let body = serde_json::to_string(&body).unwrap(); + Response::builder() + .status(StatusCode::NOT_FOUND) + .header(header::CONTENT_TYPE, "application/json") + .body(Full::from(body)) + .unwrap() + }, + ) .route("/", get(|| async { "Hello, Atomicals!" })) .route("/proxy", get(handle_proxy).post(handle_proxy)) + .route("/proxy/health", get(handle_health).post(handle_health)) .route("/proxy/:method", get(handle_get).post(handle_post)) .layer( ServiceBuilder::new() @@ -272,50 +354,54 @@ async fn main() { config: Box::leak(governor_conf), }) ) - .layer(ConcurrencyLimitLayer::new( - env::var("CONCURRENCY_LIMIT").unwrap_or("500".to_string()).parse().unwrap(), - )) + .layer(ConcurrencyLimitLayer::new(*CONCURRENCY_LIMIT)) .layer(CorsLayer::permissive()) .layer(Extension(callbacks.clone())) .layer(Extension(ws_tx.clone())); tokio::spawn(async move { - let wss_var = env::var("ELECTRUMX_WSS").unwrap_or("wss://electrumx.atomicals.xyz:50012".to_string()); - let list = wss_var.split(',').collect::>(); + let list = ELECTRUMX_WSS.split(',').collect::>(); info!("ElectrumX WSS: {:?}", &list); let mut index = 0; loop { let wss = list.get(index).unwrap(); - info!("Try to connect to electrumx: {}", &wss); + info!("Try to connect to ElectrumX: {}", &wss); match connect_async(*wss).await { Ok((ws, _)) => { - info!("Connected to electrumx"); + info!("Connected to ElectrumX: {}", &wss); let (mut write, mut read) = ws.split(); let ws_rx_stream = Arc::clone(&ws_rx_stream); tokio::spawn(async move { let mut guard = ws_rx_stream.lock().await; while let Some(message) = guard.next().await { - let _ = write.send(message).await; + if let Err(e) = write.send(message).await { + error!("Failed to send message to ElectrumX: {:?}", e); + } } }); while let Some(Ok(msg)) = read.next().await { - if let Ok(text) = msg.to_text() { - if let Ok(response) = serde_json::from_str::(text) { - info!( - "<= id: {}, success: {}", - &response.id, - response.result.is_some() - ); - if let Some(callback) = callbacks.write().await.remove(&response.id) - { - let _ = callback.send(response); + if msg.is_text() { + if let Ok(text) = msg.to_text() { + if let Ok(response) = serde_json::from_str::(text) { + if let Some(callback) = callbacks.write().await.remove(&response.id) + { + info!("<= id: {}, processed", &response.id); + let _ = callback.send(response); + } else { + warn!("<= id: {}, not processed", &response.id); + } + } else { + error!("Failed to parse ws response: {}", text); } } + } else if msg.is_close() { + info!("Connection closed: {}", &wss); + break; } } } Err(e) => { - error!("Failed to connect to electrumx: {:?}", e); + error!("Failed to connect to ElectrumX: {:?}", e); tokio::time::sleep(Duration::from_secs(3)).await; } } @@ -326,10 +412,9 @@ async fn main() { } } }); - let app_api = env::var("PROXY_HOST").unwrap_or("0.0.0.0:12321".to_string()); - let listener = tokio::net::TcpListener::bind(&app_api) + let listener = tokio::net::TcpListener::bind((*PROXY_HOST).clone()) .await .unwrap(); - info!("Listening on {}", &app_api); + info!("Listening on {}", *PROXY_HOST); axum::serve(listener, app.into_make_service_with_connect_info::()).await.unwrap(); }