From 9139fafce18362fbbd96745ea8bf0ca5453b1cd9 Mon Sep 17 00:00:00 2001 From: Dzejkop Date: Tue, 12 Dec 2023 20:24:37 +0100 Subject: [PATCH 01/11] Make transfers in tests more parallel --- tests/send_many_txs.rs | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/tests/send_many_txs.rs b/tests/send_many_txs.rs index ed6cfb3..5476a3e 100644 --- a/tests/send_many_txs.rs +++ b/tests/send_many_txs.rs @@ -1,5 +1,7 @@ mod common; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use tx_sitter::server::routes::relayer::CreateApiKeyResponse; use crate::common::prelude::*; @@ -59,18 +61,29 @@ async fn send_many_txs() -> eyre::Result<()> { let value: U256 = parse_units("10", "ether")?.into(); let num_transfers = 10; + let mut tasks = FuturesUnordered::new(); for _ in 0..num_transfers { - client - .send_tx( - &api_key, - &SendTxRequest { - to: ARBITRARY_ADDRESS, - value, - gas_limit: U256::from(21_000), - ..Default::default() - }, - ) - .await?; + let client = &client; + tasks.push(async { + client + .send_tx( + &api_key, + &SendTxRequest { + to: ARBITRARY_ADDRESS, + value, + gas_limit: U256::from(21_000), + ..Default::default() + }, + ) + .await?; + + Ok(()) + }); + } + + while let Some(result) = tasks.next().await { + let result: eyre::Result<()> = result; + result?; } let expected_balance = value * num_transfers; From 3d369f329191e912629abb037785dfe166e180af Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Tue, 12 Dec 2023 20:56:22 -0500 Subject: [PATCH 02/11] updated logic to back fill blocks that have been missed --- .gitignore | 1 + src/db.rs | 20 +++++++++ src/tasks/index.rs | 106 +++++++++++++++++++++++++++++---------------- 3 files changed, 89 insertions(+), 38 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f97022 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target/ \ No newline at end of file diff --git a/src/db.rs b/src/db.rs index 9676582..b724e24 100644 --- a/src/db.rs +++ b/src/db.rs @@ -303,6 +303,26 @@ impl Database { Ok(block_number.map(|(n,)| n as u64)) } + pub async fn get_latest_block_number( + &self, + chain_id: u64, + ) -> eyre::Result { + let (block_number,): (i64,) = sqlx::query_as( + r#" + SELECT block_number + FROM blocks + WHERE chain_id = $1 + ORDER BY block_number DESC + LIMIT 1 + "#, + ) + .bind(chain_id as i64) + .fetch_one(&self.pool) + .await?; + + Ok(block_number as u64) + } + pub async fn get_latest_block_fees_by_chain_id( &self, chain_id: u64, diff --git a/src/tasks/index.rs b/src/tasks/index.rs index 0483938..554bdba 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -3,7 +3,7 @@ use std::time::Duration; use chrono::{DateTime, Utc}; use ethers::providers::{Http, Middleware, Provider}; -use ethers::types::BlockNumber; +use ethers::types::{Block, BlockNumber, H256}; use eyre::{Context, ContextCompat}; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -26,53 +26,83 @@ pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { let mut blocks_stream = ws_rpc.subscribe_blocks().await?; + let next_block_number = + app.db.get_latest_block_number(chain_id).await? + 1; + + if let Some(latest_block) = blocks_stream.next().await { + let latest_block_number = latest_block + .number + .context("Missing block number")? + .as_u64(); + + if latest_block_number > next_block_number { + for block_number in next_block_number..=latest_block_number { + let block = rpc + .get_block::(block_number.into()) + .await? + .context(format!( + "Could not get block at height {}", + block_number + ))?; + + index_block(app.clone(), chain_id, &rpc, block).await?; + } + } + } + while let Some(block) = blocks_stream.next().await { - let block_number = - block.number.context("Missing block number")?.as_u64(); + index_block(app.clone(), chain_id, &rpc, block).await?; + } + } +} - tracing::info!(block_number, "Indexing block"); +pub async fn index_block( + app: Arc, + chain_id: u64, + rpc: &Provider, + block: Block, +) -> eyre::Result<()> { + let block_number = block.number.context("Missing block number")?.as_u64(); - let block_timestamp_seconds = block.timestamp.as_u64(); - let block_timestamp = DateTime::::from_timestamp( - block_timestamp_seconds as i64, - 0, - ) - .context("Invalid timestamp")?; + tracing::info!(block_number, "Indexing block"); - let block = rpc - .get_block(block_number) - .await? - .context("Missing block")?; + let block_timestamp_seconds = block.timestamp.as_u64(); + let block_timestamp = + DateTime::::from_timestamp(block_timestamp_seconds as i64, 0) + .context("Invalid timestamp")?; - app.db - .save_block( - block.number.unwrap().as_u64(), - chain_id, - block_timestamp, - &block.transactions, - ) - .await?; + let block = rpc + .get_block(block_number) + .await? + .context("Missing block")?; + + app.db + .save_block( + block.number.unwrap().as_u64(), + chain_id, + block_timestamp, + &block.transactions, + ) + .await?; - let mined_txs = app.db.mine_txs(chain_id).await?; + let mined_txs = app.db.mine_txs(chain_id).await?; - let metric_labels = [("chain_id", chain_id.to_string())]; - for tx in mined_txs { - tracing::info!( - id = tx.0, - hash = ?tx.1, - "Tx mined" - ); + let metric_labels: [(&str, String); 1] = + [("chain_id", chain_id.to_string())]; + for tx in mined_txs { + tracing::info!( + id = tx.0, + hash = ?tx.1, + "Tx mined" + ); - metrics::increment_counter!("tx_mined", &metric_labels); - } + metrics::increment_counter!("tx_mined", &metric_labels); + } - let relayer_addresses = - app.db.get_relayer_addresses(chain_id).await?; + let relayer_addresses = app.db.get_relayer_addresses(chain_id).await?; - update_relayer_nonces(relayer_addresses, &app, &rpc, chain_id) - .await?; - } - } + update_relayer_nonces(relayer_addresses, &app, &rpc, chain_id).await?; + Ok(()) } pub async fn estimate_gas(app: Arc, chain_id: u64) -> eyre::Result<()> { From d8eec4908a41f8331799537aba752a582bf04a23 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Tue, 12 Dec 2023 21:18:32 -0500 Subject: [PATCH 03/11] adding comments --- src/tasks/index.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/tasks/index.rs b/src/tasks/index.rs index 554bdba..9f88bb0 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -24,11 +24,14 @@ pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { let ws_rpc = app.ws_provider(chain_id).await?; let rpc = app.http_provider(chain_id).await?; + // Subscribe to new block with the WS client which uses an unbounded receiver, buffering the stream let mut blocks_stream = ws_rpc.subscribe_blocks().await?; + // Get the latest block from the db let next_block_number = app.db.get_latest_block_number(chain_id).await? + 1; + // Get the first block from the stream and backfill any missing blocks if let Some(latest_block) = blocks_stream.next().await { let latest_block_number = latest_block .number @@ -36,7 +39,8 @@ pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { .as_u64(); if latest_block_number > next_block_number { - for block_number in next_block_number..=latest_block_number { + // Backfill blocks between the last synced block and the chain head + for block_number in next_block_number..latest_block_number { let block = rpc .get_block::(block_number.into()) .await? @@ -47,9 +51,13 @@ pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { index_block(app.clone(), chain_id, &rpc, block).await?; } + + // Index the latest block after backfilling + index_block(app.clone(), chain_id, &rpc, latest_block).await?; } } + // Index incoming blocks from the stream while let Some(block) = blocks_stream.next().await { index_block(app.clone(), chain_id, &rpc, block).await?; } From 80656c1242ab763eda3a32abb70712f49eef6099 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Tue, 12 Dec 2023 21:28:53 -0500 Subject: [PATCH 04/11] updated tests --- src/db.rs | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/src/db.rs b/src/db.rs index b724e24..662ffa2 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1089,9 +1089,17 @@ mod tests { let url = format!("postgres://postgres:postgres@{db_socket_addr}/database"); - let db = Database::new(&DatabaseConfig::connection_string(url)).await?; + for _ in 0..5 { + match Database::new(&DatabaseConfig::connection_string(&url)).await + { + Ok(db) => return Ok((db, db_container)), + Err(_) => { + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } - Ok((db, db_container)) + Err(eyre::eyre!("Failed to connect to the database")) } async fn full_update( @@ -1406,7 +1414,7 @@ mod tests { async fn blocks() -> eyre::Result<()> { let (db, _db_container) = setup_db().await?; - let block_number = 1; + let block_numbers = vec![0, 1]; let chain_id = 1; let timestamp = ymd_hms(2023, 11, 23, 12, 32, 2); let txs = &[ @@ -1415,7 +1423,10 @@ mod tests { H256::from_low_u64_be(3), ]; - db.save_block(block_number, chain_id, timestamp, txs) + db.save_block(block_numbers[0], chain_id, timestamp, txs) + .await?; + + db.save_block(block_numbers[1], chain_id, timestamp, txs) .await?; let fee_estimates = FeesEstimate { @@ -1425,13 +1436,19 @@ mod tests { let gas_price = U256::from(1_000_000_007); - db.save_block_fees(block_number, chain_id, &fee_estimates, gas_price) - .await?; + db.save_block_fees( + block_numbers[1], + chain_id, + &fee_estimates, + gas_price, + ) + .await?; + let latest_block_number = db.get_latest_block_number(chain_id).await?; let block_fees = db.get_latest_block_fees_by_chain_id(chain_id).await?; - let block_fees = block_fees.context("Missing fees")?; + assert_eq!(latest_block_number, block_numbers[1]); assert_eq!( block_fees.fee_estimates.base_fee_per_gas, fee_estimates.base_fee_per_gas From b73098c0b679f763015c4731765bf1734f46ca21 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Tue, 12 Dec 2023 22:07:04 -0500 Subject: [PATCH 05/11] updated get_latest_block_number --- src/db.rs | 13 +++++---- src/tasks/index.rs | 71 ++++++++++++++++++++++++++++------------------ 2 files changed, 52 insertions(+), 32 deletions(-) diff --git a/src/db.rs b/src/db.rs index 662ffa2..1f1e07e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -306,8 +306,8 @@ impl Database { pub async fn get_latest_block_number( &self, chain_id: u64, - ) -> eyre::Result { - let (block_number,): (i64,) = sqlx::query_as( + ) -> eyre::Result> { + let block_number: Option<(i64,)> = sqlx::query_as( r#" SELECT block_number FROM blocks @@ -317,10 +317,10 @@ impl Database { "#, ) .bind(chain_id as i64) - .fetch_one(&self.pool) + .fetch_optional(&self.pool) .await?; - Ok(block_number as u64) + Ok(block_number.map(|(n,)| n as u64)) } pub async fn get_latest_block_fees_by_chain_id( @@ -1444,7 +1444,10 @@ mod tests { ) .await?; - let latest_block_number = db.get_latest_block_number(chain_id).await?; + let latest_block_number = + db.get_latest_block_number(chain_id) + .await? + .context("Could not get latest block number")?; let block_fees = db.get_latest_block_fees_by_chain_id(chain_id).await?; let block_fees = block_fees.context("Missing fees")?; diff --git a/src/tasks/index.rs b/src/tasks/index.rs index 9f88bb0..3882d06 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::time::Duration; use chrono::{DateTime, Utc}; -use ethers::providers::{Http, Middleware, Provider}; +use ethers::providers::{Http, Middleware, Provider, SubscriptionStream, Ws}; use ethers::types::{Block, BlockNumber, H256}; use eyre::{Context, ContextCompat}; use futures::stream::FuturesUnordered; @@ -27,34 +27,12 @@ pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { // Subscribe to new block with the WS client which uses an unbounded receiver, buffering the stream let mut blocks_stream = ws_rpc.subscribe_blocks().await?; - // Get the latest block from the db - let next_block_number = - app.db.get_latest_block_number(chain_id).await? + 1; + // Get the first block from the stream, backfilling any missing blocks between the latest block in the db - // Get the first block from the stream and backfill any missing blocks + //TODO: note in the comments that this fills the block if let Some(latest_block) = blocks_stream.next().await { - let latest_block_number = latest_block - .number - .context("Missing block number")? - .as_u64(); - - if latest_block_number > next_block_number { - // Backfill blocks between the last synced block and the chain head - for block_number in next_block_number..latest_block_number { - let block = rpc - .get_block::(block_number.into()) - .await? - .context(format!( - "Could not get block at height {}", - block_number - ))?; - - index_block(app.clone(), chain_id, &rpc, block).await?; - } - - // Index the latest block after backfilling - index_block(app.clone(), chain_id, &rpc, latest_block).await?; - } + backfill_to_block(app.clone(), chain_id, &rpc, latest_block) + .await?; } // Index incoming blocks from the stream @@ -113,6 +91,45 @@ pub async fn index_block( Ok(()) } +pub async fn backfill_to_block( + app: Arc, + chain_id: u64, + rpc: &Provider, + latest_block: Block, +) -> eyre::Result<()> { + // Get the latest block from the db + if let Some(latest_db_block_number) = + app.db.get_latest_block_number(chain_id).await? + { + let next_block_number: u64 = latest_db_block_number + 1; + + // Get the first block from the stream and backfill any missing blocks + let latest_block_number = latest_block + .number + .context("Missing block number")? + .as_u64(); + + if latest_block_number > next_block_number { + // Backfill blocks between the last synced block and the chain head, non inclusive + for block_number in next_block_number..latest_block_number { + let block = rpc + .get_block::(block_number.into()) + .await? + .context(format!( + "Could not get block at height {}", + block_number + ))?; + + index_block(app.clone(), chain_id, &rpc, block).await?; + } + } + + // Index the latest block after backfilling + index_block(app.clone(), chain_id, &rpc, latest_block).await?; + }; + Ok(()) +} + pub async fn estimate_gas(app: Arc, chain_id: u64) -> eyre::Result<()> { let rpc = app.http_provider(chain_id).await?; From 06116d81aa6c73e68b714df207b8d4d43a665e63 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Tue, 12 Dec 2023 22:11:26 -0500 Subject: [PATCH 06/11] updated comments --- src/tasks/index.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/tasks/index.rs b/src/tasks/index.rs index 3882d06..b54676f 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -27,9 +27,7 @@ pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { // Subscribe to new block with the WS client which uses an unbounded receiver, buffering the stream let mut blocks_stream = ws_rpc.subscribe_blocks().await?; - // Get the first block from the stream, backfilling any missing blocks between the latest block in the db - - //TODO: note in the comments that this fills the block + // Get the first block from the stream, backfilling any missing blocks from the latest block in the db to the chain head if let Some(latest_block) = blocks_stream.next().await { backfill_to_block(app.clone(), chain_id, &rpc, latest_block) .await?; From 299a8568623bb08b2939cdb5dfb389a3293a1c72 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Tue, 12 Dec 2023 22:18:53 -0500 Subject: [PATCH 07/11] cargo clippy --- src/db.rs | 2 +- src/tasks/index.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/db.rs b/src/db.rs index 1f1e07e..965117a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1414,7 +1414,7 @@ mod tests { async fn blocks() -> eyre::Result<()> { let (db, _db_container) = setup_db().await?; - let block_numbers = vec![0, 1]; + let block_numbers = [0, 1]; let chain_id = 1; let timestamp = ymd_hms(2023, 11, 23, 12, 32, 2); let txs = &[ diff --git a/src/tasks/index.rs b/src/tasks/index.rs index b54676f..eca129f 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::time::Duration; use chrono::{DateTime, Utc}; -use ethers::providers::{Http, Middleware, Provider, SubscriptionStream, Ws}; +use ethers::providers::{Http, Middleware, Provider}; use ethers::types::{Block, BlockNumber, H256}; use eyre::{Context, ContextCompat}; use futures::stream::FuturesUnordered; @@ -85,7 +85,7 @@ pub async fn index_block( let relayer_addresses = app.db.get_relayer_addresses(chain_id).await?; - update_relayer_nonces(relayer_addresses, &app, &rpc, chain_id).await?; + update_relayer_nonces(relayer_addresses, &app, rpc, chain_id).await?; Ok(()) } @@ -118,12 +118,12 @@ pub async fn backfill_to_block( block_number ))?; - index_block(app.clone(), chain_id, &rpc, block).await?; + index_block(app.clone(), chain_id, rpc, block).await?; } } // Index the latest block after backfilling - index_block(app.clone(), chain_id, &rpc, latest_block).await?; + index_block(app.clone(), chain_id, rpc, latest_block).await?; }; Ok(()) } From 41fd8af3bfaeba250026452ba28519b8d93dd728 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Tr=C4=85d?= Date: Wed, 13 Dec 2023 16:17:23 +0100 Subject: [PATCH 08/11] Preconfigured start - for dockerization (#6) * Allow preconfigured networks & relayers * Only allow single network and relayer --- config.toml | 10 ++++ src/config.rs | 42 +++++++++++++- src/keys.rs | 125 ++--------------------------------------- src/keys/kms_keys.rs | 59 +++++++++++++++++++ src/keys/local_keys.rs | 69 +++++++++++++++++++++++ src/service.rs | 43 ++++++++++++++ tests/common/mod.rs | 52 +++++++++++------ tests/rpc_access.rs | 9 +-- tests/send_many_txs.rs | 41 ++------------ tests/send_tx.rs | 39 +------------ 10 files changed, 269 insertions(+), 220 deletions(-) create mode 100644 src/keys/kms_keys.rs create mode 100644 src/keys/local_keys.rs diff --git a/config.toml b/config.toml index f2f5ec2..cc2e14e 100644 --- a/config.toml +++ b/config.toml @@ -3,6 +3,16 @@ escalation_interval = "1m" datadog_enabled = false statsd_enabled = false +[predefined.network] +chain_id = 31337 +http_url = "http://127.0.0.1:8545" +ws_url = "ws://127.0.0.1:8545" + +[predefined.relayer] +id = "1b908a34-5dc1-4d2d-a146-5eb46e975830" +chain_id = 31337 +key_id = "d10607662a85424f02a33fb1e6d095bd0ac7154396ff09762e41f82ff2233aaa" + [server] host = "127.0.0.1:3000" disable_auth = false diff --git a/src/config.rs b/src/config.rs index 53ad624..7e1e534 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,6 +23,34 @@ pub struct TxSitterConfig { #[serde(default)] pub statsd_enabled: bool, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub predefined: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct Predefined { + pub network: PredefinedNetwork, + pub relayer: PredefinedRelayer, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct PredefinedNetwork { + pub chain_id: u64, + pub name: String, + pub http_rpc: String, + pub ws_rpc: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct PredefinedRelayer { + pub id: String, + pub name: String, + pub key_id: String, + pub chain_id: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -101,10 +129,16 @@ pub enum KeysConfig { #[serde(rename_all = "snake_case")] pub struct KmsKeysConfig {} -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub struct LocalKeysConfig {} +impl KeysConfig { + pub fn is_local(&self) -> bool { + matches!(self, Self::Local(_)) + } +} + #[cfg(test)] mod tests { use indoc::indoc; @@ -156,6 +190,7 @@ mod tests { escalation_interval: Duration::from_secs(60 * 60), datadog_enabled: false, statsd_enabled: false, + predefined: None, }, server: ServerConfig { host: SocketAddr::from(([127, 0, 0, 1], 3000)), @@ -166,7 +201,7 @@ mod tests { "postgres://postgres:postgres@127.0.0.1:52804/database" .to_string(), ), - keys: KeysConfig::Local(LocalKeysConfig {}), + keys: KeysConfig::Local(LocalKeysConfig::default()), }; let toml = toml::to_string_pretty(&config).unwrap(); @@ -181,6 +216,7 @@ mod tests { escalation_interval: Duration::from_secs(60 * 60), datadog_enabled: false, statsd_enabled: false, + predefined: None, }, server: ServerConfig { host: SocketAddr::from(([127, 0, 0, 1], 3000)), @@ -194,7 +230,7 @@ mod tests { password: "pass".to_string(), database: "db".to_string(), }), - keys: KeysConfig::Local(LocalKeysConfig {}), + keys: KeysConfig::Local(LocalKeysConfig::default()), }; let toml = toml::to_string_pretty(&config).unwrap(); diff --git a/src/keys.rs b/src/keys.rs index 01c4647..3dc171c 100644 --- a/src/keys.rs +++ b/src/keys.rs @@ -1,14 +1,10 @@ -use aws_config::BehaviorVersion; -use aws_sdk_kms::types::{KeySpec, KeyUsageType}; -use ethers::core::k256::ecdsa::SigningKey; -use ethers::signers::Wallet; -use eyre::{Context, ContextCompat}; -pub use universal_signer::UniversalSigner; - -use crate::aws::ethers_signer::AwsSigner; -use crate::config::{KmsKeysConfig, LocalKeysConfig}; +pub mod kms_keys; +pub mod local_keys; +pub mod universal_signer; -mod universal_signer; +pub use kms_keys::KmsKeys; +pub use local_keys::LocalKeys; +pub use universal_signer::UniversalSigner; #[async_trait::async_trait] pub trait KeysSource: Send + Sync + 'static { @@ -18,112 +14,3 @@ pub trait KeysSource: Send + Sync + 'static { /// Loads the key using the provided id async fn load_signer(&self, id: String) -> eyre::Result; } - -pub struct KmsKeys { - kms_client: aws_sdk_kms::Client, -} - -impl KmsKeys { - pub async fn new(_config: &KmsKeysConfig) -> eyre::Result { - let aws_config = - aws_config::load_defaults(BehaviorVersion::latest()).await; - - let kms_client = aws_sdk_kms::Client::new(&aws_config); - - Ok(Self { kms_client }) - } -} - -#[async_trait::async_trait] -impl KeysSource for KmsKeys { - async fn new_signer(&self) -> eyre::Result<(String, UniversalSigner)> { - let kms_key = self - .kms_client - .create_key() - .key_spec(KeySpec::EccSecgP256K1) - .key_usage(KeyUsageType::SignVerify) - .send() - .await - .context("AWS Error")?; - - let key_id = - kms_key.key_metadata.context("Missing key metadata")?.key_id; - - let signer = AwsSigner::new( - self.kms_client.clone(), - key_id.clone(), - 1, // TODO: get chain id from provider - ) - .await?; - - Ok((key_id, UniversalSigner::Aws(signer))) - } - - async fn load_signer(&self, id: String) -> eyre::Result { - let signer = AwsSigner::new( - self.kms_client.clone(), - id.clone(), - 1, // TODO: get chain id from provider - ) - .await?; - - Ok(UniversalSigner::Aws(signer)) - } -} - -pub struct LocalKeys { - rng: rand::rngs::OsRng, -} - -impl LocalKeys { - pub fn new(_config: &LocalKeysConfig) -> Self { - Self { - rng: rand::rngs::OsRng, - } - } -} - -#[async_trait::async_trait] -impl KeysSource for LocalKeys { - async fn new_signer(&self) -> eyre::Result<(String, UniversalSigner)> { - let signing_key = SigningKey::random(&mut self.rng.clone()); - - let key_id = signing_key.to_bytes().to_vec(); - let key_id = hex::encode(key_id); - - let signer = Wallet::from(signing_key); - - Ok((key_id, UniversalSigner::Local(signer))) - } - - async fn load_signer(&self, id: String) -> eyre::Result { - let key_id = hex::decode(id)?; - let signing_key = SigningKey::from_slice(key_id.as_slice())?; - - let signer = Wallet::from(signing_key); - - Ok(UniversalSigner::Local(signer)) - } -} - -#[cfg(test)] -mod tests { - use ethers::signers::Signer; - - use super::*; - - #[tokio::test] - async fn local_roundtrip() -> eyre::Result<()> { - let keys_source = LocalKeys::new(&LocalKeysConfig {}); - - let (id, signer) = keys_source.new_signer().await?; - - let address = signer.address(); - - let signer = keys_source.load_signer(id).await?; - - assert_eq!(address, signer.address()); - - Ok(()) - } -} diff --git a/src/keys/kms_keys.rs b/src/keys/kms_keys.rs new file mode 100644 index 0000000..baabb29 --- /dev/null +++ b/src/keys/kms_keys.rs @@ -0,0 +1,59 @@ +use aws_config::BehaviorVersion; +use aws_sdk_kms::types::{KeySpec, KeyUsageType}; +use eyre::{Context, ContextCompat}; + +use super::{KeysSource, UniversalSigner}; +use crate::aws::ethers_signer::AwsSigner; +use crate::config::KmsKeysConfig; + +pub struct KmsKeys { + kms_client: aws_sdk_kms::Client, +} + +impl KmsKeys { + pub async fn new(_config: &KmsKeysConfig) -> eyre::Result { + let aws_config = + aws_config::load_defaults(BehaviorVersion::latest()).await; + + let kms_client = aws_sdk_kms::Client::new(&aws_config); + + Ok(Self { kms_client }) + } +} + +#[async_trait::async_trait] +impl KeysSource for KmsKeys { + async fn new_signer(&self) -> eyre::Result<(String, UniversalSigner)> { + let kms_key = self + .kms_client + .create_key() + .key_spec(KeySpec::EccSecgP256K1) + .key_usage(KeyUsageType::SignVerify) + .send() + .await + .context("AWS Error")?; + + let key_id = + kms_key.key_metadata.context("Missing key metadata")?.key_id; + + let signer = AwsSigner::new( + self.kms_client.clone(), + key_id.clone(), + 1, // TODO: get chain id from provider + ) + .await?; + + Ok((key_id, UniversalSigner::Aws(signer))) + } + + async fn load_signer(&self, id: String) -> eyre::Result { + let signer = AwsSigner::new( + self.kms_client.clone(), + id.clone(), + 1, // TODO: get chain id from provider + ) + .await?; + + Ok(UniversalSigner::Aws(signer)) + } +} diff --git a/src/keys/local_keys.rs b/src/keys/local_keys.rs new file mode 100644 index 0000000..8b6e334 --- /dev/null +++ b/src/keys/local_keys.rs @@ -0,0 +1,69 @@ +use ethers::core::k256::ecdsa::SigningKey; +use ethers::signers::Wallet; + +use super::universal_signer::UniversalSigner; +use super::KeysSource; +use crate::config::LocalKeysConfig; + +pub struct LocalKeys { + rng: rand::rngs::OsRng, +} + +impl LocalKeys { + pub fn new(_config: &LocalKeysConfig) -> Self { + Self { + rng: rand::rngs::OsRng, + } + } +} + +#[async_trait::async_trait] +impl KeysSource for LocalKeys { + async fn new_signer(&self) -> eyre::Result<(String, UniversalSigner)> { + let signing_key = SigningKey::random(&mut self.rng.clone()); + + let key_id = signing_key.to_bytes().to_vec(); + let key_id = hex::encode(key_id); + + let signer = Wallet::from(signing_key); + + Ok((key_id, UniversalSigner::Local(signer))) + } + + async fn load_signer(&self, id: String) -> eyre::Result { + let signing_key = signing_key_from_hex(&id)?; + + let signer = Wallet::from(signing_key); + + Ok(UniversalSigner::Local(signer)) + } +} + +pub fn signing_key_from_hex(s: &str) -> eyre::Result { + let key_id = hex::decode(s)?; + let signing_key = SigningKey::from_slice(key_id.as_slice())?; + + Ok(signing_key) +} + +#[cfg(test)] +mod tests { + use ethers::signers::Signer; + + use super::*; + + #[tokio::test] + async fn local_roundtrip() -> eyre::Result<()> { + let keys_source = LocalKeys::new(&LocalKeysConfig::default()); + + let (id, signer) = keys_source.new_signer().await?; + + let address = signer.address(); + + let signer = keys_source.load_signer(id).await?; + + assert_eq!(address, signer.address()); + + Ok(()) + } +} diff --git a/src/service.rs b/src/service.rs index 599b0ac..a14598d 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,10 +1,12 @@ use std::net::SocketAddr; use std::sync::Arc; +use ethers::signers::{Signer, Wallet}; use tokio::task::JoinHandle; use crate::app::App; use crate::config::Config; +use crate::keys::local_keys::signing_key_from_hex; use crate::task_runner::TaskRunner; use crate::tasks; @@ -44,6 +46,8 @@ impl Service { Ok(()) }); + initialize_predefined_values(&app).await?; + Ok(Self { _app: app, local_addr, @@ -78,3 +82,42 @@ impl Service { Ok(()) } } + +async fn initialize_predefined_values( + app: &Arc, +) -> Result<(), eyre::Error> { + if app.config.service.predefined.is_some() && !app.config.keys.is_local() { + eyre::bail!("Predefined relayers are only supported with local keys"); + } + + let predefined = app.config.service.predefined.as_ref().unwrap(); + + app.db + .create_network( + predefined.network.chain_id, + &predefined.network.name, + &predefined.network.http_rpc, + &predefined.network.ws_rpc, + ) + .await?; + + let task_runner = TaskRunner::new(app.clone()); + Service::spawn_chain_tasks(&task_runner, predefined.network.chain_id)?; + + let secret_key = signing_key_from_hex(&predefined.relayer.key_id)?; + + let signer = Wallet::from(secret_key); + let address = signer.address(); + + app.db + .create_relayer( + &predefined.relayer.id, + &predefined.relayer.name, + predefined.relayer.chain_id, + &predefined.relayer.key_id, + address, + ) + .await?; + + Ok(()) +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index c24a844..033ea32 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -18,10 +18,9 @@ use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; use tx_sitter::client::TxSitterClient; use tx_sitter::config::{ - Config, DatabaseConfig, KeysConfig, LocalKeysConfig, ServerConfig, - TxSitterConfig, + Config, DatabaseConfig, KeysConfig, LocalKeysConfig, Predefined, + PredefinedNetwork, PredefinedRelayer, ServerConfig, TxSitterConfig, }; -use tx_sitter::server::routes::network::NewNetworkInfo; use tx_sitter::service::Service; pub type AppMiddleware = SignerMiddleware>, LocalWallet>; @@ -49,12 +48,18 @@ pub const DEFAULT_ANVIL_PRIVATE_KEY: &[u8] = &hex_literal::hex!( "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" ); +pub const SECONDARY_ANVIL_PRIVATE_KEY: &[u8] = &hex_literal::hex!( + "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" +); + pub const ARBITRARY_ADDRESS: Address = H160(hex_literal::hex!( "1Ed53d680B8890DAe2a63f673a85fFDE1FD5C7a2" )); pub const DEFAULT_ANVIL_CHAIN_ID: u64 = 31337; +pub const DEFAULT_RELAYER_ID: &str = "1b908a34-5dc1-4d2d-a146-5eb46e975830"; + pub struct DoubleAnvilHandle { pub double_anvil: Arc, ws_addr: String, @@ -104,7 +109,7 @@ pub async fn setup_double_anvil() -> eyre::Result { let middleware = setup_middleware( format!("http://{local_addr}"), - DEFAULT_ANVIL_PRIVATE_KEY, + SECONDARY_ANVIL_PRIVATE_KEY, ) .await?; @@ -138,11 +143,27 @@ pub async fn setup_service( ) -> eyre::Result<(Service, TxSitterClient)> { let rpc_url = anvil_handle.local_addr(); + let anvil_private_key = hex::encode(DEFAULT_ANVIL_PRIVATE_KEY); + let config = Config { service: TxSitterConfig { escalation_interval, datadog_enabled: false, statsd_enabled: false, + predefined: Some(Predefined { + network: PredefinedNetwork { + chain_id: DEFAULT_ANVIL_CHAIN_ID, + name: "Anvil".to_string(), + http_rpc: format!("http://{}", rpc_url), + ws_rpc: anvil_handle.ws_addr(), + }, + relayer: PredefinedRelayer { + name: "Anvil".to_string(), + id: DEFAULT_RELAYER_ID.to_string(), + key_id: anvil_private_key, + chain_id: DEFAULT_ANVIL_CHAIN_ID, + }, + }), }, server: ServerConfig { host: SocketAddr::V4(SocketAddrV4::new( @@ -153,7 +174,7 @@ pub async fn setup_service( password: None, }, database: DatabaseConfig::connection_string(db_connection_url), - keys: KeysConfig::Local(LocalKeysConfig {}), + keys: KeysConfig::Local(LocalKeysConfig::default()), }; let service = Service::new(config).await?; @@ -161,17 +182,6 @@ pub async fn setup_service( let client = TxSitterClient::new(format!("http://{}", service.local_addr())); - client - .create_network( - DEFAULT_ANVIL_CHAIN_ID, - &NewNetworkInfo { - name: "Anvil".to_string(), - http_rpc: format!("http://{}", rpc_url), - ws_rpc: anvil_handle.ws_addr(), - }, - ) - .await?; - Ok((service, client)) } @@ -179,7 +189,7 @@ pub async fn setup_middleware( rpc_url: impl AsRef, private_key: &[u8], ) -> eyre::Result { - let provider = Provider::::new(rpc_url.as_ref().parse()?); + let provider = setup_provider(rpc_url).await?; let wallet = LocalWallet::from(SigningKey::from_slice(private_key)?) .with_chain_id(provider.get_chainid().await?.as_u64()); @@ -188,3 +198,11 @@ pub async fn setup_middleware( Ok(middleware) } + +pub async fn setup_provider( + rpc_url: impl AsRef, +) -> eyre::Result> { + let provider = Provider::::new(rpc_url.as_ref().parse()?); + + Ok(provider) +} diff --git a/tests/rpc_access.rs b/tests/rpc_access.rs index 81509cc..c4ab6fe 100644 --- a/tests/rpc_access.rs +++ b/tests/rpc_access.rs @@ -18,15 +18,8 @@ async fn rpc_access() -> eyre::Result<()> { let (service, client) = setup_service(&double_anvil, &db_url, ESCALATION_INTERVAL).await?; - let CreateRelayerResponse { relayer_id, .. } = client - .create_relayer(&CreateRelayerRequest { - name: "Test relayer".to_string(), - chain_id: DEFAULT_ANVIL_CHAIN_ID, - }) - .await?; - let CreateApiKeyResponse { api_key } = - client.create_relayer_api_key(&relayer_id).await?; + client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; let rpc_url = format!("http://{}/1/api/{api_key}/rpc", service.local_addr()); diff --git a/tests/send_many_txs.rs b/tests/send_many_txs.rs index 5476a3e..1e0226c 100644 --- a/tests/send_many_txs.rs +++ b/tests/send_many_txs.rs @@ -18,44 +18,11 @@ async fn send_many_txs() -> eyre::Result<()> { let (_service, client) = setup_service(&double_anvil, &db_url, ESCALATION_INTERVAL).await?; - let CreateRelayerResponse { - address: relayer_address, - relayer_id, - } = client - .create_relayer(&CreateRelayerRequest { - name: "Test relayer".to_string(), - chain_id: DEFAULT_ANVIL_CHAIN_ID, - }) - .await?; - let CreateApiKeyResponse { api_key } = - client.create_relayer_api_key(&relayer_id).await?; - - // Fund the relayer - let middleware = setup_middleware( - format!("http://{}", double_anvil.local_addr()), - DEFAULT_ANVIL_PRIVATE_KEY, - ) - .await?; - - let amount: U256 = parse_units("1000", "ether")?.into(); - - middleware - .send_transaction( - Eip1559TransactionRequest { - to: Some(relayer_address.into()), - value: Some(amount), - ..Default::default() - }, - None, - ) - .await? - .await?; - - let provider = middleware.provider(); - - let current_balance = provider.get_balance(relayer_address, None).await?; - assert_eq!(current_balance, amount); + client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; + + let provider = + setup_provider(format!("http://{}", double_anvil.local_addr())).await?; // Send a transaction let value: U256 = parse_units("10", "ether")?.into(); diff --git a/tests/send_tx.rs b/tests/send_tx.rs index cff3991..b4236ca 100644 --- a/tests/send_tx.rs +++ b/tests/send_tx.rs @@ -16,44 +16,11 @@ async fn send_tx() -> eyre::Result<()> { let (_service, client) = setup_service(&double_anvil, &db_url, ESCALATION_INTERVAL).await?; - let CreateRelayerResponse { - address: relayer_address, - relayer_id, - } = client - .create_relayer(&CreateRelayerRequest { - name: "Test relayer".to_string(), - chain_id: DEFAULT_ANVIL_CHAIN_ID, - }) - .await?; - let CreateApiKeyResponse { api_key } = - client.create_relayer_api_key(&relayer_id).await?; - - // Fund the relayer - let middleware = setup_middleware( - format!("http://{}", double_anvil.local_addr()), - DEFAULT_ANVIL_PRIVATE_KEY, - ) - .await?; - - let amount: U256 = parse_units("100", "ether")?.into(); - - middleware - .send_transaction( - Eip1559TransactionRequest { - to: Some(relayer_address.into()), - value: Some(amount), - ..Default::default() - }, - None, - ) - .await? - .await?; - - let provider = middleware.provider(); + client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; - let current_balance = provider.get_balance(relayer_address, None).await?; - assert_eq!(current_balance, amount); + let provider = + setup_provider(format!("http://{}", double_anvil.local_addr())).await?; // Send a transaction let value: U256 = parse_units("1", "ether")?.into(); From 74301e65d9fea173aedf540d95a1799decded434 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Tr=C4=85d?= Date: Wed, 13 Dec 2023 18:28:29 +0100 Subject: [PATCH 09/11] Allow filtering txs by status (#7) * Allow filtering txs by status * Minor refactor --- src/db.rs | 24 ++++++++++++++++++++++++ src/db/data.rs | 2 +- src/server/routes/transaction.rs | 22 ++++++++++++++++++++-- 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/src/db.rs b/src/db.rs index 965117a..40457f1 100644 --- a/src/db.rs +++ b/src/db.rs @@ -754,7 +754,13 @@ impl Database { pub async fn read_txs( &self, relayer_id: &str, + tx_status_filter: Option>, ) -> eyre::Result> { + let (should_filter, status_filter) = match tx_status_filter { + Some(status) => (true, status), + None => (false, None), + }; + Ok(sqlx::query_as( r#" SELECT t.id as tx_id, t.tx_to as to, t.data, t.value, t.gas_limit, t.nonce, @@ -763,9 +769,12 @@ impl Database { LEFT JOIN sent_transactions s ON t.id = s.tx_id LEFT JOIN tx_hashes h ON s.valid_tx_hash = h.tx_hash WHERE t.relayer_id = $1 + AND ($2 = true AND s.status = $3) OR $2 = false "#, ) .bind(relayer_id) + .bind(should_filter) + .bind(status_filter) .fetch_all(&self.pool) .await?) } @@ -1310,6 +1319,9 @@ mod tests { assert_eq!(tx.nonce, 0); assert_eq!(tx.tx_hash, None); + let unsent_txs = db.read_txs(relayer_id, None).await?; + assert_eq!(unsent_txs.len(), 1, "1 unsent tx"); + let tx_hash_1 = H256::from_low_u64_be(1); let tx_hash_2 = H256::from_low_u64_be(2); let initial_max_fee_per_gas = U256::from(1); @@ -1328,6 +1340,18 @@ mod tests { assert_eq!(tx.tx_hash.unwrap().0, tx_hash_1); assert_eq!(tx.status, Some(TxStatus::Pending)); + let unsent_txs = db.read_txs(relayer_id, Some(None)).await?; + assert_eq!(unsent_txs.len(), 0, "0 unsent tx"); + + let pending_txs = db + .read_txs(relayer_id, Some(Some(TxStatus::Pending))) + .await?; + assert_eq!(pending_txs.len(), 1, "1 pending tx"); + + let all_txs = db.read_txs(relayer_id, None).await?; + + assert_eq!(all_txs, pending_txs); + db.escalate_tx( tx_id, tx_hash_2, diff --git a/src/db/data.rs b/src/db/data.rs index 7e8bd80..058feb0 100644 --- a/src/db/data.rs +++ b/src/db/data.rs @@ -43,7 +43,7 @@ pub struct TxForEscalation { pub escalation_count: usize, } -#[derive(Debug, Clone, FromRow)] +#[derive(Debug, Clone, FromRow, PartialEq, Eq)] pub struct ReadTxData { pub tx_id: String, pub to: AddressWrapper, diff --git a/src/server/routes/transaction.rs b/src/server/routes/transaction.rs index a1dbca7..33f7d9e 100644 --- a/src/server/routes/transaction.rs +++ b/src/server/routes/transaction.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use axum::extract::{Json, Path, State}; +use axum::extract::{Json, Path, Query, State}; use ethers::types::{Address, Bytes, H256, U256}; use eyre::Result; use serde::{Deserialize, Serialize}; @@ -33,6 +33,13 @@ pub struct SendTxResponse { pub tx_id: String, } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetTxQuery { + #[serde(default)] + pub status: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct GetTxResponse { @@ -104,12 +111,23 @@ pub async fn send_tx( pub async fn get_txs( State(app): State>, Path(api_token): Path, + Query(query): Query, ) -> Result>, ApiError> { if !app.is_authorized(&api_token).await? { return Err(ApiError::Unauthorized); } - let txs = app.db.read_txs(&api_token.relayer_id).await?; + let txs = match query.status { + Some(GetTxResponseStatus::TxStatus(status)) => { + app.db + .read_txs(&api_token.relayer_id, Some(Some(status))) + .await? + } + Some(GetTxResponseStatus::Unsent(_)) => { + app.db.read_txs(&api_token.relayer_id, Some(None)).await? + } + None => app.db.read_txs(&api_token.relayer_id, None).await?, + }; let txs = txs.into_iter() From 38da96958f4585fa84d2c5cbf5d23fa3d0d5eef3 Mon Sep 17 00:00:00 2001 From: Dzejkop Date: Wed, 13 Dec 2023 23:53:06 +0100 Subject: [PATCH 10/11] Example .env --- .env.example | 1 + .gitignore | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) create mode 100644 .env.example diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..943896d --- /dev/null +++ b/.env.example @@ -0,0 +1 @@ +RUST_LOG=info,tx_sitter=debug,fake_rpc=debug,tower_http=debug diff --git a/.gitignore b/.gitignore index 9f97022..14ee500 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -target/ \ No newline at end of file +target/ +.env From dc962f01de207d10e127a05730f19dcaf06d67b2 Mon Sep 17 00:00:00 2001 From: Dzejkop Date: Wed, 13 Dec 2023 23:53:11 +0100 Subject: [PATCH 11/11] Fix --- src/service.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/service.rs b/src/service.rs index a14598d..37a71c1 100644 --- a/src/service.rs +++ b/src/service.rs @@ -90,7 +90,9 @@ async fn initialize_predefined_values( eyre::bail!("Predefined relayers are only supported with local keys"); } - let predefined = app.config.service.predefined.as_ref().unwrap(); + let Some(predefined) = app.config.service.predefined.as_ref() else { + return Ok(()); + }; app.db .create_network(