From 88a6f1bef293a545f113c29df64e23defa4627f0 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sat, 26 Oct 2024 15:45:29 +0200 Subject: [PATCH] feat: add node query --- Cargo.lock | 85 +++++++++++++++++++ crates/core/src/client_events/websocket.rs | 1 + crates/core/src/message.rs | 10 +++ crates/core/src/node.rs | 44 +++++++++- .../src/node/network_bridge/p2p_protoc.rs | 6 +- crates/core/src/node/testing_impl.rs | 3 + crates/fdev/Cargo.toml | 1 + crates/fdev/src/commands.rs | 28 +++--- crates/fdev/src/commands/v1.rs | 28 +++--- crates/fdev/src/config.rs | 21 ++--- crates/fdev/src/main.rs | 5 ++ crates/fdev/src/query.rs | 40 +++++++++ stdlib | 2 +- 13 files changed, 228 insertions(+), 46 deletions(-) create mode 100644 crates/fdev/src/query.rs diff --git a/Cargo.lock b/Cargo.lock index de181f962..08ed822a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -893,6 +893,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "ctr" version = "0.9.2" @@ -1072,6 +1093,16 @@ dependencies = [ "dirs-sys", ] +[[package]] +name = "dirs-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + [[package]] name = "dirs-sys" version = "0.4.1" @@ -1084,6 +1115,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "dotenvy" version = "0.15.7" @@ -1099,6 +1141,12 @@ dependencies = [ "serde", ] +[[package]] +name = "encode_unicode" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" + [[package]] name = "encoding_rs" version = "0.8.34" @@ -1256,6 +1304,7 @@ dependencies = [ "glob", "http 1.1.0", "pico-args", + "prettytable-rs", "rand", "reqwest", "semver", @@ -2123,6 +2172,17 @@ version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" +[[package]] +name = "is-terminal" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" +dependencies = [ + "hermit-abi 0.4.0", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -3104,6 +3164,20 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettytable-rs" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eea25e07510aa6ab6547308ebe3c036016d162b8da920dbb079e3ba8acf3d95a" +dependencies = [ + "csv", + "encode_unicode", + "is-terminal", + "lazy_static", + "term", + "unicode-width", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -4326,6 +4400,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "term" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" +dependencies = [ + "dirs-next", + "rustversion", + "winapi", +] + [[package]] name = "thiserror" version = "1.0.64" diff --git a/crates/core/src/client_events/websocket.rs b/crates/core/src/client_events/websocket.rs index c84d43ba9..e54549645 100644 --- a/crates/core/src/client_events/websocket.rs +++ b/crates/core/src/client_events/websocket.rs @@ -211,6 +211,7 @@ async fn websocket_commands( Extension(rs): Extension, ) -> axum::response::Response { let on_upgrade = move |ws: WebSocket| async move { + tracing::debug!(protoc = ?ws.protocol(), "websocket connection established"); if let Err(error) = websocket_interface(rs.clone(), auth_token, encoding_protoc, ws).await { tracing::error!("{error}"); } diff --git a/crates/core/src/message.rs b/crates/core/src/message.rs index c915624d7..e55dd7717 100644 --- a/crates/core/src/message.rs +++ b/crates/core/src/message.rs @@ -313,6 +313,13 @@ pub(crate) enum NodeEvent { Disconnect { cause: Option>, }, + QueryConnections { + callback: tokio::sync::mpsc::Sender, + }, +} + +pub(crate) enum QueryResult { + Connections(Vec), } impl Display for NodeEvent { @@ -330,6 +337,9 @@ impl Display for NodeEvent { NodeEvent::Disconnect { cause: None } => { write!(f, "Disconnect node, reason: unknown") } + NodeEvent::QueryConnections { .. } => { + write!(f, "QueryConnections") + } } } } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 7fd7bb6f6..ce655cd5e 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -22,12 +22,14 @@ use std::{ use anyhow::Context; use either::Either; use freenet_stdlib::{ - client_api::{ClientRequest, ContractRequest, ErrorKind}, + client_api::{ClientRequest, ContractRequest, ErrorKind, HostResponse, QueryResponse}, prelude::{ContractKey, RelatedContracts, WrappedState}, }; +use futures::{stream::FuturesUnordered, StreamExt}; use rsa::pkcs8::DecodePublicKey; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; use tracing::Instrument; use self::p2p_impl::NodeP2P; @@ -39,7 +41,7 @@ use crate::{ ExecutorToEventLoopChannel, NetworkContractHandler, }, local_node::Executor, - message::{NetMessage, NodeEvent, Transaction, TransactionType}, + message::{NetMessage, NodeEvent, QueryResult, Transaction, TransactionType}, operations::{ connect::{self, ConnectOp}, get, put, subscribe, update, OpEnum, OpError, OpOutcome, @@ -365,6 +367,7 @@ async fn client_event_handling( ) where ClientEv: ClientEventsProxy + Send + 'static, { + let mut callbacks = FuturesUnordered::new(); loop { tokio::select! { client_request = client_events.recv() => { @@ -387,7 +390,10 @@ async fn client_event_handling( node_controller.send(NodeEvent::Disconnect { cause: cause.clone() }).await.ok(); break; } - process_open_request(req, op_manager.clone()).await; + let cli_id = req.client_id; + if let Some(mut cb) = process_open_request(req, op_manager.clone()).await { + callbacks.push(async move { cb.recv().await.map(|r| (cli_id, r)) }); + } } res = client_responses.recv() => { if let Some((cli_id, res)) = res { @@ -400,12 +406,33 @@ async fn client_event_handling( } } } + res = callbacks.next(), if !callbacks.is_empty() => { + if let Some(Some((cli_id, res))) = res { + let QueryResult::Connections(conns) = res; + let res = Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers { + peers: conns.into_iter().map(|p| (p.pub_key.to_string(), p.addr)).collect() } + )); + if let Err(err) = client_events.send(cli_id, res).await { + tracing::debug!("channel closed: {err}"); + break; + } + } + } } } } #[inline] -async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc) { +async fn process_open_request( + request: OpenRequest<'static>, + op_manager: Arc, +) -> Option> { + let (callback_tx, callback_rx) = if matches!(&*request.request, ClientRequest::NodeQueries(_)) { + let (tx, rx) = mpsc::channel(1); + (Some(tx), Some(rx)) + } else { + (None, None) + }; // this will indirectly start actions on the local contract executor let fut = async move { let client_id = request.client_id; @@ -506,6 +533,14 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc todo!("FIXME: delegate op"), ClientRequest::Disconnect { .. } => unreachable!(), + ClientRequest::NodeQueries(_) => { + tracing::debug!("Received node queries from user event"); + let _ = op_manager + .notify_node_event(NodeEvent::QueryConnections { + callback: callback_tx.expect("should be set"), + }) + .await; + } _ => { tracing::error!("Op not supported"); } @@ -514,6 +549,7 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc { + let connections = self.connections.keys().cloned().collect(); + callback.send(QueryResult::Connections(connections)).await?; + } NodeEvent::Disconnect { cause } => { tracing::info!( "Disconnecting from network{}", diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index be8c9ad81..89352d7ac 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -888,6 +888,9 @@ where tracing::info!(peer = %peer_key, "Shutting down node"); return Ok(()); } + NodeEvent::QueryConnections { .. } => { + unimplemented!() + } }, Err(err) => { super::report_result( diff --git a/crates/fdev/Cargo.toml b/crates/fdev/Cargo.toml index 2cedf1029..18bce56f6 100644 --- a/crates/fdev/Cargo.toml +++ b/crates/fdev/Cargo.toml @@ -20,6 +20,7 @@ either = { workspace = true } futures = { workspace = true } glob = "0.3" pico-args = "0.5" +prettytable-rs = "0.10" rand = { workspace = true } serde = "1" serde_json = "1" diff --git a/crates/fdev/src/commands.rs b/crates/fdev/src/commands.rs index e906aa3fd..9b8133d49 100644 --- a/crates/fdev/src/commands.rs +++ b/crates/fdev/src/commands.rs @@ -1,9 +1,4 @@ -use std::{ - fs::File, - io::Read, - net::{IpAddr, SocketAddr}, - path::PathBuf, -}; +use std::{fs::File, io::Read, net::SocketAddr, path::PathBuf}; use freenet::dev_tool::OperationMode; use freenet_stdlib::{ @@ -88,7 +83,8 @@ async fn put_contract( related_contracts, } .into(); - execute_command(request, other, config.address, config.port).await + let mut client = start_api_client(other).await?; + execute_command(request, &mut client).await } async fn put_delegate( @@ -127,7 +123,8 @@ For additional hardening is recommended to use a different cipher and nonce to e nonce, } .into(); - execute_command(request, other, config.address, config.port).await + let mut client = start_api_client(other).await?; + execute_command(request, &mut client).await } pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<()> { @@ -142,14 +139,17 @@ pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<( StateDelta::from(buf).into() }; let request = ContractRequest::Update { key, data }.into(); - execute_command(request, other, config.address, config.port).await + let mut client = start_api_client(other).await?; + execute_command(request, &mut client).await +} + +pub(crate) async fn start_api_client(cfg: BaseConfig) -> anyhow::Result { + v1::start_api_client(cfg).await } -async fn execute_command( +pub(crate) async fn execute_command( request: ClientRequest<'static>, - other: BaseConfig, - address: IpAddr, - port: u16, + api_client: &mut WebApi, ) -> anyhow::Result<()> { - v1::execute_command(request, other, address, port).await + v1::execute_command(request, api_client).await } diff --git a/crates/fdev/src/commands/v1.rs b/crates/fdev/src/commands/v1.rs index 73eec9c05..f9970718f 100644 --- a/crates/fdev/src/commands/v1.rs +++ b/crates/fdev/src/commands/v1.rs @@ -1,13 +1,8 @@ use super::*; -pub(super) async fn execute_command( - request: ClientRequest<'static>, - other: BaseConfig, - address: IpAddr, - port: u16, -) -> anyhow::Result<()> { - let mode = other.mode; - +pub(super) async fn start_api_client(cfg: BaseConfig) -> anyhow::Result { + let mode = cfg.mode; + let address = cfg.address; let target = match mode { OperationMode::Local => { if !address.is_loopback() { @@ -15,9 +10,9 @@ pub(super) async fn execute_command( "invalid ip: {address}, expecting a loopback ip address in local mode" )); } - SocketAddr::new(address, port) + SocketAddr::new(address, cfg.port) } - OperationMode::Network => SocketAddr::new(address, port), + OperationMode::Network => SocketAddr::new(address, cfg.port), }; let (stream, _) = tokio_tungstenite::connect_async(&format!( @@ -30,8 +25,13 @@ pub(super) async fn execute_command( anyhow::anyhow!(format!("fail to connect to the host({target}): {e}")) })?; - WebApi::start(stream) - .send(request) - .await - .map_err(Into::into) + Ok(WebApi::start(stream)) +} + +pub(super) async fn execute_command( + request: ClientRequest<'static>, + api_client: &mut WebApi, +) -> anyhow::Result<()> { + api_client.send(request).await?; + Ok(()) } diff --git a/crates/fdev/src/config.rs b/crates/fdev/src/config.rs index 9eb107815..10e06bc84 100644 --- a/crates/fdev/src/config.rs +++ b/crates/fdev/src/config.rs @@ -27,6 +27,13 @@ pub struct BaseConfig { /// Node operation mode. #[arg(value_enum, default_value_t=OperationMode::Local, env = "MODE")] pub mode: OperationMode, + /// The port of the running local freenet node websocket API. + #[arg(short, long, default_value = "50509", env = "WS_API_PORT")] + pub(crate) port: u16, + /// The ip address of freenet node to publish the contract to. If the node is running in local mode, + /// The default value is `127.0.0.1`. + #[arg(short, long, default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))] + pub(crate) address: IpAddr, } #[derive(clap::Subcommand, Clone)] @@ -35,6 +42,8 @@ pub enum SubCommand { Build(BuildToolConfig), Inspect(crate::inspect::InspectConfig), Publish(PutConfig), + /// Query the local node for information. Currently only shows open connections. + Query {}, WasmRuntime(ExecutorConfig), Execute(RunCliConfig), Test(crate::testing::TestConfig), @@ -78,9 +87,6 @@ pub struct UpdateConfig { /// The default value is `127.0.0.1` #[arg(short, long, default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))] pub(crate) address: IpAddr, - /// The port of the running local freenet node. - #[arg(short, long, default_value = "50509")] - pub(crate) port: u16, /// A path to the update/delta being pushed to the contract. pub(crate) delta: PathBuf, /// Whether this contract will be updated in the network or is just a dry run @@ -97,15 +103,6 @@ pub struct PutConfig { #[arg(long)] pub(crate) code: PathBuf, - /// The ip address of freenet node to publish the contract to. If the node is running in local mode, - /// The default value is `127.0.0.1`. - #[arg(short, long, default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))] - pub(crate) address: IpAddr, - - /// The port of the running local freenet node. - #[arg(short, long, default_value = "50509")] - pub(crate) port: u16, - /// A path to the file parameters for the contract/delegate. If not specified, will be published /// with empty parameters. #[arg(long)] diff --git a/crates/fdev/src/main.rs b/crates/fdev/src/main.rs index 3d24f4c06..aa8107c8c 100644 --- a/crates/fdev/src/main.rs +++ b/crates/fdev/src/main.rs @@ -9,6 +9,7 @@ mod config; mod inspect; pub(crate) mod network_metrics_server; mod new_package; +mod query; mod testing; mod util; mod wasm_runtime; @@ -66,6 +67,10 @@ fn main() -> anyhow::Result<()> { } Ok(()) } + SubCommand::Query {} => { + query::query(config.additional).await?; + Ok(()) + } }; // todo: make all commands return concrete `thiserror` compatible errors so we can use anyhow r.map_err(|e| anyhow::format_err!(e)) diff --git a/crates/fdev/src/query.rs b/crates/fdev/src/query.rs new file mode 100644 index 000000000..d1f946126 --- /dev/null +++ b/crates/fdev/src/query.rs @@ -0,0 +1,40 @@ +use freenet_stdlib::client_api::{ConnectedPeers, HostResponse, QueryResponse}; +use prettytable::{Cell, Row, Table}; + +use crate::{ + commands::{execute_command, start_api_client}, + config::BaseConfig, +}; + +pub async fn query(base_cfg: BaseConfig) -> anyhow::Result<()> { + let mut client = start_api_client(base_cfg).await?; + tracing::info!("Querying for connected peers"); + execute_command( + freenet_stdlib::client_api::ClientRequest::NodeQueries(ConnectedPeers {}), + &mut client, + ) + .await?; + let HostResponse::QueryResponse(QueryResponse::ConnectedPeers { peers }) = + client.recv().await? + else { + anyhow::bail!("Unexpected response from the host"); + }; + + let mut table = Table::new(); + + table.add_row(Row::new(vec![ + Cell::new("Identifier"), + Cell::new("SocketAddress"), + ])); + + for (identifier, socketaddress) in peers { + table.add_row(Row::new(vec![ + Cell::new(&identifier.to_string()), + Cell::new(&socketaddress.to_string()), + ])); + } + + table.printstd(); + + Ok(()) +} diff --git a/stdlib b/stdlib index aa5c7b784..e0769b543 160000 --- a/stdlib +++ b/stdlib @@ -1 +1 @@ -Subproject commit aa5c7b78454d19d3355295e58e622db05df7d3e7 +Subproject commit e0769b543a2006914f7ca9a592963cb47c437b42