Skip to content

Commit

Permalink
[rooch-networkgh-2406] add ws client to support proxied requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Feliciss committed Sep 2, 2024
1 parent b5ca58f commit 428158d
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 9 deletions.
70 changes: 65 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ tonic = { version = "0.8", features = ["gzip"] }
tracing = "0.1.37"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15" }
fast-socks5 = { version = "0.9.1" }
pin-project = { version = "1.1.5" }

codespan-reporting = "0.11.1"
codespan = "0.11.1"
Expand Down
3 changes: 3 additions & 0 deletions crates/rooch-rpc-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ jsonrpsee = { workspace = true }
serde_json = { workspace = true }
log = { workspace = true }
hex = { workspace = true }
axum = { workspace = true }
fast-socks5 = { workspace = true }
pin-project = { workspace = true }

move-core-types = { workspace = true }

Expand Down
83 changes: 83 additions & 0 deletions crates/rooch-rpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use fast_socks5::client::Socks5Stream;
use fast_socks5::server::{Config, Socks5Socket};
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use move_core_types::language_storage::ModuleId;
use move_core_types::metadata::Metadata;
use move_core_types::resolver::ModuleResolver;
Expand All @@ -15,9 +18,13 @@ use moveos_types::{
function_return_value::FunctionResult, module_binding::MoveFunctionCaller,
moveos_std::tx_context::TxContext, transaction::FunctionCall,
};
use pin_project::pin_project;
use rooch_client::RoochRpcClient;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Handle;

pub mod client_config;
Expand Down Expand Up @@ -56,11 +63,67 @@ impl ClientBuilder {
.build(http)?,
);

let server_addr = self.socks_server_no_auth().await;
let server_url = format!("ws://{}", server_addr);
// TODO: build_with_stream
let ws_client = Arc::new(WsClientBuilder::default().build(&server_url).await.unwrap());

Ok(Client {
http: http_client.clone(),
ws: ws_client.clone(),
rooch: RoochRpcClient::new(http_client.clone()),
})
}

pub async fn socks_server_no_auth(self) -> SocketAddr {
let mut config = Config::default();
config.set_dns_resolve(false);
let config = Arc::new(config);

let proxy_url = if self.proxy_url.is_some() {
self.proxy_url.clone().unwrap()
} else {
env::var("ALL_PROXY").unwrap()
};

let listener = TcpListener::bind(proxy_url).await.unwrap();
let proxy_addr = listener.local_addr().unwrap();
self.spawn_socks_server(listener, config).await;

proxy_addr
}

pub async fn spawn_socks_server(self, listener: TcpListener, config: Arc<Config>) {
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {
let (stream, _) = listener.accept().await.unwrap();
let mut socks5_socket = Socks5Socket::new(stream, config.clone());
socks5_socket.set_reply_ip(addr.ip());

socks5_socket.upgrade_to_socks5().await.unwrap();
}
});
}

pub async fn connect_over_socks_stream(
self,
server_addr: SocketAddr,
) -> Socks5Stream<TcpStream> {
let target_addr = server_addr.ip().to_string();
let target_port = server_addr.port();

let socks_server = self.socks_server_no_auth().await;

Socks5Stream::connect(
socks_server,
target_addr,
target_port,
fast_socks5::client::Config::default(),
)
.await
.unwrap()
}
}

impl Default for ClientBuilder {
Expand All @@ -76,6 +139,7 @@ impl Default for ClientBuilder {
#[derive(Clone)]
pub struct Client {
http: Arc<HttpClient>,
ws: Arc<WsClient>,
pub rooch: RoochRpcClient,
}

Expand All @@ -93,6 +157,14 @@ impl Client {
) -> Result<serde_json::Value> {
Ok(self.http.request(method, params).await?)
}

pub async fn request_with_ws(
&self,
method: &str,
params: Vec<serde_json::Value>,
) -> Result<serde_json::Value> {
Ok(self.ws.request(method, params).await?)
}
}

impl MoveFunctionCaller for Client {
Expand Down Expand Up @@ -129,3 +201,14 @@ impl ModuleResolver for &Client {
})
}
}

#[pin_project]
pub struct DataStream<T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin>(
#[pin] Socks5Stream<T>,
);

impl<T: tokio::io::AsyncRead + tokio::io::AsyncWrite + std::marker::Unpin> DataStream<T> {
pub fn new(t: Socks5Stream<T>) -> Self {
Self(t)
}
}
5 changes: 1 addition & 4 deletions crates/rooch/src/commands/rpc/commands/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,8 @@ impl CommandAction<serde_json::Value> for RequestCommand {
}
None => vec![],
};
let active_env = context.client_config.get_active_env()?;

Ok(client
.request_by_proxy(&active_env.rpc, self.method.as_str(), params)
.await?)
Ok(client.request_with_ws(&self.method, params).await?)
}

/// Executes the command, and serializes it to the common JSON output type
Expand Down

0 comments on commit 428158d

Please sign in to comment.