From 192df8e4717becdc5de4f9f2701647cf96805c6e Mon Sep 17 00:00:00 2001 From: Alexander Tesfamichael Date: Tue, 27 Aug 2024 17:37:17 +0200 Subject: [PATCH] fix: use all nodes to fetch blocks --- src/beacon_api.rs | 96 ++++++++++++++++++---------- src/phoenix/consensus_node.rs | 5 +- src/phoenix/inclusion_monitor/mod.rs | 4 +- 3 files changed, 68 insertions(+), 37 deletions(-) diff --git a/src/beacon_api.rs b/src/beacon_api.rs index 6b2bfa8..84cf107 100644 --- a/src/beacon_api.rs +++ b/src/beacon_api.rs @@ -2,7 +2,6 @@ use anyhow::anyhow; use rand::seq::SliceRandom; use reqwest::{StatusCode, Url}; use serde::Deserialize; -use tracing::warn; #[derive(Deserialize)] struct BeaconResponse { @@ -19,7 +18,7 @@ pub struct SyncStatus { pub is_syncing: bool, } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct ExecutionPayload { pub block_hash: String, #[serde(deserialize_with = "parse_i64_from_string")] @@ -52,33 +51,36 @@ where #[derive(Clone)] pub struct BeaconApi { - node_urls: Vec, + node_hosts: Vec, client: reqwest::Client, } impl BeaconApi { - pub fn new(nodes: &[Url]) -> Self { - if !nodes.is_empty() { - Self { - node_urls: nodes.to_vec(), - client: reqwest::Client::new(), - } - } else { - panic!("tried to instantiate BeaconAPI without at least one url"); + pub fn new(node_hosts: &[Url]) -> Self { + assert!( + !node_hosts.is_empty(), + "tried to instantiate BeaconAPI without at least one url" + ); + + Self { + node_hosts: node_hosts.to_vec(), + client: reqwest::Client::new(), } } - // poor mans load balancer, get random node from list - fn get_node(&self) -> &Url { - self.node_urls.choose(&mut rand::thread_rng()).unwrap() + /// Pick a random beacon node from the list we've been initialized with. + fn random_host(&self) -> &Url { + self.node_hosts.choose(&mut rand::thread_rng()).unwrap() } + /// Fetch a validator index from a pubkey. 
pub async fn validator_index(&self, pubkey: &String) -> reqwest::Result { let url = format!( "{}eth/v1/beacon/states/head/validators/{}", - self.get_node(), + self.random_host(), pubkey ); + self.client .get(url) .send() @@ -88,8 +90,8 @@ impl BeaconApi { .map(|body| body.data.index) } - // Method to fetch the payload from a node and a slot - async fn fetch_payload( + /// Method to fetch the payload from a node and a slot. + async fn block_by_slot( &self, node: &Url, slot: i64, @@ -97,6 +99,10 @@ impl BeaconApi { let url = format!("{}eth/v2/beacon/blocks/{}", node, slot); let res = self.client.get(&url).send().await?; match res.status() { + // Could mean: + // 1. Slot doesn't have a block. + // 2. Slot is in the future and doesn't have a block yet. + // 3. Slot is before the beacon node backfill limit. StatusCode::NOT_FOUND => Ok(None), StatusCode::OK => { let block = res @@ -114,8 +120,45 @@ impl BeaconApi { } } + /// Fetch a beacon block by slot. + /// + /// This function is intended to be highly reliable. It does so by calling as many nodes as it + /// can and returning the first Ok(Some) if any, then Ok(None) if any, and finally the first + /// error. + pub async fn block_by_slot_any(&self, slot: i64) -> anyhow::Result> { + let futures = self + .node_hosts + .iter() + .map(|node| self.block_by_slot(node, slot)); + let results = futures::future::join_all(futures).await; + + // Attempt to return the first Ok(Some) if any. + for result in &results { + match result { + Ok(Some(payload)) => return Ok(Some(payload.clone())), + Ok(None) => continue, + Err(_) => continue, + } + } + + // Attempt to return the first Ok(None) if any. + for result in &results { + match result { + Ok(None) => return Ok(None), + Ok(Some(_)) => continue, + Err(_) => continue, + } + } + + // Return the first error if all Ok(None) and Ok(Some) are exhausted. 
+ results + .into_iter() + .next() + .expect("expect results to be all errors") + } + // Method to fetch the sync status from a node - pub async fn fetch_sync_status(&self, node_url: &Url) -> reqwest::Result { + async fn sync_status(&self, node_url: &Url) -> reqwest::Result { let url = format!("{}eth/v1/node/syncing", node_url); self.client .get(&url) @@ -128,19 +171,8 @@ impl BeaconApi { .map_err(Into::into) } - pub async fn fetch_payload_all(&self, slot: i64) -> anyhow::Result> { - for (i, node) in self.node_urls.iter().enumerate() { - match self.fetch_payload(node, slot).await { - Ok(res) => return Ok(res), - Err(err) => { - warn!("failed to fetch payload from {}: {:?}", node, err); - if i == self.node_urls.len() - 1 { - return Err(err); - } - } - } - } - - unreachable!("last iteration should always return") + pub async fn sync_status_all(&self) -> Vec> { + let futures = self.node_hosts.iter().map(|node| self.sync_status(node)); + futures::future::join_all(futures).await } } diff --git a/src/phoenix/consensus_node.rs b/src/phoenix/consensus_node.rs index 703bfdd..613968e 100644 --- a/src/phoenix/consensus_node.rs +++ b/src/phoenix/consensus_node.rs @@ -19,9 +19,8 @@ impl ConsensusNodeMonitor { async fn num_unsynced_nodes(&self) -> usize { let mut results = Vec::new(); - for url in &APP_CONFIG.consensus_nodes { - let status = self.beacon_api.fetch_sync_status(url).await; - + let statuses = self.beacon_api.sync_status_all().await; + for status in statuses { match status { Ok(s) => results.push(!s.is_syncing), Err(err) => { diff --git a/src/phoenix/inclusion_monitor/mod.rs b/src/phoenix/inclusion_monitor/mod.rs index 7ef80ff..2b7382c 100644 --- a/src/phoenix/inclusion_monitor/mod.rs +++ b/src/phoenix/inclusion_monitor/mod.rs @@ -321,7 +321,7 @@ async fn was_attempted_reorg( delivered: &DeliveredPayload, ) -> anyhow::Result { let prev_slot = delivered.slot - 1; - let prev_payload = beacon_api.fetch_payload_all(prev_slot).await?; + let prev_payload = 
beacon_api.block_by_slot_any(prev_slot).await?; Ok(prev_payload .map(|p| p.block_number == delivered.block_number) .unwrap_or(false)) @@ -334,7 +334,7 @@ async fn check_missing_payload( payload: &DeliveredPayload, relay_pool: &PgPool, ) -> anyhow::Result<()> { - let block = beacon_api.fetch_payload_all(payload.slot).await?; + let block = beacon_api.block_by_slot_any(payload.slot).await?; match block { Some(ExecutionPayload { block_hash, .. }) => {