From 7b16c34dcb445da91301a495cc3d56b9d6b2ae2f Mon Sep 17 00:00:00 2001 From: Al Liu Date: Wed, 12 Jun 2024 22:11:41 +0800 Subject: [PATCH] refactor `run_network_node` (#1130) --- crates/core/src/bin/freenet.rs | 27 ++++- crates/core/src/config.rs | 9 ++ crates/core/src/lib.rs | 1 + crates/core/src/node.rs | 149 ++++++++++++++++++++++++- crates/core/src/node/p2p_impl.rs | 6 +- crates/core/src/ring.rs | 4 +- crates/core/src/server.rs | 77 +++---------- crates/core/src/server/http_gateway.rs | 2 +- 8 files changed, 202 insertions(+), 73 deletions(-) diff --git a/crates/core/src/bin/freenet.rs b/crates/core/src/bin/freenet.rs index 3f27bb661..5813668f1 100644 --- a/crates/core/src/bin/freenet.rs +++ b/crates/core/src/bin/freenet.rs @@ -1,10 +1,13 @@ +use anyhow::Context; use clap::Parser; use freenet::{ config::{Config, ConfigArgs}, + dev_tool::NodeConfig, local_node::{Executor, OperationMode}, - server::{local_node::run_local_node, network_node::run_network_node}, + run_local_node, run_network_node, + server::serve_gateway, }; -use std::{net::SocketAddr, sync::Arc}; +use std::sync::Arc; async fn run(config: Config) -> anyhow::Result<()> { match config.mode { @@ -15,13 +18,12 @@ async fn run(config: Config) -> anyhow::Result<()> { async fn run_local(config: Config) -> anyhow::Result<()> { tracing::info!("Starting freenet node in local mode"); - let port = config.ws_api.port; - let ip = config.ws_api.address; + let socket = config.ws_api; + let executor = Executor::from_config(Arc::new(config), None) .await .map_err(anyhow::Error::msg)?; - let socket: SocketAddr = (ip, port).into(); run_local_node(executor, socket) .await .map_err(anyhow::Error::msg) @@ -29,7 +31,20 @@ async fn run_local(config: Config) -> anyhow::Result<()> { async fn run_network(config: Config) -> anyhow::Result<()> { tracing::info!("Starting freenet node in network mode"); - run_network_node(config).await + + let clients = serve_gateway(config.ws_api).await; + tracing::info!("Initializing node configuration"); + + let node_config = NodeConfig::new(config) + .await + .with_context(|| "failed while loading node config")?; + + let node = node_config + .build(clients) + .await + .with_context(|| "failed while building the node")?; + + run_network_node(node).await } fn main() -> anyhow::Result<()> { diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index 7d5ff083d..1e8f3498e 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -457,6 +457,15 @@ pub struct WebsocketApiConfig { pub port: u16, } +impl From for WebsocketApiConfig { + fn from(addr: SocketAddr) -> Self { + Self { + address: addr.ip(), + port: addr.port(), + } + } +} + impl Default for WebsocketApiConfig { #[inline] fn default() -> Self { diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 5be996f88..73beee628 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -10,6 +10,7 @@ pub mod generated; mod message; /// Node configuration, implementations and execution (entry points for the binaries). mod node; +pub use node::{run_local_node, run_network_node}; /// Network operation/transaction state machines. mod operations; /// Ring connections and routing. diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 94992dab6..adc7bdfc6 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -32,11 +32,12 @@ use tracing::Instrument; use self::p2p_impl::NodeP2P; use crate::{ client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest}, - config::{Address, GatewayConfig, GlobalExecutor}, + config::{Address, GatewayConfig, GlobalExecutor, WebsocketApiConfig}, contract::{ - Callback, ClientResponsesReceiver, ClientResponsesSender, ContractError, + Callback, ClientResponsesReceiver, ClientResponsesSender, ContractError, ExecutorError, ExecutorToEventLoopChannel, NetworkContractHandler, }, + local_node::Executor, message::{NetMessage, NodeEvent, Transaction, TransactionType}, operations::{ connect::{self, ConnectOp}, @@ -1016,6 +1017,150 @@ impl Display for PeerId { } } +pub async fn run_local_node( + mut executor: Executor, + socket: WebsocketApiConfig, +) -> anyhow::Result<()> { + match socket.address { + IpAddr::V4(ip) if !ip.is_loopback() => { + anyhow::bail!("invalid ip: {ip}, expecting localhost") + } + IpAddr::V6(ip) if !ip.is_loopback() => { + anyhow::bail!("invalid ip: {ip}, expecting localhost") + } + _ => {} + } + + let (mut gw, mut ws_proxy) = crate::server::serve_gateway_in(socket).await; + + // TODO: use combinator instead + // let mut all_clients = + // ClientEventsCombinator::new([Box::new(ws_handle), Box::new(http_handle)]); + enum Receiver { + Ws, + Gw, + } + let mut receiver; + loop { + let req = tokio::select! { + req = ws_proxy.recv() => { + receiver = Receiver::Ws; + req? + } + req = gw.recv() => { + receiver = Receiver::Gw; + req? + } + }; + let OpenRequest { + client_id: id, + request, + notification_channel, + token, + .. + } = req; + tracing::trace!(cli_id = %id, "got request -> {request}"); + + let res = match *request { + ClientRequest::ContractOp(op) => { + executor + .contract_requests(op, id, notification_channel) + .await + } + ClientRequest::DelegateOp(op) => { + let attested_contract = + token.and_then(|token| gw.attested_contracts.get(&token).map(|(t, _)| t)); + executor.delegate_request(op, attested_contract) + } + ClientRequest::Disconnect { cause } => { + if let Some(cause) = cause { + tracing::info!("disconnecting cause: {cause}"); + } + // fixme: token must live for a bit to allow reconnections + if let Some(rm_token) = gw + .attested_contracts + .iter() + .find_map(|(k, (_, eid))| (eid == &id).then(|| k.clone())) + { + gw.attested_contracts.remove(&rm_token); + } + continue; + } + _ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))), + }; + + match res { + Ok(res) => { + match receiver { + Receiver::Ws => ws_proxy.send(id, Ok(res)).await?, + Receiver::Gw => gw.send(id, Ok(res)).await?, + }; + } + Err(err) if err.is_request() => { + let err = ErrorKind::RequestError(err.unwrap_request()); + match receiver { + Receiver::Ws => { + ws_proxy.send(id, Err(err.into())).await?; + } + Receiver::Gw => { + gw.send(id, Err(err.into())).await?; + } + }; + } + Err(err) => { + tracing::error!("{err}"); + let err = Err(ErrorKind::Unhandled { + cause: format!("{err}").into(), + } + .into()); + match receiver { + Receiver::Ws => { + ws_proxy.send(id, err).await?; + } + Receiver::Gw => { + gw.send(id, err).await?; + } + }; + } + } + } +} + +pub async fn run_network_node(mut node: Node) -> anyhow::Result<()> { + tracing::info!("Starting node"); + + let is_gateway = node.0.is_gateway; + let location = is_gateway + .then(|| { + node.0 + .peer_id + .clone() + .map(|id| Location::from_address(&id.addr())) + }) + .flatten(); + + if let Some(location) = location { + tracing::info!("Setting initial location: {location}"); + node.update_location(location); + } + + match node.run().await { + Ok(_) => { + if is_gateway { + tracing::info!("Gateway finished"); + } else { + tracing::info!("Node finished"); + } + + Ok(()) + } + Err(e) => { + tracing::error!("{e}"); + Err(e) + } + } +} + #[cfg(test)] mod tests { use std::net::{Ipv4Addr, Ipv6Addr}; diff --git a/crates/core/src/node/p2p_impl.rs b/crates/core/src/node/p2p_impl.rs index 887191f0f..54f27716c 100644 --- a/crates/core/src/node/p2p_impl.rs +++ b/crates/core/src/node/p2p_impl.rs @@ -7,7 +7,7 @@ use super::{ network_bridge::{ event_loop_notification_channel, p2p_protoc::P2pConnManager, EventLoopNotificationsReceiver, }, - NetEventRegister, + NetEventRegister, PeerId, }; use crate::transport::TransportPublicKey; use crate::{ @@ -34,6 +34,8 @@ pub(crate) struct NodeP2P { cli_response_sender: ClientResponsesSender, node_controller: tokio::sync::mpsc::Receiver, should_try_connect: bool, + pub(super) peer_id: Option, + pub(super) is_gateway: bool, } impl NodeP2P { @@ -118,6 +120,8 @@ impl NodeP2P { cli_response_sender, node_controller: node_controller_rx, should_try_connect: config.should_connect, + peer_id: config.peer_id, + is_gateway: config.is_gateway, }) } } diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index 4e36911ed..c124cbb2d 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -615,9 +615,7 @@ impl Ring { ) -> Option { let connections = self.connections_by_location.read(); let peers = connections.values().filter_map(|conns| { - let Some(conn) = conns.choose(&mut rand::thread_rng()) else { - return None; - }; + let conn = conns.choose(&mut rand::thread_rng())?; if let Some(requester) = requesting { if requester == &conn.location.peer { return None; diff --git a/crates/core/src/server.rs b/crates/core/src/server.rs index 559496af1..dc1ea7512 100644 --- a/crates/core/src/server.rs +++ b/crates/core/src/server.rs @@ -10,7 +10,13 @@ use freenet_stdlib::{ prelude::*, }; -use crate::client_events::{AuthToken, ClientId, HostResult}; +use http_gateway::HttpGateway; +use tower_http::trace::TraceLayer; + +use crate::{ + client_events::{websocket::WebSocketProxy, AuthToken, BoxedClient, ClientId, HostResult}, + config::WebsocketApiConfig, +}; pub use app_packaging::WebApp; @@ -175,64 +181,15 @@ pub mod local_node { } } -pub mod network_node { - use anyhow::Context; - use tower_http::trace::TraceLayer; - - use crate::{ - client_events::websocket::WebSocketProxy, - config::Config, - dev_tool::{Location, NodeConfig}, - }; - - use super::{http_gateway::HttpGateway, serve}; - - pub async fn run_network_node(config: Config) -> anyhow::Result<()> { - let ws_socket = (config.ws_api.address, config.ws_api.port).into(); - let (gw, gw_router) = HttpGateway::as_router(&ws_socket); - let (ws_proxy, ws_router) = WebSocketProxy::as_router(gw_router); - serve(ws_socket, ws_router.layer(TraceLayer::new_for_http())); - - tracing::info!("Initializing node configuration"); - - let node_config = NodeConfig::new(config) - .await - .with_context(|| "failed while loading node config")?; - let is_gateway = node_config.is_gateway; - let location = is_gateway - .then(|| { - node_config - .peer_id - .clone() - .map(|id| Location::from_address(&id.addr())) - }) - .flatten(); - let mut node = node_config - .build([Box::new(gw), Box::new(ws_proxy)]) - .await - .with_context(|| "failed while building the node")?; - - if let Some(location) = location { - tracing::info!("Setting initial location: {location}"); - node.update_location(location); - } - - tracing::info!("Starting node"); - - match node.run().await { - Ok(_) => { - if is_gateway { - tracing::info!("Gateway finished"); - } else { - tracing::info!("Node finished"); - } +pub async fn serve_gateway(config: WebsocketApiConfig) -> [BoxedClient; 2] { + let (gw, ws_proxy) = serve_gateway_in(config).await; + [Box::new(gw), Box::new(ws_proxy)] +} - Ok(()) - } - Err(e) => { - tracing::error!("{e}"); - Err(e) - } - } - } +pub(crate) async fn serve_gateway_in(config: WebsocketApiConfig) -> (HttpGateway, WebSocketProxy) { + let ws_socket = (config.address, config.port).into(); + let (gw, gw_router) = HttpGateway::as_router(&ws_socket); + let (ws_proxy, ws_router) = WebSocketProxy::as_router(gw_router); + serve(ws_socket, ws_router.layer(TraceLayer::new_for_http())); + (gw, ws_proxy) } diff --git a/crates/core/src/server/http_gateway.rs b/crates/core/src/server/http_gateway.rs index 0d2bed0c5..c0b247a0a 100644 --- a/crates/core/src/server/http_gateway.rs +++ b/crates/core/src/server/http_gateway.rs @@ -35,7 +35,7 @@ impl std::ops::Deref for HttpGatewayRequest { /// [specification](https://docs.freenet.org/glossary.html#container-contract) for Locutus. /// /// Check the Locutus book for [more information](https://docs.freenet.org/dev-guide.html). -pub(super) struct HttpGateway { +pub(crate) struct HttpGateway { pub attested_contracts: HashMap, proxy_server_request: mpsc::Receiver, response_channels: HashMap>,