Skip to content

Commit

Permalink
Merge pull request #7 from jxs/quic
Browse files Browse the repository at this point in the history
cleanups
  • Loading branch information
AgeManning authored Aug 11, 2023
2 parents 8dfeb3c + 5b5b262 commit 2fbd384
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 81 deletions.
48 changes: 10 additions & 38 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,20 +383,6 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
self.discv5.table_entries_enr()
}

/// Returns the ENR of a known peer if it exists.
pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option<Enr> {
// first search the local cache
if let Some(enr) = self.cached_enrs.get(peer_id) {
return Some(enr.clone());
}
// not in the local cache, look in the routing table
if let Ok(node_id) = enr_ext::peer_id_to_node_id(peer_id) {
self.discv5.find_enr(&node_id)
} else {
None
}
}

/// Updates the local ENR TCP port.
/// There currently isn't a case to update the address here. We opt for discovery to
/// automatically update the external address.
Expand Down Expand Up @@ -733,23 +719,6 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
target_peers: usize,
additional_predicate: impl Fn(&Enr) -> bool + Send + 'static,
) {
// Make sure there are subnet queries included
let contains_queries = match &query {
QueryType::Subnet(queries) => !queries.is_empty(),
QueryType::FindPeers => true,
};

if !contains_queries {
debug!(
self.log,
"No subnets included in this request. Skipping discovery request."
);
return;
}

// Generate a random target node id.
let random_node = NodeId::random();

let enr_fork_id = match self.local_enr().eth2() {
Ok(v) => v,
Err(e) => {
Expand All @@ -773,7 +742,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Build the future
let query_future = self
.discv5
.find_node_predicate(random_node, predicate, target_peers)
// Generate a random target node id.
.find_node_predicate(NodeId::random(), predicate, target_peers)
.map(|v| QueryResult {
query_type: query,
result: v,
Expand All @@ -797,12 +767,14 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
Ok(r) => {
debug!(self.log, "Discovery query completed"; "peers_found" => r.len());
let mut results: HashMap<_, Option<Instant>> = HashMap::new();
r.into_iter().for_each(|enr| {
// cache the found ENR's
self.cached_enrs.put(enr.peer_id(), enr.clone());
results.insert(enr, None);
});
let results = r
.into_iter()
.map(|enr| {
// cache the found ENR's
self.cached_enrs.put(enr.peer_id(), enr.clone());
(enr, None)
})
.collect();
return Some(results);
}
Err(e) => {
Expand Down
34 changes: 16 additions & 18 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// This function decides whether or not to dial these peers.
#[allow(clippy::mutable_key_type)]
pub fn peers_discovered(&mut self, results: HashMap<Enr, Option<Instant>>) {
let mut to_dial_peers = Vec::with_capacity(4);

let mut to_dial_peers = 0;
let connected_or_dialing = self.network_globals.connected_or_dialing_peers();
for (enr, min_ttl) in results {
// There are two conditions in deciding whether to dial this peer.
Expand All @@ -327,14 +326,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// considered a priority. We have pre-allocated some extra priority slots for these
// peers as specified by PRIORITY_PEER_EXCESS. Therefore we dial these peers, even
// if we are already at our max_peer limit.
if (min_ttl.is_some()
&& connected_or_dialing + to_dial_peers.len() < self.max_priority_peers()
|| connected_or_dialing + to_dial_peers.len() < self.max_peers())
&& self
.network_globals
.peers
.read()
.should_dial(&enr.peer_id())
if min_ttl.is_some() && connected_or_dialing + to_dial_peers < self.max_priority_peers()
|| connected_or_dialing + to_dial_peers < self.max_peers()
{
// This should be updated with the peer dialing. In fact created once the peer is
// dialed
Expand All @@ -345,15 +338,13 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.update_min_ttl(&enr.peer_id(), min_ttl);
}
debug!(self.log, "Dialing discovered peer"; "peer_id" => %enr.peer_id());
to_dial_peers.push(enr);
self.dial_peer(enr);
to_dial_peers += 1;
}
}

// Queue another discovery if we need to
self.maintain_peer_count(to_dial_peers.len());

// Dial the required peers
self.dial_peers(to_dial_peers);
self.maintain_peer_count(to_dial_peers);
}

/// A STATUS message has been received from a peer. This resets the status timer.
Expand Down Expand Up @@ -409,9 +400,16 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {

/* Notifications from the Swarm */

// A peer is being dialed.
pub fn dial_peers(&mut self, mut peers: Vec<Enr>) {
self.peers_to_dial.append(&mut peers);
/// A peer is being dialed.
pub fn dial_peer(&mut self, peer: Enr) {
if self
.network_globals
.peers
.read()
.should_dial(&peer.peer_id())
{
self.peers_to_dial.push(peer);
}
}

/// Reports if a peer is banned or not.
Expand Down
35 changes: 12 additions & 23 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,9 +1066,8 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
let peers_to_dial: Vec<Enr> = self
.discovery()
.cached_enrs()
.filter_map(|(peer_id, enr)| {
let peers = self.network_globals.peers.read();
if predicate(enr) && peers.should_dial(peer_id) {
.filter_map(|(_peer_id, enr)| {
if predicate(enr) {
Some(enr.clone())
} else {
None
Expand All @@ -1078,10 +1077,9 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {

// Remove the ENR from the cache to prevent continual re-dialing on disconnects
peers_to_dial.iter().for_each(|enr| {
self.peer_manager_mut().dial_peer(enr.clone());
self.discovery_mut().remove_cached_enr(&enr.peer_id());
});

self.peer_manager_mut().dial_peers(peers_to_dial);
}

/* Sub-behaviour event handling functions */
Expand Down Expand Up @@ -1366,19 +1364,6 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
}
}

/// Handle a discovery event.
fn inject_discovery_event(
&mut self,
event: DiscoveredPeers,
) -> Option<NetworkEvent<AppReqId, TSpec>> {
// Inform the peer manager about discovered peers.
//
// The peer manager will subsequently decide which peers need to be dialed and then dial
// them.
self.peer_manager_mut().peers_discovered(event.peers);
None
}

/// Handle an identify event.
fn inject_identify_event(
&mut self,
Expand Down Expand Up @@ -1479,7 +1464,14 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
BehaviourEvent::BannedPeers(void) => void::unreachable(void),
BehaviourEvent::Gossipsub(ge) => self.inject_gs_event(ge),
BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re),
BehaviourEvent::Discovery(de) => self.inject_discovery_event(de),
// Inform the peer manager about discovered peers.
//
// The peer manager will subsequently decide which peers need to be dialed and then dial
// them.
BehaviourEvent::Discovery(DiscoveredPeers { peers }) => {
self.peer_manager_mut().peers_discovered(peers);
None
}
BehaviourEvent::Identify(ie) => self.inject_identify_event(ie),
BehaviourEvent::PeerManager(pe) => self.inject_pm_event(pe),
BehaviourEvent::ConnectionLimits(le) => void::unreachable(le),
Expand Down Expand Up @@ -1561,10 +1553,7 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
None
}
}
SwarmEvent::Dialing {
peer_id: _,
connection_id: _,
} => None,
SwarmEvent::Dialing { .. } => None,
};

if let Some(ev) = maybe_event {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub fn build_config(port: u16, mut boot_nodes: Vec<Enr>) -> NetworkConfig {
.unwrap();

config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, port, port, port + 1);
config.enr_disc4_port = Some(port);
config.enr_udp4_port = Some(port);
config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None);
config.boot_nodes_enr.append(&mut boot_nodes);
config.network_dir = path.into_path();
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod tests {
);

let mut config = NetworkConfig::default();
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21212, 21212);
config.set_ipv4_listening_address(std::net::Ipv4Addr::UNSPECIFIED, 21212, 21212, 21212);
config.discv5_config.table_filter = |_| true; // Do not ignore local IPs
config.upnp_enabled = false;
config.boot_nodes_enr = enrs.clone();
Expand Down

0 comments on commit 2fbd384

Please sign in to comment.