From ed91b17bc1ab0c3cb875494c434127fccb9c8593 Mon Sep 17 00:00:00 2001 From: Marko Petrlic Date: Fri, 29 Nov 2024 12:39:56 +0100 Subject: [PATCH] changes --- avail-rust/src/error.rs | 2 ++ avail-rust/src/utils.rs | 42 +++++++++++++++++++++++++++++++---------- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/avail-rust/src/error.rs b/avail-rust/src/error.rs index 9ace4340d..5fb9362e0 100644 --- a/avail-rust/src/error.rs +++ b/avail-rust/src/error.rs @@ -7,6 +7,7 @@ use crate::transactions::TransactionFailed; #[derive(Debug)] pub enum ClientError { Custom(String), + TransactionDropped, BlockStream(String), Subxt(subxt::Error), SubxtSigner(SecretUriError), @@ -17,6 +18,7 @@ impl ClientError { pub fn to_string(&self) -> String { match self { ClientError::Custom(e) => e.clone(), + ClientError::TransactionDropped => String::from("Transaction Dropped"), ClientError::BlockStream(e) => e.clone(), ClientError::Subxt(e) => e.to_string(), ClientError::SubxtSigner(e) => e.to_string(), diff --git a/avail-rust/src/utils.rs b/avail-rust/src/utils.rs index 3f0b7fa14..ba8f3d085 100644 --- a/avail-rust/src/utils.rs +++ b/avail-rust/src/utils.rs @@ -90,22 +90,40 @@ pub async fn watch_transaction( ) -> Result { let mut block_hash; let mut block_number; - let mut tx_details = None; + let tx_details; - let mut stream = if wait_for == WaitFor::BlockInclusion { - online_client.blocks().subscribe_all().await? - } else { - online_client.blocks().subscribe_finalized().await? + let create_stream = || async { + if wait_for == WaitFor::BlockInclusion { + online_client.blocks().subscribe_all().await + } else { + online_client.blocks().subscribe_finalized().await + } }; + let mut stream = create_stream().await?; + let mut stream_failed_count = 0; + let mut current_block_number: Option = None; let mut timeout_block_number: Option = None; loop { let Some(block) = stream.next().await else { - let error = - String::from("Failed to get next block from the stream. Client might be offline"); - return Err(ClientError::BlockStream(error)); + if stream_failed_count > 3 { + let error = String::from( + "Critical Error: Failed to get next block from the stream. Aborting.", + ); + return Err(ClientError::BlockStream(error)); + } else { + let warning = String::from( + "Warning: Failed to get next block from the stream. Creating new stream.", + ); + println!("{}", warning); + + stream_failed_count += 1; + stream = create_stream().await?; + + continue; + } }; let block = block?; @@ -129,12 +147,16 @@ pub async fn watch_transaction( timeout_block_number = Some(block_number + block_timeout); } if timeout_block_number.is_some_and(|timeout| block_number >= timeout) { - break; + let error = + "Transaction not found. It might have been dropped and thus it was never executed"; + return Err(ClientError::from(error)); } } let Some(tx_details) = tx_details else { - return Err(ClientError::Custom("a".into()).into()); + // TODO + let error = "If we are here then God/Gods/NoGod/MaybeGod help us"; + return Err(ClientError::from(error)); }; let events = tx_details.events().await?;