diff --git a/src/client.rs b/src/client.rs index 75e8581..6806aa7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -24,7 +24,7 @@ pub enum ClientError { Serde(#[from] serde_json::Error), #[error("API error: {0}")] - TxSitter(String), + TxSitter(reqwest::StatusCode, String), #[error("Invalid API key: {0}")] InvalidApiKey(eyre::Error), @@ -118,9 +118,10 @@ impl TxSitterClient { async fn validate_response( response: Response, ) -> Result { - if !response.status().is_success() { + let status = response.status(); + if !status.is_success() { let body: String = response.text().await?; - return Err(ClientError::TxSitter(body)); + return Err(ClientError::TxSitter(status, body)); } Ok(response) @@ -213,9 +214,9 @@ impl TxSitterClient { } impl ClientError { - pub fn tx_sitter(&self) -> Option<&str> { + pub fn tx_sitter_message(&self) -> Option<&str> { match self { - Self::TxSitter(s) => Some(s), + Self::TxSitter(_, s) => Some(s), _ => None, } } diff --git a/src/db.rs b/src/db.rs index de8c2c3..44fcf39 100644 --- a/src/db.rs +++ b/src/db.rs @@ -28,6 +28,11 @@ pub struct Database { pub pool: Pool, } +pub enum CreateResult { + SUCCESS, + CONFLICT, +} + impl Database { pub async fn new(config: &DatabaseConfig) -> eyre::Result { let connection_string = config.to_connection_string(); @@ -277,7 +282,7 @@ impl Database { priority: TransactionPriority, blobs: Option>>, relayer_id: &str, - ) -> eyre::Result<()> { + ) -> eyre::Result { let mut tx = self.pool.begin().await?; let mut value_bytes = [0u8; 32]; @@ -299,7 +304,7 @@ impl Database { .fetch_one(tx.as_mut()) .await?; - sqlx::query( + let res = sqlx::query( r#" INSERT INTO transactions (id, tx_to, data, value, gas_limit, priority, relayer_id, nonce, blobs) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) @@ -315,11 +320,19 @@ impl Database { .bind(nonce) .bind(blobs) .execute(tx.as_mut()) - .await?; + .await; + + if let Err(sqlx::Error::Database(ref err)) = res { + if err.constraint() == Some("transactions_pkey") { + return Ok(CreateResult::CONFLICT); + } + } + + res?; tx.commit().await?; - Ok(()) + Ok(CreateResult::SUCCESS) } #[instrument(skip(self), level = "debug")] diff --git a/src/server.rs b/src/server.rs index aff9efc..de72445 100644 --- a/src/server.rs +++ b/src/server.rs @@ -17,6 +17,7 @@ use url::Url; use crate::api_key::ApiKey; use crate::app::App; +use crate::db::CreateResult; use crate::service::Service; use crate::task_runner::TaskRunner; use crate::types::{ @@ -250,7 +251,8 @@ impl RelayerApi { )); } - app.db + let res = app + .db .create_transaction( &tx_id, req.to.0, @@ -263,6 +265,13 @@ impl RelayerApi { ) .await?; + if let CreateResult::CONFLICT = res { + return Err(poem::error::Error::from_string( + "Transaction with same id already exists.".to_string(), + StatusCode::CONFLICT, + )); + } + tracing::info!(tx_id, "Transaction created"); Ok(Json(SendTxResponse { tx_id })) diff --git a/tests/send_too_many_txs.rs b/tests/send_too_many_txs.rs index f9f322a..1f2a35d 100644 --- a/tests/send_too_many_txs.rs +++ b/tests/send_too_many_txs.rs @@ -83,7 +83,7 @@ async fn send_too_many_txs() -> eyre::Result<()> { // TODO: Fix checking errors by string assert_eq!( - result.as_ref().err().and_then(|e| e.tx_sitter()), + result.as_ref().err().and_then(|e| e.tx_sitter_message()), Some("Relayer queue is full"), "Result {:?} should be too many transactions", result diff --git a/tests/send_tx_with_same_id.rs b/tests/send_tx_with_same_id.rs new file mode 100644 index 0000000..c27c0a8 --- /dev/null +++ b/tests/send_tx_with_same_id.rs @@ -0,0 +1,58 @@ +mod common; + +use reqwest::StatusCode; +use tx_sitter::client::ClientError; + +use crate::common::prelude::*; + +#[tokio::test] +async fn send_tx_with_same_id() -> eyre::Result<()> { + setup_tracing(); + + let (db_url, _db_container) = setup_db().await?; + let anvil = AnvilBuilder::default().spawn().await?; + + let (_service, client) = + ServiceBuilder::default().build(&anvil, &db_url).await?; + let CreateApiKeyResponse { api_key } = + client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?; + + let tx_id = Some("tx-1".to_string()); + + // Send a transaction + let value: U256 = parse_units("1", "ether")?.into(); + client + .send_tx( + &api_key, + &SendTxRequest { + to: ARBITRARY_ADDRESS.into(), + value: value.into(), + gas_limit: U256::from(21_000).into(), + tx_id: tx_id.clone(), + ..Default::default() + }, + ) + .await?; + + let res = client + .send_tx( + &api_key, + &SendTxRequest { + to: ARBITRARY_ADDRESS.into(), + value: value.into(), + gas_limit: U256::from(21_000).into(), + tx_id: tx_id.clone(), + ..Default::default() + }, + ) + .await; + + if let ClientError::TxSitter(status_code, message) = res.unwrap_err() { + assert_eq!(status_code, StatusCode::CONFLICT); + assert_eq!(message, "Transaction with same id already exists."); + + return Ok(()); + } + + panic!("Should return error on second insert with same id.") +}