Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add node query #1279

Merged
merged 1 commit into from
Oct 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/src/client_events/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ async fn websocket_commands(
Extension(rs): Extension<WebSocketRequest>,
) -> axum::response::Response {
let on_upgrade = move |ws: WebSocket| async move {
tracing::debug!(protoc = ?ws.protocol(), "websocket connection established");
if let Err(error) = websocket_interface(rs.clone(), auth_token, encoding_protoc, ws).await {
tracing::error!("{error}");
}
Expand Down
10 changes: 10 additions & 0 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,13 @@ pub(crate) enum NodeEvent {
Disconnect {
cause: Option<Cow<'static, str>>,
},
QueryConnections {
callback: tokio::sync::mpsc::Sender<QueryResult>,
},
}

pub(crate) enum QueryResult {
Connections(Vec<PeerId>),
}

impl Display for NodeEvent {
Expand All @@ -330,6 +337,9 @@ impl Display for NodeEvent {
NodeEvent::Disconnect { cause: None } => {
write!(f, "Disconnect node, reason: unknown")
}
NodeEvent::QueryConnections { .. } => {
write!(f, "QueryConnections")
}
}
}
}
Expand Down
44 changes: 40 additions & 4 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use std::{
use anyhow::Context;
use either::Either;
use freenet_stdlib::{
client_api::{ClientRequest, ContractRequest, ErrorKind},
client_api::{ClientRequest, ContractRequest, ErrorKind, HostResponse, QueryResponse},
prelude::{ContractKey, RelatedContracts, WrappedState},
};

use futures::{stream::FuturesUnordered, StreamExt};
use rsa::pkcs8::DecodePublicKey;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing::Instrument;

use self::p2p_impl::NodeP2P;
Expand All @@ -39,7 +41,7 @@ use crate::{
ExecutorToEventLoopChannel, NetworkContractHandler,
},
local_node::Executor,
message::{NetMessage, NodeEvent, Transaction, TransactionType},
message::{NetMessage, NodeEvent, QueryResult, Transaction, TransactionType},
operations::{
connect::{self, ConnectOp},
get, put, subscribe, update, OpEnum, OpError, OpOutcome,
Expand Down Expand Up @@ -365,6 +367,7 @@ async fn client_event_handling<ClientEv>(
) where
ClientEv: ClientEventsProxy + Send + 'static,
{
let mut callbacks = FuturesUnordered::new();
loop {
tokio::select! {
client_request = client_events.recv() => {
Expand All @@ -387,7 +390,10 @@ async fn client_event_handling<ClientEv>(
node_controller.send(NodeEvent::Disconnect { cause: cause.clone() }).await.ok();
break;
}
process_open_request(req, op_manager.clone()).await;
let cli_id = req.client_id;
if let Some(mut cb) = process_open_request(req, op_manager.clone()).await {
callbacks.push(async move { cb.recv().await.map(|r| (cli_id, r)) });
}
}
res = client_responses.recv() => {
if let Some((cli_id, res)) = res {
Expand All @@ -400,12 +406,33 @@ async fn client_event_handling<ClientEv>(
}
}
}
res = callbacks.next(), if !callbacks.is_empty() => {
if let Some(Some((cli_id, res))) = res {
let QueryResult::Connections(conns) = res;
let res = Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers {
peers: conns.into_iter().map(|p| (p.pub_key.to_string(), p.addr)).collect() }
));
if let Err(err) = client_events.send(cli_id, res).await {
tracing::debug!("channel closed: {err}");
break;
}
}
}
}
}
}

#[inline]
async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpManager>) {
async fn process_open_request(
request: OpenRequest<'static>,
op_manager: Arc<OpManager>,
) -> Option<mpsc::Receiver<QueryResult>> {
let (callback_tx, callback_rx) = if matches!(&*request.request, ClientRequest::NodeQueries(_)) {
let (tx, rx) = mpsc::channel(1);
(Some(tx), Some(rx))
} else {
(None, None)
};
// this will indirectly start actions on the local contract executor
let fut = async move {
let client_id = request.client_id;
Expand Down Expand Up @@ -506,6 +533,14 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
},
ClientRequest::DelegateOp(_op) => todo!("FIXME: delegate op"),
ClientRequest::Disconnect { .. } => unreachable!(),
ClientRequest::NodeQueries(_) => {
tracing::debug!("Received node queries from user event");
let _ = op_manager
.notify_node_event(NodeEvent::QueryConnections {
callback: callback_tx.expect("should be set"),
})
.await;
}
_ => {
tracing::error!("Op not supported");
}
Expand All @@ -514,6 +549,7 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
GlobalExecutor::spawn(fut.instrument(
tracing::info_span!(parent: tracing::Span::current(), "process_client_request"),
));
callback_rx
}

#[allow(unused)]
Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{ConnectionError, EventLoopNotificationsReceiver, NetworkBridge};
use crate::message::NetMessageV1;
use crate::message::{NetMessageV1, QueryResult};
use dashmap::DashSet;
use either::{Either, Left, Right};
use futures::future::BoxFuture;
Expand Down Expand Up @@ -257,6 +257,10 @@ impl P2pConnManager {
)
.await?;
}
NodeEvent::QueryConnections { callback } => {
let connections = self.connections.keys().cloned().collect();
callback.send(QueryResult::Connections(connections)).await?;
}
NodeEvent::Disconnect { cause } => {
tracing::info!(
"Disconnecting from network{}",
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/node/testing_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,9 @@ where
tracing::info!(peer = %peer_key, "Shutting down node");
return Ok(());
}
NodeEvent::QueryConnections { .. } => {
unimplemented!()
}
},
Err(err) => {
super::report_result(
Expand Down
1 change: 1 addition & 0 deletions crates/fdev/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ either = { workspace = true }
futures = { workspace = true }
glob = "0.3"
pico-args = "0.5"
prettytable-rs = "0.10"
rand = { workspace = true }
serde = "1"
serde_json = "1"
Expand Down
28 changes: 14 additions & 14 deletions crates/fdev/src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
use std::{
fs::File,
io::Read,
net::{IpAddr, SocketAddr},
path::PathBuf,
};
use std::{fs::File, io::Read, net::SocketAddr, path::PathBuf};

use freenet::dev_tool::OperationMode;
use freenet_stdlib::{
Expand Down Expand Up @@ -88,7 +83,8 @@ async fn put_contract(
related_contracts,
}
.into();
execute_command(request, other, config.address, config.port).await
let mut client = start_api_client(other).await?;
execute_command(request, &mut client).await
}

async fn put_delegate(
Expand Down Expand Up @@ -127,7 +123,8 @@ For additional hardening is recommended to use a different cipher and nonce to e
nonce,
}
.into();
execute_command(request, other, config.address, config.port).await
let mut client = start_api_client(other).await?;
execute_command(request, &mut client).await
}

pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<()> {
Expand All @@ -142,14 +139,17 @@ pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<(
StateDelta::from(buf).into()
};
let request = ContractRequest::Update { key, data }.into();
execute_command(request, other, config.address, config.port).await
let mut client = start_api_client(other).await?;
execute_command(request, &mut client).await
}

pub(crate) async fn start_api_client(cfg: BaseConfig) -> anyhow::Result<WebApi> {
v1::start_api_client(cfg).await
}

async fn execute_command(
pub(crate) async fn execute_command(
request: ClientRequest<'static>,
other: BaseConfig,
address: IpAddr,
port: u16,
api_client: &mut WebApi,
) -> anyhow::Result<()> {
v1::execute_command(request, other, address, port).await
v1::execute_command(request, api_client).await
}
Loading
Loading