Skip to content

Commit

Permalink
refactor run_network_node (#1130)
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n authored Jun 12, 2024
1 parent 7d5d65f commit 7b16c34
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 73 deletions.
27 changes: 21 additions & 6 deletions crates/core/src/bin/freenet.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use anyhow::Context;
use clap::Parser;
use freenet::{
config::{Config, ConfigArgs},
dev_tool::NodeConfig,
local_node::{Executor, OperationMode},
server::{local_node::run_local_node, network_node::run_network_node},
run_local_node, run_network_node,
server::serve_gateway,
};
use std::{net::SocketAddr, sync::Arc};
use std::sync::Arc;

async fn run(config: Config) -> anyhow::Result<()> {
match config.mode {
Expand All @@ -15,21 +18,33 @@ async fn run(config: Config) -> anyhow::Result<()> {

async fn run_local(config: Config) -> anyhow::Result<()> {
tracing::info!("Starting freenet node in local mode");
let port = config.ws_api.port;
let ip = config.ws_api.address;
let socket = config.ws_api;

let executor = Executor::from_config(Arc::new(config), None)
.await
.map_err(anyhow::Error::msg)?;

let socket: SocketAddr = (ip, port).into();
run_local_node(executor, socket)
.await
.map_err(anyhow::Error::msg)
}

async fn run_network(config: Config) -> anyhow::Result<()> {
tracing::info!("Starting freenet node in network mode");
run_network_node(config).await

let clients = serve_gateway(config.ws_api).await;
tracing::info!("Initializing node configuration");

let node_config = NodeConfig::new(config)
.await
.with_context(|| "failed while loading node config")?;

let node = node_config
.build(clients)
.await
.with_context(|| "failed while building the node")?;

run_network_node(node).await
}

fn main() -> anyhow::Result<()> {
Expand Down
9 changes: 9 additions & 0 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,15 @@ pub struct WebsocketApiConfig {
pub port: u16,
}

impl From<SocketAddr> for WebsocketApiConfig {
fn from(addr: SocketAddr) -> Self {
Self {
address: addr.ip(),
port: addr.port(),
}
}
}

impl Default for WebsocketApiConfig {
#[inline]
fn default() -> Self {
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod generated;
mod message;
/// Node configuration, implementations and execution (entry points for the binaries).
mod node;
pub use node::{run_local_node, run_network_node};
/// Network operation/transaction state machines.
mod operations;
/// Ring connections and routing.
Expand Down
149 changes: 147 additions & 2 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ use tracing::Instrument;
use self::p2p_impl::NodeP2P;
use crate::{
client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest},
config::{Address, GatewayConfig, GlobalExecutor},
config::{Address, GatewayConfig, GlobalExecutor, WebsocketApiConfig},
contract::{
Callback, ClientResponsesReceiver, ClientResponsesSender, ContractError,
Callback, ClientResponsesReceiver, ClientResponsesSender, ContractError, ExecutorError,
ExecutorToEventLoopChannel, NetworkContractHandler,
},
local_node::Executor,
message::{NetMessage, NodeEvent, Transaction, TransactionType},
operations::{
connect::{self, ConnectOp},
Expand Down Expand Up @@ -1016,6 +1017,150 @@ impl Display for PeerId {
}
}

pub async fn run_local_node(
mut executor: Executor,
socket: WebsocketApiConfig,
) -> anyhow::Result<()> {
match socket.address {
IpAddr::V4(ip) if !ip.is_loopback() => {
anyhow::bail!("invalid ip: {ip}, expecting localhost")
}
IpAddr::V6(ip) if !ip.is_loopback() => {
anyhow::bail!("invalid ip: {ip}, expecting localhost")
}
_ => {}
}

let (mut gw, mut ws_proxy) = crate::server::serve_gateway_in(socket).await;

// TODO: use combinator instead
// let mut all_clients =
// ClientEventsCombinator::new([Box::new(ws_handle), Box::new(http_handle)]);
enum Receiver {
Ws,
Gw,
}
let mut receiver;
loop {
let req = tokio::select! {
req = ws_proxy.recv() => {
receiver = Receiver::Ws;
req?
}
req = gw.recv() => {
receiver = Receiver::Gw;
req?
}
};
let OpenRequest {
client_id: id,
request,
notification_channel,
token,
..
} = req;
tracing::trace!(cli_id = %id, "got request -> {request}");

let res = match *request {
ClientRequest::ContractOp(op) => {
executor
.contract_requests(op, id, notification_channel)
.await
}
ClientRequest::DelegateOp(op) => {
let attested_contract =
token.and_then(|token| gw.attested_contracts.get(&token).map(|(t, _)| t));
executor.delegate_request(op, attested_contract)
}
ClientRequest::Disconnect { cause } => {
if let Some(cause) = cause {
tracing::info!("disconnecting cause: {cause}");
}
// fixme: token must live for a bit to allow reconnections
if let Some(rm_token) = gw
.attested_contracts
.iter()
.find_map(|(k, (_, eid))| (eid == &id).then(|| k.clone()))
{
gw.attested_contracts.remove(&rm_token);
}
continue;
}
_ => Err(ExecutorError::other(anyhow::anyhow!("not supported"))),
};

match res {
Ok(res) => {
match receiver {
Receiver::Ws => ws_proxy.send(id, Ok(res)).await?,
Receiver::Gw => gw.send(id, Ok(res)).await?,
};
}
Err(err) if err.is_request() => {
let err = ErrorKind::RequestError(err.unwrap_request());
match receiver {
Receiver::Ws => {
ws_proxy.send(id, Err(err.into())).await?;
}
Receiver::Gw => {
gw.send(id, Err(err.into())).await?;
}
};
}
Err(err) => {
tracing::error!("{err}");
let err = Err(ErrorKind::Unhandled {
cause: format!("{err}").into(),
}
.into());
match receiver {
Receiver::Ws => {
ws_proxy.send(id, err).await?;
}
Receiver::Gw => {
gw.send(id, err).await?;
}
};
}
}
}
}

pub async fn run_network_node(mut node: Node) -> anyhow::Result<()> {
tracing::info!("Starting node");

let is_gateway = node.0.is_gateway;
let location = is_gateway
.then(|| {
node.0
.peer_id
.clone()
.map(|id| Location::from_address(&id.addr()))
})
.flatten();

if let Some(location) = location {
tracing::info!("Setting initial location: {location}");
node.update_location(location);
}

match node.run().await {
Ok(_) => {
if is_gateway {
tracing::info!("Gateway finished");
} else {
tracing::info!("Node finished");
}

Ok(())
}
Err(e) => {
tracing::error!("{e}");
Err(e)
}
}
}

#[cfg(test)]
mod tests {
use std::net::{Ipv4Addr, Ipv6Addr};
Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/node/p2p_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::{
network_bridge::{
event_loop_notification_channel, p2p_protoc::P2pConnManager, EventLoopNotificationsReceiver,
},
NetEventRegister,
NetEventRegister, PeerId,
};
use crate::transport::TransportPublicKey;
use crate::{
Expand All @@ -34,6 +34,8 @@ pub(crate) struct NodeP2P {
cli_response_sender: ClientResponsesSender,
node_controller: tokio::sync::mpsc::Receiver<NodeEvent>,
should_try_connect: bool,
pub(super) peer_id: Option<PeerId>,
pub(super) is_gateway: bool,
}

impl NodeP2P {
Expand Down Expand Up @@ -118,6 +120,8 @@ impl NodeP2P {
cli_response_sender,
node_controller: node_controller_rx,
should_try_connect: config.should_connect,
peer_id: config.peer_id,
is_gateway: config.is_gateway,
})
}
}
4 changes: 1 addition & 3 deletions crates/core/src/ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,9 +615,7 @@ impl Ring {
) -> Option<PeerKeyLocation> {
let connections = self.connections_by_location.read();
let peers = connections.values().filter_map(|conns| {
let Some(conn) = conns.choose(&mut rand::thread_rng()) else {
return None;
};
let conn = conns.choose(&mut rand::thread_rng())?;
if let Some(requester) = requesting {
if requester == &conn.location.peer {
return None;
Expand Down
77 changes: 17 additions & 60 deletions crates/core/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ use freenet_stdlib::{
prelude::*,
};

use crate::client_events::{AuthToken, ClientId, HostResult};
use http_gateway::HttpGateway;
use tower_http::trace::TraceLayer;

use crate::{
client_events::{websocket::WebSocketProxy, AuthToken, BoxedClient, ClientId, HostResult},
config::WebsocketApiConfig,
};

pub use app_packaging::WebApp;

Expand Down Expand Up @@ -175,64 +181,15 @@ pub mod local_node {
}
}

pub mod network_node {
use anyhow::Context;
use tower_http::trace::TraceLayer;

use crate::{
client_events::websocket::WebSocketProxy,
config::Config,
dev_tool::{Location, NodeConfig},
};

use super::{http_gateway::HttpGateway, serve};

pub async fn run_network_node(config: Config) -> anyhow::Result<()> {
let ws_socket = (config.ws_api.address, config.ws_api.port).into();
let (gw, gw_router) = HttpGateway::as_router(&ws_socket);
let (ws_proxy, ws_router) = WebSocketProxy::as_router(gw_router);
serve(ws_socket, ws_router.layer(TraceLayer::new_for_http()));

tracing::info!("Initializing node configuration");

let node_config = NodeConfig::new(config)
.await
.with_context(|| "failed while loading node config")?;
let is_gateway = node_config.is_gateway;
let location = is_gateway
.then(|| {
node_config
.peer_id
.clone()
.map(|id| Location::from_address(&id.addr()))
})
.flatten();
let mut node = node_config
.build([Box::new(gw), Box::new(ws_proxy)])
.await
.with_context(|| "failed while building the node")?;

if let Some(location) = location {
tracing::info!("Setting initial location: {location}");
node.update_location(location);
}

tracing::info!("Starting node");

match node.run().await {
Ok(_) => {
if is_gateway {
tracing::info!("Gateway finished");
} else {
tracing::info!("Node finished");
}
pub async fn serve_gateway(config: WebsocketApiConfig) -> [BoxedClient; 2] {
let (gw, ws_proxy) = serve_gateway_in(config).await;
[Box::new(gw), Box::new(ws_proxy)]
}

Ok(())
}
Err(e) => {
tracing::error!("{e}");
Err(e)
}
}
}
pub(crate) async fn serve_gateway_in(config: WebsocketApiConfig) -> (HttpGateway, WebSocketProxy) {
let ws_socket = (config.address, config.port).into();
let (gw, gw_router) = HttpGateway::as_router(&ws_socket);
let (ws_proxy, ws_router) = WebSocketProxy::as_router(gw_router);
serve(ws_socket, ws_router.layer(TraceLayer::new_for_http()));
(gw, ws_proxy)
}
2 changes: 1 addition & 1 deletion crates/core/src/server/http_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl std::ops::Deref for HttpGatewayRequest {
/// [specification](https://docs.freenet.org/glossary.html#container-contract) for Locutus.
///
/// Check the Locutus book for [more information](https://docs.freenet.org/dev-guide.html).
pub(super) struct HttpGateway {
pub(crate) struct HttpGateway {
pub attested_contracts: HashMap<AuthToken, (ContractInstanceId, ClientId)>,
proxy_server_request: mpsc::Receiver<ClientConnection>,
response_channels: HashMap<ClientId, mpsc::UnboundedSender<HostCallbackResult>>,
Expand Down

0 comments on commit 7b16c34

Please sign in to comment.