diff --git a/Cargo.lock b/Cargo.lock index 68fd11b..698e24e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1705,43 +1705,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "fake-rpc" -version = "0.1.0" -dependencies = [ - "async-trait", - "axum", - "chrono", - "clap", - "config", - "dotenv", - "ethers", - "ethers-signers", - "eyre", - "futures", - "headers", - "hex", - "hex-literal", - "humantime", - "humantime-serde", - "hyper", - "rand", - "reqwest", - "serde", - "serde_json", - "sha3", - "spki", - "sqlx", - "strum", - "thiserror", - "tokio", - "toml 0.8.8", - "tower-http", - "tracing", - "tracing-subscriber", - "uuid 0.8.2", -] - [[package]] name = "fastrand" version = "2.0.1" @@ -5040,7 +5003,6 @@ dependencies = [ "dotenv", "ethers", "eyre", - "fake-rpc", "futures", "headers", "hex", diff --git a/Cargo.toml b/Cargo.toml index 192f45c..f9f6b47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,6 @@ url = "2.4.1" uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] -fake-rpc = { path = "crates/fake-rpc" } indoc = "2.0.3" test-case = "3.1.0" diff --git a/crates/fake-rpc/Cargo.toml b/crates/fake-rpc/Cargo.toml deleted file mode 100644 index da94d7a..0000000 --- a/crates/fake-rpc/Cargo.toml +++ /dev/null @@ -1,52 +0,0 @@ -[package] -name = "fake-rpc" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -serde = "1.0.136" -axum = { version = "0.6.20", features = ["headers"] } -thiserror = "1.0.50" -headers = "0.3.9" -humantime = "2.1.0" -humantime-serde = "1.1.1" -hyper = "0.14.27" -dotenv = "0.15.0" -clap = { version = "4.3.0", features = ["env", "derive"] } -ethers = { version = "2.0.11" } -ethers-signers = { version = "2.0.11" } -eyre = "0.6.5" -hex = "0.4.3" -hex-literal = "0.4.1" -reqwest = { version = "0.11.13", default-features = false, features = [ - "rustls-tls", -] } -serde_json = "1.0.91" -strum = { version = "0.25.0", features = ["derive"] } -tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -tracing = { version = "0.1", features = ["log"] } -tracing-subscriber = { version = "0.3", default-features = false, features = [ - "env-filter", - "std", - "fmt", - "json", - "ansi", -] } -tower-http = { version = "0.4.4", features = ["trace"] } -uuid = { version = "0.8", features = ["v4"] } -futures = "0.3" -chrono = "0.4" -rand = "0.8.5" -sha3 = "0.10.8" -config = "0.13.3" -toml = "0.8.8" -sqlx = { version = "0.7.2", features = [ - "runtime-tokio", - "tls-rustls", - "postgres", - "migrate", -] } -spki = "0.7.2" -async-trait = "0.1.74" diff --git a/crates/fake-rpc/src/lib.rs b/crates/fake-rpc/src/lib.rs deleted file mode 100644 index 81c334e..0000000 --- a/crates/fake-rpc/src/lib.rs +++ /dev/null @@ -1,153 +0,0 @@ -use std::net::{Ipv4Addr, SocketAddr}; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; -use std::time::Duration; - -use axum::extract::State; -use axum::routing::{post, IntoMakeService}; -use axum::{Json, Router}; -use ethers::utils::{Anvil, AnvilInstance}; -use hyper::server::conn::AddrIncoming; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use tokio::sync::Mutex; - -pub const BLOCK_TIME_SECONDS: u64 = 2; - -pub struct DoubleAnvil { - main_anvil: Mutex, - reference_anvil: Mutex, - held_back_txs: Mutex>, - - auto_advance: AtomicBool, -} - -impl DoubleAnvil { - pub async fn drop_txs(&self) -> eyre::Result<()> { - let mut held_back_txs = self.held_back_txs.lock().await; - held_back_txs.clear(); - Ok(()) - } - - pub async fn advance(&self) -> eyre::Result<()> { - let mut held_back_txs = self.held_back_txs.lock().await; - - for req in held_back_txs.drain(..) { - tracing::info!(?req, "eth_sendRawTransaction"); - - let response = reqwest::Client::new() - .post(&self.main_anvil.lock().await.endpoint()) - .json(&req) - .send() - .await - .unwrap(); - - let resp = response.json::().await.unwrap(); - - tracing::info!(?resp, "eth_sendRawTransaction.response"); - } - - Ok(()) - } - - pub fn set_auto_advance(&self, auto_advance: bool) { - self.auto_advance - .store(auto_advance, std::sync::atomic::Ordering::SeqCst); - } - - pub async fn ws_endpoint(&self) -> String { - self.main_anvil.lock().await.ws_endpoint() - } -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -struct JsonRpcReq { - pub id: u64, - pub jsonrpc: String, - pub method: String, - #[serde(default)] - pub params: Value, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct JsonRpcResponse { - pub id: u64, - pub jsonrpc: String, - pub result: Value, -} - -async fn advance(State(anvil): State>) { - anvil.advance().await.unwrap(); -} - -async fn rpc( - State(anvil): State>, - Json(req): Json, -) -> Json { - let method = req.method.as_str(); - let anvil_instance = match method { - "eth_sendRawTransaction" => { - anvil.held_back_txs.lock().await.push(req.clone()); - - if anvil.auto_advance.load(std::sync::atomic::Ordering::SeqCst) { - anvil.advance().await.unwrap(); - } - - anvil.reference_anvil.lock().await - } - "eth_getTransactionReceipt" => anvil.main_anvil.lock().await, - "eth_getTransactionByHash" => anvil.reference_anvil.lock().await, - _ => anvil.main_anvil.lock().await, - }; - - tracing::info!(?req, "{}", method); - - let response = reqwest::Client::new() - .post(&anvil_instance.endpoint()) - .json(&req) - .send() - .await - .unwrap(); - - let resp = response.json::().await.unwrap(); - - tracing::info!(?resp, "{}.response", method); - - Json(resp) -} - -pub async fn serve( - port: u16, -) -> ( - Arc, - axum::Server>, -) { - let main_anvil = Anvil::new().block_time(BLOCK_TIME_SECONDS).spawn(); - let reference_anvil = Anvil::new().block_time(BLOCK_TIME_SECONDS).spawn(); - - tokio::time::sleep(Duration::from_secs(BLOCK_TIME_SECONDS)).await; - - tracing::info!("Main anvil instance: {}", main_anvil.endpoint()); - tracing::info!("Reference anvil instance: {}", reference_anvil.endpoint()); - - let state = Arc::new(DoubleAnvil { - main_anvil: Mutex::new(main_anvil), - reference_anvil: Mutex::new(reference_anvil), - held_back_txs: Mutex::new(Vec::new()), - auto_advance: AtomicBool::new(true), - }); - - let router = Router::new() - .route("/", post(rpc)) - .route("/advance", post(advance)) - .with_state(state.clone()) - .layer(tower_http::trace::TraceLayer::new_for_http()); - - let host = Ipv4Addr::new(127, 0, 0, 1); - let socket_addr = SocketAddr::new(host.into(), port); - - let server = - axum::Server::bind(&socket_addr).serve(router.into_make_service()); - - (state, server) -} diff --git a/crates/fake-rpc/src/main.rs b/crates/fake-rpc/src/main.rs deleted file mode 100644 index a20b425..0000000 --- a/crates/fake-rpc/src/main.rs +++ /dev/null @@ -1,30 +0,0 @@ -use clap::Parser; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; -use tracing_subscriber::EnvFilter; - -#[derive(Debug, Clone, Parser)] -struct Args { - #[clap(short, long, default_value = "8545")] - port: u16, -} - -#[tokio::main] -async fn main() -> eyre::Result<()> { - dotenv::dotenv().ok(); - - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer().pretty().compact()) - .with(EnvFilter::from_default_env()) - .init(); - - let args = Args::parse(); - - let (_app, server) = fake_rpc::serve(args.port).await; - - tracing::info!("Serving fake RPC at {}", server.local_addr()); - - server.await?; - - Ok(()) -} diff --git a/src/broadcast_utils.rs b/src/broadcast_utils.rs index a0182d4..35c9318 100644 --- a/src/broadcast_utils.rs +++ b/src/broadcast_utils.rs @@ -1,4 +1,4 @@ -use ethers::types::{Eip1559TransactionRequest, U256}; +use ethers::types::U256; use eyre::ContextCompat; use self::gas_estimation::FeesEstimate; @@ -19,35 +19,6 @@ pub fn calculate_gas_fees_from_estimates( (max_fee_per_gas, max_priority_fee_per_gas) } -pub fn escalate_priority_fee( - max_base_fee_per_gas: U256, - max_network_fee_per_gas: U256, - current_max_priority_fee_per_gas: U256, - escalation_count: usize, - tx: &mut Eip1559TransactionRequest, -) { - // Min increase of 20% on the priority fee required for a replacement tx - let increased_gas_price_percentage = - U256::from(100 + (10 * (1 + escalation_count))); - - let factor = U256::from(100); - - let new_max_priority_fee_per_gas = current_max_priority_fee_per_gas - * increased_gas_price_percentage - / factor; - - let new_max_priority_fee_per_gas = - std::cmp::min(new_max_priority_fee_per_gas, max_network_fee_per_gas); - - let new_max_fee_per_gas = - max_base_fee_per_gas + new_max_priority_fee_per_gas; - let new_max_fee_per_gas = - std::cmp::min(new_max_fee_per_gas, max_network_fee_per_gas); - - tx.max_fee_per_gas = Some(new_max_fee_per_gas); - tx.max_priority_fee_per_gas = Some(new_max_priority_fee_per_gas); -} - pub async fn should_send_transaction( app: &App, relayer_id: &str, diff --git a/src/client.rs b/src/client.rs index b479e5d..49aa7d1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,7 +5,9 @@ use crate::server::routes::network::NewNetworkInfo; use crate::server::routes::relayer::{ CreateApiKeyResponse, CreateRelayerRequest, CreateRelayerResponse, }; -use crate::server::routes::transaction::{SendTxRequest, SendTxResponse}; +use crate::server::routes::transaction::{ + GetTxResponse, SendTxRequest, SendTxResponse, +}; pub struct TxSitterClient { client: reqwest::Client, @@ -43,6 +45,17 @@ impl TxSitterClient { Ok(response.json().await?) } + async fn json_get(&self, url: &str) -> eyre::Result + where + R: serde::de::DeserializeOwned, + { + let response = self.client.get(url).send().await?; + + let response = Self::validate_response(response).await?; + + Ok(response.json().await?) + } + async fn validate_response(response: Response) -> eyre::Result { if !response.status().is_success() { let body = response.text().await?; @@ -77,6 +90,20 @@ impl TxSitterClient { .await } + pub async fn get_tx( + &self, + api_key: &ApiKey, + tx_id: &str, + ) -> eyre::Result { + self.json_get(&format!( + "{}/1/api/{api_key}/tx/{tx_id}", + self.url, + api_key = api_key, + tx_id = tx_id + )) + .await + } + pub async fn create_network( &self, chain_id: u64, diff --git a/src/config.rs b/src/config.rs index 7e1e534..2869c1e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -18,6 +18,12 @@ pub struct TxSitterConfig { #[serde(with = "humantime_serde")] pub escalation_interval: Duration, + #[serde(with = "humantime_serde", default = "default_soft_reorg_interval")] + pub soft_reorg_interval: Duration, + + #[serde(with = "humantime_serde", default = "default_hard_reorg_interval")] + pub hard_reorg_interval: Duration, + #[serde(default)] pub datadog_enabled: bool, @@ -28,6 +34,14 @@ pub struct TxSitterConfig { pub predefined: Option, } +const fn default_soft_reorg_interval() -> Duration { + Duration::from_secs(60) +} + +const fn default_hard_reorg_interval() -> Duration { + Duration::from_secs(60 * 60) +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub struct Predefined { @@ -148,6 +162,8 @@ mod tests { const WITH_DB_CONNECTION_STRING: &str = indoc! {r#" [service] escalation_interval = "1h" + soft_reorg_interval = "1m" + hard_reorg_interval = "1h" datadog_enabled = false statsd_enabled = false @@ -165,6 +181,8 @@ mod tests { const WITH_DB_PARTS: &str = indoc! {r#" [service] escalation_interval = "1h" + soft_reorg_interval = "1m" + hard_reorg_interval = "1h" datadog_enabled = false statsd_enabled = false @@ -188,6 +206,8 @@ mod tests { let config = Config { service: TxSitterConfig { escalation_interval: Duration::from_secs(60 * 60), + soft_reorg_interval: default_soft_reorg_interval(), + hard_reorg_interval: default_hard_reorg_interval(), datadog_enabled: false, statsd_enabled: false, predefined: None, @@ -214,6 +234,8 @@ mod tests { let config = Config { service: TxSitterConfig { escalation_interval: Duration::from_secs(60 * 60), + soft_reorg_interval: default_soft_reorg_interval(), + hard_reorg_interval: default_hard_reorg_interval(), datadog_enabled: false, statsd_enabled: false, predefined: None, diff --git a/src/tasks/escalate.rs b/src/tasks/escalate.rs index 76d5d3c..3a8c51f 100644 --- a/src/tasks/escalate.rs +++ b/src/tasks/escalate.rs @@ -39,7 +39,7 @@ pub async fn escalate_txs(app: Arc) -> eyre::Result<()> { // Min increase of 20% on the priority fee required for a replacement tx let factor = U256::from(100); let increased_gas_price_percentage = - factor + U256::from(10 * (1 + escalation)); + factor + U256::from(20 * (1 + escalation)); let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0 * increased_gas_price_percentage @@ -51,6 +51,13 @@ pub async fn escalate_txs(app: Arc) -> eyre::Result<()> { let max_priority_fee_per_gas = max_fee_per_gas - fees.fee_estimates.base_fee_per_gas; + tracing::warn!( + "Initial tx fees are max = {}, priority = {}", + tx.initial_max_fee_per_gas.0, + tx.initial_max_priority_fee_per_gas.0 + ); + tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}"); + let eip1559_tx = Eip1559TransactionRequest { from: None, to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), diff --git a/src/tasks/handle_reorgs.rs b/src/tasks/handle_reorgs.rs index 43c5f56..7b9a12d 100644 --- a/src/tasks/handle_reorgs.rs +++ b/src/tasks/handle_reorgs.rs @@ -1,12 +1,7 @@ use std::sync::Arc; -use std::time::Duration; use crate::app::App; -// TODO: Make this configurable -const TIME_BETWEEN_HARD_REORGS_SECONDS: i64 = 60 * 60; // Once every hour -const TIME_BETWEEN_SOFT_REORGS_SECONDS: i64 = 60; // Once every minute - pub async fn handle_hard_reorgs(app: Arc) -> eyre::Result<()> { loop { tracing::info!("Handling hard reorgs"); @@ -17,10 +12,7 @@ pub async fn handle_hard_reorgs(app: Arc) -> eyre::Result<()> { tracing::info!(id = tx, "Tx hard reorged"); } - tokio::time::sleep(Duration::from_secs( - TIME_BETWEEN_HARD_REORGS_SECONDS as u64, - )) - .await; + tokio::time::sleep(app.config.service.hard_reorg_interval).await; } } @@ -34,9 +26,6 @@ pub async fn handle_soft_reorgs(app: Arc) -> eyre::Result<()> { tracing::info!(id = tx, "Tx soft reorged"); } - tokio::time::sleep(Duration::from_secs( - TIME_BETWEEN_SOFT_REORGS_SECONDS as u64, - )) - .await; + tokio::time::sleep(app.config.service.soft_reorg_interval).await; } } diff --git a/tests/common/anvil_builder.rs b/tests/common/anvil_builder.rs new file mode 100644 index 0000000..ac1fd55 --- /dev/null +++ b/tests/common/anvil_builder.rs @@ -0,0 +1,67 @@ +use std::time::Duration; + +use ethers::providers::Middleware; +use ethers::types::{Eip1559TransactionRequest, U256}; +use ethers::utils::{Anvil, AnvilInstance}; + +use super::prelude::{ + setup_middleware, DEFAULT_ANVIL_ACCOUNT, DEFAULT_ANVIL_BLOCK_TIME, + SECONDARY_ANVIL_PRIVATE_KEY, +}; + +#[derive(Debug, Clone, Default)] +pub struct AnvilBuilder { + pub block_time: Option, + pub port: Option, +} + +impl AnvilBuilder { + pub fn block_time(mut self, block_time: u64) -> Self { + self.block_time = Some(block_time); + self + } + + pub fn port(mut self, port: u16) -> Self { + self.port = Some(port); + self + } + + pub async fn spawn(self) -> eyre::Result { + let mut anvil = Anvil::new(); + + let block_time = if let Some(block_time) = self.block_time { + block_time + } else { + DEFAULT_ANVIL_BLOCK_TIME + }; + anvil = anvil.block_time(block_time); + + if let Some(port) = self.port { + anvil = anvil.port(port); + } + + let anvil = anvil.spawn(); + + let middleware = + setup_middleware(anvil.endpoint(), SECONDARY_ANVIL_PRIVATE_KEY) + .await?; + + // Wait for the chain to start and produce at least one block + tokio::time::sleep(Duration::from_secs(block_time)).await; + + // We need to seed some transactions so we can get fee estimates on the first block + middleware + .send_transaction( + Eip1559TransactionRequest { + to: Some(DEFAULT_ANVIL_ACCOUNT.into()), + value: Some(U256::from(100u64)), + ..Default::default() + }, + None, + ) + .await? + .await?; + + Ok(anvil) + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 033ea32..3d5f121 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,41 +1,43 @@ #![allow(dead_code)] // Needed because this module is imported as module by many test crates -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::Arc; -use std::time::Duration; use ethers::core::k256::ecdsa::SigningKey; use ethers::middleware::SignerMiddleware; use ethers::providers::{Http, Middleware, Provider}; use ethers::signers::{LocalWallet, Signer}; -use ethers::types::{Address, Eip1559TransactionRequest, H160, U256}; -use fake_rpc::DoubleAnvil; +use ethers::types::{Address, H160}; use postgres_docker_utils::DockerContainerGuard; -use tokio::task::JoinHandle; use tracing::level_filters::LevelFilter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; -use tx_sitter::client::TxSitterClient; -use tx_sitter::config::{ - Config, DatabaseConfig, KeysConfig, LocalKeysConfig, Predefined, - PredefinedNetwork, PredefinedRelayer, ServerConfig, TxSitterConfig, -}; -use tx_sitter::service::Service; pub type AppMiddleware = SignerMiddleware>, LocalWallet>; +mod anvil_builder; +mod service_builder; + +pub use self::anvil_builder::AnvilBuilder; +pub use self::service_builder::ServiceBuilder; + #[allow(unused_imports)] pub mod prelude { pub use std::time::Duration; + pub use ethers::prelude::{Http, Provider}; pub use ethers::providers::Middleware; - pub use ethers::types::{Eip1559TransactionRequest, U256}; + pub use ethers::types::{Eip1559TransactionRequest, H256, U256}; pub use ethers::utils::parse_units; + pub use futures::stream::FuturesUnordered; + pub use futures::StreamExt; + pub use tx_sitter::api_key::ApiKey; + pub use tx_sitter::client::TxSitterClient; pub use tx_sitter::server::routes::relayer::{ - CreateRelayerRequest, CreateRelayerResponse, + CreateApiKeyResponse, CreateRelayerRequest, CreateRelayerResponse, }; pub use tx_sitter::server::routes::transaction::SendTxRequest; + pub use url::Url; pub use super::*; } @@ -57,26 +59,10 @@ pub const ARBITRARY_ADDRESS: Address = H160(hex_literal::hex!( )); pub const DEFAULT_ANVIL_CHAIN_ID: u64 = 31337; +pub const DEFAULT_ANVIL_BLOCK_TIME: u64 = 2; pub const DEFAULT_RELAYER_ID: &str = "1b908a34-5dc1-4d2d-a146-5eb46e975830"; -pub struct DoubleAnvilHandle { - pub double_anvil: Arc, - ws_addr: String, - local_addr: SocketAddr, - server_handle: JoinHandle>, -} - -impl DoubleAnvilHandle { - pub fn local_addr(&self) -> String { - self.local_addr.to_string() - } - - pub fn ws_addr(&self) -> String { - self.ws_addr.clone() - } -} - pub fn setup_tracing() { tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer().pretty().compact()) @@ -84,7 +70,7 @@ pub fn setup_tracing() { EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) // Logging from fake_rpc can get very messy so we set it to warn only - .parse_lossy("info,fake_rpc=warn"), + .parse_lossy("info,tx_sitter=debug,fake_rpc=warn"), ) .init(); } @@ -97,94 +83,6 @@ pub async fn setup_db() -> eyre::Result<(String, DockerContainerGuard)> { Ok((url, db_container)) } -pub async fn setup_double_anvil() -> eyre::Result { - let (double_anvil, server) = fake_rpc::serve(0).await; - - let local_addr = server.local_addr(); - - let server_handle = tokio::spawn(async move { - server.await?; - Ok(()) - }); - - let middleware = setup_middleware( - format!("http://{local_addr}"), - SECONDARY_ANVIL_PRIVATE_KEY, - ) - .await?; - - // We need to seed some transactions so we can get fee estimates on the first block - middleware - .send_transaction( - Eip1559TransactionRequest { - to: Some(DEFAULT_ANVIL_ACCOUNT.into()), - value: Some(U256::from(100u64)), - ..Default::default() - }, - None, - ) - .await? - .await?; - - let ws_addr = double_anvil.ws_endpoint().await; - - Ok(DoubleAnvilHandle { - double_anvil, - ws_addr, - local_addr, - server_handle, - }) -} - -pub async fn setup_service( - anvil_handle: &DoubleAnvilHandle, - db_connection_url: &str, - escalation_interval: Duration, -) -> eyre::Result<(Service, TxSitterClient)> { - let rpc_url = anvil_handle.local_addr(); - - let anvil_private_key = hex::encode(DEFAULT_ANVIL_PRIVATE_KEY); - - let config = Config { - service: TxSitterConfig { - escalation_interval, - datadog_enabled: false, - statsd_enabled: false, - predefined: Some(Predefined { - network: PredefinedNetwork { - chain_id: DEFAULT_ANVIL_CHAIN_ID, - name: "Anvil".to_string(), - http_rpc: format!("http://{}", rpc_url), - ws_rpc: anvil_handle.ws_addr(), - }, - relayer: PredefinedRelayer { - name: "Anvil".to_string(), - id: DEFAULT_RELAYER_ID.to_string(), - key_id: anvil_private_key, - chain_id: DEFAULT_ANVIL_CHAIN_ID, - }, - }), - }, - server: ServerConfig { - host: SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(127, 0, 0, 1), - 0, - )), - username: None, - password: None, - }, - database: DatabaseConfig::connection_string(db_connection_url), - keys: KeysConfig::Local(LocalKeysConfig::default()), - }; - - let service = Service::new(config).await?; - - let client = - TxSitterClient::new(format!("http://{}", service.local_addr())); - - Ok((service, client)) -} - pub async fn setup_middleware( rpc_url: impl AsRef, private_key: &[u8], diff --git a/tests/common/service_builder.rs b/tests/common/service_builder.rs new file mode 100644 index 0000000..492e538 --- /dev/null +++ b/tests/common/service_builder.rs @@ -0,0 +1,96 @@ +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::time::Duration; + +use ethers::utils::AnvilInstance; +use tx_sitter::client::TxSitterClient; +use tx_sitter::config::{ + Config, DatabaseConfig, KeysConfig, LocalKeysConfig, Predefined, + PredefinedNetwork, PredefinedRelayer, ServerConfig, TxSitterConfig, +}; +use tx_sitter::service::Service; + +use super::prelude::{ + DEFAULT_ANVIL_CHAIN_ID, DEFAULT_ANVIL_PRIVATE_KEY, DEFAULT_RELAYER_ID, +}; + +pub struct ServiceBuilder { + escalation_interval: Duration, + soft_reorg_interval: Duration, + hard_reorg_interval: Duration, +} + +impl Default for ServiceBuilder { + fn default() -> Self { + Self { + escalation_interval: Duration::from_secs(30), + soft_reorg_interval: Duration::from_secs(45), + hard_reorg_interval: Duration::from_secs(60), + } + } +} + +impl ServiceBuilder { + pub fn escalation_interval(mut self, interval: Duration) -> Self { + self.escalation_interval = interval; + self + } + + pub fn soft_reorg_interval(mut self, interval: Duration) -> Self { + self.soft_reorg_interval = interval; + self + } + + pub fn hard_reorg_interval(mut self, interval: Duration) -> Self { + self.hard_reorg_interval = interval; + self + } + + pub async fn build( + self, + anvil: &AnvilInstance, + db_url: &str, + ) -> eyre::Result<(Service, TxSitterClient)> { + let anvil_private_key = hex::encode(DEFAULT_ANVIL_PRIVATE_KEY); + + let config = Config { + service: TxSitterConfig { + escalation_interval: self.escalation_interval, + soft_reorg_interval: self.soft_reorg_interval, + hard_reorg_interval: self.hard_reorg_interval, + datadog_enabled: false, + statsd_enabled: false, + predefined: Some(Predefined { + network: PredefinedNetwork { + chain_id: DEFAULT_ANVIL_CHAIN_ID, + name: "Anvil".to_string(), + http_rpc: anvil.endpoint(), + ws_rpc: anvil.ws_endpoint(), + }, + relayer: PredefinedRelayer { + name: "Anvil".to_string(), + id: DEFAULT_RELAYER_ID.to_string(), + key_id: anvil_private_key, + chain_id: DEFAULT_ANVIL_CHAIN_ID, + }, + }), + }, + server: ServerConfig { + host: SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + 0, + )), + username: None, + password: None, + }, + database: DatabaseConfig::connection_string(db_url), + keys: KeysConfig::Local(LocalKeysConfig::default()), + }; + + let service = Service::new(config).await?; + + let client = + TxSitterClient::new(format!("http://{}", service.local_addr())); + + Ok((service, client)) + } +} diff --git a/tests/create_relayer.rs b/tests/create_relayer.rs index e71732a..17a8233 100644 --- a/tests/create_relayer.rs +++ b/tests/create_relayer.rs @@ -2,17 +2,15 @@ mod common; use crate::common::prelude::*; -const ESCALATION_INTERVAL: Duration = Duration::from_secs(30); - #[tokio::test] async fn create_relayer() -> eyre::Result<()> { setup_tracing(); let (db_url, _db_container) = setup_db().await?; - let double_anvil = setup_double_anvil().await?; + let anvil = AnvilBuilder::default().spawn().await?; let (_service, client) = - setup_service(&double_anvil, &db_url, ESCALATION_INTERVAL).await?; + ServiceBuilder::default().build(&anvil, &db_url).await?; let CreateRelayerResponse { .. } = client .create_relayer(&CreateRelayerRequest { diff --git a/tests/escalation.rs b/tests/escalation.rs new file mode 100644 index 0000000..cab10c6 --- /dev/null +++ b/tests/escalation.rs @@ -0,0 +1,86 @@ +mod common; + +use crate::common::prelude::*; + +const ESCALATION_INTERVAL: Duration = Duration::from_secs(2); +const ANVIL_BLOCK_TIME: u64 = 6; + +#[tokio::test] +async fn escalation() -> eyre::Result<()> { + setup_tracing(); + + let (db_url, _db_container) = setup_db().await?; + let anvil = AnvilBuilder::default() + .block_time(ANVIL_BLOCK_TIME) + .spawn() + .await?; + + let (_service, client) = ServiceBuilder::default() + .escalation_interval(ESCALATION_INTERVAL) + .build(&anvil, &db_url) + .await?; + + let CreateApiKeyResponse { api_key } = + client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; + + let provider = setup_provider(anvil.endpoint()).await?; + + // Send a transaction + let value: U256 = parse_units("1", "ether")?.into(); + let tx = client + .send_tx( + &api_key, + &SendTxRequest { + to: ARBITRARY_ADDRESS, + value, + gas_limit: U256::from(21_000), + ..Default::default() + }, + ) + .await?; + + let initial_tx_hash = get_tx_hash(&client, &api_key, &tx.tx_id).await?; + + await_balance(&provider, value).await?; + let final_tx_hash = get_tx_hash(&client, &api_key, &tx.tx_id).await?; + + assert_ne!( + initial_tx_hash, final_tx_hash, + "Escalation should have occurred" + ); + + Ok(()) +} + +async fn await_balance( + provider: &Provider, + value: U256, +) -> eyre::Result<()> { + for _ in 0..24 { + let balance = provider.get_balance(ARBITRARY_ADDRESS, None).await?; + + if balance == value { + return Ok(()); + } else { + tokio::time::sleep(Duration::from_secs(3)).await; + } + } + + eyre::bail!("Balance not updated in time"); +} + +async fn get_tx_hash( + client: &TxSitterClient, + api_key: &ApiKey, + tx_id: &str, +) -> eyre::Result { + loop { + let tx = client.get_tx(api_key, tx_id).await?; + + if let Some(tx_hash) = tx.tx_hash { + return Ok(tx_hash); + } else { + tokio::time::sleep(Duration::from_secs(3)).await; + } + } +} diff --git a/tests/reorg.rs b/tests/reorg.rs new file mode 100644 index 0000000..a25ae9a --- /dev/null +++ b/tests/reorg.rs @@ -0,0 +1,74 @@ +mod common; + +use crate::common::prelude::*; + +#[tokio::test] +async fn reorg() -> eyre::Result<()> { + setup_tracing(); + + let (db_url, _db_container) = setup_db().await?; + let anvil = AnvilBuilder::default().spawn().await?; + let anvil_port = anvil.port(); + + let (_service, client) = ServiceBuilder::default() + .hard_reorg_interval(Duration::from_secs(2)) + .build(&anvil, &db_url) + .await?; + + let CreateApiKeyResponse { api_key } = + client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; + + let provider = setup_provider(anvil.endpoint()).await?; + + // Send a transaction + let value: U256 = parse_units("1", "ether")?.into(); + client + .send_tx( + &api_key, + &SendTxRequest { + to: ARBITRARY_ADDRESS, + value, + gas_limit: U256::from(21_000), + ..Default::default() + }, + ) + .await?; + + await_balance(&provider, value).await?; + + // Drop anvil to simulate a reorg + tracing::warn!("Dropping anvil & restarting at port {anvil_port}"); + drop(anvil); + + let anvil = AnvilBuilder::default().port(anvil_port).spawn().await?; + let provider = setup_provider(anvil.endpoint()).await?; + + await_balance(&provider, value).await?; + + Ok(()) +} + +async fn await_balance( + provider: &Provider, + value: U256, +) -> eyre::Result<()> { + for _ in 0..24 { + let balance = match provider.get_balance(ARBITRARY_ADDRESS, None).await + { + Ok(balance) => balance, + Err(err) => { + tracing::warn!("Error getting balance: {:?}", err); + tokio::time::sleep(Duration::from_secs(3)).await; + continue; + } + }; + + if balance == value { + return Ok(()); + } else { + tokio::time::sleep(Duration::from_secs(3)).await; + } + } + + eyre::bail!("Balance not updated in time"); +} diff --git a/tests/rpc_access.rs b/tests/rpc_access.rs index c4ab6fe..f646210 100644 --- a/tests/rpc_access.rs +++ b/tests/rpc_access.rs @@ -1,22 +1,16 @@ mod common; -use ethers::prelude::*; -use tx_sitter::server::routes::relayer::CreateApiKeyResponse; -use url::Url; - use crate::common::prelude::*; -const ESCALATION_INTERVAL: Duration = Duration::from_secs(30); - #[tokio::test] async fn rpc_access() -> eyre::Result<()> { setup_tracing(); let (db_url, _db_container) = setup_db().await?; - let double_anvil = setup_double_anvil().await?; + let anvil = AnvilBuilder::default().spawn().await?; let (service, client) = - setup_service(&double_anvil, &db_url, ESCALATION_INTERVAL).await?; + ServiceBuilder::default().build(&anvil, &db_url).await?; let CreateApiKeyResponse { api_key } = client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; diff --git a/tests/send_many_txs.rs b/tests/send_many_txs.rs index 1e0226c..4325423 100644 --- a/tests/send_many_txs.rs +++ b/tests/send_many_txs.rs @@ -1,28 +1,21 @@ mod common; -use futures::stream::FuturesUnordered; -use futures::StreamExt; -use tx_sitter::server::routes::relayer::CreateApiKeyResponse; - use crate::common::prelude::*; -const ESCALATION_INTERVAL: Duration = Duration::from_secs(30); - #[tokio::test] async fn send_many_txs() -> eyre::Result<()> { setup_tracing(); let (db_url, _db_container) = setup_db().await?; - let double_anvil = setup_double_anvil().await?; + let anvil = AnvilBuilder::default().spawn().await?; let (_service, client) = - setup_service(&double_anvil, &db_url, ESCALATION_INTERVAL).await?; + ServiceBuilder::default().build(&anvil, &db_url).await?; let CreateApiKeyResponse { api_key } = client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; - let provider = - setup_provider(format!("http://{}", double_anvil.local_addr())).await?; + let provider = setup_provider(anvil.endpoint()).await?; // Send a transaction let value: U256 = parse_units("10", "ether")?.into(); diff --git a/tests/send_tx.rs b/tests/send_tx.rs index b4236ca..1a8fe62 100644 --- a/tests/send_tx.rs +++ b/tests/send_tx.rs @@ -1,26 +1,20 @@ mod common; -use tx_sitter::server::routes::relayer::CreateApiKeyResponse; - use crate::common::prelude::*; -const ESCALATION_INTERVAL: Duration = Duration::from_secs(30); - #[tokio::test] async fn send_tx() -> eyre::Result<()> { setup_tracing(); let (db_url, _db_container) = setup_db().await?; - let double_anvil = setup_double_anvil().await?; + let anvil = AnvilBuilder::default().spawn().await?; let (_service, client) = - setup_service(&double_anvil, &db_url, ESCALATION_INTERVAL).await?; - + ServiceBuilder::default().build(&anvil, &db_url).await?; let CreateApiKeyResponse { api_key } = client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; - let provider = - setup_provider(format!("http://{}", double_anvil.local_addr())).await?; + let provider = setup_provider(anvil.endpoint()).await?; // Send a transaction let value: U256 = parse_units("1", "ether")?.into();