chore(node): optimization to reduce logging #1500

Merged 1 commit on Mar 25, 2024
91 changes: 50 additions & 41 deletions sn_networking/src/replication_fetcher.rs
@@ -67,38 +67,64 @@ impl ReplicationFetcher {
     pub(crate) fn add_keys(
         &mut self,
         holder: PeerId,
-        incoming_keys: Vec<(NetworkAddress, RecordType)>,
+        mut incoming_keys: Vec<(NetworkAddress, RecordType)>,
         locally_stored_keys: &HashMap<RecordKey, (NetworkAddress, RecordType)>,
     ) -> Vec<(PeerId, RecordKey)> {
         self.remove_stored_keys(locally_stored_keys);
 
-        let is_new_data = if incoming_keys.len() == 1 {
-            // The incoming list is for a new data to be replicated out.
+        let mut keys_to_fetch = vec![];
+        // For new data, it will be replicated out in a special replication_list of length 1.
+        // And we shall `fetch` that copy immediately, if it's not being fetched.
+        if incoming_keys.len() == 1 {
             let (record_address, record_type) = incoming_keys[0].clone();
-            Some((record_address.to_record_key(), record_type))
-        } else {
-            None
-        };
-
-        self.to_be_fetched
-            .retain(|_, time_out| *time_out > Instant::now());
-
-        // add non existing keys to the fetcher
-        incoming_keys
-            .into_iter()
-            .for_each(|(key, record_type)| self.add_key(holder, key.to_record_key(), record_type));
-
-        let mut keys_to_fetch = self.next_keys_to_fetch();
-
-        // For new data, it will be replicated out in a special replication_list of length 1.
-        // And we shall `fetch` that copy immediately, if it's not being fetched.
-        if let Some(new_data_key) = is_new_data {
+
+            let new_data_key = (record_address.to_record_key(), record_type);
+
             if let Entry::Vacant(entry) = self.on_going_fetches.entry(new_data_key.clone()) {
                 let (record_key, _record_type) = new_data_key;
                 keys_to_fetch.push((holder, record_key));
                 let _ = entry.insert((holder, Instant::now()));
             }
+
+            // To avoid unnecessary actions later on.
+            incoming_keys.clear();
         }
 
+        self.to_be_fetched
+            .retain(|_, time_out| *time_out > Instant::now());
+
+        let mut out_of_range_keys = vec![];
+        let total_incoming_keys = incoming_keys.len();
+        // Filter out the out_of_range ones among the incoming keys.
+        if let Some(ref distance_range) = self.distance_range {
+            let self_address = NetworkAddress::from_peer(self.self_peer_id);
+
+            incoming_keys.retain(|(addr, _record_type)| {
+                let is_in_range = self_address.distance(addr) >= *distance_range;
+                if !is_in_range {
+                    out_of_range_keys.push(addr.clone());
+                }
+                is_in_range
+            });
+        }
+
+        if !out_of_range_keys.is_empty() {
+            info!("Among {total_incoming_keys} incoming replications from {holder:?}, found {} out of range", out_of_range_keys.len());
+            for addr in out_of_range_keys.iter() {
+                trace!("The incoming record_key {addr:?} is out of range, do not fetch it from {holder:?}");
+            }
+        }
+
+        // add in-range AND non-existing keys to the fetcher
+        incoming_keys.into_iter().for_each(|(addr, record_type)| {
+            let _ = self
+                .to_be_fetched
+                .entry((addr.to_record_key(), record_type, holder))
+                .or_insert(Instant::now() + PENDING_TIMEOUT);
+        });
+
+        keys_to_fetch.extend(self.next_keys_to_fetch());
+
         keys_to_fetch
     }

@@ -213,12 +239,14 @@ impl ReplicationFetcher {
             }
         });
 
-        // now we ensure we clear our any/all failed nodes from our lists.
+        // now to clear any failed nodes from our lists.
         self.to_be_fetched
             .retain(|(_, _, holder), _| !failed_holders.contains(holder));
 
-        // Such failed_hodlers shall be reported back and be excluded from RT.
-        self.send_event(NetworkEvent::FailedToFetchHolders(failed_holders));
+        // Such failed_holders (if any) shall be reported back and be excluded from RT.
+        if !failed_holders.is_empty() {
+            self.send_event(NetworkEvent::FailedToFetchHolders(failed_holders));
+        }
     }
 
     /// Remove keys that we hold already and no longer need to be replicated.
@@ -244,25 +272,6 @@
         });
     }
 
-    /// Add the key if not present yet.
-    fn add_key(&mut self, holder: PeerId, key: RecordKey, record_type: RecordType) {
-        // Do nothing if the incoming key is out_of_range
-        if let Some(ref distance_range) = self.distance_range {
-            let self_address = NetworkAddress::from_peer(self.self_peer_id);
-            let in_address = NetworkAddress::from_record_key(&key);
-
-            if self_address.distance(&in_address) >= *distance_range {
-                info!("The incoming record_key {in_address:?} is out of range, do not fetch it from {holder:?}");
-                return;
-            }
-        }
-
-        let _ = self
-            .to_be_fetched
-            .entry((key, record_type, holder))
-            .or_insert(Instant::now() + PENDING_TIMEOUT);
-    }
-
     /// Sends an event after pushing it off thread so as to be non-blocking
     /// this is a wrapper around the `mpsc::Sender::send` call
     fn send_event(&self, event: NetworkEvent) {
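The heart of the change is a logging pattern rather than new behaviour: instead of emitting an info! line per out-of-range key (as the removed add_key helper did), add_keys now collects the out-of-range addresses, emits a single info! summary, and demotes the per-key detail to trace!. The same spirit applies to the second hunk, where the FailedToFetchHolders event is only sent when there is actually something to report. Below is a minimal standalone sketch of the aggregation pattern using the tracing macros; the filter_and_log function, the Key alias, the is_in_range predicate, and the tracing_subscriber setup are illustrative assumptions, not part of the sn_networking crate.

use tracing::{info, trace};

// Hypothetical stand-in for a record key; the real code uses NetworkAddress / RecordKey.
type Key = u32;

/// Keep only in-range keys; log one summary at info level and per-key details at trace level.
fn filter_and_log(mut incoming_keys: Vec<Key>, is_in_range: impl Fn(&Key) -> bool) -> Vec<Key> {
    let total_incoming_keys = incoming_keys.len();
    let mut out_of_range_keys = Vec::new();

    // Retain in-range keys, remembering the rejected ones instead of logging each immediately.
    incoming_keys.retain(|key| {
        let in_range = is_in_range(key);
        if !in_range {
            out_of_range_keys.push(*key);
        }
        in_range
    });

    if !out_of_range_keys.is_empty() {
        // One aggregated info! line instead of one per key.
        info!(
            "Among {total_incoming_keys} incoming keys, found {} out of range",
            out_of_range_keys.len()
        );
        for key in &out_of_range_keys {
            // Per-key detail stays available, but only at trace verbosity.
            trace!("The incoming key {key:?} is out of range, skipping");
        }
    }

    incoming_keys
}

fn main() {
    tracing_subscriber::fmt().init();
    let kept = filter_and_log(vec![1, 2, 3, 4], |k| k % 2 == 0);
    assert_eq!(kept, vec![2, 4]);
}

At the default info filter this prints a single summary line for any batch, however many keys were rejected, which is the "reduce logging" effect the PR title describes.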