diff --git a/sn_networking/src/replication_fetcher.rs b/sn_networking/src/replication_fetcher.rs index 2bc078d922..66165be48d 100644 --- a/sn_networking/src/replication_fetcher.rs +++ b/sn_networking/src/replication_fetcher.rs @@ -67,38 +67,64 @@ impl ReplicationFetcher { pub(crate) fn add_keys( &mut self, holder: PeerId, - incoming_keys: Vec<(NetworkAddress, RecordType)>, + mut incoming_keys: Vec<(NetworkAddress, RecordType)>, locally_stored_keys: &HashMap, ) -> Vec<(PeerId, RecordKey)> { self.remove_stored_keys(locally_stored_keys); - let is_new_data = if incoming_keys.len() == 1 { - // The incoming list is for a new data to be replicated out. + let mut keys_to_fetch = vec![]; + // For new data, it will be replicated out in a special replication_list of length 1. + // And we shall `fetch` that copy immediately, if it's not being fetched. + if incoming_keys.len() == 1 { let (record_address, record_type) = incoming_keys[0].clone(); - Some((record_address.to_record_key(), record_type)) - } else { - None - }; - - self.to_be_fetched - .retain(|_, time_out| *time_out > Instant::now()); - // add non existing keys to the fetcher - incoming_keys - .into_iter() - .for_each(|(key, record_type)| self.add_key(holder, key.to_record_key(), record_type)); + let new_data_key = (record_address.to_record_key(), record_type); - let mut keys_to_fetch = self.next_keys_to_fetch(); - - // For new data, it will be replicated out in a special replication_list of length 1. - // And we shall `fetch` that copy immediately, if it's not being fetched. - if let Some(new_data_key) = is_new_data { if let Entry::Vacant(entry) = self.on_going_fetches.entry(new_data_key.clone()) { let (record_key, _record_type) = new_data_key; keys_to_fetch.push((holder, record_key)); let _ = entry.insert((holder, Instant::now())); } + + // To avoid later on un-necessary actions. + incoming_keys.clear(); + } + + self.to_be_fetched + .retain(|_, time_out| *time_out > Instant::now()); + + let mut out_of_range_keys = vec![]; + let total_incoming_keys = incoming_keys.len(); + // Filter out those out_of_range ones among the imcoming _keys. + if let Some(ref distance_range) = self.distance_range { + let self_address = NetworkAddress::from_peer(self.self_peer_id); + + incoming_keys.retain(|(addr, _record_type)| { + let is_in_range = self_address.distance(addr) >= *distance_range; + if !is_in_range { + out_of_range_keys.push(addr.clone()); + } + is_in_range + }); } + + if !out_of_range_keys.is_empty() { + info!("Among {total_incoming_keys} incoming replications from {holder:?}, found {} out of range", out_of_range_keys.len()); + for addr in out_of_range_keys.iter() { + trace!("The incoming record_key {addr:?} is out of range, do not fetch it from {holder:?}"); + } + } + + // add in-range AND non existing keys to the fetcher + incoming_keys.into_iter().for_each(|(addr, record_type)| { + let _ = self + .to_be_fetched + .entry((addr.to_record_key(), record_type, holder)) + .or_insert(Instant::now() + PENDING_TIMEOUT); + }); + + keys_to_fetch.extend(self.next_keys_to_fetch()); + keys_to_fetch } @@ -213,12 +239,14 @@ impl ReplicationFetcher { } }); - // now we ensure we clear our any/all failed nodes from our lists. + // now to clear any failed nodes from our lists. self.to_be_fetched .retain(|(_, _, holder), _| !failed_holders.contains(holder)); - // Such failed_hodlers shall be reported back and be excluded from RT. - self.send_event(NetworkEvent::FailedToFetchHolders(failed_holders)); + // Such failed_hodlers (if any) shall be reported back and be excluded from RT. + if !failed_holders.is_empty() { + self.send_event(NetworkEvent::FailedToFetchHolders(failed_holders)); + } } /// Remove keys that we hold already and no longer need to be replicated. @@ -244,25 +272,6 @@ impl ReplicationFetcher { }); } - /// Add the key if not present yet. - fn add_key(&mut self, holder: PeerId, key: RecordKey, record_type: RecordType) { - // Do nothing if the incoming key is out_of_range - if let Some(ref distance_range) = self.distance_range { - let self_address = NetworkAddress::from_peer(self.self_peer_id); - let in_address = NetworkAddress::from_record_key(&key); - - if self_address.distance(&in_address) >= *distance_range { - info!("The incoming record_key {in_address:?} is out of range, do not fetch it from {holder:?}"); - return; - } - } - - let _ = self - .to_be_fetched - .entry((key, record_type, holder)) - .or_insert(Instant::now() + PENDING_TIMEOUT); - } - /// Sends an event after pushing it off thread so as to be non-blocking /// this is a wrapper around the `mpsc::Sender::send` call fn send_event(&self, event: NetworkEvent) {