Skip to content

Commit

Permalink
chore(firehose-client): rework streaming experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
suchapalaver committed Oct 18, 2024
1 parent cab7820 commit 4f3b648
Showing 1 changed file with 41 additions and 42 deletions.
83 changes: 41 additions & 42 deletions crates/firehose-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tonic::{
Response, Status,
};

#[derive(Clone)]
pub struct FirehoseClient {
chain: Chain,
fetch_client: Option<FetchClient<Channel>>,
Expand Down Expand Up @@ -49,15 +50,17 @@ impl FirehoseClient {
Ok(self.fetch_client.as_mut().unwrap().block(request).await)
}

/// The tonic docs encourage cloning the client.
pub async fn get_streaming_client(&mut self) -> Result<StreamClient<Channel>, ClientError> {
let client = if let Some(client) = self.stream_client.clone() {
client
} else {
pub async fn get_streaming_response(
&mut self,
request: tonic::Request<Request>,
) -> Result<Response<tonic::Streaming<sf_protos::firehose::v2::Response>>, ClientError> {
if self.stream_client.is_none() {
self.stream_client = Some(stream_client(self.chain).await?);
self.stream_client.clone().unwrap()
};
Ok(client)
}
let mut request = request;
request.insert_api_key_if_provided(self.chain);
// Unwrap here because we know `self.stream_client.is_some()`
Ok(self.stream_client.as_mut().unwrap().blocks(request).await?)
}

/// Stream a block range of Beacon blocks, with a retry mechanism if the stream cuts off
Expand All @@ -69,32 +72,27 @@ impl FirehoseClient {
) -> impl futures::Stream<Item = FirehoseBeaconBlock> {
let (tx, rx) = tokio::sync::mpsc::channel::<FirehoseBeaconBlock>(8192);

let client = self
.get_streaming_client()
.await
.map_err(|e| {
panic!("Failed to get streaming client: {:?}", e);
})
.unwrap();
let mut client = self.clone();

tokio::spawn(async move {
let mut blocks = 0;
let mut last_valid_slot: Option<u64> = None;
let mut last_valid_block: Option<FirehoseBeaconBlock> = None;

while blocks < total {
let mut client = client.clone();
let request = create_blocks_streaming_request(
Chain::Ethereum,
Chain::Beacon,
start + blocks,
start + total - 1,
BlocksRequested::All,
);
match client.blocks(request).await {

match client.get_streaming_response(request).await {
Ok(response) => {
let mut stream_inner = response.into_inner();

while let Ok(Some(block_msg)) = stream_inner.message().await {
if blocks % 100 == 0 {
if blocks % 100 == 0 && blocks > 0 {
println!("Blocks fetched: {}", blocks);
}
match FirehoseBeaconBlock::try_from(block_msg) {
Expand Down Expand Up @@ -127,10 +125,10 @@ impl FirehoseClient {
}
}
Err(e) => {
eprint!("Failed to get blocks stream: {:?}", e.code());
eprint!("Failed to get blocks stream: {e}");
break;
}
};
}
}
});

Expand All @@ -144,40 +142,41 @@ impl FirehoseClient {
) -> impl futures::Stream<Item = FirehoseEthBlock> {
let (tx, rx) = tokio::sync::mpsc::channel::<FirehoseEthBlock>(8192);

let client = self
.get_streaming_client()
.await
.map_err(|e| {
panic!("Failed to get streaming client: {:?}", e);
})
.unwrap();
let mut client = self.clone();

tokio::spawn(async move {
let mut blocks = 0;

while blocks < total {
let mut client = client.clone();
let request = create_blocks_streaming_request(
Chain::Ethereum,
start + blocks,
start + total - 1,
BlocksRequested::All,
);
let response = client.blocks(request).await.unwrap();
let mut stream_inner = response.into_inner();
while let Ok(Some(block_msg)) = stream_inner.message().await {
if blocks % 100 == 0 && blocks != 0 {
println!("Blocks fetched: {}", blocks);
}
match FirehoseEthBlock::try_from(block_msg) {
Ok(block) => {
blocks += 1;
tx.clone().send(block).await.unwrap();
}
Err(_) => {
panic!("Failed to convert block message to block");

match client.get_streaming_response(request).await {
Ok(response) => {
let mut stream_inner = response.into_inner();
while let Ok(Some(block_msg)) = stream_inner.message().await {
if blocks % 100 == 0 && blocks > 0 {
println!("Blocks fetched: {}", blocks);
}
match FirehoseEthBlock::try_from(block_msg) {
Ok(block) => {
blocks += 1;
tx.clone().send(block).await.unwrap();
}
Err(_) => {
panic!("Failed to convert block message to block");
}
}
}
}
Err(e) => {
eprint!("Failed to get blocks stream: {e}");
break;
}
}
}
});
Expand Down

0 comments on commit 4f3b648

Please sign in to comment.