diff --git a/Cargo.lock b/Cargo.lock index 1d36aefe..608fec06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5341,6 +5341,7 @@ dependencies = [ "tracing-futures", "tracing-subscriber 0.3.17", "tracing-test", + "tx-sitter-client", "url", "zeroize", ] @@ -6312,6 +6313,19 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tx-sitter-client" +version = "0.1.0" +dependencies = [ + "anyhow", + "ethers", + "reqwest", + "serde", + "serde_json", + "strum", + "tracing", +] + [[package]] name = "typenum" version = "1.15.0" diff --git a/Cargo.toml b/Cargo.toml index 70f0ac7a..556c8080 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ tokio = { version = "1.17", features = [ ] } tracing = "0.1" tracing-futures = "0.2" +tx-sitter-client = { path = "crates/tx-sitter-client" } url = { version = "2.2", features = ["serde"] } # `ethers-rs` requires an older version of primitive-types. # But `ruint` supports the latest version. So we need to override it. diff --git a/compose.yml b/compose.yml new file mode 100644 index 00000000..5b32ded6 --- /dev/null +++ b/compose.yml @@ -0,0 +1,36 @@ +version: '3' +services: + chain: + image: ghcr.io/foundry-rs/foundry + ports: + - 8545:8545 + command: ["anvil --host 0.0.0.0 --chain-id 31337 --block-time 2 --fork-url https://eth-sepolia.g.alchemy.com/v2/rbe0ZJX0TXJ7udcCB07HHggMt5x5Rcy9@4910128"] + tx-sitter-db: + image: postgres:latest + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + - POSTGRES_DB=tx-sitter + sequencer-db: + image: postgres:latest + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + - POSTGRES_DB=sequencer + tx-sitter: + image: ghcr.io/worldcoin/tx-sitter-monolith:dev + ports: + - 3000:3000 + environment: + - TX_SITTER__SERVICE__ESCALATION_INTERVAL=1m + - TX_SITTER__DATABASE__KIND=connection_string + - TX_SITTER__DATABASE__CONNECTION_STRING=postgres://postgres:postgres@tx-sitter-db:5432/tx-sitter?sslmode=disable + - TX_SITTER__KEYS__KIND=local + - TX_SITTER__PREDEFINED__NETWORK__CHAIN_ID=31337 + - TX_SITTER__PREDEFINED__NETWORK__HTTP_URL=http://chain:8545 + - TX_SITTER__PREDEFINED__NETWORK__WS_URL=ws://chain:8545 + - TX_SITTER__PREDEFINED__RELAYER__ID=1b908a34-5dc1-4d2d-a146-5eb46e975830 + - TX_SITTER__PREDEFINED__RELAYER__CHAIN_ID=31337 + - TX_SITTER__PREDEFINED__RELAYER__KEY_ID=d10607662a85424f02a33fb1e6d095bd0ac7154396ff09762e41f82ff2233aaa + - TX_SITTER__SERVER__HOST=0.0.0.0:3000 + - TX_SITTER__SERVER__DISABLE_AUTH=true diff --git a/crates/tx-sitter-client/Cargo.toml b/crates/tx-sitter-client/Cargo.toml new file mode 100644 index 00000000..d3e6a026 --- /dev/null +++ b/crates/tx-sitter-client/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "tx-sitter-client" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +anyhow = "1.0" +ethers = { version = "2.0.10", features = [ ] } +reqwest = "0.11.14" +serde = { version = "1.0.154", features = ["derive"] } +serde_json = "1.0.94" +strum = { version = "0.25", features = ["derive"] } +tracing = "0.1" diff --git a/crates/tx-sitter-client/src/data.rs b/crates/tx-sitter-client/src/data.rs new file mode 100644 index 00000000..0a00e3f3 --- /dev/null +++ b/crates/tx-sitter-client/src/data.rs @@ -0,0 +1,72 @@ +use ethers::types::{Address, Bytes, H256, U256}; +use serde::{Deserialize, Serialize}; +use strum::Display; + +mod decimal_u256; + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SendTxRequest { + pub to: Address, + #[serde(with = "decimal_u256")] + pub value: U256, + #[serde(default)] + pub data: Option, + #[serde(with = "decimal_u256")] + pub gas_limit: U256, + #[serde(default)] + pub priority: TransactionPriority, + #[serde(default)] + pub tx_id: Option, +} + +#[derive(Deserialize, Serialize, Debug, Clone, Copy, Default)] +#[serde(rename_all = "camelCase")] +pub enum TransactionPriority { + // 5th percentile + Slowest = 0, + // 25th percentile + Slow = 1, + // 50th percentile + #[default] + Regular = 2, + // 75th percentile + Fast = 3, + // 95th percentile + Fastest = 4, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SendTxResponse { + pub tx_id: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetTxResponse { + pub tx_id: String, + pub to: Address, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub data: Option, + #[serde(with = "decimal_u256")] + pub value: U256, + #[serde(with = "decimal_u256")] + pub gas_limit: U256, + pub nonce: u64, + + // Sent tx data + #[serde(default, skip_serializing_if = "Option::is_none")] + pub tx_hash: Option, + pub status: TxStatus, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Display, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +#[strum(serialize_all = "camelCase")] +pub enum TxStatus { + Unsent, + Pending, + Mined, + Finalized, +} diff --git a/crates/tx-sitter-client/src/data/decimal_u256.rs b/crates/tx-sitter-client/src/data/decimal_u256.rs new file mode 100644 index 00000000..f37e802b --- /dev/null +++ b/crates/tx-sitter-client/src/data/decimal_u256.rs @@ -0,0 +1,42 @@ +use ethers::types::U256; + +pub fn serialize(u256: &U256, serializer: S) -> Result +where + S: serde::Serializer, +{ + let s = u256.to_string(); + serializer.serialize_str(&s) +} + +pub fn deserialize<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let s: &str = serde::Deserialize::deserialize(deserializer)?; + let u256 = U256::from_dec_str(s).map_err(serde::de::Error::custom)?; + Ok(u256) +} + +#[cfg(test)] +mod tests { + use serde::{Deserialize, Serialize}; + + use super::*; + + #[derive(Debug, Clone, Serialize, Deserialize)] + struct Test { + #[serde(with = "super")] + v: U256, + } + + #[test] + fn test_u256_serde() { + let test = Test { v: U256::from(123) }; + + let s = serde_json::to_string(&test).unwrap(); + assert_eq!(s, r#"{"v":"123"}"#); + + let test: Test = serde_json::from_str(&s).unwrap(); + assert_eq!(test.v, U256::from(123)); + } +} diff --git a/crates/tx-sitter-client/src/lib.rs b/crates/tx-sitter-client/src/lib.rs new file mode 100644 index 00000000..37423437 --- /dev/null +++ b/crates/tx-sitter-client/src/lib.rs @@ -0,0 +1,81 @@ +use data::{GetTxResponse, SendTxRequest, SendTxResponse, TxStatus}; +use reqwest::Response; +use tracing::instrument; + +pub mod data; + +pub struct TxSitterClient { + client: reqwest::Client, + url: String, +} + +impl TxSitterClient { + pub fn new(url: impl ToString) -> Self { + Self { + client: reqwest::Client::new(), + url: url.to_string(), + } + } + + async fn json_post(&self, url: &str, body: T) -> anyhow::Result + where + T: serde::Serialize, + R: serde::de::DeserializeOwned, + { + let response = self.client.post(url).json(&body).send().await?; + + let response = Self::validate_response(response).await?; + + Ok(response.json().await?) + } + + async fn json_get(&self, url: &str) -> anyhow::Result + where + R: serde::de::DeserializeOwned, + { + let response = self.client.get(url).send().await?; + + let response = Self::validate_response(response).await?; + + Ok(response.json().await?) + } + + async fn validate_response(response: Response) -> anyhow::Result { + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await?; + + tracing::error!("Response failed with status {} - {}", status, body); + return Err(anyhow::anyhow!( + "Response failed with status {status} - {body}" + )); + } + + Ok(response) + } + + #[instrument(skip(self))] + pub async fn send_tx(&self, req: &SendTxRequest) -> anyhow::Result { + self.json_post(&format!("{}/tx", self.url), req).await + } + + #[instrument(skip(self))] + pub async fn get_tx(&self, tx_id: &str) -> anyhow::Result { + self.json_get(&format!("{}/tx/{}", self.url, tx_id)).await + } + + #[instrument(skip(self))] + pub async fn get_txs(&self, tx_status: Option) -> anyhow::Result> { + let mut url = format!("{}/txs", self.url); + + if let Some(tx_status) = tx_status { + url.push_str(&format!("?status={}", tx_status)); + } + + self.json_get(&url).await + } + + pub fn rpc_url(&self) -> String { + format!("{}/rpc", self.url.clone()) + } +} diff --git a/src/ethereum/mod.rs b/src/ethereum/mod.rs index b2085e49..81214feb 100644 --- a/src/ethereum/mod.rs +++ b/src/ethereum/mod.rs @@ -10,13 +10,14 @@ use tracing::instrument; use url::Url; pub use write::TxError; -use self::write::{TransactionId, WriteProvider}; +use self::write::TransactionId; +use self::write_provider::WriteProvider; use crate::serde_utils::JsonStrWrapper; pub mod read; pub mod write; -mod write_oz; +mod write_provider; // TODO: Log and metrics for signer / nonces. #[derive(Clone, Debug, PartialEq, Parser)] @@ -31,7 +32,7 @@ pub struct Options { pub secondary_providers: JsonStrWrapper>, #[clap(flatten)] - pub write_options: write_oz::Options, + pub write_options: write_provider::Options, } #[derive(Clone, Debug)] @@ -39,7 +40,7 @@ pub struct Ethereum { read_provider: Arc, // Mapping of chain id to provider secondary_read_providers: HashMap>, - write_provider: Arc, + write_provider: Arc, } impl Ethereum { @@ -57,8 +58,10 @@ impl Ethereum { ); } - let write_provider: Arc = - Arc::new(write_oz::Provider::new(read_provider.clone(), &options.write_options).await?); + let write_provider: Arc = Arc::new( + write_provider::WriteProvider::new(read_provider.clone(), &options.write_options) + .await?, + ); Ok(Self { read_provider: Arc::new(read_provider), diff --git a/src/ethereum/write/mod.rs b/src/ethereum/write/mod.rs index 99a0184b..7aeec2a7 100644 --- a/src/ethereum/write/mod.rs +++ b/src/ethereum/write/mod.rs @@ -1,10 +1,8 @@ use std::error::Error; use std::fmt; -use async_trait::async_trait; use ethers::providers::ProviderError; -use ethers::types::transaction::eip2718::TypedTransaction; -use ethers::types::{Address, TransactionReceipt, H256}; +use ethers::types::{TransactionReceipt, H256}; use thiserror::Error; #[derive(Clone, Debug)] @@ -35,7 +33,7 @@ pub enum TxError { SendTimeout, #[error("Error sending transaction: {0}")] - Send(Box), + Send(anyhow::Error), #[error("Timeout while waiting for confirmations")] ConfirmationTimeout, @@ -51,19 +49,7 @@ pub enum TxError { #[error("Error parsing transaction id: {0}")] Parse(Box), -} - -#[async_trait] -pub trait WriteProvider: Sync + Send + fmt::Debug { - async fn send_transaction( - &self, - tx: TypedTransaction, - only_once: bool, - ) -> Result; - - async fn fetch_pending_transactions(&self) -> Result, TxError>; - - async fn mine_transaction(&self, tx: TransactionId) -> Result; - fn address(&self) -> Address; + #[error("{0}")] + Other(anyhow::Error), } diff --git a/src/ethereum/write_oz/error.rs b/src/ethereum/write_provider/error.rs similarity index 100% rename from src/ethereum/write_oz/error.rs rename to src/ethereum/write_provider/error.rs diff --git a/src/ethereum/write_provider/inner.rs b/src/ethereum/write_provider/inner.rs new file mode 100644 index 00000000..054e5b19 --- /dev/null +++ b/src/ethereum/write_provider/inner.rs @@ -0,0 +1,23 @@ +use ethers::types::transaction::eip2718::TypedTransaction; +use ethers::types::H256; + +use crate::ethereum::write::TransactionId; +use crate::ethereum::TxError; + +#[async_trait::async_trait] +pub trait Inner: Send + Sync + 'static { + async fn send_transaction( + &self, + tx: TypedTransaction, + only_once: bool, + ) -> Result; + + async fn fetch_pending_transactions(&self) -> Result, TxError>; + + async fn mine_transaction(&self, tx: TransactionId) -> Result; +} + +pub struct TransactionResult { + pub transaction_id: String, + pub hash: Option, +} diff --git a/src/ethereum/write_oz/mod.rs b/src/ethereum/write_provider/mod.rs similarity index 50% rename from src/ethereum/write_oz/mod.rs rename to src/ethereum/write_provider/mod.rs index 4af29d52..73e29d19 100644 --- a/src/ethereum/write_oz/mod.rs +++ b/src/ethereum/write_provider/mod.rs @@ -1,82 +1,67 @@ -use std::num::ParseIntError; -use std::str::FromStr; -use std::time::Duration; +use std::fmt; +use std::sync::Arc; use anyhow::Result as AnyhowResult; -use async_trait::async_trait; -use clap::Parser; use ethers::providers::Middleware; use ethers::types::transaction::eip2718::TypedTransaction; -use ethers::types::{Address, H160, U64}; +use ethers::types::{Address, U64}; use tracing::{info, warn}; +use self::inner::Inner; use self::openzeppelin::OzRelay; -use super::write::{TransactionId, WriteProvider}; +use self::options::ParsedOptions; +use self::tx_sitter::TxSitter; +use super::write::TransactionId; use super::{ReadProvider, TxError}; mod error; +mod inner; mod openzeppelin; +mod options; +mod tx_sitter; -fn duration_from_str(value: &str) -> Result { - Ok(Duration::from_secs(u64::from_str(value)?)) -} - -// TODO: Log and metrics for signer / nonces. -#[derive(Clone, Debug, Eq, PartialEq, Parser)] -#[group(skip)] -pub struct Options { - #[clap(long, env, default_value = "https://api.defender.openzeppelin.com")] - pub oz_api_url: String, - - /// OpenZeppelin Defender API Key - #[clap(long, env)] - pub oz_api_key: String, - - /// OpenZeppelin Defender API Secret - #[clap(long, env)] - pub oz_api_secret: String, - - /// OpenZeppelin Defender API Secret - #[clap(long, env)] - pub oz_address: H160, - - /// For how long OpenZeppelin should track and retry the transaction (in - /// seconds) Default: 7 days (7 * 24 * 60 * 60 = 604800 seconds) - #[clap(long, env, value_parser=duration_from_str, default_value="604800")] - pub oz_transaction_validity: Duration, - - #[clap(long, env, value_parser=duration_from_str, default_value="60")] - pub oz_send_timeout: Duration, +pub use self::options::Options; - #[clap(long, env, value_parser=duration_from_str, default_value="60")] - pub oz_mine_timeout: Duration, - - #[clap(long, env)] - pub oz_gas_limit: Option, -} - -#[derive(Debug)] -pub struct Provider { +pub struct WriteProvider { read_provider: ReadProvider, - inner: OzRelay, + inner: Arc, address: Address, } -impl Provider { +impl fmt::Debug for WriteProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WriteProvider") + .field("read_provider", &self.read_provider) + .field("inner", &"") + .field("address", &self.address) + .finish() + } +} + +impl WriteProvider { pub async fn new(read_provider: ReadProvider, options: &Options) -> AnyhowResult { - let relay = OzRelay::new(options).await?; + let options = options.to_parsed()?; + let address = options.address(); + + let inner: Arc = match options { + ParsedOptions::Oz(oz_options) => { + tracing::info!("Initializing OZ Relayer"); + Arc::new(OzRelay::new(&oz_options).await?) + } + ParsedOptions::TxSitter(tx_sitter_options) => { + tracing::info!("Initializing TxSitter"); + Arc::new(TxSitter::new(&tx_sitter_options)) + } + }; Ok(Self { read_provider, - inner: relay, - address: options.oz_address, + inner, + address, }) } -} -#[async_trait] -impl WriteProvider for Provider { - async fn send_transaction( + pub async fn send_transaction( &self, tx: TypedTransaction, only_once: bool, @@ -84,11 +69,11 @@ impl WriteProvider for Provider { self.inner.send_transaction(tx, only_once).await } - async fn fetch_pending_transactions(&self) -> Result, TxError> { + pub async fn fetch_pending_transactions(&self) -> Result, TxError> { self.inner.fetch_pending_transactions().await } - async fn mine_transaction(&self, tx: TransactionId) -> Result { + pub async fn mine_transaction(&self, tx: TransactionId) -> Result { let oz_transaction_result = self.inner.mine_transaction(tx.clone()).await; if let Err(TxError::Failed(_)) = oz_transaction_result { @@ -130,7 +115,7 @@ impl WriteProvider for Provider { } } - fn address(&self) -> Address { + pub fn address(&self) -> Address { self.address } } diff --git a/src/ethereum/write_oz/openzeppelin.rs b/src/ethereum/write_provider/openzeppelin.rs similarity index 87% rename from src/ethereum/write_oz/openzeppelin.rs rename to src/ethereum/write_provider/openzeppelin.rs index 4aafb73b..8538d903 100644 --- a/src/ethereum/write_oz/openzeppelin.rs +++ b/src/ethereum/write_provider/openzeppelin.rs @@ -11,7 +11,8 @@ use tokio::time::timeout; use tracing::{error, info, info_span, Instrument}; use super::error::Error; -use super::Options; +use super::inner::{Inner, TransactionResult}; +use super::options::OzOptions; use crate::ethereum::write::TransactionId; use crate::ethereum::TxError; @@ -32,7 +33,7 @@ pub struct OzRelay { } impl OzRelay { - pub async fn new(options: &Options) -> AnyhowResult { + pub async fn new(options: &OzOptions) -> AnyhowResult { let oz_api = if options.oz_api_key.is_empty() && options.oz_api_secret.is_empty() { tracing::warn!( "OpenZeppelin Defender API Key and Secret are empty. Connection will operate \ @@ -77,7 +78,7 @@ impl OzRelay { loop { let transaction = self.query(id).await.map_err(|error| { error!(?error, "Failed to get transaction status"); - TxError::Send(Box::new(error)) + TxError::Send(error.into()) })?; let status = transaction.status; @@ -131,10 +132,9 @@ impl OzRelay { /// take multiple seconds to restart. pub async fn send_transaction( &self, - tx: TypedTransaction, + mut tx: TypedTransaction, only_once: bool, ) -> Result { - let mut tx = tx.clone(); if let Some(gas_limit) = self.gas_limit { tx.set_gas(gas_limit); } @@ -144,7 +144,7 @@ impl OzRelay { let existing_transactions = self.list_recent_transactions().await.map_err(|e| { error!(?e, "error occurred"); - TxError::Send(Box::new(e)) + TxError::Send(e.into()) })?; let existing_transaction = @@ -185,7 +185,7 @@ impl OzRelay { })? .map_err(|error| { error!(?error, "Failed to send transaction"); - TxError::Send(Box::new(error)) + TxError::Send(error.into()) })?; info!(?tx_id, "Transaction submitted to OZ Relay"); @@ -214,3 +214,27 @@ impl OzRelay { Ok(pending_txs) } } + +#[async_trait::async_trait] +impl Inner for OzRelay { + async fn send_transaction( + &self, + tx: TypedTransaction, + only_once: bool, + ) -> Result { + self.send_transaction(tx, only_once).await + } + + async fn fetch_pending_transactions(&self) -> Result, TxError> { + self.fetch_pending_transactions().await + } + + async fn mine_transaction(&self, tx: TransactionId) -> Result { + let transaction = self.mine_transaction(tx).await?; + + Ok(TransactionResult { + transaction_id: transaction.transaction_id, + hash: transaction.hash, + }) + } +} diff --git a/src/ethereum/write_provider/options.rs b/src/ethereum/write_provider/options.rs new file mode 100644 index 00000000..53f6ba04 --- /dev/null +++ b/src/ethereum/write_provider/options.rs @@ -0,0 +1,160 @@ +use std::num::ParseIntError; +use std::str::FromStr; +use std::time::Duration; + +use anyhow::anyhow; +use clap::Parser; +use ethers::types::H160; + +// TODO: Log and metrics for signer / nonces. +#[derive(Clone, Debug, Eq, PartialEq, Parser)] +#[group(skip)] +pub struct Options { + // ### OZ Params ### + #[clap(long, env, default_value = "https://api.defender.openzeppelin.com")] + pub oz_api_url: Option, + + /// OpenZeppelin Defender API Key + #[clap(long, env)] + pub oz_api_key: Option, + + /// OpenZeppelin Defender API Secret + #[clap(long, env)] + pub oz_api_secret: Option, + + /// OpenZeppelin Defender API Secret + #[clap(long, env)] + pub oz_address: Option, + + /// For how long OpenZeppelin should track and retry the transaction (in + /// seconds) Default: 7 days (7 * 24 * 60 * 60 = 604800 seconds) + #[clap(long, env, value_parser=duration_from_str, default_value="604800")] + pub oz_transaction_validity: Duration, + + #[clap(long, env, value_parser=duration_from_str, default_value="60")] + pub oz_send_timeout: Duration, + + #[clap(long, env, value_parser=duration_from_str, default_value="60")] + pub oz_mine_timeout: Duration, + + #[clap(long, env)] + pub oz_gas_limit: Option, + + // ### TxSitter Params ### + #[clap(long, env)] + pub tx_sitter_url: Option, + + #[clap(long, env)] + pub tx_sitter_address: Option, + + #[clap(long, env)] + pub tx_sitter_gas_limit: Option, +} + +fn duration_from_str(value: &str) -> Result { + Ok(Duration::from_secs(u64::from_str(value)?)) +} + +impl Options { + pub fn to_parsed(&self) -> anyhow::Result { + let oz_options = OzOptions::try_from(self); + if let Ok(oz_options) = oz_options { + return Ok(ParsedOptions::Oz(oz_options)); + } + + let tx_sitter_options = TxSitterOptions::try_from(self); + if let Ok(tx_sitter_options) = tx_sitter_options { + return Ok(ParsedOptions::TxSitter(tx_sitter_options)); + } + + Err(anyhow!("Invalid options")) + } +} + +pub enum ParsedOptions { + Oz(OzOptions), + TxSitter(TxSitterOptions), +} + +impl ParsedOptions { + pub fn address(&self) -> H160 { + match self { + Self::Oz(oz_options) => oz_options.oz_address, + Self::TxSitter(tx_sitter_options) => tx_sitter_options.tx_sitter_address, + } + } +} + +pub struct OzOptions { + pub oz_api_url: String, + + /// OpenZeppelin Defender API Key + pub oz_api_key: String, + + /// OpenZeppelin Defender API Secret + pub oz_api_secret: String, + + /// OpenZeppelin Defender API Secret + pub oz_address: H160, + + /// For how long OpenZeppelin should track and retry the transaction (in + /// seconds) Default: 7 days (7 * 24 * 60 * 60 = 604800 seconds) + pub oz_transaction_validity: Duration, + + pub oz_send_timeout: Duration, + + pub oz_mine_timeout: Duration, + + pub oz_gas_limit: Option, +} + +impl<'a> TryFrom<&'a Options> for OzOptions { + type Error = anyhow::Error; + + fn try_from(value: &'a Options) -> Result { + Ok(Self { + oz_api_url: value + .oz_api_url + .clone() + .ok_or_else(|| anyhow!("Missing oz_api_url"))?, + oz_api_key: value + .oz_api_key + .clone() + .ok_or_else(|| anyhow!("Missing oz_api_key"))?, + oz_api_secret: value + .oz_api_secret + .clone() + .ok_or_else(|| anyhow!("Missing oz_api_secret"))?, + oz_address: value + .oz_address + .ok_or_else(|| anyhow!("Missing oz_address"))?, + oz_transaction_validity: value.oz_transaction_validity, + oz_send_timeout: value.oz_send_timeout, + oz_mine_timeout: value.oz_mine_timeout, + oz_gas_limit: value.oz_gas_limit, + }) + } +} + +pub struct TxSitterOptions { + pub tx_sitter_url: String, + pub tx_sitter_address: H160, + pub tx_sitter_gas_limit: Option, +} + +impl<'a> TryFrom<&'a Options> for TxSitterOptions { + type Error = anyhow::Error; + + fn try_from(value: &'a Options) -> Result { + Ok(Self { + tx_sitter_url: value + .tx_sitter_url + .clone() + .ok_or_else(|| anyhow!("Missing tx_sitter_url"))?, + tx_sitter_address: value + .tx_sitter_address + .ok_or_else(|| anyhow!("Missing tx_sitter_address"))?, + tx_sitter_gas_limit: value.tx_sitter_gas_limit, + }) + } +} diff --git a/src/ethereum/write_provider/tx_sitter.rs b/src/ethereum/write_provider/tx_sitter.rs new file mode 100644 index 00000000..a96a0433 --- /dev/null +++ b/src/ethereum/write_provider/tx_sitter.rs @@ -0,0 +1,114 @@ +use std::time::Duration; + +use anyhow::Context; +use async_trait::async_trait; +use ethers::types::transaction::eip2718::TypedTransaction; +use ethers::types::U256; +use tx_sitter_client::data::{SendTxRequest, TransactionPriority, TxStatus}; +use tx_sitter_client::TxSitterClient; + +use super::inner::{Inner, TransactionResult}; +use super::options::TxSitterOptions; +use crate::ethereum::write::TransactionId; +use crate::ethereum::TxError; + +const MINING_TIMEOUT: Duration = Duration::from_secs(60); + +pub struct TxSitter { + client: TxSitterClient, + gas_limit: Option, +} + +impl TxSitter { + pub fn new(options: &TxSitterOptions) -> Self { + Self { + client: TxSitterClient::new(&options.tx_sitter_url), + gas_limit: options.tx_sitter_gas_limit, + } + } + + pub async fn mine_transaction_inner( + &self, + tx_id: TransactionId, + ) -> Result { + loop { + let tx = self.client.get_tx(&tx_id.0).await.map_err(TxError::Send)?; + + if tx.status == TxStatus::Mined || tx.status == TxStatus::Finalized { + return Ok(TransactionResult { + transaction_id: tx.tx_id, + hash: Some( + tx.tx_hash + .context("Missing hash on a mined tx") + .map_err(TxError::Send)?, + ), + }); + } + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } +} + +#[async_trait] +impl Inner for TxSitter { + async fn send_transaction( + &self, + mut tx: TypedTransaction, + _only_once: bool, + ) -> Result { + if let Some(gas_limit) = self.gas_limit { + tx.set_gas(gas_limit); + } + + // TODO: Handle only_once + let tx = self + .client + .send_tx(&SendTxRequest { + to: *tx + .to_addr() + .context("Tx receiver must be an address") + .map_err(TxError::Send)?, + value: tx.value().copied().unwrap_or(U256::zero()), + data: tx.data().cloned(), + gas_limit: *tx + .gas() + .context("Missing tx gas limit") + .map_err(TxError::Send)?, + priority: TransactionPriority::Regular, + tx_id: None, + }) + .await + .map_err(TxError::Send)?; + + Ok(TransactionId(tx.tx_id)) + } + + async fn fetch_pending_transactions(&self) -> Result, TxError> { + let unsent_txs = self + .client + .get_txs(Some(TxStatus::Unsent)) + .await + .map_err(TxError::Send)?; + + let pending_txs = self + .client + .get_txs(Some(TxStatus::Pending)) + .await + .map_err(TxError::Send)?; + + let mut txs = vec![]; + + for tx in unsent_txs.into_iter().chain(pending_txs) { + txs.push(TransactionId(tx.tx_id)); + } + + Ok(txs) + } + + async fn mine_transaction(&self, tx: TransactionId) -> Result { + tokio::time::timeout(MINING_TIMEOUT, self.mine_transaction_inner(tx)) + .await + .map_err(|_| TxError::ConfirmationTimeout)? + } +}