diff --git a/clients/service/src/lib.rs b/clients/service/src/lib.rs index 06258f540..664e011f0 100644 --- a/clients/service/src/lib.rs +++ b/clients/service/src/lib.rs @@ -1,10 +1,14 @@ -use std::{fmt, sync::Arc, time::Duration}; +use std::{ + fmt, + sync::Arc, + time::{Duration, SystemTime}, +}; use async_trait::async_trait; use futures::{future::Either, Future, FutureExt}; use governor::{Quota, RateLimiter}; use nonzero_ext::*; -use tokio::sync::RwLock; +use tokio::{sync::RwLock, time::sleep}; pub use warp; pub use cli::{LoggingFormat, MonitoringConfig, RestartPolicy, ServiceConfig}; @@ -19,6 +23,10 @@ mod cli; mod error; mod trace; +const RESET_RESTART_TIME_IN_SECS: u64 = 1800; // reset the restart time in 30 minutes +const DEFAULT_RESTART_TIME_IN_SECS: u64 = 20; // default sleep time before restarting everything +const RESTART_BACKOFF_DELAY: u64 = 10; + #[async_trait] pub trait Service { const NAME: &'static str; @@ -67,7 +75,28 @@ impl ConnectionManager { pub async fn start, InnerError: fmt::Display>( &self, ) -> Result<(), Error> { + let mut restart_in_secs = DEFAULT_RESTART_TIME_IN_SECS; // set default to 20 seconds for restart + let mut last_start_timestamp = SystemTime::now(); + loop { + let time_now = SystemTime::now(); + let _ = time_now.duration_since(last_start_timestamp).map(|duration| { + // Revert the counter if the restart happened more than 30 minutes (or 1800 seconds) + // ago + if duration.as_secs() > RESET_RESTART_TIME_IN_SECS { + restart_in_secs = DEFAULT_RESTART_TIME_IN_SECS; + } + // Increase time by 10 seconds if a restart is triggered too frequently. + // This waits for delayed packets to be removed in the network, + // even though the connection on the client side is closed. + // Else, these straggler packets will interfere with the new connection. + // https://www.rfc-editor.org/rfc/rfc793#page-22 + else { + restart_in_secs += RESTART_BACKOFF_DELAY; + last_start_timestamp = time_now; + } + }); + tracing::info!("Version: {}", S::VERSION); tracing::info!( "Vault uses Substrate account with ID: {}", @@ -129,6 +158,10 @@ impl ConnectionManager { RestartPolicy::Never => return Err(Error::ClientShutdown), RestartPolicy::Always => { (self.increment_restart_counter)(); + + tracing::info!("Restarting in {restart_in_secs} seconds"); + sleep(Duration::from_secs(restart_in_secs)).await; + continue }, }; diff --git a/clients/stellar-relay-lib/README.md b/clients/stellar-relay-lib/README.md index e2bacb26e..c98557130 100644 --- a/clients/stellar-relay-lib/README.md +++ b/clients/stellar-relay-lib/README.md @@ -41,9 +41,7 @@ pub struct ConnectionInfoCfg { pub recv_scp_msgs: bool, pub remote_called_us: bool, /// how long to wait for the Stellar Node's messages. - timeout_in_secs: u64, - /// number of retries to wait for the Stellar Node's messages and/or to connect back to it. - retries:u8 + timeout_in_secs: u64 } ``` @@ -66,17 +64,9 @@ Create a connection using the `connect_to_stellar_overlay_network` function: ```rust let mut overlay_connection = stellar_relay_lib::connect_to_stellar_overlay_network(cfg, secret_key).await?; ``` -The `StellarOverlayConnection` has 2 async methods to interact with the Stellar Node: -* _`send(&self, message: StellarMessage)`_ -> for sending `StellarMessage`s to Stellar Node -* _`listen(&mut self)`_ -> for receiving `StellarRelayMessage`s from the Stellar Relay. - -### Interpreting the `StellarRelayMessage` -The `StellarRelayMessage` is an enum with the following variants: -* _`Connect`_ -> interprets a successful connection to Stellar Node. It contains the `PublicKey` and the `NodeInfo` -* _`Data`_ -> a wrapper of a `StellarMessage` and additional fields: the _message type_ and the unique `p_id`(process id) -* _`Timeout`_ -> Depends on the `timeout_in_secs` and `retries` defined in the `ConnectionInfo` (**10** and **3** by default). This message is returned after multiple retries have been done. -For example, Stellar Relay will wait for 10 seconds to read from the existing tcp stream before retrying again. After the 3rd retry, StellarRelay will create a new stream in 3 attempts, with an interval of 3 seconds. -* _`Error`_ -> a todo +The `StellarOverlayConnection` has 2 methods to interact with the Stellar Node: +* _`sender(&self)`_ -> used to send `StellarMessage`s to Stellar Node +* _`listen(&mut self)`_ -> async method for receiving `StellarMessage`s from the Stellar Node. ## Example In the `stellar-relay-lib` directory, run this command: @@ -102,25 +92,4 @@ and you should be able to see in the terminal: ... [2022-10-14T13:16:02Z INFO connect] R0E1U1RCTVY2UURYRkRHRDYyTUVITExIWlRQREk3N1UzUEZPRDJTRUxVNVJKREhRV0JSNU5OSzc= sent StellarMessage of type ScpStNominate for ledger 43109751 [2022-10-14T13:16:02Z INFO connect] R0NHQjJTMktHWUFSUFZJQTM3SFlaWFZSTTJZWlVFWEE2UzMzWlU1QlVEQzZUSFNCNjJMWlNUWUg= sent StellarMessage of type ScpStPrepare for ledger 43109751 -``` - -Here is an example in the terminal when disconnection/reconnection happens: -``` -[2022-10-17T05:56:47Z ERROR stellar_relay::connection::services] deadline has elapsed for reading messages from Stellar Node. Retry: 0 -[2022-10-17T05:56:47Z ERROR stellar_relay::connection::services] deadline has elapsed for receiving messages. Retry: 0 -[2022-10-17T05:56:57Z ERROR stellar_relay::connection::services] deadline has elapsed for reading messages from Stellar Node. Retry: 1 -[2022-10-17T05:56:57Z ERROR stellar_relay::connection::services] deadline has elapsed for receiving messages. Retry: 1 -[2022-10-17T05:57:07Z ERROR stellar_relay::connection::services] deadline has elapsed for reading messages from Stellar Node. Retry: 2 -[2022-10-17T05:57:07Z ERROR stellar_relay::connection::services] deadline has elapsed for receiving messages. Retry: 2 -[2022-10-17T05:57:17Z ERROR stellar_relay::connection::services] deadline has elapsed for reading messages from Stellar Node. Retry: 3 -[2022-10-17T05:57:17Z ERROR stellar_relay::connection::services] deadline has elapsed for receiving messages. Retry: 3 -[2022-10-17T05:57:17Z INFO stellar_relay::connection::user_controls] reconnecting to "135.181.16.110". -[2022-10-17T05:57:17Z ERROR stellar_relay::connection::user_controls] failed to reconnect! # of retries left: 2. Retrying in 3 seconds... -[2022-10-17T05:57:20Z INFO stellar_relay::connection::user_controls] reconnecting to "135.181.16.110". -[2022-10-17T05:57:20Z INFO stellar_relay::connection::services] Starting Handshake with Hello. -[2022-10-17T05:57:21Z INFO stellar_relay::connection::connector::message_handler] Hello message processed successfully -[2022-10-17T05:57:21Z INFO stellar_relay::connection::connector::message_handler] Handshake completed -``` - - -todo: add multiple tests \ No newline at end of file +``` \ No newline at end of file diff --git a/clients/stellar-relay-lib/examples/connect.rs b/clients/stellar-relay-lib/examples/connect.rs index 0de4b5c2c..94a8ab1be 100644 --- a/clients/stellar-relay-lib/examples/connect.rs +++ b/clients/stellar-relay-lib/examples/connect.rs @@ -1,7 +1,7 @@ use stellar_relay_lib::{ connect_to_stellar_overlay_network, sdk::types::{ScpStatementPledges, StellarMessage}, - StellarOverlayConfig, StellarRelayMessage, + StellarOverlayConfig, }; #[tokio::main] @@ -27,42 +27,28 @@ async fn main() -> Result<(), Box> { let mut overlay_connection = connect_to_stellar_overlay_network(cfg, &secret_key).await?; - while let Some(relay_message) = overlay_connection.listen().await { - match relay_message { - StellarRelayMessage::Connect { pub_key, node_info } => { - let pub_key = pub_key.to_encoding(); - let pub_key = std::str::from_utf8(&pub_key).expect("should work?"); - log::info!("Connected to Stellar Node: {pub_key}"); - log::info!("{:?}", node_info); - }, - StellarRelayMessage::Data { p_id: _, msg_type, msg } => match *msg { - StellarMessage::ScpMessage(msg) => { - let node_id = msg.statement.node_id.to_encoding(); - let node_id = base64::encode(&node_id); - let slot = msg.statement.slot_index; + while let Ok(Some(msg)) = overlay_connection.listen().await { + match msg { + StellarMessage::ScpMessage(msg) => { + let node_id = msg.statement.node_id.to_encoding(); + let node_id = base64::encode(&node_id); + let slot = msg.statement.slot_index; - let stmt_type = match msg.statement.pledges { - ScpStatementPledges::ScpStPrepare(_) => "ScpStPrepare", - ScpStatementPledges::ScpStConfirm(_) => "ScpStConfirm", - ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize", - ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ", - }; - log::info!( - "{} sent StellarMessage of type {} for ledger {}", - node_id, - stmt_type, - slot - ); - }, - _ => { - log::info!("rcv StellarMessage of type: {:?}", msg_type); - }, - }, - StellarRelayMessage::Error(e) => { - log::error!("Error: {:?}", e); + let stmt_type = match msg.statement.pledges { + ScpStatementPledges::ScpStPrepare(_) => "ScpStPrepare", + ScpStatementPledges::ScpStConfirm(_) => "ScpStConfirm", + ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize", + ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ", + }; + log::info!( + "{} sent StellarMessage of type {} for ledger {}", + node_id, + stmt_type, + slot + ); }, - StellarRelayMessage::Timeout => { - log::error!("timed out"); + _ => { + let _ = overlay_connection.send_to_node(StellarMessage::GetPeers).await; }, } } diff --git a/clients/stellar-relay-lib/src/config.rs b/clients/stellar-relay-lib/src/config.rs index e34004e9a..ce787e143 100644 --- a/clients/stellar-relay-lib/src/config.rs +++ b/clients/stellar-relay-lib/src/config.rs @@ -1,4 +1,8 @@ -use crate::{connection::Error, node::NodeInfo, ConnectionInfo, StellarOverlayConnection}; +use crate::{ + connection::{ConnectionInfo, Error}, + node::NodeInfo, + StellarOverlayConnection, +}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, BytesOrString}; use std::fmt::Debug; @@ -42,13 +46,13 @@ impl StellarOverlayConfig { let public_key = secret_key.get_public().to_encoding(); let public_key = std::str::from_utf8(&public_key).unwrap(); log::info!( - "connection_info(): Connected to Stellar overlay network with public key: {public_key}" + "connection_info(): Connecting to Stellar overlay network using public key: {public_key}" ); let address = std::str::from_utf8(&cfg.address) .map_err(|e| Error::ConfigError(format!("Address: {:?}", e)))?; - Ok(ConnectionInfo::new_with_timeout_and_retries( + Ok(ConnectionInfo::new_with_timeout( address, cfg.port, secret_key, @@ -57,7 +61,6 @@ impl StellarOverlayConfig { cfg.recv_scp_msgs, cfg.remote_called_us, cfg.timeout_in_secs, - cfg.retries, )) } } @@ -97,10 +100,6 @@ pub struct ConnectionInfoCfg { /// how long to wait for the Stellar Node's messages. #[serde(default = "ConnectionInfoCfg::default_timeout")] pub timeout_in_secs: u64, - - /// number of retries to wait for the Stellar Node's messages and/or to connect back to it. - #[serde(default = "ConnectionInfoCfg::default_retries")] - pub retries: u8, } impl ConnectionInfoCfg { @@ -123,10 +122,6 @@ impl ConnectionInfoCfg { fn default_timeout() -> u64 { 10 } - - fn default_retries() -> u8 { - 3 - } } /// Triggers connection to the Stellar Node. diff --git a/clients/stellar-relay-lib/src/connection/authentication/certificate.rs b/clients/stellar-relay-lib/src/connection/authentication/certificate.rs index b454d458c..588a3e371 100644 --- a/clients/stellar-relay-lib/src/connection/authentication/certificate.rs +++ b/clients/stellar-relay-lib/src/connection/authentication/certificate.rs @@ -51,7 +51,11 @@ pub fn create_auth_cert( let raw_sig_data = hash.finalize().to_vec(); - let signature: Signature = Signature::new(keypair.create_signature(raw_sig_data).to_vec())?; + let signature: Signature = Signature::new(keypair.create_signature(raw_sig_data).to_vec()) + .map_err(|e| { + log::error!("create_auth_cert(): {e:?}"); + Error::AuthSignatureFailed + })?; Ok(AuthCert { pubkey: pub_key_ecdh, expiration, sig: signature }) } diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index 44591e5d0..05e6a70c1 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -3,16 +3,17 @@ use substrate_stellar_sdk::{ types::{AuthenticatedMessageV0, Curve25519Public, HmacSha256Mac, MessageType}, XdrCodec, }; -use tokio::sync::mpsc; +use tokio::net::tcp::OwnedWriteHalf; use crate::{ connection::{ authentication::{gen_shared_key, ConnectionAuth}, flow_controller::FlowController, + handshake::HandshakeState, hmac::{verify_hmac, HMacKeys}, + ConnectionInfo, Error, }, node::{LocalInfo, NodeInfo, RemoteInfo}, - ConnectionInfo, ConnectorActions, Error, HandshakeState, StellarRelayMessage, }; pub struct Connector { @@ -23,7 +24,6 @@ pub struct Connector { pub(crate) connection_auth: ConnectionAuth, pub(crate) timeout_in_secs: u64, - pub(crate) retries: u8, remote_called_us: bool, receive_tx_messages: bool, @@ -32,11 +32,8 @@ pub struct Connector { handshake_state: HandshakeState, flow_controller: FlowController, - /// a channel for writing xdr messages to stream. - actions_sender: mpsc::Sender, - - /// a channel for communicating back to the caller - relay_message_sender: mpsc::Sender, + /// for writing xdr messages to stream. + pub(crate) write_stream_overlay: OwnedWriteHalf, } impl Debug for Connector { @@ -48,7 +45,6 @@ impl Debug for Connector { .field("hmac_keys_exist", &is_hmac_keys_filled) .field("connection_auth", &self.connection_auth) .field("timeout_in_secs", &self.timeout_in_secs) - .field("retries", &self.retries) .field("receive_tx_messages", &self.receive_tx_messages) .field("receive_scp_messages", &self.receive_scp_messages) .field("handshake_state", &self.handshake_state) @@ -57,15 +53,9 @@ impl Debug for Connector { } } -impl Drop for Connector { - fn drop(&mut self) { - log::trace!("dropped Connector: {:?}", self); - } -} - impl Connector { /// Verifies the AuthenticatedMessage, received from the Stellar Node - pub(crate) fn verify_auth( + pub(super) fn verify_auth( &self, auth_msg: &AuthenticatedMessageV0, body: &[u8], @@ -122,8 +112,7 @@ impl Connector { pub fn new( local_node: NodeInfo, conn_info: ConnectionInfo, - actions_sender: mpsc::Sender, - relay_message_sender: mpsc::Sender, + write_stream_overlay: OwnedWriteHalf, ) -> Self { let connection_auth = ConnectionAuth::new( &local_node.network_id, @@ -137,14 +126,12 @@ impl Connector { hmac_keys: None, connection_auth, timeout_in_secs: conn_info.timeout_in_secs, - retries: conn_info.retries, remote_called_us: conn_info.remote_called_us, receive_tx_messages: conn_info.recv_tx_msgs, receive_scp_messages: conn_info.recv_scp_msgs, handshake_state: HandshakeState::Connecting, flow_controller: FlowController::default(), - actions_sender, - relay_message_sender, + write_stream_overlay, } } @@ -209,14 +196,6 @@ impl Connector { self.handshake_state = HandshakeState::Completed; } - pub async fn send_to_user(&self, msg: StellarRelayMessage) -> Result<(), Error> { - self.relay_message_sender.send(msg).await.map_err(Error::from) - } - - pub async fn send_to_node(&self, action: ConnectorActions) -> Result<(), Error> { - self.actions_sender.send(action).await.map_err(Error::from) - } - pub fn inner_check_to_send_more(&mut self, msg_type: MessageType) -> bool { self.flow_controller.send_more(msg_type) } @@ -231,23 +210,26 @@ impl Connector { #[cfg(test)] mod test { - use crate::{connection::hmac::HMacKeys, node::RemoteInfo, Connector, StellarOverlayConfig}; + use crate::{connection::hmac::HMacKeys, node::RemoteInfo, StellarOverlayConfig}; + use serial_test::serial; use substrate_stellar_sdk::{ compound_types::LimitedString, types::{Hello, MessageType}, PublicKey, }; - use tokio::sync::mpsc::{self, Receiver}; + use tokio::{io::AsyncWriteExt, net::tcp::OwnedReadHalf}; use crate::{ - connection::authentication::{create_auth_cert, ConnectionAuth}, - helper::time_now, + connection::{ + authentication::{create_auth_cert, ConnectionAuth}, + Connector, + }, + helper::{create_stream, time_now}, node::NodeInfo, - ConnectionInfo, ConnectorActions, StellarRelayMessage, + ConnectionInfo, }; - #[cfg(test)] fn create_auth_cert_from_connection_auth( connector_auth: &ConnectionAuth, ) -> substrate_stellar_sdk::types::AuthCert { @@ -262,14 +244,15 @@ mod test { new_auth_cert } - #[cfg(test)] - fn create_connector() -> ( - NodeInfo, - ConnectionInfo, - Connector, - Receiver, - Receiver, - ) { + impl Connector { + fn shutdown(&mut self, read_half: OwnedReadHalf) { + let _ = self.write_stream_overlay.shutdown(); + + drop(read_half); + } + } + + async fn create_connector() -> (NodeInfo, ConnectionInfo, Connector, OwnedReadHalf) { let cfg_file_path = "./resources/config/testnet/stellar_relay_config_sdftest1.json"; let secret_key_path = "./resources/secretkey/stellar_secretkey_testnet"; let secret_key = @@ -280,22 +263,17 @@ mod test { let node_info = cfg.node_info(); let conn_info = cfg.connection_info(&secret_key).expect("should create a connection info"); // this is a channel to communicate with the connection/config (this needs renaming) - let (actions_sender, actions_receiver) = mpsc::channel::(1024); - // this is a channel to communicate with the user/caller. - let (relay_message_sender, relay_message_receiver) = - mpsc::channel::(1024); - let connector = Connector::new( - node_info.clone(), - conn_info.clone(), - actions_sender, - relay_message_sender, - ); - (node_info, conn_info, connector, actions_receiver, relay_message_receiver) + + let (read_half, write_half) = + create_stream(&conn_info.address()).await.expect("should return a stream"); + let connector = Connector::new(node_info.clone(), conn_info.clone(), write_half); + (node_info, conn_info, connector, read_half) } - #[test] - fn create_new_connector_works() { - let (node_info, _, connector, _, _) = create_connector(); + #[tokio::test] + #[serial] + async fn create_new_connector_works() { + let (node_info, _, mut connector, read_half) = create_connector().await; let connector_local_node = connector.local.node(); @@ -304,19 +282,25 @@ mod test { assert_eq!(connector_local_node.overlay_min_version, node_info.overlay_min_version); assert_eq!(connector_local_node.version_str, node_info.version_str); assert_eq!(connector_local_node.network_id, node_info.network_id); + + connector.shutdown(read_half); } - #[test] - fn connector_local_sequence_works() { - let (_node_info, _, mut connector, _, _) = create_connector(); + #[tokio::test] + #[serial] + async fn connector_local_sequence_works() { + let (_, _, mut connector, read_half) = create_connector().await; assert_eq!(connector.local_sequence(), 0); connector.increment_local_sequence(); assert_eq!(connector.local_sequence(), 1); + + connector.shutdown(read_half); } - #[test] - fn connector_set_remote_works() { - let (_node_info, _, mut connector, _, _) = create_connector(); + #[tokio::test] + #[serial] + async fn connector_set_remote_works() { + let (_, _, mut connector, read_half) = create_connector().await; let connector_auth = &connector.connection_auth; let new_auth_cert = create_auth_cert_from_connection_auth(connector_auth); @@ -335,11 +319,14 @@ mod test { connector.set_remote(RemoteInfo::new(&hello)); assert!(connector.remote().is_some()); + + connector.shutdown(read_half); } - #[test] - fn connector_increment_remote_sequence_works() { - let (_node_info, _, mut connector, _, _) = create_connector(); + #[tokio::test] + #[serial] + async fn connector_increment_remote_sequence_works() { + let (_, _, mut connector, read_half) = create_connector().await; let connector_auth = &connector.connection_auth; let new_auth_cert = create_auth_cert_from_connection_auth(connector_auth); @@ -362,12 +349,15 @@ mod test { connector.increment_remote_sequence().unwrap(); connector.increment_remote_sequence().unwrap(); assert_eq!(connector.remote().unwrap().sequence(), 3); + + connector.shutdown(read_half); } - #[test] - fn connector_get_and_set_hmac_keys_works() { + #[tokio::test] + #[serial] + async fn connector_get_and_set_hmac_keys_works() { //arrange - let (_, _, mut connector, _, _) = create_connector(); + let (_, _, mut connector, read_half) = create_connector().await; let connector_auth = &connector.connection_auth; let new_auth_cert = create_auth_cert_from_connection_auth(connector_auth); @@ -397,11 +387,14 @@ mod test { )); //assert assert!(connector.hmac_keys().is_some()); + + connector.shutdown(read_half); } - #[test] - fn connector_method_works() { - let (_, conn_config, mut connector, _, _) = create_connector(); + #[tokio::test] + #[serial] + async fn connector_method_works() { + let (_, conn_config, mut connector, read_half) = create_connector().await; assert_eq!(connector.remote_called_us(), conn_config.remote_called_us); assert_eq!(connector.receive_tx_messages(), conn_config.recv_tx_msgs); @@ -412,48 +405,18 @@ mod test { connector.handshake_completed(); assert!(connector.is_handshake_created()); - } - #[tokio::test] - async fn connector_send_to_user_works() { - let (_, _, connector, _, mut message_receiver) = create_connector(); - - let message = StellarRelayMessage::Timeout; - connector.send_to_user(message).await.unwrap(); - - let received_message = message_receiver.recv().await; - assert!(received_message.is_some()); - let message = received_message.unwrap(); - match message { - StellarRelayMessage::Timeout => {}, - _ => { - panic!("Incorrect message received!!!") - }, - } + connector.shutdown(read_half); } - #[test] - fn enable_flow_controller_works() { - let (node_info, _, mut connector, _, _) = create_connector(); + #[tokio::test] + #[serial] + async fn enable_flow_controller_works() { + let (node_info, _, mut connector, read_half) = create_connector().await; assert!(!connector.inner_check_to_send_more(MessageType::ScpMessage)); connector.enable_flow_controller(node_info.overlay_version, node_info.overlay_version); - } - #[tokio::test] - async fn connector_send_to_node_works() { - let (_, _, connector, mut actions_receiver, _) = create_connector(); - - connector.send_to_node(ConnectorActions::SendHello).await.unwrap(); - - let received_message = actions_receiver.recv().await; - assert!(received_message.is_some()); - let message = received_message.unwrap(); - match message { - ConnectorActions::SendHello => {}, - _ => { - panic!("Incorrect message received!!!") - }, - } + connector.shutdown(read_half); } } diff --git a/clients/stellar-relay-lib/src/connection/connector/message_creation.rs b/clients/stellar-relay-lib/src/connection/connector/message_creation.rs index 8e59b5325..16f1e3db1 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_creation.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_creation.rs @@ -1,6 +1,6 @@ -use crate::{ - connection::{authentication::create_auth_cert, handshake, hmac::create_sha256_hmac}, - xdr_converter, Connector, Error, +use crate::connection::{ + authentication::create_auth_cert, handshake, hmac::create_sha256_hmac, xdr_converter, + Connector, Error, }; use substrate_stellar_sdk::{ types::{AuthenticatedMessage, AuthenticatedMessageV0, HmacSha256Mac, StellarMessage}, diff --git a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs index 3a71461c9..3333f09c5 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs @@ -1,20 +1,24 @@ -use crate::{ - connection::{ - authentication::verify_remote_auth_cert, error_to_string, helper::time_now, hmac::HMacKeys, - xdr_converter::parse_authenticated_message, Connector, Xdr, - }, - node::RemoteInfo, - Error, StellarRelayMessage, -}; use substrate_stellar_sdk::{ - types::{Hello, MessageType, StellarMessage}, + types::{ErrorCode, Hello, MessageType, StellarMessage}, XdrCodec, }; +use crate::connection::{ + authentication::verify_remote_auth_cert, + helper::{error_to_string, time_now}, + hmac::HMacKeys, + xdr_converter::parse_authenticated_message, + Connector, Error, Xdr, +}; + +use crate::node::RemoteInfo; + impl Connector { /// Processes the raw bytes from the stream - pub(crate) async fn process_raw_message(&mut self, xdr: Xdr) -> Result<(), Error> { - let (proc_id, data) = xdr; + pub(super) async fn process_raw_message( + &mut self, + data: Xdr, + ) -> Result, Error> { let (auth_msg, msg_type) = parse_authenticated_message(&data)?; match msg_type { @@ -30,12 +34,15 @@ impl Connector { MessageType::ErrorMsg => match auth_msg.message { StellarMessage::ErrorMsg(e) => { log::error!( - "process_raw_message(): Received ErrorMsg: {}", + "process_raw_message(): Received ErrorMsg during authentication: {}", error_to_string(e.clone()) ); - return Err(Error::OverlayError(e.code)) + return Err(Error::from(e)) }, - other => log::error!("process_raw_message(): Received ErroMsg other: {:?}", other), + other => log::error!( + "process_raw_message(): Received ErroMsg during authentication: {:?}", + other + ), }, _ => { @@ -44,23 +51,23 @@ impl Connector { self.verify_auth(&auth_msg, &data[4..(data.len() - 32)])?; self.increment_remote_sequence()?; log::trace!( - "process_raw_message(): proc_id: {proc_id}. Processing {msg_type:?} message: auth verified" + "process_raw_message(): Processing {msg_type:?} message: auth verified" ); } - self.process_stellar_message(proc_id, auth_msg.message, msg_type).await?; + return self.process_stellar_message(auth_msg.message, msg_type).await }, } - Ok(()) + Ok(None) } - /// Handles what to do next with the Stellar message. Mostly it will be sent back to the user + /// Returns a StellarMessage for the user/outsider. Else none if user/outsider do not need it. + /// This handles what to do with the Stellar message. async fn process_stellar_message( &mut self, - p_id: u32, msg: StellarMessage, msg_type: MessageType, - ) -> Result<(), Error> { + ) -> Result, Error> { match msg { StellarMessage::Hello(hello) => { // update the node info based on the hello message @@ -81,23 +88,25 @@ impl Connector { }, StellarMessage::ErrorMsg(e) => { - self.send_to_user(StellarRelayMessage::Error(error_to_string(e))).await?; + log::error!( + "process_stellar_message(): Received ErrorMsg during authentication: {e:?}" + ); + if e.code == ErrorCode::ErrConf || e.code == ErrorCode::ErrAuth { + return Err(Error::from(e)) + } + return Ok(Some(StellarMessage::ErrorMsg(e))) }, other => { log::trace!( - "process_stellar_message(): proc_id: {p_id}. Processing {msg_type:?} message: received from overlay" + "process_stellar_message(): Processing {other:?} message: received from overlay" ); - self.send_to_user(StellarRelayMessage::Data { - p_id, - msg_type, - msg: Box::new(other), - }) - .await?; self.check_to_send_more(msg_type).await?; + return Ok(Some(other)) }, } - Ok(()) + + Ok(None) } async fn process_auth_message(&mut self) -> Result<(), Error> { @@ -109,12 +118,6 @@ impl Connector { if let Some(remote) = self.remote() { log::debug!("process_auth_message(): sending connect message: {remote:?}"); - self.send_to_user(StellarRelayMessage::Connect { - pub_key: remote.pub_key().clone(), - node_info: remote.node().clone(), - }) - .await?; - self.enable_flow_controller( self.local().node().overlay_version, remote.node().overlay_version, diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs new file mode 100644 index 000000000..8836fd998 --- /dev/null +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -0,0 +1,246 @@ +use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, Xdr}; +use std::time::Duration; +use substrate_stellar_sdk::types::StellarMessage; + +use tokio::{ + io::AsyncReadExt, + net::{tcp, tcp::OwnedReadHalf}, + sync::{mpsc, mpsc::error::TryRecvError}, + time::timeout, +}; + +/// Polls for messages coming from the Stellar Node and communicates it back to the user +/// +/// # Arguments +/// * `connector` - contains the config and necessary info for connecting to Stellar Node +/// * `read_stream_overlay` - the read half of the stream that is connected to Stellar Node +/// * `send_to_user_sender` - sends message from Stellar to the user +/// * `send_to_node_receiver` - receives message from user and writes it to the write half of the +/// stream. +pub(crate) async fn poll_messages_from_stellar( + mut connector: Connector, + mut read_stream_overlay: OwnedReadHalf, + send_to_user_sender: mpsc::Sender, + mut send_to_node_receiver: mpsc::Receiver, +) { + log::info!("poll_messages_from_stellar(): started."); + + loop { + if send_to_user_sender.is_closed() { + log::info!("poll_messages_from_stellar(): closing receiver during disconnection"); + // close this channel as communication to user was closed. + break + } + + // check for messages from user. + match send_to_node_receiver.try_recv() { + Ok(msg) => + if let Err(e) = connector.send_to_node(msg).await { + log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); + }, + Err(TryRecvError::Disconnected) => break, + Err(TryRecvError::Empty) => {}, + } + + // check for messages from Stellar Node. + match read_message_from_stellar(&mut read_stream_overlay, connector.timeout_in_secs).await { + Err(e) => { + log::error!("poll_messages_from_stellar(): {e:?}"); + break + }, + Ok(xdr) => match connector.process_raw_message(xdr).await { + Ok(Some(stellar_msg)) => + // push message to user + if let Err(e) = send_to_user_sender.send(stellar_msg).await { + log::warn!("poll_messages_from_stellar(): Error occurred during sending message to user: {e:?}"); + }, + Ok(_) => {}, + Err(e) => { + log::error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}"); + break + }, + }, + } + } + + // make sure to drop/shutdown the stream + connector.write_stream_overlay.forget(); + drop(read_stream_overlay); + + send_to_node_receiver.close(); + drop(send_to_user_sender); + + log::debug!("poll_messages_from_stellar(): stopped."); +} + +/// Returns Xdr format of the `StellarMessage` sent from the Stellar Node +async fn read_message_from_stellar( + r_stream: &mut tcp::OwnedReadHalf, + timeout_in_secs: u64, +) -> Result { + // holds the number of bytes that were missing from the previous stellar message. + let mut lack_bytes_from_prev = 0; + let mut readbuf: Vec = vec![]; + + let mut buff_for_peeking = vec![0; 4]; + loop { + // check whether or not we should read the bytes as: + // 1. the length of the next stellar message + // 2. the remaining bytes of the previous stellar message + match timeout(Duration::from_secs(timeout_in_secs), r_stream.peek(&mut buff_for_peeking)) + .await + { + Ok(Ok(0)) => { + log::trace!("read_message_from_stellar(): ERROR: Received 0 size"); + return Err(Error::Disconnected) + }, + + Ok(Ok(_)) if lack_bytes_from_prev == 0 => { + // if there are no more bytes lacking from the previous message, + // then check the size of next stellar message. + // If it's not enough, skip it. + let expect_msg_len = next_message_length(r_stream).await; + log::trace!( + "read_message_from_stellar(): The next message length is {expect_msg_len}" + ); + + if expect_msg_len == 0 { + // there's nothing to read; wait for the next iteration + log::trace!( + "read_message_from_stellar(): Nothing left to read; waiting for next loop" + ); + continue + } + + // let's start reading the actual stellar message. + readbuf = vec![0; expect_msg_len]; + + match read_message( + r_stream, + &mut lack_bytes_from_prev, + &mut readbuf, + expect_msg_len, + ) + .await + { + Ok(None) => continue, + Ok(Some(xdr)) => return Ok(xdr), + Err(e) => { + log::trace!("read_message_from_stellar(): ERROR: {e:?}"); + return Err(e) + }, + } + }, + + Ok(Ok(_)) => { + // let's read the continuation number of bytes from the previous message. + match read_unfinished_message(r_stream, &mut lack_bytes_from_prev, &mut readbuf) + .await + { + Ok(None) => continue, + Ok(Some(xdr)) => return Ok(xdr), + Err(e) => { + log::trace!("read_message_from_stellar(): ERROR: {e:?}"); + return Err(e) + }, + } + }, + Ok(Err(e)) => { + log::trace!("read_message_from_stellar(): ERROR: {e:?}"); + return Err(Error::ReadFailed(e.to_string())) + }, + + Err(_) => { + log::error!("read_message_from_stellar(): timeout elapsed."); + return Err(Error::Timeout) + }, + } + } +} + +/// Returns Xdr when all bytes from the stream have successfully been converted; else None. +/// This reads a number of bytes based on the expected message length. +/// +/// # Arguments +/// * `r_stream` - the read stream for reading the xdr stellar message +/// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message +/// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message +/// * `xpect_msg_len` - the expected # of bytes of the Stellar message +async fn read_message( + r_stream: &mut tcp::OwnedReadHalf, + lack_bytes_from_prev: &mut usize, + readbuf: &mut Vec, + xpect_msg_len: usize, +) -> Result, Error> { + let actual_msg_len = read_stream(r_stream, readbuf).await?; + + // only when the message has the exact expected size bytes, should we send to user. + if actual_msg_len == xpect_msg_len { + return Ok(Some(readbuf.clone())) + } + + // The next bytes are remnants from the previous stellar message. + // save it and read it on the next loop. + *lack_bytes_from_prev = xpect_msg_len - actual_msg_len; + *readbuf = readbuf[0..actual_msg_len].to_owned(); + log::trace!( + "read_message(): received only partial message. Need {lack_bytes_from_prev} bytes to complete." + ); + + Ok(None) +} + +/// Returns Xdr when all bytes from the stream have successfully been converted; else None. +/// Reads a continuation of bytes that belong to the previous message +/// +/// # Arguments +/// * `r_stream` - the read stream for reading the xdr stellar message +/// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message +/// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message +async fn read_unfinished_message( + r_stream: &mut tcp::OwnedReadHalf, + lack_bytes_from_prev: &mut usize, + readbuf: &mut Vec, +) -> Result, Error> { + // let's read the continuation number of bytes from the previous message. + let mut cont_buf = vec![0; *lack_bytes_from_prev]; + + let actual_msg_len = read_stream(r_stream, &mut cont_buf).await?; + + // this partial message completes the previous message. + if actual_msg_len == *lack_bytes_from_prev { + log::trace!("read_unfinished_message(): received continuation from the previous message."); + readbuf.append(&mut cont_buf); + + return Ok(Some(readbuf.clone())) + } + + // this partial message is not enough to complete the previous message. + if actual_msg_len > 0 { + *lack_bytes_from_prev -= actual_msg_len; + cont_buf = cont_buf[0..actual_msg_len].to_owned(); + readbuf.append(&mut cont_buf); + log::trace!( + "read_unfinished_message(): not enough bytes to complete the previous message. Need {lack_bytes_from_prev} bytes to complete." + ); + } + + Ok(None) +} + +/// checks the length of the next stellar message. +async fn next_message_length(r_stream: &mut tcp::OwnedReadHalf) -> usize { + // let's check for messages. + let mut sizebuf = [0; 4]; + + if r_stream.read(&mut sizebuf).await.unwrap_or(0) == 0 { + return 0 + } + + get_xdr_message_length(&sizebuf) +} + +/// reads data from the stream and store to buffer +async fn read_stream(r_stream: &mut tcp::OwnedReadHalf, buffer: &mut [u8]) -> Result { + r_stream.read(buffer).await.map_err(|e| Error::ReadFailed(e.to_string())) +} diff --git a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs index 01964e168..d5504e5c9 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs @@ -1,14 +1,41 @@ -use crate::connection::connector::{Connector, ConnectorActions}; +use std::time::Duration; use substrate_stellar_sdk::types::{MessageType, SendMore, StellarMessage}; +use tokio::{io::AsyncWriteExt, time::timeout}; -use crate::{ - connection::flow_controller::MAX_FLOOD_MSG_CAP, handshake::create_auth_message, Error, +use crate::connection::{ + flow_controller::MAX_FLOOD_MSG_CAP, + handshake::create_auth_message, + helper::{time_now, to_base64_xdr_string}, + Connector, Error, }; impl Connector { - /// Sends an xdr version of a wrapped AuthenticatedMessage ( StellarMessage ). - async fn send_stellar_message(&mut self, msg: StellarMessage) -> Result<(), Error> { - self.send_to_node(ConnectorActions::SendMessage(Box::new(msg))).await + pub async fn send_to_node(&mut self, msg: StellarMessage) -> Result<(), Error> { + let xdr_msg = &self.create_xdr_message(msg)?; + + match timeout( + Duration::from_secs(self.timeout_in_secs), + self.write_stream_overlay.write_all(&xdr_msg), + ) + .await + { + Ok(res) => res.map_err(|e| Error::WriteFailed(e.to_string())), + Err(_) => Err(Error::Timeout), + } + } + + pub async fn send_hello_message(&mut self) -> Result<(), Error> { + let msg = self.create_hello_message(time_now())?; + log::info!("send_hello_message(): Sending Hello Message: {}", to_base64_xdr_string(&msg)); + + self.send_to_node(msg).await + } + + pub(super) async fn send_auth_message(&mut self) -> Result<(), Error> { + let msg = create_auth_message(); + log::info!("send_auth_message(): Sending Auth Message: {}", to_base64_xdr_string(&msg)); + + self.send_to_node(create_auth_message()).await } pub(super) async fn check_to_send_more( @@ -20,14 +47,6 @@ impl Connector { } let msg = StellarMessage::SendMore(SendMore { num_messages: MAX_FLOOD_MSG_CAP }); - self.send_stellar_message(msg).await - } - - pub(super) async fn send_hello_message(&mut self) -> Result<(), Error> { - self.send_to_node(ConnectorActions::SendHello).await.map_err(Error::from) - } - - pub(super) async fn send_auth_message(&mut self) -> Result<(), Error> { - self.send_stellar_message(create_auth_message()).await + self.send_to_node(msg).await } } diff --git a/clients/stellar-relay-lib/src/connection/connector/mod.rs b/clients/stellar-relay-lib/src/connection/connector/mod.rs index 238962764..d1c5e3dfe 100644 --- a/clients/stellar-relay-lib/src/connection/connector/mod.rs +++ b/clients/stellar-relay-lib/src/connection/connector/mod.rs @@ -1,17 +1,8 @@ -use crate::connection::Xdr; -use substrate_stellar_sdk::types::StellarMessage; - mod connector; mod message_creation; mod message_handler; +mod message_reader; mod message_sender; -pub(crate) use connector::Connector; - -#[derive(Debug)] -pub enum ConnectorActions { - SendHello, - SendMessage(Box), - HandleMessage(Xdr), - Disconnect, -} +pub(crate) use connector::*; +pub(crate) use message_reader::poll_messages_from_stellar; diff --git a/clients/stellar-relay-lib/src/connection/error.rs b/clients/stellar-relay-lib/src/connection/error.rs index 9edbf7992..334600383 100644 --- a/clients/stellar-relay-lib/src/connection/error.rs +++ b/clients/stellar-relay-lib/src/connection/error.rs @@ -1,20 +1,26 @@ #![allow(dead_code)] //todo: remove after being tested and implemented -use crate::connection::xdr_converter::Error as XDRError; +use crate::{connection::xdr_converter::Error as XDRError, helper::error_to_string}; use substrate_stellar_sdk::{types::ErrorCode, StellarSdkError}; use tokio::sync; #[derive(Debug, err_derive::Error)] pub enum Error { - #[error(display = "Authentication Certification: Expired")] + #[error(display = "Auth Certificate: Expired")] AuthCertExpired, - #[error(display = "Authentication Certification: Not Found")] + #[error(display = "Auth Certificate: Not Found")] AuthCertNotFound, - #[error(display = "Authentication Certification: Invalid")] + #[error(display = "Auth Certificate: Invalid")] AuthCertInvalid, + #[error(display = "Auth Certificate Creation: Signature Failed")] + AuthSignatureFailed, + + #[error(display = "Authentication Failed: {}", _0)] + AuthFailed(String), + #[error(display = "Connection: {}", _0)] ConnectionFailed(String), @@ -56,6 +62,12 @@ pub enum Error { #[error(display = "Received Error from Overlay: {:?}", _0)] OverlayError(ErrorCode), + + #[error(display = "Encountered timeout")] + Timeout, + + #[error(display = "Config Error: Version String too long")] + VersionStrTooLong, } impl From for Error { @@ -75,3 +87,16 @@ impl From for Error { Error::StellarSdkError(e) } } + +impl From for Error { + fn from(value: substrate_stellar_sdk::types::Error) -> Self { + match value.code { + ErrorCode::ErrConf => Self::ConfigError(error_to_string(value)), + ErrorCode::ErrAuth => Self::AuthFailed(error_to_string(value)), + other => { + log::error!("Stellar Node returned error: {}", error_to_string(value)); + Self::OverlayError(other) + }, + } + } +} diff --git a/clients/stellar-relay-lib/src/connection/handshake.rs b/clients/stellar-relay-lib/src/connection/handshake.rs index 017454bee..d9952fdac 100644 --- a/clients/stellar-relay-lib/src/connection/handshake.rs +++ b/clients/stellar-relay-lib/src/connection/handshake.rs @@ -1,6 +1,6 @@ use substrate_stellar_sdk::compound_types::LimitedString; -use crate::{node::NodeInfo, Error}; +use crate::{connection::Error, node::NodeInfo}; use substrate_stellar_sdk::{ types::{Auth, AuthCert, Hello, StellarMessage, Uint256}, PublicKey, @@ -33,7 +33,10 @@ pub fn create_hello_message( overlay_version: node_info.overlay_version, overlay_min_version: node_info.overlay_min_version, network_id: node_info.network_id, - version_str: LimitedString::<100>::new(version_str.clone())?, + version_str: LimitedString::<100>::new(version_str.clone()).map_err(|e| { + log::error!("create_hello_message(): {e:?}"); + Error::VersionStrTooLong + })?, listening_port: i32::try_from(listening_port).unwrap_or(11625), peer_id, cert, diff --git a/clients/stellar-relay-lib/src/connection/helper.rs b/clients/stellar-relay-lib/src/connection/helper.rs index 07e033362..051357777 100644 --- a/clients/stellar-relay-lib/src/connection/helper.rs +++ b/clients/stellar-relay-lib/src/connection/helper.rs @@ -5,19 +5,7 @@ use substrate_stellar_sdk::{ types::{Error, Uint256}, SecretKey, XdrCodec, }; - -/// a helpful macro to log an error (if it occurs) and return immediately. -macro_rules! log_error { - // expression, return value, extra log - ($res:expr, $log:expr) => { - if let Err(e) = $res { - log::error!("{:?}: {e:?}", $log); - return - } - }; -} - -pub(crate) use log_error; +use tokio::net::{tcp, TcpStream}; /// Returns a new BigNumber with a pseudo-random value equal to or greater than 0 and less than 1. pub fn generate_random_nonce() -> Uint256 { @@ -53,3 +41,13 @@ pub fn to_base64_xdr_string(msg: &T) -> String { let xdr = msg.to_base64_xdr(); String::from_utf8(xdr.clone()).unwrap_or(format!("{:?}", xdr)) } + +pub async fn create_stream( + address: &str, +) -> Result<(tcp::OwnedReadHalf, tcp::OwnedWriteHalf), crate::Error> { + let stream = TcpStream::connect(address) + .await + .map_err(|e| crate::Error::ConnectionFailed(e.to_string()))?; + + Ok(stream.into_split()) +} diff --git a/clients/stellar-relay-lib/src/connection/mod.rs b/clients/stellar-relay-lib/src/connection/mod.rs index 1da0f04d7..ac342a8ed 100644 --- a/clients/stellar-relay-lib/src/connection/mod.rs +++ b/clients/stellar-relay-lib/src/connection/mod.rs @@ -6,43 +6,16 @@ mod hmac; mod authentication; mod connector; pub mod helper; -mod overlay_connection; -mod services; pub mod xdr_converter; pub(crate) use connector::*; pub use error::Error; pub use helper::*; -pub use overlay_connection::*; use serde::Serialize; use std::fmt::{Debug, Formatter}; +use substrate_stellar_sdk::SecretKey; -type Xdr = (u32, Vec); - -use crate::node::NodeInfo; -use substrate_stellar_sdk::{ - types::{MessageType, StellarMessage}, - PublicKey, SecretKey, -}; - -#[derive(Debug)] -/// Represents the messages that the connection creates bases on the Stellar Node -pub enum StellarRelayMessage { - /// Successfully connected to the node - Connect { - pub_key: PublicKey, - node_info: NodeInfo, - }, - /// Stellar messages from the node - Data { - p_id: u32, - msg_type: MessageType, - msg: Box, - }, - Error(String), - /// The amount of time to wait for Stellar Node messages - Timeout, -} +type Xdr = Vec; /// Config for connecting to Stellar Node #[derive(Clone, Serialize, PartialEq, Eq)] @@ -57,8 +30,6 @@ pub struct ConnectionInfo { pub remote_called_us: bool, /// how long to wait for the Stellar Node's messages. timeout_in_secs: u64, - /// number of retries to wait for the Stellar Node's messages and/or to connect back to it. - retries: u8, } impl Debug for ConnectionInfo { @@ -73,14 +44,13 @@ impl Debug for ConnectionInfo { .field("receive_scp_messages", &self.recv_scp_msgs) .field("remote_called_us", &self.remote_called_us) .field("timeout_in_seconds", &self.timeout_in_secs) - .field("retries", &self.retries) .finish() } } impl ConnectionInfo { #[allow(clippy::too_many_arguments)] - pub(crate) fn new_with_timeout_and_retries( + pub(crate) fn new_with_timeout( addr: &str, port: u32, secret_key: SecretKey, @@ -89,7 +59,6 @@ impl ConnectionInfo { recv_scp_msgs: bool, remote_called_us: bool, timeout_in_secs: u64, - retries: u8, ) -> Self { ConnectionInfo { address: addr.to_string(), @@ -100,33 +69,9 @@ impl ConnectionInfo { recv_scp_msgs, remote_called_us, timeout_in_secs, - retries, } } - #[cfg(test)] - pub(crate) fn new( - addr: &str, - port: u32, - secret_key: SecretKey, - auth_cert_expiration: u64, - recv_tx_msgs: bool, - recv_scp_msgs: bool, - remote_called_us: bool, - ) -> Self { - Self::new_with_timeout_and_retries( - addr, - port, - secret_key, - auth_cert_expiration, - recv_tx_msgs, - recv_scp_msgs, - remote_called_us, - 10, - 3, - ) - } - pub fn address(&self) -> String { format!("{}:{}", self.address, self.port) } diff --git a/clients/stellar-relay-lib/src/connection/overlay_connection.rs b/clients/stellar-relay-lib/src/connection/overlay_connection.rs deleted file mode 100644 index 183c02de7..000000000 --- a/clients/stellar-relay-lib/src/connection/overlay_connection.rs +++ /dev/null @@ -1,260 +0,0 @@ -use crate::{ - connection::{ - connector::ConnectorActions, - services::{connection_handler, create_stream, receiving_service}, - }, - node::NodeInfo, - ConnectionInfo, Connector, Error, StellarRelayMessage, -}; -use substrate_stellar_sdk::types::StellarMessage; -use tokio::{sync::mpsc, time::Duration}; - -pub struct StellarOverlayConnection { - /// This is when we want to send stellar messages - actions_sender: mpsc::Sender, - /// For receiving stellar messages - relay_message_receiver: mpsc::Receiver, - local_node: NodeInfo, - conn_info: ConnectionInfo, -} - -impl StellarOverlayConnection { - fn new( - actions_sender: mpsc::Sender, - relay_message_receiver: mpsc::Receiver, - local_node: NodeInfo, - conn_info: ConnectionInfo, - ) -> Self { - StellarOverlayConnection { actions_sender, relay_message_receiver, local_node, conn_info } - } - - pub async fn send(&self, message: StellarMessage) -> Result<(), Error> { - self.actions_sender - .send(ConnectorActions::SendMessage(Box::new(message))) - .await - .map_err(Error::from) - } - - pub async fn disconnect(&mut self) -> Result<(), Error> { - self.actions_sender - .send(ConnectorActions::Disconnect) - .await - .map_err(Error::from) - } - - /// Receives Stellar messages from the connection. - /// Restarts the connection when lost. - pub async fn listen(&mut self) -> Option { - let res = self.relay_message_receiver.recv().await; - - match &res { - Some(StellarRelayMessage::Timeout) | Some(StellarRelayMessage::Error(_)) | None => { - log::info!("listen(): Reconnecting to {:?}...", &self.conn_info.address); - - match StellarOverlayConnection::connect( - self.local_node.clone(), - self.conn_info.clone(), - ) - .await - { - Ok(new_user) => { - self.actions_sender = new_user.actions_sender; - self.relay_message_receiver = new_user.relay_message_receiver; - log::info!( - "listen(): overlay connection reconnected to {:?}", - &self.conn_info.address - ); - return self.relay_message_receiver.recv().await - }, - Err(e) => { - log::error!( - "listen(): overlay connection failed to reconnect: {e:?}\n. Retrying in 3 seconds...", - ); - tokio::time::sleep(Duration::from_secs(3)).await; - }, - }; - }, - _ => {}, - } - - res - } - - /// Triggers connection to the Stellar Node. - /// Returns the UserControls for the user to send and receive Stellar messages. - pub(crate) async fn connect( - local_node: NodeInfo, - conn_info: ConnectionInfo, - ) -> Result { - log::info!("connect(): Connecting to: {conn_info:?}"); - - let retries = conn_info.retries; - let timeout_in_secs = conn_info.timeout_in_secs; - // split the stream for easy handling of read and write - let (rd, wr) = create_stream(&conn_info.address()).await?; - // ------------------ prepare the channels - // this is a channel to communicate with the connection/config (this needs renaming) - let (actions_sender, actions_receiver) = mpsc::channel::(1024); - // this is a channel to communicate with the user/caller. - let (relay_message_sender, relay_message_receiver) = - mpsc::channel::(1024); - let overlay_connection = StellarOverlayConnection::new( - actions_sender.clone(), - relay_message_receiver, - local_node, - conn_info, - ); - let connector = Connector::new( - overlay_connection.local_node.clone(), - overlay_connection.conn_info.clone(), - actions_sender.clone(), - relay_message_sender, - ); - // start the receiving_service - tokio::spawn(receiving_service(rd, actions_sender.clone(), timeout_in_secs, retries)); - // run the connector communication - tokio::spawn(connection_handler(connector, actions_receiver, wr)); - // start the handshake - actions_sender.send(ConnectorActions::SendHello).await?; - Ok(overlay_connection) - } - - pub fn get_actions_sender(&self) -> mpsc::Sender { - self.actions_sender.clone() - } - pub fn get_disconnect_action(&self) -> ConnectorActions { - ConnectorActions::Disconnect - } -} - -#[cfg(test)] -mod test { - use crate::{ - node::NodeInfo, ConnectionInfo, ConnectorActions, Error, StellarOverlayConnection, - StellarRelayMessage, - }; - use substrate_stellar_sdk::{ - network::TEST_NETWORK, - types::{MessageType, StellarMessage}, - SecretKey, - }; - use tokio::sync::mpsc; - - fn create_node_and_conn() -> (NodeInfo, ConnectionInfo) { - let secret = - SecretKey::from_encoding("SBLI7RKEJAEFGLZUBSCOFJHQBPFYIIPLBCKN7WVCWT4NEG2UJEW33N73") - .unwrap(); - let node_info = NodeInfo::new(19, 21, 19, "v19.1.0".to_string(), &TEST_NETWORK); - let conn_info = ConnectionInfo::new("34.235.168.98", 11625, secret, 0, false, true, false); - (node_info, conn_info) - } - - #[test] - fn create_stellar_overlay_connection_works() { - let (node_info, conn_info) = create_node_and_conn(); - - let (actions_sender, _) = mpsc::channel::(1024); - let (_, relay_message_receiver) = mpsc::channel::(1024); - - StellarOverlayConnection::new(actions_sender, relay_message_receiver, node_info, conn_info); - } - - #[tokio::test] - async fn stellar_overlay_connection_send_works() { - //arrange - let (node_info, conn_info) = create_node_and_conn(); - - let (actions_sender, mut actions_receiver) = mpsc::channel::(1024); - let (_, relay_message_receiver) = mpsc::channel::(1024); - - let overlay_connection = StellarOverlayConnection::new( - actions_sender.clone(), - relay_message_receiver, - node_info, - conn_info, - ); - let message_s = StellarMessage::GetPeers; - - //act - overlay_connection.send(message_s.clone()).await.expect("Should sent message"); - - //assert - let message = actions_receiver.recv().await.unwrap(); - if let ConnectorActions::SendMessage(message) = message { - assert_eq!(*message, message_s); - } else { - panic!("Incorrect stellar message") - } - } - - #[tokio::test] - async fn stellar_overlay_connection_listen_works() { - //arrange - let (node_info, conn_info) = create_node_and_conn(); - - let (actions_sender, _actions_receiver) = mpsc::channel::(1024); - let (relay_message_sender, relay_message_receiver) = - mpsc::channel::(1024); - - let mut overlay_connection = StellarOverlayConnection::new( - actions_sender.clone(), - relay_message_receiver, - node_info, - conn_info, - ); - - let expected_p_id = 2; - let expected_msg_type = MessageType::ErrorMsg; - relay_message_sender - .send(StellarRelayMessage::Data { - p_id: expected_p_id, - msg_type: expected_msg_type, - msg: Box::new(StellarMessage::GetPeers), - }) - .await - .expect("Stellar Relay message should be sent"); - - //act - let message = overlay_connection.listen().await.expect("Should receive some message"); - - //assert - match message { - StellarRelayMessage::Data { p_id, msg_type, msg } => { - assert_eq!(p_id, expected_p_id); - assert_eq!(msg_type, expected_msg_type); - assert_eq!(msg, Box::new(StellarMessage::GetPeers)) - }, - _ => panic!("wrong relay message received"), - } - } - - #[tokio::test] - async fn connect_should_fail_incorrect_address() { - let secret = - SecretKey::from_encoding("SBLI7RKEJAEFGLZUBSCOFJHQBPFYIIPLBCKN7WVCWT4NEG2UJEW33N73") - .unwrap(); - let node_info = NodeInfo::new(19, 21, 19, "v19.1.0".to_string(), &TEST_NETWORK); - let conn_info = - ConnectionInfo::new("incorrect address", 11625, secret, 0, false, true, false); - - let stellar_overlay_connection = - StellarOverlayConnection::connect(node_info, conn_info).await; - - assert!(stellar_overlay_connection.is_err()); - match stellar_overlay_connection.err().unwrap() { - Error::ConnectionFailed(_) => {}, - _ => { - panic!("Incorrect error") - }, - } - } - - #[tokio::test] - async fn stellar_overlay_connect_works() { - let (node_info, conn_info) = create_node_and_conn(); - let stellar_overlay_connection = - StellarOverlayConnection::connect(node_info, conn_info).await; - - assert!(stellar_overlay_connection.is_ok()); - } -} diff --git a/clients/stellar-relay-lib/src/connection/services.rs b/clients/stellar-relay-lib/src/connection/services.rs deleted file mode 100644 index dd503b9fa..000000000 --- a/clients/stellar-relay-lib/src/connection/services.rs +++ /dev/null @@ -1,350 +0,0 @@ -use crate::{ - connection::{ - connector::{Connector, ConnectorActions}, - helper::{time_now, to_base64_xdr_string}, - log_error, - xdr_converter::get_xdr_message_length, - }, - Error, StellarRelayMessage, -}; -use substrate_stellar_sdk::types::StellarMessage; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::{tcp, TcpStream}, - sync::mpsc, - time::{timeout, Duration}, -}; - -/// For connecting to the StellarNode -pub(crate) async fn create_stream( - address: &str, -) -> Result<(tcp::OwnedReadHalf, tcp::OwnedWriteHalf), Error> { - let stream = TcpStream::connect(address) - .await - .map_err(|e| Error::ConnectionFailed(e.to_string()))?; - - Ok(stream.into_split()) -} - -/// checks the length of the next stellar message. -async fn next_message_length(r_stream: &mut tcp::OwnedReadHalf) -> usize { - // let's check for messages. - let mut sizebuf = [0; 4]; - - if r_stream.read(&mut sizebuf).await.unwrap_or(0) == 0 { - return 0 - } - - get_xdr_message_length(&sizebuf) -} - -/// reads data from the stream and store to buffer -async fn read_stream(r_stream: &mut tcp::OwnedReadHalf, buffer: &mut [u8]) -> Result { - r_stream.read(buffer).await.map_err(|e| Error::ReadFailed(e.to_string())) -} - -/// sends the HandleMessage action to the connector -async fn handle_message( - actions_sender: &mpsc::Sender, - proc_id: u32, - xdr_msg: Vec, -) -> Result<(), Error> { - actions_sender - .send(ConnectorActions::HandleMessage((proc_id, xdr_msg))) - .await - .map_err(Error::from) -} - -/// reads a continuation of bytes that belong to the previous message -/// -/// # Arguments -/// * `r_stream` - the read stream for reading the xdr stellar message -/// * `actions_sender` - the sender for actions a Connector must do -/// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message -/// * `proc_id` - the process id, used for tracing. -/// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message -async fn read_unfinished_message( - r_stream: &mut tcp::OwnedReadHalf, - actions_sender: &mpsc::Sender, - lack_bytes_from_prev: &mut usize, - proc_id: &mut u32, - readbuf: &mut Vec, -) -> Result<(), Error> { - // let's read the continuation number of bytes from the previous message. - let mut cont_buf = vec![0; *lack_bytes_from_prev]; - - let actual_msg_len = read_stream(r_stream, &mut cont_buf).await?; - - // this partial message completes the previous message. - if actual_msg_len == *lack_bytes_from_prev { - log::trace!("read_unfinished_message(): proc_id: {} received continuation from the previous message.", proc_id); - readbuf.append(&mut cont_buf); - - handle_message(actions_sender, *proc_id, readbuf.clone()).await?; - - *lack_bytes_from_prev = 0; - readbuf.clear(); - *proc_id += 1; - - return Ok(()) - } - - // this partial message is not enough to complete the previous message. - if actual_msg_len > 0 { - *lack_bytes_from_prev -= actual_msg_len; - cont_buf = cont_buf[0..actual_msg_len].to_owned(); - readbuf.append(&mut cont_buf); - log::trace!( - "read_unfinished_message(): proc_id: {} not enough bytes to complete the previous message. Need {} bytes to complete.", - proc_id, - lack_bytes_from_prev - ); - } - - Ok(()) -} - -/// reads a number of bytes based on the expected message length. -/// -/// # Arguments -/// * `r_stream` - the read stream for reading the xdr stellar message -/// * `actions_sender` - the sender for actions a Connector must do -/// * `lack_bytes_from_prev` - the number of bytes remaining, to complete the previous message -/// * `proc_id` - the process id, used for tracing. -/// * `readbuf` - the buffer that holds the bytes of the previous and incomplete message -/// * `xpect_msg_len` - the expected # of bytes of the Stellar message -async fn read_message( - r_stream: &mut tcp::OwnedReadHalf, - actions_sender: &mpsc::Sender, - lack_bytes_from_prev: &mut usize, - proc_id: &mut u32, - readbuf: &mut Vec, - xpect_msg_len: usize, -) -> Result<(), Error> { - let actual_msg_len = read_stream(r_stream, readbuf).await?; - - // only when the message has the exact expected size bytes, should we send to user. - if actual_msg_len == xpect_msg_len { - handle_message(actions_sender, *proc_id, readbuf.clone()).await?; - readbuf.clear(); - *proc_id += 1; - return Ok(()) - } - - // The next bytes are remnants from the previous stellar message. - // save it and read it on the next loop. - *lack_bytes_from_prev = xpect_msg_len - actual_msg_len; - *readbuf = readbuf[0..actual_msg_len].to_owned(); - log::trace!( - "read_message(): proc_id: {} received only partial message. Need {} bytes to complete.", - proc_id, - lack_bytes_from_prev - ); - - Ok(()) -} - -/// This service is for RECEIVING a stellar message from the server. -/// # Arguments -/// * `r_stream` - the read stream for reading the xdr stellar message -/// * `tx_stream_reader` - the sender for handling the xdr stellar message -pub(crate) async fn receiving_service( - mut r_stream: tcp::OwnedReadHalf, - actions_sender: mpsc::Sender, - timeout_in_secs: u64, - retries: u8, -) { - let mut retry = 0; - let mut retry_read = 0; - let mut proc_id = 0; - - // holds the number of bytes that were missing from the previous stellar message. - let mut lack_bytes_from_prev = 0; - let mut readbuf: Vec = vec![]; - - let mut buff_for_peeking = vec![0; 4]; - loop { - // check whether or not we should read the bytes as: - // 1. the length of the next stellar message - // 2. the remaining bytes of the previous stellar message - match timeout(Duration::from_secs(timeout_in_secs), r_stream.peek(&mut buff_for_peeking)) - .await - { - Ok(Ok(0)) => { - if retry_read >= retries { - log::error!("receiving_service(): proc_id: {proc_id}. Failed to read messages from the stream. Received 0 size more than {retries} times"); - return - } - retry_read += 1; - }, - Ok(Ok(_)) if lack_bytes_from_prev == 0 => { - retry = 0; - retry_read = 0; - // if there are no more bytes lacking from the previous message, - // then check the size of next stellar message. - // If it's not enough, skip it. - let expect_msg_len = next_message_length(&mut r_stream).await; - log::trace!("receiving_service(): proc_id: {proc_id}. The next message length is {expect_msg_len}"); - - if expect_msg_len == 0 { - // there's nothing to read; wait for the next iteration - log::trace!("receiving_service(): proc_id: {proc_id}. Nothing left to read; waiting for next loop"); - continue - } - - // let's start reading the actual stellar message. - readbuf = vec![0; expect_msg_len]; - - log_error!( - read_message( - &mut r_stream, - &actions_sender, - &mut lack_bytes_from_prev, - &mut proc_id, - &mut readbuf, - expect_msg_len, - ) - .await, - format!("receiving_service(): proc_id: {proc_id}. Failed to read message") - ); - }, - - Ok(Ok(_)) => { - retry = 0; - retry_read = 0; - // let's read the continuation number of bytes from the previous message. - log_error!( - read_unfinished_message( - &mut r_stream, - &actions_sender, - &mut lack_bytes_from_prev, - &mut proc_id, - &mut readbuf, - ).await, - format!("receiving_service(): proc_id:{proc_id}. Error occurred while reading unfinished stellar message") - ); - }, - Ok(Err(e)) => { - log::error!("receiving_service(): proc_id: {proc_id}. Error occurred while reading the stream: {e:?}"); - return - }, - Err(elapsed) => { - log::error!( - "receiving_service(): proc_id: {proc_id}. Timeout of {} seconds elapsed for reading messages from Stellar Node. Retry: #{retry}", - elapsed.to_string() - ); - - if retry >= retries { - log::error!("receiving_service(): proc_id: {proc_id}. Exhausted maximum retries for reading messages from Stellar Node."); - return - } - retry += 1; - }, - } - } -} - -async fn _write_to_stream( - msg: StellarMessage, - connector: &mut Connector, - w_stream: &mut tcp::OwnedWriteHalf, -) -> Result<(), Error> { - let xdr_msg = connector.create_xdr_message(msg)?; - w_stream - .write_all(&xdr_msg) - .await - .map_err(|e| Error::WriteFailed(e.to_string())) -} - -async fn _connection_handler( - actions: ConnectorActions, - connector: &mut Connector, - w_stream: &mut tcp::OwnedWriteHalf, -) -> Result<(), Error> { - match actions { - // start the connection to Stellar node with a 'hello' - ConnectorActions::SendHello => { - let msg = connector.create_hello_message(time_now())?; - - log::info!( - "_connection_handler(): Sending Hello Message: {}", - to_base64_xdr_string(&msg) - ); - _write_to_stream(msg, connector, w_stream).await?; - }, - - // write message to the stream - ConnectorActions::SendMessage(msg) => { - _write_to_stream(*msg, connector, w_stream).await?; - }, - - // handle incoming message from the stream - ConnectorActions::HandleMessage(xdr) => { - connector.process_raw_message(xdr).await?; - }, - - ConnectorActions::Disconnect => panic!("Should disconnect"), - } - - Ok(()) -} - -/// Handles actions for the connection. -/// # Arguments -/// * `connector` - the Connector that would send/handle messages to/from Stellar Node -/// * `receiver` - The receiver for actions that the Connector should do. -/// * `w_stream` -> the write half of the TcpStream to connect to the Stellar Node -pub(crate) async fn connection_handler( - mut connector: Connector, - mut actions_receiver: mpsc::Receiver, - mut w_stream: tcp::OwnedWriteHalf, -) { - let mut retry = 0; - - loop { - match timeout(Duration::from_secs(connector.timeout_in_secs), actions_receiver.recv()).await - { - Ok(Some(ConnectorActions::Disconnect)) => { - log_error!( - w_stream.shutdown().await, - format!("connection_handler(): Failed to shutdown write half of stream:") - ); - drop(connector); - drop(actions_receiver); - return - }, - - Ok(Some(action)) => { - if let Err(e) = _connection_handler(action, &mut connector, &mut w_stream).await { - log::error!("connection_handler(): {e:?}"); - - log_error!( - connector.send_to_user(StellarRelayMessage::Error(e.to_string())).await, - format!("connection_handler(): sending error message") - ); - return - } - }, - - Ok(None) => { - log::warn!("connection_handler(): Unexpected empty response from receiver"); - }, - - Err(elapsed) => { - log::error!( - "connection_handler(): Timeout of {} seconds elapsed for reading messages from Stellar Node. Retry: #{retry}", - elapsed.to_string() - ); - - if retry >= connector.retries { - log_error!( - connector.send_to_user(StellarRelayMessage::Timeout).await, - format!("connection_handler(): Exhausted maximum retries for receiving any actions from receiver.") - ); - return - } - retry += 1; - }, - } - } -} diff --git a/clients/stellar-relay-lib/src/lib.rs b/clients/stellar-relay-lib/src/lib.rs index 952c2c4fc..4579e9b25 100644 --- a/clients/stellar-relay-lib/src/lib.rs +++ b/clients/stellar-relay-lib/src/lib.rs @@ -1,15 +1,14 @@ mod config; +// mod connection; mod connection; pub mod node; +mod overlay; #[cfg(test)] mod tests; -pub(crate) use connection::{ - handshake::{self, HandshakeState}, - ConnectionInfo, Connector, ConnectorActions, +pub use crate::connection::{ + handshake::HandshakeState, helper, xdr_converter, ConnectionInfo, Error, }; -pub use connection::{helper, xdr_converter, Error, StellarOverlayConnection, StellarRelayMessage}; - -pub use substrate_stellar_sdk as sdk; - pub use config::{connect_to_stellar_overlay_network, StellarOverlayConfig}; +pub use overlay::StellarOverlayConnection; +pub use substrate_stellar_sdk as sdk; diff --git a/clients/stellar-relay-lib/src/node/mod.rs b/clients/stellar-relay-lib/src/node/mod.rs index 4886890f9..bd6fc642a 100644 --- a/clients/stellar-relay-lib/src/node/mod.rs +++ b/clients/stellar-relay-lib/src/node/mod.rs @@ -30,25 +30,6 @@ impl Debug for NodeInfo { } } -impl NodeInfo { - #[cfg(test)] - pub(crate) fn new( - ledger_version: u32, - overlay_version: u32, - overlay_min_version: u32, - version_str: String, - network: &Network, - ) -> NodeInfo { - NodeInfo { - ledger_version, - overlay_version, - overlay_min_version, - version_str: version_str.into_bytes(), - network_id: *network.get_id(), - } - } -} - impl From for NodeInfo { fn from(value: NodeInfoCfg) -> Self { let network: &Network = if value.is_pub_net { &PUBLIC_NETWORK } else { &TEST_NETWORK }; diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs new file mode 100644 index 000000000..82b3f52ee --- /dev/null +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -0,0 +1,107 @@ +use substrate_stellar_sdk::types::{ErrorCode, StellarMessage}; +use tokio::sync::{ + mpsc, + mpsc::{ + error::{SendError, TryRecvError}, + Sender, + }, +}; + +use crate::{ + connection::{poll_messages_from_stellar, ConnectionInfo, Connector}, + helper::{create_stream, error_to_string}, + node::NodeInfo, + Error, +}; + +/// Used to send/receive messages to/from Stellar Node +pub struct StellarOverlayConnection { + sender: mpsc::Sender, + receiver: mpsc::Receiver, +} + +impl StellarOverlayConnection { + pub fn sender(&self) -> Sender { + self.sender.clone() + } + + pub async fn send_to_node(&self, msg: StellarMessage) -> Result<(), SendError> { + self.sender.send(msg).await + } + + /// Returns an `StellarOverlayConnection` when a connection to Stellar Node is successful. + pub async fn connect( + local_node_info: NodeInfo, + conn_info: ConnectionInfo, + ) -> Result { + log::info!("connect(): connecting to {conn_info:?}"); + + // this is a channel to communicate with the user/caller. + let (send_to_user_sender, send_to_user_receiver) = mpsc::channel::(1024); + + let (send_to_node_sender, send_to_node_receiver) = mpsc::channel::(1024); + + // split the stream for easy handling of read and write + let (read_stream_overlay, write_stream_overlay) = + create_stream(&conn_info.address()).await?; + + let mut connector = Connector::new(local_node_info, conn_info, write_stream_overlay); + connector.send_hello_message().await?; + + tokio::spawn(poll_messages_from_stellar( + connector, + read_stream_overlay, + send_to_user_sender, + send_to_node_receiver, + )); + + Ok(StellarOverlayConnection { + sender: send_to_node_sender, + receiver: send_to_user_receiver, + }) + } + + pub async fn listen(&mut self) -> Result, Error> { + loop { + if !self.is_alive() { + self.disconnect(); + return Err(Error::Disconnected) + } + + match self.receiver.try_recv() { + Ok(StellarMessage::ErrorMsg(e)) => { + log::error!("listen(): received error message: {e:?}"); + if e.code == ErrorCode::ErrConf || e.code == ErrorCode::ErrAuth { + return Err(Error::ConnectionFailed(error_to_string(e))) + } + + return Ok(None) + }, + Ok(msg) => return Ok(Some(msg)), + Err(TryRecvError::Disconnected) => return Err(Error::Disconnected), + Err(TryRecvError::Empty) => continue, + } + } + } + + pub fn is_alive(&mut self) -> bool { + let is_closed = self.sender.is_closed(); + + if is_closed { + self.disconnect(); + } + + !is_closed + } + + pub fn disconnect(&mut self) { + log::info!("disconnect(): closing connection to overlay network"); + self.receiver.close(); + } +} + +impl Drop for StellarOverlayConnection { + fn drop(&mut self) { + self.disconnect(); + } +} diff --git a/clients/stellar-relay-lib/src/tests/mod.rs b/clients/stellar-relay-lib/src/tests/mod.rs index cf96ac17b..ad138ec2f 100644 --- a/clients/stellar-relay-lib/src/tests/mod.rs +++ b/clients/stellar-relay-lib/src/tests/mod.rs @@ -1,14 +1,12 @@ +use crate::{ + connection::ConnectionInfo, node::NodeInfo, StellarOverlayConfig, StellarOverlayConnection, +}; +use serial_test::serial; use std::{sync::Arc, time::Duration}; use substrate_stellar_sdk::{ types::{ScpStatementExternalize, ScpStatementPledges, StellarMessage}, Hash, IntoHash, }; - -use crate::{ - node::NodeInfo, ConnectionInfo, StellarOverlayConfig, StellarOverlayConnection, - StellarRelayMessage, -}; -use serial_test::serial; use tokio::{sync::Mutex, time::timeout}; fn secret_key(is_mainnet: bool) -> String { @@ -43,25 +41,7 @@ fn overlay_infos(is_mainnet: bool) -> (NodeInfo, ConnectionInfo) { ) } -#[tokio::test] -#[serial] -async fn stellar_overlay_connect_and_listen_connect_message() { - let (node_info, conn_info) = overlay_infos(false); - - let mut overlay_connection = - StellarOverlayConnection::connect(node_info.clone(), conn_info).await.unwrap(); - - let message = overlay_connection.listen().await.unwrap(); - if let StellarRelayMessage::Connect { pub_key: _x, node_info: y } = message { - assert_eq!(y.ledger_version, node_info.ledger_version); - } else { - panic!("Incorrect stellar relay message received"); - } - - overlay_connection.disconnect().await.expect("Should be able to disconnect"); -} - -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] #[serial] async fn stellar_overlay_should_receive_scp_messages() { let (node_info, conn_info) = overlay_infos(false); @@ -76,18 +56,10 @@ async fn stellar_overlay_should_receive_scp_messages() { timeout(Duration::from_secs(300), async move { let mut ov_conn_locked = ov_conn.lock().await; - while let Some(relay_message) = ov_conn_locked.listen().await { - match relay_message { - StellarRelayMessage::Data { p_id: _, msg_type: _, msg } => match *msg { - StellarMessage::ScpMessage(msg) => { - scps_vec_clone.lock().await.push(msg); - ov_conn_locked.disconnect().await.expect("failed to disconnect"); - break - }, - _ => {}, - }, - _ => {}, - } + if let Ok(Some(msg)) = ov_conn_locked.listen().await { + scps_vec_clone.lock().await.push(msg); + + ov_conn_locked.disconnect(); } }) .await @@ -98,7 +70,7 @@ async fn stellar_overlay_should_receive_scp_messages() { assert!(!scps_vec.lock().await.is_empty()); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] #[serial] async fn stellar_overlay_should_receive_tx_set() { //arrange @@ -121,34 +93,30 @@ async fn stellar_overlay_should_receive_tx_set() { timeout(Duration::from_secs(500), async move { let mut ov_conn_locked = ov_conn.lock().await; - while let Some(relay_message) = ov_conn_locked.listen().await { - match relay_message { - StellarRelayMessage::Data { p_id: _, msg_type: _, msg } => match *msg { - StellarMessage::ScpMessage(msg) => - if let ScpStatementPledges::ScpStExternalize(stmt) = &msg.statement.pledges - { - let tx_set_hash = get_tx_set_hash(stmt); - tx_set_hashes_clone.lock().await.push(tx_set_hash.clone()); - ov_conn_locked - .send(StellarMessage::GetTxSet(tx_set_hash)) - .await - .unwrap(); - }, - StellarMessage::TxSet(set) => { - let tx_set_hash = set.into_hash().expect("should return a hash"); - actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); - - ov_conn_locked.disconnect().await.expect("failed to disconnect"); - break + while let Ok(Some(msg)) = ov_conn_locked.listen().await { + match msg { + StellarMessage::ScpMessage(msg) => + if let ScpStatementPledges::ScpStExternalize(stmt) = &msg.statement.pledges { + let tx_set_hash = get_tx_set_hash(stmt); + tx_set_hashes_clone.lock().await.push(tx_set_hash.clone()); + ov_conn_locked + .send_to_node(StellarMessage::GetTxSet(tx_set_hash)) + .await + .unwrap(); }, - StellarMessage::GeneralizedTxSet(set) => { - let tx_set_hash = set.into_hash().expect("should return a hash"); - actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); + StellarMessage::TxSet(set) => { + let tx_set_hash = set.into_hash().expect("should return a hash"); + actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); - ov_conn_locked.disconnect().await.expect("failed to disconnect"); - break - }, - _ => {}, + ov_conn_locked.disconnect(); + break + }, + StellarMessage::GeneralizedTxSet(set) => { + let tx_set_hash = set.into_hash().expect("should return a hash"); + actual_tx_set_hashes_clone.lock().await.push(tx_set_hash); + + ov_conn_locked.disconnect(); + break }, _ => {}, } @@ -167,7 +135,7 @@ async fn stellar_overlay_should_receive_tx_set() { assert!(expected_hashes.contains(&actual_hashes[0])) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] #[serial] async fn stellar_overlay_disconnect_works() { let (node_info, conn_info) = overlay_infos(false); @@ -175,11 +143,7 @@ async fn stellar_overlay_disconnect_works() { let mut overlay_connection = StellarOverlayConnection::connect(node_info.clone(), conn_info).await.unwrap(); - let message = overlay_connection.listen().await.unwrap(); - if let StellarRelayMessage::Connect { pub_key: _x, node_info: y } = message { - assert_eq!(y.ledger_version, node_info.ledger_version); - } else { - panic!("Incorrect stellar relay message received"); - } - overlay_connection.disconnect().await.unwrap(); + let _ = overlay_connection.listen().await.unwrap(); + + overlay_connection.disconnect(); } diff --git a/clients/vault/src/oracle/agent.rs b/clients/vault/src/oracle/agent.rs index b49da4948..58f58ca2c 100644 --- a/clients/vault/src/oracle/agent.rs +++ b/clients/vault/src/oracle/agent.rs @@ -2,14 +2,14 @@ use std::{sync::Arc, time::Duration}; use service::on_shutdown; use tokio::{ - sync::{mpsc, RwLock}, + sync::{mpsc, mpsc::error::TryRecvError, RwLock}, time::{sleep, timeout}, }; use runtime::ShutdownSender; use stellar_relay_lib::{ - connect_to_stellar_overlay_network, sdk::types::StellarMessage, StellarOverlayConfig, - StellarRelayMessage, + connect_to_stellar_overlay_network, helper::to_base64_xdr_string, sdk::types::StellarMessage, + StellarOverlayConfig, }; use crate::oracle::{ @@ -33,35 +33,22 @@ pub struct OracleAgent { /// * `collector` - used to collect envelopes and transaction sets /// * `message_sender` - used to send messages to Stellar Node async fn handle_message( - message: StellarRelayMessage, + message: StellarMessage, collector: Arc>, message_sender: &StellarMessageSender, ) -> Result<(), Error> { match message { - StellarRelayMessage::Data { p_id: _, msg_type: _, msg } => match *msg { - StellarMessage::ScpMessage(env) => { - collector.write().await.handle_envelope(env, message_sender).await?; - }, - StellarMessage::TxSet(set) => - if let Err(e) = collector.read().await.add_txset(set) { - tracing::error!(e); - }, - StellarMessage::GeneralizedTxSet(set) => { - if let Err(e) = collector.read().await.add_txset(set) { - tracing::error!(e); - } - }, - _ => {}, + StellarMessage::ScpMessage(env) => { + collector.write().await.handle_envelope(env, message_sender).await?; }, - StellarRelayMessage::Connect { pub_key, node_info } => { - let pub_key = pub_key.to_encoding(); - let pub_key = std::str::from_utf8(&pub_key).unwrap_or("****"); - - tracing::info!("handle_message(): Connected: via public key: {pub_key}"); - tracing::info!("handle_message(): Connected: with {:#?}", node_info) - }, - StellarRelayMessage::Timeout => { - tracing::error!("handle_message(): The Stellar Relay timed out. Failed to process message: {message:?}"); + StellarMessage::TxSet(set) => + if let Err(e) = collector.read().await.add_txset(set) { + tracing::error!(e); + }, + StellarMessage::GeneralizedTxSet(set) => { + if let Err(e) = collector.read().await.add_txset(set) { + tracing::error!(e); + } }, _ => {}, } @@ -75,52 +62,75 @@ async fn handle_message( pub async fn start_oracle_agent( config: StellarOverlayConfig, secret_key: &str, + shutdown_sender: ShutdownSender, ) -> Result { tracing::info!("start_oracle_agent(): Starting connection to Stellar overlay network..."); let mut overlay_conn = connect_to_stellar_overlay_network(config.clone(), secret_key).await?; + // use StellarOverlayConnection's sender to send message to Stellar + let sender = overlay_conn.sender(); - // Get action sender and disconnect action before moving `overlay_conn` into the closure - let actions_sender = overlay_conn.get_actions_sender(); - let disconnect_action = overlay_conn.get_disconnect_action(); - - let (sender, mut receiver) = mpsc::channel(34); let collector = Arc::new(RwLock::new(ScpMessageCollector::new( config.is_public_network(), config.stellar_history_archive_urls(), ))); - let shutdown_sender = ShutdownSender::default(); + let collector_clone = collector.clone(); - let shutdown_clone = shutdown_sender.clone(); - // handle a message from the overlay network - let sender_clone = sender.clone(); + let shutdown_sender_clone = shutdown_sender.clone(); + // a clone used to forcefully call a shutdown, when StellarOverlay disconnects. + let shutdown_sender_clone2 = shutdown_sender.clone(); - let collector_clone = collector.clone(); - service::spawn_cancelable(shutdown_clone.subscribe(), async move { - let sender = sender_clone.clone(); + // disconnect signal sender tells the StellarOverlayConnection to close its TcpStream to Stellar + // Node + let (disconnect_signal_sender, mut disconnect_signal_receiver) = mpsc::channel::<()>(2); + + service::spawn_cancelable(shutdown_sender_clone.subscribe(), async move { + let sender_clone = overlay_conn.sender(); loop { - tokio::select! { - // runs the stellar-relay and listens to data to collect the scp messages and txsets. - Some(msg) = overlay_conn.listen() => { - handle_message(msg, collector_clone.clone(), &sender).await?; + match disconnect_signal_receiver.try_recv() { + // if a disconnect signal was sent, disconnect from Stellar. + Ok(_) | Err(TryRecvError::Disconnected) => { + tracing::info!("start_oracle_agent(): disconnect overlay..."); + overlay_conn.disconnect(); + break }, + Err(TryRecvError::Empty) => {}, + } - Some(msg) = receiver.recv() => { - // We received the instruction to send a message to the overlay network by the receiver - overlay_conn.send(msg).await?; - } + // listen for messages from Stellar + match overlay_conn.listen().await { + Ok(Some(msg)) => { + let msg_as_str = to_base64_xdr_string(&msg); + if let Err(e) = + handle_message(msg, collector_clone.clone(), &sender_clone).await + { + tracing::error!( + "start_oracle_agent(): failed to handle message: {msg_as_str}: {e:?}" + ); + } + }, + Ok(None) => {}, + // connection got lost + Err(e) => { + overlay_conn.disconnect(); + tracing::error!("start_oracle_agent(): encounter error in overlay: {e:?}"); + + if let Err(e) = shutdown_sender_clone2.send(()) { + tracing::error!( + "start_oracle_agent(): Failed to send shutdown signal in thread: {e:?}" + ); + } + break + }, } } - #[allow(unreachable_code)] - Ok::<(), Error>(()) }); tokio::spawn(on_shutdown(shutdown_sender.clone(), async move { - let result_sending_disconnect = - actions_sender.send(disconnect_action).await.map_err(Error::from); - if let Err(e) = result_sending_disconnect { - tracing::error!("start_oracle_agent(): Failed to send disconnect message: {:#?}", e); - }; + tracing::debug!("start_oracle_agent(): sending signal to shutdown overlay connection..."); + if let Err(e) = disconnect_signal_sender.send(()).await { + tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}"); + } })); Ok(OracleAgent { @@ -154,7 +164,6 @@ impl OracleAgent { let collector = collector.read().await; match collector.build_proof(slot, &stellar_sender).await { None => { - tracing::warn!("get_proof(): Failed to build proof for slot {slot}."); drop(collector); // give 10 seconds interval for every retry sleep(Duration::from_secs(10)).await; @@ -203,14 +212,18 @@ mod tests { use super::*; use serial_test::serial; - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] #[ntest::timeout(1_800_000)] // timeout at 30 minutes #[serial] async fn test_get_proof_for_current_slot() { - let agent = - start_oracle_agent(get_test_stellar_relay_config(true), &get_test_secret_key(true)) - .await - .expect("Failed to start agent"); + let shutdown_sender = ShutdownSender::new(); + let agent = start_oracle_agent( + get_test_stellar_relay_config(true), + &get_test_secret_key(true), + shutdown_sender, + ) + .await + .expect("Failed to start agent"); sleep(Duration::from_secs(10)).await; // Wait until agent is caught up with the network. @@ -227,17 +240,22 @@ mod tests { assert!(proof_result.is_ok(), "Failed to get proof for slot: {}", latest_slot); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot() { let scp_archive_storage = ScpArchiveStorage::default(); let tx_archive_storage = TransactionsArchiveStorage::default(); - let agent = - start_oracle_agent(get_test_stellar_relay_config(true), &get_test_secret_key(true)) - .await - .expect("Failed to start agent"); + let shutdown_sender = ShutdownSender::new(); + let agent = start_oracle_agent( + get_test_stellar_relay_config(true), + &get_test_secret_key(true), + shutdown_sender, + ) + .await + .expect("Failed to start agent"); + sleep(Duration::from_secs(5)).await; // This slot should be archived on the public network let target_slot = 44041116; let proof = agent.get_proof(target_slot).await.expect("should return a proof"); @@ -251,7 +269,7 @@ mod tests { agent.stop().expect("Failed to stop the agent"); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot_with_fallback() { let scp_archive_storage = ScpArchiveStorage::default(); @@ -267,10 +285,13 @@ mod tests { let modified_config = StellarOverlayConfig { stellar_history_archive_urls: archive_urls, ..base_config }; - let agent = start_oracle_agent(modified_config, &get_test_secret_key(true)) - .await - .expect("Failed to start agent"); + let shutdown_sender = ShutdownSender::new(); + let agent = + start_oracle_agent(modified_config, &get_test_secret_key(true), shutdown_sender) + .await + .expect("Failed to start agent"); + sleep(Duration::from_secs(5)).await; // This slot should be archived on the public network let target_slot = 44041116; let proof = agent.get_proof(target_slot).await.expect("should return a proof"); @@ -284,7 +305,7 @@ mod tests { agent.stop().expect("Failed to stop the agent"); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] #[serial] async fn test_get_proof_for_archived_slot_fails_without_archives() { let scp_archive_storage = ScpArchiveStorage::default(); @@ -294,9 +315,11 @@ mod tests { let modified_config: StellarOverlayConfig = StellarOverlayConfig { stellar_history_archive_urls: vec![], ..base_config }; - let agent = start_oracle_agent(modified_config, &get_test_secret_key(true)) + let shutdown = ShutdownSender::new(); + let agent = start_oracle_agent(modified_config, &get_test_secret_key(true), shutdown) .await .expect("Failed to start agent"); + sleep(Duration::from_secs(5)).await; // This slot should be archived on the public network let target_slot = 44041116; diff --git a/clients/vault/src/system.rs b/clients/vault/src/system.rs index 59849155d..50951483f 100644 --- a/clients/vault/src/system.rs +++ b/clients/vault/src/system.rs @@ -400,6 +400,7 @@ impl VaultService { async fn create_oracle_agent( &self, is_public_network: bool, + shutdown_sender: ShutdownSender, ) -> Result, ServiceError> { let cfg_path = &self.config.stellar_overlay_config_filepath; let stellar_overlay_cfg = @@ -410,9 +411,13 @@ impl VaultService { return Err(ServiceError::IncompatibleNetwork) } - let oracle_agent = crate::oracle::start_oracle_agent(stellar_overlay_cfg, &self.secret_key) - .await - .expect("Failed to start oracle agent"); + let oracle_agent = crate::oracle::start_oracle_agent( + stellar_overlay_cfg, + &self.secret_key, + shutdown_sender, + ) + .await + .expect("Failed to start oracle agent"); Ok(Arc::new(oracle_agent)) } @@ -774,7 +779,8 @@ impl VaultService { .await; drop(wallet); - let oracle_agent = self.create_oracle_agent(is_public_network).await?; + let oracle_agent = + self.create_oracle_agent(is_public_network, self.shutdown.clone()).await?; self.execute_open_requests(oracle_agent.clone()); diff --git a/clients/vault/tests/helper/mod.rs b/clients/vault/tests/helper/mod.rs index 1cd84a33b..7747f0b10 100644 --- a/clients/vault/tests/helper/mod.rs +++ b/clients/vault/tests/helper/mod.rs @@ -12,7 +12,7 @@ use runtime::{ default_provider_client, set_exchange_rate_and_wait, setup_provider, SubxtClient, }, types::FixedU128, - SpacewalkParachain, VaultId, + ShutdownSender, SpacewalkParachain, VaultId, }; use sp_arithmetic::FixedPointNumber; use sp_keyring::AccountKeyring; @@ -175,7 +175,8 @@ where .unwrap(), )); - let oracle_agent = start_oracle_agent(CFG.clone(), &SECRET_KEY) + let shutdown_tx = ShutdownSender::new(); + let oracle_agent = start_oracle_agent(CFG.clone(), &SECRET_KEY, shutdown_tx) .await .expect("failed to start agent"); let oracle_agent = Arc::new(oracle_agent);