Skip to content

Commit

Permalink
fix(auditor): use DashMap and stream for better threading
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Oct 8, 2024
1 parent 0dd2246 commit 42404fe
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 88 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion sn_auditor/src/dag_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,14 @@ impl SpendDagDb {
}));
} else if let Some(sender) = spend_processing.clone() {
let (reattempt_addrs, fetched_addrs, addrs_for_further_track) = client
.crawl_to_next_utxos(&mut addrs_to_get, sender.clone(), *UTXO_REATTEMPT_SECONDS)
.crawl_to_next_utxos(
addrs_to_get.clone(),
sender.clone(),
*UTXO_REATTEMPT_SECONDS,
)
.await;

addrs_to_get.clear();
let mut utxo_addresses = self.utxo_addresses.write().await;
for addr in fetched_addrs {
let _ = utxo_addresses.remove(&addr);
Expand Down
1 change: 1 addition & 0 deletions sn_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ bls = { package = "blsttc", version = "8.0.1" }
bytes = { version = "1.0.1", features = ["serde"] }
crdts = "7.3.2"
custom_debug = "~0.6.1"
dashmap = "~6.1.0"
futures = "~0.3.13"
hex = "~0.4.3"
itertools = "~0.12.1"
Expand Down
195 changes: 108 additions & 87 deletions sn_client/src/audit/dag_crawling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@

use crate::{Client, Error, SpendDag};

use futures::{future::join_all, StreamExt};
use dashmap::DashMap;
use futures::{
future::join_all,
stream::{self, StreamExt},
};
use sn_networking::{GetRecordError, NetworkError};
use sn_transfers::{
NanoTokens, SignedSpend, SpendAddress, SpendReason, UniquePubkey, WalletError, WalletResult,
DEFAULT_NETWORK_ROYALTIES_PK, GENESIS_SPEND_UNIQUE_KEY, NETWORK_ROYALTIES_PK,
};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{sync::mpsc::Sender, task::JoinSet};
use tokio::sync::mpsc::Sender;

const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096;

Expand Down Expand Up @@ -145,108 +150,124 @@ impl Client {

/// Get spends from a set of given SpendAddresses
/// Drain the addresses at the same layer first, then:
/// 1, return with UTXOs for re-attempt (with insertion time stamp)
/// 2, addrs_to_get to hold the addresses for further track
/// 1, return failed_utxos for re-attempt (with insertion time stamp)
    /// 2, return fetched_addrs to avoid unnecessary re-attempts
/// 3, return addrs_for_further_track for further track
pub async fn crawl_to_next_utxos(
&self,
addrs_to_get: &mut BTreeMap<SpendAddress, (u64, NanoTokens)>,
addrs_to_get: BTreeMap<SpendAddress, (u64, NanoTokens)>,
sender: Sender<(SignedSpend, u64, bool)>,
reattempt_seconds: u64,
) -> (
BTreeMap<SpendAddress, (u64, Instant, NanoTokens)>,
Vec<SpendAddress>,
BTreeSet<(SpendAddress, NanoTokens)>,
) {
let mut failed_utxos = BTreeMap::new();
let mut tasks = JoinSet::new();
let mut addrs_for_further_track = BTreeSet::new();
let mut fetched_addrs = Vec::new();

while !addrs_to_get.is_empty() || !tasks.is_empty() {
while tasks.len() < 32 && !addrs_to_get.is_empty() {
if let Some((addr, (failed_times, amount))) = addrs_to_get.pop_first() {
let client_clone = self.clone();
let _ = tasks.spawn(async move {
(
client_clone.crawl_spend(addr).await,
failed_times,
addr,
amount,
)
});
}
}

if let Some(Ok((result, failed_times, address, amount))) = tasks.join_next().await {
match result {
InternalGetNetworkSpend::Spend(spend) => {
let for_further_track = beta_track_analyze_spend(&spend);
let _ = sender
.send((*spend, for_further_track.len() as u64, false))
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()));
addrs_for_further_track.extend(for_further_track);
fetched_addrs.push(address);
}
InternalGetNetworkSpend::DoubleSpend(spends) => {
warn!(
"Detected burnt spend regarding {address:?} - {:?}",
spends.len()
);
for (i, spend) in spends.iter().enumerate() {
let reason = spend.reason();
let amount = spend.spend.amount();
let ancestors_len = spend.spend.ancestors.len();
let descendants_len = spend.spend.descendants.len();
let roy_len = spend.spend.network_royalties().len();
        // Maximum concurrency for the tasks fetching records from the network.
const MAX_CONCURRENT: usize = 64;

let failed_utxos_arc: Arc<DashMap<_, _>> = Arc::new(DashMap::new());
let addrs_for_further_track_arc: Arc<DashMap<_, _>> = Arc::new(DashMap::new());
let fetched_addrs_arc: Arc<DashMap<_, _>> = Arc::new(DashMap::new());

stream::iter(addrs_to_get.into_iter())
.map(|(addr, (failed_times, amount))| {
let client_clone = self.clone();
let sender_clone = sender.clone();

let failed_utxos = Arc::clone(&failed_utxos_arc);
let addrs_for_further_track = Arc::clone(&addrs_for_further_track_arc);
let fetched_addrs = Arc::clone(&fetched_addrs_arc);
async move {
let result = client_clone.crawl_spend(addr).await;

match result {
InternalGetNetworkSpend::Spend(spend) => {
let for_further_track = beta_track_analyze_spend(&spend);
let _ = sender_clone
.send((*spend, for_further_track.len() as u64, false))
.await;
for entry in for_further_track {
let _ = addrs_for_further_track.insert(entry, ());
}
fetched_addrs.insert(addr, ());
}
InternalGetNetworkSpend::DoubleSpend(spends) => {
warn!(
"burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}",
spend.spend.ancestors, spend.spend.descendants
"Detected burnt spend regarding {addr:?} - {:?}",
spends.len()
);

let for_further_track = beta_track_analyze_spend(spend);
addrs_for_further_track.extend(for_further_track);

let _ = sender
.send((spend.clone(), 0, true))
.await
.map_err(|e| WalletError::SpendProcessing(e.to_string()));
for (i, spend) in spends.into_iter().enumerate() {
let reason = spend.reason();
let amount = spend.spend.amount();
let ancestors_len = spend.spend.ancestors.len();
let descendants_len = spend.spend.descendants.len();
let roy_len = spend.spend.network_royalties().len();
warn!("burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}",
spend.spend.ancestors, spend.spend.descendants);
}
fetched_addrs.insert(addr, ());
}
InternalGetNetworkSpend::NotFound => {
let reattempt_interval = if amount.as_nano() > 100000 {
info!("Not find spend of big-UTXO {addr:?} with {amount}");
reattempt_seconds
} else {
reattempt_seconds * (failed_times * 8 + 1)
};
failed_utxos.insert(
addr,
(
failed_times + 1,
Instant::now() + Duration::from_secs(reattempt_interval),
amount,
),
);
}
InternalGetNetworkSpend::Error(e) => {
warn!("Fetching spend {addr:?} with {amount:?} result in error {e:?}");
                            // A `NotEnoughCopies` error can be re-attempted and may eventually succeed.
failed_utxos.insert(
addr,
(
failed_times + 1,
Instant::now() + Duration::from_secs(reattempt_seconds),
amount,
),
);
}
fetched_addrs.push(address);
}
InternalGetNetworkSpend::NotFound => {
let reattempt_interval = if amount.as_nano() > 100000 {
info!("Not find spend of big-UTXO {address:?} with {amount}");
reattempt_seconds
} else {
reattempt_seconds * (failed_times * 8 + 1)
};
let _ = failed_utxos.insert(
address,
(
failed_times + 1,
Instant::now() + Duration::from_secs(reattempt_interval),
amount,
),
);
}
InternalGetNetworkSpend::Error(e) => {
warn!("Fetching spend {address:?} with {amount:?} result in error {e:?}");
// Error of `NotEnoughCopies` could be re-attempted and succeed eventually.
let _ = failed_utxos.insert(
address,
(
failed_times + 1,
Instant::now() + Duration::from_secs(reattempt_seconds),
amount,
),
);
}

(addr, amount)
}
}
})
.buffer_unordered(MAX_CONCURRENT)
.for_each(|(address, amount)| async move {
info!("Completed fetching attempt of {address:?} with amount {amount:?}");
})
.await;

let mut failed_utxos_result = BTreeMap::new();
for entry in failed_utxos_arc.iter() {
let key = entry.key();
let val = entry.value();
let _ = failed_utxos_result.insert(*key, *val);
}

let mut fetched_addrs = Vec::new();
for entry in fetched_addrs_arc.iter() {
let key = entry.key();
fetched_addrs.push(*key);
}

let mut addrs_for_further_track = BTreeSet::new();
for entry in addrs_for_further_track_arc.iter() {
let key = entry.key();
let _ = addrs_for_further_track.insert(*key);
}

(failed_utxos, fetched_addrs, addrs_for_further_track)
(failed_utxos_result, fetched_addrs, addrs_for_further_track)
}

/// Crawls the Spend Dag from a given SpendAddress recursively
Expand Down

0 comments on commit 42404fe

Please sign in to comment.