Skip to content

Commit

Permalink
✨ Add /proxy/health
Browse files Browse the repository at this point in the history
  • Loading branch information
iota9star committed Dec 20, 2023
1 parent eb507a7 commit 7deedcf
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 39 deletions.
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ IP_LIMIT_PER_SECOND=1
IP_LIMIT_BURST_SIZE=10
# default 500
CONCURRENCY_LIMIT=500
# default 10
RESPONSE_TIMEOUT=10
RUST_LOG=info
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

161 changes: 123 additions & 38 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#![feature(lazy_cell)]

use std::any::Any;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

Expand Down Expand Up @@ -38,23 +40,23 @@ use tower_governor::GovernorLayer;
use tower_http::catch_panic::CatchPanicLayer;
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tracing::{error, info};
use tracing::{error, info, warn};

#[derive(Serialize, Deserialize)]
#[derive(Serialize)]
struct JsonRpcRequest {
method: String,
params: Vec<Value>,
id: u64,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Deserialize)]
struct JsonRpcResponse {
result: Option<Value>,
error: Option<Value>,
id: u64,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize)]
struct R {
success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -63,6 +65,8 @@ struct R {
code: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
message: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
health: Option<bool>,
}

impl R {
Expand All @@ -72,6 +76,7 @@ impl R {
response: Some(payload),
code: None,
message: None,
health: None,
}
}
fn error(code: i32, message: String) -> Self {
Expand All @@ -80,12 +85,46 @@ impl R {
response: None,
code: Some(Value::Number(Number::from(code))),
message: Some(Value::String(message)),
health: None,
}
}
fn health(health: bool) -> Self {
Self {
success: true,
response: None,
code: None,
message: None,
health: Some(health),
}
}
}

static ID_COUNTER: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(0));

static IP_LIMIT_PER_SECOND: LazyLock<u64> = LazyLock::new(|| {
env::var("IP_LIMIT_PER_SECOND").unwrap_or("1".to_string()).parse().unwrap()
});

static IP_LIMIT_BURST_SIZE: LazyLock<u32> = LazyLock::new(|| {
env::var("IP_LIMIT_BURST_SIZE").unwrap_or("10".to_string()).parse().unwrap()
});

static CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
env::var("CONCURRENCY_LIMIT").unwrap_or("500".to_string()).parse().unwrap()
});

static ELECTRUMX_WSS: LazyLock<String> = LazyLock::new(|| {
env::var("ELECTRUMX_WSS").unwrap_or("wss://electrumx.atomicals.xyz:50012".to_string())
});

static PROXY_HOST: LazyLock<String> = LazyLock::new(|| {
env::var("PROXY_HOST").unwrap_or("0.0.0.0:12321".into())
});

static RESPONSE_TIMEOUT: LazyLock<u64> = LazyLock::new(|| {
env::var("RESPONSE_TIMEOUT").unwrap_or("10".to_string()).parse().unwrap()
});

fn get_next_id() -> u64 {
ID_COUNTER.fetch_add(1, Ordering::SeqCst)
}
Expand All @@ -101,6 +140,7 @@ impl IntoResponse for AppError {
code: None,
message: Some(Value::String(self.0.to_string())),
response: None,
health: None,
};
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
Expand Down Expand Up @@ -177,7 +217,7 @@ async fn handle_request(
};
let request_text = serde_json::to_string(&request).unwrap();
ws_tx.send(Message::Text(request_text)).unwrap();
match tokio::time::timeout(Duration::from_secs(10), response_rx).await {
match tokio::time::timeout(Duration::from_secs(*RESPONSE_TIMEOUT), response_rx).await {
Ok(Ok(rep)) => {
if let Some(result) = rep.result {
R::ok(result)
Expand All @@ -188,23 +228,55 @@ async fn handle_request(
code: err.get("code").cloned(),
message: err.get("message").cloned(),
response: None,
health: None,
}
} else {
R::error(-1, "No response".into())
}
}
Ok(Err(_)) | Err(_) => {
warn!("<= id: {}, no response received after {} seconds", &id, *RESPONSE_TIMEOUT);
{
callbacks.write().await.remove(&id);
}
R::error(-1, "Read timeout".into())
R::error(-1, "Response timeout".into())
}
}
}

async fn handle_proxy() -> impl IntoResponse {
Json(json!(
async fn handle_health(
Extension(callbacks): Extension<Callbacks>,
Extension(ws_tx): Extension<mpsc::UnboundedSender<Message>>,
) -> impl IntoResponse {
let id = get_next_id();
info!("=> id: {}, check health", &id);

let (response_tx, response_rx) = oneshot::channel();
{
callbacks.write().await.insert(id, response_tx);
}
let request = JsonRpcRequest {
id,
method: "blockchain.atomicals.get_global".into(),
params: vec![],
};
let request_text = serde_json::to_string(&request).unwrap();
ws_tx.send(Message::Text(request_text)).unwrap();
match tokio::time::timeout(Duration::from_secs(5), response_rx).await {
Ok(Ok(rep)) => R::health(rep.result.is_some()),
Ok(Err(_)) | Err(_) => {
warn!("<= id: {}, check health timeout, no response received after 5 seconds", &id);
{
callbacks.write().await.remove(&id);
}
R::health(false)
}
}
}


async fn handle_proxy() -> impl IntoResponse {
Json(json!({
"success": true,
"info": {
"note": "Atomicals ElectrumX Digital Object Proxy Online",
Expand All @@ -214,11 +286,10 @@ async fn handle_proxy() -> impl IntoResponse {
"GET": "GET /proxy/:method?params=[\"value1\"] with string encoded array in the query argument \"params\" in the URL."
},
"healthCheck": "GET /proxy/health",
"github": "https://github.com/atomicals/electrumx-proxy",
"github": "https://github.com/AstroxNetwork/elex-proxy",
"license": "MIT"
}
}
))
}))
}


Expand Down Expand Up @@ -247,19 +318,30 @@ async fn main() {
dotenv().ok();
tracing_subscriber::fmt::init();
let (ws_tx, ws_rx) = mpsc::unbounded_channel::<Message>();
let callbacks: Arc<RwLock<HashMap<u64, oneshot::Sender<JsonRpcResponse>>>> =
Arc::new(RwLock::new(HashMap::new()));
let callbacks: Callbacks = Arc::new(RwLock::new(HashMap::new()));
let ws_rx_stream = Arc::new(Mutex::new(UnboundedReceiverStream::new(ws_rx)));
let governor_conf = Box::new(
GovernorConfigBuilder::default()
.per_second(env::var("IP_LIMIT_PER_SECOND").unwrap_or("1".to_string()).parse().unwrap())
.burst_size(env::var("IP_LIMIT_BURST_SIZE").unwrap_or("10".to_string()).parse().unwrap())
.per_second(*IP_LIMIT_PER_SECOND)
.burst_size(*IP_LIMIT_BURST_SIZE)
.finish()
.unwrap(),
);
let app = Router::new()
.fallback(
|uri: http::Uri| async move {
let body = R::error(-1, format!("No route {}", &uri));
let body = serde_json::to_string(&body).unwrap();
Response::builder()
.status(StatusCode::NOT_FOUND)
.header(header::CONTENT_TYPE, "application/json")
.body(Full::from(body))
.unwrap()
},
)
.route("/", get(|| async { "Hello, Atomicals!" }))
.route("/proxy", get(handle_proxy).post(handle_proxy))
.route("/proxy/health", get(handle_health).post(handle_health))
.route("/proxy/:method", get(handle_get).post(handle_post))
.layer(
ServiceBuilder::new()
Expand All @@ -272,50 +354,54 @@ async fn main() {
config: Box::leak(governor_conf),
})
)
.layer(ConcurrencyLimitLayer::new(
env::var("CONCURRENCY_LIMIT").unwrap_or("500".to_string()).parse().unwrap(),
))
.layer(ConcurrencyLimitLayer::new(*CONCURRENCY_LIMIT))
.layer(CorsLayer::permissive())
.layer(Extension(callbacks.clone()))
.layer(Extension(ws_tx.clone()));

tokio::spawn(async move {
let wss_var = env::var("ELECTRUMX_WSS").unwrap_or("wss://electrumx.atomicals.xyz:50012".to_string());
let list = wss_var.split(',').collect::<Vec<&str>>();
let list = ELECTRUMX_WSS.split(',').collect::<Vec<&str>>();
info!("ElectrumX WSS: {:?}", &list);
let mut index = 0;
loop {
let wss = list.get(index).unwrap();
info!("Try to connect to electrumx: {}", &wss);
info!("Try to connect to ElectrumX: {}", &wss);
match connect_async(*wss).await {
Ok((ws, _)) => {
info!("Connected to electrumx");
info!("Connected to ElectrumX: {}", &wss);
let (mut write, mut read) = ws.split();
let ws_rx_stream = Arc::clone(&ws_rx_stream);
tokio::spawn(async move {
let mut guard = ws_rx_stream.lock().await;
while let Some(message) = guard.next().await {
let _ = write.send(message).await;
if let Err(e) = write.send(message).await {
error!("Failed to send message to ElectrumX: {:?}", e);
}
}
});
while let Some(Ok(msg)) = read.next().await {
if let Ok(text) = msg.to_text() {
if let Ok(response) = serde_json::from_str::<JsonRpcResponse>(text) {
info!(
"<= id: {}, success: {}",
&response.id,
response.result.is_some()
);
if let Some(callback) = callbacks.write().await.remove(&response.id)
{
let _ = callback.send(response);
if msg.is_text() {
if let Ok(text) = msg.to_text() {
if let Ok(response) = serde_json::from_str::<JsonRpcResponse>(text) {
if let Some(callback) = callbacks.write().await.remove(&response.id)
{
info!("<= id: {}, processed", &response.id);
let _ = callback.send(response);
} else {
warn!("<= id: {}, not processed", &response.id);
}
} else {
error!("Failed to parse ws response: {}", text);
}
}
} else if msg.is_close() {
info!("Connection closed: {}", &wss);
break;
}
}
}
Err(e) => {
error!("Failed to connect to electrumx: {:?}", e);
error!("Failed to connect to ElectrumX: {:?}", e);
tokio::time::sleep(Duration::from_secs(3)).await;
}
}
Expand All @@ -326,10 +412,9 @@ async fn main() {
}
}
});
let app_api = env::var("PROXY_HOST").unwrap_or("0.0.0.0:12321".to_string());
let listener = tokio::net::TcpListener::bind(&app_api)
let listener = tokio::net::TcpListener::bind((*PROXY_HOST).clone())
.await
.unwrap();
info!("Listening on {}", &app_api);
info!("Listening on {}", *PROXY_HOST);
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await.unwrap();
}

0 comments on commit 7deedcf

Please sign in to comment.