Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(firehose-client): fix client chain specification #7

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ tokio-stream = "0.1.16"
tokio-test = "0.4.3"
tonic = "0.12.0"
tonic-build = "0.12.0"
tracing = "0.1.40"
tree_hash = "0.6.0"
trin-validation = { git = "https://github.com/ethereum/trin.git", version = "0.1.0", tag = "v0.1.0-alpha.35" }
types = { git = "https://github.com/semiotic-ai/lighthouse.git", branch = "stable" }
Expand Down
1 change: 1 addition & 0 deletions crates/firehose-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ thiserror.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-stream.workspace = true
tonic = { workspace = true, features = ["tls-roots"] }
tracing.workspace = true

[dev-dependencies]
hex.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion crates/firehose-client/examples/stream_beacon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ async fn main() {
let mut client = FirehoseClient::new(Chain::Beacon);
let mut stream = client
.stream_beacon_with_retry(START_SLOT, TOTAL_SLOTS)
.await;
.await
.unwrap();

let mut blocks: Vec<FirehoseBeaconBlock> = Vec::with_capacity(TOTAL_SLOTS as usize);

Expand Down
3 changes: 2 additions & 1 deletion crates/firehose-client/examples/stream_ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ async fn main() {
let mut client = FirehoseClient::new(Chain::Ethereum);
let mut stream = client
.stream_ethereum_with_retry(START_BLOCK, TOTAL_BLOCKS)
.await;
.await
.unwrap();

let mut blocks: Vec<FirehoseEthBlock> = Vec::with_capacity(TOTAL_BLOCKS as usize);

Expand Down
113 changes: 56 additions & 57 deletions crates/firehose-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tonic::{
transport::{Channel, Uri},
Response, Status,
};
use tracing::{error, info, trace};

pub struct FirehoseClient {
chain: Chain,
Expand All @@ -43,9 +44,10 @@ impl FirehoseClient {
self.fetch_client = Some(fetch_client(self.chain).await?);
}
let mut request = create_single_block_fetch_request(number);

request.insert_api_key_if_provided(self.chain);
// TODO: proper tracing
println!("Requesting block number:\n\t{}", number);

info!("Requesting block number:\n\t{}", number);
Ok(self.fetch_client.as_mut().unwrap().block(request).await)
}

Expand All @@ -66,16 +68,11 @@ impl FirehoseClient {
&mut self,
start: u64,
total: u64,
) -> impl futures::Stream<Item = FirehoseBeaconBlock> {
) -> Result<impl futures::Stream<Item = FirehoseBeaconBlock>, ClientError> {
let (tx, rx) = tokio::sync::mpsc::channel::<FirehoseBeaconBlock>(8192);

let client = self
.get_streaming_client()
.await
.map_err(|e| {
panic!("Failed to get streaming client: {:?}", e);
})
.unwrap();
let chain = self.chain;
let client = self.get_streaming_client().await?;

tokio::spawn(async move {
let mut blocks = 0;
Expand All @@ -85,72 +82,74 @@ impl FirehoseClient {
while blocks < total {
let mut client = client.clone();
let request = create_blocks_streaming_request(
Chain::Ethereum,
chain,
Comment on lines -88 to +85
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main fix.

start + blocks,
start + total - 1,
BlocksRequested::All,
);
let response = client.blocks(request).await.unwrap();
let mut stream_inner = response.into_inner();
while let Ok(Some(block_msg)) = stream_inner.message().await {
if blocks % 100 == 0 {
println!("Blocks fetched: {}", blocks);
}
match FirehoseBeaconBlock::try_from(block_msg) {
Ok(block) => {
if let Some(last_slot) = last_valid_slot {
let missed_slots = block.slot.saturating_sub(last_slot + 1);
if missed_slots > 0 {
// TODO: proper tracing
println!("Missed block at slot: {}", start + blocks);

let last_block = last_valid_block.take().unwrap();
let tx = tx.clone();
for _ in 0..missed_slots {
blocks += 1;
tx.send(last_block.clone()).await.unwrap();
match client.blocks(request).await {
Ok(response) => {
let mut stream_inner = response.into_inner();
while let Ok(Some(block_msg)) = stream_inner.message().await {
if blocks % 100 == 0 {
trace!("Blocks fetched: {}", blocks);
}
match FirehoseBeaconBlock::try_from(block_msg) {
Ok(block) => {
if let Some(last_slot) = last_valid_slot {
let missed_slots = block.slot.saturating_sub(last_slot + 1);
if missed_slots > 0 {
trace!("Missed block at slot: {}", start + blocks);

let last_block = last_valid_block.take().unwrap();
let tx = tx.clone();
for _ in 0..missed_slots {
blocks += 1;
tx.send(last_block.clone()).await.unwrap();
}
}
}
last_valid_slot = Some(block.slot);
last_valid_block = Some(block.clone());

blocks += 1;
tx.clone().send(block).await.unwrap();
}
Err(e) => {
error!("Failed to convert block message to block: {e}");
break;
}
}
last_valid_slot = Some(block.slot);
last_valid_block = Some(block.clone());

blocks += 1;
tx.clone().send(block).await.unwrap();
}
Err(_) => {
panic!("Failed to convert block message to block");
}
}
}
Err(e) => {
error!("Failed to get blocks stream: {:?}", e.code());
break;
}
};
}
});

ReceiverStream::new(rx)
Ok(ReceiverStream::new(rx))
}

pub async fn stream_ethereum_with_retry(
&mut self,
start: u64,
total: u64,
) -> impl futures::Stream<Item = FirehoseEthBlock> {
) -> Result<impl futures::Stream<Item = FirehoseEthBlock>, ClientError> {
let (tx, rx) = tokio::sync::mpsc::channel::<FirehoseEthBlock>(8192);

let client = self
.get_streaming_client()
.await
.map_err(|e| {
panic!("Failed to get streaming client: {:?}", e);
})
.unwrap();
let chain = self.chain;
let client = self.get_streaming_client().await?;

tokio::spawn(async move {
let mut blocks = 0;

while blocks < total {
let mut client = client.clone();
let request = create_blocks_streaming_request(
Chain::Ethereum,
chain,
start + blocks,
start + total - 1,
BlocksRequested::All,
Expand All @@ -159,22 +158,22 @@ impl FirehoseClient {
let mut stream_inner = response.into_inner();
while let Ok(Some(block_msg)) = stream_inner.message().await {
if blocks % 100 == 0 && blocks != 0 {
println!("Blocks fetched: {}", blocks);
trace!("Blocks fetched: {}", blocks);
}
match FirehoseEthBlock::try_from(block_msg) {
Ok(block) => {
blocks += 1;
tx.clone().send(block).await.unwrap();
}
Err(_) => {
panic!("Failed to convert block message to block");
Err(e) => {
panic!("Failed to convert block message to block: {e}");
}
}
}
}
});

ReceiverStream::new(rx)
Ok(ReceiverStream::new(rx))
}
}

Expand Down Expand Up @@ -209,15 +208,15 @@ fn create_blocks_streaming_request(

async fn fetch_client(firehose: Chain) -> Result<FetchClient<Channel>, ClientError> {
Ok(FetchClient::new({
let execution_firehose_uri = firehose.uri_from_env()?;
build_and_connect_channel(execution_firehose_uri).await?
let uri = firehose.uri_from_env()?;
build_and_connect_channel(uri).await?
}))
}

async fn stream_client(firehose: Chain) -> Result<StreamClient<Channel>, ClientError> {
Ok(StreamClient::new({
let execution_firehose_uri = firehose.uri_from_env()?;
build_and_connect_channel(execution_firehose_uri).await?
let uri = firehose.uri_from_env()?;
build_and_connect_channel(uri).await?
}))
}

Expand Down
3 changes: 2 additions & 1 deletion crates/forrestrie/examples/single_execution_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ async fn main() {
let num_blocks = SLOTS_PER_HISTORICAL_ROOT as u64;
let mut stream = beacon_client
.stream_beacon_with_retry(era * SLOTS_PER_HISTORICAL_ROOT as u64, num_blocks)
.await;
.await
.unwrap();
let mut block_roots: Vec<Hash256> = Vec::with_capacity(8192);
while let Some(block) = stream.next().await {
let root = BlockRoot::try_from(block).unwrap();
Expand Down
Loading