From c32d7f0cd32755a53d11717da0cd5cede412ff9e Mon Sep 17 00:00:00 2001 From: Joseph Livesey Date: Fri, 18 Oct 2024 17:09:18 -0400 Subject: [PATCH] chore(firehose-client): improve error handling --- Cargo.lock | 1 + Cargo.toml | 1 + crates/firehose-client/Cargo.toml | 1 + .../firehose-client/examples/stream_beacon.rs | 3 +- .../examples/stream_ethereum.rs | 3 +- crates/firehose-client/src/client.rs | 99 +++++++++---------- .../examples/single_execution_block.rs | 3 +- 7 files changed, 57 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c1d24944..395129b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2150,6 +2150,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2a9ca790..33dd0148 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ tokio-stream = "0.1.16" tokio-test = "0.4.3" tonic = "0.12.0" tonic-build = "0.12.0" +tracing = "0.1.40" tree_hash = "0.6.0" trin-validation = { git = "https://github.com/ethereum/trin.git", version = "0.1.0", tag = "v0.1.0-alpha.35" } types = { git = "https://github.com/semiotic-ai/lighthouse.git", branch = "stable" } diff --git a/crates/firehose-client/Cargo.toml b/crates/firehose-client/Cargo.toml index 214e800b..98d692fd 100644 --- a/crates/firehose-client/Cargo.toml +++ b/crates/firehose-client/Cargo.toml @@ -22,6 +22,7 @@ thiserror.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tokio-stream.workspace = true tonic = { workspace = true, features = ["tls-roots"] } +tracing.workspace = true [dev-dependencies] hex.workspace = true diff --git a/crates/firehose-client/examples/stream_beacon.rs b/crates/firehose-client/examples/stream_beacon.rs index 900ca780..19089372 100644 --- a/crates/firehose-client/examples/stream_beacon.rs +++ b/crates/firehose-client/examples/stream_beacon.rs @@ -16,7 +16,8 @@ async fn main() { let mut client = FirehoseClient::new(Chain::Beacon); let mut stream = client .stream_beacon_with_retry(START_SLOT, TOTAL_SLOTS) - .await; + .await + .unwrap(); let mut blocks: Vec = Vec::with_capacity(TOTAL_SLOTS as usize); diff --git a/crates/firehose-client/examples/stream_ethereum.rs b/crates/firehose-client/examples/stream_ethereum.rs index 2e585915..d1b90b24 100644 --- a/crates/firehose-client/examples/stream_ethereum.rs +++ b/crates/firehose-client/examples/stream_ethereum.rs @@ -15,7 +15,8 @@ async fn main() { let mut client = FirehoseClient::new(Chain::Ethereum); let mut stream = client .stream_ethereum_with_retry(START_BLOCK, TOTAL_BLOCKS) - .await; + .await + .unwrap(); let mut blocks: Vec = Vec::with_capacity(TOTAL_BLOCKS as usize); diff --git a/crates/firehose-client/src/client.rs b/crates/firehose-client/src/client.rs index 8d96d0f9..a192fcba 100644 --- a/crates/firehose-client/src/client.rs +++ b/crates/firehose-client/src/client.rs @@ -17,6 +17,7 @@ use tonic::{ transport::{Channel, Uri}, Response, Status, }; +use tracing::{error, info, trace}; pub struct FirehoseClient { chain: Chain, @@ -43,9 +44,10 @@ impl FirehoseClient { self.fetch_client = Some(fetch_client(self.chain).await?); } let mut request = create_single_block_fetch_request(number); + request.insert_api_key_if_provided(self.chain); - // TODO: proper tracing - println!("Requesting block number:\n\t{}", number); + + info!("Requesting block number:\n\t{}", number); Ok(self.fetch_client.as_mut().unwrap().block(request).await) } @@ -66,16 +68,10 @@ impl FirehoseClient { &mut self, start: u64, total: u64, - ) -> impl futures::Stream { + ) -> Result, ClientError> { let (tx, rx) = tokio::sync::mpsc::channel::(8192); - let client = self - .get_streaming_client() - .await - .map_err(|e| { - panic!("Failed to get streaming client: {:?}", e); - }) - .unwrap(); + let client = self.get_streaming_client().await?; tokio::spawn(async move { let mut blocks = 0; @@ -90,59 +86,60 @@ impl FirehoseClient { start + total - 1, BlocksRequested::All, ); - let response = client.blocks(request).await.unwrap(); - let mut stream_inner = response.into_inner(); - while let Ok(Some(block_msg)) = stream_inner.message().await { - if blocks % 100 == 0 { - println!("Blocks fetched: {}", blocks); - } - match FirehoseBeaconBlock::try_from(block_msg) { - Ok(block) => { - if let Some(last_slot) = last_valid_slot { - let missed_slots = block.slot.saturating_sub(last_slot + 1); - if missed_slots > 0 { - // TODO: proper tracing - println!("Missed block at slot: {}", start + blocks); - - let last_block = last_valid_block.take().unwrap(); - let tx = tx.clone(); - for _ in 0..missed_slots { - blocks += 1; - tx.send(last_block.clone()).await.unwrap(); + match client.blocks(request).await { + Ok(response) => { + let mut stream_inner = response.into_inner(); + while let Ok(Some(block_msg)) = stream_inner.message().await { + if blocks % 100 == 0 { + trace!("Blocks fetched: {}", blocks); + } + match FirehoseBeaconBlock::try_from(block_msg) { + Ok(block) => { + if let Some(last_slot) = last_valid_slot { + let missed_slots = block.slot.saturating_sub(last_slot + 1); + if missed_slots > 0 { + trace!("Missed block at slot: {}", start + blocks); + + let last_block = last_valid_block.take().unwrap(); + let tx = tx.clone(); + for _ in 0..missed_slots { + blocks += 1; + tx.send(last_block.clone()).await.unwrap(); + } + } } + last_valid_slot = Some(block.slot); + last_valid_block = Some(block.clone()); + + blocks += 1; + tx.clone().send(block).await.unwrap(); + } + Err(e) => { + error!("Failed to convert block message to block: {e}"); + break; } } - last_valid_slot = Some(block.slot); - last_valid_block = Some(block.clone()); - - blocks += 1; - tx.clone().send(block).await.unwrap(); - } - Err(_) => { - panic!("Failed to convert block message to block"); } } - } + Err(e) => { + error!("Failed to get blocks stream: {:?}", e.code()); + break; + } + }; } }); - ReceiverStream::new(rx) + Ok(ReceiverStream::new(rx)) } pub async fn stream_ethereum_with_retry( &mut self, start: u64, total: u64, - ) -> impl futures::Stream { + ) -> Result, ClientError> { let (tx, rx) = tokio::sync::mpsc::channel::(8192); - let client = self - .get_streaming_client() - .await - .map_err(|e| { - panic!("Failed to get streaming client: {:?}", e); - }) - .unwrap(); + let client = self.get_streaming_client().await?; tokio::spawn(async move { let mut blocks = 0; @@ -159,22 +156,22 @@ impl FirehoseClient { let mut stream_inner = response.into_inner(); while let Ok(Some(block_msg)) = stream_inner.message().await { if blocks % 100 == 0 && blocks != 0 { - println!("Blocks fetched: {}", blocks); + trace!("Blocks fetched: {}", blocks); } match FirehoseEthBlock::try_from(block_msg) { Ok(block) => { blocks += 1; tx.clone().send(block).await.unwrap(); } - Err(_) => { - panic!("Failed to convert block message to block"); + Err(e) => { + panic!("Failed to convert block message to block: {e}"); } } } } }); - ReceiverStream::new(rx) + Ok(ReceiverStream::new(rx)) } } diff --git a/crates/forrestrie/examples/single_execution_block.rs b/crates/forrestrie/examples/single_execution_block.rs index d559a18c..e24c0d76 100644 --- a/crates/forrestrie/examples/single_execution_block.rs +++ b/crates/forrestrie/examples/single_execution_block.rs @@ -154,7 +154,8 @@ async fn main() { let num_blocks = SLOTS_PER_HISTORICAL_ROOT as u64; let mut stream = beacon_client .stream_beacon_with_retry(era * SLOTS_PER_HISTORICAL_ROOT as u64, num_blocks) - .await; + .await + .unwrap(); let mut block_roots: Vec = Vec::with_capacity(8192); while let Some(block) = stream.next().await { let root = BlockRoot::try_from(block).unwrap();