Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
  • Loading branch information
markopoloparadox committed Nov 29, 2024
1 parent acad145 commit ed91b17
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
2 changes: 2 additions & 0 deletions avail-rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::transactions::TransactionFailed;
#[derive(Debug)]
pub enum ClientError {
Custom(String),
TransactionDropped,
BlockStream(String),
Subxt(subxt::Error),
SubxtSigner(SecretUriError),
Expand All @@ -17,6 +18,7 @@ impl ClientError {
pub fn to_string(&self) -> String {
match self {
ClientError::Custom(e) => e.clone(),
ClientError::TransactionDropped => String::from("Transaction Dropped"),
ClientError::BlockStream(e) => e.clone(),
ClientError::Subxt(e) => e.to_string(),
ClientError::SubxtSigner(e) => e.to_string(),
Expand Down
42 changes: 32 additions & 10 deletions avail-rust/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,40 @@ pub async fn watch_transaction(
) -> Result<TransactionDetails, ClientError> {
let mut block_hash;
let mut block_number;
let mut tx_details = None;
let tx_details;

let mut stream = if wait_for == WaitFor::BlockInclusion {
online_client.blocks().subscribe_all().await?
} else {
online_client.blocks().subscribe_finalized().await?
let create_stream = || async {
if wait_for == WaitFor::BlockInclusion {
online_client.blocks().subscribe_all().await
} else {
online_client.blocks().subscribe_finalized().await
}
};

let mut stream = create_stream().await?;
let mut stream_failed_count = 0;

let mut current_block_number: Option<u32> = None;
let mut timeout_block_number: Option<u32> = None;

loop {
let Some(block) = stream.next().await else {
let error =
String::from("Failed to get next block from the stream. Client might be offline");
return Err(ClientError::BlockStream(error));
if stream_failed_count > 3 {
let error = String::from(
"Critical Error: Failed to get next block from the stream. Aborting.",
);
return Err(ClientError::BlockStream(error));
} else {
let warning = String::from(
"Warning: Failed to get next block from the stream. Creating new stream.",
);
println!("{}", warning);

stream_failed_count += 1;
stream = create_stream().await?;

continue;
}
};

let block = block?;
Expand All @@ -129,12 +147,16 @@ pub async fn watch_transaction(
timeout_block_number = Some(block_number + block_timeout);
}
if timeout_block_number.is_some_and(|timeout| block_number >= timeout) {
break;
let error =
"Transaction not found. It might have been dropped and thus it was never executed";
return Err(ClientError::from(error));
}
}

let Some(tx_details) = tx_details else {
return Err(ClientError::Custom("a".into()).into());
// TODO
let error = "If we are here then God/Gods/NoGod/MaybeGod help us";
return Err(ClientError::from(error));
};

let events = tx_details.events().await?;
Expand Down

0 comments on commit ed91b17

Please sign in to comment.