diff --git a/src/contracts/mod.rs b/src/contracts/mod.rs index aa8cbf2f..3df8819a 100644 --- a/src/contracts/mod.rs +++ b/src/contracts/mod.rs @@ -323,7 +323,7 @@ impl IdentityManager { } #[instrument(level = "debug", skip(self))] - pub async fn mine_identities(&self, transaction_id: TransactionId) -> anyhow::Result { + pub async fn mine_transaction(&self, transaction_id: TransactionId) -> anyhow::Result { let result = self.ethereum.mine_transaction(transaction_id).await?; Ok(result) @@ -345,7 +345,7 @@ impl IdentityManager { for pending_identity_tx in pending_identities { // Ignores the result of each transaction - we only care about a clean slate in // terms of pending transactions - drop(self.mine_identities(pending_identity_tx).await); + drop(self.mine_transaction(pending_identity_tx).await); } Ok(()) diff --git a/src/task_monitor.rs b/src/task_monitor.rs index cfcc9a10..82d3d464 100644 --- a/src/task_monitor.rs +++ b/src/task_monitor.rs @@ -5,13 +5,14 @@ use anyhow::Result as AnyhowResult; use clap::Parser; use once_cell::sync::Lazy; use prometheus::{linear_buckets, register_gauge, register_histogram, Gauge, Histogram}; -use tokio::sync::{broadcast, Notify, RwLock}; +use tokio::sync::{broadcast, mpsc, Notify, RwLock}; use tokio::task::JoinHandle; use tracing::{info, instrument, warn}; use self::tasks::delete_identities::DeleteIdentities; use self::tasks::finalize_identities::FinalizeRoots; use self::tasks::insert_identities::InsertIdentities; +use self::tasks::monitor_txs::MonitorTxs; use self::tasks::process_identities::ProcessIdentities; use crate::contracts::SharedIdentityManager; use crate::database::Database; @@ -98,12 +99,6 @@ pub struct Options { #[clap(long, env, default_value = "0")] pub max_epoch_duration_seconds: u64, - /// How many identities can be held in the API insertion queue at any given - /// time Past this limit the API request will block until the queue has - /// space for the insertion. - #[clap(long, env, default_value = "100")] - pub insert_identities_capacity: usize, - /// The maximum number of windows to scan for finalization logs #[clap(long, env, default_value = "100")] pub scanning_window_size: u64, @@ -111,6 +106,10 @@ pub struct Options { /// The number of seconds to wait between fetching logs #[clap(long, env, default_value = "30")] pub time_between_scans_seconds: u64, + + /// The number of txs in the channel that we'll be monitoring + #[clap(long, env, default_value = "100")] + pub monitored_txs_capacity: usize, } /// A worker that commits identities to the blockchain. @@ -138,6 +137,7 @@ pub struct TaskMonitor { batch_deletion_timeout_seconds: i64, // TODO: docs min_batch_deletion_size: usize, + monitored_txs_capacity: usize, } impl TaskMonitor { @@ -151,10 +151,10 @@ impl TaskMonitor { batch_timeout_seconds, scanning_window_size, time_between_scans_seconds, - batch_deletion_timeout_seconds: _, - min_batch_deletion_size: _, - insert_identities_capacity: _, max_epoch_duration_seconds, + monitored_txs_capacity, + batch_deletion_timeout_seconds, + min_batch_deletion_size, } = *options; Self { @@ -165,9 +165,10 @@ impl TaskMonitor { batch_insert_timeout_secs: batch_timeout_seconds, scanning_window_size, time_between_scans: Duration::from_secs(time_between_scans_seconds), - batch_deletion_timeout_seconds: options.batch_deletion_timeout_seconds, - min_batch_deletion_size: options.min_batch_deletion_size, + batch_deletion_timeout_seconds, + min_batch_deletion_size, max_epoch_duration: Duration::from_secs(max_epoch_duration_seconds), + monitored_txs_capacity, } } @@ -182,6 +183,9 @@ impl TaskMonitor { // but for symmetry's sake we create it for every task with `.subscribe()` let (shutdown_sender, _) = broadcast::channel(1); + let (monitored_txs_sender, monitored_txs_receiver) = + mpsc::channel(self.monitored_txs_capacity); + let wake_up_notify = Arc::new(Notify::new()); // Immediately notify so we can start processing if we have pending identities // in the database @@ -214,6 +218,7 @@ impl TaskMonitor { self.identity_manager.clone(), self.tree_state.get_batching_tree(), self.batch_insert_timeout_secs, + monitored_txs_sender, wake_up_notify.clone(), ); @@ -225,6 +230,16 @@ impl TaskMonitor { handles.push(process_identities_handle); + let monitor_txs = MonitorTxs::new(self.identity_manager.clone(), monitored_txs_receiver); + + let monitor_txs_handle = crate::utils::spawn_monitored_with_backoff( + move || monitor_txs.clone().run(), + shutdown_sender.clone(), + PROCESS_IDENTITIES_BACKOFF, + ); + + handles.push(monitor_txs_handle); + // Insert identities task let insert_identities = InsertIdentities::new( self.database.clone(), diff --git a/src/task_monitor/tasks/mod.rs b/src/task_monitor/tasks/mod.rs index b3f3569b..c4ffb8f0 100644 --- a/src/task_monitor/tasks/mod.rs +++ b/src/task_monitor/tasks/mod.rs @@ -1,4 +1,5 @@ pub mod delete_identities; pub mod finalize_identities; pub mod insert_identities; +pub mod monitor_txs; pub mod process_identities; diff --git a/src/task_monitor/tasks/monitor_txs.rs b/src/task_monitor/tasks/monitor_txs.rs new file mode 100644 index 00000000..f8c86d46 --- /dev/null +++ b/src/task_monitor/tasks/monitor_txs.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; + +use anyhow::Result as AnyhowResult; +use tokio::sync::{mpsc, Mutex}; + +use crate::contracts::{IdentityManager, SharedIdentityManager}; +use crate::ethereum::write::TransactionId; + +pub struct MonitorTxs { + identity_manager: SharedIdentityManager, + monitored_txs_receiver: Arc>>, +} + +impl MonitorTxs { + pub fn new( + identity_manager: SharedIdentityManager, + monitored_txs_receiver: mpsc::Receiver, + ) -> Arc { + Arc::new(Self { + identity_manager, + monitored_txs_receiver: Arc::new(Mutex::new(monitored_txs_receiver)), + }) + } + + pub async fn run(self: Arc) -> anyhow::Result<()> { + monitor_txs_loop(&self.identity_manager, &self.monitored_txs_receiver).await?; + + Ok(()) + } +} + +async fn monitor_txs_loop( + identity_manager: &IdentityManager, + monitored_txs_receiver: &Mutex>, +) -> AnyhowResult<()> { + let mut monitored_txs_receiver = monitored_txs_receiver.lock().await; + + while let Some(tx) = monitored_txs_receiver.recv().await { + if !identity_manager.mine_transaction(tx.clone()).await? { + panic!("Failed to mine transaction: {}", tx); + } + } + + Ok(()) +} diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs index 17420f20..4e086bb9 100644 --- a/src/task_monitor/tasks/process_identities.rs +++ b/src/task_monitor/tasks/process_identities.rs @@ -7,12 +7,13 @@ use ethers::types::U256; use ruint::Uint; use semaphore::merkle_tree::Proof; use semaphore::poseidon_tree::Branch; -use tokio::sync::Notify; +use tokio::sync::{mpsc, Notify}; use tokio::{select, time}; use tracing::{debug, error, info, instrument, warn}; use crate::contracts::{IdentityManager, SharedIdentityManager}; use crate::database::Database; +use crate::ethereum::write::TransactionId; use crate::identity_tree::{ AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion, }; @@ -30,6 +31,7 @@ pub struct ProcessIdentities { identity_manager: SharedIdentityManager, batching_tree: TreeVersion, batch_insert_timeout_secs: u64, + monitored_txs_sender: mpsc::Sender, wake_up_notify: Arc, } @@ -39,6 +41,7 @@ impl ProcessIdentities { identity_manager: SharedIdentityManager, batching_tree: TreeVersion, batch_insert_timeout_secs: u64, + monitored_txs_sender: mpsc::Sender, wake_up_notify: Arc, ) -> Arc { Arc::new(Self { @@ -46,6 +49,7 @@ impl ProcessIdentities { identity_manager, batching_tree, batch_insert_timeout_secs, + monitored_txs_sender, wake_up_notify, }) } @@ -55,6 +59,7 @@ impl ProcessIdentities { &self.database, &self.identity_manager, &self.batching_tree, + &self.monitored_txs_sender, &self.wake_up_notify, self.batch_insert_timeout_secs, ) @@ -66,6 +71,7 @@ async fn process_identities( database: &Database, identity_manager: &IdentityManager, batching_tree: &TreeVersion, + monitored_txs_sender: &mpsc::Sender, wake_up_notify: &Notify, timeout_secs: u64, ) -> AnyhowResult<()> { @@ -121,6 +127,7 @@ async fn process_identities( database, identity_manager, batching_tree, + monitored_txs_sender, &updates, ).await?; @@ -180,6 +187,7 @@ async fn process_identities( database, identity_manager, batching_tree, + monitored_txs_sender, &updates, ).await?; @@ -201,10 +209,11 @@ async fn commit_identities( database: &Database, identity_manager: &IdentityManager, batching_tree: &TreeVersion, + monitored_txs_sender: &mpsc::Sender, updates: &[AppliedTreeUpdate], ) -> AnyhowResult<()> { // If the update is an insertion - if updates + let tx_id = if updates .first() .context("Updates should be > 1")? .update @@ -221,7 +230,7 @@ async fn commit_identities( prover.batch_size() ); - insert_identities(database, identity_manager, batching_tree, updates, prover).await?; + insert_identities(database, identity_manager, batching_tree, updates, prover).await? } else { let prover = identity_manager .get_suitable_deletion_prover(updates.len()) @@ -233,7 +242,11 @@ async fn commit_identities( prover.batch_size() ); - delete_identities(database, identity_manager, batching_tree, updates, prover).await?; + delete_identities(database, identity_manager, batching_tree, updates, prover).await? + }; + + if let Some(tx_id) = tx_id { + monitored_txs_sender.send(tx_id).await?; } Ok(()) @@ -246,12 +259,12 @@ pub async fn insert_identities( batching_tree: &TreeVersion, updates: &[AppliedTreeUpdate], prover: ReadOnlyProver<'_, Prover>, -) -> AnyhowResult<()> { +) -> AnyhowResult> { TaskMonitor::log_identities_queues(database).await?; if updates.is_empty() { warn!("Identity commit requested with zero identities. Continuing."); - return Ok(()); + return Ok(None); } debug!("Starting identity commit for {} identities.", updates.len()); @@ -422,7 +435,7 @@ pub async fn insert_identities( TaskMonitor::log_batch_size(updates.len()); - Ok(()) + Ok(Some(transaction_id)) } pub async fn delete_identities( @@ -431,12 +444,12 @@ pub async fn delete_identities( batching_tree: &TreeVersion, updates: &[AppliedTreeUpdate], prover: ReadOnlyProver<'_, Prover>, -) -> AnyhowResult<()> { +) -> AnyhowResult> { TaskMonitor::log_identities_queues(database).await?; if updates.is_empty() { warn!("Identity commit requested with zero identities. Continuing."); - return Ok(()); + return Ok(None); } debug!("Starting identity commit for {} identities.", updates.len()); @@ -569,5 +582,5 @@ pub async fn delete_identities( TaskMonitor::log_batch_size(updates.len()); - Ok(()) + Ok(Some(transaction_id)) }