diff --git a/Cargo.lock b/Cargo.lock index 6ff3118..0c7b0fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1815,7 +1815,7 @@ dependencies = [ [[package]] name = "sourmash_plugin_directsketch" -version = "0.2.0" +version = "0.2.1" dependencies = [ "anyhow", "async_zip", diff --git a/Cargo.toml b/Cargo.toml index ba86999..835f0ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "sourmash_plugin_directsketch" -version = "0.2.0" +version = "0.2.1" edition = "2021" [lib] diff --git a/src/directsketch.rs b/src/directsketch.rs index e6a6964..5ac44c5 100644 --- a/src/directsketch.rs +++ b/src/directsketch.rs @@ -14,7 +14,6 @@ use std::sync::Arc; use tokio::fs::File; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::Semaphore; -use tokio::time::Duration; use tokio_util::compat::Compat; use pyo3::prelude::*; @@ -615,11 +614,8 @@ pub async fn download_and_sketch( handles.push(failures_handle); // // Worker tasks - // let client = Client::new(); let semaphore = Arc::new(Semaphore::new(3)); // Limiting concurrent downloads let client = Arc::new(Client::new()); - // let semaphore = Arc::new(Semaphore::new(3)); // Allows up to 3 concurrent tasks - let mut interval = tokio::time::interval(Duration::from_secs(1)); // Open the file containing the accessions synchronously let (accession_info, n_accs) = load_accession_info(input_csv)?; @@ -643,7 +639,6 @@ pub async fn download_and_sketch( for (i, accinfo) in accession_info.into_iter().enumerate() { py.check_signals()?; // If interrupted, return an Err automatically - interval.tick().await; // Wait for the next interval tick before continuing let semaphore_clone = Arc::clone(&semaphore); let client_clone = Arc::clone(&client); let send_sigs = send_sigs.clone(); @@ -654,17 +649,18 @@ pub async fn download_and_sketch( let dna_sigs = dna_sig_templates.clone(); let prot_sigs = prot_sig_templates.clone(); - if (i + 1) % reporting_threshold == 0 { - let percent_processed = (((i + 1) as f64 / n_accs as f64) * 100.0).round(); - println!( - "Starting accession {}/{} ({}%)", - (i + 1), - n_accs, - percent_processed - ); - } tokio::spawn(async move { let _permit = semaphore_clone.acquire().await; + // Report when the permit is available and processing begins + if (i + 1) % reporting_threshold == 0 { + let percent_processed = (((i + 1) as f64 / n_accs as f64) * 100.0).round(); + println!( + "Starting accession {}/{} ({}%)", + (i + 1), + n_accs, + percent_processed + ); + } // Perform download and sketch let result = dl_sketch_accession( &client_clone,