Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
iota9star committed Dec 7, 2023
0 parents commit af21263
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
APP_API=0.0.0.0:12321
ELECTRUMX_WSS=wss://electrumx.atomicals.xyz:50012
IP_LIMIT_PER_SECOND=1
IP_LIMIT_BURST_SIZE=10
RUST_LOG=info
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/target
36 changes: 36 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "fastelex-proxy"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
axum = { version = "^0", features = ["http2"] }
futures = "^0"
serde = { version = "^1", features = ["derive"] }
serde_json = "^1"
tokio = { version = "^1", features = ["full"] }
tokio-stream = "^0"
tokio-tungstenite = { version = "^0", features = ["native-tls"] }
tungstenite = "^0"
url = "^2"
time = { version = "^0", features = [] }
tower = { version = "^0", features = ["full"] }
tower-http = { version = "^0", features = ["cors", "trace", "catch-panic"] }
once_cell = "^1"
tracing = "^0"
tracing-subscriber = "^0"
openssl = { version = "^0", features = ["vendored"] }
anyhow = "^1"
tower_governor = "^0.2"
bytes = "^1"
http-body-util = "^0"
dotenv = "0"

[profile.release]
strip = true
opt-level = "z" # Optimize for size.
lto = true
codegen-units = 1
panic = "abort"
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
## Build

1. Create .cargo/config and add the instructions below:

```toml
[target.x86_64-unknown-linux-gnu]
linker = "x86_64-unknown-linux-gnu-gcc"
```

2. Build the project:

```shell
sh build_linux.sh
```
1 change: 1 addition & 0 deletions build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
cargo build --release
2 changes: 2 additions & 0 deletions build_linux.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
rustup target add x86_64-unknown-linux-gnu
cargo build --release --target x86_64-unknown-linux-gnu
305 changes: 305 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
use std::any::Any;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use axum::body::Body;
use axum::error_handling::HandleErrorLayer;
use axum::extract::{Path, Query};
use axum::extract::Extension;
use axum::extract::Json;
use axum::http;
use axum::http::header;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::response::Response;
use axum::Router;
use axum::routing::get;
use axum::ServiceExt;
use bytes::Bytes;
use dotenv::dotenv;
use futures::{SinkExt, StreamExt};
use http_body_util::Full;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::{json, Number, Value};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::sync::oneshot;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
use tower::{BoxError, ServiceBuilder};
use tower_governor::errors::display_error;
use tower_governor::governor::GovernorConfigBuilder;
use tower_governor::GovernorLayer;
use tower_http::catch_panic::CatchPanicLayer;
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tracing::{error, info};

#[derive(Serialize, Deserialize)]
struct JsonRpcRequest {
method: String,
params: Vec<Value>,
id: u64,
}

#[derive(Serialize, Deserialize, Debug)]
struct JsonRpcResponse {
result: Option<Value>,
error: Option<Value>,
id: u64,
}

#[derive(Serialize, Deserialize, Debug)]
struct R {
success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
response: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
code: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
message: Option<Value>,
}

impl R {
fn ok(payload: Value) -> Self {
Self {
success: true,
response: Some(payload),
code: None,
message: None,
}
}
fn error(code: i32, message: String) -> Self {
Self {
success: false,
response: None,
code: Some(Value::Number(Number::from(code))),
message: Some(Value::String(message)),
}
}
}

static ID_COUNTER: Lazy<AtomicU64> = Lazy::new(|| AtomicU64::new(0));

fn get_next_id() -> u64 {
ID_COUNTER.fetch_add(1, Ordering::SeqCst)
}

type Callbacks = Arc<RwLock<HashMap<u64, oneshot::Sender<JsonRpcResponse>>>>;

struct AppError(anyhow::Error);

impl IntoResponse for AppError {
fn into_response(self) -> Response {
let value = R {
success: false,
code: None,
message: Some(Value::String(self.0.to_string())),
response: None,
};
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(serde_json::to_string(&value).unwrap()))
.unwrap()
}
}

impl IntoResponse for R {
fn into_response(self) -> Response {
Json(self).into_response()
}
}

async fn handle_get(
Extension(callbacks): Extension<Callbacks>,
Extension(ws_tx): Extension<mpsc::UnboundedSender<Message>>,
Path(method): Path<String>,
Query(query): Query<Value>,
) -> Result<R, AppError> {
let x = query.get("params").unwrap().as_str().unwrap();
let params = serde_json::from_str(x).unwrap();
let r = handle_request(method, params, callbacks, ws_tx).await;
Ok(r)
}

async fn handle_post(
Extension(callbacks): Extension<Callbacks>,
Extension(ws_tx): Extension<mpsc::UnboundedSender<Message>>,
Path(method): Path<String>,
Json(body): Json<Value>,
) -> Result<R, AppError> {
let x = body.get("params").unwrap().as_array().unwrap();
let r = handle_request(method, x.clone(), callbacks, ws_tx).await;
Ok(r)
}

async fn handle_request(
method: String,
params: Vec<Value>,
callbacks: Callbacks,
ws_tx: mpsc::UnboundedSender<Message>,
) -> R {
let id = get_next_id();
info!("=> id: {}, method: {}, params: {:?}", &id, &method, &params);

let (response_tx, response_rx) = oneshot::channel();
{
callbacks.write().await.insert(id, response_tx);
}
let request = JsonRpcRequest {
id,
method,
params,
};
let request_text = serde_json::to_string(&request).unwrap();
ws_tx.send(Message::Text(request_text)).unwrap();
match tokio::time::timeout(Duration::from_secs(10), response_rx).await {
Ok(Ok(rep)) => {
if let Some(result) = rep.result {
R::ok(result)
} else if let Some(err) = rep.error {
let err = err.as_object().unwrap();
R {
success: false,
code: err.get("code").cloned(),
message: err.get("message").cloned(),
response: None,
}
} else {
R::error(-1, "No response".into())
}
}
Ok(Err(_)) | Err(_) => {
{
callbacks.write().await.remove(&id);
}
R::error(-1, "Timeout".into())
}
}
}

async fn handle_proxy() -> impl IntoResponse {
Json(json!(
{
"success": true,
"info": {
"note": "Atomicals ElectrumX Digital Object Proxy Online",
"usageInfo": {
"note": "The service offers both POST and GET requests for proxying requests to ElectrumX. To handle larger broadcast transaction payloads use the POST method instead of GET.",
"POST": "POST /proxy/:method with string encoded array in the field \"params\" in the request body. ",
"GET": "GET /proxy/:method?params=[\"value1\"] with string encoded array in the query argument \"params\" in the URL."
},
"healthCheck": "GET /proxy/health",
"github": "https://github.com/atomicals/electrumx-proxy",
"license": "MIT"
}
}
))
}


fn handle_panic(err: Box<dyn Any + Send + 'static>) -> http::Response<Full<Bytes>> {
let details = if let Some(s) = err.downcast_ref::<String>() {
s.clone()
} else if let Some(s) = err.downcast_ref::<&str>() {
s.to_string()
} else {
"Unknown panic message".to_string()
};

let body = R::error(-1, details);
let body = serde_json::to_string(&body).unwrap();

Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(header::CONTENT_TYPE, "application/json")
.body(Full::from(body))
.unwrap()
}


#[tokio::main]
async fn main() {
dotenv().ok();
tracing_subscriber::fmt::init();
let (ws_tx, ws_rx) = mpsc::unbounded_channel::<Message>();
let callbacks: Arc<RwLock<HashMap<u64, oneshot::Sender<JsonRpcResponse>>>> =
Arc::new(RwLock::new(HashMap::new()));
let ws_rx_stream = Arc::new(Mutex::new(UnboundedReceiverStream::new(ws_rx)));
let governor_conf = Box::new(
GovernorConfigBuilder::default()
.per_second(env::var("IP_LIMIT_PER_SECOND").unwrap_or("1".to_string()).parse().unwrap())
.burst_size(env::var("IP_LIMIT_BURST_SIZE").unwrap_or("10".to_string()).parse().unwrap())
.finish()
.unwrap(),
);
let app = Router::new()
.route("/", get(|| async { "Hello, Atomicals!" }))
.route("/proxy", get(handle_proxy).post(handle_proxy))
.route("/proxy/:method", get(handle_get).post(handle_post))
.route_layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(|e: BoxError| async move {
display_error(e)
}))
.layer(CatchPanicLayer::custom(handle_panic))
.layer(TraceLayer::new_for_http())
.layer(GovernorLayer {
config: Box::leak(governor_conf),
}),
)
.layer(CorsLayer::permissive())
.layer(Extension(callbacks.clone()))
.layer(Extension(ws_tx.clone()))
;
tokio::spawn(async move {
loop {
let wss = env::var("ELECTRUMX_WSS").unwrap_or("wss://electrumx.atomicals.xyz:50012".to_string());
info!("Try to connect to electrumx: {}", &wss);
match connect_async(wss).await {
Ok((ws, _)) => {
info!("Connected to electrumx");
let (mut write, mut read) = ws.split();
let ws_rx_stream = Arc::clone(&ws_rx_stream);
tokio::spawn(async move {
let mut guard = ws_rx_stream.lock().await;
while let Some(message) = guard.next().await {
let _ = write.send(message).await;
}
});
while let Some(Ok(msg)) = read.next().await {
if let Ok(text) = msg.to_text() {
if let Ok(response) = serde_json::from_str::<JsonRpcResponse>(text) {
info!(
"<= id: {}, success: {}",
&response.id,
response.result.is_some()
);
if let Some(callback) = callbacks.write().await.remove(&response.id)
{
let _ = callback.send(response);
}
}
}
}
}
Err(e) => {
error!("Failed to connect to electrumx: {:?}", e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
});
let app_api = env::var("APP_API").unwrap_or("0.0.0.0:12321".to_string());
let listener = tokio::net::TcpListener::bind(&app_api)
.await
.unwrap();
info!("listening on {}", &app_api);
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>())
.await.unwrap();
}
21 changes: 21 additions & 0 deletions test.http
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@


###
GET http://localhost:12321/proxy/blockchain.atomicals.listscripthash?params=[%22a98d3e974bdf9488520ce83ea14fbdb55878e73c8be79ddd38749cc742b3ea40%22,true]

###
POST http://localhost:12321/proxy/blockchain.atomicals.listscripthash
Content-Type: application/json

{
"params": [
"a98d3e974bdf9488520ce83ea14fbdb55878e73c8be79ddd38749cc742b3ea40",
true
]
}

###
GET http://localhost:12321/proxy

###
GET http://localhost:12321

0 comments on commit af21263

Please sign in to comment.