diff --git a/.gitignore b/.gitignore index 14ee500..5f32e70 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ target/ -.env +.env \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index f1f02e5..192f45c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,54 +8,50 @@ default-run = "tx-sitter" members = ["crates/*"] [dependencies] -# Third Party +async-trait = "0.1.74" + ## AWS aws-config = { version = "1.0.1" } -aws-sdk-kms = "1.3.0" -aws-smithy-types = "1.0.2" -aws-smithy-runtime-api = "1.0.2" -aws-types = "1.0.1" aws-credential-types = { version = "1.0.1", features = [ "hardcoded-credentials", ] } - -## Other -serde = "1.0.136" +aws-sdk-kms = "1.3.0" +aws-smithy-runtime-api = "1.0.2" +aws-smithy-types = "1.0.2" +aws-types = "1.0.1" axum = { version = "0.6.20", features = ["headers"] } -thiserror = "1.0.50" -headers = "0.3.9" -humantime = "2.1.0" -humantime-serde = "1.1.1" -hyper = "0.14.27" -dotenv = "0.15.0" +base64 = "0.21.5" +bigdecimal = "0.4.2" +chrono = "0.4" clap = { version = "4.3.0", features = ["env", "derive"] } +config = "0.13.3" +dotenv = "0.15.0" ethers = { version = "2.0.11", features = ["ws"] } eyre = "0.6.5" +futures = "0.3" +headers = "0.3.9" hex = "0.4.3" hex-literal = "0.4.1" +humantime = "2.1.0" +humantime-serde = "1.1.1" +hyper = "0.14.27" +itertools = "0.12.0" +metrics = "0.21.1" +num-bigint = "0.4.4" +# telemetry-batteries = { path = "../telemetry-batteries" } + +# Internal +postgres-docker-utils = { path = "crates/postgres-docker-utils" } +rand = "0.8.5" reqwest = { version = "0.11.13", default-features = false, features = [ "rustls-tls", ] } + +## Other +serde = "1.0.136" serde_json = "1.0.91" -strum = { version = "0.25.0", features = ["derive"] } -tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -tracing = { version = "0.1", features = ["log"] } -tracing-subscriber = { version = "0.3", default-features = false, features = [ - "env-filter", - "std", - "fmt", - "json", - "ansi", -] } -tower-http = { version = "0.4.4", features = [ "trace", "auth" ] } -uuid = { version = "0.8", features = ["v4"] } -futures = "0.3" -chrono = "0.4" -rand = "0.8.5" sha3 = "0.10.8" -config = "0.13.3" -toml = "0.8.8" -url = "2.4.1" +spki = "0.7.2" sqlx = { version = "0.7.2", features = [ "time", "chrono", @@ -65,26 +61,30 @@ sqlx = { version = "0.7.2", features = [ "migrate", "bigdecimal", ] } -metrics = "0.21.1" -num-bigint = "0.4.4" -bigdecimal = "0.4.2" -spki = "0.7.2" -async-trait = "0.1.74" -itertools = "0.12.0" -base64 = "0.21.5" +strum = { version = "0.25.0", features = ["derive"] } # Company telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries", branch = "dzejkop/unnest-fields" } -# telemetry-batteries = { path = "../telemetry-batteries" } - -# Internal -postgres-docker-utils = { path = "crates/postgres-docker-utils" } +thiserror = "1.0.50" +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +toml = "0.8.8" +tower-http = { version = "0.4.4", features = ["trace", "auth"] } +tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = { version = "0.3", default-features = false, features = [ + "env-filter", + "std", + "fmt", + "json", + "ansi", +] } +url = "2.4.1" +uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] -test-case = "3.1.0" -indoc = "2.0.3" fake-rpc = { path = "crates/fake-rpc" } +indoc = "2.0.3" +test-case = "3.1.0" [features] -default = [ "default-config" ] +default = ["default-config"] default-config = [] diff --git a/src/keys/universal_signer.rs b/src/keys/universal_signer.rs index 6bfd718..2a3db9d 100644 --- a/src/keys/universal_signer.rs +++ b/src/keys/universal_signer.rs @@ -3,6 +3,7 @@ use ethers::core::types::transaction::eip2718::TypedTransaction; use ethers::core::types::transaction::eip712::Eip712; use ethers::core::types::{Address, Signature as EthSig}; use ethers::signers::{Signer, Wallet, WalletError}; +use ethers::types::Bytes; use thiserror::Error; use crate::aws::ethers_signer::AwsSigner; @@ -13,6 +14,20 @@ pub enum UniversalSigner { Local(Wallet), } +impl UniversalSigner { + pub async fn raw_signed_tx( + &self, + tx: &TypedTransaction, + ) -> eyre::Result { + let signature = match self { + Self::Aws(signer) => signer.sign_transaction(tx).await?, + Self::Local(signer) => signer.sign_transaction(tx).await?, + }; + + Ok(tx.rlp_signed(&signature)) + } +} + #[derive(Debug, Error)] pub enum UniversalError { #[error("AWS Signer Error: {0}")] diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index d541928..7f33f21 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -5,7 +5,7 @@ use std::time::Duration; use ethers::providers::Middleware; use ethers::types::transaction::eip2718::TypedTransaction; use ethers::types::transaction::eip2930::AccessList; -use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress}; +use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress, H256}; use eyre::ContextCompat; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -19,23 +19,9 @@ use crate::db::UnsentTx; pub async fn broadcast_txs(app: Arc) -> eyre::Result<()> { loop { - let mut txs = app.db.get_unsent_txs().await?; - - txs.sort_unstable_by_key(|tx| tx.relayer_id.clone()); - - let txs_by_relayer = - txs.into_iter().group_by(|tx| tx.relayer_id.clone()); - - let txs_by_relayer: HashMap<_, _> = txs_by_relayer - .into_iter() - .map(|(relayer_id, txs)| { - let mut txs = txs.collect_vec(); - - txs.sort_unstable_by_key(|tx| tx.nonce); - - (relayer_id, txs) - }) - .collect(); + // Get all unsent txs and broadcast + let txs = app.db.get_unsent_txs().await?; + let txs_by_relayer = sort_txs_by_relayer(txs); let mut futures = FuturesUnordered::new(); @@ -96,41 +82,45 @@ async fn broadcast_relayer_txs( max_base_fee_per_gas, ); - let eip1559_tx = Eip1559TransactionRequest { - from: None, - to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), - gas: Some(tx.gas_limit.0), - value: Some(tx.value.0), - data: Some(tx.data.into()), - nonce: Some(tx.nonce.into()), - access_list: AccessList::default(), - max_priority_fee_per_gas: Some(max_priority_fee_per_gas), - max_fee_per_gas: Some(max_fee_per_gas), - chain_id: Some(tx.chain_id.into()), - }; + let mut typed_transaction = + TypedTransaction::Eip1559(Eip1559TransactionRequest { + from: None, + to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), + gas: Some(tx.gas_limit.0), + value: Some(tx.value.0), + data: Some(tx.data.into()), + nonce: Some(tx.nonce.into()), + access_list: AccessList::default(), + max_priority_fee_per_gas: Some(max_priority_fee_per_gas), + max_fee_per_gas: Some(max_fee_per_gas), + chain_id: Some(tx.chain_id.into()), + }); + + // Fill and simulate the transaction + middleware + .fill_transaction(&mut typed_transaction, None) + .await?; - tracing::debug!(?eip1559_tx, "Sending tx"); + tracing::debug!(?tx.id, "Simulating tx"); - // TODO: Is it possible that we send a tx but don't store it in the DB? - // TODO: Be smarter about error handling - a tx can fail to be sent - // e.g. because the relayer is out of funds - // but we don't want to retry it forever - let pending_tx = middleware - .send_transaction(TypedTransaction::Eip1559(eip1559_tx), None) - .await; - - let pending_tx = match pending_tx { - Ok(pending_tx) => { - tracing::info!(?pending_tx, "Tx sent successfully"); - pending_tx + // Simulate the transaction + match middleware.call(&typed_transaction, None).await { + Ok(_) => { + tracing::info!(?tx.id, "Tx simulated successfully"); } Err(err) => { - tracing::error!(error = ?err, "Failed to send tx"); + tracing::error!(?tx.id, error = ?err, "Failed to simulate tx"); continue; } }; - let tx_hash = pending_tx.tx_hash(); + // Get the raw signed tx and derive the tx hash + let raw_signed_tx = middleware + .signer() + .raw_signed_tx(&typed_transaction) + .await?; + + let tx_hash = H256::from(ethers::utils::keccak256(&raw_signed_tx)); app.db .insert_tx_broadcast( @@ -141,8 +131,43 @@ async fn broadcast_relayer_txs( ) .await?; + tracing::debug!(?tx.id, "Sending tx"); + + // TODO: Be smarter about error handling - a tx can fail to be sent + // e.g. because the relayer is out of funds + // but we don't want to retry it forever + let pending_tx = middleware.send_raw_transaction(raw_signed_tx).await; + + match pending_tx { + Ok(pending_tx) => { + tracing::info!(?pending_tx, "Tx sent successfully"); + } + Err(err) => { + tracing::error!(?tx.id, error = ?err, "Failed to send tx"); + continue; + } + }; + tracing::info!(id = tx.id, hash = ?tx_hash, "Tx broadcast"); } Ok(()) } + +fn sort_txs_by_relayer( + mut txs: Vec, +) -> HashMap> { + txs.sort_unstable_by_key(|tx| tx.relayer_id.clone()); + let txs_by_relayer = txs.into_iter().group_by(|tx| tx.relayer_id.clone()); + + txs_by_relayer + .into_iter() + .map(|(relayer_id, txs)| { + let mut txs = txs.collect_vec(); + + txs.sort_unstable_by_key(|tx| tx.nonce); + + (relayer_id, txs) + }) + .collect() +} diff --git a/src/tasks/finalize.rs b/src/tasks/finalize.rs index 4695153..9ee3d87 100644 --- a/src/tasks/finalize.rs +++ b/src/tasks/finalize.rs @@ -7,15 +7,15 @@ const TIME_BETWEEN_FINALIZATIONS_SECONDS: i64 = 60; pub async fn finalize_txs(app: Arc) -> eyre::Result<()> { loop { - let finalization_timestmap = + let finalization_timestamp = chrono::Utc::now() - chrono::Duration::seconds(60 * 60); tracing::info!( "Finalizing txs mined before {}", - finalization_timestmap + finalization_timestamp ); - app.db.finalize_txs(finalization_timestmap).await?; + app.db.finalize_txs(finalization_timestamp).await?; tokio::time::sleep(Duration::from_secs( TIME_BETWEEN_FINALIZATIONS_SECONDS as u64,