diff --git a/examples/demo-rollup/tests/e2e/mod.rs b/examples/demo-rollup/tests/e2e/mod.rs index 1400675c1..15e2fff1e 100644 --- a/examples/demo-rollup/tests/e2e/mod.rs +++ b/examples/demo-rollup/tests/e2e/mod.rs @@ -1,5 +1,4 @@ -use std::fs::{self, DirEntry}; -use std::net::SocketAddr; +use std::fs; use std::path::Path; use std::str::FromStr; use std::time::Duration; @@ -267,12 +266,13 @@ async fn test_close_and_reopen_full_node() -> Result<(), anyhow::Error> { // Copy the db to a new path with the same contents because // the lock is not released on the db directory even though the task is aborted - copy_dir_recursive( + + let _ = copy_dir_recursive( + Path::new("demo_data_test_close_and_reopen_full_node"), Path::new("demo_data_test_close_and_reopen_full_node_copy"), ); - println!("\ncopy done\n"); sleep(Duration::from_secs(5)).await; // spin up the full node again with the same data where it left of only with different path to not stuck on lock @@ -309,6 +309,10 @@ async fn test_close_and_reopen_full_node() -> Result<(), anyhow::Error> { fs::remove_dir_all(Path::new("demo_data_test_close_and_reopen_full_node_copy")).unwrap(); fs::remove_dir_all(Path::new("demo_data_test_close_and_reopen_full_node")).unwrap(); + + seq_task.abort(); + rollup_task.abort(); + Ok(()) } diff --git a/examples/demo-rollup/tests/evm/mod.rs b/examples/demo-rollup/tests/evm/mod.rs index 1d5730142..d4862aede 100644 --- a/examples/demo-rollup/tests/evm/mod.rs +++ b/examples/demo-rollup/tests/evm/mod.rs @@ -25,6 +25,7 @@ async fn web3_rpc_tests() -> Result<(), anyhow::Error> { GenesisPaths::from_dir("../test-data/genesis/integration-tests"), RollupProverConfig::Skip, NodeMode::SequencerNode, + None, ) .await; }); diff --git a/full-node/sequencer-client/src/lib.rs b/full-node/sequencer-client/src/lib.rs index 3201b8438..0b8fa00d7 100644 --- a/full-node/sequencer-client/src/lib.rs +++ b/full-node/sequencer-client/src/lib.rs @@ -1,6 +1,6 @@ -use anyhow::Context; use ethers::types::{Bytes, H256}; use jsonrpsee::core::client::ClientT; +use jsonrpsee::core::Error; use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; use jsonrpsee::rpc_params; use serde::Deserialize; @@ -23,13 +23,19 @@ impl SequencerClient { /// Gets l2 block given l2 height pub async fn get_sov_tx(&self, num: u64) -> anyhow::Result> { - let res: GetSovTxResponse = self + let res: Result = self .client .request("ledger_getTransactionByNumber", rpc_params![num]) - .await - .context("Failed to make RPC request")?; + .await; - Ok(res.body) + match res { + Ok(res) => Ok(res.body), + Err(e) => match e { + Error::Transport(e) => anyhow::Result::Err(Error::Transport(e).into()), + Error::ParseError(e) => anyhow::Result::Err(Error::ParseError(e).into()), + _ => Err(anyhow::anyhow!(e)), + }, + } } /// Sends raw tx to sequencer @@ -43,7 +49,7 @@ impl SequencerClient { } // the response has more fields, however for now we don't need them -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] struct GetSovTxResponse { pub body: Vec, } diff --git a/full-node/sov-stf-runner/src/runner.rs b/full-node/sov-stf-runner/src/runner.rs index b10f0566c..8ccaacf00 100644 --- a/full-node/sov-stf-runner/src/runner.rs +++ b/full-node/sov-stf-runner/src/runner.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use borsh::BorshSerialize; +use jsonrpsee::core::Error; use jsonrpsee::RpcModule; use sequencer_client::SequencerClient; use sov_db::ledger_db::{LedgerDB, SlotCommit}; @@ -11,13 +12,17 @@ use sov_rollup_interface::stf::StateTransitionFunction; use sov_rollup_interface::storage::StorageManager; use sov_rollup_interface::zk::ZkvmHost; use tokio::sync::oneshot; -use tracing::{debug, info}; +use tokio::time::{sleep, Duration, Instant}; +use tracing::{debug, error, info}; use crate::verifier::StateTransitionVerifier; use crate::{ProverService, RunnerConfig}; type StateRoot = >::StateRoot; type InitialState = >::GenesisParams; +const CONNECTION_INTERVALS: &[u64] = &[0, 1, 2, 5, 10, 15, 30, 60]; +const PARSE_INTERVALS: &[u64] = &[0, 1, 5]; + /// Combines `DaService` with `StateTransitionFunction` and "runs" the rollup. pub struct StateTransitionRunner where @@ -199,13 +204,47 @@ where }; let mut height = self.start_height; + + let mut last_connection_error = Instant::now(); + let mut last_parse_error = Instant::now(); + + let mut connection_index = 0; + let mut parse_index = 0; + loop { let tx = client.get_sov_tx(height).await; if tx.is_err() { // TODO: Add logs here: https://github.com/chainwayxyz/secret-sovereign-sdk/issues/47 - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - continue; + + let x = tx.unwrap_err(); + match x.downcast_ref::() { + Some(Error::Transport(e)) => { + debug!("Connection error during RPC call: {:?}", e); + sleep(Duration::from_secs(2)).await; + Self::log_error( + &mut last_connection_error, + CONNECTION_INTERVALS, + &mut connection_index, + format!("Connection error during RPC call: {:?}", e).as_str(), + ); + continue; + } + Some(Error::ParseError(e)) => { + debug!("Retrying after {} seconds: {:?}", 2, e); + sleep(Duration::from_secs(2)).await; + Self::log_error( + &mut last_parse_error, + PARSE_INTERVALS, + &mut parse_index, + format!("Parse error upon RPC call: {:?}", e).as_str(), + ); + continue; + } + _ => { + anyhow::bail!("Unknown error from RPC call: {:?}", x); + } + } } let batch = Batch { @@ -308,4 +347,24 @@ where height += 1; } } + + /// A basic helper for exponential backoff for error logging. + pub fn log_error( + last_error_log: &mut Instant, + error_log_intervals: &[u64], + error_interval_index: &mut usize, + error_msg: &str, + ) { + let now = Instant::now(); + if now.duration_since(*last_error_log) + >= Duration::from_secs(error_log_intervals[*error_interval_index] * 60) + { + error!( + "{} : {} minutes", + error_msg, error_log_intervals[*error_interval_index] + ); + *last_error_log = now; // Update the value pointed by the reference + *error_interval_index = (*error_interval_index + 1).min(error_log_intervals.len() - 1); + } + } }