Commit 636bb67
feat(node): carry out chunk_proof check against peer
maqi committed Mar 29, 2024
1 parent bacb878 commit 636bb67
Showing 3 changed files with 220 additions and 19 deletions.
2 changes: 2 additions & 0 deletions sn_networking/src/cmd.rs
@@ -44,6 +44,8 @@ pub enum NodeIssue {
CloseNodesShunning,
/// Provided a bad quote
BadQuoting,
/// Peer failed to pass the chunk proof verification
FailedChunkProofCheck,
}

/// Commands to send to the Swarm
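The new FailedChunkProofCheck variant feeds the same bad-node tracking as the other NodeIssue variants: as the comments in sn_node/src/node.rs below state, a peer is only considered BAD once it accumulates three of the same issue within five minutes. A minimal sketch of such time-windowed accumulation, using a hypothetical IssueTracker that is not part of this commit (names, types, and thresholds are illustrative only):

```rust
// Hypothetical sketch, not the crate's actual tracking code: accumulate
// per-peer issue reports and flag a peer once the same issue is reported
// three times within a rolling five-minute window.
use std::collections::HashMap;
use std::time::{Duration, Instant};

const ISSUE_WINDOW: Duration = Duration::from_secs(5 * 60);
const ISSUE_THRESHOLD: usize = 3;

#[derive(Default)]
struct IssueTracker {
    // (peer id, issue label) -> timestamps of the reports received so far
    reports: HashMap<(String, String), Vec<Instant>>,
}

impl IssueTracker {
    /// Record an issue and return true once the peer crosses the threshold.
    fn record(&mut self, peer: &str, issue: &str) -> bool {
        let now = Instant::now();
        let entry = self
            .reports
            .entry((peer.to_string(), issue.to_string()))
            .or_default();
        entry.push(now);
        // Keep only the reports that still fall inside the rolling window.
        entry.retain(|t| now.duration_since(*t) <= ISSUE_WINDOW);
        entry.len() >= ISSUE_THRESHOLD
    }
}
```

Under this model, each record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck) call would add one report, and the peer is flagged only when three such reports land inside the window.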
133 changes: 117 additions & 16 deletions sn_networking/src/event.rs
@@ -30,6 +30,7 @@ use libp2p::{

use crate::target_arch::Instant;

use rand::{rngs::OsRng, Rng};
use sn_protocol::{
messages::{CmdResponse, Query, Request, Response},
storage::RecordType,
@@ -128,6 +129,11 @@ pub enum NetworkEvent {
QuoteVerification {
quotes: Vec<(PeerId, PaymentQuote)>,
},
/// Carry out chunk proof check against the specified record and peer
ChunkProofVerification {
peer_id: PeerId,
keys_to_verify: Vec<NetworkAddress>,
},
}

// Manually implement Debug as `#[debug(with = "unverified_record_fmt")]` not working as expected.
@@ -176,6 +182,15 @@ impl Debug for NetworkEvent {
quotes.len()
)
}
NetworkEvent::ChunkProofVerification {
peer_id,
keys_to_verify,
} => {
write!(
f,
"NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})"
)
}
}
}
}
@@ -1124,28 +1139,48 @@ impl SwarmDriver {
return;
}

// Only handle keys that don't exist locally and are within close range
// On receiving a replication_list from a close_group peer, we undertake two tasks:
// 1. For keys that we don't have:
//    fetch them if they are close enough to us
// 2. For keys that we have and that the sender is supposed to hold as well:
//    start a chunk_proof check against the sender, using a randomly selected chunk type record

// For fetching, only handle keys that don't exist locally and are within close range
let keys_to_store =
self.select_non_existent_records_for_replications(&incoming_keys, &closest_k_peers);

if keys_to_store.is_empty() {
debug!("Empty keys to store after adding to");
return;
} else {
#[allow(clippy::mutable_key_type)]
let all_keys = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();
let keys_to_fetch = self
.replication_fetcher
.add_keys(holder, keys_to_store, all_keys);
if keys_to_fetch.is_empty() {
trace!("no waiting keys to fetch from the network");
} else {
self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch));
}
}

#[allow(clippy::mutable_key_type)]
let all_keys = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();
let keys_to_fetch = self
.replication_fetcher
.add_keys(holder, keys_to_store, all_keys);
if !keys_to_fetch.is_empty() {
self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch));
} else {
trace!("no waiting keys to fetch from the network");
// Only trigger a chunk_proof check upon receiving a periodic replication request.
if incoming_keys.len() > 1 {
let keys_to_verify = self.select_verify_candiates(sender);

if keys_to_verify.is_empty() {
debug!("No valid candidate to be checked against peer {holder:?}");
} else {
self.send_event(NetworkEvent::ChunkProofVerification {
peer_id: holder,
keys_to_verify,
});
}
}
}

@@ -1217,6 +1252,72 @@ impl SwarmDriver {
}
}
}

/// From all the chunk type records that we hold, select those close to the peer,
/// and randomly pick one as the verification candidate.
#[allow(clippy::mutable_key_type)]
fn select_verify_candiates(&mut self, peer: NetworkAddress) -> Vec<NetworkAddress> {
let mut closest_peers = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_local_peers(&self.self_peer_id.into())
.map(|peer| peer.into_preimage())
.take(20)
.collect_vec();
closest_peers.push(self.self_peer_id);

let target_peer = if let Some(peer_id) = peer.as_peer_id() {
peer_id
} else {
error!("Target {peer:?} is not a valid PeerId");
return vec![];
};

#[allow(clippy::mutable_key_type)]
let all_keys = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.record_addresses_ref();

// The targeted chunk type record is expected to be within close range from our perspective.
let mut verify_candidates: Vec<NetworkAddress> = all_keys
.values()
.filter_map(|(addr, record_type)| {
if RecordType::Chunk == *record_type {
match sort_peers_by_address(&closest_peers, addr, CLOSE_GROUP_SIZE) {
Ok(close_group) => {
if close_group.contains(&&target_peer) {
Some(addr.clone())
} else {
None
}
}
Err(err) => {
warn!("Could not get sorted peers for {addr:?} with error {err:?}");
None
}
}
} else {
None
}
})
.collect();

verify_candidates.sort_by_key(|a| peer.distance(a));

// To ensure the candidate is indeed held by the peer,
// we only carry out the check once a certain number of chunks have been uploaded,
// AND choose the candidate from a reduced (closer) range.
if verify_candidates.len() > 50 {
let index: usize = OsRng.gen_range(0..(verify_candidates.len() / 2));
vec![verify_candidates[index].clone()]
} else {
vec![]
}
}
}

/// Helper function to print formatted connection role info.
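The select_verify_candiates function above boils down to three steps: keep only the chunk type records whose close group (from our own routing view) contains the target peer, sort those candidates by distance to the peer, and only when more than 50 exist pick one at random from the closer half. A standalone sketch of that selection step, with plain integer distances standing in for NetworkAddress distances (illustrative only, not the crate's API):

```rust
// Illustrative sketch of the "pick from the closer half" heuristic; u64
// distances stand in for XOR distances between NetworkAddresses.
use rand::{rngs::OsRng, Rng};

fn pick_candidate<T: Clone>(mut candidates: Vec<(u64, T)>) -> Option<T> {
    // Sort ascending by distance to the target peer (smaller = closer).
    candidates.sort_by_key(|(distance, _)| *distance);

    if candidates.len() > 50 {
        // Only consider the closer half, so the chosen record is one the
        // peer should almost certainly hold.
        let index = OsRng.gen_range(0..candidates.len() / 2);
        Some(candidates[index].1.clone())
    } else {
        // Too few chunks stored yet; skip the check to avoid false accusations.
        None
    }
}
```

Restricting the pick to the closer half, and only once enough chunks are stored, keeps the chance of wrongly accusing an honest peer low.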
104 changes: 101 additions & 3 deletions sn_node/src/node.rs
@@ -20,10 +20,10 @@ use bytes::Bytes;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
#[cfg(feature = "open-metrics")]
use prometheus_client::registry::Registry;
use rand::{rngs::StdRng, Rng, SeedableRng};
use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng};
use sn_networking::{
close_group_majority, Network, NetworkBuilder, NetworkEvent, NodeIssue, SwarmDriver,
CLOSE_GROUP_SIZE,
close_group_majority, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue,
SwarmDriver, CLOSE_GROUP_SIZE,
};
use sn_protocol::{
error::Error as ProtocolError,
@@ -410,6 +410,39 @@ impl Node {
quotes_verification(&network, quotes).await;
});
}
NetworkEvent::ChunkProofVerification {
peer_id,
keys_to_verify,
} => {
event_header = "ChunkProofVerification";
let network = self.network.clone();

trace!("Going to verify chunk {keys_to_verify:?} against peer {peer_id:?}");

let _handle = spawn(async move {
// The peer may still be in the process of fetching the copy via replication,
// so repeat the verification a couple of times (in case of error).
// Only report the node as bad when ALL verification attempts have failed.
let mut attempts = 0;
while attempts < 3 {
if chunk_proof_verify_peer(&network, peer_id, &keys_to_verify).await {
return;
}
// The replication interval is 22s - 45s.
// Three attempts 15s apart cover 45s, which ensures at least one replication
// round is carried out during that period.
tokio::time::sleep(std::time::Duration::from_secs(15)).await;
attempts += 1;
}
// ALL attempts have failed, hence report the issue.
// Note this won't immediately cause the node to be considered BAD.
// Only when the same peer accumulates three of the same issue
// within 5 minutes will it be considered BAD.
// As the chunk_proof check is triggered on every periodic replication,
// a low-performing or cheating peer will raise multiple issue alerts over time.
network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
});
}
}

trace!(
@@ -642,3 +675,68 @@ impl Node {
}
}
}

async fn chunk_proof_verify_peer(
network: &Network,
peer_id: PeerId,
keys: &[NetworkAddress],
) -> bool {
for key in keys.iter() {
let check_passed = if let Ok(Some(record)) =
network.get_local_record(&key.to_record_key()).await
{
let nonce = thread_rng().gen::<u64>();
let expected_proof = ChunkProof::new(&record.value, nonce);
trace!("To verify peer {peer_id:?}, chunk_proof for {key:?} is {expected_proof:?}");

let request = Request::Query(Query::GetChunkExistenceProof {
key: key.clone(),
nonce,
});
let responses = network
.send_and_get_responses(&[peer_id], &request, true)
.await;
let n_verified = responses
.into_iter()
.filter_map(|(peer, resp)| {
received_valid_chunk_proof(key, &expected_proof, peer, resp)
})
.count();

n_verified >= 1
} else {
error!(
"To verify peer {peer_id:?} Could not get ChunkProof for {key:?} as we don't have the record locally."
);
true
};

if !check_passed {
return false;
}
}

true
}

fn received_valid_chunk_proof(
key: &NetworkAddress,
expected_proof: &ChunkProof,
peer: PeerId,
resp: Result<Response, NetworkError>,
) -> Option<()> {
if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) = resp {
if expected_proof.verify(&proof) {
debug!(
"Got a valid ChunkProof of {key:?} from {peer:?}, during peer chunk proof check."
);
Some(())
} else {
warn!("When verify {peer:?} with ChunkProof of {key:?}, the chunk might have been tampered?");
None
}
} else {
debug!("Did not get a valid response for the ChunkProof from {peer:?}");
None
}
}
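
chunk_proof_verify_peer relies on ChunkProof::new(&record.value, nonce) and expected_proof.verify(&proof) from sn_protocol, whose internals this commit does not touch. A reasonable mental model is a digest over the chunk bytes plus a verifier-chosen random nonce; the sketch below captures only that idea (the hash choice and type layout are assumptions, not the actual ChunkProof implementation):

```rust
// Assumed model of a nonce-based existence proof: a digest over the chunk
// bytes and a fresh nonce. Not the real sn_protocol ChunkProof.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Debug, PartialEq, Eq)]
struct ExistenceProof(u64);

impl ExistenceProof {
    /// The verifier picks a fresh random nonce for every check, so the prover
    /// can only answer correctly if it actually holds the chunk bytes.
    fn new(chunk_bytes: &[u8], nonce: u64) -> Self {
        let mut hasher = DefaultHasher::new();
        chunk_bytes.hash(&mut hasher);
        nonce.hash(&mut hasher);
        ExistenceProof(hasher.finish())
    }

    fn verify(&self, other: &ExistenceProof) -> bool {
        self == other
    }
}
```

Because the nonce changes on every check, a peer cannot cache or replay an earlier answer; it has to hold the chunk to produce a matching proof.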
