Skip to content

Commit

Permalink
Merge pull request #43 from EspressoSystems/fix/unwraps
Browse files Browse the repository at this point in the history
Remove some unwraps, including one which panicked in production
  • Loading branch information
jbearer authored Oct 9, 2023
2 parents 1632675 + 927545e commit 8e29223
Showing 1 changed file with 41 additions and 29 deletions.
70 changes: 41 additions & 29 deletions src/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
// along with the Discord Faucet library. If not, see <https://mit-license.org/>.

use anyhow::{Error, Result};
use async_std::{channel::Receiver, sync::RwLock, task::JoinHandle};
use async_std::{
channel::Receiver,
sync::{RwLock, RwLockUpgradableReadGuard},
task::{sleep, JoinHandle},
};
use clap::Parser;
use ethers::{
prelude::SignerMiddleware,
Expand Down Expand Up @@ -467,17 +471,12 @@ impl Faucet {
async fn handle_non_faucet_transfer(&self, receipt: &TransactionReceipt) -> Result<()> {
tracing::debug!("Handling external incoming transfer to {:?}", receipt.to);
if let Some(receiver) = receipt.to {
if self
.state
.read()
.await
.clients_being_funded
.contains_key(&receiver)
{
let state = self.state.upgradable_read().await;
if state.clients_being_funded.contains_key(&receiver) {
let balance = self.balance(receiver).await?;
if balance >= self.config.min_funding_balance() {
tracing::info!("Funded client {:?} with external transfer", receiver);
let mut state = self.state.write().await;
let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
if let Some(transfer_index) =
state.transfer_queue.iter().position(|r| r.to() == receiver)
{
Expand Down Expand Up @@ -511,7 +510,10 @@ impl Faucet {

// Only continue if there's an inflight transfer or the recipient is a client being funded.
let is_relevant = inflight.is_some()
|| (tx.to.is_some() && state.clients_being_funded.contains_key(&tx.to.unwrap()));
|| tx
.to
.as_ref()
.is_some_and(|to| state.clients_being_funded.contains_key(to));

drop(state);

Expand All @@ -530,13 +532,12 @@ impl Faucet {

tracing::debug!("Got receipt {:?}", receipt);

if inflight.is_none() {
return self.handle_non_faucet_transfer(&receipt).await;
}

let Transfer {
let Some(Transfer {
sender, request, ..
} = inflight.unwrap();
}) = inflight
else {
return self.handle_non_faucet_transfer(&receipt).await;
};

tracing::info!("Received receipt for {request:?}");
// Do all external calls before state modifications
Expand Down Expand Up @@ -599,18 +600,29 @@ impl Faucet {
async fn monitor_transactions(&self) -> Result<()> {
loop {
let mut stream = match &self.ws_provider {
Some(provider) => provider
.subscribe_blocks()
.await
.unwrap()
.filter_map(|block| async move {
if block.hash.is_none() {
tracing::warn!("Received block without hash, ignoring: {block:?}");
}
block.hash
})
.boxed(),
None => self.provider.watch_blocks().await.unwrap().boxed(),
Some(provider) => match provider.subscribe_blocks().await {
Ok(stream) => stream
.filter_map(|block| async move {
if block.hash.is_none() {
tracing::warn!("Received block without hash, ignoring: {block:?}");
}
block.hash
})
.boxed(),
Err(err) => {
tracing::error!("Error reconnecting to block stream: {err}");
sleep(Duration::from_secs(1)).await;
continue;
}
},
None => match self.provider.watch_blocks().await {
Ok(stream) => stream.boxed(),
Err(err) => {
tracing::error!("Error reconnecting to block stream: {err}");
sleep(Duration::from_secs(1)).await;
continue;
}
},
};

self.state.write().await.monitoring_started = true;
Expand Down Expand Up @@ -641,7 +653,7 @@ impl Faucet {
// If we get here, the subscription was closed. This happens for example
// if the RPC server is restarted.
tracing::warn!("Block subscription closed, will restart ...");
async_std::task::sleep(Duration::from_secs(5)).await;
sleep(Duration::from_secs(5)).await;
}
}

Expand Down

0 comments on commit 8e29223

Please sign in to comment.