Skip to content

Commit

Permalink
feat: svg caching, fault tolerance during DAG collection
Browse files Browse the repository at this point in the history
  • Loading branch information
grumbach authored and RolandSherwin committed Mar 27, 2024
1 parent 279e8d7 commit 700f647
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 23 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sn_auditor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ sn_client = { path = "../sn_client", version = "0.104.31" }
sn_logging = { path = "../sn_logging", version = "0.2.24" }
sn_peers_acquisition= { path="../sn_peers_acquisition", version = "0.2.8" }
tiny_http = { version="0.12", features = ["ssl-rustls"] }
tracing = { version = "~0.1.26" }
tokio = { version = "1.32.0", features = ["io-util", "macros", "parking_lot", "rt", "sync", "time", "fs"] }
35 changes: 31 additions & 4 deletions sn_auditor/src/dag_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
use color_eyre::eyre::{eyre, Result};
use graphviz_rust::{cmd::Format, exec, parse, printer::PrinterContext};
use serde::{Deserialize, Serialize};
use sn_client::networking::NetworkError;
use sn_client::transfers::{SignedSpend, SpendAddress, GENESIS_CASHNOTE};
use sn_client::Error as ClientError;
use sn_client::{Client, SpendDag, SpendDagGet};
use std::fmt::Write;
use std::{
Expand All @@ -18,6 +20,7 @@ use std::{
};

pub const SPEND_DAG_FILENAME: &str = "spend_dag";
pub const SPEND_DAG_SVG_FILENAME: &str = "spend_dag.svg";

/// Abstraction for the Spend DAG database
/// Currently in memory, with disk backup, but should probably be a real DB at scale
Expand Down Expand Up @@ -107,13 +110,26 @@ impl SpendDagDb {
Ok(())
}

/// Get the current DAG as SVG
pub fn svg(&self) -> Result<Vec<u8>> {
/// Load current DAG svg from disk
pub fn load_svg(&self) -> Result<Vec<u8>> {
let svg_path = self.path.join(SPEND_DAG_SVG_FILENAME);
let svg = std::fs::read(svg_path)?;
Ok(svg)
}

/// Dump current DAG as svg to disk
pub fn dump_dag_svg(&self) -> Result<()> {
info!("Dumping DAG to svg...");
std::fs::create_dir_all(&self.path)?;
let svg_path = self.path.join(SPEND_DAG_SVG_FILENAME);
let dag_ref = self.dag.clone();
let r_handle = dag_ref
.read()
.map_err(|e| eyre!("Failed to get read lock: {e}"))?;
dag_to_svg(&r_handle)
let svg = dag_to_svg(&r_handle)?;
std::fs::write(svg_path.clone(), svg)?;
info!("Successfully dumped DAG to {svg_path:?}...");
Ok(())
}

/// Update DAG from Network
Expand All @@ -136,6 +152,15 @@ impl SpendDagDb {
.write()
.map_err(|e| eyre!("Failed to get write lock: {e}"))?;
*w_handle = dag;
std::mem::drop(w_handle);

// update and save svg to file in a background thread so we don't block
let self_clone = self.clone();
tokio::spawn(async move {
if let Err(e) = self_clone.dump_dag_svg() {
error!("Failed to dump DAG svg: {e}");
}
});

Ok(())
}
Expand All @@ -158,7 +183,9 @@ pub async fn new_dag_with_genesis_only(client: &Client) -> Result<SpendDag> {
let mut dag = SpendDag::new(genesis_addr);
let genesis_spend = match client.get_spend_from_network(genesis_addr).await {
Ok(s) => s,
Err(sn_client::Error::DoubleSpend(addr, spend1, spend2)) => {
Err(ClientError::Network(NetworkError::DoubleSpendAttempt(spend1, spend2)))
| Err(ClientError::DoubleSpend(_, spend1, spend2)) => {
let addr = spend1.address();
println!("Double spend detected at Genesis: {addr:?}");
dag.insert(genesis_addr, *spend2);
dag.record_faults(&dag.source())?;
Expand Down
25 changes: 17 additions & 8 deletions sn_auditor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

#[macro_use]
extern crate tracing;

mod dag_db;
mod routes;

Expand Down Expand Up @@ -44,14 +47,14 @@ struct Opt {
/// - Windows: C:\Users\<username>\AppData\Roaming\safe\client\logs
#[allow(rustdoc::invalid_html_tags)]
#[clap(long, value_parser = LogOutputDest::parse_from_str, verbatim_doc_comment, default_value = "data-dir")]
pub log_output_dest: LogOutputDest,
log_output_dest: LogOutputDest,
/// Specify the logging format.
///
/// Valid values are "default" or "json".
///
/// If the argument is not used, the default format will be applied.
#[clap(long, value_parser = LogFormat::parse_from_str, verbatim_doc_comment)]
pub log_format: Option<LogFormat>,
log_format: Option<LogFormat>,
}

#[tokio::main]
Expand All @@ -75,12 +78,13 @@ fn logging_init(
) -> Result<LogBuilder> {
color_eyre::install()?;
let logging_targets = vec![
("sn_auditor".to_string(), Level::TRACE),
("sn_client".to_string(), Level::DEBUG),
("sn_transfers".to_string(), Level::TRACE),
("sn_networking".to_string(), Level::DEBUG),
("sn_client".to_string(), Level::TRACE),
("sn_logging".to_string(), Level::TRACE),
("sn_peers_acquisition".to_string(), Level::TRACE),
("sn_protocol".to_string(), Level::TRACE),
("sn_logging".to_string(), Level::INFO),
("sn_peers_acquisition".to_string(), Level::INFO),
("sn_protocol".to_string(), Level::INFO),
("sn_networking".to_string(), Level::WARN),
];
let mut log_builder = LogBuilder::new(logging_targets);
log_builder.output_dest(log_output_dest);
Expand All @@ -104,6 +108,7 @@ async fn connect_to_network(peers: PeersArgs) -> Result<Client> {
.await
.map_err(|err| eyre!("Failed to connect to the network: {err}"))?;

println!("Connected to the network");
Ok(client)
}

Expand All @@ -115,7 +120,7 @@ async fn initialize_background_spend_dag_collection(
force_from_genesis: bool,
clean: bool,
) -> Result<SpendDagDb> {
println!("Gather Spend DAG...");
println!("Initialize spend dag...");
let path = dirs_next::data_dir()
.ok_or(eyre!("Could not obtain data directory path"))?
.join("safe")
Expand Down Expand Up @@ -151,6 +156,10 @@ async fn initialize_background_spend_dag_collection(
});
}

// initialize svg
println!("Initialize visualization...");
dag.dump_dag_svg()?;

// background thread to update DAG
println!("Starting background DAG collection thread...");
let mut d = dag.clone();
Expand Down
4 changes: 3 additions & 1 deletion sn_auditor/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use tiny_http::{Request, Response};
use crate::dag_db::SpendDagDb;

pub(crate) fn spend_dag_svg(dag: &SpendDagDb) -> Result<Response<Cursor<Vec<u8>>>> {
let svg = dag.svg().map_err(|e| eyre!("Failed to get SVG: {e}"))?;
let svg = dag
.load_svg()
.map_err(|e| eyre!("Failed to get SVG: {e}"))?;
let response = Response::from_data(svg);
Ok(response)
}
Expand Down
4 changes: 0 additions & 4 deletions sn_client/src/audit/spend_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,10 +516,6 @@ impl SpendDag {
let ancestor_spends = match self.get_ancestor_spends(spend) {
Ok(a) => a,
Err(e) => {
println!(
"Error is_genesis({:?}): {e} {spend:#?}",
is_genesis_spend(spend)
);
debug!("Failed to get ancestor spends of: {addr:?} {e}");
recorded_faults.insert(e);
return Ok(recorded_faults);
Expand Down
16 changes: 11 additions & 5 deletions sn_client/src/audit/spend_dag_building.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ impl Client {
dag.insert(spend_addr, *s2);
*s1
}
Err(e) => return Err(WalletError::FailedToGetSpend(e.to_string())),
Err(e) => {
warn!("Failed to get spend at {spend_addr:?}: {e}");
return Err(WalletError::FailedToGetSpend(e.to_string()));
}
};
dag.insert(spend_addr, first_spend.clone());

Expand Down Expand Up @@ -245,14 +248,17 @@ impl Client {
let utxos = dag.get_utxos();

let mut stream = futures::stream::iter(utxos.into_iter())
.map(|utxo| {
.map(|utxo| async move {
debug!("Queuing task to gather DAG from utxo: {:?}", utxo);
self.spend_dag_build_from(utxo)
(self.spend_dag_build_from(utxo).await, utxo)
})
.buffer_unordered(crate::MAX_CONCURRENT_TASKS);

while let Some(res) = stream.next().await {
dag.merge(res?);
while let Some((res, addr)) = stream.next().await {
match res {
Ok(d) => dag.merge(d),
Err(e) => warn!("Failed to gather sub dag from {addr:?}: {e}"),
}
}

dag.record_faults(&dag.source())
Expand Down
2 changes: 1 addition & 1 deletion sn_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub use sn_protocol as protocol;
pub use sn_registers as registers;
pub use sn_transfers as transfers;

const MAX_CONCURRENT_TASKS: usize = 1024;
const MAX_CONCURRENT_TASKS: usize = 32;

pub use self::{
audit::{DagError, SpendDag, SpendDagGet, SpendFault},
Expand Down

0 comments on commit 700f647

Please sign in to comment.