Skip to content

Commit

Permalink
a little closer
Browse files Browse the repository at this point in the history
  • Loading branch information
bluegenes committed Dec 17, 2024
1 parent c8865b3 commit 8272d99
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 55 deletions.
69 changes: 41 additions & 28 deletions src/fastagather.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,50 @@
use crate::utils::buildutils::BuildCollection;
use anyhow::{bail, Result};

use needletail::parse_fastx_file;
use sourmash::selection::Selection;
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::utils::{
consume_query_by_gather, load_collection, load_sketches_above_threshold, write_prefetch,
ReportType,
};

#[allow(clippy::too_many_arguments)]
pub fn fastagather(
input_filename: String,
query_filename: String,
against_filepath: String,
input_moltype: String,
threshold_bp: u64,
selection: &Selection,
gather_output: Option<String>,
prefetch_output: Option<String>,
gather_output: Option<String>,
allow_failed_sigpaths: bool,
) -> Result<()> {

// to start, implement straightforward record --> sketch --> gather
// other ideas:
// - add full-file (lower resolution) prefetch first, to reduce search space
// - parallelize and/or batch records

// Build signature templates based on parsed parameters
let sig_template_result = BuildCollection::from_selection(selection);
let mut sig_templates = match sig_template_result {
Ok(sig_templates) => sig_templates,
let mut sig_template = match sig_template_result {
Ok(sig_template) => sig_template,
Err(e) => {
bail!("Failed to build template signatures: {}", e);
}
};

if sigs.is_empty() {
bail!("No signatures to build for the given parameters.");
if sig_template.size() != 1 {
bail!("FASTAgather requires a single signature type for search.");
}

let input_moltype = input_moltype.to_ascii_lowercase();

let mut against_selection = selection;
let scaled = query_mh.scaled();
against_selection.set_scaled(scaled);
// get scaled from selection here
let scaled = selection.scaled().unwrap(); // rm this unwrap?
against_selection.set_scaled(scaled as u32);

// calculate the minimum number of hashes based on desired threshold
let threshold_hashes = {
Expand All @@ -53,41 +63,44 @@ pub fn fastagather(
ReportType::Against,
allow_failed_sigpaths,
)?;

let failed_records = AtomicUsize::new(0);
// open file and start iterating through sequences
// Open fasta file reader
let mut reader = match parse_fastx_file(filename) {
let mut reader = match parse_fastx_file(query_filename.clone()) {
Ok(r) => r,
Err(err) => {
bail!("Error opening file {}: {:?}", filename, err);
bail!("Error opening file {}: {:?}", query_filename, err);
}
};

// later: can we parallelize across records or sigs? Do we want to batch groups of records for improved gather efficiency?
while let Some(record_result) = reader.next() {
// clone sig_templates for use
sigs = sig_templates.clone();
// clone sig_templates for use
let sigcoll = sig_template.clone();
match record_result {
Ok(record) => {
if let Err(err) = sigs.build_singleton_sigs(
record,
input_moltype,
filename.to_string(),
) {
if let Err(err) =
sigcoll.build_singleton_sigs(record, &input_moltype, query_filename.clone())
{
eprintln!(
"Error building signatures from file: {}, {:?}",
filename, err
query_filename, err
);
failed_records.fetch_add(1, atomic::Ordering::SeqCst);
failed_records.fetch_add(1, Ordering::SeqCst);
}
for (rec, query_sig) in sigs.iter(){
// in each iteration, this should just be a single signature made from the single record
for query_sig in sigcoll.sigs.iter() {
let query_md5 = query_sig.md5sum();
let query_mh = sig.minhash().expect("could not get minhash from sig");
let query_mh = query_sig.minhash().expect("could not get minhash from sig");
let query_name = query_sig.name(); // this is actually just record.id --> so maybe don't get it from sig here?

// now do prefetch/gather
let prefetch_result = load_sketches_above_threshold(against_collection, &query_mh, threshold_hashes)?;
let prefetch_result = load_sketches_above_threshold(
against_collection,
&query_mh,
threshold_hashes,
)?;
let matchlist = prefetch_result.0;
let skipped_paths = prefetch_result.1;
let failed_paths = prefetch_result.2;
Expand All @@ -97,7 +110,7 @@ pub fn fastagather(
query_filename.clone(),
query_name.clone(),
query_md5,
prefetch_output,
prefetch_output.clone(),
&matchlist,
)
.ok();
Expand All @@ -106,11 +119,11 @@ pub fn fastagather(
consume_query_by_gather(
query_name,
query_filename,
query_mh,
query_mh.clone(),
scaled as u32,
matchlist,
threshold_hashes,
gather_output,
gather_output.clone(),
)
.ok();
}
Expand Down
37 changes: 37 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::utils::build_selection;
use crate::utils::is_revindex_database;
mod check;
mod cluster;
mod fastagather;
mod fastgather;
mod fastmultigather;
mod fastmultigather_rocksdb;
Expand Down Expand Up @@ -359,6 +360,41 @@ fn do_cluster(
}
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (query_filename, against_filepath, input_moltype, threshold_bp, ksize, scaled, moltype, output_path_prefetch=None, output_path_gather=None))]
fn do_fastagather(
    query_filename: String,
    against_filepath: String,
    input_moltype: String,
    threshold_bp: u64,
    ksize: u8,
    scaled: Option<u32>,
    moltype: String,
    output_path_prefetch: Option<String>,
    output_path_gather: Option<String>,
) -> anyhow::Result<u8> {
    // Build the sketch selection (ksize / scaled / moltype) shared by the
    // query sketching and the against-database loading.
    let selection = build_selection(ksize, scaled, &moltype);

    // Run gather directly from the FASTA input. Errors are reported to the
    // Python caller as an exit status (0 = success, 1 = failure) rather than
    // propagated as an exception, matching the other do_* entry points.
    let outcome = fastagather::fastagather(
        query_filename,
        against_filepath,
        input_moltype,
        threshold_bp,
        &selection,
        output_path_prefetch,
        output_path_gather,
        true, // allow_failed_sigpaths
    );

    if let Err(e) = outcome {
        eprintln!("Error: {e}");
        return Ok(1);
    }
    Ok(0)
}

/// Module interface for the `sourmash_plugin_branchwater` extension module.
#[pymodule]
Expand All @@ -374,5 +410,6 @@ fn sourmash_plugin_branchwater(_py: Python, m: &Bound<'_, PyModule>) -> PyResult
m.add_function(wrap_pyfunction!(do_pairwise, m)?)?;
m.add_function(wrap_pyfunction!(do_cluster, m)?)?;
m.add_function(wrap_pyfunction!(do_singlesketch, m)?)?;
m.add_function(wrap_pyfunction!(do_fastagather, m)?)?;
Ok(())
}
91 changes: 91 additions & 0 deletions src/python/sourmash_plugin_branchwater/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,3 +800,94 @@ def main(self, args):
notify(f"...clustering is done! results in '{args.output}'")
notify(f" cluster counts in '{args.cluster_sizes}'")
return status


class Branchwater_Fastagather(CommandLinePlugin):
    """CLI plugin: run gather directly from a FASTA file against a database.

    Sketches the query FASTA on the fly and runs prefetch + gather against
    the provided database of sketches, writing gather (and optionally
    prefetch) CSV output.
    """

    command = "fastagather"
    description = "massively parallel gather directly from FASTA"

    def __init__(self, p):
        super().__init__(p)
        p.add_argument("query_fa", help="FASTA file")
        p.add_argument("against_paths", help="database file of sketches")
        p.add_argument(
            "-I",
            "--input-moltype",
            "--input-molecule-type",
            choices=["DNA", "dna", "protein"],
            default="DNA",
            help="molecule type of input sequence (DNA or protein)",
        )
        p.add_argument(
            "-o",
            "--output-gather",
            required=True,
            help="save gather output (minimum metagenome cover) to this file",
        )
        p.add_argument(
            "--output-prefetch", help="save prefetch output (all overlaps) to this file"
        )
        p.add_argument(
            "-t",
            "--threshold-bp",
            default=4000,
            type=float,
            help="threshold in estimated base pairs, for reporting matches (default: 4kb)",
        )
        p.add_argument(
            "-k",
            "--ksize",
            default=31,
            type=int,
            help="k-mer size at which to do comparisons (default: 31)",
        )
        p.add_argument(
            "-s",
            "--scaled",
            default=1000,
            type=int,
            help="scaled factor at which to do comparisons (default: 1000)",
        )
        p.add_argument(
            "-m",
            "--moltype",
            default="DNA",
            choices=["DNA", "protein", "dayhoff", "hp"],
            help="molecule type for search (DNA, protein, dayhoff, or hp; default DNA)",
        )
        p.add_argument(
            "-c",
            "--cores",
            default=0,
            type=int,
            help="number of cores to use (default is all available)",
        )

    def main(self, args):
        print_version()
        notify(
            f"ksize: {args.ksize} / scaled: {args.scaled} / moltype: {args.moltype} / threshold bp: {args.threshold_bp}"
        )

        num_threads = set_thread_pool(args.cores)

        # NOTE: fixed from args.query_sig — this command takes a FASTA file
        # (query_fa), not a signature; args.query_sig does not exist here.
        notify(
            f"gathering all sketches in '{args.query_fa}' against '{args.against_paths}' using {num_threads} threads"
        )
        super().main(args)
        # Pass outputs by keyword: the Rust signature orders prefetch before
        # gather, and the previous positional call had them swapped.
        status = sourmash_plugin_branchwater.do_fastagather(
            args.query_fa,
            args.against_paths,
            args.input_moltype,
            int(args.threshold_bp),
            args.ksize,
            args.scaled,
            args.moltype,
            output_path_prefetch=args.output_prefetch,
            output_path_gather=args.output_gather,
        )
        if status == 0:
            notify(f"...fastagather is done! gather results in '{args.output_gather}'")
            if args.output_prefetch:
                notify(f"prefetch results in '{args.output_prefetch}'")
        return status
28 changes: 8 additions & 20 deletions src/python/tests/sourmash_tst_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
import collections
import pprint

import pkg_resources
from pkg_resources import Requirement, resource_filename, ResolutionError
import importlib.metadata
import traceback
from io import open # pylint: disable=redefined-builtin
from io import StringIO
Expand Down Expand Up @@ -84,24 +83,13 @@ def _runscript(scriptname):
namespace = {"__name__": "__main__"}
namespace["sys"] = globals()["sys"]

try:
pkg_resources.load_entry_point("sourmash", "console_scripts", scriptname)()
return 0
except pkg_resources.ResolutionError:
pass

path = scriptpath()

scriptfile = os.path.join(path, scriptname)
if os.path.isfile(scriptfile):
if os.path.isfile(scriptfile):
exec( # pylint: disable=exec-used
compile(open(scriptfile).read(), scriptfile, "exec"), namespace
)
return 0

return -1

entry_points = importlib.metadata.entry_points(
group="console_scripts", name="sourmash"
)
assert len(entry_points) == 1
smash_cli = tuple(entry_points)[0].load()
smash_cli()
return 0

ScriptResults = collections.namedtuple("ScriptResults", ["status", "out", "err"])

Expand Down
Loading

0 comments on commit 8272d99

Please sign in to comment.