Skip to content

Commit

Permalink
feat: add node query
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Oct 27, 2024
1 parent 5deb245 commit 88a6f1b
Show file tree
Hide file tree
Showing 13 changed files with 228 additions and 46 deletions.
85 changes: 85 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/src/client_events/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ async fn websocket_commands(
Extension(rs): Extension<WebSocketRequest>,
) -> axum::response::Response {
let on_upgrade = move |ws: WebSocket| async move {
tracing::debug!(protoc = ?ws.protocol(), "websocket connection established");
if let Err(error) = websocket_interface(rs.clone(), auth_token, encoding_protoc, ws).await {
tracing::error!("{error}");
}
Expand Down
10 changes: 10 additions & 0 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,13 @@ pub(crate) enum NodeEvent {
Disconnect {
cause: Option<Cow<'static, str>>,
},
QueryConnections {
callback: tokio::sync::mpsc::Sender<QueryResult>,
},
}

pub(crate) enum QueryResult {
Connections(Vec<PeerId>),
}

impl Display for NodeEvent {
Expand All @@ -330,6 +337,9 @@ impl Display for NodeEvent {
NodeEvent::Disconnect { cause: None } => {
write!(f, "Disconnect node, reason: unknown")
}
NodeEvent::QueryConnections { .. } => {
write!(f, "QueryConnections")
}
}
}
}
Expand Down
44 changes: 40 additions & 4 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use std::{
use anyhow::Context;
use either::Either;
use freenet_stdlib::{
client_api::{ClientRequest, ContractRequest, ErrorKind},
client_api::{ClientRequest, ContractRequest, ErrorKind, HostResponse, QueryResponse},
prelude::{ContractKey, RelatedContracts, WrappedState},
};

use futures::{stream::FuturesUnordered, StreamExt};
use rsa::pkcs8::DecodePublicKey;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing::Instrument;

use self::p2p_impl::NodeP2P;
Expand All @@ -39,7 +41,7 @@ use crate::{
ExecutorToEventLoopChannel, NetworkContractHandler,
},
local_node::Executor,
message::{NetMessage, NodeEvent, Transaction, TransactionType},
message::{NetMessage, NodeEvent, QueryResult, Transaction, TransactionType},
operations::{
connect::{self, ConnectOp},
get, put, subscribe, update, OpEnum, OpError, OpOutcome,
Expand Down Expand Up @@ -365,6 +367,7 @@ async fn client_event_handling<ClientEv>(
) where
ClientEv: ClientEventsProxy + Send + 'static,
{
let mut callbacks = FuturesUnordered::new();
loop {
tokio::select! {
client_request = client_events.recv() => {
Expand All @@ -387,7 +390,10 @@ async fn client_event_handling<ClientEv>(
node_controller.send(NodeEvent::Disconnect { cause: cause.clone() }).await.ok();
break;
}
process_open_request(req, op_manager.clone()).await;
let cli_id = req.client_id;
if let Some(mut cb) = process_open_request(req, op_manager.clone()).await {
callbacks.push(async move { cb.recv().await.map(|r| (cli_id, r)) });
}
}
res = client_responses.recv() => {
if let Some((cli_id, res)) = res {
Expand All @@ -400,12 +406,33 @@ async fn client_event_handling<ClientEv>(
}
}
}
res = callbacks.next(), if !callbacks.is_empty() => {
if let Some(Some((cli_id, res))) = res {
let QueryResult::Connections(conns) = res;
let res = Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers {
peers: conns.into_iter().map(|p| (p.pub_key.to_string(), p.addr)).collect() }
));
if let Err(err) = client_events.send(cli_id, res).await {
tracing::debug!("channel closed: {err}");
break;
}
}
}
}
}
}

#[inline]
async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpManager>) {
async fn process_open_request(
request: OpenRequest<'static>,
op_manager: Arc<OpManager>,
) -> Option<mpsc::Receiver<QueryResult>> {
let (callback_tx, callback_rx) = if matches!(&*request.request, ClientRequest::NodeQueries(_)) {
let (tx, rx) = mpsc::channel(1);
(Some(tx), Some(rx))
} else {
(None, None)
};
// this will indirectly start actions on the local contract executor
let fut = async move {
let client_id = request.client_id;
Expand Down Expand Up @@ -506,6 +533,14 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
},
ClientRequest::DelegateOp(_op) => todo!("FIXME: delegate op"),
ClientRequest::Disconnect { .. } => unreachable!(),
ClientRequest::NodeQueries(_) => {
tracing::debug!("Received node queries from user event");
let _ = op_manager
.notify_node_event(NodeEvent::QueryConnections {
callback: callback_tx.expect("should be set"),
})
.await;
}
_ => {
tracing::error!("Op not supported");
}
Expand All @@ -514,6 +549,7 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
GlobalExecutor::spawn(fut.instrument(
tracing::info_span!(parent: tracing::Span::current(), "process_client_request"),
));
callback_rx
}

#[allow(unused)]
Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{ConnectionError, EventLoopNotificationsReceiver, NetworkBridge};
use crate::message::NetMessageV1;
use crate::message::{NetMessageV1, QueryResult};
use dashmap::DashSet;
use either::{Either, Left, Right};
use futures::future::BoxFuture;
Expand Down Expand Up @@ -257,6 +257,10 @@ impl P2pConnManager {
)
.await?;
}
NodeEvent::QueryConnections { callback } => {
let connections = self.connections.keys().cloned().collect();
callback.send(QueryResult::Connections(connections)).await?;
}
NodeEvent::Disconnect { cause } => {
tracing::info!(
"Disconnecting from network{}",
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/node/testing_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,9 @@ where
tracing::info!(peer = %peer_key, "Shutting down node");
return Ok(());
}
NodeEvent::QueryConnections { .. } => {
unimplemented!()
}
},
Err(err) => {
super::report_result(
Expand Down
1 change: 1 addition & 0 deletions crates/fdev/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ either = { workspace = true }
futures = { workspace = true }
glob = "0.3"
pico-args = "0.5"
prettytable-rs = "0.10"
rand = { workspace = true }
serde = "1"
serde_json = "1"
Expand Down
28 changes: 14 additions & 14 deletions crates/fdev/src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
use std::{
fs::File,
io::Read,
net::{IpAddr, SocketAddr},
path::PathBuf,
};
use std::{fs::File, io::Read, net::SocketAddr, path::PathBuf};

use freenet::dev_tool::OperationMode;
use freenet_stdlib::{
Expand Down Expand Up @@ -88,7 +83,8 @@ async fn put_contract(
related_contracts,
}
.into();
execute_command(request, other, config.address, config.port).await
let mut client = start_api_client(other).await?;
execute_command(request, &mut client).await
}

async fn put_delegate(
Expand Down Expand Up @@ -127,7 +123,8 @@ For additional hardening is recommended to use a different cipher and nonce to e
nonce,
}
.into();
execute_command(request, other, config.address, config.port).await
let mut client = start_api_client(other).await?;
execute_command(request, &mut client).await
}

pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<()> {
Expand All @@ -142,14 +139,17 @@ pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<(
StateDelta::from(buf).into()
};
let request = ContractRequest::Update { key, data }.into();
execute_command(request, other, config.address, config.port).await
let mut client = start_api_client(other).await?;
execute_command(request, &mut client).await
}

pub(crate) async fn start_api_client(cfg: BaseConfig) -> anyhow::Result<WebApi> {
v1::start_api_client(cfg).await
}

async fn execute_command(
pub(crate) async fn execute_command(
request: ClientRequest<'static>,
other: BaseConfig,
address: IpAddr,
port: u16,
api_client: &mut WebApi,
) -> anyhow::Result<()> {
v1::execute_command(request, other, address, port).await
v1::execute_command(request, api_client).await
}
Loading

0 comments on commit 88a6f1b

Please sign in to comment.