diff --git a/src/task_monitor/tasks/process_identities.rs b/src/task_monitor/tasks/process_identities.rs index 3c7708d9..a2ca21a3 100644 --- a/src/task_monitor/tasks/process_identities.rs +++ b/src/task_monitor/tasks/process_identities.rs @@ -159,44 +159,57 @@ async fn process_identities( continue; } - let batch_size = if next_update[0].update.element == Hash::ZERO { - identity_manager.max_deletion_batch_size().await + let (batch_size, insertion) = if next_update[0].update.element == Hash::ZERO { + (identity_manager.max_deletion_batch_size().await, false) }else{ - identity_manager.max_insertion_batch_size().await + (identity_manager.max_insertion_batch_size().await, true) }; // We have _at most_ one complete batch here. let updates = batching_tree.peek_next_updates(batch_size); - // If there are not enough identities to insert at this - // stage we can wait. The timer will ensure that the API - // clients do not wait too long for their submission to be - // completed. - if updates.len() < batch_size && !should_process_anyway { - // We do not reset the timer here as we may want to - // insert anyway soon. - tracing::trace!( - "Pending identities ({}) is less than batch size ({}). Waiting.", - updates.len(), - batch_size - ); - continue; - } - commit_identities( - database, - identity_manager, - batching_tree, - monitored_txs_sender, - &updates, - ).await?; + // If insertion, check if we have enough identities to insert or if the insertion interval has elapsed + if insertion{ + // If there are not enough identities to insert at this + // stage we can wait. The timer will ensure that the API + // clients do not wait too long for their submission to be + // completed. + if updates.len() < batch_size && !should_process_anyway { + // We do not reset the timer here as we may want to + // insert anyway soon. + tracing::trace!( + "Pending identities ({}) is less than batch size ({}). Waiting.", + updates.len(), + batch_size + ); + continue; + } + + commit_identities( + database, + identity_manager, + batching_tree, + monitored_txs_sender, + &updates, + ).await?; + + // We've inserted the identities, so we want to ensure that + // we don't trigger again until either we get a full batch + // or the timer ticks. + timer.reset(); + last_batch_time = Utc::now(); + database.update_latest_insertion_timestamp(last_batch_time).await?; - // We've inserted the identities, so we want to ensure that - // we don't trigger again until either we get a full batch - // or the timer ticks. - timer.reset(); - last_batch_time = Utc::now(); - database.update_latest_insertion_timestamp(last_batch_time).await?; + }else{ + commit_identities( + database, + identity_manager, + batching_tree, + monitored_txs_sender, + &updates, + ).await?; + } // We want to check if there's a full batch available immediately wake_up_notify.notify_one();