chore(node): loose bad node detection criteria #1508

Merged (2 commits) on Mar 26, 2024
63 changes: 42 additions & 21 deletions sn_networking/src/cmd.rs
@@ -34,7 +34,7 @@ use crate::target_arch::Instant;

const MAX_CONTINUOUS_HDD_WRITE_ERROR: usize = 5;

#[derive(Debug)]
#[derive(Debug, Eq, PartialEq)]
pub enum NodeIssue {
/// Connection issues observed
ConnectionIssue,
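The new Eq/PartialEq derives exist so the detection logic further down can count how many reports of the same kind a peer has accumulated. A minimal sketch of that comparison, using a stand-in enum with only the variant visible in this hunk (the real NodeIssue lives in sn_networking):

```rust
// Sketch only: a stand-in NodeIssue with just the variant visible in this hunk.
#[derive(Debug, Eq, PartialEq)]
enum NodeIssue {
    ConnectionIssue,
}

fn main() {
    let log = vec![NodeIssue::ConnectionIssue, NodeIssue::ConnectionIssue];
    let reported = NodeIssue::ConnectionIssue;
    // Counting "same kind" reports relies on `==`, hence the PartialEq derive.
    let same_kind = log.iter().filter(|i| **i == reported).count();
    assert_eq!(same_kind, 2);
}
```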
@@ -692,28 +692,49 @@ impl SwarmDriver {
cmd_string = "RecordNodeIssues";
let _ = self.bad_nodes_ongoing_verifications.remove(&peer_id);

// only trigger the bad_node verification once have enough nodes in RT
// currently set the trigger bar at 100
let total_peers: usize = self
.swarm
.behaviour_mut()
.kademlia
.kbuckets()
.map(|kbucket| kbucket.num_entries())
.sum();
info!("Peer {peer_id:?} is reported as having issue {issue:?}");
let (issue_vec, is_bad) = self.bad_nodes.entry(peer_id).or_default();

if total_peers > 100 {
info!("Peer {peer_id:?} is considered as bad");
let issue_vec = self.bad_nodes.entry(peer_id).or_default();
// If the peer is already considered bad, skip the bookkeeping below
if !(*is_bad) {
// Remove outdated entries
issue_vec.retain(|(_, timestamp)| timestamp.elapsed().as_secs() < 300);

// check if vec is already 10 long, if so, remove the oldest issue
// we only track 10 issues to avoid mem leaks
if issue_vec.len() == 10 {
issue_vec.remove(0);
}

issue_vec.push(issue);
// To avoid being too sensitive, only count this as a new issue
// if enough time has passed since the last one
let is_new_issue = if let Some((_issue, timestamp)) = issue_vec.last() {
timestamp.elapsed().as_secs() > 10
} else {
true
};

if is_new_issue {
issue_vec.push((issue, Instant::now()));
}

// Only consider the candidate a bad node once it has accumulated
// THREE issues of the same kind within the tracking period
for (issue, _timestamp) in issue_vec.iter() {
let issue_counts = issue_vec
.iter()
.filter(|(i, _timestamp)| *issue == *i)
.count();
if issue_counts >= 3 {
*is_bad = true;
info!("Peer {peer_id:?} accumulated {issue_counts} times of issue {issue:?}. Consider it as a bad node now.");
// Once bad behaviour is detected, there is no point continuing
break;
}
}
}

if *is_bad {
warn!("Cleaning out bad_peer {peer_id:?}");
if let Some(dead_peer) =
self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id)
@@ -730,16 +751,16 @@ impl SwarmDriver {
}
SwarmCmd::IsPeerShunned { target, sender } => {
cmd_string = "IsPeerInTrouble";
if let Some(peer_id) = target.as_peer_id() {
if let Some(_issues) = self.bad_nodes.get(&peer_id) {
// TODO: expand on the criteria here...
let _ = sender.send(true);
let is_bad = if let Some(peer_id) = target.as_peer_id() {
if let Some((_issues, is_bad)) = self.bad_nodes.get(&peer_id) {
*is_bad
} else {
let _ = sender.send(false);
false
}
} else {
let _ = sender.send(false);
}
false
};
let _ = sender.send(is_bad);
}
}

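To see the loosened criteria in one place, here is a standalone sketch of the logic introduced above: prune reports older than 300 s, debounce reports arriving within 10 s of the previous one, cap the per-peer log at 10 entries, and flag the peer as bad after three reports of the same kind. It only illustrates the diff; PeerId is replaced by a plain u64, NodeIssue is trimmed to one variant, and BadNodeTracker, record_issue, is_shunned, and the constant names are hypothetical.

```rust
use std::collections::BTreeMap;
use std::time::{Duration, Instant};

// Stand-ins for the real types used in the diff.
type PeerId = u64;

#[derive(Debug, Eq, PartialEq)]
enum NodeIssue {
    ConnectionIssue,
}

const ISSUE_RETENTION: Duration = Duration::from_secs(300); // drop stale reports
const ISSUE_DEBOUNCE_SECS: u64 = 10;                        // ignore rapid repeats
const MAX_TRACKED_ISSUES: usize = 10;                       // cap per-peer memory
const BAD_THRESHOLD: usize = 3;                             // same-kind reports => bad

#[derive(Default)]
struct BadNodeTracker {
    bad_nodes: BTreeMap<PeerId, (Vec<(NodeIssue, Instant)>, bool)>,
}

impl BadNodeTracker {
    /// Record one issue report and return whether the peer is now considered bad.
    fn record_issue(&mut self, peer: PeerId, issue: NodeIssue) -> bool {
        let (issue_vec, is_bad) = self.bad_nodes.entry(peer).or_default();
        if !*is_bad {
            // Remove outdated entries.
            issue_vec.retain(|(_, t)| t.elapsed() < ISSUE_RETENTION);

            // Keep at most MAX_TRACKED_ISSUES entries to bound memory.
            if issue_vec.len() == MAX_TRACKED_ISSUES {
                issue_vec.remove(0);
            }

            // Debounce: a report only counts as new if enough time has
            // passed since the previous one.
            let is_new_issue = issue_vec
                .last()
                .map_or(true, |(_, t)| t.elapsed().as_secs() > ISSUE_DEBOUNCE_SECS);
            if is_new_issue {
                issue_vec.push((issue, Instant::now()));
            }

            // The peer becomes bad once BAD_THRESHOLD reports of the same
            // kind have accumulated within the retention window.
            for (kind, _) in issue_vec.iter() {
                let count = issue_vec.iter().filter(|(k, _)| k == kind).count();
                if count >= BAD_THRESHOLD {
                    *is_bad = true;
                    break;
                }
            }
        }
        *is_bad
    }

    /// Cheap lookup used by the shunning query: only the cached flag is read.
    fn is_shunned(&self, peer: &PeerId) -> bool {
        matches!(self.bad_nodes.get(peer), Some((_, true)))
    }
}

fn main() {
    let mut tracker = BadNodeTracker::default();
    for _ in 0..3 {
        tracker.record_issue(42, NodeIssue::ConnectionIssue);
    }
    // Three back-to-back reports fall inside the 10 s debounce window, so
    // only one entry is logged and the peer is not flagged yet; reports
    // spaced more than 10 s apart would flip the flag on the third one.
    println!("peer 42 shunned: {}", tracker.is_shunned(&42));
}
```

Once the flag flips, later reports skip the bookkeeping entirely, and the IsPeerShunned query becomes a plain lookup of the cached verdict.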
4 changes: 3 additions & 1 deletion sn_networking/src/driver.rs
@@ -563,7 +563,9 @@ pub struct SwarmDriver {
handling_statistics: BTreeMap<String, Vec<Duration>>,
handled_times: usize,
pub(crate) hard_disk_write_error: usize,
pub(crate) bad_nodes: BTreeMap<PeerId, Vec<NodeIssue>>, // 10 is the max number of issues we track to avoid mem leaks
// 10 is the max number of issues per node we track, to avoid mem leaks.
// The boolean flag indicates whether the node is considered bad.
pub(crate) bad_nodes: BTreeMap<PeerId, (Vec<(NodeIssue, Instant)>, bool)>,
pub(crate) bad_nodes_ongoing_verifications: BTreeSet<PeerId>,
}

12 changes: 1 addition & 11 deletions sn_networking/src/event.rs
@@ -273,21 +273,11 @@ impl SwarmDriver {

// If we are not local, we care only for peers that we dialed and thus are reachable.
if self.local || has_dialed && peer_is_agent {
// only trigger the bad_node verification once have enough nodes in RT
// currently set the trigger bar at 100
let total_peers: usize = self
.swarm
.behaviour_mut()
.kademlia
.kbuckets()
.map(|kbucket| kbucket.num_entries())
.sum();

// To reduce the resource usage of the bad_node check, only the cached
// black_list is consulted while a connection is being established.
// The periodic check, which involves network queries, will filter
// out bad_nodes eventually.
if total_peers > 100 && self.bad_nodes.get(&peer_id).is_some() {
if let Some((_issues, true)) = self.bad_nodes.get(&peer_id) {
info!("Peer {peer_id:?} is considered as bad, blocking it.");
} else {
self.remove_bootstrap_from_full(peer_id);
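With the cached flag in place, the connection-time gate in event.rs no longer depends on the routing-table size; it reduces to a map lookup. A minimal sketch of that lookup, with a u64 standing in for PeerId and () for the per-peer issue log:

```rust
use std::collections::BTreeMap;

// Minimal stand-ins: u64 for PeerId, () for the per-peer issue log.
fn is_blocked(bad_nodes: &BTreeMap<u64, ((), bool)>, peer_id: u64) -> bool {
    // Only the cached verdict is consulted; no routing-table size gate and
    // no network queries on the connection path.
    matches!(bad_nodes.get(&peer_id), Some((_, true)))
}

fn main() {
    let mut bad_nodes = BTreeMap::new();
    bad_nodes.insert(1u64, ((), true));  // already judged bad
    bad_nodes.insert(2u64, ((), false)); // has issues logged, but not yet bad
    assert!(is_blocked(&bad_nodes, 1));
    assert!(!is_blocked(&bad_nodes, 2));
    assert!(!is_blocked(&bad_nodes, 3)); // unknown peers are not blocked
}
```

Peers with logged issues whose flag has not flipped, and peers the node has never heard of, are both allowed through.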
6 changes: 4 additions & 2 deletions sn_networking/src/replication_fetcher.rs
@@ -22,7 +22,9 @@ const MAX_PARALLEL_FETCH: usize = K_VALUE.get();

// The duration after which fetching from a peer is considered to have failed,
// if no response was received from that peer.
const FETCH_TIMEOUT: Duration = Duration::from_secs(10);
// Note this also covers the period in which the node itself writes the fetched copy to disk,
// hence the longer allowance.
const FETCH_TIMEOUT: Duration = Duration::from_secs(20);

// The duration after which a pending entry shall be cleared from the `to_be_fetch` list.
// This is to avoid holding too many outdated entries when the fetching speed is slow.
@@ -83,7 +85,7 @@ impl ReplicationFetcher {
if let Entry::Vacant(entry) = self.on_going_fetches.entry(new_data_key.clone()) {
let (record_key, _record_type) = new_data_key;
keys_to_fetch.push((holder, record_key));
let _ = entry.insert((holder, Instant::now()));
let _ = entry.insert((holder, Instant::now() + FETCH_TIMEOUT));
}

// To avoid later on un-necessary actions.
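The replication_fetcher change stores a deadline (now + FETCH_TIMEOUT) rather than a start time, so a later timeout check is a plain comparison against Instant::now(). A small sketch of that idea with hypothetical stand-in types (String for the record key, u64 for the holder); the pruning shown is just one way such a deadline would be consumed, not a quote from the crate:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

const FETCH_TIMEOUT: Duration = Duration::from_secs(20);

// Hypothetical stand-ins for the real key/holder types.
type RecordKey = String;
type Holder = u64;

fn main() {
    let mut on_going_fetches: HashMap<RecordKey, (Holder, Instant)> = HashMap::new();

    // Insert the *deadline* (now + FETCH_TIMEOUT), as the diff does, rather
    // than the start time; timing out then becomes a simple comparison.
    on_going_fetches.insert("record-a".into(), (7, Instant::now() + FETCH_TIMEOUT));

    // Later, pruning fetches that missed their deadline:
    let now = Instant::now();
    on_going_fetches.retain(|_key, (_holder, deadline)| *deadline > now);

    println!("still pending: {}", on_going_fetches.len());
}
```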