Skip to content

Commit

Permalink
chore(firehose-client): improve error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
suchapalaver committed Oct 21, 2024
1 parent f19fc34 commit c32d7f0
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ tokio-stream = "0.1.16"
tokio-test = "0.4.3"
tonic = "0.12.0"
tonic-build = "0.12.0"
tracing = "0.1.40"
tree_hash = "0.6.0"
trin-validation = { git = "https://github.com/ethereum/trin.git", version = "0.1.0", tag = "v0.1.0-alpha.35" }
types = { git = "https://github.com/semiotic-ai/lighthouse.git", branch = "stable" }
Expand Down
1 change: 1 addition & 0 deletions crates/firehose-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ thiserror.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-stream.workspace = true
tonic = { workspace = true, features = ["tls-roots"] }
tracing.workspace = true

[dev-dependencies]
hex.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion crates/firehose-client/examples/stream_beacon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ async fn main() {
let mut client = FirehoseClient::new(Chain::Beacon);
let mut stream = client
.stream_beacon_with_retry(START_SLOT, TOTAL_SLOTS)
.await;
.await
.unwrap();

let mut blocks: Vec<FirehoseBeaconBlock> = Vec::with_capacity(TOTAL_SLOTS as usize);

Expand Down
3 changes: 2 additions & 1 deletion crates/firehose-client/examples/stream_ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ async fn main() {
let mut client = FirehoseClient::new(Chain::Ethereum);
let mut stream = client
.stream_ethereum_with_retry(START_BLOCK, TOTAL_BLOCKS)
.await;
.await
.unwrap();

let mut blocks: Vec<FirehoseEthBlock> = Vec::with_capacity(TOTAL_BLOCKS as usize);

Expand Down
99 changes: 48 additions & 51 deletions crates/firehose-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tonic::{
transport::{Channel, Uri},
Response, Status,
};
use tracing::{error, info, trace};

pub struct FirehoseClient {
chain: Chain,
Expand All @@ -43,9 +44,10 @@ impl FirehoseClient {
self.fetch_client = Some(fetch_client(self.chain).await?);
}
let mut request = create_single_block_fetch_request(number);

request.insert_api_key_if_provided(self.chain);
// TODO: proper tracing
println!("Requesting block number:\n\t{}", number);

info!("Requesting block number:\n\t{}", number);
Ok(self.fetch_client.as_mut().unwrap().block(request).await)
}

Expand All @@ -66,16 +68,10 @@ impl FirehoseClient {
&mut self,
start: u64,
total: u64,
) -> impl futures::Stream<Item = FirehoseBeaconBlock> {
) -> Result<impl futures::Stream<Item = FirehoseBeaconBlock>, ClientError> {
let (tx, rx) = tokio::sync::mpsc::channel::<FirehoseBeaconBlock>(8192);

let client = self
.get_streaming_client()
.await
.map_err(|e| {
panic!("Failed to get streaming client: {:?}", e);
})
.unwrap();
let client = self.get_streaming_client().await?;

tokio::spawn(async move {
let mut blocks = 0;
Expand All @@ -90,59 +86,60 @@ impl FirehoseClient {
start + total - 1,
BlocksRequested::All,
);
let response = client.blocks(request).await.unwrap();
let mut stream_inner = response.into_inner();
while let Ok(Some(block_msg)) = stream_inner.message().await {
if blocks % 100 == 0 {
println!("Blocks fetched: {}", blocks);
}
match FirehoseBeaconBlock::try_from(block_msg) {
Ok(block) => {
if let Some(last_slot) = last_valid_slot {
let missed_slots = block.slot.saturating_sub(last_slot + 1);
if missed_slots > 0 {
// TODO: proper tracing
println!("Missed block at slot: {}", start + blocks);

let last_block = last_valid_block.take().unwrap();
let tx = tx.clone();
for _ in 0..missed_slots {
blocks += 1;
tx.send(last_block.clone()).await.unwrap();
match client.blocks(request).await {
Ok(response) => {
let mut stream_inner = response.into_inner();
while let Ok(Some(block_msg)) = stream_inner.message().await {
if blocks % 100 == 0 {
trace!("Blocks fetched: {}", blocks);
}
match FirehoseBeaconBlock::try_from(block_msg) {
Ok(block) => {
if let Some(last_slot) = last_valid_slot {
let missed_slots = block.slot.saturating_sub(last_slot + 1);
if missed_slots > 0 {
trace!("Missed block at slot: {}", start + blocks);

let last_block = last_valid_block.take().unwrap();
let tx = tx.clone();
for _ in 0..missed_slots {
blocks += 1;
tx.send(last_block.clone()).await.unwrap();
}
}
}
last_valid_slot = Some(block.slot);
last_valid_block = Some(block.clone());

blocks += 1;
tx.clone().send(block).await.unwrap();
}
Err(e) => {
error!("Failed to convert block message to block: {e}");
break;
}
}
last_valid_slot = Some(block.slot);
last_valid_block = Some(block.clone());

blocks += 1;
tx.clone().send(block).await.unwrap();
}
Err(_) => {
panic!("Failed to convert block message to block");
}
}
}
Err(e) => {
error!("Failed to get blocks stream: {:?}", e.code());
break;
}
};
}
});

ReceiverStream::new(rx)
Ok(ReceiverStream::new(rx))
}

pub async fn stream_ethereum_with_retry(
&mut self,
start: u64,
total: u64,
) -> impl futures::Stream<Item = FirehoseEthBlock> {
) -> Result<impl futures::Stream<Item = FirehoseEthBlock>, ClientError> {
let (tx, rx) = tokio::sync::mpsc::channel::<FirehoseEthBlock>(8192);

let client = self
.get_streaming_client()
.await
.map_err(|e| {
panic!("Failed to get streaming client: {:?}", e);
})
.unwrap();
let client = self.get_streaming_client().await?;

tokio::spawn(async move {
let mut blocks = 0;
Expand All @@ -159,22 +156,22 @@ impl FirehoseClient {
let mut stream_inner = response.into_inner();
while let Ok(Some(block_msg)) = stream_inner.message().await {
if blocks % 100 == 0 && blocks != 0 {
println!("Blocks fetched: {}", blocks);
trace!("Blocks fetched: {}", blocks);
}
match FirehoseEthBlock::try_from(block_msg) {
Ok(block) => {
blocks += 1;
tx.clone().send(block).await.unwrap();
}
Err(_) => {
panic!("Failed to convert block message to block");
Err(e) => {
panic!("Failed to convert block message to block: {e}");
}
}
}
}
});

ReceiverStream::new(rx)
Ok(ReceiverStream::new(rx))
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/forrestrie/examples/single_execution_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ async fn main() {
let num_blocks = SLOTS_PER_HISTORICAL_ROOT as u64;
let mut stream = beacon_client
.stream_beacon_with_retry(era * SLOTS_PER_HISTORICAL_ROOT as u64, num_blocks)
.await;
.await
.unwrap();
let mut block_roots: Vec<Hash256> = Vec::with_capacity(8192);
while let Some(block) = stream.next().await {
let root = BlockRoot::try_from(block).unwrap();
Expand Down

0 comments on commit c32d7f0

Please sign in to comment.