From 700f647c28d78b2c3e9a2f42267aa8d25cd7ffa0 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 27 Mar 2024 20:04:24 +0900 Subject: [PATCH] feat: svg caching, fault tolerance during DAG collection --- Cargo.lock | 1 + sn_auditor/Cargo.toml | 1 + sn_auditor/src/dag_db.rs | 35 ++++++++++++++++++++--- sn_auditor/src/main.rs | 25 ++++++++++------ sn_auditor/src/routes.rs | 4 ++- sn_client/src/audit/spend_dag.rs | 4 --- sn_client/src/audit/spend_dag_building.rs | 16 +++++++---- sn_client/src/lib.rs | 2 +- 8 files changed, 65 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 315d846f64..bfe10d950b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5033,6 +5033,7 @@ dependencies = [ "sn_peers_acquisition", "tiny_http", "tokio", + "tracing", ] [[package]] diff --git a/sn_auditor/Cargo.toml b/sn_auditor/Cargo.toml index ea6905a340..c0b4359b2f 100644 --- a/sn_auditor/Cargo.toml +++ b/sn_auditor/Cargo.toml @@ -27,4 +27,5 @@ sn_client = { path = "../sn_client", version = "0.104.31" } sn_logging = { path = "../sn_logging", version = "0.2.24" } sn_peers_acquisition= { path="../sn_peers_acquisition", version = "0.2.8" } tiny_http = { version="0.12", features = ["ssl-rustls"] } +tracing = { version = "~0.1.26" } tokio = { version = "1.32.0", features = ["io-util", "macros", "parking_lot", "rt", "sync", "time", "fs"] } diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index 67fbf28611..e8469c99bc 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -9,7 +9,9 @@ use color_eyre::eyre::{eyre, Result}; use graphviz_rust::{cmd::Format, exec, parse, printer::PrinterContext}; use serde::{Deserialize, Serialize}; +use sn_client::networking::NetworkError; use sn_client::transfers::{SignedSpend, SpendAddress, GENESIS_CASHNOTE}; +use sn_client::Error as ClientError; use sn_client::{Client, SpendDag, SpendDagGet}; use std::fmt::Write; use std::{ @@ -18,6 +20,7 @@ use std::{ }; pub const SPEND_DAG_FILENAME: &str = "spend_dag"; +pub const SPEND_DAG_SVG_FILENAME: &str = "spend_dag.svg"; /// Abstraction for the Spend DAG database /// Currently in memory, with disk backup, but should probably be a real DB at scale @@ -107,13 +110,26 @@ impl SpendDagDb { Ok(()) } - /// Get the current DAG as SVG - pub fn svg(&self) -> Result> { + /// Load current DAG svg from disk + pub fn load_svg(&self) -> Result> { + let svg_path = self.path.join(SPEND_DAG_SVG_FILENAME); + let svg = std::fs::read(svg_path)?; + Ok(svg) + } + + /// Dump current DAG as svg to disk + pub fn dump_dag_svg(&self) -> Result<()> { + info!("Dumping DAG to svg..."); + std::fs::create_dir_all(&self.path)?; + let svg_path = self.path.join(SPEND_DAG_SVG_FILENAME); let dag_ref = self.dag.clone(); let r_handle = dag_ref .read() .map_err(|e| eyre!("Failed to get read lock: {e}"))?; - dag_to_svg(&r_handle) + let svg = dag_to_svg(&r_handle)?; + std::fs::write(svg_path.clone(), svg)?; + info!("Successfully dumped DAG to {svg_path:?}..."); + Ok(()) } /// Update DAG from Network @@ -136,6 +152,15 @@ impl SpendDagDb { .write() .map_err(|e| eyre!("Failed to get write lock: {e}"))?; *w_handle = dag; + std::mem::drop(w_handle); + + // update and save svg to file in a background thread so we don't block + let self_clone = self.clone(); + tokio::spawn(async move { + if let Err(e) = self_clone.dump_dag_svg() { + error!("Failed to dump DAG svg: {e}"); + } + }); Ok(()) } @@ -158,7 +183,9 @@ pub async fn new_dag_with_genesis_only(client: &Client) -> Result { let mut dag = SpendDag::new(genesis_addr); let genesis_spend = match client.get_spend_from_network(genesis_addr).await { Ok(s) => s, - Err(sn_client::Error::DoubleSpend(addr, spend1, spend2)) => { + Err(ClientError::Network(NetworkError::DoubleSpendAttempt(spend1, spend2))) + | Err(ClientError::DoubleSpend(_, spend1, spend2)) => { + let addr = spend1.address(); println!("Double spend detected at Genesis: {addr:?}"); dag.insert(genesis_addr, *spend2); dag.record_faults(&dag.source())?; diff --git a/sn_auditor/src/main.rs b/sn_auditor/src/main.rs index 5a02c153df..10cb9a7a6d 100644 --- a/sn_auditor/src/main.rs +++ b/sn_auditor/src/main.rs @@ -6,6 +6,9 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. +#[macro_use] +extern crate tracing; + mod dag_db; mod routes; @@ -44,14 +47,14 @@ struct Opt { /// - Windows: C:\Users\\AppData\Roaming\safe\client\logs #[allow(rustdoc::invalid_html_tags)] #[clap(long, value_parser = LogOutputDest::parse_from_str, verbatim_doc_comment, default_value = "data-dir")] - pub log_output_dest: LogOutputDest, + log_output_dest: LogOutputDest, /// Specify the logging format. /// /// Valid values are "default" or "json". /// /// If the argument is not used, the default format will be applied. #[clap(long, value_parser = LogFormat::parse_from_str, verbatim_doc_comment)] - pub log_format: Option, + log_format: Option, } #[tokio::main] @@ -75,12 +78,13 @@ fn logging_init( ) -> Result { color_eyre::install()?; let logging_targets = vec![ + ("sn_auditor".to_string(), Level::TRACE), + ("sn_client".to_string(), Level::DEBUG), ("sn_transfers".to_string(), Level::TRACE), - ("sn_networking".to_string(), Level::DEBUG), - ("sn_client".to_string(), Level::TRACE), - ("sn_logging".to_string(), Level::TRACE), - ("sn_peers_acquisition".to_string(), Level::TRACE), - ("sn_protocol".to_string(), Level::TRACE), + ("sn_logging".to_string(), Level::INFO), + ("sn_peers_acquisition".to_string(), Level::INFO), + ("sn_protocol".to_string(), Level::INFO), + ("sn_networking".to_string(), Level::WARN), ]; let mut log_builder = LogBuilder::new(logging_targets); log_builder.output_dest(log_output_dest); @@ -104,6 +108,7 @@ async fn connect_to_network(peers: PeersArgs) -> Result { .await .map_err(|err| eyre!("Failed to connect to the network: {err}"))?; + println!("Connected to the network"); Ok(client) } @@ -115,7 +120,7 @@ async fn initialize_background_spend_dag_collection( force_from_genesis: bool, clean: bool, ) -> Result { - println!("Gather Spend DAG..."); + println!("Initialize spend dag..."); let path = dirs_next::data_dir() .ok_or(eyre!("Could not obtain data directory path"))? .join("safe") @@ -151,6 +156,10 @@ async fn initialize_background_spend_dag_collection( }); } + // initialize svg + println!("Initialize visualization..."); + dag.dump_dag_svg()?; + // background thread to update DAG println!("Starting background DAG collection thread..."); let mut d = dag.clone(); diff --git a/sn_auditor/src/routes.rs b/sn_auditor/src/routes.rs index bb529cf9e3..69abe9adde 100644 --- a/sn_auditor/src/routes.rs +++ b/sn_auditor/src/routes.rs @@ -14,7 +14,9 @@ use tiny_http::{Request, Response}; use crate::dag_db::SpendDagDb; pub(crate) fn spend_dag_svg(dag: &SpendDagDb) -> Result>>> { - let svg = dag.svg().map_err(|e| eyre!("Failed to get SVG: {e}"))?; + let svg = dag + .load_svg() + .map_err(|e| eyre!("Failed to get SVG: {e}"))?; let response = Response::from_data(svg); Ok(response) } diff --git a/sn_client/src/audit/spend_dag.rs b/sn_client/src/audit/spend_dag.rs index e0580f7ea0..da76f88a10 100644 --- a/sn_client/src/audit/spend_dag.rs +++ b/sn_client/src/audit/spend_dag.rs @@ -516,10 +516,6 @@ impl SpendDag { let ancestor_spends = match self.get_ancestor_spends(spend) { Ok(a) => a, Err(e) => { - println!( - "Error is_genesis({:?}): {e} {spend:#?}", - is_genesis_spend(spend) - ); debug!("Failed to get ancestor spends of: {addr:?} {e}"); recorded_faults.insert(e); return Ok(recorded_faults); diff --git a/sn_client/src/audit/spend_dag_building.rs b/sn_client/src/audit/spend_dag_building.rs index 70dff80260..402b799bed 100644 --- a/sn_client/src/audit/spend_dag_building.rs +++ b/sn_client/src/audit/spend_dag_building.rs @@ -36,7 +36,10 @@ impl Client { dag.insert(spend_addr, *s2); *s1 } - Err(e) => return Err(WalletError::FailedToGetSpend(e.to_string())), + Err(e) => { + warn!("Failed to get spend at {spend_addr:?}: {e}"); + return Err(WalletError::FailedToGetSpend(e.to_string())); + } }; dag.insert(spend_addr, first_spend.clone()); @@ -245,14 +248,17 @@ impl Client { let utxos = dag.get_utxos(); let mut stream = futures::stream::iter(utxos.into_iter()) - .map(|utxo| { + .map(|utxo| async move { debug!("Queuing task to gather DAG from utxo: {:?}", utxo); - self.spend_dag_build_from(utxo) + (self.spend_dag_build_from(utxo).await, utxo) }) .buffer_unordered(crate::MAX_CONCURRENT_TASKS); - while let Some(res) = stream.next().await { - dag.merge(res?); + while let Some((res, addr)) = stream.next().await { + match res { + Ok(d) => dag.merge(d), + Err(e) => warn!("Failed to gather sub dag from {addr:?}: {e}"), + } } dag.record_faults(&dag.source()) diff --git a/sn_client/src/lib.rs b/sn_client/src/lib.rs index 750eef343a..2a378d65bc 100644 --- a/sn_client/src/lib.rs +++ b/sn_client/src/lib.rs @@ -33,7 +33,7 @@ pub use sn_protocol as protocol; pub use sn_registers as registers; pub use sn_transfers as transfers; -const MAX_CONCURRENT_TASKS: usize = 1024; +const MAX_CONCURRENT_TASKS: usize = 32; pub use self::{ audit::{DagError, SpendDag, SpendDagGet, SpendFault},