fix: check failed find nodes requests before sending new ones #11997

Merged · 1 commit · Oct 23, 2024
2 changes: 1 addition & 1 deletion crates/net/discv4/src/config.rs
@@ -23,7 +23,7 @@ pub struct Discv4Config {
pub udp_egress_message_buffer: usize,
/// Size of the channel buffer for incoming messages.
pub udp_ingress_message_buffer: usize,
/// The number of allowed failures for `FindNode` requests. Default: 5.
/// The number of allowed consecutive failures for `FindNode` requests. Default: 5.
pub max_find_node_failures: u8,
/// The interval to use when checking for expired nodes that need to be re-pinged. Default:
/// 10min.
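For reference, this failure budget is an ordinary config knob. A minimal sketch of overriding it, assuming `Discv4Config::builder()` (used in the test further down) exposes a `max_find_node_failures` setter mirroring the field above:

```rust
use reth_discv4::Discv4Config;

// Hypothetical builder usage: the `max_find_node_failures` setter is assumed
// to exist alongside the field; only `Discv4Config::builder().build()` appears
// in this diff. Lowering the budget from 5 to 3 means a peer is re-pinged
// (and eventually evicted) after fewer unanswered `FindNode` requests.
let config = Discv4Config::builder().max_find_node_failures(3).build();
assert_eq!(config.max_find_node_failures, 3);
```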
135 changes: 117 additions & 18 deletions crates/net/discv4/src/lib.rs
@@ -743,7 +743,8 @@ impl Discv4Service {
trace!(target: "discv4", ?target, "Starting lookup");
let target_key = kad_key(target);

// Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes
// Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes for which we have
// a valid endpoint proof
let ctx = LookupContext::new(
target_key.clone(),
self.kbuckets
@@ -772,7 +773,10 @@
trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");

for node in closest {
self.find_node(&node, ctx.clone());
// here we still want to check against previous request failures and, if necessary,
// re-establish the endpoint proof, because the other node may have dropped our entry
// and no longer has an endpoint proof on its end
self.find_node_checked(&node, ctx.clone());
}
}

@@ -788,6 +792,22 @@
self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
}

/// Sends a new `FindNode` packet to the node with `target` as the lookup target, but first
/// checks the previously failed findNode requests to determine whether we should send a new
/// ping to renew the endpoint proof. It could be that the node is no longer reachable or
/// lost our entry.
fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
let max_failures = self.config.max_find_node_failures;
let needs_ping = self
.on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
.unwrap_or(true);
if needs_ping {
self.try_ping(*node, PingReason::Lookup(*node, ctx))
} else {
self.find_node(node, ctx)
}
}

/// Notifies all listeners.
///
/// Removes all listeners that are closed.
@@ -860,7 +880,7 @@
self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
}

/// Check if the peer has a bond
/// Check if the peer has an active bond.
fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
if timestamp.elapsed() < self.config.bond_expiration {
@@ -870,6 +890,19 @@
false
}

/// Applies a closure to the pending or present [`NodeEntry`].
fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
where
F: FnOnce(&NodeEntry) -> R,
{
let key = kad_key(peer_id);
match self.kbuckets.entry(&key) {
BucketEntry::Present(entry, _) => Some(f(entry.value())),
BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
_ => None,
}
}

/// Update the entry on RE-ping.
///
/// Invoked when we received the Pong to our [`PingReason::RePing`] ping.
@@ -929,7 +962,7 @@
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, old_status) => {
// endpoint is now proven
entry.value_mut().has_endpoint_proof = true;
entry.value_mut().establish_proof();
entry.value_mut().update_with_enr(last_enr_seq);

if !old_status.is_connected() {
Expand All @@ -945,7 +978,7 @@ impl Discv4Service {
}
kbucket::Entry::Pending(mut entry, mut status) => {
// endpoint is now proven
entry.value().has_endpoint_proof = true;
entry.value().establish_proof();
entry.value().update_with_enr(last_enr_seq);

if !status.is_connected() {
@@ -1129,6 +1162,8 @@
// try to send it
ctx.unmark_queried(record.id);
} else {
// we just received a ping from that peer so we can send a find node request
// directly
self.find_node(&record, ctx);
}
}
@@ -1419,14 +1454,28 @@
BucketEntry::SelfEntry => {
// we received our own node entry
}
BucketEntry::Present(mut entry, _) => {
if entry.value_mut().has_endpoint_proof {
self.find_node(&closest, ctx.clone());
BucketEntry::Present(entry, _) => {
if entry.value().has_endpoint_proof {
if entry
.value()
.exceeds_find_node_failures(self.config.max_find_node_failures)
{
self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
} else {
self.find_node(&closest, ctx.clone());
}
}
}
BucketEntry::Pending(mut entry, _) => {
if entry.value().has_endpoint_proof {
self.find_node(&closest, ctx.clone());
if entry
.value()
.exceeds_find_node_failures(self.config.max_find_node_failures)
{
self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
} else {
self.find_node(&closest, ctx.clone());
}
}
}
}
@@ -1486,27 +1535,27 @@
self.remove_node(node_id);
}

self.evict_failed_neighbours(now);
self.evict_failed_find_nodes(now);
}

/// Handles failed responses to `FindNode`
fn evict_failed_neighbours(&mut self, now: Instant) {
let mut failed_neighbours = Vec::new();
fn evict_failed_find_nodes(&mut self, now: Instant) {
let mut failed_find_nodes = Vec::new();
self.pending_find_nodes.retain(|node_id, find_node_request| {
if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
if !find_node_request.answered {
// the request expired without an answer; a node that did respond, even with fewer
// entries than expected, counts as answered and is not treated as a hard failure
failed_neighbours.push(*node_id);
failed_find_nodes.push(*node_id);
}
return false
}
true
});

trace!(target: "discv4", num=%failed_neighbours.len(), "processing failed neighbours");
trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");

for node_id in failed_neighbours {
for node_id in failed_find_nodes {
let key = kad_key(node_id);
let failures = match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, _) => {
@@ -1523,7 +1572,7 @@
// if the node repeatedly failed to respond with anything useful, remove it from the
// table, but only if there are enough other nodes in the bucket (the bucket must be at
// least half full)
if failures > (self.config.max_find_node_failures as usize) {
if failures > self.config.max_find_node_failures {
self.soft_remove_node(node_id);
}
}
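The "at least half full" guard lives in `soft_remove_node`, which this diff calls but does not show. A rough sketch of that policy under stated assumptions: the `get_bucket` and `num_entries` accessors on the routing table are assumptions, while `kad_key`, `remove_node`, and `MAX_NODES_PER_BUCKET` appear elsewhere in this file:

```rust
// Sketch, not the verbatim implementation: evict a repeatedly failing node
// only if its bucket can afford the loss.
fn soft_remove_node(&mut self, node_id: PeerId) {
    let key = kad_key(node_id);
    // assumed accessor: look up the bucket this key falls into
    let Some(bucket) = self.kbuckets.get_bucket(&key) else { return };
    if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
        // bucket is less than half full: keep even a flaky entry rather
        // than leave the slot empty
        return
    }
    self.remove_node(node_id);
}
```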
@@ -2216,8 +2265,8 @@ struct NodeEntry {
last_enr_seq: Option<u64>,
/// `ForkId` if retrieved via ENR requests.
fork_id: Option<ForkId>,
/// Counter for failed findNode requests.
find_node_failures: usize,
/// Counter for _consecutive_ failed findNode requests.
find_node_failures: u8,
/// Whether the endpoint of the peer is proven.
has_endpoint_proof: bool,
}
@@ -2244,6 +2293,17 @@ impl NodeEntry {
node
}

/// Marks the entry with an established proof and resets the consecutive failure counter.
fn establish_proof(&mut self) {
self.has_endpoint_proof = true;
self.find_node_failures = 0;
}

/// Returns true if the tracked find node failures have reached the given maximum.
const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
self.find_node_failures >= max_failures
}

/// Updates the last timestamp and sets the enr seq
fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
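Together, `establish_proof` and `exceeds_find_node_failures` give the counter its reset-on-pong semantics. A condensed illustration, reusing only constructors that appear in the test below:

```rust
// `NodeEntry::new_proven` and `NodeRecord::new` are the same helpers the
// test below uses; `find_node_failures` is a plain u8 field on the entry.
let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
let mut entry = NodeEntry::new_proven(record);

// five unanswered FindNode requests exhaust the default budget of 5 ...
entry.find_node_failures = 5;
assert!(entry.exceeds_find_node_failures(5)); // ">= max" counts as exceeded

// ... and a Pong re-proves the endpoint and clears the streak
entry.establish_proof();
assert_eq!(entry.find_node_failures, 0);
assert!(!entry.exceeds_find_node_failures(5));
```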
@@ -2660,6 +2720,45 @@ mod tests {
assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
}

#[tokio::test]
async fn test_reping_on_find_node_failures() {
reth_tracing::init_test_tracing();

let config = Discv4Config::builder().build();
let (_discv4, mut service) = create_discv4_with_config(config).await;

let target = PeerId::random();

let id = PeerId::random();
let key = kad_key(id);
let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

let mut entry = NodeEntry::new_proven(record);
entry.find_node_failures = u8::MAX;
let _ = service.kbuckets.insert_or_update(
&key,
entry,
NodeStatus {
direction: ConnectionDirection::Incoming,
state: ConnectionState::Connected,
},
);

service.lookup(target);
assert_eq!(service.pending_find_nodes.len(), 0);
assert_eq!(service.pending_pings.len(), 1);

service.update_on_pong(record, None);

service
.on_entry(record.id, |entry| {
// reset on pong
assert_eq!(entry.find_node_failures, 0);
assert!(entry.has_endpoint_proof);
})
.unwrap();
}

#[tokio::test]
async fn test_service_commands() {
reth_tracing::init_test_tracing();