diff --git a/apps/freenet-ping/app/src/main.rs b/apps/freenet-ping/app/src/main.rs index 993c8ca37..f54800cc0 100644 --- a/apps/freenet-ping/app/src/main.rs +++ b/apps/freenet-ping/app/src/main.rs @@ -35,7 +35,7 @@ async fn main() -> Result<(), Box // create a websocket connection to host. let uri = format!( - "ws://{}/contract/command?encodingProtocol=native", + "ws://{}/v1/contract/command?encodingProtocol=native", args.host ); let (stream, _resp) = tokio_tungstenite::connect_async(&uri).await.map_err(|e| { diff --git a/crates/core/src/client_events/websocket.rs b/crates/core/src/client_events/websocket.rs index 5d68f2db4..8a4655567 100644 --- a/crates/core/src/client_events/websocket.rs +++ b/crates/core/src/client_events/websocket.rs @@ -32,6 +32,8 @@ use crate::{ use super::{ClientError, ClientEventsProxy, ClientId, HostResult, OpenRequest}; +mod v1; + #[derive(Clone)] struct WebSocketRequest(mpsc::Sender); @@ -52,19 +54,7 @@ const PARALLELISM: usize = 10; // TODO: get this from config, or whatever optima impl WebSocketProxy { pub fn as_router(server_routing: Router) -> (Self, Router) { - let (proxy_request_sender, proxy_server_request) = mpsc::channel(PARALLELISM); - - let router = server_routing - .route("/contract/command", get(websocket_commands)) - .layer(Extension(WebSocketRequest(proxy_request_sender))) - .layer(axum::middleware::from_fn(connection_info)); - ( - WebSocketProxy { - proxy_server_request, - response_channels: HashMap::new(), - }, - router, - ) + WebSocketProxy::as_router_v1(server_routing) } async fn internal_proxy_recv( diff --git a/crates/core/src/client_events/websocket/v1.rs b/crates/core/src/client_events/websocket/v1.rs new file mode 100644 index 000000000..22ce206c0 --- /dev/null +++ b/crates/core/src/client_events/websocket/v1.rs @@ -0,0 +1,19 @@ +use super::*; + +impl WebSocketProxy { + pub fn as_router_v1(server_routing: Router) -> (Self, Router) { + let (proxy_request_sender, proxy_server_request) = mpsc::channel(PARALLELISM); + + let router = server_routing + .route("/v1/contract/command", get(websocket_commands)) + .layer(Extension(WebSocketRequest(proxy_request_sender))) + .layer(axum::middleware::from_fn(connection_info)); + ( + WebSocketProxy { + proxy_server_request, + response_channels: HashMap::new(), + }, + router, + ) + } +} diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index a1901ef30..9e65272fb 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -269,8 +269,9 @@ impl ConfigArgs { Err(err) => { #[cfg(not(any(test, debug_assertions)))] { - if peer_id.is_none() { + if peer_id.is_none() && mode == OperationMode::Network { tracing::error!(file = ?gateways_file, "Failed to read gateways file: {err}"); + return Err(std::io::Error::new( std::io::ErrorKind::NotFound, "Cannot initialize node without gateways", diff --git a/crates/core/src/server/http_gateway.rs b/crates/core/src/server/http_gateway.rs index 8fc43dc02..0d2bed0c5 100644 --- a/crates/core/src/server/http_gateway.rs +++ b/crates/core/src/server/http_gateway.rs @@ -16,6 +16,8 @@ use crate::server::HostCallbackResult; use super::{errors::WebSocketApiError, path_handlers, AuthToken, ClientConnection}; +mod v1; + #[derive(Clone)] pub(super) struct HttpGatewayRequest(mpsc::Sender); @@ -39,91 +41,20 @@ pub(super) struct HttpGateway { response_channels: HashMap>, } -#[derive(Clone)] -struct Config { - localhost: bool, -} - impl HttpGateway { /// Returns the uninitialized axum router to compose with other routing handling or websockets. pub fn as_router(socket: &SocketAddr) -> (Self, Router) { - let localhost = match socket.ip() { - IpAddr::V4(ip) if ip.is_loopback() => true, - IpAddr::V6(ip) if ip.is_loopback() => true, - _ => false, - }; - let contract_web_path = std::env::temp_dir().join("freenet").join("webs"); - std::fs::create_dir_all(contract_web_path).unwrap(); - - let (proxy_request_sender, request_to_server) = mpsc::channel(1); - - let config = Config { localhost }; - - let router = Router::new() - .route("/", get(home)) - .route("/contract/web/:key/", get(web_home)) - .with_state(config) - .route("/contract/web/:key/*path", get(web_subpages)) - .layer(Extension(HttpGatewayRequest(proxy_request_sender))); - - ( - Self { - proxy_server_request: request_to_server, - attested_contracts: HashMap::new(), - response_channels: HashMap::new(), - }, - router, - ) + Self::as_router_v1(socket) } } -async fn home() -> axum::response::Response { - axum::response::Response::default() -} - -async fn web_home( - Path(key): Path, - Extension(rs): Extension, - axum::extract::State(config): axum::extract::State, -) -> Result { - use headers::{Header, HeaderMapExt}; - - let domain = config - .localhost - .then_some("localhost") - .expect("non-local connections not supported yet"); - let token = AuthToken::generate(); - - let auth_header = headers::Authorization::::name().to_string(); - let cookie = cookie::Cookie::build((auth_header, format!("Bearer {}", token.as_str()))) - .domain(domain) - .path(format!("/contract/web/{key}")) - .same_site(cookie::SameSite::Strict) - .max_age(cookie::time::Duration::days(1)) - .secure(!config.localhost) - .http_only(false) - .build(); - - let token_header = headers::Authorization::bearer(token.as_str()).unwrap(); - let contract_idx = path_handlers::contract_home(key, rs, token).await?; - let mut response = contract_idx.into_response(); - response.headers_mut().typed_insert(token_header); - response.headers_mut().insert( - headers::SetCookie::name(), - headers::HeaderValue::from_str(&cookie.to_string()).unwrap(), - ); - - Ok(response) +#[derive(Clone)] +struct Config { + localhost: bool, } -async fn web_subpages( - Path((key, last_path)): Path<(String, String)>, -) -> Result { - let full_path: String = format!("/contract/web/{}/{}", key, last_path); - path_handlers::variable_content(key, full_path) - .await - .map_err(|e| *e) - .map(|r| r.into_response()) +async fn home() -> axum::response::Response { + axum::response::Response::default() } impl ClientEventsProxy for HttpGateway { diff --git a/crates/core/src/server/http_gateway/v1.rs b/crates/core/src/server/http_gateway/v1.rs new file mode 100644 index 000000000..3998b7f68 --- /dev/null +++ b/crates/core/src/server/http_gateway/v1.rs @@ -0,0 +1,79 @@ +use super::*; + +impl HttpGateway { + /// Returns the uninitialized axum router to compose with other routing handling or websockets. + pub fn as_router_v1(socket: &SocketAddr) -> (Self, Router) { + let localhost = match socket.ip() { + IpAddr::V4(ip) if ip.is_loopback() => true, + IpAddr::V6(ip) if ip.is_loopback() => true, + _ => false, + }; + let contract_web_path = std::env::temp_dir().join("freenet").join("webs"); + std::fs::create_dir_all(contract_web_path).unwrap(); + + let (proxy_request_sender, request_to_server) = mpsc::channel(1); + + let config = Config { localhost }; + + let router = Router::new() + .route("/v1", get(home)) + .route("/v1/contract/web/:key/", get(web_home)) + .with_state(config) + .route("/v1/contract/web/:key/*path", get(web_subpages)) + .layer(Extension(HttpGatewayRequest(proxy_request_sender))); + + ( + Self { + proxy_server_request: request_to_server, + attested_contracts: HashMap::new(), + response_channels: HashMap::new(), + }, + router, + ) + } +} + +async fn web_home( + Path(key): Path, + Extension(rs): Extension, + axum::extract::State(config): axum::extract::State, +) -> Result { + use headers::{Header, HeaderMapExt}; + + let domain = config + .localhost + .then_some("localhost") + .expect("non-local connections not supported yet"); + let token = AuthToken::generate(); + + let auth_header = headers::Authorization::::name().to_string(); + let cookie = cookie::Cookie::build((auth_header, format!("Bearer {}", token.as_str()))) + .domain(domain) + .path(format!("/v1/contract/web/{key}")) + .same_site(cookie::SameSite::Strict) + .max_age(cookie::time::Duration::days(1)) + .secure(!config.localhost) + .http_only(false) + .build(); + + let token_header = headers::Authorization::bearer(token.as_str()).unwrap(); + let contract_idx = path_handlers::contract_home(key, rs, token).await?; + let mut response = contract_idx.into_response(); + response.headers_mut().typed_insert(token_header); + response.headers_mut().insert( + headers::SetCookie::name(), + headers::HeaderValue::from_str(&cookie.to_string()).unwrap(), + ); + + Ok(response) +} + +async fn web_subpages( + Path((key, last_path)): Path<(String, String)>, +) -> Result { + let full_path: String = format!("/v1/contract/web/{}/{}", key, last_path); + path_handlers::variable_content(key, full_path) + .await + .map_err(|e| *e) + .map(|r| r.into_response()) +} diff --git a/crates/core/src/server/path_handlers.rs b/crates/core/src/server/path_handlers.rs index 094913f01..be9e451ee 100644 --- a/crates/core/src/server/path_handlers.rs +++ b/crates/core/src/server/path_handlers.rs @@ -18,6 +18,8 @@ use super::{ ClientConnection, HostCallbackResult, }; +mod v1; + const ALPHABET: &str = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"; pub(super) async fn contract_home( @@ -214,33 +216,5 @@ fn contract_web_path(key: &ContractKey) -> PathBuf { #[inline] fn get_file_path(uri: axum::http::Uri) -> Result> { - let p = uri.path().strip_prefix("/contract/").ok_or_else(|| { - Box::new(WebSocketApiError::InvalidParam { - error_cause: format!("{uri} not valid"), - }) - })?; - let path = p - .chars() - .skip_while(|c| ALPHABET.contains(*c)) - .skip_while(|c| c == &'/') - .skip_while(|c| ALPHABET.contains(*c)) - .skip_while(|c| c == &'/') - .collect::(); - Ok(path) -} - -#[test] -fn get_path() { - let req_path = "/contract/HjpgVdSziPUmxFoBgTdMkQ8xiwhXdv1qn5ouQvSaApzD/web/state.html"; - let base_dir = - PathBuf::from("/tmp/freenet/webs/HjpgVdSziPUmxFoBgTdMkQ8xiwhXdv1qn5ouQvSaApzD/web/"); - let uri: axum::http::Uri = req_path.parse().unwrap(); - let parsed = get_file_path(uri).unwrap(); - let result = base_dir.join(parsed); - assert_eq!( - std::path::PathBuf::from( - "/tmp/freenet/webs/HjpgVdSziPUmxFoBgTdMkQ8xiwhXdv1qn5ouQvSaApzD/web/state.html" - ), - result - ); + v1::get_file_path(uri) } diff --git a/crates/core/src/server/path_handlers/v1.rs b/crates/core/src/server/path_handlers/v1.rs new file mode 100644 index 000000000..e8057e3fc --- /dev/null +++ b/crates/core/src/server/path_handlers/v1.rs @@ -0,0 +1,34 @@ +use super::*; + +#[inline] +pub(super) fn get_file_path(uri: axum::http::Uri) -> Result> { + let p = uri.path().strip_prefix("/v1/contract/").ok_or_else(|| { + Box::new(WebSocketApiError::InvalidParam { + error_cause: format!("{uri} not valid"), + }) + })?; + let path = p + .chars() + .skip_while(|c| ALPHABET.contains(*c)) + .skip_while(|c| c == &'/') + .skip_while(|c| ALPHABET.contains(*c)) + .skip_while(|c| c == &'/') + .collect::(); + Ok(path) +} + +#[test] +pub(super) fn get_path() { + let req_path = "/v1/contract/HjpgVdSziPUmxFoBgTdMkQ8xiwhXdv1qn5ouQvSaApzD/web/state.html"; + let base_dir = + PathBuf::from("/tmp/freenet/webs/HjpgVdSziPUmxFoBgTdMkQ8xiwhXdv1qn5ouQvSaApzD/web/"); + let uri: axum::http::Uri = req_path.parse().unwrap(); + let parsed = get_file_path(uri).unwrap(); + let result = base_dir.join(parsed); + assert_eq!( + std::path::PathBuf::from( + "/tmp/freenet/webs/HjpgVdSziPUmxFoBgTdMkQ8xiwhXdv1qn5ouQvSaApzD/web/state.html" + ), + result + ); +} diff --git a/crates/fdev/src/commands.rs b/crates/fdev/src/commands.rs index 8afc9fa1e..ec12c01a8 100644 --- a/crates/fdev/src/commands.rs +++ b/crates/fdev/src/commands.rs @@ -13,6 +13,8 @@ use freenet_stdlib::{ use crate::config::{BaseConfig, PutConfig, UpdateConfig}; +mod v1; + #[derive(Debug, Clone, clap::Subcommand)] pub(crate) enum PutType { /// Puts a new contract @@ -149,32 +151,5 @@ async fn execute_command( address: IpAddr, port: u16, ) -> Result<(), anyhow::Error> { - let mode = other.mode; - - let target = match mode { - OperationMode::Local => { - if !address.is_loopback() { - return Err(anyhow::anyhow!( - "invalid ip: {address}, expecting a loopback ip address in local mode" - )); - } - SocketAddr::new(address, port) - } - OperationMode::Network => SocketAddr::new(address, port), - }; - - let (stream, _) = tokio_tungstenite::connect_async(&format!( - "ws://{}/contract/command?encodingProtocol=native", - target - )) - .await - .map_err(|e| { - tracing::error!(err=%e); - anyhow::anyhow!(format!("fail to connect to the host({target}): {e}")) - })?; - - WebApi::start(stream) - .send(request) - .await - .map_err(Into::into) + v1::execute_command(request, other, address, port).await } diff --git a/crates/fdev/src/commands/v1.rs b/crates/fdev/src/commands/v1.rs new file mode 100644 index 000000000..62a31be25 --- /dev/null +++ b/crates/fdev/src/commands/v1.rs @@ -0,0 +1,37 @@ +use super::*; + +pub(super) async fn execute_command( + request: ClientRequest<'static>, + other: BaseConfig, + address: IpAddr, + port: u16, +) -> Result<(), anyhow::Error> { + let mode = other.mode; + + let target = match mode { + OperationMode::Local => { + if !address.is_loopback() { + return Err(anyhow::anyhow!( + "invalid ip: {address}, expecting a loopback ip address in local mode" + )); + } + SocketAddr::new(address, port) + } + OperationMode::Network => SocketAddr::new(address, port), + }; + + let (stream, _) = tokio_tungstenite::connect_async(&format!( + "ws://{}/v1/contract/command?encodingProtocol=native", + target + )) + .await + .map_err(|e| { + tracing::error!(err=%e); + anyhow::anyhow!(format!("fail to connect to the host({target}): {e}")) + })?; + + WebApi::start(stream) + .send(request) + .await + .map_err(Into::into) +} diff --git a/crates/fdev/src/network_metrics_server.rs b/crates/fdev/src/network_metrics_server.rs index e001d504b..0dc30d5aa 100644 --- a/crates/fdev/src/network_metrics_server.rs +++ b/crates/fdev/src/network_metrics_server.rs @@ -21,6 +21,8 @@ use freenet::{ use futures::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; +mod v1; + /// Network metrics server. Records metrics and data from a test network that can be used for /// analysis and visualization. #[derive(clap::Parser, Clone)] @@ -64,37 +66,7 @@ async fn run_server( barrier: Arc, changes: tokio::sync::broadcast::Sender, ) -> anyhow::Result<()> { - const DEFAULT_PORT: u16 = 55010; - - let port = std::env::var("FDEV_NETWORK_METRICS_SERVER_PORT") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(DEFAULT_PORT); - - let router = Router::new() - .route("/", get(home)) - .route("/push-stats/", get(push_stats)) - .route("/pull-stats/peer-changes/", get(pull_peer_changes)) - .with_state(Arc::new(ServerState { - changes, - peer_data: DashMap::new(), - transactions_data: DashMap::new(), - })); - - tracing::info!("Starting metrics server on port {port}"); - barrier.wait().await; - let listener = tokio::net::TcpListener::bind((Ipv4Addr::LOCALHOST, port)).await?; - axum::serve(listener, router).await?; - Ok(()) -} - -async fn home() -> Response { - Response::builder() - .status(StatusCode::FOUND) - .header("Location", "/pull-stats/") - .body(Body::empty()) - .expect("should be valid response") - .into_response() + v1::run_server(barrier, changes).await } async fn push_stats( diff --git a/crates/fdev/src/network_metrics_server/v1.rs b/crates/fdev/src/network_metrics_server/v1.rs new file mode 100644 index 000000000..a7de9c1ba --- /dev/null +++ b/crates/fdev/src/network_metrics_server/v1.rs @@ -0,0 +1,38 @@ +use super::*; + +pub(super) async fn run_server( + barrier: Arc, + changes: tokio::sync::broadcast::Sender, +) -> anyhow::Result<()> { + const DEFAULT_PORT: u16 = 55010; + + let port = std::env::var("FDEV_NETWORK_METRICS_SERVER_PORT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_PORT); + + let router = Router::new() + .route("/v1", get(home)) + .route("/v1/push-stats/", get(push_stats)) + .route("/v1/pull-stats/peer-changes/", get(pull_peer_changes)) + .with_state(Arc::new(ServerState { + changes, + peer_data: DashMap::new(), + transactions_data: DashMap::new(), + })); + + tracing::info!("Starting metrics server on port {port}"); + barrier.wait().await; + let listener = tokio::net::TcpListener::bind((Ipv4Addr::LOCALHOST, port)).await?; + axum::serve(listener, router).await?; + Ok(()) +} + +async fn home() -> Response { + Response::builder() + .status(StatusCode::FOUND) + .header("Location", "/v1/pull-stats/") + .body(Body::empty()) + .expect("should be valid response") + .into_response() +} diff --git a/crates/fdev/src/testing/network.rs b/crates/fdev/src/testing/network.rs index c2d4304cc..789297218 100644 --- a/crates/fdev/src/testing/network.rs +++ b/crates/fdev/src/testing/network.rs @@ -38,6 +38,8 @@ use tokio::{ use super::{Error, TestConfig}; +mod v1; + #[derive(Debug, Error)] pub enum NetworkSimulationError { #[error("Server start failed: {0}")] @@ -208,44 +210,7 @@ pub(super) async fn run( } pub async fn start_server(supervisor: Arc) -> Result<(), NetworkSimulationError> { - let (startup_sender, startup_receiver) = oneshot::channel(); - let peers_config = supervisor.peers_config.clone(); - - let cloned_supervisor = supervisor.clone(); - - let router = Router::new() - .route("/ws", get(|ws| ws_handler(ws, cloned_supervisor))) - .route( - "/config/:peer_id", - get(|path: Path| config_handler(peers_config, path)), - ); - - let socket = SocketAddr::from(([0, 0, 0, 0], 3000)); - - tokio::spawn(async move { - tracing::info!("Supervisor running on {}", socket); - let listener = tokio::net::TcpListener::bind(socket).await.map_err(|_| { - NetworkSimulationError::ServerStartFailure("Failed to bind TCP listener".into()) - })?; - - if startup_sender.send(()).is_err() { - tracing::error!("Failed to send startup signal"); - return Err(NetworkSimulationError::ServerStartFailure( - "Failed to send startup signal".into(), - )); - } - - axum::serve(listener, router) - .await - .map_err(|e| NetworkSimulationError::ServerStartFailure(format!("Server error: {}", e))) - }); - - startup_receiver - .await - .map_err(|_| NetworkSimulationError::ServerStartFailure("Server startup failed".into()))?; - - tracing::info!("Server started successfully"); - Ok(()) + v1::start_server_v1(supervisor).await } pub async fn run_network( diff --git a/crates/fdev/src/testing/network/v1.rs b/crates/fdev/src/testing/network/v1.rs new file mode 100644 index 000000000..54afb50af --- /dev/null +++ b/crates/fdev/src/testing/network/v1.rs @@ -0,0 +1,42 @@ +use super::*; + +pub async fn start_server_v1(supervisor: Arc) -> Result<(), NetworkSimulationError> { + let (startup_sender, startup_receiver) = oneshot::channel(); + let peers_config = supervisor.peers_config.clone(); + + let cloned_supervisor = supervisor.clone(); + + let router = Router::new() + .route("/v1/ws", get(|ws| ws_handler(ws, cloned_supervisor))) + .route( + "/v1/config/:peer_id", + get(|path: Path| config_handler(peers_config, path)), + ); + + let socket = SocketAddr::from(([0, 0, 0, 0], 3000)); + + tokio::spawn(async move { + tracing::info!("Supervisor running on {}", socket); + let listener = tokio::net::TcpListener::bind(socket).await.map_err(|_| { + NetworkSimulationError::ServerStartFailure("Failed to bind TCP listener".into()) + })?; + + if startup_sender.send(()).is_err() { + tracing::error!("Failed to send startup signal"); + return Err(NetworkSimulationError::ServerStartFailure( + "Failed to send startup signal".into(), + )); + } + + axum::serve(listener, router) + .await + .map_err(|e| NetworkSimulationError::ServerStartFailure(format!("Server error: {}", e))) + }); + + startup_receiver + .await + .map_err(|_| NetworkSimulationError::ServerStartFailure("Server startup failed".into()))?; + + tracing::info!("Server started successfully"); + Ok(()) +} diff --git a/crates/fdev/src/wasm_runtime/state.rs b/crates/fdev/src/wasm_runtime/state.rs index 20081960e..7c46b1e9e 100644 --- a/crates/fdev/src/wasm_runtime/state.rs +++ b/crates/fdev/src/wasm_runtime/state.rs @@ -7,6 +7,8 @@ use crate::wasm_runtime::DeserializationFmt; use super::ExecutorConfig; +mod v1; + #[derive(Clone)] pub(super) struct AppState { pub(crate) local_node: Arc>, @@ -15,21 +17,7 @@ pub(super) struct AppState { impl AppState { pub async fn new(config: &ExecutorConfig) -> Result { - let target: SocketAddr = (config.address, config.port).into(); - let (stream, _) = tokio_tungstenite::connect_async(&format!( - "ws://{}/contract/command?encodingProtocol=native", - target - )) - .await - .map_err(|e| { - tracing::error!(err=%e); - anyhow::anyhow!(format!("fail to connect to the host({target}): {e}")) - })?; - - Ok(AppState { - local_node: Arc::new(RwLock::new(WebApi::start(stream))), - config: config.clone(), - }) + AppState::new_v1(config).await } pub fn printout_deser + ?Sized>(&self, data: &R) -> Result<(), std::io::Error> { diff --git a/crates/fdev/src/wasm_runtime/state/v1.rs b/crates/fdev/src/wasm_runtime/state/v1.rs new file mode 100644 index 000000000..d4cb51173 --- /dev/null +++ b/crates/fdev/src/wasm_runtime/state/v1.rs @@ -0,0 +1,21 @@ +use super::*; + +impl AppState { + pub async fn new_v1(config: &ExecutorConfig) -> Result { + let target: SocketAddr = (config.address, config.port).into(); + let (stream, _) = tokio_tungstenite::connect_async(&format!( + "ws://{}/v1/contract/command?encodingProtocol=native", + target + )) + .await + .map_err(|e| { + tracing::error!(err=%e); + anyhow::anyhow!(format!("fail to connect to the host({target}): {e}")) + })?; + + Ok(AppState { + local_node: Arc::new(RwLock::new(WebApi::start(stream))), + config: config.clone(), + }) + } +}