From 39a901d3e7b2ccd7452c7ff8b59ea3329cb6d59c Mon Sep 17 00:00:00 2001 From: Rakan Al-Huneiti Date: Mon, 12 Aug 2024 11:43:03 +0300 Subject: [PATCH] Remove postgres (#962) * Remove postgres * More cleanups * Remove db_config * Store mempool transactions * Wait for commitment alternative impl * Resolve mempool restoration test * Resolve all tests but one * Set last commitment l2 height in fullnode * Fix clippy check * Remove check_commitment_in_offchain_db test * Remove unused imports * Add comments --- .github/workflows/checks.yml | 28 -- Cargo.lock | 173 +------ Cargo.toml | 3 - Makefile | 3 +- bin/citrea/Cargo.toml | 1 - bin/citrea/src/lib.rs | 2 - bin/citrea/tests/e2e/mod.rs | 32 -- bin/citrea/tests/e2e/proving.rs | 135 +----- bin/citrea/tests/e2e/sequencer_behaviour.rs | 23 +- bin/citrea/tests/e2e/sequencer_replacement.rs | 91 ++-- bin/citrea/tests/e2e/syncing.rs | 2 - bin/citrea/tests/sequencer_commitments/mod.rs | 82 +--- bin/citrea/tests/test_helpers/mod.rs | 112 +++-- crates/fullnode/Cargo.toml | 1 - crates/fullnode/src/runner.rs | 2 + crates/prover/Cargo.toml | 1 - crates/prover/src/runner.rs | 36 +- crates/sequencer/Cargo.toml | 2 - crates/sequencer/src/commitment_controller.rs | 2 +- crates/sequencer/src/config.rs | 10 - crates/sequencer/src/rpc.rs | 34 +- crates/sequencer/src/sequencer.rs | 139 ++---- crates/shared-backup-db/Cargo.toml | 27 -- crates/shared-backup-db/README.md | 5 - crates/shared-backup-db/src/config.rs | 101 ---- crates/shared-backup-db/src/lib.rs | 7 - .../src/postgres_connector.rs | 440 ------------------ crates/shared-backup-db/src/tables.rs | 157 ------- .../full-node/db/sov-db/src/ledger_db/mod.rs | 78 +++- .../db/sov-db/src/ledger_db/traits.rs | 23 +- .../full-node/db/sov-db/src/schema/tables.rs | 6 + .../full-node/sov-stf-runner/Cargo.toml | 2 - .../full-node/sov-stf-runner/src/config.rs | 12 - docker-compose.postgres.yml | 11 - docs/run.md | 13 - .../bitcoin-regtest/sequencer_config.toml | 7 - .../mock-dockerized/sequencer_config.toml | 7 - resources/configs/mock/prover_config.toml | 7 - resources/configs/mock/sequencer_config.toml | 7 - resources/hive/.dockerignore | 1 - 40 files changed, 253 insertions(+), 1572 deletions(-) delete mode 100644 crates/shared-backup-db/Cargo.toml delete mode 100644 crates/shared-backup-db/README.md delete mode 100644 crates/shared-backup-db/src/config.rs delete mode 100644 crates/shared-backup-db/src/lib.rs delete mode 100644 crates/shared-backup-db/src/postgres_connector.rs delete mode 100644 crates/shared-backup-db/src/tables.rs delete mode 100644 docker-compose.postgres.yml diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index e465201f2..5bd02f02e 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -148,20 +148,6 @@ jobs: coverage: runs-on: ubicloud-standard-16 if: github.event.pull_request.draft == false - services: - postgres: - image: postgres:latest - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - POSTGRES_DB: postgres - ports: - - 5432:5432 - options: >- - --health-cmd pg_isready - --health-interval 10s - --health-timeout 5s - --health-retries 5 steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@1.78.0 @@ -342,20 +328,6 @@ jobs: runs-on: ubicloud-standard-16 timeout-minutes: 60 if: github.event.pull_request.draft == false - services: - postgres: - image: postgres:latest - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - POSTGRES_DB: postgres - ports: - - 5432:5432 - options: >- - --health-cmd pg_isready - 
--health-interval 10s - --health-timeout 5s - --health-retries 5 steps: - uses: actions/checkout@v4 - uses: rui314/setup-mold@v1 diff --git a/Cargo.lock b/Cargo.lock index 838de227b..80f8d7532 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,7 +20,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" dependencies = [ "cpp_demangle", - "fallible-iterator 0.3.0", + "fallible-iterator", "gimli", "memmap2 0.9.4", "object 0.35.0", @@ -1820,7 +1820,6 @@ dependencies = [ "serde", "serde_json", "sha2 0.10.8", - "shared-backup-db", "soft-confirmation-rule-enforcer", "sov-db", "sov-ledger-rpc", @@ -1897,7 +1896,6 @@ dependencies = [ "serde", "serde_json", "sha2 0.10.8", - "shared-backup-db", "sov-db", "sov-mock-da", "sov-mock-zkvm", @@ -1946,7 +1944,6 @@ dependencies = [ "sequencer-client", "serde", "sha2 0.10.8", - "shared-backup-db", "sov-db", "sov-mock-da", "sov-mock-zkvm", @@ -1995,7 +1992,6 @@ dependencies = [ "citrea-evm", "citrea-primitives", "citrea-stf", - "deadpool-postgres", "digest 0.10.7", "futures", "hex", @@ -2016,7 +2012,6 @@ dependencies = [ "schnellru", "serde", "serde_json", - "shared-backup-db", "soft-confirmation-rule-enforcer", "sov-accounts", "sov-db", @@ -2553,39 +2548,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "deadpool" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" -dependencies = [ - "deadpool-runtime", - "num_cpus", - "tokio", -] - -[[package]] -name = "deadpool-postgres" -version = "0.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19be9da496d60d03ec3ab45d960d80a3afb285b787394b83614a79942f467e7f" -dependencies = [ - "deadpool", - "getrandom 0.2.15", - "tokio", - "tokio-postgres", - "tracing", -] - -[[package]] -name = "deadpool-runtime" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" -dependencies = [ - "tokio", -] - [[package]] name = "delay_map" version = "0.3.0" @@ -3145,12 +3107,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "fallible-iterator" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" - [[package]] name = "fallible-iterator" version = "0.3.0" @@ -3439,7 +3395,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" dependencies = [ - "fallible-iterator 0.3.0", + "fallible-iterator", "stable_deref_trait", ] @@ -5062,16 +5018,6 @@ dependencies = [ "rawpointer", ] -[[package]] -name = "md-5" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" -dependencies = [ - "cfg-if", - "digest 0.10.7", -] - [[package]] name = "memchr" version = "2.7.4" @@ -5824,37 +5770,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" -[[package]] -name = "postgres-protocol" -version = "0.6.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" -dependencies = [ - 
"base64 0.21.7", - "byteorder", - "bytes", - "fallible-iterator 0.2.0", - "hmac 0.12.1", - "md-5", - "memchr", - "rand 0.8.5", - "sha2 0.10.8", - "stringprep", -] - -[[package]] -name = "postgres-types" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c" -dependencies = [ - "bytes", - "fallible-iterator 0.2.0", - "postgres-protocol", - "serde", - "serde_json", -] - [[package]] name = "powerfmt" version = "0.2.0" @@ -6195,15 +6110,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "redox_syscall" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "redox_syscall" version = "0.5.2" @@ -7759,7 +7665,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae" dependencies = [ "bitflags 2.5.0", - "fallible-iterator 0.3.0", + "fallible-iterator", "fallible-streaming-iterator", "hashlink 0.9.1", "libsqlite3-sys", @@ -8317,18 +8223,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "shared-backup-db" -version = "0.4.0-rc.3" -dependencies = [ - "deadpool-postgres", - "postgres-types", - "serde", - "sov-rollup-interface", - "tokio", - "tracing", -] - [[package]] name = "shell-escape" version = "0.1.5" @@ -8976,7 +8870,6 @@ dependencies = [ "serde", "serde_json", "sha2 0.10.8", - "shared-backup-db", "sov-db", "sov-mock-da", "sov-modules-api", @@ -9089,17 +8982,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "stringprep" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" -dependencies = [ - "unicode-bidi", - "unicode-normalization", - "unicode-properties", -] - [[package]] name = "strsim" version = "0.9.3" @@ -9438,32 +9320,6 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "tokio-postgres" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d340244b32d920260ae7448cb72b6e238bddc3d4f7603394e7dd46ed8e48f5b8" -dependencies = [ - "async-trait", - "byteorder", - "bytes", - "fallible-iterator 0.2.0", - "futures-channel", - "futures-util", - "log", - "parking_lot 0.12.3", - "percent-encoding", - "phf", - "pin-project-lite", - "postgres-protocol", - "postgres-types", - "rand 0.8.5", - "socket2 0.5.7", - "tokio", - "tokio-util", - "whoami", -] - [[package]] name = "tokio-rustls" version = "0.24.1" @@ -9969,12 +9825,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-properties" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4259d9d4425d9f0661581b804cb85fe66a4c631cadd8f490d1c13a35d5d9291" - [[package]] name = "unicode-segmentation" version = "1.11.0" @@ -10123,12 +9973,6 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" -[[package]] -name = "wasite" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" - [[package]] 
name = "wasm-bindgen" version = "0.2.92" @@ -10232,17 +10076,6 @@ dependencies = [ "winsafe", ] -[[package]] -name = "whoami" -version = "1.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9" -dependencies = [ - "redox_syscall 0.4.1", - "wasite", - "web-sys", -] - [[package]] name = "widestring" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index ddd892f3f..e6a304762 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,6 @@ members = [ "crates/sequencer", "crates/sequencer-client", "crates/soft-confirmation-rule-enforcer", - "crates/shared-backup-db", # Sovereign sdk "crates/sovereign-sdk/rollup-interface", "crates/sovereign-sdk/adapters/risc0", @@ -84,10 +83,8 @@ byteorder = { version = "1.5.0", default-features = false } bytes = { version = "1.2.1", default-features = false } chrono = { version = "0.4.37", default-features = false } digest = { version = "0.10.6", default-features = false, features = ["alloc"] } -deadpool-postgres = "0.13.1" itertools = { version = "0.13.0", default-features = false } lru = "0.12.3" -postgres-types = "0.2.6" rs_merkle = "1.4.2" futures = "0.3" pin-project = { version = "1.1.3" } diff --git a/Makefile b/Makefile index ac6cd13aa..50d7d0395 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,6 @@ clean: ## Cleans compiled @cargo clean clean-node: ## Cleans local dbs needed for sequencer and nodes - sudo rm -rf resources/dbs/postgres rm -rf resources/dbs/sequencer-db rm -rf resources/dbs/prover-db rm -rf resources/dbs/full-node-db @@ -115,4 +114,4 @@ genesis: # Set production genesis from system contract source files genesis-prod: - $(MAKE) -C crates/evm/src/evm/system_contracts genesis-prod \ No newline at end of file + $(MAKE) -C crates/evm/src/evm/system_contracts genesis-prod diff --git a/bin/citrea/Cargo.toml b/bin/citrea/Cargo.toml index c8c8167d4..4412f4d72 100644 --- a/bin/citrea/Cargo.toml +++ b/bin/citrea/Cargo.toml @@ -59,7 +59,6 @@ tracing-subscriber = { workspace = true } [dev-dependencies] citrea-evm = { path = "../../crates/evm", features = ["native"] } -shared-backup-db = { path = "../../crates/shared-backup-db", features = ["test-utils"] } sov-mock-da = { path = "../../crates/sovereign-sdk/adapters/mock-da", default-features = false } sov-prover-storage-manager = { path = "../../crates/sovereign-sdk/full-node/sov-prover-storage-manager", features = ["test-utils"] } sov-rollup-interface = { path = "../../crates/sovereign-sdk/rollup-interface", features = ["fuzzing"] } diff --git a/bin/citrea/src/lib.rs b/bin/citrea/src/lib.rs index b194a0796..95ed90893 100644 --- a/bin/citrea/src/lib.rs +++ b/bin/citrea/src/lib.rs @@ -29,8 +29,6 @@ pub fn initialize_logging(level: Level) { "jsonrpsee-server=info".to_owned(), "sov_schema_db=info".to_owned(), "sov_prover_storage_manager=info".to_owned(), - // Limit output as much as possible, use WARN. 
- "tokio_postgres=warn".to_owned(), ]; debug_components.join(",") })) diff --git a/bin/citrea/tests/e2e/mod.rs b/bin/citrea/tests/e2e/mod.rs index 8235dc96f..df4fe2a60 100644 --- a/bin/citrea/tests/e2e/mod.rs +++ b/bin/citrea/tests/e2e/mod.rs @@ -15,7 +15,6 @@ use std::time::Duration; use citrea_evm::smart_contracts::SimpleStorageContract; use citrea_stf::genesis_config::GenesisPaths; use reth_primitives::{Address, BlockNumberOrTag, U256}; -use shared_backup_db::{PostgresConnector, SharedBackupDbConfig}; use sov_mock_da::{MockAddress, MockDaService}; use sov_rollup_interface::rpc::{LastVerifiedProofResponse, SoftConfirmationStatus}; use sov_rollup_interface::services::da::DaService; @@ -63,11 +62,6 @@ async fn test_all_flow() { let prover_db_dir = storage_dir.path().join("prover").to_path_buf(); let fullnode_db_dir = storage_dir.path().join("full-node").to_path_buf(); - let psql_db_name = "test_all_flow".to_owned(); - let db_test_client = PostgresConnector::new_test_client(psql_db_name.clone()) - .await - .unwrap(); - let (seq_port_tx, seq_port_rx) = tokio::sync::oneshot::channel(); let da_db_dir_cloned = da_db_dir.clone(); @@ -103,7 +97,6 @@ async fn test_all_flow() { Some(ProverConfig { proving_mode: sov_stf_runner::ProverGuestRunConfig::Execute, proof_sampling_number: 0, - db_config: Some(SharedBackupDbConfig::default().set_db_name(psql_db_name)), }), NodeMode::Prover(seq_port), prover_db_dir, @@ -212,19 +205,6 @@ async fn test_all_flow() { .ledger_get_proof_by_slot_height(3) .await; - let db_proofs = db_test_client.get_all_proof_data().await.unwrap(); - - assert_eq!(db_proofs.len(), 1); - assert_eq!( - db_proofs[0].state_transition.0.sequencer_da_public_key, - prover_proof.state_transition.sequencer_da_public_key - ); - assert_eq!( - db_proofs[0].state_transition.0.sequencer_public_key, - prover_proof.state_transition.sequencer_public_key - ); - assert_eq!(db_proofs[0].l1_tx_id, prover_proof.l1_tx_id); - // the proof will be in l1 block #4 because prover publishes it after the commitment and in mock da submitting proof and commitments creates a new block // For full node to see the proof, we publish another l2 block and now it will check #4 l1 block // 6th soft batch @@ -327,18 +307,6 @@ async fn test_all_flow() { .ledger_get_proof_by_slot_height(5) .await; - let db_proofs = db_test_client.get_all_proof_data().await.unwrap(); - - assert_eq!(db_proofs.len(), 2); - assert_eq!( - db_proofs[1].state_transition.0.sequencer_da_public_key, - prover_proof_data.state_transition.sequencer_da_public_key - ); - assert_eq!( - db_proofs[1].state_transition.0.sequencer_public_key, - prover_proof_data.state_transition.sequencer_public_key - ); - wait_for_proof(&full_node_test_client, 6, Some(Duration::from_secs(120))).await; let full_node_proof_data = full_node_test_client .ledger_get_verified_proofs_by_slot_height(6) diff --git a/bin/citrea/tests/e2e/proving.rs b/bin/citrea/tests/e2e/proving.rs index 1551e621d..e6dce4965 100644 --- a/bin/citrea/tests/e2e/proving.rs +++ b/bin/citrea/tests/e2e/proving.rs @@ -2,148 +2,20 @@ use std::time::Duration; use citrea_stf::genesis_config::GenesisPaths; -use shared_backup_db::{PostgresConnector, ProofType, SharedBackupDbConfig}; use sov_mock_da::{MockAddress, MockDaService}; -use sov_rollup_interface::rpc::{ProofRpcResponse, SoftConfirmationStatus}; +use sov_rollup_interface::rpc::SoftConfirmationStatus; use sov_rollup_interface::services::da::DaService; use sov_stf_runner::ProverConfig; use crate::evm::make_test_client; use crate::test_helpers::{ - start_rollup, 
tempdir_with_children, wait_for_l1_block, wait_for_l2_block, - wait_for_postgres_proofs, wait_for_proof, wait_for_prover_l1_height, NodeMode, + start_rollup, tempdir_with_children, wait_for_l1_block, wait_for_l2_block, wait_for_proof, + wait_for_prover_l1_height, NodeMode, }; use crate::{ DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT, DEFAULT_PROOF_WAIT_DURATION, TEST_DATA_GENESIS_PATH, }; -/// Run the sequencer and the prover node. -/// Trigger proof production. -/// Check if the proof can be queried from the prover node and the database. -#[tokio::test(flavor = "multi_thread")] -async fn test_db_get_proof() { - // citrea::initialize_logging(tracing::Level::INFO); - - let storage_dir = tempdir_with_children(&["DA", "sequencer", "prover"]); - let sequencer_db_dir = storage_dir.path().join("sequencer").to_path_buf(); - let prover_db_dir = storage_dir.path().join("prover").to_path_buf(); - let da_db_dir = storage_dir.path().join("DA").to_path_buf(); - - let psql_db_name = "test_db_get_proof".to_string(); - let db_test_client = PostgresConnector::new_test_client(psql_db_name.clone()) - .await - .unwrap(); - - let (seq_port_tx, seq_port_rx) = tokio::sync::oneshot::channel(); - - let da_db_dir_cloned = da_db_dir.clone(); - let seq_task = tokio::spawn(async { - start_rollup( - seq_port_tx, - GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH), - None, - NodeMode::SequencerNode, - sequencer_db_dir, - da_db_dir_cloned, - 4, - true, - None, - None, - Some(true), - DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT, - ) - .await; - }); - - let seq_port = seq_port_rx.await.unwrap(); - let test_client = make_test_client(seq_port).await; - let da_service = MockDaService::new(MockAddress::from([0; 32]), &da_db_dir); - - let (prover_node_port_tx, prover_node_port_rx) = tokio::sync::oneshot::channel(); - - let da_db_dir_cloned = da_db_dir.clone(); - let prover_node_task = tokio::spawn(async move { - start_rollup( - prover_node_port_tx, - GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH), - Some(ProverConfig { - proving_mode: sov_stf_runner::ProverGuestRunConfig::Execute, - proof_sampling_number: 0, - db_config: Some(SharedBackupDbConfig::default().set_db_name(psql_db_name)), - }), - NodeMode::Prover(seq_port), - prover_db_dir, - da_db_dir_cloned, - 4, - true, - None, - None, - Some(true), - DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT, - ) - .await; - }); - - let prover_node_port = prover_node_port_rx.await.unwrap(); - - let prover_node_test_client = make_test_client(prover_node_port).await; - da_service.publish_test_block().await.unwrap(); - wait_for_l1_block(&da_service, 2, None).await; - - test_client.send_publish_batch_request().await; - test_client.send_publish_batch_request().await; - test_client.send_publish_batch_request().await; - test_client.send_publish_batch_request().await; - wait_for_l2_block(&test_client, 4, None).await; - - // Commitment - wait_for_l1_block(&da_service, 3, None).await; - - // wait here until we see from prover's rpc that it finished proving - wait_for_prover_l1_height( - &prover_node_test_client, - 4, - Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)), - ) - .await; - - wait_for_postgres_proofs(&db_test_client, 1, Some(Duration::from_secs(60))).await; - - let ledger_proof = prover_node_test_client - .ledger_get_proof_by_slot_height(3) - .await; - - let db_proofs = db_test_client.get_all_proof_data().await.unwrap(); - - assert_eq!(db_proofs.len(), 1); - - let db_state_transition = &db_proofs[0].state_transition.0; - - assert_eq!( - db_state_transition.sequencer_da_public_key, - 
ledger_proof.state_transition.sequencer_da_public_key - ); - assert_eq!( - db_state_transition.sequencer_public_key, - ledger_proof.state_transition.sequencer_public_key - ); - assert_eq!(db_proofs[0].l1_tx_id, ledger_proof.l1_tx_id); - - match ledger_proof.proof { - ProofRpcResponse::Full(p) => { - assert_eq!(db_proofs[0].proof_type, ProofType::Full); - assert_eq!(db_proofs[0].proof_data, p) - } - ProofRpcResponse::PublicInput(p) => { - assert_eq!(db_proofs[0].proof_type, ProofType::PublicInput); - assert_eq!(db_proofs[0].proof_data, p) - } - }; - - seq_task.abort(); - prover_node_task.abort(); -} - /// Run the sequencer, prover and full node. /// Trigger proof production. /// Check if the verified proof can be queried from the full node. @@ -193,7 +65,6 @@ async fn full_node_verify_proof_and_store() { Some(ProverConfig { proving_mode: sov_stf_runner::ProverGuestRunConfig::Execute, proof_sampling_number: 0, - db_config: None, }), NodeMode::Prover(seq_port), prover_db_dir, diff --git a/bin/citrea/tests/e2e/sequencer_behaviour.rs b/bin/citrea/tests/e2e/sequencer_behaviour.rs index 5613a0e69..5db7e6b15 100644 --- a/bin/citrea/tests/e2e/sequencer_behaviour.rs +++ b/bin/citrea/tests/e2e/sequencer_behaviour.rs @@ -10,7 +10,6 @@ use citrea_primitives::TEST_PRIVATE_KEY; use citrea_sequencer::{SequencerConfig, SequencerMempoolConfig}; use citrea_stf::genesis_config::GenesisPaths; use reth_primitives::{Address, BlockNumberOrTag}; -use shared_backup_db::{PostgresConnector, SharedBackupDbConfig}; use sov_mock_da::{MockAddress, MockDaService, MockDaSpec}; use tokio::time::sleep; @@ -18,8 +17,8 @@ use crate::e2e::{initialize_test, TestConfig}; use crate::evm::{init_test_rollup, make_test_client}; use crate::test_client::TestClient; use crate::test_helpers::{ - create_default_sequencer_config, start_rollup, tempdir_with_children, wait_for_l1_block, - wait_for_l2_block, wait_for_postgres_commitment, NodeMode, + create_default_sequencer_config, start_rollup, tempdir_with_children, wait_for_commitment, + wait_for_l1_block, wait_for_l2_block, NodeMode, }; use crate::{ DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT, DEFAULT_MIN_SOFT_CONFIRMATIONS_PER_COMMITMENT, @@ -58,7 +57,6 @@ async fn test_sequencer_fill_missing_da_blocks() -> Result<(), anyhow::Error> { test_mode: true, deposit_mempool_fetch_limit: 10, mempool_conf: Default::default(), - db_config: Default::default(), da_update_interval_ms: 500, block_production_interval_ms: 500, }), @@ -141,18 +139,13 @@ async fn test_sequencer_commitment_threshold() { let da_db_dir = storage_dir.path().join("DA").to_path_buf(); let sequencer_db_dir = storage_dir.path().join("sequencer").to_path_buf(); - let psql_db_name = "test_sequencer_commitment_threshold".to_owned(); - - let db_test_client = PostgresConnector::new_test_client(psql_db_name.clone()) - .await - .unwrap(); + let da_service = MockDaService::new(MockAddress::from([0; 32]), &da_db_dir); // Put a large number for commitment threshold let min_soft_confirmations_per_commitment = 1_000_000; let mut sequencer_config = create_default_sequencer_config(min_soft_confirmations_per_commitment, Some(true), 10); - sequencer_config.db_config = Some(SharedBackupDbConfig::default().set_db_name(psql_db_name)); sequencer_config.mempool_conf = SequencerMempoolConfig { max_account_slots: 1000, ..Default::default() @@ -199,8 +192,7 @@ async fn test_sequencer_commitment_threshold() { wait_for_l2_block(&seq_test_client, 11, Some(Duration::from_secs(60))).await; // At block 725, the state diff should be large enough to trigger a 
commitment. - wait_for_postgres_commitment(&db_test_client, 1, Some(Duration::from_secs(60))).await; - let commitments = db_test_client.get_all_commitments().await.unwrap(); + let commitments = wait_for_commitment(&da_service, 2, Some(Duration::from_secs(60))).await; assert_eq!(commitments.len(), 1); for _ in 0..10 { @@ -218,9 +210,8 @@ async fn test_sequencer_commitment_threshold() { // At block 1450, the state diff should be large enough to trigger a commitment. // But the 50 remaining blocks state diff should NOT trigger a third. - wait_for_postgres_commitment(&db_test_client, 2, Some(Duration::from_secs(60))).await; - let commitments = db_test_client.get_all_commitments().await.unwrap(); - assert_eq!(commitments.len(), 2); + let commitments = wait_for_commitment(&da_service, 3, Some(Duration::from_secs(60))).await; + assert_eq!(commitments.len(), 1); seq_task.abort(); } @@ -373,7 +364,6 @@ async fn test_gas_limit_too_high() { max_account_slots: tx_count * 2, ..Default::default() }, - db_config: Default::default(), da_update_interval_ms: 1000, block_production_interval_ms: 1000, }), @@ -515,7 +505,6 @@ async fn test_system_tx_effect_on_block_gas_limit() -> Result<(), anyhow::Error> max_account_slots: 100, ..Default::default() }, - db_config: Default::default(), da_update_interval_ms: 1000, block_production_interval_ms: 500, }), diff --git a/bin/citrea/tests/e2e/sequencer_replacement.rs b/bin/citrea/tests/e2e/sequencer_replacement.rs index 2822bdb52..9825af4fe 100644 --- a/bin/citrea/tests/e2e/sequencer_replacement.rs +++ b/bin/citrea/tests/e2e/sequencer_replacement.rs @@ -9,15 +9,15 @@ use alloy_rlp::Decodable; use citrea_sequencer::SequencerMempoolConfig; use citrea_stf::genesis_config::GenesisPaths; use reth_primitives::{Address, BlockNumberOrTag}; -use shared_backup_db::{PostgresConnector, SharedBackupDbConfig}; +use sov_db::ledger_db::{LedgerDB, SequencerLedgerOps}; use sov_mock_da::{MockAddress, MockDaService}; use tokio::time::sleep; use crate::e2e::{copy_dir_recursive, execute_blocks, TestConfig}; use crate::evm::{init_test_rollup, make_test_client}; use crate::test_helpers::{ - create_default_sequencer_config, start_rollup, tempdir_with_children, wait_for_l1_block, - wait_for_l2_block, wait_for_postgres_commitment, NodeMode, + create_default_sequencer_config, start_rollup, tempdir_with_children, wait_for_commitment, + wait_for_l1_block, wait_for_l2_block, NodeMode, }; use crate::{ DEFAULT_MIN_SOFT_CONFIRMATIONS_PER_COMMITMENT, DEFAULT_PROOF_WAIT_DURATION, @@ -30,22 +30,14 @@ use crate::{ /// Check if the full node can continue block production. 
#[tokio::test(flavor = "multi_thread")] async fn test_sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Error> { - // citrea::initialize_logging(tracing::Level::DEBUG); + citrea::initialize_logging(tracing::Level::DEBUG); let storage_dir = tempdir_with_children(&["DA", "sequencer", "full-node"]); let da_db_dir = storage_dir.path().join("DA").to_path_buf(); let sequencer_db_dir = storage_dir.path().join("sequencer").to_path_buf(); let fullnode_db_dir = storage_dir.path().join("full-node").to_path_buf(); - let psql_db_name = "sequencer_crash_and_replace_full_node".to_owned(); - - let db_test_client = PostgresConnector::new_test_client(psql_db_name.clone()) - .await - .unwrap(); - - let mut sequencer_config = create_default_sequencer_config(4, Some(true), 10); - - sequencer_config.db_config = Some(SharedBackupDbConfig::default().set_db_name(psql_db_name)); + let sequencer_config = create_default_sequencer_config(4, Some(true), 10); let da_service = MockDaService::with_finality(MockAddress::from([0; 32]), 0, &da_db_dir); da_service.publish_test_block().await.unwrap(); @@ -128,9 +120,10 @@ async fn test_sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Erro // assume sequencer crashed seq_task.abort(); - wait_for_postgres_commitment(&db_test_client, 1, Some(Duration::from_secs(60))).await; - let commitments = db_test_client.get_all_commitments().await.unwrap(); + let commitments = wait_for_commitment(&da_service, 2, Some(Duration::from_secs(60))).await; assert_eq!(commitments.len(), 1); + assert_eq!(commitments[0].l2_start_block_number, 1); + assert_eq!(commitments[0].l2_end_block_number, 4); full_node_task.abort(); @@ -175,19 +168,16 @@ async fn test_sequencer_crash_and_replace_full_node() -> Result<(), anyhow::Erro wait_for_l1_block(&da_service, 3, None).await; - wait_for_postgres_commitment( - &db_test_client, - 2, + let commitments = wait_for_commitment( + &da_service, + 3, Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)), ) .await; - let commitments = db_test_client.get_all_commitments().await.unwrap(); - assert_eq!(commitments.len(), 2); - assert_eq!(commitments[0].l2_start_height, 1); - assert_eq!(commitments[0].l2_end_height, 4); - assert_eq!(commitments[1].l2_start_height, 5); - assert_eq!(commitments[1].l2_end_height, 8); + assert_eq!(commitments.len(), 1); + assert_eq!(commitments[0].l2_start_block_number, 5); + assert_eq!(commitments[0].l2_end_block_number, 8); seq_task.abort(); @@ -208,19 +198,11 @@ async fn test_sequencer_crash_restore_mempool() -> Result<(), anyhow::Error> { let sequencer_db_dir = storage_dir.path().join("sequencer").to_path_buf(); let da_db_dir = storage_dir.path().join("DA").to_path_buf(); - let db_test_client = - PostgresConnector::new_test_client("sequencer_crash_restore_mempool".to_owned()) - .await - .unwrap(); - let mut sequencer_config = create_default_sequencer_config(4, Some(true), 10); sequencer_config.mempool_conf = SequencerMempoolConfig { max_account_slots: 100, ..Default::default() }; - sequencer_config.db_config = Some( - SharedBackupDbConfig::default().set_db_name("sequencer_crash_restore_mempool".to_owned()), - ); let da_service = MockDaService::with_finality(MockAddress::from([0; 32]), 2, &da_db_dir.clone()); @@ -277,18 +259,31 @@ async fn test_sequencer_crash_restore_mempool() -> Result<(), anyhow::Error> { assert_eq!(tx_1.hash, *tx_hash); assert_eq!(tx_2.hash, *tx_hash2); - let txs = db_test_client.get_all_txs().await.unwrap(); + // crash and reopen and check if the txs are in the mempool + seq_task.abort(); + 
// Copy data into a separate directory since the original sequencer + // directory is locked by a LOCK file. + // This lets us access the ledger DB directly. + let _ = copy_dir_recursive( + &sequencer_db_dir, + &storage_dir.path().join("sequencer_unlocked"), + ); + let sequencer_db_dir = storage_dir.path().join("sequencer_unlocked").to_path_buf(); + let ledger_db = LedgerDB::with_path(sequencer_db_dir.clone()) + .expect("Should be able to open after stopping the sequencer"); + let txs = ledger_db.get_mempool_txs().unwrap(); assert_eq!(txs.len(), 2); - assert_eq!(txs[0].tx_hash, tx_hash.to_vec()); - assert_eq!(txs[1].tx_hash, tx_hash2.to_vec()); + assert_eq!(txs[1].0, tx_hash.to_vec()); + assert_eq!(txs[0].0, tx_hash2.to_vec()); let signed_tx = Signed::<TxEip1559>::try_from(tx_1.clone()).unwrap(); let envelope = TxEnvelope::Eip1559(signed_tx); - let decoded = TxEnvelope::decode(&mut txs[0].tx.as_ref()).unwrap(); + let decoded = TxEnvelope::decode(&mut txs[1].1.as_ref()).unwrap(); assert_eq!(envelope, decoded); - // crash and reopen and check if the txs are in the mempool - seq_task.abort(); + // Remove lock + drop(ledger_db); let _ = copy_dir_recursive( &sequencer_db_dir, &storage_dir.path().join("sequencer_copy"), ); @@ -300,13 +295,14 @@ async fn test_sequencer_crash_restore_mempool() -> Result<(), anyhow::Error> { let config1 = sequencer_config.clone(); let da_db_dir_cloned = da_db_dir.clone(); let sequencer_db_dir = storage_dir.path().join("sequencer_copy").to_path_buf(); + let sequencer_db_dir_cloned = sequencer_db_dir.clone(); let seq_task = tokio::spawn(async move { start_rollup( seq_port_tx, GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH), None, NodeMode::SequencerNode, - sequencer_db_dir, + sequencer_db_dir_cloned, da_db_dir_cloned, 4, true, @@ -337,7 +333,7 @@ async fn test_sequencer_crash_restore_mempool() -> Result<(), anyhow::Error> { assert_eq!(tx_1_mempool, tx_1); assert_eq!(tx_2_mempool, tx_2); - // publish block and check if the txs are deleted from pg + // publish block and check if the txs are deleted from ledger seq_test_client.send_publish_batch_request().await; wait_for_l2_block(&seq_test_client, 1, None).await; @@ -355,11 +351,22 @@ async fn test_sequencer_crash_restore_mempool() -> Result<(), anyhow::Error> { .await .is_none()); - let txs = db_test_client.get_all_txs().await.unwrap(); + seq_task.abort(); + + // Copy data into a separate directory since the original sequencer + // directory is locked by a LOCK file. + // This lets us access the ledger DB directly. 
+ let _ = copy_dir_recursive( + &sequencer_db_dir, + &storage_dir.path().join("sequencer_unlocked"), + ); + let sequencer_db_dir = storage_dir.path().join("sequencer_unlocked").to_path_buf(); + let ledger_db = LedgerDB::with_path(sequencer_db_dir.clone()) + .expect("Should be able to open after stopping the sequencer"); + let txs = ledger_db.get_mempool_txs().unwrap(); // should be removed from db assert_eq!(txs.len(), 0); - seq_task.abort(); Ok(()) } diff --git a/bin/citrea/tests/e2e/syncing.rs b/bin/citrea/tests/e2e/syncing.rs index 88a03a989..f9d8b7fb1 100644 --- a/bin/citrea/tests/e2e/syncing.rs +++ b/bin/citrea/tests/e2e/syncing.rs @@ -5,7 +5,6 @@ use std::time::Duration; use citrea_stf::genesis_config::GenesisPaths; use ethereum_rpc::CitreaStatus; use reth_primitives::{Address, BlockNumberOrTag}; -use shared_backup_db::SharedBackupDbConfig; use sov_mock_da::{MockAddress, MockDaService, MockDaSpec, MockHash}; use sov_rollup_interface::da::{DaData, DaSpec}; use sov_rollup_interface::services::da::DaService; @@ -304,7 +303,6 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> { GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH), Some(ProverConfig { proving_mode: sov_stf_runner::ProverGuestRunConfig::Execute, - db_config: Some(SharedBackupDbConfig::default()), proof_sampling_number: 0, }), NodeMode::Prover(seq_port), diff --git a/bin/citrea/tests/sequencer_commitments/mod.rs b/bin/citrea/tests/sequencer_commitments/mod.rs index a595032d8..508c1ac26 100644 --- a/bin/citrea/tests/sequencer_commitments/mod.rs +++ b/bin/citrea/tests/sequencer_commitments/mod.rs @@ -4,7 +4,6 @@ use borsh::BorshDeserialize; use citrea_stf::genesis_config::GenesisPaths; use rs_merkle::algorithms::Sha256; use rs_merkle::MerkleTree; -use shared_backup_db::{PostgresConnector, SharedBackupDbConfig}; use sov_mock_da::{MockAddress, MockDaService, MockDaSpec}; use sov_modules_api::BlobReaderTrait; use sov_rollup_interface::da::DaData; @@ -14,8 +13,8 @@ use sov_stf_runner::ProverConfig; use crate::evm::make_test_client; use crate::test_client::TestClient; use crate::test_helpers::{ - create_default_sequencer_config, start_rollup, tempdir_with_children, wait_for_l1_block, - wait_for_l2_block, wait_for_postgres_commitment, wait_for_prover_l1_height, NodeMode, + start_rollup, tempdir_with_children, wait_for_l1_block, wait_for_l2_block, + wait_for_prover_l1_height, NodeMode, }; use crate::{ DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT, DEFAULT_PROOF_WAIT_DURATION, TEST_DATA_GENESIS_PATH, @@ -183,82 +182,6 @@ async fn check_sequencer_commitment( assert_eq!(commitment.merkle_root, merkle_tree.root().unwrap()); } -#[tokio::test(flavor = "multi_thread")] -async fn check_commitment_in_offchain_db() { - // citrea::initialize_logging(tracing::Level::INFO); - - let db_dir = tempdir_with_children(&["DA", "sequencer", "full-node"]); - let da_db_dir = db_dir.path().join("DA").to_path_buf(); - let sequencer_db_dir = db_dir.path().join("sequencer").to_path_buf(); - - let (seq_port_tx, seq_port_rx) = tokio::sync::oneshot::channel(); - let mut sequencer_config = create_default_sequencer_config(4, Some(true), 10); - - let db_name = "check_commitment_in_offchain_db".to_owned(); - sequencer_config.db_config = Some(SharedBackupDbConfig::default().set_db_name(db_name.clone())); - - // drops db if exists from previous test runs, recreates the db - let db_test_client = PostgresConnector::new_test_client(db_name).await.unwrap(); - - let da_db_dir_cloned = da_db_dir.clone(); - let seq_task = tokio::spawn(async move { - start_rollup( - 
seq_port_tx, - GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH), - None, - NodeMode::SequencerNode, - sequencer_db_dir, - da_db_dir_cloned, - 4, - true, - None, - Some(sequencer_config), - Some(true), - 10, - ) - .await; - }); - - let seq_port = seq_port_rx.await.unwrap(); - let test_client = make_test_client(seq_port).await; - let da_service = MockDaService::new(MockAddress::from([0; 32]), &da_db_dir); - - da_service.publish_test_block().await.unwrap(); - wait_for_l1_block(&da_service, 2, None).await; - - // publish 3 soft confirmations, no commitment should be sent - for _ in 0..3 { - test_client.send_publish_batch_request().await; - } - wait_for_l2_block(&test_client, 3, None).await; - - da_service.publish_test_block().await.unwrap(); - wait_for_l1_block(&da_service, 3, None).await; - - // publish 4th block - test_client.send_publish_batch_request().await; - wait_for_l2_block(&test_client, 4, None).await; - - // commitment should be published with this call - da_service.publish_test_block().await.unwrap(); - wait_for_l1_block(&da_service, 4, None).await; - wait_for_l1_block(&da_service, 5, None).await; - - wait_for_postgres_commitment( - &db_test_client, - 1, - Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)), - ) - .await; - - let commitments = db_test_client.get_all_commitments().await.unwrap(); - assert_eq!(commitments.len(), 1); - assert_eq!(commitments[0].l2_start_height, 1); - assert_eq!(commitments[0].l2_end_height, 4); - - seq_task.abort(); -} - #[tokio::test(flavor = "multi_thread")] async fn test_ledger_get_commitments_on_slot() { // citrea::initialize_logging(tracing::Level::INFO); @@ -398,7 +321,6 @@ async fn test_ledger_get_commitments_on_slot_prover() { Some(ProverConfig { proving_mode: sov_stf_runner::ProverGuestRunConfig::Execute, proof_sampling_number: 0, - db_config: None, }), NodeMode::Prover(seq_port), prover_db_dir, diff --git a/bin/citrea/tests/test_helpers/mod.rs b/bin/citrea/tests/test_helpers/mod.rs index f9225298e..7a0e0e30f 100644 --- a/bin/citrea/tests/test_helpers/mod.rs +++ b/bin/citrea/tests/test_helpers/mod.rs @@ -2,14 +2,17 @@ use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::time::{Duration, SystemTime}; +use borsh::BorshDeserialize; use citrea::{CitreaRollupBlueprint, MockDemoRollup}; use citrea_primitives::TEST_PRIVATE_KEY; use citrea_sequencer::SequencerConfig; use citrea_stf::genesis_config::GenesisPaths; -use shared_backup_db::PostgresConnector; -use sov_mock_da::{MockAddress, MockDaConfig, MockDaService}; +use sov_mock_da::{MockAddress, MockBlock, MockDaConfig, MockDaService}; use sov_modules_api::default_signature::private_key::DefaultPrivateKey; use sov_modules_api::PrivateKey; +use sov_rollup_interface::da::{BlobReaderTrait, DaData, SequencerCommitment}; +use sov_rollup_interface::services::da::{DaService, SlotData}; +use sov_rollup_interface::zk::Proof; use sov_stf_runner::{ FullNodeConfig, ProverConfig, RollupPublicKeys, RpcConfig, RunnerConfig, StorageConfig, }; @@ -173,8 +176,6 @@ pub fn create_default_sequencer_config( test_mode: test_mode.unwrap_or(false), deposit_mempool_fetch_limit, mempool_conf: Default::default(), - // Offchain db will be active only in some tests - db_config: None, da_update_interval_ms: 500, block_production_interval_ms: 500, // since running in test mode, we can set this to a lower value } @@ -263,75 +264,92 @@ pub async fn wait_for_l1_block(da_service: &MockDaService, num: u64, timeout: Op sleep(Duration::from_secs(2)).await; } -pub async fn wait_for_proof(test_client: &TestClient, 
slot_height: u64, timeout: Option<Duration>) { +#[instrument(level = "debug", skip(da_service))] +pub async fn wait_for_commitment( + da_service: &MockDaService, + l1_height: u64, + timeout: Option<Duration>, +) -> Vec<SequencerCommitment> { let start = SystemTime::now(); - let timeout = timeout.unwrap_or(Duration::from_secs(60)); // Default 60 seconds timeout + let timeout = timeout.unwrap_or(Duration::from_secs(30)); // Default 30 seconds timeout loop { debug!( - "Waiting for L1 block height containing zkproof {}", - slot_height + "Waiting for L1 commitments to be published at L1 height {}", + l1_height ); - let proof = test_client - .ledger_get_verified_proofs_by_slot_height(slot_height) - .await; - if proof.is_some() { - break; - } - let now = SystemTime::now(); - if start + timeout <= now { - panic!("Timeout while waiting for proof at height {}", slot_height); - } + let Ok(l1_block) = da_service.get_block_at(l1_height).await else { + sleep(Duration::from_secs(1)).await; + continue; + }; - sleep(Duration::from_secs(1)).await; - } - // Let knowledge of the new DA block propagate - sleep(Duration::from_secs(2)).await; -} + let (sequencer_commitments, _) = extract_da_data(da_service, l1_block.clone()); -#[instrument(level = "debug", skip(db_test_client))] -pub async fn wait_for_postgres_commitment( - db_test_client: &PostgresConnector, - num: usize, - timeout: Option<Duration>, -) { - let start = SystemTime::now(); - let timeout = timeout.unwrap_or(Duration::from_secs(30)); // Default 30 seconds timeout - loop { - debug!("Waiting for {} L1 commitments to be published", num); - let commitments = db_test_client.get_all_commitments().await.unwrap().len(); - if commitments >= num { - break; + if !sequencer_commitments.is_empty() { + return sequencer_commitments; } let now = SystemTime::now(); if start + timeout <= now { - panic!("Timeout. {} commitments exist at this point", commitments); + panic!( + "Timeout. {} commitments exist at this point", + sequencer_commitments.len() + ); } sleep(Duration::from_secs(1)).await; } } -pub async fn wait_for_postgres_proofs( - db_test_client: &PostgresConnector, - num: usize, - timeout: Option<Duration>, -) { +pub async fn wait_for_proof(test_client: &TestClient, slot_height: u64, timeout: Option<Duration>) { let start = SystemTime::now(); - let timeout = timeout.unwrap_or(Duration::from_secs(30)); // Default 30 seconds timeout + let timeout = timeout.unwrap_or(Duration::from_secs(60)); // Default 60 seconds timeout loop { - debug!("Waiting for {} L1 proofs to be published", num); - let commitments = db_test_client.get_all_proof_data().await.unwrap().len(); - if commitments >= num { + debug!( + "Waiting for L1 block height containing zkproof {}", + slot_height + ); + let proof = test_client + .ledger_get_verified_proofs_by_slot_height(slot_height) + .await; + if proof.is_some() { break; } let now = SystemTime::now(); if start + timeout <= now { - panic!("Timeout. 
{} proofs exist at this point", commitments); + panic!("Timeout while waiting for proof at height {}", slot_height); } sleep(Duration::from_secs(1)).await; } + // Let knowledge of the new DA block propagate + sleep(Duration::from_secs(2)).await; +} + +fn extract_da_data( + da_service: &MockDaService, + block: MockBlock, +) -> (Vec<SequencerCommitment>, Vec<Proof>) { + let mut sequencer_commitments = Vec::<SequencerCommitment>::new(); + let mut zk_proofs = Vec::<Proof>::new(); + + da_service + .extract_relevant_blobs(&block) + .into_iter() + .for_each(|mut tx| { + let data = DaData::try_from_slice(tx.full_data()); + if let Ok(DaData::SequencerCommitment(seq_com)) = data { + sequencer_commitments.push(seq_com); + } else if let Ok(DaData::ZKProof(proof)) = data { + zk_proofs.push(proof); + } else { + tracing::warn!( + "Found broken DA data in block 0x{}: {:?}", + hex::encode(block.hash()), + data + ); + } + }); + (sequencer_commitments, zk_proofs) } diff --git a/crates/fullnode/Cargo.toml b/crates/fullnode/Cargo.toml index a0a4ed808..438e26d68 100644 --- a/crates/fullnode/Cargo.toml +++ b/crates/fullnode/Cargo.toml @@ -12,7 +12,6 @@ repository.workspace = true # Citrea Deps citrea-primitives = { path = "../primitives", features = ["native"] } sequencer-client = { path = "../sequencer-client" } -shared-backup-db = { path = "../shared-backup-db" } # Sov SDK deps sov-db = { path = "../sovereign-sdk/full-node/db/sov-db" } diff --git a/crates/fullnode/src/runner.rs b/crates/fullnode/src/runner.rs index 0f66eaba3..ef785092d 100644 --- a/crates/fullnode/src/runner.rs +++ b/crates/fullnode/src/runner.rs @@ -404,6 +404,8 @@ where SoftConfirmationStatus::Finalized, )?; } + self.ledger_db + .set_last_commitment_l2_height(BatchNumber(end_l2_height))?; } Ok(()) } diff --git a/crates/prover/Cargo.toml b/crates/prover/Cargo.toml index 5a200305c..71ce4911c 100644 --- a/crates/prover/Cargo.toml +++ b/crates/prover/Cargo.toml @@ -13,7 +13,6 @@ repository.workspace = true citrea-primitives = { path = "../primitives", features = ["native"] } citrea-stf = { path = "../citrea-stf" } sequencer-client = { path = "../sequencer-client" } -shared-backup-db = { path = "../shared-backup-db" } # Sov SDK deps sov-db = { path = "../sovereign-sdk/full-node/db/sov-db" } diff --git a/crates/prover/src/runner.rs b/crates/prover/src/runner.rs index 669b5b493..7a5bc8317 100644 --- a/crates/prover/src/runner.rs +++ b/crates/prover/src/runner.rs @@ -17,7 +17,6 @@ use jsonrpsee::server::{BatchRequestConfig, ServerBuilder}; use jsonrpsee::RpcModule; use rand::Rng; use sequencer_client::{GetSoftBatchResponse, SequencerClient}; -use shared_backup_db::{DbPoolError, PostgresConnector, ProofType}; use sov_db::ledger_db::{ProverLedgerOps, SlotCommit}; use sov_db::schema::types::{BatchNumber, SlotNumber, StoredStateTransition}; use sov_modules_api::storage::HierarchicalStorageManager; @@ -243,14 +242,6 @@ where let prover_config = self.prover_config.clone().unwrap(); - let pg_client = match prover_config.clone().db_config { - Some(db_config) => { - info!("Connecting to postgres"); - Some(PostgresConnector::new(db_config.clone()).await) - } - None => None, - }; - // Create l1 sync worker task let (l1_tx, mut l1_rx) = mpsc::channel(1); @@ -295,7 +286,7 @@ where if let Err(e) = self.process_l1_block( pending_l1, skip_submission_until_l1, - &pg_client, &prover_config, ).await { error!("Could not process L1 block and generate proof: {:?}", e); } @@ -413,7 +404,6 @@ where async fn process_l1_block( &mut self, pending_l1_blocks: &mut VecDeque<<Da as DaService>::FilteredBlock>, skip_submission_until_l1: u64, - pg_client: 
&Option<Result<PostgresConnector, DbPoolError>>, prover_config: &ProverConfig, ) -> Result<(), anyhow::Error> { while !pending_l1_blocks.is_empty() { @@ -543,7 +533,7 @@ where // Skip submission until l1 height if l1_height >= skip_submission_until_l1 && should_prove { - self.generate_and_submit_proof(transition_data, pg_client, l1_height, hash) + self.generate_and_submit_proof(transition_data, l1_height, hash) .await?; } else { info!("Skipping proving for l1 height {}", l1_height); @@ -691,7 +681,6 @@ where async fn generate_and_submit_proof( &self, transition_data: StateTransitionData<Stf::StateRoot, Stf::Witness, Da::Spec>, - pg_client: &Option<Result<PostgresConnector, DbPoolError>>, l1_height: u64, hash: <<Da as DaService>::Spec as DaSpec>::SlotHash, ) -> Result<(), anyhow::Error> { @@ -756,27 +745,6 @@ where validity_condition: borsh::to_vec(&transition_data.validity_condition).unwrap(), }; - match pg_client.as_ref() { - Some(Ok(pool)) => { - info!("Inserting proof data into postgres"); - let (proof_data, proof_type) = match proof.clone() { - Proof::Full(full_proof) => (full_proof, ProofType::Full), - Proof::PublicInput(public_input) => (public_input, ProofType::PublicInput), - }; - pool.insert_proof_data( - tx_id_u8.to_vec(), - proof_data, - stored_state_transition.clone().into(), - proof_type, - ) - .await - .unwrap(); - } - _ => { - warn!("No postgres client found"); - } - } - if let Err(e) = self.ledger_db .put_proof_data(l1_height, tx_id_u8, proof, stored_state_transition) diff --git a/crates/sequencer/Cargo.toml b/crates/sequencer/Cargo.toml index 6f8853db1..a89b41a21 100644 --- a/crates/sequencer/Cargo.toml +++ b/crates/sequencer/Cargo.toml @@ -19,7 +19,6 @@ anyhow = { workspace = true } bincode = { workspace = true } borsh = { workspace = true } chrono = { workspace = true } -deadpool-postgres = { workspace = true } digest = { workspace = true } futures = { workspace = true } hex = { workspace = true } @@ -61,7 +60,6 @@ sov-stf-runner = { path = "../sovereign-sdk/full-node/sov-stf-runner" } citrea-evm = { path = "../evm", features = ["native"] } citrea-primitives = { path = "../primitives" } citrea-stf = { path = "../citrea-stf", features = ["native"] } -shared-backup-db = { path = "../shared-backup-db" } [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/sequencer/src/commitment_controller.rs b/crates/sequencer/src/commitment_controller.rs index bc675c292..5440164fb 100644 --- a/crates/sequencer/src/commitment_controller.rs +++ b/crates/sequencer/src/commitment_controller.rs @@ -31,7 +31,7 @@ pub fn get_commitment_info( // Get latest finalized and pending commitments and find the max height let last_finalized_l2_height = ledger_db - .get_last_sequencer_commitment_l2_height()? + .get_last_commitment_l2_height()? .unwrap_or(BatchNumber(0)); let last_pending_l2_height = ledger_db .get_pending_commitments_l2_range()? 
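The ledger DB now covers everything the dropped postgres tables were used for. As a reference, here is a minimal sketch (not part of the patch) of the resulting `SequencerLedgerOps` flow, assembled from the call sites visible in this diff; the wrapper function and the `anyhow::Result` plumbing are illustrative assumptions:

```rust
// Sketch only: exercises the ledger-DB calls this patch migrates to.
use sov_db::ledger_db::{LedgerDB, SequencerLedgerOps};
use sov_db::schema::types::BatchNumber;

fn ledger_backed_flow(ledger_db: &LedgerDB) -> anyhow::Result<()> {
    // 1. An incoming RPC transaction is persisted next to the in-memory
    //    mempool (see `eth_sendRawTransaction` in crates/sequencer/src/rpc.rs).
    let tx_hash = vec![0u8; 32]; // hypothetical tx hash
    let rlp_encoded_tx = vec![0u8; 1]; // hypothetical RLP-encoded tx bytes
    ledger_db.insert_mempool_tx(tx_hash.clone(), rlp_encoded_tx)?;

    // 2. On restart, `restore_mempool` rebuilds the mempool from the same
    //    table; entries are (hash, raw_tx) pairs.
    for (_hash, _raw_tx) in ledger_db.get_mempool_txs()? {
        // each raw_tx is decoded via `recover_raw_transaction` in the sequencer
    }

    // 3. Once the txs land in an L2 block, `produce_l2_block` prunes them.
    ledger_db.remove_mempool_txs(vec![tx_hash])?;

    // 4. Commitment progress also lives here now, which is what makes the
    //    old `sync_commitments_from_db` postgres round-trip unnecessary.
    ledger_db.set_last_commitment_l2_height(BatchNumber(4))?;
    Ok(())
}
```

Because the sequencer and the full node both write this state through the same `LedgerDB`, a replacement sequencer promoted from a full node (see `test_sequencer_crash_and_replace_full_node`) picks up the last commitment height without consulting an external database.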
diff --git a/crates/sequencer/src/config.rs b/crates/sequencer/src/config.rs index 666f84547..a0039e7f6 100644 --- a/crates/sequencer/src/config.rs +++ b/crates/sequencer/src/config.rs @@ -1,5 +1,4 @@ use serde::Deserialize; -use shared_backup_db::SharedBackupDbConfig; /// Rollup Configuration #[derive(Debug, Clone, PartialEq, Deserialize)] @@ -14,8 +13,6 @@ pub struct SequencerConfig { pub deposit_mempool_fetch_limit: usize, /// Sequencer specific mempool config pub mempool_conf: SequencerMempoolConfig, - /// Offchain db config - pub db_config: Option<SharedBackupDbConfig>, /// DA layer update loop interval in ms pub da_update_interval_ms: u64, /// Block production interval in ms @@ -88,12 +85,6 @@ mod tests { base_fee_tx_limit = 100000 base_fee_tx_size = 200 max_account_slots = 16 - [db_config] - db_host = "localhost" - db_port = 5432 - db_user = "postgres" - db_password = "postgres" - db_name = "postgres" "#; let config_file = create_config_from(config); @@ -115,7 +106,6 @@ mod tests { base_fee_tx_size: 200, max_account_slots: 16, }, - db_config: Some(SharedBackupDbConfig::default()), da_update_interval_ms: 1000, block_production_interval_ms: 1000, }; diff --git a/crates/sequencer/src/rpc.rs b/crates/sequencer/src/rpc.rs index c00cfadde..88a6c1127 100644 --- a/crates/sequencer/src/rpc.rs +++ b/crates/sequencer/src/rpc.rs @@ -9,7 +9,7 @@ use reth_primitives::{Bytes, FromRecoveredPooledTransaction, IntoRecoveredTransa use reth_rpc::eth::error::EthApiError; use reth_rpc_types_compat::transaction::from_recovered; use reth_transaction_pool::EthPooledTransaction; -use shared_backup_db::PostgresConnector; +use sov_db::ledger_db::SequencerLedgerOps; use sov_modules_api::WorkingSet; use tokio::sync::Mutex; use tracing::{debug, error}; @@ -18,18 +18,21 @@ use crate::deposit_data_mempool::DepositDataMempool; use crate::mempool::CitreaMempool; use crate::utils::recover_raw_transaction; -pub(crate) struct RpcContext<C: sov_modules_api::Context> { +pub(crate) struct RpcContext<C: sov_modules_api::Context, DB: SequencerLedgerOps> { pub mempool: Arc<CitreaMempool<C>>, pub deposit_mempool: Arc<Mutex<DepositDataMempool>>, pub l2_force_block_tx: UnboundedSender<()>, pub storage: C::Storage, + pub ledger: DB, pub test_mode: bool, - pub pg_pool: Option<Arc<PostgresConnector>>, } -pub(crate) fn create_rpc_module<C: sov_modules_api::Context>( - rpc_context: RpcContext<C>, -) -> Result<RpcModule<RpcContext<C>>, jsonrpsee::core::RegisterMethodError> { +pub(crate) fn create_rpc_module< + C: sov_modules_api::Context, + DB: SequencerLedgerOps + Send + Sync + 'static, +>( + rpc_context: RpcContext<C, DB>, +) -> Result<RpcModule<RpcContext<C, DB>>, jsonrpsee::core::RegisterMethodError> { let test_mode = rpc_context.test_mode; let mut rpc = RpcModule::new(rpc_context); rpc.register_async_method("eth_sendRawTransaction", |parameters, ctx| async move { @@ -49,17 +52,14 @@ pub(crate) fn create_rpc_module( .await .map_err(EthApiError::from)?; - if let Some(pool) = &ctx.pg_pool { - let mut rlp_encoded_tx = Vec::new(); - pool_transaction - .to_recovered_transaction() - .into_signed() - .encode_enveloped(&mut rlp_encoded_tx); - // Do not return error here just log - match pool.insert_mempool_tx(hash.to_vec(), rlp_encoded_tx).await { - Ok(_) => (), - Err(e) => tracing::warn!("Failed to insert mempool tx into db: {:?}", e), - }; + let mut rlp_encoded_tx = Vec::new(); + pool_transaction + .to_recovered_transaction() + .into_signed() + .encode_enveloped(&mut rlp_encoded_tx); + // Do not return error here just log + if let Err(e) = ctx.ledger.insert_mempool_tx(hash.to_vec(), rlp_encoded_tx) { + tracing::warn!("Failed to insert mempool tx into db: {:?}", e); } Ok::(hash) diff --git a/crates/sequencer/src/sequencer.rs b/crates/sequencer/src/sequencer.rs index 7e56adfb2..734825713 100644 
--- a/crates/sequencer/src/sequencer.rs +++ b/crates/sequencer/src/sequencer.rs @@ -24,7 +24,6 @@ use reth_transaction_pool::{ BestTransactions, BestTransactionsAttributes, ChangedAccount, EthPooledTransaction, ValidPoolTransaction, }; -use shared_backup_db::{CommitmentStatus, PostgresConnector}; use soft_confirmation_rule_enforcer::SoftConfirmationRuleEnforcer; use sov_accounts::Accounts; use sov_accounts::Response::{AccountEmpty, AccountExists}; @@ -115,7 +114,7 @@ where PreState = Sm::NativeStorage, ChangeSet = Sm::NativeChangeSet, > + StfBlueprintTrait, - DB: SequencerLedgerOps + Send + Clone + 'static, + DB: SequencerLedgerOps + Send + Sync + Clone + 'static, { #[allow(clippy::too_many_arguments)] pub fn new( @@ -361,7 +360,6 @@ where da_block: <Da as DaService>::FilteredBlock, l1_fee_rate: u128, l2_block_mode: L2BlockMode, - pg_pool: &Option<PostgresConnector>, last_used_l1_height: u64, ) -> anyhow::Result<(u64, bool)> { let da_height = da_block.header().height(); @@ -592,17 +590,12 @@ where .set_state_diff(self.last_state_diff.clone())?; } - if let Some(pg_pool) = pg_pool.clone() { - // TODO: Is this okay? I'm not sure because we have a loop in this and I can't do async in spawn_blocking - tokio::spawn(async move { - let txs = txs_to_remove - .iter() - .map(|tx_hash| tx_hash.to_vec()) - .collect::<Vec<Vec<u8>>>(); - if let Err(e) = pg_pool.delete_txs_by_tx_hashes(txs).await { - warn!("Failed to remove txs from mempool: {:?}", e); - } - }); + let txs = txs_to_remove + .iter() + .map(|tx_hash| tx_hash.to_vec()) + .collect::<Vec<Vec<u8>>>(); + if let Err(e) = self.ledger_db.remove_mempool_txs(txs) { + warn!("Failed to remove txs from mempool: {:?}", e); } Ok((da_block.header().height(), state_diff_threshold_reached)) @@ -674,11 +667,10 @@ where && commitment.l2_end_block_number == l2_end.0 }) { // Update last sequencer commitment l2 height - match self.ledger_db.get_last_sequencer_commitment_l2_height()? { + match self.ledger_db.get_last_commitment_l2_height()? { Some(last_commitment_l2_height) if last_commitment_l2_height >= l2_end => {} _ => { - self.ledger_db - .set_last_sequencer_commitment_l2_height(l2_end)?; + self.ledger_db.set_last_commitment_l2_height(l2_end)?; } }; @@ -739,42 +731,19 @@ where ); let ledger_db = self.ledger_db.clone(); - let db_config = self.config.db_config.clone(); let handle_da_response = async move { let result: anyhow::Result<()> = async move { - let tx_id = rx + let _tx_id = rx .await .map_err(|_| anyhow!("DA service is dead!"))? .map_err(|_| anyhow!("Send transaction cannot fail"))?; ledger_db - .set_last_sequencer_commitment_l2_height(l2_end) + .set_last_commitment_l2_height(l2_end) .map_err(|_| { anyhow!("Sequencer: Failed to set last sequencer commitment L2 height") })?; - if let Some(db_config) = db_config { - match PostgresConnector::new(db_config).await { - Ok(pg_connector) => { - pg_connector - .insert_sequencer_commitment( - Into::<[u8; 32]>::into(tx_id).to_vec(), - l2_start.0 as u32, - l2_end.0 as u32, - commitment.merkle_root.to_vec(), - CommitmentStatus::Mempool, - ) - .await - .map_err(|_| { - anyhow!("Sequencer: Failed to insert sequencer commitment") - })?; - } - Err(e) => { - warn!("Failed to connect to postgres: {:?}", e); - } - } - } - ledger_db.delete_pending_commitment_l2_range(&(l2_start, l2_end))?; info!("New commitment. 
L2 range: #{}-{}", l2_start.0, l2_end.0); @@ -871,30 +840,11 @@ where .await .map_err(|e| anyhow!(e))?; - // If connected to offchain db first check if the commitments are in sync - let mut pg_pool = None; - if let Some(db_config) = self.config.db_config.clone() { - pg_pool = match PostgresConnector::new(db_config).await { - Ok(pg_connector) => { - match self.sync_commitments_from_db(pg_connector.clone()).await { - Ok(()) => debug!("Sequencer: Commitments are in sync"), - Err(e) => { - warn!("Sequencer: Offchain db error: {:?}", e); - } - } - match self.restore_mempool(pg_connector.clone()).await { - Ok(()) => debug!("Sequencer: Mempool restored"), - Err(e) => { - warn!("Sequencer: Mempool restore error: {:?}", e); - } - } - Some(pg_connector) - } - Err(e) => { - warn!("Failed to connect to postgres: {:?}", e); - None - } - }; + match self.restore_mempool().await { + Ok(()) => debug!("Sequencer: Mempool restored"), + Err(e) => { + warn!("Sequencer: Mempool restore error: {:?}", e); + } } // Initialize our knowledge of the state of the DA-layer @@ -996,7 +946,7 @@ where .map_err(|e| anyhow!(e))?; debug!("Created an empty L2 for L1={}", needed_da_block_height); - if let Err(e) = self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty, &pg_pool, last_used_l1_height).await { + if let Err(e) = self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty, last_used_l1_height).await { error!("Sequencer error: {}", e); } } @@ -1012,7 +962,7 @@ where } }; let l1_fee_rate = l1_fee_rate.clamp(*l1_fee_rate_range.start(), *l1_fee_rate_range.end()); - match self.produce_l2_block(last_finalized_block.clone(), l1_fee_rate, L2BlockMode::NotEmpty, &pg_pool, last_used_l1_height).await { + match self.produce_l2_block(last_finalized_block.clone(), l1_fee_rate, L2BlockMode::NotEmpty, last_used_l1_height).await { Ok((l1_block_number, state_diff_threshold_reached)) => { last_used_l1_height = l1_block_number; @@ -1044,7 +994,7 @@ where .map_err(|e| anyhow!(e))?; debug!("Created an empty L2 for L1={}", needed_da_block_height); - if let Err(e) = self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty, &pg_pool, last_used_l1_height).await { + if let Err(e) = self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::Empty, last_used_l1_height).await { error!("Sequencer error: {}", e); } } @@ -1062,7 +1012,7 @@ where let l1_fee_rate = l1_fee_rate.clamp(*l1_fee_rate_range.start(), *l1_fee_rate_range.end()); let instant = Instant::now(); - match self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::NotEmpty, &pg_pool, last_used_l1_height).await { + match self.produce_l2_block(da_block, l1_fee_rate, L2BlockMode::NotEmpty, last_used_l1_height).await { Ok((l1_block_number, state_diff_threshold_reached)) => { // Set the next iteration's wait time to produce a block based on the // previous block's execution time. @@ -1168,25 +1118,16 @@ where } /// Creates a shared RpcContext with all required data. 
-    async fn create_rpc_context(&self) -> RpcContext<C> {
+    async fn create_rpc_context(&self) -> RpcContext<C> {
         let l2_force_block_tx = self.l2_force_block_tx.clone();
-        let mut pg_pool = None;
-        if let Some(pg_config) = self.config.db_config.clone() {
-            pg_pool = match PostgresConnector::new(pg_config).await {
-                Ok(pg_connector) => Some(Arc::new(pg_connector)),
-                Err(e) => {
-                    warn!("Failed to connect to postgres: {:?}", e);
-                    None
-                }
-            };
-        }
+
         RpcContext {
             mempool: self.mempool.clone(),
             deposit_mempool: self.deposit_mempool.clone(),
             l2_force_block_tx,
             storage: self.storage.clone(),
+            ledger: self.ledger_db.clone(),
             test_mode: self.config.test_mode,
-            pg_pool,
         }
     }
 
@@ -1201,14 +1142,11 @@
         Ok(rpc_methods)
     }
 
-    pub async fn restore_mempool(
-        &self,
-        pg_connector: PostgresConnector,
-    ) -> Result<(), anyhow::Error> {
-        let mempool_txs = pg_connector.get_all_txs().await?;
-        for tx in mempool_txs {
+    pub async fn restore_mempool(&self) -> Result<(), anyhow::Error> {
+        let mempool_txs = self.ledger_db.get_mempool_txs()?;
+        for (_, tx) in mempool_txs {
             let recovered =
-                recover_raw_transaction(reth_primitives::Bytes::from(tx.tx.as_slice().to_vec()))?;
+                recover_raw_transaction(reth_primitives::Bytes::from(tx.as_slice().to_vec()))?;
 
             let pooled_tx = EthPooledTransaction::from_recovered_pooled_transaction(recovered);
 
             let _ = self.mempool.add_external_transaction(pooled_tx).await?;
@@ -1216,29 +1154,6 @@
         }
         Ok(())
     }
 
-    pub async fn sync_commitments_from_db(
-        &self,
-        pg_connector: PostgresConnector,
-    ) -> Result<(), anyhow::Error> {
-        let db_commitment = match pg_connector.get_last_commitment().await? {
-            Some(comm) => comm,
-            // ignore if postgres is out of sync
-            None => return Ok(()),
-        };
-        let ledger_commitment_l2_height = self
-            .ledger_db
-            .get_last_sequencer_commitment_l2_height()?
-            .unwrap_or_default();
-        if ledger_commitment_l2_height.0 >= db_commitment.l2_end_height {
-            return Ok(());
-        }
-
-        self.ledger_db
-            .set_last_sequencer_commitment_l2_height(BatchNumber(db_commitment.l2_end_height))?;
-
-        Ok(())
-    }
-
     fn get_account_updates(&self) -> Result<Vec<ChangedAccount>, anyhow::Error> {
         let head = self
             .db_provider
diff --git a/crates/shared-backup-db/Cargo.toml b/crates/shared-backup-db/Cargo.toml
deleted file mode 100644
index 36636d565..000000000
--- a/crates/shared-backup-db/Cargo.toml
+++ /dev/null
@@ -1,27 +0,0 @@
-[package]
-name = "shared-backup-db"
-authors = { workspace = true }
-edition = { workspace = true }
-homepage = { workspace = true }
-license = { workspace = true }
-repository = { workspace = true }
-
-version = { workspace = true }
-publish = false
-readme = "README.md"
-resolver = "2"
-
-[dependencies]
-deadpool-postgres = { workspace = true }
-postgres-types = { workspace = true, features = ["with-serde_json-1"] }
-serde = { workspace = true }
-tokio = { workspace = true }
-tracing = { workspace = true }
-
-sov-rollup-interface = { path = "../sovereign-sdk/rollup-interface" }
-
-[dev-dependencies]
-
-[features]
-default = []
-test-utils = []
diff --git a/crates/shared-backup-db/README.md b/crates/shared-backup-db/README.md
deleted file mode 100644
index ba8f561e6..000000000
--- a/crates/shared-backup-db/README.md
+++ /dev/null
@@ -1,5 +0,0 @@
-# Description
-This is an offchain db where we store sequencer commmitments details.
-
-## Why is this required for sequencer
-This database serves as a backup to ensure data integrity in case of any issues, and it will also support an API that provides access to the data.
\ No newline at end of file
diff --git a/crates/shared-backup-db/src/config.rs b/crates/shared-backup-db/src/config.rs
deleted file mode 100644
index c313d2068..000000000
--- a/crates/shared-backup-db/src/config.rs
+++ /dev/null
@@ -1,101 +0,0 @@
-use deadpool_postgres::tokio_postgres::Config;
-use serde::Deserialize;
-
-/// Offchain DB Config
-#[derive(Debug, Clone, PartialEq, Deserialize)]
-pub struct SharedBackupDbConfig {
-    db_host: String,
-    db_port: usize,
-    db_user: String,
-    db_password: String,
-    db_name: String,
-    max_pool_size: Option<usize>,
-}
-
-impl SharedBackupDbConfig {
-    pub fn new(
-        db_host: String,
-        db_port: usize,
-        db_user: String,
-        db_password: String,
-        db_name: String,
-        max_pool_size: Option<usize>,
-    ) -> Self {
-        Self {
-            db_host,
-            db_port,
-            db_user,
-            db_password,
-            db_name,
-            max_pool_size,
-        }
-    }
-
-    pub fn db_host(&self) -> &String {
-        &self.db_host
-    }
-
-    pub fn db_port(&self) -> usize {
-        self.db_port
-    }
-
-    pub fn db_user(&self) -> &String {
-        &self.db_user
-    }
-
-    pub fn db_password(&self) -> &String {
-        &self.db_password
-    }
-
-    pub fn db_name(&self) -> &String {
-        &self.db_name
-    }
-
-    pub fn max_pool_size(&self) -> Option<usize> {
-        self.max_pool_size
-    }
-
-    pub fn set_db_name(mut self, db_name: String) -> Self {
-        self.db_name = db_name;
-        self
-    }
-
-    pub fn parse_to_connection_string(&self) -> String {
-        format!(
-            "host={} port={} user={} password={} dbname={}",
-            self.db_host, self.db_port, self.db_user, self.db_password, self.db_name
-        )
-    }
-
-    pub fn parse_to_connection_string_with_db(&self, db_name: String) -> String {
-        format!(
-            "host={} port={} user={} password={} dbname={}",
-            self.db_host, self.db_port, self.db_user, self.db_password, db_name
-        )
-    }
-}
-
-impl Default for SharedBackupDbConfig {
-    fn default() -> Self {
-        Self {
-            db_host: "localhost".to_string(),
-            db_port: 5432,
-            db_user: "postgres".to_string(),
-            db_password: "postgres".to_string(),
-            db_name: "postgres".to_string(),
-            max_pool_size: None,
-        }
-    }
-}
-
-impl From<SharedBackupDbConfig> for Config {
-    fn from(val: SharedBackupDbConfig) -> Self {
-        let mut cfg = Config::new();
-        cfg.host(val.db_host())
-            .port(val.db_port() as u16)
-            .user(val.db_user())
-            .password(val.db_password())
-            .dbname(val.db_name())
-            .clone()
-    }
-}
diff --git a/crates/shared-backup-db/src/lib.rs b/crates/shared-backup-db/src/lib.rs
deleted file mode 100644
index f40c8d787..000000000
--- a/crates/shared-backup-db/src/lib.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-pub mod config;
-pub mod postgres_connector;
-pub mod tables;
-
-pub use config::SharedBackupDbConfig;
-pub use postgres_connector::{DbPoolError, PostgresConnector};
-pub use tables::{CommitmentStatus, DbProof, DbSequencerCommitment, ProofType, Tables};
diff --git a/crates/shared-backup-db/src/postgres_connector.rs b/crates/shared-backup-db/src/postgres_connector.rs
deleted file mode 100644
index a0b8fbe37..000000000
--- a/crates/shared-backup-db/src/postgres_connector.rs
+++ /dev/null
@@ -1,440 +0,0 @@
-use std::str::FromStr;
-
-use deadpool_postgres::tokio_postgres::config::Config as PgConfig;
-use deadpool_postgres::tokio_postgres::{NoTls, Row};
-use deadpool_postgres::{Manager, ManagerConfig, Object, Pool, PoolError, RecyclingMethod};
-use sov_rollup_interface::rpc::StateTransitionRpcResponse;
-use tracing::{debug, instrument};
-
-use crate::config::SharedBackupDbConfig;
-use crate::tables::{
-    CommitmentStatus, DbMempoolTx, DbProof, DbSequencerCommitment, ProofType, Tables,
-    INDEX_L2_END_HEIGHT, MEMPOOL_TXS_TABLE_CREATE_QUERY, PROOF_TABLE_CREATE_QUERY,
-    SEQUENCER_COMMITMENT_TABLE_CREATE_QUERY,
-};
-
-pub type DbPoolError = PoolError;
-
-#[derive(Clone)]
-pub struct PostgresConnector {
-    client: Pool,
-}
-
-impl PostgresConnector {
-    #[instrument(level = "trace", err)]
-    pub async fn new(pg_config: SharedBackupDbConfig) -> Result<Self, PoolError> {
-        let mut cfg: PgConfig = pg_config.clone().into();
-
-        let mgr_config = ManagerConfig {
-            recycling_method: RecyclingMethod::Fast,
-        };
-        let mgr = Manager::from_config(cfg.clone(), NoTls, mgr_config.clone());
-        let mut pool = Pool::builder(mgr)
-            .max_size(pg_config.max_pool_size().unwrap_or(16))
-            .build()
-            .unwrap();
-        let mut client = pool.get().await?;
-
-        debug!("Connecting PG client to DB: {}", pg_config.db_name());
-
-        // Create new db if running thread is not main or tokio-runtime-worker, meaning when running for tests
-        if cfg!(feature = "test-utils") {
-            // create new db
-            let _ = client
-                .batch_execute(&format!("CREATE DATABASE {};", pg_config.db_name()))
-                .await;
-
-            //connect to new db
-            cfg.dbname(pg_config.db_name());
-            let mgr = Manager::from_config(cfg, NoTls, mgr_config);
-            pool = Pool::builder(mgr)
-                .max_size(pg_config.max_pool_size().unwrap_or(16))
-                .build()
-                .unwrap();
-            // new client
-            client = pool.get().await?;
-        }
-
-        // create tables
-        client
-            .batch_execute(SEQUENCER_COMMITMENT_TABLE_CREATE_QUERY)
-            .await?;
-        client.batch_execute(MEMPOOL_TXS_TABLE_CREATE_QUERY).await?;
-        client.batch_execute(PROOF_TABLE_CREATE_QUERY).await?;
-        let db_client = Self { client: pool };
-
-        let _ = db_client.create_indexes().await;
-
-        Ok(db_client)
-    }
-
-    #[instrument(level = "trace", skip(self), err)]
-    pub async fn client(&self) -> Result<Object, PoolError> {
-        self.client.get().await
-    }
-
-    #[instrument(level = "trace", skip(self), err, ret)]
-    pub async fn create_indexes(&self) -> Result<(), PoolError> {
-        let client = self.client().await?;
-        // client.batch_execute(INDEX_L1_END_HEIGHT).await?;
-        // client.batch_execute(INDEX_L1_END_HASH).await?;
-        client.batch_execute(INDEX_L2_END_HEIGHT).await?;
-        Ok(())
-    }
-
-    #[cfg(feature = "test-utils")]
-    pub async fn new_test_client(db_name: String) -> Result<Self, PoolError> {
-        let mut cfg: PgConfig = SharedBackupDbConfig::default().into();
-
-        let mgr_config = ManagerConfig {
-            recycling_method: RecyclingMethod::Fast,
-        };
-        let mgr = Manager::from_config(cfg.clone(), NoTls, mgr_config.clone());
-        let pool = Pool::builder(mgr).max_size(16).build().unwrap();
-        let client = pool.get().await.unwrap();
-
-        client
-            .batch_execute(&format!("DROP DATABASE IF EXISTS {};", db_name.clone()))
-            .await
-            .unwrap();
-
-        client
-            .batch_execute(&format!("CREATE DATABASE {};", db_name.clone()))
-            .await
-            .unwrap();
-
-        drop(pool);
-
-        //connect to new db
-        cfg.dbname(&db_name);
-
-        let mgr = Manager::from_config(cfg, NoTls, mgr_config);
-        let test_pool = Pool::builder(mgr).max_size(16).build().unwrap();
-        let test_client = test_pool.get().await.unwrap();
-
-        test_client
-            .batch_execute(SEQUENCER_COMMITMENT_TABLE_CREATE_QUERY)
-            .await
-            .unwrap();
-
-        test_client
-            .batch_execute(MEMPOOL_TXS_TABLE_CREATE_QUERY)
-            .await
-            .unwrap();
-        test_client
-            .batch_execute(PROOF_TABLE_CREATE_QUERY)
-            .await
-            .unwrap();
-
-        let test_client = Self { client: test_pool };
-
-        test_client.create_indexes().await.unwrap();
-        Ok(test_client)
-    }
-
-    #[allow(clippy::too_many_arguments)]
-    #[instrument(level = "trace", skip_all, fields(l1_start_height), err, ret)]
-    pub async fn insert_sequencer_commitment(
-        &self,
-        l1_tx_id: Vec<u8>,
-        l2_start_height: u32,
-        l2_end_height: u32,
-        merkle_root: Vec<u8>,
-        status: CommitmentStatus,
-    ) -> Result<u64, PoolError> {
-        let client = self.client().await?;
-        Ok(client
-            .execute(
-                "INSERT INTO sequencer_commitments (l1_tx_id, l2_start_height, l2_end_height, merkle_root, status) VALUES ($1, $2, $3, $4, $5)",
-                &[
-                    &l1_tx_id,
-                    &l2_start_height,
-                    &l2_end_height,
-                    &merkle_root,
-                    &status.to_string(),
-                ],
-            ).await?)
-    }
-
-    #[instrument(level = "trace", skip(self, tx), err, ret)]
-    pub async fn insert_mempool_tx(&self, tx_hash: Vec<u8>, tx: Vec<u8>) -> Result<u64, PoolError> {
-        let client = self.client().await?;
-        Ok(client
-            .execute(
-                "INSERT INTO mempool_txs (tx_hash, tx) VALUES ($1, $2);",
-                &[&tx_hash, &tx],
-            )
-            .await?)
-    }
-
-    #[instrument(level = "trace", skip(self), err)]
-    pub async fn get_all_commitments(&self) -> Result<Vec<DbSequencerCommitment>, PoolError> {
-        let client = self.client().await?;
-        Ok(client
-            .query("SELECT * FROM sequencer_commitments", &[])
-            .await?
-            .iter()
-            .map(PostgresConnector::row_to_sequencer_commitment)
-            .collect())
-    }
-
-    #[instrument(level = "trace", skip(self), err)]
-    pub async fn get_all_txs(&self) -> Result<Vec<DbMempoolTx>, PoolError> {
-        let client = self.client().await?;
-        Ok(client
-            .query("SELECT * FROM mempool_txs", &[])
-            .await?
-            .iter()
-            .map(PostgresConnector::row_to_mempool_tx)
-            .collect())
-    }
-
-    #[instrument(level = "trace", skip(self), err)]
-    pub async fn get_last_commitment(&self) -> Result<Option<DbSequencerCommitment>, PoolError> {
-        let client = self.client().await?;
-        let rows = client
-            .query(
-                "SELECT * FROM sequencer_commitments ORDER BY id DESC LIMIT 1",
-                &[],
-            )
-            .await?;
-        if rows.is_empty() {
-            return Ok(None);
-        }
-        Ok(Some(PostgresConnector::row_to_sequencer_commitment(
-            &rows[0],
-        )))
-    }
-
-    #[instrument(level = "trace", skip_all, err, ret)]
-    pub async fn delete_txs_by_tx_hashes(&self, tx_hashes: Vec<Vec<u8>>) -> Result<u64, PoolError> {
-        let client = self.client().await?;
-        Ok(client
-            .execute(
-                "DELETE FROM mempool_txs WHERE tx_hash = ANY($1);",
-                &[&tx_hashes],
-            )
-            .await?)
-    }
-
-    #[allow(clippy::too_many_arguments)]
-    #[instrument(level = "trace", skip_all, fields(l1_tx_id), err, ret)]
-    pub async fn insert_proof_data(
-        &self,
-        l1_tx_id: Vec<u8>,
-        proof_data: Vec<u8>,
-        state_transition_rpc_response: StateTransitionRpcResponse,
-        proof_type: ProofType,
-    ) -> Result<u64, PoolError> {
-        let state_tranistion_rpc_response_json =
-            postgres_types::Json::<StateTransitionRpcResponse>(state_transition_rpc_response);
-        let client = self.client().await?;
-        Ok(client
-            .execute(
-                "INSERT INTO proof (l1_tx_id, proof_data, state_transition, proof_type) VALUES ($1, $2, $3, $4);",
-                &[&l1_tx_id, &proof_data, &state_tranistion_rpc_response_json, &proof_type.to_string()],
-            )
-            .await?)
-    }
-
-    #[instrument(level = "trace", skip(self), err)]
-    pub async fn get_all_proof_data(&self) -> Result<Vec<DbProof>, PoolError> {
-        let client = self.client().await?;
-        Ok(client
-            .query("SELECT * FROM proof", &[])
-            .await?
-            .iter()
-            .map(PostgresConnector::row_to_proof)
-            .collect())
-    }
-
-    #[instrument(level = "trace", skip(self), fields(%table), err, ret)]
-    pub async fn drop_table(&self, table: Tables) -> Result<u64, PoolError> {
-        let client = self.client().await?;
-        Ok(client
-            .execute(format!("DROP TABLE {};", table).as_str(), &[])
-            .await?)
- } - - #[cfg(test)] - #[instrument(level = "trace", skip(self), fields(%table), ret)] - pub async fn create_table(&self, table: Tables) { - let client = self.client().await.unwrap(); - let query = match table { - Tables::SequencerCommitment => SEQUENCER_COMMITMENT_TABLE_CREATE_QUERY, - Tables::MempoolTxs => MEMPOOL_TXS_TABLE_CREATE_QUERY, - Tables::Proof => PROOF_TABLE_CREATE_QUERY, - }; - client.execute(query, &[]).await.unwrap(); - } - - // Helper function to convert a Row to DbSequencerCommitment - fn row_to_sequencer_commitment(row: &Row) -> DbSequencerCommitment { - DbSequencerCommitment { - l1_tx_id: row.get("l1_tx_id"), - // postgres does not support u64 - l2_start_height: row.get::<&str, u32>("l2_start_height") as u64, - l2_end_height: row.get::<&str, u32>("l2_end_height") as u64, - merkle_root: row.get("merkle_root"), - status: CommitmentStatus::from_str(row.get("status")).unwrap(), - } - } - - fn row_to_mempool_tx(row: &Row) -> DbMempoolTx { - DbMempoolTx { - tx_hash: row.get("tx_hash"), - tx: row.get("tx"), - } - } - - fn row_to_proof(row: &Row) -> DbProof { - DbProof { - l1_tx_id: row.get("l1_tx_id"), - proof_data: row.get("proof_data"), - state_transition: row.get("state_transition"), - proof_type: ProofType::from_str(row.get("proof_type")).unwrap(), - } - } -} - -#[cfg(all(test, feature = "test-utils"))] -mod tests { - use super::*; - use crate::tables::Tables; - - #[tokio::test] - async fn test_insert_sequencer_commitment() { - let client = PostgresConnector::new_test_client("insert_sequencer_commitments".to_owned()) - .await - .unwrap(); - client.create_table(Tables::SequencerCommitment).await; - - let inserted = client - .insert_sequencer_commitment( - vec![0; 32], - 10, - 11, - vec![1; 32], - CommitmentStatus::Mempool, - ) - .await - .unwrap(); - - assert_eq!(inserted, 1); - - let rows = client.get_all_commitments().await.unwrap(); - assert_eq!(rows.len(), 1); - assert_eq!(rows[0].l1_tx_id, vec![0; 32]); - assert_eq!(rows[0].l2_start_height, 10); - assert_eq!(rows[0].l2_end_height, 11); - assert!(matches!(rows[0].status, CommitmentStatus::Mempool)); - - let _ = client.drop_table(Tables::SequencerCommitment).await; - } - - #[tokio::test] - async fn test_insert_rlp_tx() { - let client = PostgresConnector::new_test_client("insert_rlp_tx".to_owned()) - .await - .unwrap(); - client.create_table(Tables::MempoolTxs).await; - - client - .insert_mempool_tx(vec![1, 2, 3], vec![1, 2, 4]) - .await - .unwrap(); - - let txs = client.get_all_txs().await.unwrap(); - - assert_eq!(txs.len(), 1); - assert_eq!( - txs[0], - DbMempoolTx { - tx_hash: vec![1, 2, 3], - tx: vec![1, 2, 4] - } - ); - - client - .insert_mempool_tx(vec![3, 4, 5], vec![10, 20, 42]) - .await - .unwrap(); - - client - .insert_mempool_tx(vec![5, 6, 7], vec![12, 22, 42]) - .await - .unwrap(); - - client - .delete_txs_by_tx_hashes(vec![vec![1, 2, 3], vec![5, 6, 7]]) - .await - .unwrap(); - - let txs = client.get_all_txs().await.unwrap(); - - assert_eq!(txs.len(), 1); - assert_eq!( - txs[0], - DbMempoolTx { - tx_hash: vec![3, 4, 5], - tx: vec![10, 20, 42] - } - ); - } - - #[tokio::test] - async fn test_insert_proof_data() { - let client = PostgresConnector::new_test_client("test_insert_proof_data".to_string()) - .await - .unwrap(); - client.create_table(Tables::Proof).await; - - let inserted = client - .insert_proof_data( - vec![0; 32], - vec![1; 32], - StateTransitionRpcResponse { - initial_state_root: [0; 32].to_vec(), - final_state_root: [1; 32].to_vec(), - state_diff: vec![(vec![2u8; 32], Some(vec![3u8; 32])), 
(vec![5u8; 32], None)]
-                        .into_iter()
-                        .collect(),
-                    da_slot_hash: [2; 32],
-                    sequencer_commitments_range: (0, 5),
-                    sequencer_public_key: [3; 32].to_vec(),
-                    sequencer_da_public_key: [4; 32].to_vec(),
-                    validity_condition: [5; 32].to_vec(),
-                },
-                ProofType::Full,
-            )
-            .await
-            .unwrap();
-
-        assert_eq!(inserted, 1);
-
-        let proofs = client.get_all_proof_data().await.unwrap();
-        assert_eq!(proofs.len(), 1);
-        assert_eq!(
-            proofs[0],
-            DbProof {
-                l1_tx_id: vec![0; 32],
-                proof_data: vec![1; 32],
-                state_transition: postgres_types::Json(StateTransitionRpcResponse {
-                    initial_state_root: [0; 32].to_vec(),
-                    final_state_root: [1; 32].to_vec(),
-                    state_diff: vec![(vec![2; 32], Some(vec![3; 32])), (vec![5; 32], None)]
-                        .into_iter()
-                        .collect(),
-                    da_slot_hash: [2; 32],
-                    sequencer_commitments_range: (0, 5),
-                    sequencer_public_key: [3; 32].to_vec(),
-                    sequencer_da_public_key: [4; 32].to_vec(),
-                    validity_condition: [5; 32].to_vec(),
-                }),
-                proof_type: ProofType::Full,
-            }
-        );
-
-        client.drop_table(Tables::Proof).await.unwrap();
-    }
-}
diff --git a/crates/shared-backup-db/src/tables.rs b/crates/shared-backup-db/src/tables.rs
deleted file mode 100644
index bf0cb90d9..000000000
--- a/crates/shared-backup-db/src/tables.rs
+++ /dev/null
@@ -1,157 +0,0 @@
-use std::fmt;
-
-use postgres_types::Json;
-use sov_rollup_interface::rpc::StateTransitionRpcResponse;
-
-pub enum Tables {
-    /// string version is sequencer_commitment
-    #[allow(dead_code)]
-    SequencerCommitment,
-    MempoolTxs,
-    Proof,
-}
-
-// impl to_string for tables
-impl fmt::Display for Tables {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            Tables::SequencerCommitment => write!(f, "sequencer_commitments"),
-            Tables::MempoolTxs => write!(f, "mempool_txs"),
-            Tables::Proof => write!(f, "proof"),
-        }
-    }
-}
-
-#[derive(Debug, Clone)]
-pub enum CommitmentStatus {
-    Mempool,
-    Mined,
-    Finalized,
-}
-
-impl fmt::Display for CommitmentStatus {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            CommitmentStatus::Mempool => write!(f, "mempool"),
-            CommitmentStatus::Mined => write!(f, "mined"),
-            CommitmentStatus::Finalized => write!(f, "finalized"),
-        }
-    }
-}
-
-impl std::str::FromStr for CommitmentStatus {
-    type Err = ();
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s {
-            "mempool" => Ok(CommitmentStatus::Mempool),
-            "mined" => Ok(CommitmentStatus::Mined),
-            "finalized" => Ok(CommitmentStatus::Finalized),
-            _ => Err(()),
-        }
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct DbSequencerCommitment {
-    /// Hex encoded L1 transaction ID
-    pub l1_tx_id: Vec<u8>,
-    // pub l1_start_height: u32,
-    // pub l1_end_height: u32,
-    // /// Hex encoded L1 start hash
-    // pub l1_start_hash: Vec<u8>,
-    // /// Hex encoded L1 end hash
-    // pub l1_end_hash: Vec<u8>,
-    pub l2_start_height: u64,
-    pub l2_end_height: u64,
-    /// Hex encoded merkle root of soft confirmation hashes
-    pub merkle_root: Vec<u8>,
-    pub status: CommitmentStatus,
-}
-
-// Fields removed:
-//
-// l1_start_hash BYTEA NOT NULL,
-// l1_end_hash BYTEA NOT NULL,
-// l1_start_height OID NOT NULL,
-// l1_end_height OID NOT NULL,
-pub const SEQUENCER_COMMITMENT_TABLE_CREATE_QUERY: &str = "
-CREATE TABLE IF NOT EXISTS sequencer_commitments (
-    id SERIAL PRIMARY KEY,
-    l1_tx_id BYTEA NOT NULL,
-    l2_start_height OID NOT NULL,
-    l2_end_height OID NOT NULL,
-    merkle_root BYTEA NOT NULL,
-    status VARCHAR(15) NOT NULL,
-    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-
-    UNIQUE (l2_start_height, l2_end_height)
-);
-";
-
-pub const INDEX_L2_END_HEIGHT: &str =
-    "CREATE INDEX IF NOT EXISTS idx_l2_end_height ON sequencer_commitments(l2_end_height);";
-pub const INDEX_L1_END_HEIGHT: &str =
-    "CREATE INDEX IF NOT EXISTS idx_l1_end_height ON sequencer_commitments(l1_end_height);";
-pub const INDEX_L1_END_HASH: &str =
-    "CREATE INDEX IF NOT EXISTS idx_l1_end_hash ON sequencer_commitments(l1_end_hash);";
-
-// tx is rlp encoded
-pub const MEMPOOL_TXS_TABLE_CREATE_QUERY: &str = "
-CREATE TABLE IF NOT EXISTS mempool_txs (
-    id SERIAL PRIMARY KEY,
-    tx_hash BYTEA NOT NULL UNIQUE,
-    tx BYTEA NOT NULL
-);";
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct DbMempoolTx {
-    /// Tx Hash
-    pub tx_hash: Vec<u8>,
-    /// RLP encoded transaction
-    pub tx: Vec<u8>,
-}
-
-pub const PROOF_TABLE_CREATE_QUERY: &str = "
-CREATE TABLE IF NOT EXISTS proof (
-    id SERIAL PRIMARY KEY,
-    l1_tx_id BYTEA NOT NULL,
-    proof_data BYTEA NOT NULL,
-    state_transition JSON NOT NULL,
-    proof_type VARCHAR(20) NOT NULL,
-    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
-);
-";
-
-#[derive(Debug, Clone, PartialEq)]
-pub struct DbProof {
-    pub l1_tx_id: Vec<u8>,
-    pub proof_data: Vec<u8>,
-    pub state_transition: Json<StateTransitionRpcResponse>,
-    pub proof_type: ProofType,
-}
-
-#[derive(Debug, Clone, PartialEq)]
-pub enum ProofType {
-    Full,
-    PublicInput,
-}
-
-impl fmt::Display for ProofType {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            ProofType::Full => write!(f, "Full"),
-            ProofType::PublicInput => write!(f, "Public Input"),
-        }
-    }
-}
-
-impl std::str::FromStr for ProofType {
-    type Err = ();
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s {
-            "Full" => Ok(ProofType::Full),
-            "Public Input" => Ok(ProofType::PublicInput),
-            _ => Err(()),
-        }
-    }
-}
diff --git a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs
index f585499ad..1e40bce0c 100644
--- a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs
+++ b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs
@@ -17,9 +17,9 @@ use crate::rocks_db_config::gen_rocksdb_options;
 use crate::schema::tables::{
     ActiveFork, BatchByHash, BatchByNumber, CommitmentsByNumber, EventByKey, EventByNumber,
     L2GenesisStateRoot, L2RangeByL1Height, L2Witness, LastSequencerCommitmentSent, LastStateDiff,
-    PendingSequencerCommitmentL2Range, ProofBySlotNumber, ProverLastScannedSlot, SlotByHash,
-    SlotByNumber, SoftBatchByHash, SoftBatchByNumber, SoftConfirmationStatus, TxByHash, TxByNumber,
-    VerifiedProofsBySlotNumber, LEDGER_TABLES,
+    MempoolTxs, PendingSequencerCommitmentL2Range, ProofBySlotNumber, ProverLastScannedSlot,
+    SlotByHash, SlotByNumber, SoftBatchByHash, SoftBatchByNumber, SoftConfirmationStatus, TxByHash,
+    TxByNumber, VerifiedProofsBySlotNumber, LEDGER_TABLES,
 };
 use crate::schema::types::{
     split_tx_for_storage, BatchNumber, EventNumber, L2HeightRange, SlotNumber, StoredBatch,
@@ -466,6 +466,26 @@ impl SharedLedgerOps for LedgerDB {
             .get::<ActiveFork>(&())
             .map(|fork| fork.unwrap_or_default())
     }
+
+    /// Get the most recent committed batch
+    /// Returns L2 height.
+    #[instrument(level = "trace", skip(self), err, ret)]
+    fn get_last_commitment_l2_height(&self) -> anyhow::Result<Option<BatchNumber>> {
+        self.db.get::<LastSequencerCommitmentSent>(&())
+    }
+
+    /// Used by the nodes to record that they have committed soft confirmations on a given L2 height.
+    /// For a sequencer, the last commitment height is set when the block is produced.
+    /// For a full node the last commitment is set when a commitment is read from a finalized DA layer block.
+    #[instrument(level = "trace", skip(self), err, ret)]
+    fn set_last_commitment_l2_height(&self, l2_height: BatchNumber) -> Result<(), anyhow::Error> {
+        let mut schema_batch = SchemaBatch::new();
+
+        schema_batch.put::<LastSequencerCommitmentSent>(&(), &l2_height)?;
+        self.db.write_schemas(schema_batch)?;
+
+        Ok(())
+    }
 }
 
 impl ProverLedgerOps for LedgerDB {
@@ -653,20 +673,6 @@ impl SequencerLedgerOps for LedgerDB {
         Ok(())
     }
 
-    /// Used by the sequencer to record that it has committed to soft confirmations on a given L2 height
-    #[instrument(level = "trace", skip(self), err, ret)]
-    fn set_last_sequencer_commitment_l2_height(
-        &self,
-        l2_height: BatchNumber,
-    ) -> Result<(), anyhow::Error> {
-        let mut schema_batch = SchemaBatch::new();
-
-        schema_batch.put::<LastSequencerCommitmentSent>(&(), &l2_height)?;
-        self.db.write_schemas(schema_batch)?;
-
-        Ok(())
-    }
-
     /// Gets all pending commitments' l2 ranges.
     /// Returns start-end L2 heights.
     #[instrument(level = "trace", skip(self), err)]
@@ -708,17 +714,10 @@ impl SequencerLedgerOps for LedgerDB {
         Ok(())
     }
 
-    /// Get the most recent committed batch
-    /// Returns L2 height.
-    #[instrument(level = "trace", skip(self), err, ret)]
-    fn get_last_sequencer_commitment_l2_height(&self) -> anyhow::Result<Option<BatchNumber>> {
-        self.db.get::<LastSequencerCommitmentSent>(&())
-    }
-
     /// Get the most recent commitment's l1 height
     #[instrument(level = "trace", skip(self), err, ret)]
     fn get_l1_height_of_last_commitment(&self) -> anyhow::Result<Option<SlotNumber>> {
-        let l2_height = self.get_last_sequencer_commitment_l2_height()?;
+        let l2_height = self.get_last_commitment_l2_height()?;
         match l2_height {
             Some(l2_height) => {
                 let soft_confirmation = self
@@ -729,6 +728,35 @@
             None => Ok(None),
         }
     }
+
+    fn insert_mempool_tx(&self, tx_hash: Vec<u8>, tx: Vec<u8>) -> anyhow::Result<()> {
+        let mut schema_batch = SchemaBatch::new();
+        schema_batch.put::<MempoolTxs>(&tx_hash, &tx)?;
+
+        self.db.write_schemas(schema_batch)?;
+
+        Ok(())
+    }
+
+    fn get_mempool_txs(&self) -> anyhow::Result<Vec<(Vec<u8>, Vec<u8>)>> {
+        let mut iter = self.db.iter::<MempoolTxs>()?;
+        iter.seek_to_first();
+
+        let txs = iter
+            .map(|item| item.map(|item| (item.key, item.value)))
+            .collect::<Result<Vec<(Vec<u8>, Vec<u8>)>, _>>()?;
+
+        Ok(txs)
+    }
+
+    fn remove_mempool_txs(&self, tx_hashes: Vec<Vec<u8>>) -> anyhow::Result<()> {
+        let mut schema_batch = SchemaBatch::new();
+        for tx_hash in tx_hashes {
+            schema_batch.delete::<MempoolTxs>(&tx_hash)?;
+        }
+        self.db.write_schemas(schema_batch)?;
+        Ok(())
+    }
 }
 
 impl NodeLedgerOps for LedgerDB {
diff --git a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/traits.rs b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/traits.rs
index cb69385d0..072387f63 100644
--- a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/traits.rs
+++ b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/traits.rs
@@ -122,6 +122,13 @@ pub trait SharedLedgerOps {
 
     /// Gets the currently active fork
     fn get_active_fork(&self) -> Result;
+
+    /// Used by the sequencer to record that it has committed to soft confirmations on a given L2 height
+    fn set_last_commitment_l2_height(&self, l2_height: BatchNumber) -> Result<()>;
+
+    /// Get the most recent committed batch
+    /// Returns L2 height.
+    fn get_last_commitment_l2_height(&self) -> anyhow::Result<Option<BatchNumber>>;
 }
 
 /// Node ledger operations
@@ -198,9 +205,6 @@ pub trait SequencerLedgerOps: SharedLedgerOps {
         data_to_commit: SlotCommit,
     ) -> Result<()>;
 
-    /// Used by the sequencer to record that it has committed to soft confirmations on a given L2 height
-    fn set_last_sequencer_commitment_l2_height(&self, l2_height: BatchNumber) -> Result<()>;
-
     /// Gets all pending commitments' l2 ranges.
    /// Returns start-end L2 heights.
     fn get_pending_commitments_l2_range(&self) -> Result<Vec<L2HeightRange>>;
 
@@ -214,10 +218,15 @@ pub trait SequencerLedgerOps: SharedLedgerOps {
     /// Sets the latest state diff
     fn set_state_diff(&self, state_diff: StateDiff) -> Result<()>;
 
-    /// Get the most recent committed batch
-    /// Returns L2 height.
-    fn get_last_sequencer_commitment_l2_height(&self) -> anyhow::Result<Option<BatchNumber>>;
-
     /// Get the most recent commitment's l1 height
     fn get_l1_height_of_last_commitment(&self) -> anyhow::Result<Option<SlotNumber>>;
+
+    /// Insert mempool transaction
+    fn insert_mempool_tx(&self, tx_hash: Vec<u8>, tx: Vec<u8>) -> anyhow::Result<()>;
+
+    /// Remove mempool transactions
+    fn remove_mempool_txs(&self, tx_hashes: Vec<Vec<u8>>) -> anyhow::Result<()>;
+
+    /// Fetch mempool transactions
+    fn get_mempool_txs(&self) -> anyhow::Result<Vec<(Vec<u8>, Vec<u8>)>>;
 }
diff --git a/crates/sovereign-sdk/full-node/db/sov-db/src/schema/tables.rs b/crates/sovereign-sdk/full-node/db/sov-db/src/schema/tables.rs
index 02f05fe1b..6c15ff47c 100644
--- a/crates/sovereign-sdk/full-node/db/sov-db/src/schema/tables.rs
+++ b/crates/sovereign-sdk/full-node/db/sov-db/src/schema/tables.rs
@@ -74,6 +74,7 @@ pub const LEDGER_TABLES: &[&str] = &[
     CommitmentsByNumber::table_name(),
     ProofBySlotNumber::table_name(),
     VerifiedProofsBySlotNumber::table_name(),
+    MempoolTxs::table_name(),
 ];
 
 /// A list of all tables used by the NativeDB. These tables store
@@ -337,6 +338,11 @@ define_table_with_default_codec!(
     (VerifiedProofsBySlotNumber) SlotNumber => Vec
 );
 
+define_table_with_default_codec!(
+    /// Transactions in mempool (TxHash, TxData)
+    (MempoolTxs) Vec<u8> => Vec<u8>
+);
+
 impl KeyEncoder for NodeKey {
     fn encode_key(&self) -> sov_schema_db::schema::Result<Vec<u8>> {
         // 8 bytes for version, 4 each for the num_nibbles and bytes.len() fields, plus 1 byte per byte of nibllepath
diff --git a/crates/sovereign-sdk/full-node/sov-stf-runner/Cargo.toml b/crates/sovereign-sdk/full-node/sov-stf-runner/Cargo.toml
index 88e31eab5..8bdb6925a 100644
--- a/crates/sovereign-sdk/full-node/sov-stf-runner/Cargo.toml
+++ b/crates/sovereign-sdk/full-node/sov-stf-runner/Cargo.toml
@@ -36,7 +36,6 @@ sov-rollup-interface = { path = "../../rollup-interface" }
 
 # Citrea-Deps
 citrea-primitives = { path = "../../../primitives" }
-shared-backup-db = { path = "../../../shared-backup-db", optional = true }
 
 [dev-dependencies]
 sha2 = { workspace = true }
@@ -60,7 +59,6 @@ native = [
     "futures",
     "async-trait",
     "thiserror",
-    "shared-backup-db",
     "rand",
     "tower",
     "hyper",
diff --git a/crates/sovereign-sdk/full-node/sov-stf-runner/src/config.rs b/crates/sovereign-sdk/full-node/sov-stf-runner/src/config.rs
index 87cda068d..668f21dae 100644
--- a/crates/sovereign-sdk/full-node/sov-stf-runner/src/config.rs
+++ b/crates/sovereign-sdk/full-node/sov-stf-runner/src/config.rs
@@ -4,7 +4,6 @@ use std::path::{Path, PathBuf};
 
 use serde::de::DeserializeOwned;
 use serde::Deserialize;
-use shared_backup_db::SharedBackupDbConfig;
 
 use crate::ProverGuestRunConfig;
 
@@ -130,8 +129,6 @@ pub struct ProverConfig {
     pub proving_mode: ProverGuestRunConfig,
     /// Average number of commitments to prove
     pub proof_sampling_number: usize,
-    /// Offchain db config
-    pub db_config: Option<SharedBackupDbConfig>,
 }
 
 impl Default for ProverConfig {
@@ -139,7 +136,6 @@ impl Default for ProverConfig {
         Self {
             proving_mode: ProverGuestRunConfig::Execute,
             proof_sampling_number: 0,
-            db_config: None,
         }
     }
 }
@@ -244,13 +240,6 @@ mod tests {
         let config = r#"
             proving_mode = "skip"
             proof_sampling_number = 500
-
-            [db_config]
-            db_host = "localhost"
-            db_port = 5432
-            db_user = "postgres"
-            db_password = 
"postgres" - db_name = "postgres" "#; let config_file = create_config_from(config); @@ -259,7 +248,6 @@ mod tests { let expected = ProverConfig { proving_mode: ProverGuestRunConfig::Skip, proof_sampling_number: 500, - db_config: Some(SharedBackupDbConfig::default()), }; assert_eq!(config, expected); } diff --git a/docker-compose.postgres.yml b/docker-compose.postgres.yml deleted file mode 100644 index 9443d3240..000000000 --- a/docker-compose.postgres.yml +++ /dev/null @@ -1,11 +0,0 @@ -services: - postgres: - image: postgres:16-alpine - ports: - - 5432:5432 - volumes: - - ./resources/dbs/postgres:/var/lib/postgresql/data - environment: - - POSTGRES_PASSWORD=postgres - - POSTGRES_USER=postgres - - POSTGRES_DB=postgres diff --git a/docs/run.md b/docs/run.md index 7e51f959c..0ce8d4462 100644 --- a/docs/run.md +++ b/docs/run.md @@ -14,19 +14,6 @@ Build citrea: make build ``` -### Prequisites - -For production use cases, we leverage PostgreSQL for a few extra features in the sequencer. These features are optional, if you don't want to run them, make sure the `sequencer_config.toml` file does not contain the db_config altogether in order to skip using a storage DB backend such as postgres. - -If running Postgres is prefered, you can execute the following command: - -```sh -docker compose -f docker-compose.postgres.yml up -d - -``` - -this will run postgres in a dockerized daemon mode. - ### Run on Mock DA Run on a local da layer, sharable between nodes that run on your computer. diff --git a/resources/configs/bitcoin-regtest/sequencer_config.toml b/resources/configs/bitcoin-regtest/sequencer_config.toml index 34756184f..f4c639071 100644 --- a/resources/configs/bitcoin-regtest/sequencer_config.toml +++ b/resources/configs/bitcoin-regtest/sequencer_config.toml @@ -13,10 +13,3 @@ queue_tx_size = 200 base_fee_tx_limit = 100000 base_fee_tx_size = 200 max_account_slots = 16 - -[db_config] -db_host = "localhost" -db_port = 5432 -db_user = "postgres" -db_password = "postgres" -db_name = "postgres" diff --git a/resources/configs/mock-dockerized/sequencer_config.toml b/resources/configs/mock-dockerized/sequencer_config.toml index 6e0e00f2f..97795bf4d 100644 --- a/resources/configs/mock-dockerized/sequencer_config.toml +++ b/resources/configs/mock-dockerized/sequencer_config.toml @@ -13,10 +13,3 @@ queue_tx_size = 200 base_fee_tx_limit = 100000 base_fee_tx_size = 200 max_account_slots = 16 - -[db_config] -db_host = "localhost" -db_port = 5432 -db_user = "postgres" -db_password = "postgres" -db_name = "postgres" diff --git a/resources/configs/mock/prover_config.toml b/resources/configs/mock/prover_config.toml index 105b60c4e..f22d841e5 100644 --- a/resources/configs/mock/prover_config.toml +++ b/resources/configs/mock/prover_config.toml @@ -1,9 +1,2 @@ proving_mode = "execute" proof_sampling_number = 500 - -[db_config] -db_host = "localhost" -db_port = 5432 -db_user = "postgres" -db_password = "postgres" -db_name = "postgres" diff --git a/resources/configs/mock/sequencer_config.toml b/resources/configs/mock/sequencer_config.toml index 6e0e00f2f..97795bf4d 100644 --- a/resources/configs/mock/sequencer_config.toml +++ b/resources/configs/mock/sequencer_config.toml @@ -13,10 +13,3 @@ queue_tx_size = 200 base_fee_tx_limit = 100000 base_fee_tx_size = 200 max_account_slots = 16 - -[db_config] -db_host = "localhost" -db_port = 5432 -db_user = "postgres" -db_password = "postgres" -db_name = "postgres" diff --git a/resources/hive/.dockerignore b/resources/hive/.dockerignore index 44710fb34..e69de29bb 100644 --- 
a/resources/hive/.dockerignore +++ b/resources/hive/.dockerignore @@ -1 +0,0 @@ -*/data/postgres
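
A note on the replacement flow (illustrative, not part of the patch itself): with Postgres gone, the sequencer's mempool survives restarts through the ledger database. The sketch below is a self-contained Rust model of the MempoolTxs table semantics introduced above; MempoolStore and its BTreeMap backing are hypothetical stand-ins for the RocksDB-backed LedgerDB, not the sov-db implementation.

    use std::collections::BTreeMap;

    /// Illustrative stand-in for the new MempoolTxs ledger table
    /// (tx_hash -> RLP-encoded transaction bytes).
    #[derive(Default)]
    struct MempoolStore {
        txs: BTreeMap<Vec<u8>, Vec<u8>>,
    }

    impl MempoolStore {
        /// Mirrors SequencerLedgerOps::insert_mempool_tx: persist a tx when it
        /// is accepted, so a restart does not lose it.
        fn insert_mempool_tx(&mut self, tx_hash: Vec<u8>, tx: Vec<u8>) {
            self.txs.insert(tx_hash, tx);
        }

        /// Mirrors SequencerLedgerOps::get_mempool_txs: a full scan, done once
        /// at startup by restore_mempool to refill the in-memory pool.
        fn get_mempool_txs(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
            self.txs.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
        }

        /// Mirrors SequencerLedgerOps::remove_mempool_txs: a synchronous batch
        /// delete once the txs are included in an L2 block, replacing the old
        /// fire-and-forget tokio::spawn + Postgres delete.
        fn remove_mempool_txs(&mut self, tx_hashes: Vec<Vec<u8>>) {
            for hash in tx_hashes {
                self.txs.remove(&hash);
            }
        }
    }

    fn main() {
        let mut store = MempoolStore::default();
        store.insert_mempool_tx(vec![1], b"tx-a".to_vec());
        store.insert_mempool_tx(vec![2], b"tx-b".to_vec());

        // Simulated restart: restore_mempool replays every stored tx.
        assert_eq!(store.get_mempool_txs().len(), 2);

        // tx-a lands in a block and is dropped from the persistent pool.
        store.remove_mempool_txs(vec![vec![1]]);
        assert_eq!(store.get_mempool_txs().len(), 1);
    }

The old path tolerated an unavailable Postgres by logging a warning and moving on, which is why the reconciliation step (sync_commitments_from_db) existed; with a single local store that step becomes unnecessary and is removed.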
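
The commitment bookkeeping follows the same pattern: the guarded update in sequencer.rs keeps the recorded LastSequencerCommitmentSent height monotonic. A minimal sketch of that rule, with illustrative names standing in for SharedLedgerOps and RocksDB (BatchNumber is modeled as a newtype over u64, matching its use in the diff):

    /// Modeled after sov-db's BatchNumber newtype over u64.
    #[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
    struct BatchNumber(u64);

    struct Ledger {
        last_commitment_l2_height: Option<BatchNumber>,
    }

    impl Ledger {
        /// The update rule from sequencer.rs: keep an existing value that is
        /// already >= l2_end, otherwise advance. The height never moves back.
        fn set_last_commitment_l2_height_if_higher(&mut self, l2_end: BatchNumber) {
            match self.last_commitment_l2_height {
                Some(last) if last >= l2_end => {}
                _ => self.last_commitment_l2_height = Some(l2_end),
            }
        }
    }

    fn main() {
        let mut ledger = Ledger { last_commitment_l2_height: None };
        ledger.set_last_commitment_l2_height_if_higher(BatchNumber(10));
        // A stale or replayed commitment for an older range is a no-op.
        ledger.set_last_commitment_l2_height_if_higher(BatchNumber(7));
        assert_eq!(ledger.last_commitment_l2_height, Some(BatchNumber(10)));
    }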