From 2f0b25d71e444dfc0a78df2879fcd4ea863db94f Mon Sep 17 00:00:00 2001 From: Dzejkop Date: Thu, 21 Dec 2023 14:03:06 +0100 Subject: [PATCH] Remove fake-rpc --- Cargo.lock | 38 --------- Cargo.toml | 1 - crates/fake-rpc/Cargo.toml | 52 ------------ crates/fake-rpc/src/lib.rs | 153 ------------------------------------ crates/fake-rpc/src/main.rs | 30 ------- src/broadcast_utils.rs | 40 +++------- src/tasks/broadcast.rs | 2 +- src/tasks/escalate.rs | 9 ++- tests/common/mod.rs | 96 ++++++++++------------ tests/create_relayer.rs | 4 +- tests/escalation.rs | 50 ++++++++++++ tests/reorg.rs | 0 tests/rpc_access.rs | 4 +- tests/send_many_txs.rs | 7 +- tests/send_tx.rs | 6 +- 15 files changed, 119 insertions(+), 373 deletions(-) delete mode 100644 crates/fake-rpc/Cargo.toml delete mode 100644 crates/fake-rpc/src/lib.rs delete mode 100644 crates/fake-rpc/src/main.rs create mode 100644 tests/escalation.rs create mode 100644 tests/reorg.rs diff --git a/Cargo.lock b/Cargo.lock index 68fd11b..698e24e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1705,43 +1705,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "fake-rpc" -version = "0.1.0" -dependencies = [ - "async-trait", - "axum", - "chrono", - "clap", - "config", - "dotenv", - "ethers", - "ethers-signers", - "eyre", - "futures", - "headers", - "hex", - "hex-literal", - "humantime", - "humantime-serde", - "hyper", - "rand", - "reqwest", - "serde", - "serde_json", - "sha3", - "spki", - "sqlx", - "strum", - "thiserror", - "tokio", - "toml 0.8.8", - "tower-http", - "tracing", - "tracing-subscriber", - "uuid 0.8.2", -] - [[package]] name = "fastrand" version = "2.0.1" @@ -5040,7 +5003,6 @@ dependencies = [ "dotenv", "ethers", "eyre", - "fake-rpc", "futures", "headers", "hex", diff --git a/Cargo.toml b/Cargo.toml index 192f45c..f9f6b47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,6 @@ url = "2.4.1" uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] -fake-rpc = { path = "crates/fake-rpc" } indoc = "2.0.3" test-case = "3.1.0" diff --git a/crates/fake-rpc/Cargo.toml b/crates/fake-rpc/Cargo.toml deleted file mode 100644 index da94d7a..0000000 --- a/crates/fake-rpc/Cargo.toml +++ /dev/null @@ -1,52 +0,0 @@ -[package] -name = "fake-rpc" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -serde = "1.0.136" -axum = { version = "0.6.20", features = ["headers"] } -thiserror = "1.0.50" -headers = "0.3.9" -humantime = "2.1.0" -humantime-serde = "1.1.1" -hyper = "0.14.27" -dotenv = "0.15.0" -clap = { version = "4.3.0", features = ["env", "derive"] } -ethers = { version = "2.0.11" } -ethers-signers = { version = "2.0.11" } -eyre = "0.6.5" -hex = "0.4.3" -hex-literal = "0.4.1" -reqwest = { version = "0.11.13", default-features = false, features = [ - "rustls-tls", -] } -serde_json = "1.0.91" -strum = { version = "0.25.0", features = ["derive"] } -tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -tracing = { version = "0.1", features = ["log"] } -tracing-subscriber = { version = "0.3", default-features = false, features = [ - "env-filter", - "std", - "fmt", - "json", - "ansi", -] } -tower-http = { version = "0.4.4", features = ["trace"] } -uuid = { version = "0.8", features = ["v4"] } -futures = "0.3" -chrono = "0.4" -rand = "0.8.5" -sha3 = "0.10.8" -config = "0.13.3" -toml = "0.8.8" -sqlx = { version = "0.7.2", features = [ - "runtime-tokio", - "tls-rustls", - "postgres", - "migrate", -] } -spki = "0.7.2" -async-trait = "0.1.74" diff --git a/crates/fake-rpc/src/lib.rs b/crates/fake-rpc/src/lib.rs deleted file mode 100644 index 81c334e..0000000 --- a/crates/fake-rpc/src/lib.rs +++ /dev/null @@ -1,153 +0,0 @@ -use std::net::{Ipv4Addr, SocketAddr}; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; -use std::time::Duration; - -use axum::extract::State; -use axum::routing::{post, IntoMakeService}; -use axum::{Json, Router}; -use ethers::utils::{Anvil, AnvilInstance}; -use hyper::server::conn::AddrIncoming; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use tokio::sync::Mutex; - -pub const BLOCK_TIME_SECONDS: u64 = 2; - -pub struct DoubleAnvil { - main_anvil: Mutex, - reference_anvil: Mutex, - held_back_txs: Mutex>, - - auto_advance: AtomicBool, -} - -impl DoubleAnvil { - pub async fn drop_txs(&self) -> eyre::Result<()> { - let mut held_back_txs = self.held_back_txs.lock().await; - held_back_txs.clear(); - Ok(()) - } - - pub async fn advance(&self) -> eyre::Result<()> { - let mut held_back_txs = self.held_back_txs.lock().await; - - for req in held_back_txs.drain(..) { - tracing::info!(?req, "eth_sendRawTransaction"); - - let response = reqwest::Client::new() - .post(&self.main_anvil.lock().await.endpoint()) - .json(&req) - .send() - .await - .unwrap(); - - let resp = response.json::().await.unwrap(); - - tracing::info!(?resp, "eth_sendRawTransaction.response"); - } - - Ok(()) - } - - pub fn set_auto_advance(&self, auto_advance: bool) { - self.auto_advance - .store(auto_advance, std::sync::atomic::Ordering::SeqCst); - } - - pub async fn ws_endpoint(&self) -> String { - self.main_anvil.lock().await.ws_endpoint() - } -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -struct JsonRpcReq { - pub id: u64, - pub jsonrpc: String, - pub method: String, - #[serde(default)] - pub params: Value, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct JsonRpcResponse { - pub id: u64, - pub jsonrpc: String, - pub result: Value, -} - -async fn advance(State(anvil): State>) { - anvil.advance().await.unwrap(); -} - -async fn rpc( - State(anvil): State>, - Json(req): Json, -) -> Json { - let method = req.method.as_str(); - let anvil_instance = match method { - "eth_sendRawTransaction" => { - anvil.held_back_txs.lock().await.push(req.clone()); - - if anvil.auto_advance.load(std::sync::atomic::Ordering::SeqCst) { - anvil.advance().await.unwrap(); - } - - anvil.reference_anvil.lock().await - } - "eth_getTransactionReceipt" => anvil.main_anvil.lock().await, - "eth_getTransactionByHash" => anvil.reference_anvil.lock().await, - _ => anvil.main_anvil.lock().await, - }; - - tracing::info!(?req, "{}", method); - - let response = reqwest::Client::new() - .post(&anvil_instance.endpoint()) - .json(&req) - .send() - .await - .unwrap(); - - let resp = response.json::().await.unwrap(); - - tracing::info!(?resp, "{}.response", method); - - Json(resp) -} - -pub async fn serve( - port: u16, -) -> ( - Arc, - axum::Server>, -) { - let main_anvil = Anvil::new().block_time(BLOCK_TIME_SECONDS).spawn(); - let reference_anvil = Anvil::new().block_time(BLOCK_TIME_SECONDS).spawn(); - - tokio::time::sleep(Duration::from_secs(BLOCK_TIME_SECONDS)).await; - - tracing::info!("Main anvil instance: {}", main_anvil.endpoint()); - tracing::info!("Reference anvil instance: {}", reference_anvil.endpoint()); - - let state = Arc::new(DoubleAnvil { - main_anvil: Mutex::new(main_anvil), - reference_anvil: Mutex::new(reference_anvil), - held_back_txs: Mutex::new(Vec::new()), - auto_advance: AtomicBool::new(true), - }); - - let router = Router::new() - .route("/", post(rpc)) - .route("/advance", post(advance)) - .with_state(state.clone()) - .layer(tower_http::trace::TraceLayer::new_for_http()); - - let host = Ipv4Addr::new(127, 0, 0, 1); - let socket_addr = SocketAddr::new(host.into(), port); - - let server = - axum::Server::bind(&socket_addr).serve(router.into_make_service()); - - (state, server) -} diff --git a/crates/fake-rpc/src/main.rs b/crates/fake-rpc/src/main.rs deleted file mode 100644 index a20b425..0000000 --- a/crates/fake-rpc/src/main.rs +++ /dev/null @@ -1,30 +0,0 @@ -use clap::Parser; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; -use tracing_subscriber::EnvFilter; - -#[derive(Debug, Clone, Parser)] -struct Args { - #[clap(short, long, default_value = "8545")] - port: u16, -} - -#[tokio::main] -async fn main() -> eyre::Result<()> { - dotenv::dotenv().ok(); - - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer().pretty().compact()) - .with(EnvFilter::from_default_env()) - .init(); - - let args = Args::parse(); - - let (_app, server) = fake_rpc::serve(args.port).await; - - tracing::info!("Serving fake RPC at {}", server.local_addr()); - - server.await?; - - Ok(()) -} diff --git a/src/broadcast_utils.rs b/src/broadcast_utils.rs index a0182d4..0b97892 100644 --- a/src/broadcast_utils.rs +++ b/src/broadcast_utils.rs @@ -1,4 +1,4 @@ -use ethers::types::{Eip1559TransactionRequest, U256}; +use ethers::types::U256; use eyre::ContextCompat; use self::gas_estimation::FeesEstimate; @@ -12,40 +12,20 @@ pub fn calculate_gas_fees_from_estimates( tx_priority_index: usize, max_base_fee_per_gas: U256, ) -> (U256, U256) { + println!("estimates = {estimates:#?}"); + let max_priority_fee_per_gas = estimates.percentile_fees[tx_priority_index]; let max_fee_per_gas = max_base_fee_per_gas + max_priority_fee_per_gas; - (max_fee_per_gas, max_priority_fee_per_gas) -} - -pub fn escalate_priority_fee( - max_base_fee_per_gas: U256, - max_network_fee_per_gas: U256, - current_max_priority_fee_per_gas: U256, - escalation_count: usize, - tx: &mut Eip1559TransactionRequest, -) { - // Min increase of 20% on the priority fee required for a replacement tx - let increased_gas_price_percentage = - U256::from(100 + (10 * (1 + escalation_count))); - - let factor = U256::from(100); - - let new_max_priority_fee_per_gas = current_max_priority_fee_per_gas - * increased_gas_price_percentage - / factor; - - let new_max_priority_fee_per_gas = - std::cmp::min(new_max_priority_fee_per_gas, max_network_fee_per_gas); + println!( + "max_base_fee_per_gas = {max_base_fee_per_gas}, max_priority_fee_per_gas = {max_priority_fee_per_gas}, max_fee_per_gas = {max_fee_per_gas}", + max_base_fee_per_gas = max_base_fee_per_gas, + max_priority_fee_per_gas = max_priority_fee_per_gas, + max_fee_per_gas = max_fee_per_gas, + ); - let new_max_fee_per_gas = - max_base_fee_per_gas + new_max_priority_fee_per_gas; - let new_max_fee_per_gas = - std::cmp::min(new_max_fee_per_gas, max_network_fee_per_gas); - - tx.max_fee_per_gas = Some(new_max_fee_per_gas); - tx.max_priority_fee_per_gas = Some(new_max_priority_fee_per_gas); + (max_fee_per_gas, max_priority_fee_per_gas) } pub async fn should_send_transaction( diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index 7f33f21..8e46612 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -5,7 +5,7 @@ use std::time::Duration; use ethers::providers::Middleware; use ethers::types::transaction::eip2718::TypedTransaction; use ethers::types::transaction::eip2930::AccessList; -use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress, H256}; +use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress, H256, U256}; use eyre::ContextCompat; use futures::stream::FuturesUnordered; use futures::StreamExt; diff --git a/src/tasks/escalate.rs b/src/tasks/escalate.rs index 76d5d3c..3a8c51f 100644 --- a/src/tasks/escalate.rs +++ b/src/tasks/escalate.rs @@ -39,7 +39,7 @@ pub async fn escalate_txs(app: Arc) -> eyre::Result<()> { // Min increase of 20% on the priority fee required for a replacement tx let factor = U256::from(100); let increased_gas_price_percentage = - factor + U256::from(10 * (1 + escalation)); + factor + U256::from(20 * (1 + escalation)); let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0 * increased_gas_price_percentage @@ -51,6 +51,13 @@ pub async fn escalate_txs(app: Arc) -> eyre::Result<()> { let max_priority_fee_per_gas = max_fee_per_gas - fees.fee_estimates.base_fee_per_gas; + tracing::warn!( + "Initial tx fees are max = {}, priority = {}", + tx.initial_max_fee_per_gas.0, + tx.initial_max_priority_fee_per_gas.0 + ); + tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}"); + let eip1559_tx = Eip1559TransactionRequest { from: None, to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 033ea32..10490dd 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -9,9 +9,10 @@ use ethers::middleware::SignerMiddleware; use ethers::providers::{Http, Middleware, Provider}; use ethers::signers::{LocalWallet, Signer}; use ethers::types::{Address, Eip1559TransactionRequest, H160, U256}; -use fake_rpc::DoubleAnvil; +use ethers::utils::{Anvil, AnvilInstance}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use postgres_docker_utils::DockerContainerGuard; -use tokio::task::JoinHandle; use tracing::level_filters::LevelFilter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -57,24 +58,12 @@ pub const ARBITRARY_ADDRESS: Address = H160(hex_literal::hex!( )); pub const DEFAULT_ANVIL_CHAIN_ID: u64 = 31337; +pub const DEFAULT_ANVIL_BLOCK_TIME: u64 = 2; pub const DEFAULT_RELAYER_ID: &str = "1b908a34-5dc1-4d2d-a146-5eb46e975830"; -pub struct DoubleAnvilHandle { - pub double_anvil: Arc, - ws_addr: String, - local_addr: SocketAddr, - server_handle: JoinHandle>, -} - -impl DoubleAnvilHandle { - pub fn local_addr(&self) -> String { - self.local_addr.to_string() - } - - pub fn ws_addr(&self) -> String { - self.ws_addr.clone() - } +pub struct AnvilWrapper { + pub anvil: Anvil, } pub fn setup_tracing() { @@ -97,52 +86,47 @@ pub async fn setup_db() -> eyre::Result<(String, DockerContainerGuard)> { Ok((url, db_container)) } -pub async fn setup_double_anvil() -> eyre::Result { - let (double_anvil, server) = fake_rpc::serve(0).await; - - let local_addr = server.local_addr(); - - let server_handle = tokio::spawn(async move { - server.await?; - Ok(()) - }); +pub async fn setup_anvil(block_time: u64) -> eyre::Result { + let anvil = Anvil::new().block_time(block_time).spawn(); - let middleware = setup_middleware( - format!("http://{local_addr}"), - SECONDARY_ANVIL_PRIVATE_KEY, - ) - .await?; + let middleware = + setup_middleware(anvil.endpoint(), SECONDARY_ANVIL_PRIVATE_KEY).await?; // We need to seed some transactions so we can get fee estimates on the first block - middleware - .send_transaction( - Eip1559TransactionRequest { - to: Some(DEFAULT_ANVIL_ACCOUNT.into()), - value: Some(U256::from(100u64)), - ..Default::default() - }, - None, - ) - .await? - .await?; - - let ws_addr = double_anvil.ws_endpoint().await; - - Ok(DoubleAnvilHandle { - double_anvil, - ws_addr, - local_addr, - server_handle, - }) + // let mut nonce = 0; + // let mut futures = FuturesUnordered::new(); + // for i in 0..100 { + // let tx = middleware + // .send_transaction( + // Eip1559TransactionRequest { + // nonce: Some(U256::from(nonce)), + // to: Some(DEFAULT_ANVIL_ACCOUNT.into()), + // value: Some(U256::from(100u64)), + // max_fee_per_gas: Some(U256::from(40_000_000_000u64)), + // max_priority_fee_per_gas: Some(U256::from(10_000u64)), + // ..Default::default() + // }, + // None, + // ) + // .await?; + + // futures.push(tx); + + // nonce += 1; + // } + + // while let Some(tx) = futures.next().await { + // tx?; + // } + + Ok(anvil) } pub async fn setup_service( - anvil_handle: &DoubleAnvilHandle, + anvil: &AnvilInstance, db_connection_url: &str, escalation_interval: Duration, ) -> eyre::Result<(Service, TxSitterClient)> { - let rpc_url = anvil_handle.local_addr(); - let anvil_private_key = hex::encode(DEFAULT_ANVIL_PRIVATE_KEY); let config = Config { @@ -154,8 +138,8 @@ pub async fn setup_service( network: PredefinedNetwork { chain_id: DEFAULT_ANVIL_CHAIN_ID, name: "Anvil".to_string(), - http_rpc: format!("http://{}", rpc_url), - ws_rpc: anvil_handle.ws_addr(), + http_rpc: anvil.endpoint(), + ws_rpc: anvil.ws_endpoint(), }, relayer: PredefinedRelayer { name: "Anvil".to_string(), diff --git a/tests/create_relayer.rs b/tests/create_relayer.rs index e71732a..3acba00 100644 --- a/tests/create_relayer.rs +++ b/tests/create_relayer.rs @@ -9,10 +9,10 @@ async fn create_relayer() -> eyre::Result<()> { setup_tracing(); let (db_url, _db_container) = setup_db().await?; - let double_anvil = setup_double_anvil().await?; + let anvil = setup_anvil(DEFAULT_ANVIL_BLOCK_TIME).await?; let (_service, client) = - setup_service(&double_anvil, &db_url, ESCALATION_INTERVAL).await?; + setup_service(&anvil, &db_url, ESCALATION_INTERVAL).await?; let CreateRelayerResponse { .. } = client .create_relayer(&CreateRelayerRequest { diff --git a/tests/escalation.rs b/tests/escalation.rs new file mode 100644 index 0000000..97e34b0 --- /dev/null +++ b/tests/escalation.rs @@ -0,0 +1,50 @@ +mod common; + +use tx_sitter::server::routes::relayer::CreateApiKeyResponse; + +use crate::common::prelude::*; + +const ESCALATION_INTERVAL: Duration = Duration::from_secs(2); +const ANVIL_BLOCK_TIME: u64 = 12; + +#[tokio::test] +async fn escalation() -> eyre::Result<()> { + setup_tracing(); + + let (db_url, _db_container) = setup_db().await?; + let anvil = setup_anvil(ANVIL_BLOCK_TIME).await?; + + let (_service, client) = + setup_service(&anvil, &db_url, ESCALATION_INTERVAL).await?; + + let CreateApiKeyResponse { api_key } = + client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; + + let provider = setup_provider(anvil.endpoint()).await?; + + // Send a transaction + let value: U256 = parse_units("1", "ether")?.into(); + client + .send_tx( + &api_key, + &SendTxRequest { + to: ARBITRARY_ADDRESS, + value, + gas_limit: U256::from(21_000), + ..Default::default() + }, + ) + .await?; + + for _ in 0..10 { + let balance = provider.get_balance(ARBITRARY_ADDRESS, None).await?; + + if balance == value { + return Ok(()); + } else { + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + + panic!("Transaction was not sent") +} diff --git a/tests/reorg.rs b/tests/reorg.rs new file mode 100644 index 0000000..e69de29 diff --git a/tests/rpc_access.rs b/tests/rpc_access.rs index c4ab6fe..07f01ee 100644 --- a/tests/rpc_access.rs +++ b/tests/rpc_access.rs @@ -13,10 +13,10 @@ async fn rpc_access() -> eyre::Result<()> { setup_tracing(); let (db_url, _db_container) = setup_db().await?; - let double_anvil = setup_double_anvil().await?; + let anvil = setup_anvil(DEFAULT_ANVIL_BLOCK_TIME).await?; let (service, client) = - setup_service(&double_anvil, &db_url, ESCALATION_INTERVAL).await?; + setup_service(&anvil, &db_url, ESCALATION_INTERVAL).await?; let CreateApiKeyResponse { api_key } = client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; diff --git a/tests/send_many_txs.rs b/tests/send_many_txs.rs index 1e0226c..f0f33e2 100644 --- a/tests/send_many_txs.rs +++ b/tests/send_many_txs.rs @@ -13,16 +13,15 @@ async fn send_many_txs() -> eyre::Result<()> { setup_tracing(); let (db_url, _db_container) = setup_db().await?; - let double_anvil = setup_double_anvil().await?; + let anvil = setup_anvil(DEFAULT_ANVIL_BLOCK_TIME).await?; let (_service, client) = - setup_service(&double_anvil, &db_url, ESCALATION_INTERVAL).await?; + setup_service(&anvil, &db_url, ESCALATION_INTERVAL).await?; let CreateApiKeyResponse { api_key } = client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; - let provider = - setup_provider(format!("http://{}", double_anvil.local_addr())).await?; + let provider = setup_provider(anvil.endpoint()).await?; // Send a transaction let value: U256 = parse_units("10", "ether")?.into(); diff --git a/tests/send_tx.rs b/tests/send_tx.rs index b4236ca..5d8d192 100644 --- a/tests/send_tx.rs +++ b/tests/send_tx.rs @@ -11,16 +11,16 @@ async fn send_tx() -> eyre::Result<()> { setup_tracing(); let (db_url, _db_container) = setup_db().await?; - let double_anvil = setup_double_anvil().await?; + let anvil = setup_anvil(DEFAULT_ANVIL_BLOCK_TIME).await?; let (_service, client) = - setup_service(&double_anvil, &db_url, ESCALATION_INTERVAL).await?; + setup_service(&anvil, &db_url, ESCALATION_INTERVAL).await?; let CreateApiKeyResponse { api_key } = client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; let provider = - setup_provider(format!("http://{}", double_anvil.local_addr())).await?; + setup_provider(anvil.endpoint()).await?; // Send a transaction let value: U256 = parse_units("1", "ether")?.into();