Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring back tx monitoring #634

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/contracts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ impl IdentityManager {
}

#[instrument(level = "debug", skip(self))]
pub async fn mine_identities(&self, transaction_id: TransactionId) -> anyhow::Result<bool> {
pub async fn mine_transaction(&self, transaction_id: TransactionId) -> anyhow::Result<bool> {
let result = self.ethereum.mine_transaction(transaction_id).await?;

Ok(result)
Expand All @@ -345,7 +345,7 @@ impl IdentityManager {
for pending_identity_tx in pending_identities {
// Ignores the result of each transaction - we only care about a clean slate in
// terms of pending transactions
drop(self.mine_identities(pending_identity_tx).await);
drop(self.mine_transaction(pending_identity_tx).await);
}

Ok(())
Expand Down
39 changes: 27 additions & 12 deletions src/task_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ use anyhow::Result as AnyhowResult;
use clap::Parser;
use once_cell::sync::Lazy;
use prometheus::{linear_buckets, register_gauge, register_histogram, Gauge, Histogram};
use tokio::sync::{broadcast, Notify, RwLock};
use tokio::sync::{broadcast, mpsc, Notify, RwLock};
use tokio::task::JoinHandle;
use tracing::{info, instrument, warn};

use self::tasks::delete_identities::DeleteIdentities;
use self::tasks::finalize_identities::FinalizeRoots;
use self::tasks::insert_identities::InsertIdentities;
use self::tasks::monitor_txs::MonitorTxs;
use self::tasks::process_identities::ProcessIdentities;
use crate::contracts::SharedIdentityManager;
use crate::database::Database;
Expand Down Expand Up @@ -98,19 +99,17 @@ pub struct Options {
#[clap(long, env, default_value = "0")]
pub max_epoch_duration_seconds: u64,

/// How many identities can be held in the API insertion queue at any given
/// time Past this limit the API request will block until the queue has
/// space for the insertion.
#[clap(long, env, default_value = "100")]
pub insert_identities_capacity: usize,

/// The maximum number of windows to scan for finalization logs
#[clap(long, env, default_value = "100")]
pub scanning_window_size: u64,

/// The number of seconds to wait between fetching logs
#[clap(long, env, default_value = "30")]
pub time_between_scans_seconds: u64,

/// The number of txs in the channel that we'll be monitoring
#[clap(long, env, default_value = "100")]
pub monitored_txs_capacity: usize,
}

/// A worker that commits identities to the blockchain.
Expand Down Expand Up @@ -138,6 +137,7 @@ pub struct TaskMonitor {
batch_deletion_timeout_seconds: i64,
// TODO: docs
min_batch_deletion_size: usize,
monitored_txs_capacity: usize,
}

impl TaskMonitor {
Expand All @@ -151,10 +151,10 @@ impl TaskMonitor {
batch_timeout_seconds,
scanning_window_size,
time_between_scans_seconds,
batch_deletion_timeout_seconds: _,
min_batch_deletion_size: _,
insert_identities_capacity: _,
max_epoch_duration_seconds,
monitored_txs_capacity,
batch_deletion_timeout_seconds,
min_batch_deletion_size,
} = *options;

Self {
Expand All @@ -165,9 +165,10 @@ impl TaskMonitor {
batch_insert_timeout_secs: batch_timeout_seconds,
scanning_window_size,
time_between_scans: Duration::from_secs(time_between_scans_seconds),
batch_deletion_timeout_seconds: options.batch_deletion_timeout_seconds,
min_batch_deletion_size: options.min_batch_deletion_size,
batch_deletion_timeout_seconds,
min_batch_deletion_size,
max_epoch_duration: Duration::from_secs(max_epoch_duration_seconds),
monitored_txs_capacity,
}
}

Expand All @@ -182,6 +183,9 @@ impl TaskMonitor {
// but for symmetry's sake we create it for every task with `.subscribe()`
let (shutdown_sender, _) = broadcast::channel(1);

let (monitored_txs_sender, monitored_txs_receiver) =
mpsc::channel(self.monitored_txs_capacity);

let wake_up_notify = Arc::new(Notify::new());
// Immediately notify so we can start processing if we have pending identities
// in the database
Expand Down Expand Up @@ -214,6 +218,7 @@ impl TaskMonitor {
self.identity_manager.clone(),
self.tree_state.get_batching_tree(),
self.batch_insert_timeout_secs,
monitored_txs_sender,
wake_up_notify.clone(),
);

Expand All @@ -225,6 +230,16 @@ impl TaskMonitor {

handles.push(process_identities_handle);

let monitor_txs = MonitorTxs::new(self.identity_manager.clone(), monitored_txs_receiver);

let monitor_txs_handle = crate::utils::spawn_monitored_with_backoff(
move || monitor_txs.clone().run(),
shutdown_sender.clone(),
PROCESS_IDENTITIES_BACKOFF,
);

handles.push(monitor_txs_handle);

// Insert identities task
let insert_identities = InsertIdentities::new(
self.database.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/task_monitor/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod delete_identities;
pub mod finalize_identities;
pub mod insert_identities;
pub mod monitor_txs;
pub mod process_identities;
45 changes: 45 additions & 0 deletions src/task_monitor/tasks/monitor_txs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::sync::Arc;

use anyhow::Result as AnyhowResult;
use tokio::sync::{mpsc, Mutex};

use crate::contracts::{IdentityManager, SharedIdentityManager};
use crate::ethereum::write::TransactionId;

pub struct MonitorTxs {
identity_manager: SharedIdentityManager,
monitored_txs_receiver: Arc<Mutex<mpsc::Receiver<TransactionId>>>,
}

impl MonitorTxs {
pub fn new(
identity_manager: SharedIdentityManager,
monitored_txs_receiver: mpsc::Receiver<TransactionId>,
) -> Arc<Self> {
Arc::new(Self {
identity_manager,
monitored_txs_receiver: Arc::new(Mutex::new(monitored_txs_receiver)),
})
}

pub async fn run(self: Arc<Self>) -> anyhow::Result<()> {
monitor_txs_loop(&self.identity_manager, &self.monitored_txs_receiver).await?;

Ok(())
}
}

async fn monitor_txs_loop(
identity_manager: &IdentityManager,
monitored_txs_receiver: &Mutex<mpsc::Receiver<TransactionId>>,
) -> AnyhowResult<()> {
let mut monitored_txs_receiver = monitored_txs_receiver.lock().await;

while let Some(tx) = monitored_txs_receiver.recv().await {
if !identity_manager.mine_transaction(tx.clone()).await? {
panic!("Failed to mine transaction: {}", tx);
}
}

Ok(())
}
33 changes: 23 additions & 10 deletions src/task_monitor/tasks/process_identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use ethers::types::U256;
use ruint::Uint;
use semaphore::merkle_tree::Proof;
use semaphore::poseidon_tree::Branch;
use tokio::sync::Notify;
use tokio::sync::{mpsc, Notify};
use tokio::{select, time};
use tracing::{debug, error, info, instrument, warn};

use crate::contracts::{IdentityManager, SharedIdentityManager};
use crate::database::Database;
use crate::ethereum::write::TransactionId;
use crate::identity_tree::{
AppliedTreeUpdate, Hash, Intermediate, TreeVersion, TreeVersionReadOps, TreeWithNextVersion,
};
Expand All @@ -30,6 +31,7 @@ pub struct ProcessIdentities {
identity_manager: SharedIdentityManager,
batching_tree: TreeVersion<Intermediate>,
batch_insert_timeout_secs: u64,
monitored_txs_sender: mpsc::Sender<TransactionId>,
wake_up_notify: Arc<Notify>,
}

Expand All @@ -39,13 +41,15 @@ impl ProcessIdentities {
identity_manager: SharedIdentityManager,
batching_tree: TreeVersion<Intermediate>,
batch_insert_timeout_secs: u64,
monitored_txs_sender: mpsc::Sender<TransactionId>,
wake_up_notify: Arc<Notify>,
) -> Arc<Self> {
Arc::new(Self {
database,
identity_manager,
batching_tree,
batch_insert_timeout_secs,
monitored_txs_sender,
wake_up_notify,
})
}
Expand All @@ -55,6 +59,7 @@ impl ProcessIdentities {
&self.database,
&self.identity_manager,
&self.batching_tree,
&self.monitored_txs_sender,
&self.wake_up_notify,
self.batch_insert_timeout_secs,
)
Expand All @@ -66,6 +71,7 @@ async fn process_identities(
database: &Database,
identity_manager: &IdentityManager,
batching_tree: &TreeVersion<Intermediate>,
monitored_txs_sender: &mpsc::Sender<TransactionId>,
wake_up_notify: &Notify,
timeout_secs: u64,
) -> AnyhowResult<()> {
Expand Down Expand Up @@ -121,6 +127,7 @@ async fn process_identities(
database,
identity_manager,
batching_tree,
monitored_txs_sender,
&updates,
).await?;

Expand Down Expand Up @@ -180,6 +187,7 @@ async fn process_identities(
database,
identity_manager,
batching_tree,
monitored_txs_sender,
&updates,
).await?;

Expand All @@ -201,10 +209,11 @@ async fn commit_identities(
database: &Database,
identity_manager: &IdentityManager,
batching_tree: &TreeVersion<Intermediate>,
monitored_txs_sender: &mpsc::Sender<TransactionId>,
updates: &[AppliedTreeUpdate],
) -> AnyhowResult<()> {
// If the update is an insertion
if updates
let tx_id = if updates
.first()
.context("Updates should be > 1")?
.update
Expand All @@ -221,7 +230,7 @@ async fn commit_identities(
prover.batch_size()
);

insert_identities(database, identity_manager, batching_tree, updates, prover).await?;
insert_identities(database, identity_manager, batching_tree, updates, prover).await?
} else {
let prover = identity_manager
.get_suitable_deletion_prover(updates.len())
Expand All @@ -233,7 +242,11 @@ async fn commit_identities(
prover.batch_size()
);

delete_identities(database, identity_manager, batching_tree, updates, prover).await?;
delete_identities(database, identity_manager, batching_tree, updates, prover).await?
};

if let Some(tx_id) = tx_id {
monitored_txs_sender.send(tx_id).await?;
}

Ok(())
Expand All @@ -246,12 +259,12 @@ pub async fn insert_identities(
batching_tree: &TreeVersion<Intermediate>,
updates: &[AppliedTreeUpdate],
prover: ReadOnlyProver<'_, Prover>,
) -> AnyhowResult<()> {
) -> AnyhowResult<Option<TransactionId>> {
TaskMonitor::log_identities_queues(database).await?;

if updates.is_empty() {
warn!("Identity commit requested with zero identities. Continuing.");
return Ok(());
return Ok(None);
}

debug!("Starting identity commit for {} identities.", updates.len());
Expand Down Expand Up @@ -422,7 +435,7 @@ pub async fn insert_identities(

TaskMonitor::log_batch_size(updates.len());

Ok(())
Ok(Some(transaction_id))
}

pub async fn delete_identities(
Expand All @@ -431,12 +444,12 @@ pub async fn delete_identities(
batching_tree: &TreeVersion<Intermediate>,
updates: &[AppliedTreeUpdate],
prover: ReadOnlyProver<'_, Prover>,
) -> AnyhowResult<()> {
) -> AnyhowResult<Option<TransactionId>> {
TaskMonitor::log_identities_queues(database).await?;

if updates.is_empty() {
warn!("Identity commit requested with zero identities. Continuing.");
return Ok(());
return Ok(None);
}

debug!("Starting identity commit for {} identities.", updates.len());
Expand Down Expand Up @@ -569,5 +582,5 @@ pub async fn delete_identities(

TaskMonitor::log_batch_size(updates.len());

Ok(())
Ok(Some(transaction_id))
}