Skip to content

Commit

Permalink
Handling logging - error of full node (#89)
Browse files Browse the repository at this point in the history
* Implement error conversion logic

* Fmt + add exponential wait logic

* Convert connection bail to error

* PR Fixes

* Merge nightly

* Separate intervals

* Fix web3 rpc tests

* Fmt + add first error log

---------

Co-authored-by: Esad Yusuf Atik <[email protected]>
  • Loading branch information
otaliptus and eyusufatik authored Dec 28, 2023
1 parent 5181c12 commit cb1f3dc
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 13 deletions.
12 changes: 8 additions & 4 deletions examples/demo-rollup/tests/e2e/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fs::{self, DirEntry};
use std::net::SocketAddr;
use std::fs;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
Expand Down Expand Up @@ -267,12 +266,13 @@ async fn test_close_and_reopen_full_node() -> Result<(), anyhow::Error> {

// Copy the db to a new path with the same contents because
// the lock is not released on the db directory even though the task is aborted
copy_dir_recursive(

let _ = copy_dir_recursive(

Path::new("demo_data_test_close_and_reopen_full_node"),
Path::new("demo_data_test_close_and_reopen_full_node_copy"),
);

println!("\ncopy done\n");
sleep(Duration::from_secs(5)).await;

// spin up the full node again with the same data, resuming where it left off, but from a different path so it does not get stuck on the db lock
Expand Down Expand Up @@ -309,6 +309,10 @@ async fn test_close_and_reopen_full_node() -> Result<(), anyhow::Error> {

fs::remove_dir_all(Path::new("demo_data_test_close_and_reopen_full_node_copy")).unwrap();
fs::remove_dir_all(Path::new("demo_data_test_close_and_reopen_full_node")).unwrap();

seq_task.abort();
rollup_task.abort();

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions examples/demo-rollup/tests/evm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async fn web3_rpc_tests() -> Result<(), anyhow::Error> {
GenesisPaths::from_dir("../test-data/genesis/integration-tests"),
RollupProverConfig::Skip,
NodeMode::SequencerNode,
None,
)
.await;
});
Expand Down
18 changes: 12 additions & 6 deletions full-node/sequencer-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context;
use ethers::types::{Bytes, H256};
use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::Error;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::rpc_params;
use serde::Deserialize;
Expand All @@ -23,13 +23,19 @@ impl SequencerClient {

/// Gets l2 block given l2 height
pub async fn get_sov_tx(&self, num: u64) -> anyhow::Result<Vec<u8>> {
let res: GetSovTxResponse = self
let res: Result<GetSovTxResponse, jsonrpsee::core::Error> = self
.client
.request("ledger_getTransactionByNumber", rpc_params![num])
.await
.context("Failed to make RPC request")?;
.await;

Ok(res.body)
match res {
Ok(res) => Ok(res.body),
Err(e) => match e {
Error::Transport(e) => anyhow::Result::Err(Error::Transport(e).into()),
Error::ParseError(e) => anyhow::Result::Err(Error::ParseError(e).into()),
_ => Err(anyhow::anyhow!(e)),
},
}
}

/// Sends raw tx to sequencer
Expand All @@ -43,7 +49,7 @@ impl SequencerClient {
}

// the response has more fields, however for now we don't need them
#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
struct GetSovTxResponse {
pub body: Vec<u8>,
}
65 changes: 62 additions & 3 deletions full-node/sov-stf-runner/src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::SocketAddr;

use borsh::BorshSerialize;
use jsonrpsee::core::Error;
use jsonrpsee::RpcModule;
use sequencer_client::SequencerClient;
use sov_db::ledger_db::{LedgerDB, SlotCommit};
Expand All @@ -11,13 +12,17 @@ use sov_rollup_interface::stf::StateTransitionFunction;
use sov_rollup_interface::storage::StorageManager;
use sov_rollup_interface::zk::ZkvmHost;
use tokio::sync::oneshot;
use tracing::{debug, info};
use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, error, info};

use crate::verifier::StateTransitionVerifier;
use crate::{ProverService, RunnerConfig};
type StateRoot<ST, Vm, Da> = <ST as StateTransitionFunction<Vm, Da>>::StateRoot;
type InitialState<ST, Vm, Da> = <ST as StateTransitionFunction<Vm, Da>>::GenesisParams;

/// Minimum gaps, in minutes, between consecutive error-level logs of RPC
/// transport (connection) errors. `log_error` converts each entry to seconds,
/// advances one entry per emitted log, and stays on the final entry once it is
/// reached.
const CONNECTION_INTERVALS: &[u64] = &[0, 1, 2, 5, 10, 15, 30, 60];
/// Minimum gaps, in minutes, between consecutive error-level logs of RPC parse
/// errors; consumed by `log_error` in the same way as `CONNECTION_INTERVALS`.
const PARSE_INTERVALS: &[u64] = &[0, 1, 5];

/// Combines `DaService` with `StateTransitionFunction` and "runs" the rollup.
pub struct StateTransitionRunner<Stf, Sm, Da, Vm, Ps>
where
Expand Down Expand Up @@ -199,13 +204,47 @@ where
};

let mut height = self.start_height;

let mut last_connection_error = Instant::now();
let mut last_parse_error = Instant::now();

let mut connection_index = 0;
let mut parse_index = 0;

loop {
let tx = client.get_sov_tx(height).await;

if tx.is_err() {
// TODO: Add logs here: https://github.com/chainwayxyz/secret-sovereign-sdk/issues/47
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
continue;

let x = tx.unwrap_err();
match x.downcast_ref::<jsonrpsee::core::Error>() {
Some(Error::Transport(e)) => {
debug!("Connection error during RPC call: {:?}", e);
sleep(Duration::from_secs(2)).await;
Self::log_error(
&mut last_connection_error,
CONNECTION_INTERVALS,
&mut connection_index,
format!("Connection error during RPC call: {:?}", e).as_str(),
);
continue;
}
Some(Error::ParseError(e)) => {
debug!("Retrying after {} seconds: {:?}", 2, e);
sleep(Duration::from_secs(2)).await;
Self::log_error(
&mut last_parse_error,
PARSE_INTERVALS,
&mut parse_index,
format!("Parse error upon RPC call: {:?}", e).as_str(),
);
continue;
}
_ => {
anyhow::bail!("Unknown error from RPC call: {:?}", x);
}
}
}

let batch = Batch {
Expand Down Expand Up @@ -308,4 +347,24 @@ where
height += 1;
}
}

/// A basic helper for exponential backoff for error logging.
pub fn log_error(
last_error_log: &mut Instant,
error_log_intervals: &[u64],
error_interval_index: &mut usize,
error_msg: &str,
) {
let now = Instant::now();
if now.duration_since(*last_error_log)
>= Duration::from_secs(error_log_intervals[*error_interval_index] * 60)
{
error!(
"{} : {} minutes",
error_msg, error_log_intervals[*error_interval_index]
);
*last_error_log = now; // Update the value pointed by the reference
*error_interval_index = (*error_interval_index + 1).min(error_log_intervals.len() - 1);
}
}
}

0 comments on commit cb1f3dc

Please sign in to comment.