diff --git a/packages/storage/.sqlx/query-2207b448e46117ad64084feefc49e3f45511e91468b32f5ef0024f92730588a6.json b/.sqlx/query-2207b448e46117ad64084feefc49e3f45511e91468b32f5ef0024f92730588a6.json similarity index 100% rename from packages/storage/.sqlx/query-2207b448e46117ad64084feefc49e3f45511e91468b32f5ef0024f92730588a6.json rename to .sqlx/query-2207b448e46117ad64084feefc49e3f45511e91468b32f5ef0024f92730588a6.json diff --git a/.sqlx/query-59468b64c24fb4c77a9b30428fd4682e9b4cfcba55c9535931edc7448ed88bfa.json b/.sqlx/query-59468b64c24fb4c77a9b30428fd4682e9b4cfcba55c9535931edc7448ed88bfa.json new file mode 100644 index 00000000..3b75ee51 --- /dev/null +++ b/.sqlx/query-59468b64c24fb4c77a9b30428fd4682e9b4cfcba55c9535931edc7448ed88bfa.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT EXISTS (SELECT 1 FROM l1_transactions WHERE state = $1) AS has_pending_transactions;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "has_pending_transactions", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Int2" + ] + }, + "nullable": [ + null + ] + }, + "hash": "59468b64c24fb4c77a9b30428fd4682e9b4cfcba55c9535931edc7448ed88bfa" +} diff --git a/packages/storage/.sqlx/query-6f7e6ba876d49bef1bf870514ed38be642af65ed848f53a191ef58c2e02f227c.json b/.sqlx/query-6f7e6ba876d49bef1bf870514ed38be642af65ed848f53a191ef58c2e02f227c.json similarity index 100% rename from packages/storage/.sqlx/query-6f7e6ba876d49bef1bf870514ed38be642af65ed848f53a191ef58c2e02f227c.json rename to .sqlx/query-6f7e6ba876d49bef1bf870514ed38be642af65ed848f53a191ef58c2e02f227c.json diff --git a/.sqlx/query-6fb27a4115a2cb07c21e7a47565a0436de31285492657bae6aaadc04115cea5d.json b/.sqlx/query-6fb27a4115a2cb07c21e7a47565a0436de31285492657bae6aaadc04115cea5d.json new file mode 100644 index 00000000..35cdfc07 --- /dev/null +++ b/.sqlx/query-6fb27a4115a2cb07c21e7a47565a0436de31285492657bae6aaadc04115cea5d.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO l1_transactions (hash, state) VALUES ($1, $2) RETURNING id", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Bytea", + "Int2" + ] + }, + "nullable": [ + false + ] + }, + "hash": "6fb27a4115a2cb07c21e7a47565a0436de31285492657bae6aaadc04115cea5d" +} diff --git a/.sqlx/query-9be45d22c0bb43deb53da5a8fe19f5554f2a901cabc730792d51baadd2460e44.json b/.sqlx/query-9be45d22c0bb43deb53da5a8fe19f5554f2a901cabc730792d51baadd2460e44.json new file mode 100644 index 00000000..777bc248 --- /dev/null +++ b/.sqlx/query-9be45d22c0bb43deb53da5a8fe19f5554f2a901cabc730792d51baadd2460e44.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT * FROM l1_transactions WHERE state = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "hash", + "type_info": "Bytea" + }, + { + "ordinal": 2, + "name": "state", + "type_info": "Int2" + } + ], + "parameters": { + "Left": [ + "Int2" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "9be45d22c0bb43deb53da5a8fe19f5554f2a901cabc730792d51baadd2460e44" +} diff --git a/.sqlx/query-9fad1eeaa60ea30606182ffef41d62900c0343c83e6258a2e9f287c2b4e0281e.json b/.sqlx/query-9fad1eeaa60ea30606182ffef41d62900c0343c83e6258a2e9f287c2b4e0281e.json new file mode 100644 index 00000000..4c3b1cbd --- /dev/null +++ b/.sqlx/query-9fad1eeaa60ea30606182ffef41d62900c0343c83e6258a2e9f287c2b4e0281e.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + 
"query": "INSERT INTO l1_transaction_fragments (transaction_id, fragment_id) VALUES ($1, $2)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "9fad1eeaa60ea30606182ffef41d62900c0343c83e6258a2e9f287c2b4e0281e" +} diff --git a/packages/storage/.sqlx/query-a9bb121820d80a7aeadd87ce55ad241dd9a24d99d28ff14db70a93200ff869c6.json b/.sqlx/query-a9bb121820d80a7aeadd87ce55ad241dd9a24d99d28ff14db70a93200ff869c6.json similarity index 100% rename from packages/storage/.sqlx/query-a9bb121820d80a7aeadd87ce55ad241dd9a24d99d28ff14db70a93200ff869c6.json rename to .sqlx/query-a9bb121820d80a7aeadd87ce55ad241dd9a24d99d28ff14db70a93200ff869c6.json diff --git a/.sqlx/query-b3e422ba5518d62297afe5fc97440249be2af4c93243b961f68b028232185992.json b/.sqlx/query-b3e422ba5518d62297afe5fc97440249be2af4c93243b961f68b028232185992.json new file mode 100644 index 00000000..3191a907 --- /dev/null +++ b/.sqlx/query-b3e422ba5518d62297afe5fc97440249be2af4c93243b961f68b028232185992.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE l1_transactions SET state = $1 WHERE hash = $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int2", + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "b3e422ba5518d62297afe5fc97440249be2af4c93243b961f68b028232185992" +} diff --git a/.sqlx/query-bce910f42b45949e8ab08355c5b6d7679c267ef5946d7c360a24abfdefa1abe2.json b/.sqlx/query-bce910f42b45949e8ab08355c5b6d7679c267ef5946d7c360a24abfdefa1abe2.json new file mode 100644 index 00000000..28d12388 --- /dev/null +++ b/.sqlx/query-bce910f42b45949e8ab08355c5b6d7679c267ef5946d7c360a24abfdefa1abe2.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO l1_fragments (fragment_idx, submission_id, data, created_at) VALUES ($1, $2, $3, $4)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Int4", + "Bytea", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "bce910f42b45949e8ab08355c5b6d7679c267ef5946d7c360a24abfdefa1abe2" +} diff --git a/packages/storage/.sqlx/query-3e2f4f0f3c019c8ee65865a83d799b5c152c9358ea2af38e4c267f326366afd2.json b/.sqlx/query-c6cffaf0718065ed45442c123f7aed85456bbbb9588ab0ed2be2d685ea09364e.json similarity index 65% rename from packages/storage/.sqlx/query-3e2f4f0f3c019c8ee65865a83d799b5c152c9358ea2af38e4c267f326366afd2.json rename to .sqlx/query-c6cffaf0718065ed45442c123f7aed85456bbbb9588ab0ed2be2d685ea09364e.json index aad70506..27e7b399 100644 --- a/packages/storage/.sqlx/query-3e2f4f0f3c019c8ee65865a83d799b5c152c9358ea2af38e4c267f326366afd2.json +++ b/.sqlx/query-c6cffaf0718065ed45442c123f7aed85456bbbb9588ab0ed2be2d685ea09364e.json @@ -1,22 +1,22 @@ { "db_name": "PostgreSQL", - "query": "SELECT * FROM l1_state_submission ORDER BY fuel_block_height DESC LIMIT 1", + "query": "SELECT * FROM l1_submissions ORDER BY fuel_block_height DESC LIMIT 1", "describe": { "columns": [ { "ordinal": 0, - "name": "fuel_block_hash", - "type_info": "Bytea" + "name": "id", + "type_info": "Int4" }, { "ordinal": 1, - "name": "fuel_block_height", - "type_info": "Int8" + "name": "fuel_block_hash", + "type_info": "Bytea" }, { "ordinal": 2, - "name": "completed", - "type_info": "Bool" + "name": "fuel_block_height", + "type_info": "Int8" } ], "parameters": { @@ -28,5 +28,5 @@ false ] }, - "hash": "3e2f4f0f3c019c8ee65865a83d799b5c152c9358ea2af38e4c267f326366afd2" + "hash": "c6cffaf0718065ed45442c123f7aed85456bbbb9588ab0ed2be2d685ea09364e" } diff --git 
a/.sqlx/query-daa42cdb26e7b8e6d1d586367cbe42d1defc42b001b71e53a86e47f91c521c69.json b/.sqlx/query-daa42cdb26e7b8e6d1d586367cbe42d1defc42b001b71e53a86e47f91c521c69.json new file mode 100644 index 00000000..51c7304f --- /dev/null +++ b/.sqlx/query-daa42cdb26e7b8e6d1d586367cbe42d1defc42b001b71e53a86e47f91c521c69.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO l1_submissions (fuel_block_hash, fuel_block_height) VALUES ($1, $2) RETURNING id", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Bytea", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "daa42cdb26e7b8e6d1d586367cbe42d1defc42b001b71e53a86e47f91c521c69" +} diff --git a/.sqlx/query-f258b9822f1b060c13cd895fdbe61020fa605fdba844cb8c0071111f78342b5e.json b/.sqlx/query-f258b9822f1b060c13cd895fdbe61020fa605fdba844cb8c0071111f78342b5e.json new file mode 100644 index 00000000..ec52bb46 --- /dev/null +++ b/.sqlx/query-f258b9822f1b060c13cd895fdbe61020fa605fdba844cb8c0071111f78342b5e.json @@ -0,0 +1,48 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT l1_fragments.*\n FROM l1_fragments\n WHERE l1_fragments.id NOT IN (\n SELECT l1_fragments.id\n FROM l1_fragments\n JOIN l1_transaction_fragments ON l1_fragments.id = l1_transaction_fragments.fragment_id\n JOIN l1_transactions ON l1_transaction_fragments.transaction_id = l1_transactions.id\n WHERE l1_transactions.state IN ($1, $2)\n )\n ORDER BY l1_fragments.created_at\n LIMIT $3;", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "fragment_idx", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "submission_id", + "type_info": "Int4" + }, + { + "ordinal": 3, + "name": "data", + "type_info": "Bytea" + }, + { + "ordinal": 4, + "name": "created_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Int2", + "Int2", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "f258b9822f1b060c13cd895fdbe61020fa605fdba844cb8c0071111f78342b5e" +} diff --git a/committer/src/config.rs b/committer/src/config.rs index 036c56f0..04d3488c 100644 --- a/committer/src/config.rs +++ b/committer/src/config.rs @@ -85,6 +85,8 @@ pub struct App { /// How often to check the latest fuel block #[serde(deserialize_with = "human_readable_duration")] pub block_check_interval: Duration, + /// Number of L1 blocks that need to pass to accept the tx as finalized + pub num_blocks_to_finalize_tx: u64, } fn human_readable_duration<'de, D>(deserializer: D) -> Result diff --git a/committer/src/main.rs b/committer/src/main.rs index fc3db3ce..1441021c 100644 --- a/committer/src/main.rs +++ b/committer/src/main.rs @@ -70,10 +70,11 @@ async fn main() -> Result<()> { listener_handle, ]; - // If the blob pool wallet key is set, we need to start the state committer and state importer + // If the blob pool wallet key is set, we need to start + // the state committer, state importer and state listener if config.eth.blob_pool_wallet_key.is_some() { let state_committer_handle = setup::state_committer( - ethereum_rpc, + ethereum_rpc.clone(), storage.clone(), &metrics_registry, cancel_token.clone(), @@ -88,8 +89,12 @@ async fn main() -> Result<()> { &config, ); + let state_listener_handle = + setup::state_listener(ethereum_rpc, storage.clone(), cancel_token.clone(), &config); + handles.push(state_committer_handle); handles.push(state_importer_handle); + handles.push(state_listener_handle); 
} launch_api_server( diff --git a/committer/src/setup.rs b/committer/src/setup.rs index aa7c7add..558a7835 100644 --- a/committer/src/setup.rs +++ b/committer/src/setup.rs @@ -105,6 +105,23 @@ pub fn state_importer( ) } +pub fn state_listener( + l1: L1, + storage: impl Storage + 'static, + cancel_token: CancellationToken, + config: &config::Config, +) -> tokio::task::JoinHandle<()> { + let state_listener = + services::StateListener::new(l1, storage, config.app.num_blocks_to_finalize_tx); + + schedule_polling( + config.app.block_check_interval, + state_listener, + "State Listener", + cancel_token, + ) +} + pub async fn l1_adapter( config: &config::Config, internal_config: &config::Internal, diff --git a/configurations/development/config.toml b/configurations/development/config.toml index eb01cddf..613e27b9 100644 --- a/configurations/development/config.toml +++ b/configurations/development/config.toml @@ -11,6 +11,7 @@ block_producer_public_key = "0x73dc6cc8cc0041e4924954b35a71a22ccb520664c522198a6 port = 8080 host = "0.0.0.0" block_check_interval = "1s" +num_blocks_to_finalize_tx = 12 [app.db] host = "localhost" diff --git a/packages/eth/src/lib.rs b/packages/eth/src/lib.rs index f472fef7..ab6ab875 100644 --- a/packages/eth/src/lib.rs +++ b/packages/eth/src/lib.rs @@ -8,7 +8,7 @@ use ethers::types::U256; use futures::{stream::TryStreamExt, Stream}; use ports::{ l1::{Api, Contract, EventStreamer, Result}, - types::{FuelBlockCommittedOnL1, L1Height, ValidatedFuelBlock}, + types::{FuelBlockCommittedOnL1, L1Height, TransactionResponse, ValidatedFuelBlock}, }; use websocket::EthEventStreamer; @@ -51,6 +51,13 @@ impl Api for WebsocketClient { Ok(height) } + + async fn get_transaction_response( + &self, + tx_hash: [u8; 32], + ) -> Result> { + Ok(self.get_transaction_response(tx_hash).await?) + } } #[async_trait::async_trait] diff --git a/packages/eth/src/websocket.rs b/packages/eth/src/websocket.rs index 7dcff38d..243c13c9 100644 --- a/packages/eth/src/websocket.rs +++ b/packages/eth/src/websocket.rs @@ -2,7 +2,7 @@ use ::metrics::{prometheus::core::Collector, HealthChecker, RegistersMetrics}; use ethers::types::{Address, Chain}; use ports::{ l1::Result, - types::{ValidatedFuelBlock, U256}, + types::{TransactionResponse, ValidatedFuelBlock, U256}, }; use std::num::NonZeroU32; use url::Url; @@ -66,6 +66,13 @@ impl WebsocketClient { Ok(self.inner.get_block_number().await?) } + pub(crate) async fn get_transaction_response( + &self, + tx_hash: [u8; 32], + ) -> Result> { + Ok(self.inner.get_transaction_response(tx_hash).await?) + } + pub(crate) async fn balance(&self) -> Result { Ok(self.inner.balance().await?) 
} diff --git a/packages/eth/src/websocket/connection.rs b/packages/eth/src/websocket/connection.rs index 43006185..babb0720 100644 --- a/packages/eth/src/websocket/connection.rs +++ b/packages/eth/src/websocket/connection.rs @@ -4,9 +4,9 @@ use ethers::{ prelude::{abigen, SignerMiddleware}, providers::{Middleware, Provider, Ws}, signers::{LocalWallet, Signer as _}, - types::{Address, BlockNumber, Chain, H160, H256, U256, U64}, + types::{Address, BlockNumber, Chain, TransactionReceipt, H160, H256, U256, U64}, }; -use ports::types::ValidatedFuelBlock; +use ports::types::{TransactionResponse, ValidatedFuelBlock}; use serde_json::Value; use url::Url; @@ -80,6 +80,15 @@ impl EthApi for WsConnection { EthEventStreamer::new(events) } + async fn get_transaction_response( + &self, + tx_hash: [u8; 32], + ) -> Result> { + let tx_receipt = self.provider.get_transaction_receipt(tx_hash).await?; + + Self::convert_to_tx_response(tx_receipt) + } + async fn submit_l2_state(&self, state_data: Vec) -> Result<[u8; 32]> { let blob_pool_wallet = if let Some(blob_pool_wallet) = &self.blob_pool_wallet { blob_pool_wallet @@ -214,6 +223,43 @@ impl WsConnection { Ok(max_fee_per_blob_gas) } + + fn convert_to_tx_response( + tx_receipt: Option, + ) -> Result> { + let Some(tx_receipt) = tx_receipt else { + return Ok(None); + }; + + let block_number = Self::extract_block_number_from_receipt(&tx_receipt)?; + + const SUCCESS_STATUS: u64 = 1; + // Only present after activation of [EIP-658](https://eips.ethereum.org/EIPS/eip-658) + let Some(status) = tx_receipt.status else { + return Err(Error::Other( + "`status` not present in tx receipt".to_string(), + )); + }; + + let status: u64 = status.try_into().map_err(|_| { + Error::Other("could not convert tx receipt `status` to `u64`".to_string()) + })?; + + Ok(Some(TransactionResponse::new( + block_number, + status == SUCCESS_STATUS, + ))) + } + + fn extract_block_number_from_receipt(receipt: &TransactionReceipt) -> Result { + receipt + .block_number + .ok_or_else(|| { + Error::Other("transaction receipt does not contain block number".to_string()) + })? 
+ .try_into() + .map_err(|_| Error::Other("could not convert `block_number` to `u64`".to_string())) + } } #[cfg(test)] diff --git a/packages/eth/src/websocket/health_tracking_middleware.rs b/packages/eth/src/websocket/health_tracking_middleware.rs index 74271a37..bce27580 100644 --- a/packages/eth/src/websocket/health_tracking_middleware.rs +++ b/packages/eth/src/websocket/health_tracking_middleware.rs @@ -4,7 +4,7 @@ use ::metrics::{ use std::num::NonZeroU32; -use ports::types::{ValidatedFuelBlock, U256}; +use ports::types::{TransactionResponse, ValidatedFuelBlock, U256}; use crate::{ error::{Error, Result}, @@ -20,6 +20,10 @@ pub trait EthApi { async fn balance(&self) -> Result; fn commit_interval(&self) -> NonZeroU32; fn event_streamer(&self, eth_block_height: u64) -> EthEventStreamer; + async fn get_transaction_response( + &self, + tx_hash: [u8; 32], + ) -> Result>; async fn submit_l2_state(&self, state_data: Vec) -> Result<[u8; 32]>; #[cfg(feature = "test-helpers")] async fn finalized(&self, block: ValidatedFuelBlock) -> Result; @@ -85,6 +89,15 @@ where response } + async fn get_transaction_response( + &self, + tx_hash: [u8; 32], + ) -> Result> { + let response = self.adapter.get_transaction_response(tx_hash).await; + self.note_network_status(&response); + response + } + fn event_streamer(&self, eth_block_height: u64) -> EthEventStreamer { self.adapter.event_streamer(eth_block_height) } diff --git a/packages/ports/src/ports/l1.rs b/packages/ports/src/ports/l1.rs index 68134c21..1143b1d5 100644 --- a/packages/ports/src/ports/l1.rs +++ b/packages/ports/src/ports/l1.rs @@ -1,7 +1,8 @@ use std::pin::Pin; use crate::types::{ - FuelBlockCommittedOnL1, InvalidL1Height, L1Height, Stream, ValidatedFuelBlock, U256, + FuelBlockCommittedOnL1, InvalidL1Height, L1Height, Stream, TransactionResponse, + ValidatedFuelBlock, U256, }; #[derive(Debug, thiserror::Error)] @@ -34,6 +35,10 @@ pub trait Api { async fn submit_l2_state(&self, state_data: Vec) -> Result<[u8; 32]>; async fn get_block_number(&self) -> Result; async fn balance(&self) -> Result; + async fn get_transaction_response( + &self, + tx_hash: [u8; 32], + ) -> Result>; } #[cfg_attr(feature = "test-helpers", mockall::automock)] diff --git a/packages/ports/src/ports/storage.rs b/packages/ports/src/ports/storage.rs index b8853770..ffac048c 100644 --- a/packages/ports/src/ports/storage.rs +++ b/packages/ports/src/ports/storage.rs @@ -1,6 +1,8 @@ use std::sync::Arc; -use crate::types::{BlockSubmission, StateFragment, StateFragmentId, StateSubmission}; +use crate::types::{ + BlockSubmission, StateFragment, StateSubmission, SubmissionTx, TransactionState, +}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -20,17 +22,19 @@ pub trait Storage: Send + Sync { async fn submission_w_latest_block(&self) -> Result>; async fn set_submission_completed(&self, fuel_block_hash: [u8; 32]) -> Result; - async fn insert_state( + async fn insert_state_submission( &self, - state: StateSubmission, + submission: StateSubmission, fragments: Vec, ) -> Result<()>; async fn get_unsubmitted_fragments(&self) -> Result>; - async fn record_pending_tx( - &self, - tx_hash: [u8; 32], - fragment_ids: Vec, - ) -> Result<()>; + async fn record_pending_tx(&self, tx_hash: [u8; 32], fragment_ids: Vec) -> Result<()>; + async fn get_pending_txs(&self) -> Result>; async fn has_pending_txs(&self) -> Result; async fn state_submission_w_latest_block(&self) -> Result>; + async fn update_submission_tx_state( + &self, + hash: [u8; 32], + state: TransactionState, + ) -> Result<()>; } diff 
--git a/packages/ports/src/types/state_submission.rs b/packages/ports/src/types/state_submission.rs index e92b6ff1..9c527a10 100644 --- a/packages/ports/src/types/state_submission.rs +++ b/packages/ports/src/types/state_submission.rs @@ -2,27 +2,76 @@ pub use sqlx::types::chrono::{DateTime, Utc}; #[derive(Debug, Clone, PartialEq, Eq)] pub struct StateSubmission { + pub id: Option, pub block_hash: [u8; 32], pub block_height: u32, - pub completed: bool, } -pub type StateFragmentId = ([u8; 32], u32); - #[derive(Debug, Clone, PartialEq, Eq)] pub struct StateFragment { - pub block_hash: [u8; 32], - pub transaction_hash: Option<[u8; 32]>, - pub fragment_index: u32, - pub raw_data: Vec, + pub id: Option, + pub submission_id: Option, + pub fragment_idx: u32, + pub data: Vec, pub created_at: DateTime, - pub completed: bool, } impl StateFragment { pub const MAX_FRAGMENT_SIZE: usize = 128 * 1024; +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SubmissionTx { + pub id: Option, + pub hash: [u8; 32], + pub state: TransactionState, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum TransactionState { + Pending, + Finalized, + Failed, +} + +// Used for DB storage +impl TransactionState { + pub fn into_i16(&self) -> i16 { + match self { + TransactionState::Pending => 0, + TransactionState::Finalized => 1, + TransactionState::Failed => 2, + } + } + + pub fn from_i16(value: i16) -> Option { + match value { + 0 => Some(Self::Pending), + 1 => Some(Self::Finalized), + 2 => Some(Self::Failed), + _ => None, + } + } +} + +pub struct TransactionResponse { + block_number: u64, + succeeded: bool, +} + +impl TransactionResponse { + pub fn new(block_number: u64, succeeded: bool) -> Self { + Self { + block_number, + succeeded, + } + } + + pub fn block_number(&self) -> u64 { + self.block_number + } - pub fn id(&self) -> StateFragmentId { - (self.block_hash, self.fragment_index) + pub fn succeeded(&self) -> bool { + self.succeeded } } diff --git a/packages/services/src/block_committer.rs b/packages/services/src/block_committer.rs index 8253d39a..b01010d4 100644 --- a/packages/services/src/block_committer.rs +++ b/packages/services/src/block_committer.rs @@ -184,7 +184,7 @@ mod tests { use ports::{ fuel::{FuelBlock, FuelBlockId, FuelConsensus, FuelHeader, FuelPoAConsensus}, l1::{Contract, EventStreamer, MockContract}, - types::{L1Height, U256}, + types::{L1Height, TransactionResponse, U256}, }; use storage::{Postgres, PostgresProcess}; @@ -228,6 +228,13 @@ mod tests { async fn balance(&self) -> ports::l1::Result { self.api.balance().await } + + async fn get_transaction_response( + &self, + _tx_hash: [u8; 32], + ) -> ports::l1::Result> { + Ok(None) + } } fn given_l1_that_expects_submission(block: ValidatedFuelBlock) -> MockL1 { diff --git a/packages/services/src/lib.rs b/packages/services/src/lib.rs index 3187f2ac..5c944b04 100644 --- a/packages/services/src/lib.rs +++ b/packages/services/src/lib.rs @@ -2,20 +2,20 @@ mod block_committer; mod commit_listener; mod health_reporter; -mod status_reporter; -mod wallet_balance_tracker; - mod state_committer; mod state_importer; +mod state_listener; +mod status_reporter; +mod wallet_balance_tracker; pub use block_committer::BlockCommitter; pub use commit_listener::CommitListener; pub use health_reporter::HealthReporter; -pub use status_reporter::StatusReporter; -pub use wallet_balance_tracker::WalletBalanceTracker; - pub use state_committer::StateCommitter; pub use state_importer::StateImporter; +pub use state_listener::StateListener; +pub use 
status_reporter::StatusReporter; +pub use wallet_balance_tracker::WalletBalanceTracker; #[derive(thiserror::Error, Debug)] pub enum Error { diff --git a/packages/services/src/state_committer.rs b/packages/services/src/state_committer.rs index ad2f3e18..4c4419e7 100644 --- a/packages/services/src/state_committer.rs +++ b/packages/services/src/state_committer.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use ports::{storage::Storage, types::StateFragmentId}; +use ports::storage::Storage; use crate::{Result, Runner}; @@ -22,15 +22,15 @@ where L1: ports::l1::Api, Db: Storage, { - async fn prepare_fragments(&self) -> Result<(Vec, Vec)> { + async fn prepare_fragments(&self) -> Result<(Vec, Vec)> { let fragments = self.storage.get_unsubmitted_fragments().await?; let num_fragments = fragments.len(); let mut fragment_ids = Vec::with_capacity(num_fragments); let mut data = Vec::with_capacity(num_fragments); for fragment in fragments { - fragment_ids.push(fragment.id()); - data.extend(fragment.raw_data); + fragment_ids.push(fragment.id.expect("fragments from DB must have `id`")); + data.extend(fragment.data); } Ok((fragment_ids, data)) @@ -75,7 +75,7 @@ where #[cfg(test)] mod tests { use mockall::predicate; - use ports::types::{L1Height, StateFragment, StateSubmission, U256}; + use ports::types::{L1Height, StateFragment, StateSubmission, TransactionResponse, U256}; use storage::PostgresProcess; use super::*; @@ -104,6 +104,13 @@ mod tests { async fn balance(&self) -> ports::l1::Result { Ok(U256::zero()) } + + async fn get_transaction_response( + &self, + _tx_hash: [u8; 32], + ) -> ports::l1::Result> { + Ok(None) + } } fn given_l1_that_expects_submission(fragment: StateFragment) -> MockL1 { @@ -111,7 +118,7 @@ mod tests { l1.api .expect_submit_l2_state() - .with(predicate::eq(fragment.raw_data)) + .with(predicate::eq(fragment.data)) .return_once(move |_| Ok([1u8; 32])); l1 @@ -120,17 +127,16 @@ mod tests { fn given_state() -> (StateSubmission, StateFragment) { ( StateSubmission { + id: None, block_hash: [0u8; 32], block_height: 1, - completed: false, }, StateFragment { - block_hash: [0u8; 32], - transaction_hash: None, - fragment_index: 0, - raw_data: vec![1, 2, 3], + id: None, + submission_id: None, + fragment_idx: 0, + data: vec![1, 2, 3], created_at: ports::types::Utc::now(), - completed: false, }, ) } @@ -143,7 +149,7 @@ mod tests { let process = PostgresProcess::shared().await.unwrap(); let db = process.create_random_db().await?; - db.insert_state(state, vec![fragment]).await?; + db.insert_state_submission(state, vec![fragment]).await?; let mut committer = StateCommitter::new(l1_mock, db.clone()); // when diff --git a/packages/services/src/state_importer.rs b/packages/services/src/state_importer.rs index 261b1a92..c44621a1 100644 --- a/packages/services/src/state_importer.rs +++ b/packages/services/src/state_importer.rs @@ -71,19 +71,18 @@ where .into_iter() .enumerate() .map(|(index, chunk)| StateFragment { - block_hash: *block.id, - transaction_hash: None, - fragment_index: index as u32, - raw_data: chunk.copied().collect(), + id: None, + submission_id: None, + fragment_idx: index as u32, + data: chunk.copied().collect(), created_at: ports::types::Utc::now(), - completed: false, }) .collect(); let submission = StateSubmission { + id: None, block_hash: *block.id, block_height: block.header.height, - completed: false, }; Ok((submission, fragments)) @@ -91,7 +90,9 @@ where async fn import_state(&self, block: FuelBlock) -> Result<()> { let (submission, fragments) = 
self.block_to_state_submission(block)?; - self.storage.insert_state(submission, fragments).await?; + self.storage + .insert_state_submission(submission, fragments) + .await?; Ok(()) } @@ -198,7 +199,6 @@ mod tests { // given let secret_key = given_secret_key(); let block = given_a_block(1, &secret_key); - let block_id = *block.id; let fuel_mock = given_fetcher(block); let block_validator = BlockValidator::new(secret_key.public_key()); @@ -211,8 +211,9 @@ mod tests { // then let fragments = db.get_unsubmitted_fragments().await?; + let latest_submission = db.state_submission_w_latest_block().await?.unwrap(); assert_eq!(fragments.len(), 1); - assert_eq!(fragments[0].block_hash, block_id); + assert_eq!(fragments[0].submission_id, latest_submission.id); Ok(()) } diff --git a/packages/services/src/state_listener.rs b/packages/services/src/state_listener.rs new file mode 100644 index 00000000..f2edb12f --- /dev/null +++ b/packages/services/src/state_listener.rs @@ -0,0 +1,266 @@ +use async_trait::async_trait; +use ports::{ + storage::Storage, + types::{SubmissionTx, TransactionState}, +}; + +use super::Runner; + +pub struct StateListener { + l1_adapter: L1, + storage: Db, + num_blocks_to_finalize: u64, +} + +impl StateListener { + pub fn new(l1_adapter: L1, storage: Db, num_blocks_to_finalize: u64) -> Self { + Self { + l1_adapter, + storage, + num_blocks_to_finalize, + } + } +} + +impl StateListener +where + L1: ports::l1::Api, + Db: Storage, +{ + async fn check_pending_txs(&mut self, pending_txs: Vec) -> crate::Result<()> { + let current_block_number: u64 = self.l1_adapter.get_block_number().await?.into(); + + for tx in pending_txs { + let Some(tx_response) = self.l1_adapter.get_transaction_response(tx.hash).await? else { + continue; // not committed + }; + + if !tx_response.succeeded() { + self.storage + .update_submission_tx_state(tx.hash, TransactionState::Failed) + .await?; + + continue; + } + + if current_block_number.saturating_sub(tx_response.block_number()) + < self.num_blocks_to_finalize + { + continue; // not finalized + } + + self.storage + .update_submission_tx_state(tx.hash, TransactionState::Finalized) + .await?; + } + + Ok(()) + } +} + +#[async_trait] +impl Runner for StateListener +where + L1: ports::l1::Api + Send + Sync, + Db: Storage, +{ + async fn run(&mut self) -> crate::Result<()> { + let pending_txs = self.storage.get_pending_txs().await?; + + if pending_txs.is_empty() { + return Ok(()); + } + + self.check_pending_txs(pending_txs).await?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use mockall::predicate; + use ports::types::{L1Height, StateFragment, StateSubmission, TransactionResponse, U256}; + use storage::PostgresProcess; + + use super::*; + + struct MockL1 { + api: ports::l1::MockApi, + } + impl MockL1 { + fn new() -> Self { + Self { + api: ports::l1::MockApi::new(), + } + } + } + + #[async_trait::async_trait] + impl ports::l1::Api for MockL1 { + async fn submit_l2_state(&self, _state_data: Vec) -> ports::l1::Result<[u8; 32]> { + Ok([0; 32]) + } + + async fn get_block_number(&self) -> ports::l1::Result { + self.api.get_block_number().await + } + + async fn balance(&self) -> ports::l1::Result { + Ok(U256::zero()) + } + + async fn get_transaction_response( + &self, + tx_hash: [u8; 32], + ) -> ports::l1::Result> { + self.api.get_transaction_response(tx_hash).await + } + } + + fn given_l1_that_expects_get_transaction_receipt( + tx_hash: [u8; 32], + current_block_number: u32, + block_number: u64, + ) -> MockL1 { + let mut l1 = MockL1::new(); + + l1.api + 
.expect_get_block_number() + .return_once(move || Ok(current_block_number.into())); + + let transaction_response = TransactionResponse::new(block_number, true); + l1.api + .expect_get_transaction_response() + .with(predicate::eq(tx_hash)) + .return_once(move |_| Ok(Some(transaction_response))); + + l1 + } + + fn given_l1_that_returns_failed_transaction(tx_hash: [u8; 32]) -> MockL1 { + let mut l1 = MockL1::new(); + + l1.api + .expect_get_block_number() + .return_once(move || Ok(0u32.into())); + + let transaction_response = TransactionResponse::new(0, false); + + l1.api + .expect_get_transaction_response() + .with(predicate::eq(tx_hash)) + .return_once(move |_| Ok(Some(transaction_response))); + + l1 + } + + fn given_state() -> (StateSubmission, StateFragment, Vec) { + let submission = StateSubmission { + id: None, + block_hash: [0u8; 32], + block_height: 1, + }; + let fragment_id = 1; + let fragment = StateFragment { + id: Some(fragment_id), + submission_id: None, + fragment_idx: 0, + data: vec![1, 2, 3], + created_at: ports::types::Utc::now(), + }; + let fragment_ids = vec![fragment_id]; + + (submission, fragment, fragment_ids) + } + + #[tokio::test] + async fn state_listener_will_update_tx_state_if_finalized() -> crate::Result<()> { + // given + let (state, fragment, fragment_ids) = given_state(); + let tx_hash = [1; 32]; + + let process = PostgresProcess::shared().await.unwrap(); + let db = process.create_random_db().await?; + db.insert_state_submission(state, vec![fragment]).await?; + db.record_pending_tx(tx_hash, fragment_ids).await?; + + let current_block_number = 34; + let tx_block_number = 32; + let l1_mock = given_l1_that_expects_get_transaction_receipt( + tx_hash, + current_block_number, + tx_block_number, + ); + + let num_blocks_to_finalize = 1; + let mut listener = StateListener::new(l1_mock, db.clone(), num_blocks_to_finalize); + assert!(db.has_pending_txs().await?); + + // when + listener.run().await.unwrap(); + + // then + assert!(!db.has_pending_txs().await?); + + Ok(()) + } + + #[tokio::test] + async fn state_listener_will_not_update_tx_state_if_not_finalized() -> crate::Result<()> { + // given + let (state, fragment, fragment_ids) = given_state(); + let tx_hash = [1; 32]; + + let process = PostgresProcess::shared().await.unwrap(); + let db = process.create_random_db().await?; + db.insert_state_submission(state, vec![fragment]).await?; + db.record_pending_tx(tx_hash, fragment_ids).await?; + + let current_block_number = 34; + let tx_block_number = 32; + let l1_mock = given_l1_that_expects_get_transaction_receipt( + tx_hash, + current_block_number, + tx_block_number, + ); + + let num_blocks_to_finalize = 4; + let mut listener = StateListener::new(l1_mock, db.clone(), num_blocks_to_finalize); + assert!(db.has_pending_txs().await?); + + // when + listener.run().await.unwrap(); + + // then + assert!(db.has_pending_txs().await?); + + Ok(()) + } + + #[tokio::test] + async fn state_listener_will_update_tx_state_if_failed() -> crate::Result<()> { + // given + let (state, fragment, fragment_ids) = given_state(); + let tx_hash = [1; 32]; + + let process = PostgresProcess::shared().await.unwrap(); + let db = process.create_random_db().await?; + db.insert_state_submission(state, vec![fragment]).await?; + db.record_pending_tx(tx_hash, fragment_ids).await?; + + let l1_mock = given_l1_that_returns_failed_transaction(tx_hash); + + let num_blocks_to_finalize = 4; + let mut listener = StateListener::new(l1_mock, db.clone(), num_blocks_to_finalize); + assert!(db.has_pending_txs().await?); + + 
// when + listener.run().await.unwrap(); + + // then + assert!(!db.has_pending_txs().await?); + + Ok(()) + } +} diff --git a/packages/storage/.sqlx/query-0637d9796d86c776b1bd9545119d18f5610d481afcc5673ccf8cec0e60cf8a55.json b/packages/storage/.sqlx/query-0637d9796d86c776b1bd9545119d18f5610d481afcc5673ccf8cec0e60cf8a55.json deleted file mode 100644 index 7079182a..00000000 --- a/packages/storage/.sqlx/query-0637d9796d86c776b1bd9545119d18f5610d481afcc5673ccf8cec0e60cf8a55.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT EXISTS (SELECT 1 FROM l1_pending_transaction LIMIT 1) as exists", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "exists", - "type_info": "Bool" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - null - ] - }, - "hash": "0637d9796d86c776b1bd9545119d18f5610d481afcc5673ccf8cec0e60cf8a55" -} diff --git a/packages/storage/.sqlx/query-312026b68fc8b8cd1ffefda229ee18db5e0f1f5707cf3ca653d155043387a669.json b/packages/storage/.sqlx/query-312026b68fc8b8cd1ffefda229ee18db5e0f1f5707cf3ca653d155043387a669.json deleted file mode 100644 index 6aa7bdbc..00000000 --- a/packages/storage/.sqlx/query-312026b68fc8b8cd1ffefda229ee18db5e0f1f5707cf3ca653d155043387a669.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO l1_pending_transaction (transaction_hash) VALUES ($1)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bytea" - ] - }, - "nullable": [] - }, - "hash": "312026b68fc8b8cd1ffefda229ee18db5e0f1f5707cf3ca653d155043387a669" -} diff --git a/packages/storage/.sqlx/query-65238ff6a15221c4f29eecf2cbf94429f426c422c32367f3a59c7a6a6845063a.json b/packages/storage/.sqlx/query-65238ff6a15221c4f29eecf2cbf94429f426c422c32367f3a59c7a6a6845063a.json deleted file mode 100644 index 9037ab29..00000000 --- a/packages/storage/.sqlx/query-65238ff6a15221c4f29eecf2cbf94429f426c422c32367f3a59c7a6a6845063a.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO l1_state_fragment (fuel_block_hash, raw_data, fragment_index, completed) VALUES ($1, $2, $3, $4)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bytea", - "Bytea", - "Int8", - "Bool" - ] - }, - "nullable": [] - }, - "hash": "65238ff6a15221c4f29eecf2cbf94429f426c422c32367f3a59c7a6a6845063a" -} diff --git a/packages/storage/.sqlx/query-eb85593bdf620bee502d4360198b143920b80db4ecfe39f25965de41a3e74c26.json b/packages/storage/.sqlx/query-eb85593bdf620bee502d4360198b143920b80db4ecfe39f25965de41a3e74c26.json deleted file mode 100644 index 8fc76e52..00000000 --- a/packages/storage/.sqlx/query-eb85593bdf620bee502d4360198b143920b80db4ecfe39f25965de41a3e74c26.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT * FROM l1_state_fragment WHERE completed = false ORDER BY created_at ASC LIMIT 6", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "fuel_block_hash", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "fragment_index", - "type_info": "Int8" - }, - { - "ordinal": 2, - "name": "raw_data", - "type_info": "Bytea" - }, - { - "ordinal": 3, - "name": "completed", - "type_info": "Bool" - }, - { - "ordinal": 4, - "name": "created_at", - "type_info": "Timestamptz" - }, - { - "ordinal": 5, - "name": "transaction_hash", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false, - false, - false, - false, - true - ] - }, - "hash": "eb85593bdf620bee502d4360198b143920b80db4ecfe39f25965de41a3e74c26" -} diff --git 
a/packages/storage/.sqlx/query-f68b2767c2f768badcc699e0a730e2721d9be64f5801da7f69925589da850266.json b/packages/storage/.sqlx/query-f68b2767c2f768badcc699e0a730e2721d9be64f5801da7f69925589da850266.json deleted file mode 100644 index 7557bea9..00000000 --- a/packages/storage/.sqlx/query-f68b2767c2f768badcc699e0a730e2721d9be64f5801da7f69925589da850266.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO l1_state_submission (fuel_block_hash, fuel_block_height, completed) VALUES ($1, $2, $3)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bytea", - "Int8", - "Bool" - ] - }, - "nullable": [] - }, - "hash": "f68b2767c2f768badcc699e0a730e2721d9be64f5801da7f69925589da850266" -} diff --git a/packages/storage/.sqlx/query-f9e5936696cd88780b8a24968b4bee7ecd2d6f4fb317cf53b274a0ace7941de4.json b/packages/storage/.sqlx/query-f9e5936696cd88780b8a24968b4bee7ecd2d6f4fb317cf53b274a0ace7941de4.json deleted file mode 100644 index e579289d..00000000 --- a/packages/storage/.sqlx/query-f9e5936696cd88780b8a24968b4bee7ecd2d6f4fb317cf53b274a0ace7941de4.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE l1_state_fragment SET transaction_hash = $1 WHERE fuel_block_hash = $2 AND fragment_index = $3", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bytea", - "Bytea", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "f9e5936696cd88780b8a24968b4bee7ecd2d6f4fb317cf53b274a0ace7941de4" -} diff --git a/packages/storage/migrations/0001_initial.down.sql b/packages/storage/migrations/0001_initial.down.sql index b908600d..844ddddf 100644 --- a/packages/storage/migrations/0001_initial.down.sql +++ b/packages/storage/migrations/0001_initial.down.sql @@ -1,4 +1,5 @@ DROP TABLE IF EXISTS l1_fuel_block_submission; -DROP TABLE IF EXISTS l1_state_fragment; -DROP TABLE IF EXISTS l1_pending_transaction; -DROP TABLE IF EXISTS l1_state_submission; +DROP TABLE IF EXISTS l1_submissions; +DROP TABLE IF EXISTS l1_fragments; +DROP TABLE IF EXISTS l1_transactions; +DROP TABLE IF EXISTS l1_transaction_fragments; diff --git a/packages/storage/migrations/0001_initial.up.sql b/packages/storage/migrations/0001_initial.up.sql index 7e4303f1..e739c432 100644 --- a/packages/storage/migrations/0001_initial.up.sql +++ b/packages/storage/migrations/0001_initial.up.sql @@ -8,28 +8,33 @@ CREATE TABLE IF NOT EXISTS l1_fuel_block_submission ( CHECK (octet_length(fuel_block_hash) = 32) ); -CREATE TABLE IF NOT EXISTS l1_pending_transaction ( - transaction_hash BYTEA PRIMARY KEY NOT NULL, - CHECK (octet_length(transaction_hash) = 32) +CREATE TABLE IF NOT EXISTS l1_submissions ( + id SERIAL PRIMARY KEY, + fuel_block_hash BYTEA NOT NULL, + fuel_block_height BIGINT NOT NULL UNIQUE CHECK (fuel_block_height >= 0), + CHECK (octet_length(fuel_block_hash) = 32) ); -CREATE TABLE IF NOT EXISTS l1_state_submission ( - fuel_block_hash BYTEA PRIMARY KEY NOT NULL, - fuel_block_height BIGINT NOT NULL UNIQUE CHECK (fuel_block_height >= 0), - completed BOOLEAN NOT NULL, - CHECK (octet_length(fuel_block_hash) = 32) +CREATE TABLE IF NOT EXISTS l1_fragments ( + id SERIAL PRIMARY KEY, + fragment_idx BIGINT NOT NULL CHECK (fragment_idx >= 0), + submission_id INTEGER NOT NULL REFERENCES l1_submissions(id), + data BYTEA NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS l1_transactions ( + id SERIAL PRIMARY KEY, + hash BYTEA NOT NULL UNIQUE, + state SMALLINT NOT NULL, + CHECK (octet_length(hash) = 32), + CHECK (state IN (0, 1, 
2)) ); -CREATE TABLE IF NOT EXISTS l1_state_fragment ( - fuel_block_hash BYTEA NOT NULL REFERENCES l1_state_submission(fuel_block_hash) ON DELETE CASCADE, - fragment_index BIGINT NOT NULL, - raw_data BYTEA NOT NULL, - completed BOOLEAN NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, - transaction_hash BYTEA REFERENCES l1_pending_transaction(transaction_hash) ON DELETE SET NULL, - PRIMARY KEY (fuel_block_hash, fragment_index), - CHECK (octet_length(fuel_block_hash) = 32), - CHECK (fragment_index >= 0) +CREATE TABLE IF NOT EXISTS l1_transaction_fragments ( + transaction_id INTEGER NOT NULL REFERENCES l1_transactions(id), + fragment_id INTEGER NOT NULL REFERENCES l1_fragments(id), + PRIMARY KEY (transaction_id, fragment_id) ); COMMIT; diff --git a/packages/storage/src/lib.rs b/packages/storage/src/lib.rs index 3f845d6c..05ccb4e3 100644 --- a/packages/storage/src/lib.rs +++ b/packages/storage/src/lib.rs @@ -7,64 +7,68 @@ pub use test_instance::*; mod error; mod postgres; -use ports::types::BlockSubmission; -pub use postgres::*; - -use ports::types::{StateFragment, StateFragmentId, StateSubmission}; +use ports::{ + storage::{Result, Storage}, + types::{BlockSubmission, StateFragment, StateSubmission, SubmissionTx, TransactionState}, +}; +pub use postgres::{DbConfig, Postgres}; #[async_trait::async_trait] -impl ports::storage::Storage for postgres::Postgres { - async fn insert(&self, submission: BlockSubmission) -> ports::storage::Result<()> { +impl Storage for Postgres { + async fn insert(&self, submission: BlockSubmission) -> Result<()> { Ok(self._insert(submission).await?) } - async fn submission_w_latest_block(&self) -> ports::storage::Result> { + async fn submission_w_latest_block(&self) -> Result> { Ok(self._submission_w_latest_block().await?) } - async fn set_submission_completed( - &self, - fuel_block_hash: [u8; 32], - ) -> ports::storage::Result { + async fn set_submission_completed(&self, fuel_block_hash: [u8; 32]) -> Result { Ok(self._set_submission_completed(fuel_block_hash).await?) } - async fn insert_state( + async fn insert_state_submission( &self, - state: StateSubmission, + submission: StateSubmission, fragments: Vec, - ) -> ports::storage::Result<()> { - Ok(self._insert_state(state, fragments).await?) + ) -> Result<()> { + Ok(self._insert_state_submission(submission, fragments).await?) } - async fn get_unsubmitted_fragments(&self) -> ports::storage::Result> { + async fn get_unsubmitted_fragments(&self) -> Result> { Ok(self._get_unsubmitted_fragments().await?) } - async fn record_pending_tx( - &self, - tx_hash: [u8; 32], - fragment_ids: Vec, - ) -> ports::storage::Result<()> { + async fn record_pending_tx(&self, tx_hash: [u8; 32], fragment_ids: Vec) -> Result<()> { Ok(self._record_pending_tx(tx_hash, fragment_ids).await?) } - async fn has_pending_txs(&self) -> ports::storage::Result { + async fn get_pending_txs(&self) -> Result> { + Ok(self._get_pending_txs().await?) + } + + async fn has_pending_txs(&self) -> Result { Ok(self._has_pending_txs().await?) } - async fn state_submission_w_latest_block( - &self, - ) -> ports::storage::Result> { + async fn state_submission_w_latest_block(&self) -> Result> { Ok(self._state_submission_w_latest_block().await?) } + + async fn update_submission_tx_state( + &self, + hash: [u8; 32], + state: TransactionState, + ) -> Result<()> { + Ok(self._update_submission_tx_state(hash, state).await?) 
+ } } #[cfg(test)] mod tests { use ports::{ - storage::{Error, Storage}, - types::BlockSubmission, + storage::{Error, Result, Storage}, + types::{BlockSubmission, StateFragment, StateSubmission, TransactionState}, }; use rand::{thread_rng, Rng}; use storage as _; @@ -142,4 +146,163 @@ mod tests { submission } + + #[tokio::test] + async fn insert_state_submission() -> Result<()> { + // given + let process = PostgresProcess::shared().await?; + let db = process.create_random_db().await?; + + let (state, fragments) = given_state_and_fragments(); + + // when + db.insert_state_submission(state, fragments.clone()).await?; + + // then + let db_fragments = db.get_unsubmitted_fragments().await?; + + assert_eq!(db_fragments.len(), fragments.len()); + + Ok(()) + } + + #[tokio::test] + async fn record_pending_tx() -> Result<()> { + // given + let process = PostgresProcess::shared().await?; + let db = process.create_random_db().await?; + + let (state, fragments) = given_state_and_fragments(); + db.insert_state_submission(state, fragments.clone()).await?; + let tx_hash = [1; 32]; + let fragment_ids = vec![1]; + + // when + db.record_pending_tx(tx_hash, fragment_ids).await?; + + // then + let has_pending_tx = db.has_pending_txs().await?; + let pending_tx = db.get_pending_txs().await?; + + assert!(has_pending_tx); + + assert_eq!(pending_tx.len(), 1); + assert_eq!(pending_tx[0].hash, tx_hash); + assert_eq!(pending_tx[0].state, TransactionState::Pending); + + Ok(()) + } + + #[tokio::test] + async fn update_submission_tx_state() -> Result<()> { + // given + let process = PostgresProcess::shared().await?; + let db = process.create_random_db().await?; + + let (state, fragments) = given_state_and_fragments(); + db.insert_state_submission(state, fragments.clone()).await?; + let tx_hash = [1; 32]; + let fragment_ids = vec![1]; + db.record_pending_tx(tx_hash, fragment_ids).await?; + + // when + db.update_submission_tx_state(tx_hash, TransactionState::Finalized) + .await?; + + // then + let has_pending_tx = db.has_pending_txs().await?; + let pending_tx = db.get_pending_txs().await?; + + assert!(!has_pending_tx); + assert!(pending_tx.is_empty()); + + Ok(()) + } + + #[tokio::test] + async fn unsubmitted_fragments_are_not_in_pending_or_finalized_tx() -> Result<()> { + // given + let process = PostgresProcess::shared().await?; + let db = process.create_random_db().await?; + + let (state, fragments) = given_state_and_fragments(); + db.insert_state_submission(state, fragments.clone()).await?; + + // when + // tx failed + let tx_hash = [1; 32]; + let fragment_ids = vec![1, 2]; + db.record_pending_tx(tx_hash, fragment_ids).await?; + db.update_submission_tx_state(tx_hash, TransactionState::Failed) + .await?; + + // tx is finalized + let tx_hash = [2; 32]; + let fragment_ids = vec![2]; + db.record_pending_tx(tx_hash, fragment_ids).await?; + db.update_submission_tx_state(tx_hash, TransactionState::Finalized) + .await?; + + // tx is pending + let tx_hash = [3; 32]; + let fragment_ids = vec![3]; + db.record_pending_tx(tx_hash, fragment_ids).await?; + + // then + let db_fragments = db.get_unsubmitted_fragments().await?; + + let db_fragment_id: Vec<_> = db_fragments.iter().map(|f| f.id.expect("has id")).collect(); + + // unsubmitted fragments are not associated with any finalized or pending tx + assert_eq!(db_fragment_id, vec![1, 4, 5]); + + Ok(()) + } + + fn given_state_and_fragments() -> (StateSubmission, Vec<StateFragment>) { + ( + StateSubmission { + id: None, + block_hash: [0u8; 32], + block_height: 1, + }, + vec![ + StateFragment { + id:
None, + submission_id: None, + fragment_idx: 0, + data: vec![1, 2], + created_at: ports::types::Utc::now(), + }, + StateFragment { + id: None, + submission_id: None, + fragment_idx: 1, + data: vec![3, 4], + created_at: ports::types::Utc::now(), + }, + StateFragment { + id: None, + submission_id: None, + fragment_idx: 2, + data: vec![5, 6], + created_at: ports::types::Utc::now(), + }, + StateFragment { + id: None, + submission_id: None, + fragment_idx: 3, + data: vec![7, 8], + created_at: ports::types::Utc::now(), + }, + StateFragment { + id: None, + submission_id: None, + fragment_idx: 4, + data: vec![9, 10], + created_at: ports::types::Utc::now(), + }, + ], + ) + } } diff --git a/packages/storage/src/postgres.rs b/packages/storage/src/postgres.rs index 32648812..e06c67f9 100644 --- a/packages/storage/src/postgres.rs +++ b/packages/storage/src/postgres.rs @@ -1,5 +1,6 @@ -use crate::tables::state_submission::{L1StateFragment, L1StateSubmission}; -use ports::types::{BlockSubmission, StateFragment, StateFragmentId, StateSubmission}; +use ports::types::{ + BlockSubmission, StateFragment, StateSubmission, SubmissionTx, TransactionState, +}; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; use super::error::{Error, Result}; @@ -130,44 +131,43 @@ impl Postgres { } } - pub(crate) async fn _insert_state( + pub(crate) async fn _insert_state_submission( &self, state: StateSubmission, fragments: Vec, ) -> Result<()> { if fragments.is_empty() { return Err(Error::Database( - "Cannot insert state with no fragments".to_string(), + "cannot insert state with no fragments".to_string(), )); } - let state_row = L1StateSubmission::from(state); + let state_row = tables::L1StateSubmission::from(state); let fragment_rows = fragments .into_iter() - .map(L1StateFragment::from) + .map(tables::L1StateFragment::from) .collect::>(); let mut transaction = self.connection_pool.begin().await?; // Insert the state submission - sqlx::query!( - "INSERT INTO l1_state_submission (fuel_block_hash, fuel_block_height, completed) VALUES ($1, $2, $3)", + let submission_id = sqlx::query!( + "INSERT INTO l1_submissions (fuel_block_hash, fuel_block_height) VALUES ($1, $2) RETURNING id", state_row.fuel_block_hash, - state_row.fuel_block_height, - state_row.completed, + state_row.fuel_block_height ) - .execute(&mut *transaction) - .await?; + .fetch_one(&mut *transaction) + .await?.id; // Insert the state fragments // TODO: optimize this for fragment_row in fragment_rows { sqlx::query!( - "INSERT INTO l1_state_fragment (fuel_block_hash, raw_data, fragment_index, completed) VALUES ($1, $2, $3, $4)", - fragment_row.fuel_block_hash, - fragment_row.raw_data, - fragment_row.fragment_index, - fragment_row.completed, + "INSERT INTO l1_fragments (fragment_idx, submission_id, data, created_at) VALUES ($1, $2, $3, $4)", + fragment_row.fragment_idx, + submission_id, + fragment_row.data, + fragment_row.created_at ) .execute(&mut *transaction) .await?; @@ -179,10 +179,24 @@ impl Postgres { } pub(crate) async fn _get_unsubmitted_fragments(&self) -> Result> { - // TODO use blob limit + const BLOB_LIMIT: i64 = 6; let rows = sqlx::query_as!( - L1StateFragment, - "SELECT * FROM l1_state_fragment WHERE completed = false ORDER BY created_at ASC LIMIT 6" + // all fragments that are not associated to any pending or finalized tx + tables::L1StateFragment, + "SELECT l1_fragments.* + FROM l1_fragments + WHERE l1_fragments.id NOT IN ( + SELECT l1_fragments.id + FROM l1_fragments + JOIN l1_transaction_fragments ON l1_fragments.id = 
l1_transaction_fragments.fragment_id + JOIN l1_transactions ON l1_transaction_fragments.transaction_id = l1_transactions.id + WHERE l1_transactions.state IN ($1, $2) + ) + ORDER BY l1_fragments.created_at + LIMIT $3;", + TransactionState::Finalized.into_i16(), + TransactionState::Pending.into_i16(), + BLOB_LIMIT ) .fetch_all(&self.connection_pool) .await? @@ -195,23 +209,24 @@ impl Postgres { pub(crate) async fn _record_pending_tx( &self, tx_hash: [u8; 32], - fragment_ids: Vec, + fragment_ids: Vec, ) -> Result<()> { let mut transaction = self.connection_pool.begin().await?; - sqlx::query!( - "INSERT INTO l1_pending_transaction (transaction_hash) VALUES ($1)", - tx_hash.as_slice() + let transaction_id = sqlx::query!( + "INSERT INTO l1_transactions (hash, state) VALUES ($1, $2) RETURNING id", + tx_hash.as_slice(), + TransactionState::Pending.into_i16(), ) - .execute(&mut *transaction) - .await?; + .fetch_one(&mut *transaction) + .await? + .id; - for (block_hash, fragment_idx) in fragment_ids { + for fragment_id in fragment_ids { sqlx::query!( - "UPDATE l1_state_fragment SET transaction_hash = $1 WHERE fuel_block_hash = $2 AND fragment_index = $3", - tx_hash.as_slice(), - block_hash.as_slice(), - fragment_idx as i64 + "INSERT INTO l1_transaction_fragments (transaction_id, fragment_id) VALUES ($1, $2)", + transaction_id, + fragment_id as i64 ) .execute(&mut *transaction) .await?; @@ -223,24 +238,54 @@ impl Postgres { } pub(crate) async fn _has_pending_txs(&self) -> Result { - let resp = - sqlx::query!("SELECT EXISTS (SELECT 1 FROM l1_pending_transaction LIMIT 1) as exists") - .fetch_one(&self.connection_pool) - .await?; + Ok(sqlx::query!( + "SELECT EXISTS (SELECT 1 FROM l1_transactions WHERE state = $1) AS has_pending_transactions;", + TransactionState::Pending.into_i16() + ) + .fetch_one(&self.connection_pool) + .await? + .has_pending_transactions.unwrap_or(false)) + } - Ok(resp.exists.expect("query will always return a row")) + pub(crate) async fn _get_pending_txs(&self) -> Result> { + sqlx::query_as!( + tables::L1SubmissionTx, + "SELECT * FROM l1_transactions WHERE state = $1", + TransactionState::Pending.into_i16() + ) + .fetch_all(&self.connection_pool) + .await? + .into_iter() + .map(SubmissionTx::try_from) + .collect::>>() } pub(crate) async fn _state_submission_w_latest_block( &self, ) -> crate::error::Result> { sqlx::query_as!( - L1StateSubmission, - "SELECT * FROM l1_state_submission ORDER BY fuel_block_height DESC LIMIT 1" + tables::L1StateSubmission, + "SELECT * FROM l1_submissions ORDER BY fuel_block_height DESC LIMIT 1" ) .fetch_optional(&self.connection_pool) .await? .map(StateSubmission::try_from) .transpose() } + + pub(crate) async fn _update_submission_tx_state( + &self, + hash: [u8; 32], + state: TransactionState, + ) -> Result<()> { + sqlx::query!( + "UPDATE l1_transactions SET state = $1 WHERE hash = $2", + state.into_i16(), + hash.as_slice(), + ) + .execute(&self.connection_pool) + .await?; + + Ok(()) + } } diff --git a/packages/storage/src/tables.rs b/packages/storage/src/tables.rs index 2175dc2c..9dde848e 100644 --- a/packages/storage/src/tables.rs +++ b/packages/storage/src/tables.rs @@ -1,4 +1,7 @@ -use ports::types::BlockSubmission; +use ports::types::{ + BlockSubmission, StateFragment, StateSubmission, SubmissionTx, TransactionState, +}; +use sqlx::types::chrono; macro_rules! 
bail { ($msg: literal, $($args: expr),*) => { @@ -55,141 +58,124 @@ impl From for L1FuelBlockSubmission { } } -pub mod state_submission { - use ports::types::{StateFragment, StateSubmission}; - use sqlx::types::chrono; - - #[derive(sqlx::FromRow)] - pub struct L1StateSubmission { - pub fuel_block_hash: Vec, - pub fuel_block_height: i64, - pub completed: bool, - } - - #[derive(sqlx::FromRow)] - pub struct L1StateFragment { - pub fuel_block_hash: Vec, - pub transaction_hash: Option>, - pub raw_data: Vec, - pub created_at: chrono::DateTime, - pub fragment_index: i64, - pub completed: bool, - } +#[derive(sqlx::FromRow)] +pub struct L1StateSubmission { + pub id: i64, + pub fuel_block_hash: Vec, + pub fuel_block_height: i64, +} - impl TryFrom for StateSubmission { - type Error = crate::error::Error; +impl TryFrom for StateSubmission { + type Error = crate::error::Error; - fn try_from(value: L1StateSubmission) -> Result { - let block_hash = value.fuel_block_hash.as_slice(); - let Ok(block_hash) = block_hash.try_into() else { - bail!("Expected 32 bytes for `fuel_block_hash`, but got: {block_hash:?} from db",); - }; + fn try_from(value: L1StateSubmission) -> Result { + let block_hash = value.fuel_block_hash.as_slice(); + let Ok(block_hash) = block_hash.try_into() else { + bail!("Expected 32 bytes for `fuel_block_hash`, but got: {block_hash:?} from db",); + }; - let Ok(block_height) = value.fuel_block_height.try_into() else { - bail!( + let Ok(block_height) = value.fuel_block_height.try_into() else { + bail!( "`fuel_block_height` as read from the db cannot fit in a `u32` as expected. Got: {:?} from db", value.fuel_block_height ); - }; + }; - Ok(Self { - block_height, - block_hash, - completed: value.completed, - }) - } + Ok(Self { + id: Some(value.id as u32), + block_height, + block_hash, + }) } +} - impl From for L1StateSubmission { - fn from(value: StateSubmission) -> Self { - Self { - fuel_block_height: i64::from(value.block_height), - completed: value.completed, - fuel_block_hash: value.block_hash.to_vec(), - } +impl From for L1StateSubmission { + fn from(value: StateSubmission) -> Self { + Self { + // if not present use placeholder as id is given by db + id: value.id.unwrap_or_default() as i64, + fuel_block_height: i64::from(value.block_height), + fuel_block_hash: value.block_hash.to_vec(), } } +} - impl TryFrom for StateFragment { - type Error = crate::error::Error; - - fn try_from(value: L1StateFragment) -> Result { - let block_hash = value.fuel_block_hash.as_slice(); - let Ok(block_hash) = block_hash.try_into() else { - bail!("Expected 32 bytes for `fuel_block_hash`, but got: {block_hash:?} from db",); - }; - - let transaction_hash = match value.transaction_hash { - Some(hash) => { - let Ok(hash) = hash.as_slice().try_into() else { - bail!( - "Expected 32 bytes for `transaction_hash`, but got: {hash:?} from db", - ); - }; - - Some(hash) - } - None => None, - }; - - let fragment_index = value.fragment_index.try_into(); - let Ok(fragment_index) = fragment_index else { - bail!( - "`fragment_index` as read from the db cannot fit in a `u32` as expected. 
Got: {} from db", - value.fragment_index - ); - }; - - Ok(Self { - block_hash, - transaction_hash, - raw_data: value.raw_data, - created_at: value.created_at, - completed: value.completed, - fragment_index, - }) - } +#[derive(sqlx::FromRow)] +pub struct L1StateFragment { + pub id: i64, + pub submission_id: i64, + pub fragment_idx: i64, + pub data: Vec, + pub created_at: chrono::DateTime, +} + +impl TryFrom for StateFragment { + type Error = crate::error::Error; + + fn try_from(value: L1StateFragment) -> Result { + Ok(Self { + id: Some(value.id as u32), + submission_id: Some(value.submission_id as u32), + fragment_idx: value.fragment_idx as u32, + data: value.data, + created_at: value.created_at, + }) } +} - impl From for L1StateFragment { - fn from(value: StateFragment) -> Self { - Self { - fuel_block_hash: value.block_hash.to_vec(), - transaction_hash: value.transaction_hash.map(|hash| hash.to_vec()), - raw_data: value.raw_data, - created_at: value.created_at, - completed: value.completed, - fragment_index: i64::from(value.fragment_index), - } +impl From for L1StateFragment { + fn from(value: StateFragment) -> Self { + Self { + // if not present use placeholder as id is given by db + id: value.id.unwrap_or_default() as i64, + // if not present use placeholder as id is given by db + submission_id: value.submission_id.unwrap_or_default() as i64, + fragment_idx: value.fragment_idx as i64, + data: value.data, + created_at: value.created_at, } } +} - #[derive(sqlx::FromRow)] - pub struct L1PendingTransaction { - pub transaction_hash: Vec, - } +#[derive(sqlx::FromRow)] +pub struct L1SubmissionTx { + pub id: i64, + pub hash: Vec, + pub state: i16, +} - impl TryFrom for [u8; 32] { - type Error = crate::error::Error; +impl TryFrom for SubmissionTx { + type Error = crate::error::Error; + + fn try_from(value: L1SubmissionTx) -> Result { + let hash = value.hash.as_slice(); + let Ok(hash) = hash.try_into() else { + bail!("Expected 32 bytes for transaction hash, but got: {hash:?} from db",); + }; - fn try_from(value: L1PendingTransaction) -> Result { - let transaction_hash = value.transaction_hash.as_slice(); - let Ok(transaction_hash) = transaction_hash.try_into() else { - bail!( - "Expected 32 bytes for `transaction_hash`, but got: {transaction_hash:?} from db", + let Some(state) = TransactionState::from_i16(value.state) else { + bail!( + "state: {:?} is not a valid variant of `TransactionState`", + value.state ); - }; + }; - Ok(transaction_hash) - } + Ok(SubmissionTx { + id: Some(value.id as u32), + hash, + state, + }) } +} - impl From<[u8; 32]> for L1PendingTransaction { - fn from(value: [u8; 32]) -> Self { - Self { - transaction_hash: value.to_vec(), - } +impl From for L1SubmissionTx { + fn from(value: SubmissionTx) -> Self { + Self { + // if not present use placeholder as id is given by db + id: value.id.unwrap_or_default() as i64, + hash: value.hash.to_vec(), + state: value.state.into_i16(), } } }
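As a quick illustration of the finalization rule this changeset adds in `state_listener.rs` (driven by the new `num_blocks_to_finalize_tx` config value), here is a minimal standalone sketch of the same arithmetic. It is not part of the diff; the helper name `is_finalized` and the `main` harness are hypothetical, only the comparison mirrors the listener's logic.

// Sketch of the StateListener finalization check, assuming the semantics
// shown in state_listener.rs: a tx included at `tx_inclusion_block` counts
// as finalized once at least `num_blocks_to_finalize` L1 blocks have passed.
fn is_finalized(current_block: u64, tx_inclusion_block: u64, num_blocks_to_finalize: u64) -> bool {
    current_block.saturating_sub(tx_inclusion_block) >= num_blocks_to_finalize
}

fn main() {
    // With the development config value `num_blocks_to_finalize_tx = 12`:
    assert!(!is_finalized(43, 32, 12)); // only 11 blocks have passed -> still pending
    assert!(is_finalized(44, 32, 12)); // 12 blocks have passed -> finalized
    // With the test value used in state_listener.rs (num_blocks_to_finalize = 1):
    assert!(is_finalized(34, 32, 1));
}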