From 471f4a739f02e8bf9360edb3cc1c55d57b97f70b Mon Sep 17 00:00:00 2001 From: Piotr Heilman <1212808+piohei@users.noreply.github.com> Date: Tue, 28 May 2024 11:48:11 +0200 Subject: [PATCH] Added batching using DB. (#731) --- .../database/013_batches_and_transactions.sql | 24 ++ src/app.rs | 22 +- src/database/mod.rs | 164 +++++++++++ src/database/query.rs | 263 ++++++++++++++++++ src/database/types.rs | 140 ++++++++++ src/prover/identity.rs | 3 +- src/task_monitor.rs | 26 +- ...rocess_identities.rs => create_batches.rs} | 210 +++++++------- src/task_monitor/tasks/mod.rs | 3 +- src/task_monitor/tasks/process_batches.rs | 206 ++++++++++++++ tests/common/mod.rs | 3 +- tests/tree_restore_multiple_commitments.rs | 2 +- tests/tree_restore_one_commitment.rs | 2 +- tests/tree_restore_with_root_back_to_init.rs | 202 ++++++++++++++ .../tree_restore_with_root_back_to_middle.rs | 239 ++++++++++++++++ 15 files changed, 1385 insertions(+), 124 deletions(-) create mode 100644 schemas/database/013_batches_and_transactions.sql rename src/task_monitor/tasks/{process_identities.rs => create_batches.rs} (78%) create mode 100644 src/task_monitor/tasks/process_batches.rs create mode 100644 tests/tree_restore_with_root_back_to_init.rs create mode 100644 tests/tree_restore_with_root_back_to_middle.rs diff --git a/schemas/database/013_batches_and_transactions.sql b/schemas/database/013_batches_and_transactions.sql new file mode 100644 index 00000000..f57ca4ca --- /dev/null +++ b/schemas/database/013_batches_and_transactions.sql @@ -0,0 +1,24 @@ +-- Create ENUM for prover type +CREATE TABLE batches +( + id BIGSERIAL UNIQUE PRIMARY KEY, + next_root BYTEA NOT NULL UNIQUE, + prev_root BYTEA UNIQUE, + created_at TIMESTAMPTZ NOT NULL, + batch_type VARCHAR(50) NOT NULL, + data JSON NOT NULL, + + FOREIGN KEY (prev_root) REFERENCES batches (next_root) ON DELETE CASCADE +); + +CREATE INDEX idx_batches_prev_root ON batches (prev_root); +CREATE UNIQUE INDEX i_single_null_prev_root ON batches ((batches.prev_root IS NULL)) WHERE batches.prev_root IS NULL; + +CREATE TABLE transactions +( + transaction_id VARCHAR(256) NOT NULL UNIQUE PRIMARY KEY, + batch_next_root BYTEA NOT NULL UNIQUE, + created_at TIMESTAMPTZ NOT NULL, + + FOREIGN KEY (batch_next_root) REFERENCES batches (next_root) ON DELETE CASCADE +); \ No newline at end of file diff --git a/src/app.rs b/src/app.rs index 6dcec2c6..b6433e7f 100644 --- a/src/app.rs +++ b/src/app.rs @@ -16,7 +16,7 @@ use crate::database::Database; use crate::ethereum::Ethereum; use crate::identity_tree::{ CanonicalTreeBuilder, Hash, InclusionProof, ProcessedStatus, RootItem, Status, TreeState, - TreeUpdate, TreeVersionReadOps, UnprocessedStatus, + TreeUpdate, TreeVersionReadOps, TreeWithNextVersion, UnprocessedStatus, }; use crate::prover::map::initialize_prover_maps; use crate::prover::{ProverConfig, ProverType}; @@ -114,10 +114,12 @@ impl App { // Note that we don't have a way of queuing a root here for finalization. // so it's going to stay as "processed" until the next root is mined. self.database.mark_root_as_processed_tx(&root_hash).await?; + self.database.delete_batches_after_root(&root_hash).await?; } else { // Db is either empty or we're restarting with a new contract/chain // so we should mark everything as pending self.database.mark_all_as_pending().await?; + self.database.delete_all_batches().await?; } let timer = Instant::now(); @@ -263,6 +265,7 @@ impl App { let (processed, batching_builder) = processed_builder.seal_and_continue(); let (batching, mut latest_builder) = batching_builder.seal_and_continue(); + let pending_items = self .database .get_commitments_by_status(ProcessedStatus::Pending) @@ -271,6 +274,15 @@ impl App { latest_builder.update(&update); } let latest = latest_builder.seal(); + + let batch = self.database.get_latest_batch().await?; + if let Some(batch) = batch { + if batching.get_root() != batch.next_root { + batching.apply_updates_up_to(batch.next_root); + } + assert_eq!(batching.get_root(), batch.next_root); + } + Ok(Some(TreeState::new(mined, processed, batching, latest))) } @@ -357,6 +369,14 @@ impl App { let latest = latest_builder.seal(); + let batch = self.database.get_latest_batch().await?; + if let Some(batch) = batch { + if batching.get_root() != batch.next_root { + batching.apply_updates_up_to(batch.next_root); + } + assert_eq!(batching.get_root(), batch.next_root); + } + Ok(TreeState::new(mined, processed, batching, latest)) } diff --git a/src/database/mod.rs b/src/database/mod.rs index 946d4531..d1b68971 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -164,7 +164,9 @@ mod test { use super::Database; use crate::config::DatabaseConfig; use crate::database::query::DatabaseQuery; + use crate::database::types::BatchType; use crate::identity_tree::{Hash, ProcessedStatus, Status, UnprocessedStatus}; + use crate::prover::identity::Identity; use crate::prover::{ProverConfig, ProverType}; use crate::utils::secret::SecretUrl; @@ -1334,4 +1336,166 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_insert_batch() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let identities: Vec<_> = mock_identities(10) + .iter() + .map(|commitment| { + Identity::new( + (*commitment).into(), + mock_roots(10).iter().map(|root| (*root).into()).collect(), + ) + }) + .collect(); + let roots = mock_roots(2); + + db.insert_new_batch_head(&roots[0]).await?; + db.insert_new_batch(&roots[1], &roots[0], BatchType::Insertion, &identities, &[ + 0, + ]) + .await?; + + Ok(()) + } + + #[tokio::test] + async fn test_get_next_batch() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let identities: Vec<_> = mock_identities(10) + .iter() + .map(|commitment| { + Identity::new( + (*commitment).into(), + mock_roots(10).iter().map(|root| (*root).into()).collect(), + ) + }) + .collect(); + let indexes = vec![0]; + let roots = mock_roots(2); + + db.insert_new_batch_head(&roots[0]).await?; + db.insert_new_batch( + &roots[1], + &roots[0], + BatchType::Insertion, + &identities, + &indexes, + ) + .await?; + + let next_batch = db.get_next_batch(&roots[0]).await?; + + assert!(next_batch.is_some()); + + let next_batch = next_batch.unwrap(); + + assert_eq!(next_batch.prev_root.unwrap(), roots[0]); + assert_eq!(next_batch.next_root, roots[1]); + assert_eq!(next_batch.data.0.identities, identities); + assert_eq!(next_batch.data.0.indexes, indexes); + + let next_batch = db.get_next_batch(&roots[1]).await?; + + assert!(next_batch.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_get_next_batch_without_transaction() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let identities: Vec<_> = mock_identities(10) + .iter() + .map(|commitment| { + Identity::new( + (*commitment).into(), + mock_roots(10).iter().map(|root| (*root).into()).collect(), + ) + }) + .collect(); + let indexes = vec![0]; + let roots = mock_roots(2); + let transaction_id = String::from("173bcbfd-e1d9-40e2-ba10-fc1dfbf742c9"); + + db.insert_new_batch_head(&roots[0]).await?; + db.insert_new_batch( + &roots[1], + &roots[0], + BatchType::Insertion, + &identities, + &indexes, + ) + .await?; + + let next_batch = db.get_next_batch_without_transaction().await?; + + assert!(next_batch.is_some()); + + let next_batch = next_batch.unwrap(); + + assert_eq!(next_batch.prev_root.unwrap(), roots[0]); + assert_eq!(next_batch.next_root, roots[1]); + assert_eq!(next_batch.data.0.identities, identities); + assert_eq!(next_batch.data.0.indexes, indexes); + + db.insert_new_transaction(&transaction_id, &roots[1]) + .await?; + + let next_batch = db.get_next_batch_without_transaction().await?; + + assert!(next_batch.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_get_batch_head() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let roots = mock_roots(1); + + let batch_head = db.get_batch_head().await?; + + assert!(batch_head.is_none()); + + db.insert_new_batch_head(&roots[0]).await?; + + let batch_head = db.get_batch_head().await?; + + assert!(batch_head.is_some()); + let batch_head = batch_head.unwrap(); + + assert_eq!(batch_head.prev_root, None); + assert_eq!(batch_head.next_root, roots[0]); + assert!( + batch_head.data.0.identities.is_empty(), + "Should have empty identities." + ); + assert!( + batch_head.data.0.indexes.is_empty(), + "Should have empty indexes." + ); + + Ok(()) + } + + #[tokio::test] + async fn test_insert_transaction() -> anyhow::Result<()> { + let docker = Cli::default(); + let (db, _db_container) = setup_db(&docker).await?; + let roots = mock_roots(1); + let transaction_id = String::from("173bcbfd-e1d9-40e2-ba10-fc1dfbf742c9"); + + db.insert_new_batch_head(&roots[0]).await?; + + db.insert_new_transaction(&transaction_id, &roots[0]) + .await?; + + Ok(()) + } } diff --git a/src/database/query.rs b/src/database/query.rs index 80fec59c..c93da388 100644 --- a/src/database/query.rs +++ b/src/database/query.rs @@ -6,10 +6,12 @@ use sqlx::{Executor, Postgres, Row}; use tracing::instrument; use types::{DeletionEntry, LatestDeletionEntry, RecoveryEntry}; +use crate::database::types::{BatchEntry, BatchEntryData, BatchType, TransactionEntry}; use crate::database::{types, Error}; use crate::identity_tree::{ Hash, ProcessedStatus, RootItem, TreeItem, TreeUpdate, UnprocessedStatus, }; +use crate::prover::identity::Identity; use crate::prover::{ProverConfig, ProverType}; const MAX_UNPROCESSED_FETCH_COUNT: i64 = 10_000; @@ -542,4 +544,265 @@ pub trait DatabaseQuery<'a>: Executor<'a, Database = Postgres> { let row_unprocessed = self.fetch_one(query_queued_deletion).await?; Ok(row_unprocessed.get::(0)) } + + async fn insert_new_batch_head(self, next_root: &Hash) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO batches( + id, + next_root, + prev_root, + created_at, + batch_type, + data + ) VALUES (DEFAULT, $1, NULL, CURRENT_TIMESTAMP, $2, $3) + "#, + ) + .bind(next_root) + .bind(BatchType::Insertion) + .bind(sqlx::types::Json::from(BatchEntryData { + identities: vec![], + indexes: vec![], + })); + + self.execute(query).await?; + Ok(()) + } + + async fn insert_new_batch( + self, + next_root: &Hash, + prev_root: &Hash, + batch_type: BatchType, + identities: &[Identity], + indexes: &[usize], + ) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO batches( + id, + next_root, + prev_root, + created_at, + batch_type, + data + ) VALUES (DEFAULT, $1, $2, CURRENT_TIMESTAMP, $3, $4) + "#, + ) + .bind(next_root) + .bind(prev_root) + .bind(batch_type) + .bind(sqlx::types::Json::from(BatchEntryData { + identities: identities.to_vec(), + indexes: indexes.to_vec(), + })); + + self.execute(query).await?; + Ok(()) + } + + async fn get_next_batch(self, prev_root: &Hash) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + id, + next_root, + prev_root, + created_at, + batch_type, + data + FROM batches WHERE prev_root = $1 + LIMIT 1 + "#, + ) + .bind(prev_root) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn get_latest_batch(self) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + id, + next_root, + prev_root, + created_at, + batch_type, + data + FROM batches + ORDER BY id DESC + LIMIT 1 + "#, + ) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn get_latest_batch_with_transaction(self) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + batches.id, + batches.next_root, + batches.prev_root, + batches.created_at, + batches.batch_type, + batches.data + FROM batches + LEFT JOIN transactions ON batches.next_root = transactions.batch_next_root + WHERE transactions.batch_next_root IS NOT NULL AND batches.prev_root IS NOT NULL + ORDER BY batches.id DESC + LIMIT 1 + "#, + ) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn get_next_batch_without_transaction(self) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + batches.id, + batches.next_root, + batches.prev_root, + batches.created_at, + batches.batch_type, + batches.data + FROM batches + LEFT JOIN transactions ON batches.next_root = transactions.batch_next_root + WHERE transactions.batch_next_root IS NULL AND batches.prev_root IS NOT NULL + ORDER BY batches.id ASC + LIMIT 1 + "#, + ) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn get_batch_head(self) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + id, + next_root, + prev_root, + created_at, + batch_type, + data + FROM batches WHERE prev_root IS NULL + LIMIT 1 + "#, + ) + .fetch_optional(self) + .await?; + + Ok(res) + } + + async fn get_all_batches_after(self, id: i64) -> Result, Error> { + let res = sqlx::query_as::<_, BatchEntry>( + r#" + SELECT + id, + next_root, + prev_root, + created_at, + batch_type, + data + FROM batches WHERE id >= $1 ORDER BY id ASC + "#, + ) + .bind(id) + .fetch_all(self) + .await?; + + Ok(res) + } + + #[instrument(skip(self), level = "debug")] + async fn delete_batches_after_root(self, root: &Hash) -> Result<(), Error> { + let query = sqlx::query( + r#" + DELETE FROM batches + WHERE prev_root = $1 + "#, + ) + .bind(root); + + self.execute(query).await?; + Ok(()) + } + + #[instrument(skip(self), level = "debug")] + async fn delete_all_batches(self) -> Result<(), Error> { + let query = sqlx::query( + r#" + DELETE FROM batches + "#, + ); + + self.execute(query).await?; + Ok(()) + } + + async fn root_in_batch_chain(self, root: &Hash) -> Result { + let query = sqlx::query( + r#"SELECT exists(SELECT 1 FROM batches where prev_root = $1 OR next_root = $1)"#, + ) + .bind(root); + let row_unprocessed = self.fetch_one(query).await?; + Ok(row_unprocessed.get::(0)) + } + + async fn insert_new_transaction( + self, + transaction_id: &String, + batch_next_root: &Hash, + ) -> Result<(), Error> { + let query = sqlx::query( + r#" + INSERT INTO transactions( + transaction_id, + batch_next_root, + created_at + ) VALUES ($1, $2, CURRENT_TIMESTAMP) + "#, + ) + .bind(transaction_id) + .bind(batch_next_root); + + self.execute(query).await?; + Ok(()) + } + + async fn get_transaction_for_batch( + self, + next_root: &Hash, + ) -> Result, Error> { + let res = sqlx::query_as::<_, TransactionEntry>( + r#" + SELECT + transaction_id, + batch_next_root, + created_at + FROM transactions WHERE batch_next_root = $1 + LIMIT 1 + "#, + ) + .bind(next_root) + .fetch_optional(self) + .await?; + + Ok(res) + } } diff --git a/src/database/types.rs b/src/database/types.rs index 71b596b2..08239ef7 100644 --- a/src/database/types.rs +++ b/src/database/types.rs @@ -1,7 +1,13 @@ use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::database::{HasArguments, HasValueRef}; +use sqlx::encode::IsNull; +use sqlx::error::BoxDynError; use sqlx::prelude::FromRow; +use sqlx::{Decode, Encode, Postgres, Type}; use crate::identity_tree::{Hash, Status, UnprocessedStatus}; +use crate::prover::identity::Identity; pub struct UnprocessedCommitment { pub commitment: Hash, @@ -37,3 +43,137 @@ pub struct CommitmentHistoryEntry { pub held_back: bool, pub status: Status, } + +#[derive(Debug, Copy, Clone, sqlx::Type, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +#[sqlx(type_name = "VARCHAR", rename_all = "PascalCase")] +pub enum BatchType { + #[default] + Insertion, + Deletion, +} + +impl From for BatchType { + fn from(s: String) -> Self { + match s.as_str() { + "Insertion" => BatchType::Insertion, + "Deletion" => BatchType::Deletion, + _ => BatchType::Insertion, + } + } +} + +impl std::fmt::Display for BatchType { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + BatchType::Insertion => write!(f, "insertion"), + BatchType::Deletion => write!(f, "deletion"), + } + } +} + +#[derive(Debug, Clone, FromRow)] +pub struct BatchEntry { + pub id: i64, + pub next_root: Hash, + // In general prev_root is present all the time except the first row (head of the batches + // chain) + pub prev_root: Option, + pub created_at: DateTime, + pub batch_type: BatchType, + pub data: sqlx::types::Json, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct BatchEntryData { + pub identities: Vec, + pub indexes: Vec, +} + +#[derive(Debug, Clone, FromRow)] +pub struct TransactionEntry { + pub batch_next_root: Hash, + pub transaction_id: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Commitments(pub Vec); + +impl Encode<'_, Postgres> for Commitments { + fn encode_by_ref(&self, buf: &mut >::ArgumentBuffer) -> IsNull { + let commitments = &self + .0 + .iter() + .map(|c| c.to_be_bytes()) // Why be not le? + .collect::>(); + + <&Vec<[u8; 32]> as Encode>::encode(commitments, buf) + } +} + +impl Decode<'_, Postgres> for Commitments { + fn decode(value: >::ValueRef) -> Result { + let value = as Decode>::decode(value)?; + + let res = value.iter().map(|&v| Hash::from_be_bytes(v)).collect(); + + Ok(Commitments(res)) + } +} + +impl Type for Commitments { + fn type_info() -> ::TypeInfo { + <&Vec<&[u8]> as Type>::type_info() + } + + fn compatible(ty: &::TypeInfo) -> bool { + <&Vec<&[u8]> as Type>::compatible(ty) + } +} + +impl From> for Commitments { + fn from(value: Vec) -> Self { + Commitments(value) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LeafIndexes(pub Vec); + +impl Encode<'_, Postgres> for LeafIndexes { + fn encode_by_ref(&self, buf: &mut >::ArgumentBuffer) -> IsNull { + let commitments = &self + .0 + .iter() + .map(|&c| c as i64) // Why be not le? + .collect(); + + <&Vec as Encode>::encode(commitments, buf) + } +} + +impl Decode<'_, Postgres> for LeafIndexes { + fn decode(value: >::ValueRef) -> Result { + let value = as Decode>::decode(value)?; + + let res = value.iter().map(|&v| v as usize).collect(); + + Ok(LeafIndexes(res)) + } +} + +impl Type for LeafIndexes { + fn type_info() -> ::TypeInfo { + <&Vec as Type>::type_info() + } + + fn compatible(ty: &::TypeInfo) -> bool { + <&Vec as Type>::compatible(ty) + } +} + +impl From> for LeafIndexes { + fn from(value: Vec) -> Self { + LeafIndexes(value) + } +} diff --git a/src/prover/identity.rs b/src/prover/identity.rs index c0a56b3b..4c2d5f00 100644 --- a/src/prover/identity.rs +++ b/src/prover/identity.rs @@ -1,8 +1,9 @@ use ethers::types::U256; +use serde::{Deserialize, Serialize}; /// A representation of an identity insertion into the merkle tree as used for /// the prover endpoint. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Identity { /// The identity commitment value that is inserted into the merkle tree. pub commitment: U256, diff --git a/src/task_monitor.rs b/src/task_monitor.rs index aac1b1ae..7de91b35 100644 --- a/src/task_monitor.rs +++ b/src/task_monitor.rs @@ -143,13 +143,37 @@ impl TaskMonitor { handles.push(queue_monitor_handle); // Process identities + let base_next_batch_notify = Arc::new(Notify::new()); + + // Create batches + let app = self.app.clone(); + let next_batch_notify = base_next_batch_notify.clone(); + let wake_up_notify = base_wake_up_notify.clone(); + + let create_batches = move || { + tasks::create_batches::create_batches( + app.clone(), + next_batch_notify.clone(), + wake_up_notify.clone(), + ) + }; + let create_batches_handle = crate::utils::spawn_monitored_with_backoff( + create_batches, + shutdown_sender.clone(), + PROCESS_IDENTITIES_BACKOFF, + ); + handles.push(create_batches_handle); + + // Process batches let app = self.app.clone(); + let next_batch_notify = base_next_batch_notify.clone(); let wake_up_notify = base_wake_up_notify.clone(); let process_identities = move || { - tasks::process_identities::process_identities( + tasks::process_batches::process_batches( app.clone(), monitored_txs_sender.clone(), + next_batch_notify.clone(), wake_up_notify.clone(), ) }; diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/create_batches.rs similarity index 78% rename from src/task_monitor/tasks/process_identities.rs rename to src/task_monitor/tasks/create_batches.rs index 01c1aec2..4ab8638b 100644 --- a/src/task_monitor/tasks/process_identities.rs +++ b/src/task_monitor/tasks/create_batches.rs @@ -2,18 +2,19 @@ use std::sync::Arc; use anyhow::Context; use chrono::{DateTime, Utc}; -use ethers::types::U256; +use ethers::prelude::U256; use ruint::Uint; use semaphore::merkle_tree::Proof; use semaphore::poseidon_tree::{Branch, PoseidonHash}; -use tokio::sync::{mpsc, Notify}; +use tokio::sync::Notify; use tokio::{select, time}; use tracing::instrument; use crate::app::App; use crate::contracts::IdentityManager; +use crate::database; use crate::database::query::DatabaseQuery as _; -use crate::ethereum::write::TransactionId; +use crate::database::Database; use crate::identity_tree::{ AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion, }; @@ -21,21 +22,21 @@ use crate::prover::identity::Identity; use crate::prover::Prover; use crate::task_monitor::TaskMonitor; use crate::utils::batch_type::BatchType; -use crate::utils::index_packing::pack_indices; /// The number of seconds either side of the timer tick to treat as enough to /// trigger a forced batch insertion. const DEBOUNCE_THRESHOLD_SECS: i64 = 1; -pub async fn process_identities( +pub async fn create_batches( app: Arc, - monitored_txs_sender: Arc>, + next_batch_notify: Arc, wake_up_notify: Arc, ) -> anyhow::Result<()> { tracing::info!("Awaiting for a clean slate"); app.identity_manager.await_clean_slate().await?; - tracing::info!("Starting identity processor."); + tracing::info!("Starting batch creator."); + ensure_batch_chain_initialized(&app).await?; // We start a timer and force it to perform one initial tick to avoid an // immediate trigger. @@ -83,12 +84,18 @@ pub async fn process_identities( .batching_tree() .peek_next_updates(batch_size); + if updates.is_empty() { + tracing::trace!("No updates found. Waiting."); + continue; + } + // If the batch is a deletion, process immediately without resetting the timer if batch_type.is_deletion() { commit_identities( + &app.database, &app.identity_manager, app.tree_state()?.batching_tree(), - &monitored_txs_sender, + &next_batch_notify, &updates, ) .await?; @@ -107,9 +114,10 @@ pub async fn process_identities( // process the batch if updates.len() >= batch_size || batch_time_elapsed { commit_identities( + &app.database, &app.identity_manager, app.tree_state()?.batching_tree(), - &monitored_txs_sender, + &next_batch_notify, &updates, ) .await?; @@ -142,9 +150,10 @@ pub async fn process_identities( // If the next batch is deletion, process the current insertion batch if next_batch_is_deletion { commit_identities( + &app.database, &app.identity_manager, app.tree_state()?.batching_tree(), - &monitored_txs_sender, + &next_batch_notify, &updates, ) .await?; @@ -166,14 +175,25 @@ pub async fn process_identities( } } +async fn ensure_batch_chain_initialized(app: &Arc) -> anyhow::Result<()> { + let batch_head = app.database.get_batch_head().await?; + if batch_head.is_none() { + app.database + .insert_new_batch_head(&app.tree_state()?.batching_tree().get_root()) + .await?; + } + Ok(()) +} + async fn commit_identities( + database: &Database, identity_manager: &IdentityManager, batching_tree: &TreeVersion, - monitored_txs_sender: &mpsc::Sender, + next_batch_notify: &Arc, updates: &[AppliedTreeUpdate], ) -> anyhow::Result<()> { // If the update is an insertion - let tx_id = if updates + if updates .first() .context("Updates should be > 1")? .update @@ -190,7 +210,7 @@ async fn commit_identities( "Insertion batch", ); - insert_identities(identity_manager, batching_tree, updates, &prover).await? + insert_identities(database, batching_tree, next_batch_notify, updates, &prover).await } else { let prover = identity_manager .get_suitable_deletion_prover(updates.len()) @@ -202,27 +222,22 @@ async fn commit_identities( "Deletion batch" ); - delete_identities(identity_manager, batching_tree, updates, &prover).await? - }; - - if let Some(tx_id) = tx_id { - monitored_txs_sender.send(tx_id).await?; + delete_identities(database, batching_tree, next_batch_notify, updates, &prover).await } - - Ok(()) } #[instrument(level = "info", skip_all)] pub async fn insert_identities( - identity_manager: &IdentityManager, + database: &Database, batching_tree: &TreeVersion, + next_batch_notify: &Arc, updates: &[AppliedTreeUpdate], prover: &Prover, -) -> anyhow::Result> { +) -> anyhow::Result<()> { assert_updates_are_consecutive(updates); - let start_index = updates[0].update.leaf_index; - let pre_root: U256 = batching_tree.get_root().into(); + let pre_root = batching_tree.get_root(); + let mut insertion_indices: Vec<_> = updates.iter().map(|f| f.update.leaf_index).collect(); let mut commitments: Vec = updates .iter() .map(|update| update.update.element.into()) @@ -271,6 +286,7 @@ pub async fn insert_identities( for i in start_index..(start_index + padding) { let proof = latest_tree_from_updates.proof(i); merkle_proofs.push(proof); + insertion_indices.push(i); } } @@ -287,60 +303,42 @@ pub async fn insert_identities( // With the updates applied we can grab the value of the tree's new root and // build our identities for sending to the identity manager. - let post_root: U256 = latest_tree_from_updates.root().into(); + let post_root = latest_tree_from_updates.root(); let identity_commitments = zip_commitments_and_proofs(commitments, merkle_proofs); - - identity_manager.validate_merkle_proofs(&identity_commitments)?; - - // We prepare the proof before reserving a slot in the pending identities - let proof = IdentityManager::prepare_insertion_proof( - prover, - start_index, - pre_root, - &identity_commitments, - post_root, - ) - .await?; + let start_index = *insertion_indices.first().unwrap(); tracing::info!( start_index, ?pre_root, ?post_root, - "Submitting insertion batch" + "Submitting insertion batch to DB" ); - // With all the data prepared we can submit the identities to the on-chain - // identity manager and wait for that transaction to be mined. - let transaction_id = identity_manager - .register_identities( - start_index, - pre_root, - post_root, - identity_commitments, - proof, + // With all the data prepared we can submit the batch to database. + database + .insert_new_batch( + &post_root, + &pre_root, + database::types::BatchType::Insertion, + &identity_commitments, + &insertion_indices, ) - .await - .map_err(|e| { - tracing::error!(?e, "Failed to insert identity to contract."); - e - })?; + .await?; tracing::info!( start_index, ?pre_root, ?post_root, - ?transaction_id, - "Insertion batch submitted" + "Insertion batch submitted to DB" ); - // Update the batching tree only after submitting the identities to the chain - batching_tree.apply_updates_up_to(post_root.into()); - - tracing::info!(start_index, ?pre_root, ?post_root, "Tree updated"); + next_batch_notify.notify_one(); TaskMonitor::log_batch_size(updates.len()); - Ok(Some(transaction_id)) + batching_tree.apply_updates_up_to(post_root); + + Ok(()) } fn assert_updates_are_consecutive(updates: &[AppliedTreeUpdate]) { @@ -368,21 +366,18 @@ fn assert_updates_are_consecutive(updates: &[AppliedTreeUpdate]) { } pub async fn delete_identities( - identity_manager: &IdentityManager, + database: &Database, batching_tree: &TreeVersion, + next_batch_notify: &Arc, updates: &[AppliedTreeUpdate], prover: &Prover, -) -> anyhow::Result> { +) -> anyhow::Result<()> { // Grab the initial conditions before the updates are applied to the tree. - let pre_root: U256 = batching_tree.get_root().into(); + let pre_root = batching_tree.get_root(); - let mut deletion_indices = updates - .iter() - .map(|f| f.update.leaf_index as u32) - .collect::>(); + let mut deletion_indices: Vec<_> = updates.iter().map(|f| f.update.leaf_index).collect(); - let commitments = - batching_tree.commitments_by_indices(deletion_indices.iter().map(|x| *x as usize)); + let commitments = batching_tree.commitments_by_indices(deletion_indices.iter().copied()); let mut commitments: Vec = commitments.into_iter().map(U256::from).collect(); let latest_tree_from_updates = updates @@ -419,7 +414,7 @@ pub async fn delete_identities( // ensure that our batches match that size. We do this by padding deletion // indices with tree.depth() ^ 2. The deletion prover will skip the proof for // any deletion with an index greater than the max tree depth - let pad_index = 2_u32.pow(latest_tree_from_updates.depth() as u32); + let pad_index = 2_u32.pow(latest_tree_from_updates.depth() as u32) as usize; if commitment_count != batch_size { let padding = batch_size - commitment_count; @@ -442,50 +437,46 @@ pub async fn delete_identities( // With the updates applied we can grab the value of the tree's new root and // build our identities for sending to the identity manager. - let post_root: U256 = latest_tree_from_updates.root().into(); + let post_root = latest_tree_from_updates.root(); let identity_commitments = zip_commitments_and_proofs(commitments, merkle_proofs); - identity_manager.validate_merkle_proofs(&identity_commitments)?; + tracing::info!(?pre_root, ?post_root, "Submitting deletion batch to DB"); - // We prepare the proof before reserving a slot in the pending identities - let proof = IdentityManager::prepare_deletion_proof( - prover, - pre_root, - deletion_indices.clone(), - identity_commitments, - post_root, - ) - .await?; + // With all the data prepared we can submit the batch to database. + database + .insert_new_batch( + &post_root, + &pre_root, + database::types::BatchType::Deletion, + &identity_commitments, + &deletion_indices, + ) + .await?; - let packed_deletion_indices = pack_indices(&deletion_indices); + tracing::info!(?pre_root, ?post_root, "Deletion batch submitted to DB"); - tracing::info!(?pre_root, ?post_root, "Submitting deletion batch"); + next_batch_notify.notify_one(); - // With all the data prepared we can submit the identities to the on-chain - // identity manager and wait for that transaction to be mined. - let transaction_id = identity_manager - .delete_identities(proof, packed_deletion_indices, pre_root, post_root) - .await - .map_err(|e| { - tracing::error!(?e, "Failed to insert identity to contract."); - e - })?; + TaskMonitor::log_batch_size(updates.len()); - tracing::info!( - ?pre_root, - ?post_root, - ?transaction_id, - "Deletion batch submitted" - ); + batching_tree.apply_updates_up_to(post_root); - // Update the batching tree only after submitting the identities to the chain - batching_tree.apply_updates_up_to(post_root.into()); + Ok(()) +} - tracing::info!(?pre_root, ?post_root, "Tree updated"); +fn determine_batch_type(tree: &TreeVersion) -> Option { + let next_update = tree.peek_next_updates(1); + if next_update.is_empty() { + return None; + } - TaskMonitor::log_batch_size(updates.len()); + let batch_type = if next_update[0].update.element == Hash::ZERO { + BatchType::Deletion + } else { + BatchType::Insertion + }; - Ok(Some(transaction_id)) + Some(batch_type) } fn zip_commitments_and_proofs( @@ -508,18 +499,3 @@ fn zip_commitments_and_proofs( }) .collect() } - -fn determine_batch_type(tree: &TreeVersion) -> Option { - let next_update = tree.peek_next_updates(1); - if next_update.is_empty() { - return None; - } - - let batch_type = if next_update[0].update.element == Hash::ZERO { - BatchType::Deletion - } else { - BatchType::Insertion - }; - - Some(batch_type) -} diff --git a/src/task_monitor/tasks/mod.rs b/src/task_monitor/tasks/mod.rs index ec1861b3..fbbee77e 100644 --- a/src/task_monitor/tasks/mod.rs +++ b/src/task_monitor/tasks/mod.rs @@ -1,6 +1,7 @@ +pub mod create_batches; pub mod delete_identities; pub mod finalize_identities; pub mod insert_identities; pub mod monitor_queue; pub mod monitor_txs; -pub mod process_identities; +pub mod process_batches; diff --git a/src/task_monitor/tasks/process_batches.rs b/src/task_monitor/tasks/process_batches.rs new file mode 100644 index 00000000..06ed0036 --- /dev/null +++ b/src/task_monitor/tasks/process_batches.rs @@ -0,0 +1,206 @@ +use std::sync::Arc; +use std::time::Duration; + +use ethers::types::U256; +use tokio::sync::{mpsc, Notify}; +use tokio::{select, time}; +use tracing::instrument; + +use crate::app::App; +use crate::contracts::IdentityManager; +use crate::database::query::DatabaseQuery as _; +use crate::database::types::{BatchEntry, BatchType}; +use crate::ethereum::write::TransactionId; +use crate::prover::Prover; +use crate::utils::index_packing::pack_indices; + +pub async fn process_batches( + app: Arc, + monitored_txs_sender: Arc>, + next_batch_notify: Arc, + wake_up_notify: Arc, +) -> anyhow::Result<()> { + tracing::info!("Awaiting for a clean slate"); + app.identity_manager.await_clean_slate().await?; + + // This is a tricky way to know that we are not changing data during tree + // initialization process. + _ = app.tree_state()?; + tracing::info!("Starting identity processor."); + + // We start a timer and force it to perform one initial tick to avoid an + // immediate trigger. + let mut timer = time::interval(Duration::from_secs(5)); + + loop { + // We wait either for a timer tick or a full batch + select! { + _ = timer.tick() => { + tracing::info!("Identity processor woken due to timeout"); + } + + () = next_batch_notify.notified() => { + tracing::trace!("Identity processor woken due to next batch creation"); + }, + + () = wake_up_notify.notified() => { + tracing::trace!("Identity processor woken due to request"); + }, + } + + let next_batch = app.database.get_next_batch_without_transaction().await?; + let Some(next_batch) = next_batch else { + continue; + }; + + let tx_id = + commit_identities(&app.identity_manager, &monitored_txs_sender, &next_batch).await?; + + if let Some(tx_id) = tx_id { + app.database + .insert_new_transaction(&tx_id.0, &next_batch.next_root) + .await?; + } + + // We want to check if there's a full batch available immediately + wake_up_notify.notify_one(); + } +} + +async fn commit_identities( + identity_manager: &IdentityManager, + monitored_txs_sender: &mpsc::Sender, + batch: &BatchEntry, +) -> anyhow::Result> { + // If the update is an insertion + let tx_id = if batch.batch_type == BatchType::Insertion { + let prover = identity_manager + .get_suitable_insertion_prover(batch.data.0.identities.len()) + .await?; + + tracing::info!( + num_updates = batch.data.0.identities.len(), + batch_size = prover.batch_size(), + "Insertion batch", + ); + + insert_identities(identity_manager, &prover, batch).await? + } else { + let prover = identity_manager + .get_suitable_deletion_prover(batch.data.0.identities.len()) + .await?; + + tracing::info!( + num_updates = batch.data.0.identities.len(), + batch_size = prover.batch_size(), + "Deletion batch" + ); + + delete_identities(identity_manager, &prover, batch).await? + }; + + if let Some(tx_id) = tx_id.clone() { + monitored_txs_sender.send(tx_id).await?; + } + + Ok(tx_id) +} + +#[instrument(level = "info", skip_all)] +pub async fn insert_identities( + identity_manager: &IdentityManager, + prover: &Prover, + batch: &BatchEntry, +) -> anyhow::Result> { + identity_manager.validate_merkle_proofs(&batch.data.0.identities)?; + let start_index = *batch.data.0.indexes.first().expect("Should exist."); + let pre_root: U256 = batch.prev_root.expect("Should exist.").into(); + let post_root: U256 = batch.next_root.into(); + + // We prepare the proof before reserving a slot in the pending identities + let proof = IdentityManager::prepare_insertion_proof( + prover, + start_index, + pre_root, + &batch.data.0.identities, + post_root, + ) + .await?; + + tracing::info!( + start_index, + ?pre_root, + ?post_root, + "Submitting insertion batch" + ); + + // With all the data prepared we can submit the identities to the on-chain + // identity manager and wait for that transaction to be mined. + let transaction_id = identity_manager + .register_identities( + start_index, + pre_root, + post_root, + batch.data.0.identities.clone(), + proof, + ) + .await + .map_err(|e| { + tracing::error!(?e, "Failed to insert identity to contract."); + e + })?; + + tracing::info!( + start_index, + ?pre_root, + ?post_root, + ?transaction_id, + "Insertion batch submitted" + ); + + Ok(Some(transaction_id)) +} + +pub async fn delete_identities( + identity_manager: &IdentityManager, + prover: &Prover, + batch: &BatchEntry, +) -> anyhow::Result> { + identity_manager.validate_merkle_proofs(&batch.data.0.identities)?; + let pre_root: U256 = batch.prev_root.expect("Should exist.").into(); + let post_root: U256 = batch.next_root.into(); + let deletion_indices: Vec<_> = batch.data.0.indexes.iter().map(|&v| v as u32).collect(); + + // We prepare the proof before reserving a slot in the pending identities + let proof = IdentityManager::prepare_deletion_proof( + prover, + pre_root, + deletion_indices.clone(), + batch.data.0.identities.clone(), + post_root, + ) + .await?; + + let packed_deletion_indices = pack_indices(&deletion_indices); + + tracing::info!(?pre_root, ?post_root, "Submitting deletion batch"); + + // With all the data prepared we can submit the identities to the on-chain + // identity manager and wait for that transaction to be mined. + let transaction_id = identity_manager + .delete_identities(proof, packed_deletion_indices, pre_root, post_root) + .await + .map_err(|e| { + tracing::error!(?e, "Failed to insert identity to contract."); + e + })?; + + tracing::info!( + ?pre_root, + ?post_root, + ?transaction_id, + "Deletion batch submitted" + ); + + Ok(Some(transaction_id)) +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 65a9f15c..43df3e2f 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -64,6 +64,7 @@ pub mod prelude { spawn_mock_insertion_prover, test_inclusion_proof, test_insert_identity, test_verify_proof, test_verify_proof_on_chain, }; + pub use crate::common::chain_mock::spawn_mock_chain; pub use crate::common::test_same_tree_states; } @@ -747,7 +748,7 @@ pub async fn spawn_deps<'a, 'b, 'c>( )) } -async fn spawn_db(docker: &Cli) -> anyhow::Result> { +async fn spawn_db(docker: &Cli) -> anyhow::Result { let db_container = postgres_docker_utils::setup(docker).await.unwrap(); Ok(db_container) diff --git a/tests/tree_restore_multiple_commitments.rs b/tests/tree_restore_multiple_commitments.rs index 8ab70adb..e1f3cfe2 100644 --- a/tests/tree_restore_multiple_commitments.rs +++ b/tests/tree_restore_multiple_commitments.rs @@ -5,7 +5,7 @@ use common::prelude::*; const IDLE_TIME: u64 = 7; #[tokio::test] -async fn tree_restore_multiple_comittments() -> anyhow::Result<()> { +async fn tree_restore_multiple_commitments() -> anyhow::Result<()> { // Initialize logging for the test. init_tracing_subscriber(); info!("Starting integration test"); diff --git a/tests/tree_restore_one_commitment.rs b/tests/tree_restore_one_commitment.rs index 37b25f30..7e38b106 100644 --- a/tests/tree_restore_one_commitment.rs +++ b/tests/tree_restore_one_commitment.rs @@ -5,7 +5,7 @@ use common::prelude::*; const IDLE_TIME: u64 = 7; #[tokio::test] -async fn tree_restore_one_comittment() -> anyhow::Result<()> { +async fn tree_restore_one_commitment() -> anyhow::Result<()> { // Initialize logging for the test. init_tracing_subscriber(); info!("Starting integration test"); diff --git a/tests/tree_restore_with_root_back_to_init.rs b/tests/tree_restore_with_root_back_to_init.rs new file mode 100644 index 00000000..1f77b89d --- /dev/null +++ b/tests/tree_restore_with_root_back_to_init.rs @@ -0,0 +1,202 @@ +mod common; + +use common::prelude::*; + +const IDLE_TIME: u64 = 7; + +#[tokio::test] +async fn tree_restore_with_root_back_to_init() -> anyhow::Result<()> { + // Initialize logging for the test. + init_tracing_subscriber(); + info!("Starting integration test"); + + let batch_size: usize = 3; + + let mut ref_tree = PoseidonTree::new(DEFAULT_TREE_DEPTH + 1, ruint::Uint::ZERO); + let initial_root: U256 = ref_tree.root().into(); + + let docker = Cli::default(); + let (mock_chain, db_container, insertion_prover_map, _, micro_oz) = spawn_deps( + initial_root, + &[batch_size], + &[], + DEFAULT_TREE_DEPTH as u8, + &docker, + ) + .await?; + + let prover_mock = &insertion_prover_map[&batch_size]; + + let db_socket_addr = db_container.address(); + let db_url = format!("postgres://postgres:postgres@{db_socket_addr}/database"); + + // temp dir will be deleted on drop call + let temp_dir = tempfile::tempdir()?; + info!( + "temp dir created at: {:?}", + temp_dir.path().join("testfile") + ); + + let config = TestConfigBuilder::new() + .db_url(&db_url) + .oz_api_url(µ_oz.endpoint()) + .oz_address(micro_oz.address()) + .identity_manager_address(mock_chain.identity_manager.address()) + .primary_network_provider(mock_chain.anvil.endpoint()) + .cache_file(temp_dir.path().join("testfile").to_str().unwrap()) + .add_prover(prover_mock) + .build()?; + + let (app, app_handle, local_addr) = spawn_app(config.clone()) + .await + .expect("Failed to spawn app."); + + let test_identities = generate_test_identities(3); + let identities_ref: Vec = test_identities + .iter() + .map(|i| Hash::from_str_radix(i, 16).unwrap()) + .collect(); + + let uri = "http://".to_owned() + &local_addr.to_string(); + let client = Client::new(); + + test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 0).await; + test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 1).await; + test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 2).await; + + tokio::time::sleep(Duration::from_secs(IDLE_TIME)).await; + + // Check that we can also get these inclusion proofs back. + test_inclusion_proof( + &uri, + &client, + 0, + &ref_tree, + &Hash::from_str_radix(&test_identities[0], 16) + .expect("Failed to parse Hash from test leaf 0"), + false, + ) + .await; + test_inclusion_proof( + &uri, + &client, + 1, + &ref_tree, + &Hash::from_str_radix(&test_identities[1], 16) + .expect("Failed to parse Hash from test leaf 1"), + false, + ) + .await; + test_inclusion_proof( + &uri, + &client, + 2, + &ref_tree, + &Hash::from_str_radix(&test_identities[2], 16) + .expect("Failed to parse Hash from test leaf 2"), + false, + ) + .await; + + let tree_state = app.tree_state()?.clone(); + + assert_eq!(tree_state.latest_tree().next_leaf(), 3); + + // Shutdown the app and reset the mock shutdown, allowing us to test the + // behaviour with saved data. + info!("Stopping the app for testing purposes"); + shutdown(); + app_handle.await.unwrap(); + reset_shutdown(); + + drop(mock_chain); + drop(micro_oz); + + tokio::time::sleep(Duration::from_secs(2)).await; + + let mock_chain = + spawn_mock_chain(initial_root, &[batch_size], &[], DEFAULT_TREE_DEPTH as u8).await?; + let micro_oz = + micro_oz::spawn(mock_chain.anvil.endpoint(), mock_chain.private_key.clone()).await?; + + let config = TestConfigBuilder::new() + .db_url(&db_url) + .oz_api_url(µ_oz.endpoint()) + .oz_address(micro_oz.address()) + .identity_manager_address(mock_chain.identity_manager.address()) + .primary_network_provider(mock_chain.anvil.endpoint()) + .cache_file(temp_dir.path().join("testfile").to_str().unwrap()) + .add_prover(prover_mock) + .build()?; + + let (app, app_handle, local_addr) = spawn_app(config.clone()) + .await + .expect("Failed to spawn app."); + + let uri = "http://".to_owned() + &local_addr.to_string(); + + let restored_tree_state = app.tree_state()?.clone(); + + assert_eq!( + restored_tree_state.latest_tree().get_root(), + tree_state.latest_tree().get_root() + ); + assert_eq!( + restored_tree_state.batching_tree().get_root(), + initial_root.into() + ); + assert_eq!( + restored_tree_state.mined_tree().get_root(), + initial_root.into() + ); + assert_eq!( + restored_tree_state.processed_tree().get_root(), + initial_root.into() + ); + + tokio::time::sleep(Duration::from_secs(IDLE_TIME)).await; + + // Check that we can also get these inclusion proofs back. + test_inclusion_proof( + &uri, + &client, + 0, + &ref_tree, + &Hash::from_str_radix(&test_identities[0], 16) + .expect("Failed to parse Hash from test leaf 0"), + false, + ) + .await; + test_inclusion_proof( + &uri, + &client, + 1, + &ref_tree, + &Hash::from_str_radix(&test_identities[1], 16) + .expect("Failed to parse Hash from test leaf 1"), + false, + ) + .await; + test_inclusion_proof( + &uri, + &client, + 2, + &ref_tree, + &Hash::from_str_radix(&test_identities[2], 16) + .expect("Failed to parse Hash from test leaf 2"), + false, + ) + .await; + + test_same_tree_states(&tree_state, &restored_tree_state).await?; + + // Shutdown the app properly for the final time + shutdown(); + app_handle.await.unwrap(); + for (_, prover) in insertion_prover_map.into_iter() { + prover.stop(); + } + reset_shutdown(); + + Ok(()) +} diff --git a/tests/tree_restore_with_root_back_to_middle.rs b/tests/tree_restore_with_root_back_to_middle.rs new file mode 100644 index 00000000..74b4489d --- /dev/null +++ b/tests/tree_restore_with_root_back_to_middle.rs @@ -0,0 +1,239 @@ +mod common; + +use common::prelude::*; + +const IDLE_TIME: u64 = 7; + +#[tokio::test] +async fn tree_restore_with_root_back_to_middle() -> anyhow::Result<()> { + // Initialize logging for the test. + init_tracing_subscriber(); + info!("Starting integration test"); + + let batch_size: usize = 3; + + let mut ref_tree = PoseidonTree::new(DEFAULT_TREE_DEPTH + 1, ruint::Uint::ZERO); + let initial_root: U256 = ref_tree.root().into(); + + let docker = Cli::default(); + let (mock_chain, db_container, insertion_prover_map, _, micro_oz) = spawn_deps( + initial_root, + &[batch_size], + &[], + DEFAULT_TREE_DEPTH as u8, + &docker, + ) + .await?; + + let prover_mock = &insertion_prover_map[&batch_size]; + + let db_socket_addr = db_container.address(); + let db_url = format!("postgres://postgres:postgres@{db_socket_addr}/database"); + + // temp dir will be deleted on drop call + let temp_dir = tempfile::tempdir()?; + info!( + "temp dir created at: {:?}", + temp_dir.path().join("testfile") + ); + + let config = TestConfigBuilder::new() + .db_url(&db_url) + .oz_api_url(µ_oz.endpoint()) + .oz_address(micro_oz.address()) + .identity_manager_address(mock_chain.identity_manager.address()) + .primary_network_provider(mock_chain.anvil.endpoint()) + .cache_file(temp_dir.path().join("testfile").to_str().unwrap()) + .add_prover(prover_mock) + .build()?; + + let (app, app_handle, local_addr) = spawn_app(config.clone()) + .await + .expect("Failed to spawn app."); + + let test_identities = generate_test_identities(6); + let identities_ref: Vec = test_identities + .iter() + .map(|i| Hash::from_str_radix(i, 16).unwrap()) + .collect(); + + let uri = "http://".to_owned() + &local_addr.to_string(); + let client = Client::new(); + + test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 0).await; + test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 1).await; + test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 2).await; + + tokio::time::sleep(Duration::from_secs(IDLE_TIME)).await; + + // Check that we can also get these inclusion proofs back. + test_inclusion_proof( + &uri, + &client, + 0, + &ref_tree, + &Hash::from_str_radix(&test_identities[0], 16) + .expect("Failed to parse Hash from test leaf 0"), + false, + ) + .await; + test_inclusion_proof( + &uri, + &client, + 1, + &ref_tree, + &Hash::from_str_radix(&test_identities[1], 16) + .expect("Failed to parse Hash from test leaf 1"), + false, + ) + .await; + test_inclusion_proof( + &uri, + &client, + 2, + &ref_tree, + &Hash::from_str_radix(&test_identities[2], 16) + .expect("Failed to parse Hash from test leaf 2"), + false, + ) + .await; + + let mid_root: U256 = ref_tree.root().into(); + + test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 3).await; + test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 4).await; + test_insert_identity(&uri, &client, &mut ref_tree, &identities_ref, 5).await; + + tokio::time::sleep(Duration::from_secs(IDLE_TIME)).await; + + // Check that we can also get these inclusion proofs back. + test_inclusion_proof( + &uri, + &client, + 3, + &ref_tree, + &Hash::from_str_radix(&test_identities[3], 16) + .expect("Failed to parse Hash from test leaf 3"), + false, + ) + .await; + test_inclusion_proof( + &uri, + &client, + 4, + &ref_tree, + &Hash::from_str_radix(&test_identities[4], 16) + .expect("Failed to parse Hash from test leaf 4"), + false, + ) + .await; + test_inclusion_proof( + &uri, + &client, + 5, + &ref_tree, + &Hash::from_str_radix(&test_identities[5], 16) + .expect("Failed to parse Hash from test leaf 5"), + false, + ) + .await; + + let tree_state = app.tree_state()?.clone(); + + assert_eq!(tree_state.latest_tree().next_leaf(), 6); + + // Shutdown the app and reset the mock shutdown, allowing us to test the + // behaviour with saved data. + info!("Stopping the app for testing purposes"); + shutdown(); + app_handle.await.unwrap(); + reset_shutdown(); + + drop(mock_chain); + drop(micro_oz); + + tokio::time::sleep(Duration::from_secs(2)).await; + + let mock_chain = + spawn_mock_chain(mid_root, &[batch_size], &[], DEFAULT_TREE_DEPTH as u8).await?; + let micro_oz = + micro_oz::spawn(mock_chain.anvil.endpoint(), mock_chain.private_key.clone()).await?; + + let config = TestConfigBuilder::new() + .db_url(&db_url) + .oz_api_url(µ_oz.endpoint()) + .oz_address(micro_oz.address()) + .identity_manager_address(mock_chain.identity_manager.address()) + .primary_network_provider(mock_chain.anvil.endpoint()) + .cache_file(temp_dir.path().join("testfile").to_str().unwrap()) + .add_prover(prover_mock) + .build()?; + + let (app, app_handle, local_addr) = spawn_app(config.clone()) + .await + .expect("Failed to spawn app."); + + let uri = "http://".to_owned() + &local_addr.to_string(); + + let restored_tree_state = app.tree_state()?.clone(); + + assert_eq!( + restored_tree_state.latest_tree().get_root(), + tree_state.latest_tree().get_root() + ); + assert_eq!( + restored_tree_state.batching_tree().get_root(), + mid_root.into() + ); + assert_eq!(restored_tree_state.mined_tree().get_root(), mid_root.into()); + assert_eq!( + restored_tree_state.processed_tree().get_root(), + mid_root.into() + ); + + tokio::time::sleep(Duration::from_secs(IDLE_TIME)).await; + + // Check that we can also get these inclusion proofs back. + test_inclusion_proof( + &uri, + &client, + 3, + &ref_tree, + &Hash::from_str_radix(&test_identities[3], 16) + .expect("Failed to parse Hash from test leaf 3"), + false, + ) + .await; + test_inclusion_proof( + &uri, + &client, + 4, + &ref_tree, + &Hash::from_str_radix(&test_identities[4], 16) + .expect("Failed to parse Hash from test leaf 4"), + false, + ) + .await; + test_inclusion_proof( + &uri, + &client, + 5, + &ref_tree, + &Hash::from_str_radix(&test_identities[5], 16) + .expect("Failed to parse Hash from test leaf 5"), + false, + ) + .await; + + test_same_tree_states(&tree_state, &restored_tree_state).await?; + + // Shutdown the app properly for the final time + shutdown(); + app_handle.await.unwrap(); + for (_, prover) in insertion_prover_map.into_iter() { + prover.stop(); + } + reset_shutdown(); + + Ok(()) +}