Skip to content

Commit

Permalink
Check if relayer funds are enough to process submitted transaction (e…
Browse files Browse the repository at this point in the history
…stimates). (#53)

* Check if realyer funds are enough to process submitted transaction
(estimates).

* Code review changes.

* Code review changes.
  • Loading branch information
piohei authored Sep 5, 2024
1 parent 49dd452 commit e67fa99
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 12 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea/
target/
.env
.env
25 changes: 25 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tracing::instrument;
use crate::broadcast_utils::gas_estimation::FeesEstimate;
use crate::config::DatabaseConfig;
use crate::types::wrappers::h256::H256Wrapper;
use crate::types::wrappers::hex_u256::HexU256;
use crate::types::{
NetworkInfo, RelayerInfo, RelayerUpdate, TransactionPriority, TxStatus,
};
Expand Down Expand Up @@ -271,6 +272,30 @@ impl Database {
Ok(tx_count as usize)
}

#[instrument(skip(self), level = "debug")]
pub async fn get_relayer_pending_txs_gas_limit_sum(
&self,
relayer_id: &str,
) -> eyre::Result<U256> {
let gas_limits: Vec<(HexU256,)> = sqlx::query_as(
r#"
SELECT t.gas_limit
FROM transactions t
LEFT JOIN sent_transactions s ON (t.id = s.tx_id)
WHERE t.relayer_id = $1
AND (s.tx_id IS NULL OR s.status = $2)
"#,
)
.bind(relayer_id)
.bind(TxStatus::Pending)
.fetch_all(&self.pool)
.await?;

Ok(gas_limits
.into_iter()
.fold(U256::zero(), |acc, (v,)| acc + v.0))
}

#[instrument(skip(self), level = "debug")]
pub async fn create_transaction(
&self,
Expand Down
34 changes: 34 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::net::SocketAddr;
use std::sync::Arc;

use ethers::middleware::Middleware;
use ethers::providers::{Http, Provider};
use ethers::signers::Signer;
use eyre::ContextCompat;
use poem::http::StatusCode;
Expand Down Expand Up @@ -251,6 +253,38 @@ impl RelayerApi {
));
}

let relayer_queued_tx_gas_limit_sum = app
.db
.get_relayer_pending_txs_gas_limit_sum(api_token.relayer_id())
.await?;

let block_fees = app
.db
.get_latest_block_fees_by_chain_id(relayer.chain_id)
.await?;
if let Some(block_fees) = block_fees {
let gas_limit = relayer_queued_tx_gas_limit_sum + req.gas_limit.0;
let estimated_transactions_cost = block_fees.gas_price * gas_limit;

// TODO: Cache?
let http_provider: Provider<Http> =
app.http_provider(relayer.chain_id).await?;

let balance = http_provider
.get_balance(relayer.address.0, None)
.await
.map_err(|err| {
eyre::eyre!("Error checking balance: {}", err)
})?;

if balance < estimated_transactions_cost {
return Err(poem::error::Error::from_string(
"Relayer funds are insufficient for transaction to be mined.".to_string(),
StatusCode::UNPROCESSABLE_ENTITY,
));
}
}

let res = app
.db
.create_transaction(
Expand Down
16 changes: 14 additions & 2 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::task_runner::TaskRunner;
use crate::tasks;

pub struct Service {
_app: Arc<App>,
app: Arc<App>,
local_addr: SocketAddr,
server_handle: JoinHandle<eyre::Result<()>>,
}
Expand Down Expand Up @@ -50,7 +50,7 @@ impl Service {
initialize_predefined_values(&app).await?;

Ok(Self {
_app: app,
app,
local_addr,
server_handle,
})
Expand Down Expand Up @@ -82,6 +82,18 @@ impl Service {

Ok(())
}

pub async fn are_estimates_ready_for_chain(&self, chain_id: u64) -> bool {
let res = self
.app
.db
.get_latest_block_fees_by_chain_id(chain_id)
.await;
match res {
Ok(res) => res.is_some(),
Err(_) => false,
}
}
}

async fn initialize_predefined_values(
Expand Down
11 changes: 5 additions & 6 deletions src/tasks/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,15 @@ pub async fn backfill_to_block(
rpc: &Provider<Http>,
latest_block: Block<H256>,
) -> eyre::Result<()> {
// Get the latest block from the db
let Some(latest_db_block_number) =
let next_block_number: u64 = if let Some(latest_db_block_number) =
app.db.get_latest_block_number(chain_id).await?
else {
{
latest_db_block_number + 1
} else {
tracing::info!(chain_id, "No latest block");
return Ok(());
0
};

let next_block_number: u64 = latest_db_block_number + 1;

// Get the first block from the stream and backfill any missing blocks
let latest_block_number = latest_block
.number
Expand Down
16 changes: 16 additions & 0 deletions tests/common/service_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,22 @@ impl ServiceBuilder {
let client =
TxSitterClient::new(format!("http://{}", service.local_addr()));

// Awaits for estimates to be ready
let mut are_estimates_ready = false;
for _ in 0..30 {
if service
.are_estimates_ready_for_chain(DEFAULT_ANVIL_CHAIN_ID)
.await
{
are_estimates_ready = true;
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
if !are_estimates_ready {
eyre::bail!("Estimates were not ready!");
}

Ok((service, client))
}
}
2 changes: 1 addition & 1 deletion tests/escalation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod common;
use crate::common::prelude::*;

const ESCALATION_INTERVAL: Duration = Duration::from_secs(2);
const ANVIL_BLOCK_TIME: u64 = 6;
const ANVIL_BLOCK_TIME: u64 = 10;

#[tokio::test]
async fn escalation() -> eyre::Result<()> {
Expand Down
24 changes: 22 additions & 2 deletions tests/send_too_many_txs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,28 @@ async fn send_too_many_txs() -> eyre::Result<()> {
})
.await?;

let provider = setup_provider(anvil.endpoint()).await?;
let init_value: U256 = parse_units("1", "ether")?.into();

// Send some funds to created relayer
client
.send_tx(
&api_key,
&SendTxRequest {
to: secondary_relayer_address.clone(),
value: init_value.into(),
data: None,
gas_limit: U256::from(21_000).into(),
priority: TransactionPriority::Regular,
tx_id: None,
blobs: None,
},
)
.await?;

tracing::info!("Waiting for secondary relayer initial balance");
await_balance(&provider, init_value, secondary_relayer_address.0).await?;

let CreateApiKeyResponse {
api_key: secondary_api_key,
} = client.create_relayer_api_key(&secondary_relayer_id).await?;
Expand All @@ -42,8 +64,6 @@ async fn send_too_many_txs() -> eyre::Result<()> {
)
.await?;

let provider = setup_provider(anvil.endpoint()).await?;

// Send a transaction
let value: U256 = parse_units("0.01", "ether")?.into();

Expand Down
54 changes: 54 additions & 0 deletions tests/send_when_insufficient_funds.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
mod common;

use tx_sitter::client::ClientError;

use crate::common::prelude::*;

const ESCALATION_INTERVAL: Duration = Duration::from_secs(2);
const ANVIL_BLOCK_TIME: u64 = 6;

#[tokio::test]
async fn send_when_insufficient_funds() -> eyre::Result<()> {
setup_tracing();

let (db_url, _db_container) = setup_db().await?;
let anvil = AnvilBuilder::default()
.block_time(ANVIL_BLOCK_TIME)
.spawn()
.await?;

let (_service, client) = ServiceBuilder::default()
.escalation_interval(ESCALATION_INTERVAL)
.build(&anvil, &db_url)
.await?;

let CreateApiKeyResponse { api_key } =
client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?;

// Send a transaction
let value: U256 = parse_units("1", "ether")?.into();
for _ in 0..10 {
let tx = client
.send_tx(
&api_key,
&SendTxRequest {
to: ARBITRARY_ADDRESS.into(),
value: value.into(),
gas_limit: U256::from_dec_str("1000000000000")?.into(),
..Default::default()
},
)
.await;

if let Err(ClientError::TxSitter(status_code, message)) = tx {
assert_eq!(status_code, reqwest::StatusCode::UNPROCESSABLE_ENTITY);
assert_eq!(
message,
"Relayer funds are insufficient for transaction to be mined."
);
return Ok(());
}
}

eyre::bail!("Should return error response with information about insufficient funds.")
}

0 comments on commit e67fa99

Please sign in to comment.