diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f97022 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target/ \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index f1f02e5..191421c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,54 +8,50 @@ default-run = "tx-sitter" members = ["crates/*"] [dependencies] +async-trait = "0.1.74" # Third Party ## AWS aws-config = { version = "1.0.1" } -aws-sdk-kms = "1.3.0" -aws-smithy-types = "1.0.2" -aws-smithy-runtime-api = "1.0.2" -aws-types = "1.0.1" aws-credential-types = { version = "1.0.1", features = [ "hardcoded-credentials", ] } - -## Other -serde = "1.0.136" +aws-sdk-kms = "1.3.0" +aws-smithy-runtime-api = "1.0.2" +aws-smithy-types = "1.0.2" +aws-types = "1.0.1" axum = { version = "0.6.20", features = ["headers"] } -thiserror = "1.0.50" -headers = "0.3.9" -humantime = "2.1.0" -humantime-serde = "1.1.1" -hyper = "0.14.27" -dotenv = "0.15.0" +base64 = "0.21.5" +bigdecimal = "0.4.2" +chrono = "0.4" clap = { version = "4.3.0", features = ["env", "derive"] } +config = "0.13.3" +dotenv = "0.15.0" ethers = { version = "2.0.11", features = ["ws"] } eyre = "0.6.5" +futures = "0.3" +headers = "0.3.9" hex = "0.4.3" hex-literal = "0.4.1" +humantime = "2.1.0" +humantime-serde = "1.1.1" +hyper = "0.14.27" +itertools = "0.12.0" +metrics = "0.21.1" +num-bigint = "0.4.4" +# telemetry-batteries = { path = "../telemetry-batteries" } + +# Internal +postgres-docker-utils = { path = "crates/postgres-docker-utils" } +rand = "0.8.5" reqwest = { version = "0.11.13", default-features = false, features = [ "rustls-tls", ] } + +## Other +serde = "1.0.136" serde_json = "1.0.91" -strum = { version = "0.25.0", features = ["derive"] } -tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -tracing = { version = "0.1", features = ["log"] } -tracing-subscriber = { version = "0.3", default-features = false, features = [ - "env-filter", - "std", - "fmt", - "json", - "ansi", -] } -tower-http = { version = 
"0.4.4", features = [ "trace", "auth" ] } -uuid = { version = "0.8", features = ["v4"] } -futures = "0.3" -chrono = "0.4" -rand = "0.8.5" sha3 = "0.10.8" -config = "0.13.3" -toml = "0.8.8" -url = "2.4.1" +spki = "0.7.2" sqlx = { version = "0.7.2", features = [ "time", "chrono", @@ -65,26 +61,30 @@ sqlx = { version = "0.7.2", features = [ "migrate", "bigdecimal", ] } -metrics = "0.21.1" -num-bigint = "0.4.4" -bigdecimal = "0.4.2" -spki = "0.7.2" -async-trait = "0.1.74" -itertools = "0.12.0" -base64 = "0.21.5" +strum = { version = "0.25.0", features = ["derive"] } # Company telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries", branch = "dzejkop/unnest-fields" } -# telemetry-batteries = { path = "../telemetry-batteries" } - -# Internal -postgres-docker-utils = { path = "crates/postgres-docker-utils" } +thiserror = "1.0.50" +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +toml = "0.8.8" +tower-http = { version = "0.4.4", features = [ "trace", "auth" ] } +tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = { version = "0.3", default-features = false, features = [ + "env-filter", + "std", + "fmt", + "json", + "ansi", +] } +url = "2.4.1" +uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] -test-case = "3.1.0" -indoc = "2.0.3" fake-rpc = { path = "crates/fake-rpc" } +indoc = "2.0.3" +test-case = "3.1.0" [features] -default = [ "default-config" ] +default = ["default-config"] default-config = [] diff --git a/db/migrations/001_init.sql b/db/migrations/001_init.sql index 62a7209..4490b20 100644 --- a/db/migrations/001_init.sql +++ b/db/migrations/001_init.sql @@ -60,6 +60,9 @@ CREATE TABLE tx_hashes ( escalated BOOL NOT NULL DEFAULT FALSE ); +ALTER TABLE tx_hashes +ADD UNIQUE (tx_id); + -- Dynamic tx data & data used for escalations CREATE TABLE sent_transactions ( tx_id VARCHAR(255) PRIMARY KEY REFERENCES transactions(id) ON DELETE CASCADE, diff --git a/src/db.rs b/src/db.rs index 
9676582..914661d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -231,7 +231,7 @@ impl Database { .await?) } - pub async fn insert_tx_broadcast( + pub async fn insert_into_tx_hashes( &self, tx_id: &str, tx_hash: H256, @@ -246,21 +246,38 @@ impl Database { initial_max_priority_fee_per_gas .to_big_endian(&mut initial_max_priority_fee_per_gas_bytes); - let mut tx = self.pool.begin().await?; - sqlx::query( r#" INSERT INTO tx_hashes (tx_id, tx_hash, max_fee_per_gas, max_priority_fee_per_gas) VALUES ($1, $2, $3, $4) + ON CONFLICT (tx_id) DO NOTHING "#, ) .bind(tx_id) .bind(tx_hash.as_bytes()) .bind(initial_max_fee_per_gas_bytes) .bind(initial_max_priority_fee_per_gas_bytes) - .execute(tx.as_mut()) + .execute(&self.pool) .await?; + Ok(()) + } + + pub async fn insert_into_sent_transactions( + &self, + tx_id: &str, + tx_hash: H256, + initial_max_fee_per_gas: U256, + initial_max_priority_fee_per_gas: U256, + ) -> eyre::Result<()> { + let mut initial_max_fee_per_gas_bytes = [0u8; 32]; + initial_max_fee_per_gas + .to_big_endian(&mut initial_max_fee_per_gas_bytes); + + let mut initial_max_priority_fee_per_gas_bytes = [0u8; 32]; + initial_max_priority_fee_per_gas + .to_big_endian(&mut initial_max_priority_fee_per_gas_bytes); + sqlx::query( r#" INSERT INTO sent_transactions (tx_id, initial_max_fee_per_gas, initial_max_priority_fee_per_gas, valid_tx_hash) @@ -271,9 +288,7 @@ impl Database { .bind(initial_max_fee_per_gas_bytes) .bind(initial_max_priority_fee_per_gas_bytes) .bind(tx_hash.as_bytes()) - .execute(tx.as_mut()).await?; - - tx.commit().await?; + .execute(&self.pool).await?; Ok(()) } @@ -1069,9 +1084,17 @@ mod tests { let url = format!("postgres://postgres:postgres@{db_socket_addr}/database"); - let db = Database::new(&DatabaseConfig::connection_string(url)).await?; + for _ in 0..5 { + match Database::new(&DatabaseConfig::connection_string(&url)).await + { + Ok(db) => return Ok((db, db_container)), + Err(_) => { + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } - 
Ok((db, db_container)) + Err(eyre::eyre!("Failed to connect to the database")) } async fn full_update( @@ -1287,7 +1310,15 @@ mod tests { let initial_max_fee_per_gas = U256::from(1); let initial_max_priority_fee_per_gas = U256::from(1); - db.insert_tx_broadcast( + db.insert_into_tx_hashes( + tx_id, + tx_hash_1, + initial_max_fee_per_gas, + initial_max_priority_fee_per_gas, + ) + .await?; + + db.insert_into_sent_transactions( tx_id, tx_hash_1, initial_max_fee_per_gas, diff --git a/src/keys/universal_signer.rs b/src/keys/universal_signer.rs index 6bfd718..2a3db9d 100644 --- a/src/keys/universal_signer.rs +++ b/src/keys/universal_signer.rs @@ -3,6 +3,7 @@ use ethers::core::types::transaction::eip2718::TypedTransaction; use ethers::core::types::transaction::eip712::Eip712; use ethers::core::types::{Address, Signature as EthSig}; use ethers::signers::{Signer, Wallet, WalletError}; +use ethers::types::Bytes; use thiserror::Error; use crate::aws::ethers_signer::AwsSigner; @@ -13,6 +14,20 @@ pub enum UniversalSigner { Local(Wallet), } +impl UniversalSigner { + pub async fn raw_signed_tx( + &self, + tx: &TypedTransaction, + ) -> eyre::Result { + let signature = match self { + Self::Aws(signer) => signer.sign_transaction(tx).await?, + Self::Local(signer) => signer.sign_transaction(tx).await?, + }; + + Ok(tx.rlp_signed(&signature)) + } +} + #[derive(Debug, Error)] pub enum UniversalError { #[error("AWS Signer Error: {0}")] diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index d541928..eb3ab9a 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -5,7 +5,7 @@ use std::time::Duration; use ethers::providers::Middleware; use ethers::types::transaction::eip2718::TypedTransaction; use ethers::types::transaction::eip2930::AccessList; -use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress}; +use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress, H256}; use eyre::ContextCompat; use futures::stream::FuturesUnordered; use 
futures::StreamExt; @@ -19,38 +19,32 @@ use crate::db::UnsentTx; pub async fn broadcast_txs(app: Arc) -> eyre::Result<()> { loop { - let mut txs = app.db.get_unsent_txs().await?; - - txs.sort_unstable_by_key(|tx| tx.relayer_id.clone()); - - let txs_by_relayer = - txs.into_iter().group_by(|tx| tx.relayer_id.clone()); - - let txs_by_relayer: HashMap<_, _> = txs_by_relayer - .into_iter() - .map(|(relayer_id, txs)| { - let mut txs = txs.collect_vec(); - - txs.sort_unstable_by_key(|tx| tx.nonce); + // Get all unsent txs and broadcast + let txs = app.db.get_unsent_txs().await?; + broadcast_unsent_txs(&app, txs).await?; + tokio::time::sleep(Duration::from_secs(1)).await; + } +} - (relayer_id, txs) - }) - .collect(); +async fn broadcast_unsent_txs( + app: &App, + txs: Vec, +) -> eyre::Result<()> { + let txs_by_relayer = sort_txs_by_relayer(txs); - let mut futures = FuturesUnordered::new(); + let mut futures = FuturesUnordered::new(); - for (relayer_id, txs) in txs_by_relayer { - futures.push(broadcast_relayer_txs(&app, relayer_id, txs)); - } + for (relayer_id, txs) in txs_by_relayer { + futures.push(broadcast_relayer_txs(app, relayer_id, txs)); + } - while let Some(result) = futures.next().await { - if let Err(err) = result { - tracing::error!(error = ?err, "Failed broadcasting txs"); - } + while let Some(result) = futures.next().await { + if let Err(err) = result { + tracing::error!(error = ?err, "Failed broadcasting txs"); } - - tokio::time::sleep(Duration::from_secs(1)).await; } + + Ok(()) } #[tracing::instrument(skip(app, txs))] @@ -96,44 +90,74 @@ async fn broadcast_relayer_txs( max_base_fee_per_gas, ); - let eip1559_tx = Eip1559TransactionRequest { - from: None, - to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), - gas: Some(tx.gas_limit.0), - value: Some(tx.value.0), - data: Some(tx.data.into()), - nonce: Some(tx.nonce.into()), - access_list: AccessList::default(), - max_priority_fee_per_gas: Some(max_priority_fee_per_gas), - max_fee_per_gas: 
Some(max_fee_per_gas), - chain_id: Some(tx.chain_id.into()), + let mut typed_transaction = + TypedTransaction::Eip1559(Eip1559TransactionRequest { + from: None, + to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), + gas: Some(tx.gas_limit.0), + value: Some(tx.value.0), + data: Some(tx.data.into()), + nonce: Some(tx.nonce.into()), + access_list: AccessList::default(), + max_priority_fee_per_gas: Some(max_priority_fee_per_gas), + max_fee_per_gas: Some(max_fee_per_gas), + chain_id: Some(tx.chain_id.into()), + }); + + // Fill in any missing transaction fields + middleware + .fill_transaction(&mut typed_transaction, None) + .await?; + + // Simulate the transaction + match middleware.call(&typed_transaction, None).await { + Ok(_) => { + tracing::info!(?tx.id, "Tx simulated successfully"); + } + Err(err) => { + tracing::error!(?tx.id, error = ?err, "Failed to simulate tx"); + continue; + } }; - tracing::debug!(?eip1559_tx, "Sending tx"); + // Get the raw signed tx and derive the tx hash + let raw_signed_tx = middleware + .signer() + .raw_signed_tx(&typed_transaction) + .await?; + + let tx_hash = H256::from(ethers::utils::keccak256(&raw_signed_tx)); + + app.db + .insert_into_tx_hashes( + &tx.id, + tx_hash, + max_fee_per_gas, + max_priority_fee_per_gas, + ) + .await?; + + tracing::debug!(?tx.id, "Sending tx"); // TODO: Is it possible that we send a tx but don't store it in the DB? // TODO: Be smarter about error handling - a tx can fail to be sent // e.g.
because the relayer is out of funds // but we don't want to retry it forever - let pending_tx = middleware - .send_transaction(TypedTransaction::Eip1559(eip1559_tx), None) - .await; + let pending_tx = middleware.send_raw_transaction(raw_signed_tx).await; - let pending_tx = match pending_tx { + match pending_tx { Ok(pending_tx) => { tracing::info!(?pending_tx, "Tx sent successfully"); - pending_tx } Err(err) => { - tracing::error!(error = ?err, "Failed to send tx"); + tracing::error!(?tx.id, error = ?err, "Failed to send tx"); continue; } }; - let tx_hash = pending_tx.tx_hash(); - + // Insert the tx into the sent transactions table app.db - .insert_tx_broadcast( + .insert_into_sent_transactions( &tx.id, tx_hash, max_fee_per_gas, @@ -146,3 +170,21 @@ async fn broadcast_relayer_txs( Ok(()) } + +fn sort_txs_by_relayer( + mut txs: Vec, +) -> HashMap> { + txs.sort_unstable_by_key(|tx| tx.relayer_id.clone()); + let txs_by_relayer = txs.into_iter().group_by(|tx| tx.relayer_id.clone()); + + txs_by_relayer + .into_iter() + .map(|(relayer_id, txs)| { + let mut txs = txs.collect_vec(); + + txs.sort_unstable_by_key(|tx| tx.nonce); + + (relayer_id, txs) + }) + .collect() +} diff --git a/src/tasks/finalize.rs b/src/tasks/finalize.rs index 4695153..9ee3d87 100644 --- a/src/tasks/finalize.rs +++ b/src/tasks/finalize.rs @@ -7,15 +7,15 @@ const TIME_BETWEEN_FINALIZATIONS_SECONDS: i64 = 60; pub async fn finalize_txs(app: Arc) -> eyre::Result<()> { loop { - let finalization_timestmap = + let finalization_timestamp = chrono::Utc::now() - chrono::Duration::seconds(60 * 60); tracing::info!( "Finalizing txs mined before {}", - finalization_timestmap + finalization_timestamp ); - app.db.finalize_txs(finalization_timestmap).await?; + app.db.finalize_txs(finalization_timestamp).await?; tokio::time::sleep(Duration::from_secs( TIME_BETWEEN_FINALIZATIONS_SECONDS as u64,