Skip to content

Commit

Permalink
451 reduce the "already-connected peer" error logs from appearing (#454)
Browse files Browse the repository at this point in the history
* reduce the "already-connected peer" error logs from appearing

* drop stream and connector before returning

* remove timeout;
introduce "disconnect" message

* new connection change

* new connection change

* new changes

* cleanup

* added more meaningful errors

* new update

* Fix compilation

* Small refactoring

* fix test cases and run cargo fmt

* Fix shutdown not propagated
by passing the same shutdown sender to the oracle agent

* add retry counter and remove extra channel

* cargo fmt and fix warnings

* fix failing test cases due to time elapsed with building proofs

* remove never-ending test and additional cleanup

* support multi-thread in tests

* #454 (comment),
#454 (comment),
#454 (comment),
#454 (comment),
#454 (comment),
#454 (comment),
#454 (comment),
#454 (comment),
#454 (comment),
#454 (review)

* remove env logger

* reverting back to 2. Maybe 3 is too forward-looking

* https://github.com/pendulum-chain/spacewalk/actions/runs/7169885052/job/19521307640#step:13:66

---------

Co-authored-by: Marcel Ebert <[email protected]>
  • Loading branch information
b-yap and ebma authored Dec 12, 2023
1 parent 2ee2f62 commit f03cae0
Show file tree
Hide file tree
Showing 24 changed files with 772 additions and 1,121 deletions.
37 changes: 35 additions & 2 deletions clients/service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::{fmt, sync::Arc, time::Duration};
use std::{
fmt,
sync::Arc,
time::{Duration, SystemTime},
};

use async_trait::async_trait;
use futures::{future::Either, Future, FutureExt};
use governor::{Quota, RateLimiter};
use nonzero_ext::*;
use tokio::sync::RwLock;
use tokio::{sync::RwLock, time::sleep};
pub use warp;

pub use cli::{LoggingFormat, MonitoringConfig, RestartPolicy, ServiceConfig};
Expand All @@ -19,6 +23,10 @@ mod cli;
mod error;
mod trace;

const RESET_RESTART_TIME_IN_SECS: u64 = 1800; // reset the restart time in 30 minutes
const DEFAULT_RESTART_TIME_IN_SECS: u64 = 20; // default sleep time before restarting everything
const RESTART_BACKOFF_DELAY: u64 = 10;

#[async_trait]
pub trait Service<Config, InnerError> {
const NAME: &'static str;
Expand Down Expand Up @@ -67,7 +75,28 @@ impl<Config: Clone + Send + 'static, F: Fn()> ConnectionManager<Config, F> {
pub async fn start<S: Service<Config, InnerError>, InnerError: fmt::Display>(
&self,
) -> Result<(), Error<InnerError>> {
let mut restart_in_secs = DEFAULT_RESTART_TIME_IN_SECS; // set default to 20 seconds for restart
let mut last_start_timestamp = SystemTime::now();

loop {
let time_now = SystemTime::now();
let _ = time_now.duration_since(last_start_timestamp).map(|duration| {
// Revert the counter if the restart happened more than 30 minutes (or 1800 seconds)
// ago
if duration.as_secs() > RESET_RESTART_TIME_IN_SECS {
restart_in_secs = DEFAULT_RESTART_TIME_IN_SECS;
}
// Increase time by 10 seconds if a restart is triggered too frequently.
// This waits for delayed packets to be removed in the network,
// even though the connection on the client side is closed.
// Else, these straggler packets will interfere with the new connection.
// https://www.rfc-editor.org/rfc/rfc793#page-22
else {
restart_in_secs += RESTART_BACKOFF_DELAY;
last_start_timestamp = time_now;
}
});

tracing::info!("Version: {}", S::VERSION);
tracing::info!(
"Vault uses Substrate account with ID: {}",
Expand Down Expand Up @@ -129,6 +158,10 @@ impl<Config: Clone + Send + 'static, F: Fn()> ConnectionManager<Config, F> {
RestartPolicy::Never => return Err(Error::ClientShutdown),
RestartPolicy::Always => {
(self.increment_restart_counter)();

tracing::info!("Restarting in {restart_in_secs} seconds");
sleep(Duration::from_secs(restart_in_secs)).await;

continue
},
};
Expand Down
41 changes: 5 additions & 36 deletions clients/stellar-relay-lib/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ pub struct ConnectionInfoCfg {
pub recv_scp_msgs: bool,
pub remote_called_us: bool,
/// how long to wait for the Stellar Node's messages.
timeout_in_secs: u64,
/// number of retries to wait for the Stellar Node's messages and/or to connect back to it.
retries:u8
timeout_in_secs: u64
}
```

Expand All @@ -66,17 +64,9 @@ Create a connection using the `connect_to_stellar_overlay_network` function:
```rust
let mut overlay_connection = stellar_relay_lib::connect_to_stellar_overlay_network(cfg, secret_key).await?;
```
The `StellarOverlayConnection` has 2 async methods to interact with the Stellar Node:
* _`send(&self, message: StellarMessage)`_ -> for sending `StellarMessage`s to Stellar Node
* _`listen(&mut self)`_ -> for receiving `StellarRelayMessage`s from the Stellar Relay.

### Interpreting the `StellarRelayMessage`
The `StellarRelayMessage` is an enum with the following variants:
* _`Connect`_ -> interprets a successful connection to Stellar Node. It contains the `PublicKey` and the `NodeInfo`
* _`Data`_ -> a wrapper of a `StellarMessage` and additional fields: the _message type_ and the unique `p_id`(process id)
* _`Timeout`_ -> Depends on the `timeout_in_secs` and `retries` defined in the `ConnectionInfo` (**10** and **3** by default). This message is returned after multiple retries have been done.
For example, Stellar Relay will wait for 10 seconds to read from the existing tcp stream before retrying again. After the 3rd retry, StellarRelay will create a new stream in 3 attempts, with an interval of 3 seconds.
* _`Error`_ -> a todo
The `StellarOverlayConnection` has 2 methods to interact with the Stellar Node:
* _`sender(&self)`_ -> used to send `StellarMessage`s to Stellar Node
* _`listen(&mut self)`_ -> async method for receiving `StellarMessage`s from the Stellar Node.

## Example
In the `stellar-relay-lib` directory, run this command:
Expand All @@ -102,25 +92,4 @@ and you should be able to see in the terminal:
...
[2022-10-14T13:16:02Z INFO connect] R0E1U1RCTVY2UURYRkRHRDYyTUVITExIWlRQREk3N1UzUEZPRDJTRUxVNVJKREhRV0JSNU5OSzc= sent StellarMessage of type ScpStNominate for ledger 43109751
[2022-10-14T13:16:02Z INFO connect] R0NHQjJTMktHWUFSUFZJQTM3SFlaWFZSTTJZWlVFWEE2UzMzWlU1QlVEQzZUSFNCNjJMWlNUWUg= sent StellarMessage of type ScpStPrepare for ledger 43109751
```

Here is an example in the terminal when disconnection/reconnection happens:
```
[2022-10-17T05:56:47Z ERROR stellar_relay::connection::services] deadline has elapsed for reading messages from Stellar Node. Retry: 0
[2022-10-17T05:56:47Z ERROR stellar_relay::connection::services] deadline has elapsed for receiving messages. Retry: 0
[2022-10-17T05:56:57Z ERROR stellar_relay::connection::services] deadline has elapsed for reading messages from Stellar Node. Retry: 1
[2022-10-17T05:56:57Z ERROR stellar_relay::connection::services] deadline has elapsed for receiving messages. Retry: 1
[2022-10-17T05:57:07Z ERROR stellar_relay::connection::services] deadline has elapsed for reading messages from Stellar Node. Retry: 2
[2022-10-17T05:57:07Z ERROR stellar_relay::connection::services] deadline has elapsed for receiving messages. Retry: 2
[2022-10-17T05:57:17Z ERROR stellar_relay::connection::services] deadline has elapsed for reading messages from Stellar Node. Retry: 3
[2022-10-17T05:57:17Z ERROR stellar_relay::connection::services] deadline has elapsed for receiving messages. Retry: 3
[2022-10-17T05:57:17Z INFO stellar_relay::connection::user_controls] reconnecting to "135.181.16.110".
[2022-10-17T05:57:17Z ERROR stellar_relay::connection::user_controls] failed to reconnect! # of retries left: 2. Retrying in 3 seconds...
[2022-10-17T05:57:20Z INFO stellar_relay::connection::user_controls] reconnecting to "135.181.16.110".
[2022-10-17T05:57:20Z INFO stellar_relay::connection::services] Starting Handshake with Hello.
[2022-10-17T05:57:21Z INFO stellar_relay::connection::connector::message_handler] Hello message processed successfully
[2022-10-17T05:57:21Z INFO stellar_relay::connection::connector::message_handler] Handshake completed
```


todo: add multiple tests
```
56 changes: 21 additions & 35 deletions clients/stellar-relay-lib/examples/connect.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use stellar_relay_lib::{
connect_to_stellar_overlay_network,
sdk::types::{ScpStatementPledges, StellarMessage},
StellarOverlayConfig, StellarRelayMessage,
StellarOverlayConfig,
};

#[tokio::main]
Expand All @@ -27,42 +27,28 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let mut overlay_connection = connect_to_stellar_overlay_network(cfg, &secret_key).await?;

while let Some(relay_message) = overlay_connection.listen().await {
match relay_message {
StellarRelayMessage::Connect { pub_key, node_info } => {
let pub_key = pub_key.to_encoding();
let pub_key = std::str::from_utf8(&pub_key).expect("should work?");
log::info!("Connected to Stellar Node: {pub_key}");
log::info!("{:?}", node_info);
},
StellarRelayMessage::Data { p_id: _, msg_type, msg } => match *msg {
StellarMessage::ScpMessage(msg) => {
let node_id = msg.statement.node_id.to_encoding();
let node_id = base64::encode(&node_id);
let slot = msg.statement.slot_index;
while let Ok(Some(msg)) = overlay_connection.listen().await {
match msg {
StellarMessage::ScpMessage(msg) => {
let node_id = msg.statement.node_id.to_encoding();
let node_id = base64::encode(&node_id);
let slot = msg.statement.slot_index;

let stmt_type = match msg.statement.pledges {
ScpStatementPledges::ScpStPrepare(_) => "ScpStPrepare",
ScpStatementPledges::ScpStConfirm(_) => "ScpStConfirm",
ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize",
ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ",
};
log::info!(
"{} sent StellarMessage of type {} for ledger {}",
node_id,
stmt_type,
slot
);
},
_ => {
log::info!("rcv StellarMessage of type: {:?}", msg_type);
},
},
StellarRelayMessage::Error(e) => {
log::error!("Error: {:?}", e);
let stmt_type = match msg.statement.pledges {
ScpStatementPledges::ScpStPrepare(_) => "ScpStPrepare",
ScpStatementPledges::ScpStConfirm(_) => "ScpStConfirm",
ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize",
ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ",
};
log::info!(
"{} sent StellarMessage of type {} for ledger {}",
node_id,
stmt_type,
slot
);
},
StellarRelayMessage::Timeout => {
log::error!("timed out");
_ => {
let _ = overlay_connection.send_to_node(StellarMessage::GetPeers).await;
},
}
}
Expand Down
19 changes: 7 additions & 12 deletions clients/stellar-relay-lib/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::{connection::Error, node::NodeInfo, ConnectionInfo, StellarOverlayConnection};
use crate::{
connection::{ConnectionInfo, Error},
node::NodeInfo,
StellarOverlayConnection,
};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, BytesOrString};
use std::fmt::Debug;
Expand Down Expand Up @@ -42,13 +46,13 @@ impl StellarOverlayConfig {
let public_key = secret_key.get_public().to_encoding();
let public_key = std::str::from_utf8(&public_key).unwrap();
log::info!(
"connection_info(): Connected to Stellar overlay network with public key: {public_key}"
"connection_info(): Connecting to Stellar overlay network using public key: {public_key}"
);

let address = std::str::from_utf8(&cfg.address)
.map_err(|e| Error::ConfigError(format!("Address: {:?}", e)))?;

Ok(ConnectionInfo::new_with_timeout_and_retries(
Ok(ConnectionInfo::new_with_timeout(
address,
cfg.port,
secret_key,
Expand All @@ -57,7 +61,6 @@ impl StellarOverlayConfig {
cfg.recv_scp_msgs,
cfg.remote_called_us,
cfg.timeout_in_secs,
cfg.retries,
))
}
}
Expand Down Expand Up @@ -97,10 +100,6 @@ pub struct ConnectionInfoCfg {
/// how long to wait for the Stellar Node's messages.
#[serde(default = "ConnectionInfoCfg::default_timeout")]
pub timeout_in_secs: u64,

/// number of retries to wait for the Stellar Node's messages and/or to connect back to it.
#[serde(default = "ConnectionInfoCfg::default_retries")]
pub retries: u8,
}

impl ConnectionInfoCfg {
Expand All @@ -123,10 +122,6 @@ impl ConnectionInfoCfg {
fn default_timeout() -> u64 {
10
}

fn default_retries() -> u8 {
3
}
}

/// Triggers connection to the Stellar Node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ pub fn create_auth_cert(

let raw_sig_data = hash.finalize().to_vec();

let signature: Signature = Signature::new(keypair.create_signature(raw_sig_data).to_vec())?;
let signature: Signature = Signature::new(keypair.create_signature(raw_sig_data).to_vec())
.map_err(|e| {
log::error!("create_auth_cert(): {e:?}");
Error::AuthSignatureFailed
})?;

Ok(AuthCert { pubkey: pub_key_ecdh, expiration, sig: signature })
}
Expand Down
Loading

0 comments on commit f03cae0

Please sign in to comment.