From 39c927d335b4a39bcd8331d8c0043b141d176521 Mon Sep 17 00:00:00 2001 From: Michal Puncochar Date: Wed, 27 Sep 2023 23:18:58 +0200 Subject: [PATCH] Changed filtering of markers and samples and primary/secondary behavior --- metaphlan/strainphlan.py | 326 +++++++++-------------- metaphlan/utils/__init__.py | 4 +- metaphlan/utils/external_exec.py | 117 +++----- metaphlan/utils/parallelisation.py | 11 +- metaphlan/utils/phylophlan_controller.py | 76 ++++-- 5 files changed, 223 insertions(+), 311 deletions(-) diff --git a/metaphlan/strainphlan.py b/metaphlan/strainphlan.py index 28a4c7a..6a53120 100755 --- a/metaphlan/strainphlan.py +++ b/metaphlan/strainphlan.py @@ -14,7 +14,6 @@ import io import os import re -import subprocess as sp import tempfile import time from shutil import copyfile, rmtree @@ -25,58 +24,16 @@ try: from .utils import * + from .utils.external_exec import run_command except ImportError: from utils import * + from utils.external_exec import run_command class Strainphlan: - """StrainPhlAn class""" - - def add_secondary_samples_and_references(self, messages=True): - """Adds secondary samples and references to the marker matrix - - Args: - messages (bool, optional): Whether the function is verbose. Defaults to True. - """ - if len(self.secondary_samples) > 0: - if messages: - info("Getting markers from secondary sample files...") - self.add_secondary_samples() - if messages: - info("Done.") - if len(self.secondary_references) > 0: - info("Getting markers from secondary reference files...") - self.add_secondary_references() - info("Done.") - - def add_secondary_samples(self): - """Adds secondary samples to the marker matrix""" - filtered_clade_markers = self.cleaned_markers_matrix.columns.tolist() - markers_matrix = execute_pool(((Strainphlan.get_matrix_for_sample, sample, filtered_clade_markers, self.breadth_thres) - for sample in self.secondary_samples), self.nprocs) - self.include_secondary_matrix(markers_matrix) - - def add_secondary_references(self): - """Adds secondary references to the marker matrix""" - filtered_clade_markers = self.cleaned_markers_matrix.columns.tolist() - markers_matrix = execute_pool(((Strainphlan.process_reference, reference, self.tmp_dir, self.clade_markers_file, - filtered_clade_markers, self.trim_sequences) for reference in self.secondary_references), self.nprocs) - self.include_secondary_matrix(markers_matrix) - - def include_secondary_matrix(self, markers_matrix): - """Returns the cleaned matrix adding secondary samples/references - - Args: - markers_matrix (list): List containing the samples-to-markers information for the secondary entries - """ - mm = pd.DataFrame.from_records(markers_matrix, index='sample') - mm = mm.loc[mm.sum(axis=1) >= (self.secondary_sample_with_n_markers if self.abs_n_markers_thres else len( - self.db_clade_markers) * self.secondary_sample_with_n_markers // 100)] - self.cleaned_markers_matrix = pd.concat( - [self.cleaned_markers_matrix, mm]) def get_markers_matrix_from_samples(self, print_clades=False): - """Gets a binary matrix representing the presence/ausence of the clade markers in the uploaded samples + """Gets a binary matrix representing the presence/absence of the clade markers in the uploaded samples Args: print_clades (bool, optional): Whether the function has been called from the print_clades_only function. Defaults to False. @@ -84,7 +41,9 @@ def get_markers_matrix_from_samples(self, print_clades=False): Returns: list: the list containing the samples-to-markers information """ - if not print_clades: + if print_clades: + self.db_clade_markers = pd.DataFrame() + else: if not self.clade_markers_file: self.database_controller.extract_markers([self.clade], self.tmp_dir) self.clade_markers_file = os.path.join(self.tmp_dir, "{}.fna".format(self.clade)) @@ -121,10 +80,11 @@ def get_matrix_for_sample(sample_path, clade_markers, breadth_thres): markers = {"sample": sample_path} markers.update({m: 0 for m in clade_markers}) - markers.update( - {marker.name: 1 for marker in sample.consensus_markers if marker.name in clade_markers and marker.breadth >= breadth_thres}) + markers.update({marker.name: 1 for marker in sample.consensus_markers + if marker.name in clade_markers and marker.breadth >= breadth_thres}) return markers + def filter_markers_matrix(self, markers_matrix, messages=True): """Filters the primary samples, references and markers based on the user defined thresholds @@ -132,54 +92,47 @@ def filter_markers_matrix(self, markers_matrix, messages=True): markers_matrix (list): The list with the sample-to-markers information messages (bool, optional): Whether to be verbose. Defaults to True. """ - mm = pd.DataFrame.from_records(markers_matrix, index='sample') - self.min_markers = self.sample_with_n_markers_after_filt if self.abs_n_markers_thres else len( - self.db_clade_markers) * self.sample_with_n_markers_after_filt // 100 - # Checks if the percentage of markers of a sample sample reaches sample_with_n_markers, if not, removes the sample - mm = mm.loc[mm.sum(axis=1) >= (self.sample_with_n_markers if self.abs_n_markers_thres else len( - self.db_clade_markers) * self.sample_with_n_markers // 100)] - if not self.check_matrix_length(mm.index, 4, "Phylogeny can not be inferred. Too many samples were discarded.", messages): - return - # Checks if the percentage of samples that contain a marker reaches marker_in_n_samples, if not, removes the marker - mm = mm.loc[:, mm.sum(axis=0) >= ( - self.marker_in_n_samples if self.abs_n_samples_thres else self.marker_in_n_samples * len(mm) // 100)] - if not self.check_matrix_length(mm.columns, self.min_markers, "Phylogeny can not be inferred. Too many markers were discarded.", messages): - return - # Checks again if the percentage of markers of a sample reaches sample_with_n_markers_after_filt, if not, removes the sample - mm = mm.loc[mm.sum(axis=1) >= self.min_markers] - if not self.check_matrix_length(mm.index, 4, "Phylogeny can not be inferred. No enough markers were kept for the samples.", messages): + mm = pd.DataFrame.from_records(markers_matrix, index='sample') # df with index samples and columns markers + + # Step I: samples with not enough markers are treated as secondary + # here the percentage is calculated from the number of markers in at least one sample, it can be less than + # the total number of available markers in the database for the clade + _, n_markers = mm.shape + min_markers = max(self.sample_with_n_markers, n_markers * self.sample_with_n_markers_perc / 100) + mm_primary = mm.loc[mm.sum(axis=1) >= min_markers] + + # Step II: filter markers in not enough primary samples + n_samples, _ = mm_primary.shape + min_samples = n_samples * self.marker_in_n_samples_perc / 100 + mm = mm.loc[:, mm_primary.sum(axis=0) >= min_samples] + + # Step III: filter samples with not enough markers + # here the percentage is calculated from the remaining markers + _, n_markers = mm.shape + min_markers = max(self.sample_with_n_markers_after_filt, + n_markers * self.sample_with_n_markers_after_filt_perc / 100) + mm = mm.loc[mm.sum(axis=1) >= min_markers] + if len(mm) < 4: + if messages: + error("Phylogeny can not be inferred. Less than 4 samples remained after filtering of the markers.", + exit=True) return - self.cleaned_markers_matrix = mm - def check_matrix_length(self, matrix, threshold, message, verbose): - """Checks the length of the markers' matrix - - Args: - matrix (list): the matrix to check the length - threshold (int): size threshold - message (str): message to print if verbose - verbose (bool): whether to be verbose + self.cleaned_markers_matrix = mm - Returns: - bool: whether the matrix size reaches the length threshold - """ - if len(matrix) < threshold: - if verbose: - error(message, exit=True) - return False - return True - def copy_filtered_references(self, markers_tmp_dir): + def copy_filtered_references(self, markers_tmp_dir, filtered_samples): """Copies the FASTA files of the filtered references to be processed by PhyloPhlAn Args: markers_tmp_dir (str): the temporal folder where to copy the reference genomes + filtered_samples (set): the set of samples after filtering """ - for reference in self.references+self.secondary_references: + for reference in self.references: sample_name = os.path.splitext(os.path.basename(reference[:-4]))[0] if reference.endswith( '.bz2') else os.path.splitext(os.path.basename(reference))[0] - if sample_name in [os.path.splitext(os.path.basename(sample))[0] for sample in self.cleaned_markers_matrix.index]: + if sample_name in filtered_samples: reference_marker = os.path.join(self.tmp_dir, "reference_markers", f'{sample_name}.fna.bz2') copyfile(reference_marker, os.path.join(markers_tmp_dir, f"{sample_name}.fna.bz2")) @@ -189,14 +142,13 @@ def matrix_markers_to_fasta(self): Returns: str: the temporal folder where the FASTA files were written """ - markers_tmp_dir = os.path.join( - self.tmp_dir, "{}.StrainPhlAn4".format(self.clade)) + markers_tmp_dir = os.path.join(self.tmp_dir, "{}.StrainPhlAn4".format(self.clade)) create_folder(markers_tmp_dir) - self.copy_filtered_references(markers_tmp_dir) - filtered_samples = self.cleaned_markers_matrix.index.tolist() - filtered_markers = self.cleaned_markers_matrix.columns.tolist() - execute_pool(((Strainphlan.sample_markers_to_fasta, sample, filtered_samples, filtered_markers, self.trim_sequences, markers_tmp_dir) - for sample in self.samples + self.secondary_samples), self.nprocs) + filtered_samples = set(self.cleaned_markers_matrix.index) + filtered_markers = set(self.cleaned_markers_matrix.columns) + self.copy_filtered_references(markers_tmp_dir, filtered_samples) + execute_pool(((Strainphlan.sample_markers_to_fasta, sample, filtered_samples, filtered_markers, + self.trim_sequences, markers_tmp_dir) for sample in self.samples), self.nprocs) return markers_tmp_dir @staticmethod @@ -205,13 +157,13 @@ def sample_markers_to_fasta(sample_path, filtered_samples, filtered_markers, tri Args: sample_path (str): the path to the sample - trim_sequences: filtered_markers: filtered_samples: + trim_sequences: markers_tmp_dir (str): the temporal folder were the FASTA file is written """ if sample_path in filtered_samples: - sample_name = os.path.splitext(os.path.basename(sample_path))[0].replace(".pkl", "") + sample_name = os.path.splitext(os.path.basename(sample_path))[0].replace(".pkl", "").replace('.json', '') marker_output_file = os.path.join(markers_tmp_dir, '{}.fna.bz2'.format(sample_name)) sample = ConsensusMarkers.from_file(sample_path) sample.consensus_markers = [m for m in sample.consensus_markers if m.name in filtered_markers] @@ -238,6 +190,7 @@ def process_reference(cls, reference_file, tmp_dir, clade_markers_file, clade_ma tmp_dir (str): the temporal folder where the BLASTn results where saved clade_markers_file (str): clade_markers (list): the list with the clade markers names + trim_sequences: Returns: dict: the dictionary with the reference-to-markers information @@ -283,18 +236,17 @@ def extract_markers_from_genome(reference_file, clade_markers_file): # we need the additional btop column columns = 'qseqid sseqid pident length mismatch gapopen qstart qend qlen sstart send evalue bitscore btop' - cmd = f'blastn -query {clade_markers_file} -subject - -num_threads {1} -outfmt'.split(' ') \ - + ['6 ' + columns] + cmd = f'blastn -query {clade_markers_file} -subject - -num_threads {1} -outfmt "6 {columns}"' # run blastn and pass the raw data to stdin - r = sp.run(cmd, input=input_file_data, check=True, capture_output=True, text=True) + r = run_command(cmd, input=input_file_data, text=True) df = pd.read_csv(io.StringIO(r.stdout), sep='\t', names=columns.split(' ')) ext_markers = {} for _, row in df.iterrows(): sseq = str(input_seqs[row['sseqid']].seq) - btop = re.split(r'(\d+|[^\d]{2})', row['btop'])[1::2] # blast trace-back operations + btop = re.split(r'(\d+|\D{2})', row['btop'])[1::2] # blast trace-back operations assert row['qend'] >= row['qstart'] strand = 1 if row['send'] >= row['sstart'] else -1 # whether reverse-complemented @@ -340,7 +292,7 @@ def calculate_polymorphic_rates(self): with open(os.path.join(self.output_dir, "{}.polymorphic".format(self.clade)), 'w') as polymorphic_file: polymorphic_file.write("sample\tpercentage_of_polymorphic_sites\tavg_by_marker\tmedian_by_marker" + "\tstd_by_marker\tmin_by_marker\tmax_by_marker\tq25_by_marker\tq75_by_marker") - for sample_path in self.samples+self.secondary_samples: + for sample_path in self.samples: sample = ConsensusMarkers.from_file(sample_path) p_stats, p_count, m_len = [], 0, 0 for marker in sample.consensus_markers: @@ -361,35 +313,24 @@ def write_info(self): with open(os.path.join(self.output_dir, "{}.info".format(self.clade)), 'w') as info_file: info_file.write("Clade: {}\n".format(self.clade)) info_file.write( - "Number of main samples: {}\n".format(len(self.samples))) - info_file.write("Number of secondary samples: {}\n".format( - len(self.secondary_samples))) + "Number of samples: {}\n".format(len(self.samples))) info_file.write( - "Number of main references: {}\n".format(len(self.references))) - info_file.write("Number of secondary references: {}\n".format( - len(self.secondary_references))) + "Number of references: {}\n".format(len(self.references))) info_file.write("Number of available markers for the clade: {}\n".format( len(self.db_clade_markers))) info_file.write("Filtering parameters:\n") info_file.write("\tNumber of bases to remove when trimming markers: {}\n".format( self.trim_sequences)) - info_file.write("\tMinimum {} of markers to keep a main sample: {}\n".format( - 'number' if self.abs_n_markers_thres else 'percentage', self.sample_with_n_markers)) - info_file.write("\tMinimum {} of markers to keep a main sample after first filtering: {}\n".format( - 'number' if self.abs_n_markers_thres else 'percentage', self.sample_with_n_markers_after_filt)) - info_file.write("\tMinimum {} of markers to keep a secondary sample: {}\n".format( - 'number' if self.abs_n_markers_thres else 'percentage', self.secondary_sample_with_n_markers)) - info_file.write("\tMinimum {} of samples to keep a marker: {}\n".format( - 'number' if self.abs_n_samples_thres else 'percentage', self.marker_in_n_samples)) + info_file.write(f"\tMinimum number of markers to make a sample primary: {self.sample_with_n_markers}\n") + info_file.write(f"\tMinimum percentage of markers to make a sample primary: {self.sample_with_n_markers_perc}\n") + info_file.write(f"\tMinimum number of markers to keep a sample after filtering: {self.sample_with_n_markers_after_filt}\n") + info_file.write(f"\tMinimum percentage of markers to keep a sample after filtering: {self.sample_with_n_markers_after_filt_perc}\n") + info_file.write(f"\tMinimum percentage of samples to keep a marker: {self.marker_in_n_samples_perc}\n") info_file.write("Number of markers selected after filtering: {}\n".format( len(self.cleaned_markers_matrix.columns))) - info_file.write("Number of main samples after filtering: {}\n".format(len( + info_file.write("Number of samples after filtering: {}\n".format(len( [sample for sample in self.samples if sample in self.cleaned_markers_matrix.index.tolist()]))) - info_file.write("Number of secondary samples after filtering: {}\n".format(len( - [sample for sample in self.secondary_samples if sample in self.cleaned_markers_matrix.index.tolist()]))) - info_file.write("Number of main references after filtering: {}\n".format(len([reference for reference in self.references if (os.path.splitext( - os.path.basename(reference[:-4]))[0] if reference.endswith('.bz2') else os.path.splitext(os.path.basename(reference))[0]) in filtered_names]))) - info_file.write("Number of secondary references after filtering: {}\n".format(len([reference for reference in self.secondary_references if (os.path.splitext( + info_file.write("Number of references after filtering: {}\n".format(len([reference for reference in self.references if (os.path.splitext( os.path.basename(reference[:-4]))[0] if reference.endswith('.bz2') else os.path.splitext(os.path.basename(reference))[0]) in filtered_names]))) info_file.write( "PhyloPhlan phylogenetic precision mode: {}\n".format(self.phylophlan_mode)) @@ -428,15 +369,13 @@ def print_clades(self): markers2species = self.database_controller.get_markers2species() species2samples = self.detect_clades(markers2species) info('Detected clades: ') - sorted_species2samples = collections.OrderedDict( - sorted(species2samples.items(), key=lambda kv: kv[1], reverse=True)) + sorted_species2samples = collections.OrderedDict(sorted(species2samples.items(), key=lambda kv: kv[1], + reverse=True)) with open(os.path.join(self.output_dir, 'print_clades_only.tsv'), 'w') as wf: wf.write('Clade\tNumber_of_samples\n') for species in sorted_species2samples: - info('\t{}: in {} samples.'.format( - species, sorted_species2samples[species])) - wf.write('{}\t{}\n'.format( - species, sorted_species2samples[species])) + info('\t{}: in {} samples.'.format(species, sorted_species2samples[species])) + wf.write('{}\t{}\n'.format(species, sorted_species2samples[species])) info('Done.') def interactive_clade_selection(self): @@ -489,71 +428,65 @@ def filter_markers_samples(self, print_clades=False): print_clades (bool, optional): Whether it was run in the print_clade_only mode. Defaults to False. """ if not print_clades: - info("Getting markers from main samples...") + info("Getting markers from samples...") markers_matrix = self.get_markers_matrix_from_samples(print_clades) if not print_clades: info("Done.") - info("Getting markers from main references...") + info("Getting markers from references...") markers_matrix += self.get_markers_from_references() info("Done.") info("Removing bad markers / samples...") self.filter_markers_matrix(markers_matrix, messages=not print_clades) if not print_clades: info("Done.") - info("Getting markers from secondary samples and references...") - self.add_secondary_samples_and_references(messages=not print_clades) - if not print_clades: - info("Done.") def run_strainphlan(self): """Runs the full StrainPhlAn pipeline""" if self.print_clades_only: self.print_clades() - else: - if self.clade.startswith('s__') and 'CHOCOPhlAnSGB' in self.database_controller.get_database_name(): - self.interactive_clade_selection() - info("Creating temporary directory...") - self.tmp_dir = tempfile.mkdtemp(dir=self.tmp_dir) - info("Done.") - info("Filtering markers and samples...") - self.filter_markers_samples() - info("Done.") - info("Writing samples as markers' FASTA files...") - samples_as_markers_dir = self.matrix_markers_to_fasta() - info("Done.") - info("Calculating polymorphic rates...") - self.calculate_polymorphic_rates() - info("Done.") - info("Executing PhyloPhlAn...") - self.phylophlan_controller.compute_phylogeny(samples_as_markers_dir, len(self.cleaned_markers_matrix), - self.tmp_dir) - info("Done.") - info("Writing information file...") - self.write_info() + return + + if self.clade.startswith('s__') and 'CHOCOPhlAnSGB' in self.database_controller.get_database_name(): + self.interactive_clade_selection() + info("Creating temporary directory...") + self.tmp_dir = tempfile.mkdtemp(dir=self.tmp_dir) + info("Done.") + info("Filtering markers and samples...") + self.filter_markers_samples() + info("Done.") + info("Writing samples as markers' FASTA files...") + samples_as_markers_dir = self.matrix_markers_to_fasta() + info("Done.") + info("Calculating polymorphic rates...") + self.calculate_polymorphic_rates() + info("Done.") + info("Computing phylogeny...") + self.phylophlan_controller.compute_phylogeny(samples_as_markers_dir, len(self.cleaned_markers_matrix), + self.tmp_dir) + info("Done.") + info("Writing information file...") + self.write_info() + info("Done.") + if not self.debug: + info("Removing temporary files...") + rmtree(self.tmp_dir, ignore_errors=False, onerror=None) info("Done.") - if not self.debug: - info("Removing temporary files...") - rmtree(self.tmp_dir, ignore_errors=False, onerror=None) - info("Done.") def __init__(self, args): + self.db_clade_markers = None + self.cleaned_markers_matrix = None self.database_controller = MetaphlanDatabaseController(args.database) self.clade_markers_file = args.clade_markers self.samples = args.samples self.references = args.references if not args.print_clades_only else [] - self.secondary_samples = args.secondary_samples - self.secondary_references = args.secondary_references if not args.print_clades_only else [] self.clade = args.clade self.output_dir = args.output_dir self.trim_sequences = args.trim_sequences self.sample_with_n_markers = args.sample_with_n_markers - self.marker_in_n_samples = args.marker_in_n_samples - self.secondary_sample_with_n_markers = args.secondary_sample_with_n_markers - self.sample_with_n_markers_after_filt = args.sample_with_n_markers * \ - 2 // 3 if (args.sample_with_n_markers_after_filt is None or args.sample_with_n_markers < - args.sample_with_n_markers_after_filt) else args.sample_with_n_markers_after_filt - self.abs_n_markers_thres = args.abs_n_markers_thres - self.abs_n_samples_thres = args.abs_n_samples_thres + self.sample_with_n_markers_perc = args.sample_with_n_markers_perc + self.marker_in_n_samples_perc = args.marker_in_n_samples_perc + self.sample_with_n_markers_after_filt = args.sample_with_n_markers_after_filt + self.sample_with_n_markers_after_filt_perc = args.sample_with_n_markers_after_filt_perc self.breadth_thres = args.breadth_thres self.print_clades_only = args.print_clades_only self.non_interactive = args.non_interactive @@ -563,49 +496,46 @@ def __init__(self, args): self.nprocs = args.nprocs self.tmp_dir = args.output_dir if args.tmp is None else args.tmp self.phylophlan_controller = Phylophlan3Controller(args) - self.cleaned_markers_matrix = pd.DataFrame() def read_params(): - """ Reads and parses the command line arguments of the script + """ + Reads and parses the command line arguments of the script Returns: namespace: The populated namespace with the command line arguments """ - p = ap.ArgumentParser( - description="", formatter_class=ap.ArgumentDefaultsHelpFormatter) + p = ap.ArgumentParser(description="", formatter_class=ap.ArgumentDefaultsHelpFormatter) p.add_argument('-d', '--database', type=str, default='latest', help="The input MetaPhlAn {} database".format(__version__)) p.add_argument('-m', '--clade_markers', type=str, default=None, help="The clade markers as FASTA file") - p.add_argument('-s', '--samples', type=str, nargs='+', default=[], + p.add_argument('-s', '--samples', type=str, nargs='+', help="The reconstructed markers for each sample") p.add_argument('-r', '--references', type=str, nargs='+', default=[], help="The reference genomes") p.add_argument('-c', '--clade', type=str, default=None, help="The clade to investigate") - p.add_argument('-o', '--output_dir', type=str, default=None, + p.add_argument('-o', '--output_dir', type=str, help="The output directory") p.add_argument('-n', '--nprocs', type=int, default=1, help="The number of threads to use") - p.add_argument('--secondary_samples', type=str, nargs='+', default=[], - help="The reconstructed markers for each secondary sample") - p.add_argument('--secondary_references', type=str, nargs='+', default=[], - help="The secondary reference genomes") p.add_argument('--trim_sequences', type=int, default=50, help="The number of bases to remove from both ends when trimming markers") - p.add_argument('--marker_in_n_samples', type=int, default=80, - help="Theshold defining the minimum percentage of samples to keep a marker") - p.add_argument('--sample_with_n_markers', type=int, default=80, - help="Threshold defining the minimun percentage of markers to keep a sample") - p.add_argument('--secondary_sample_with_n_markers', type=int, default=80, - help="Threshold defining the minimun percentage of markers to keep a secondary sample") - p.add_argument('--sample_with_n_markers_after_filt', type=int, default=None, - help="Threshold defining the minimun percentage of markers to keep a sample after filtering the markers [only for dev]") - p.add_argument('--abs_n_markers_thres', action='store_true', default=False, - help="If specified, the *sample_with_n_markers* thresholds will be specified as absolute numbers") - p.add_argument('--abs_n_samples_thres', action='store_true', default=False, - help="If specified, the marker_in_n_samples threshold will be specified as absolute numbers") + p.add_argument('--sample_with_n_markers', type=int, default=20, + help="Threshold defining the minimum absolute number of markers for a sample to be primary. " + "This rule is combined with AND logic with --sample_with_n_markers_perc") + p.add_argument('--sample_with_n_markers_perc', type=float, default=25, + help="Threshold defining the minimum percentage of markers to for a sample to be primary. " + "This rule is combined with AND logic with --sample_with_n_markers") + p.add_argument('--marker_in_n_samples_perc', type=float, default=50, + help="Threshold defining the minimum percentage of primary samples to keep a marker") + p.add_argument('--sample_with_n_markers_after_filt', type=int, default=20, + help="Threshold defining the minimum absolute number of markers after filtering to keep a sample. " + "This rule is combined with AND logic with --sample_with_n_markers_after_filt_perc") + p.add_argument('--sample_with_n_markers_after_filt_perc', type=float, default=25, + help="Threshold defining the minimum percentage of markers kept after filtering to keep a sample. " + "This rule is combined with AND logic with --sample_with_n_markers_after_filt") p.add_argument('--breadth_thres', type=int, default=80, help="Threshold defining the minimum breadth of coverage for the markers") p.add_argument('--phylophlan_mode', choices=['accurate', 'fast'], default='fast', @@ -613,21 +543,23 @@ def read_params(): p.add_argument('--phylophlan_configuration', type=str, default=None, help="The PhyloPhlAn configuration file") p.add_argument('--tmp', type=str, default=None, - help="If specified, the directory where to store the temporal files.") + help="If specified, the directory where to store the temporary files.") p.add_argument('--mutation_rates', action='store_true', default=False, - help="If specified, StrainPhlAn will produce a mutation rates table for each of the aligned markers and a summary table " - "for the concatenated MSA. This operation can take long time to finish") + help="If specified, StrainPhlAn will produce a mutation rates table for each of the aligned markers" + " and a summary table for the concatenated MSA. This operation can take long time to finish") p.add_argument('--phylophlan_params', type=str, default=None, help="Additional phylophlan parameters") p.add_argument('--print_clades_only', action='store_true', default=False, help="If specified, StrainPhlAn will only print the potential clades and stop the execution") p.add_argument('--non_interactive', action='store_true', default=False, - help="If specified, StrainPhlAn will select the first SGB available when the clade is specified at the species level") + help="If specified, StrainPhlAn will select the first SGB available when the clade is specified at" + " the species level") p.add_argument('--treeshrink', action='store_true', default=False, help="If specified, StrainPhlAn will execute TreeShrink after building the tree") p.add_argument('--debug', action='store_true', default=False, help="If specified, StrainPhlAn will not remove the temporal folders") p.add_argument('-v', '--version', action='store_true', help="Shows this help message and exit") + return p.parse_args() @@ -639,12 +571,8 @@ def check_params(args): """ if args.print_clades_only and args.clade_markers: error('-m (or --clade_markers) cannot be specified together with --print_clades_only', exit=True) - elif not args.samples: - error('-s (or --samples) must be specified', exit=True) elif not args.print_clades_only and not args.clade: error('-c (or --clade) must be specified', exit=True) - elif not args.output_dir: - error('-o (or --output_dir) must be specified', exit=True) elif not os.path.exists(args.output_dir): error('The directory {} does not exist'.format(args.output_dir), exit=True) elif not (args.tmp is None) and not os.path.exists(args.tmp): @@ -663,14 +591,6 @@ def check_params(args): for r in args.references: if not os.path.exists(r): error('The reference file \"{}\" does not exist'.format(r), exit=True) - for s in args.secondary_samples: - if not os.path.exists(s): - error('The secondary input sample file \"{}\" does not exist'.format( - s), exit=True) - for r in args.secondary_references: - if not os.path.exists(r): - error('The secondary reference file \"{}\" does not exist'.format( - r), exit=True) def main(): diff --git a/metaphlan/utils/__init__.py b/metaphlan/utils/__init__.py index 3b956b4..bb3d316 100644 --- a/metaphlan/utils/__init__.py +++ b/metaphlan/utils/__init__.py @@ -1,6 +1,6 @@ from .util_fun import info, warning, error, create_folder from .parallelisation import execute_pool -from .external_exec import decompress_bz2, create_blastn_db, execute_blastn +from .external_exec import decompress_bz2 from .database_controller import MetaphlanDatabaseController from .phylophlan_controller import Phylophlan3Controller -from .consensus_markers import ConsensusMarker, ConsensusMarkers \ No newline at end of file +from .consensus_markers import ConsensusMarker, ConsensusMarkers diff --git a/metaphlan/utils/external_exec.py b/metaphlan/utils/external_exec.py index fe873c5..f847dbd 100755 --- a/metaphlan/utils/external_exec.py +++ b/metaphlan/utils/external_exec.py @@ -9,6 +9,7 @@ import os import re +import shlex import shutil import tempfile import subprocess as sb @@ -42,21 +43,6 @@ def execute(cmd): out_f.close() -def create_blastn_db(output_dir, reference): - """Creates the BLASTn database to align the reference genomes""" - reference_name = os.path.splitext(os.path.basename(reference))[0] - params = { - "program_name": "makeblastdb", - "params": "-parse_seqids -dbtype nucl", - "input": "-in", - "output": "-out", - "command_line": "#program_name# #params# #input# #output#" - } - execute(compose_command(params, input_file=reference, - output_file=os.path.join(output_dir, reference_name))) - return os.path.join(output_dir, reference_name) - - def build_bowtie2_db(input_fasta, output_database): params = { "program_name": "bowtie2-build", @@ -64,35 +50,6 @@ def build_bowtie2_db(input_fasta, output_database): "command_line": "#program_name# #params#" } execute(compose_command(params, input_file=input_fasta, output_file=output_database)) - - -def execute_blastn(output_dir, clade_markers, blastn_db, nprocs=1): - """Executes BLASTn""" - db_name = os.path.splitext(os.path.basename(blastn_db))[0] - params = { - "program_name": "blastn", - "params": "-outfmt \"6 qseqid sseqid qlen qstart qend sstart send\" -evalue 1e-10 -max_target_seqs 1", - "input": "-query", - "database": "-db", - "output": "-out", - "threads": "-num_threads", - "command_line": "#program_name# #params# #threads# #database# #input# #output#" - } - execute(compose_command(params, input_file=clade_markers, database=blastn_db, - output_file=os.path.join(output_dir, "{}.blastn".format(db_name)), nproc=nprocs)) - return os.path.join(output_dir, "{}.blastn".format(db_name)) - - -def create_phylophlan_db(output_dir, clade): - """Creates the PhyloPhlAn database""" - markers = os.path.join(output_dir, clade) - params = { - "program_name": "phylophlan_setup_database", - "params": "-d {} -e fna -t n --overwrite".format(clade), - "input": "-i", - "command_line": "#program_name# #input# #params#" - } - execute(compose_command(params, input_file=markers)) def generate_phylophlan_config_file(output_dir, configuration): @@ -108,32 +65,6 @@ def generate_phylophlan_config_file(output_dir, configuration): return conf_file -def execute_phylophlan(samples_markers_dir, conf_file, tmp_dir, output_dir, phylogeny_conf, additional_params, mutation_rates, nprocs): - """Executes PhyloPhlAn""" - cmd = f'phylophlan -i {samples_markers_dir} -o . --output_folder {output_dir} --nproc {nprocs} --strainphlan' \ - f' --{phylogeny_conf} --data_folder {tmp_dir} -t n -f {conf_file} --diversity low' \ - f' --genome_extension fna --min_num_entries 1 --min_num_markers 1' - - if additional_params is not None: - cmd += " " + additional_params - if mutation_rates: - cmd += " --mutation_rates" - - r = sb.run(cmd, capture_output=True, shell=True) - with open(os.path.join(tmp_dir, "phylophlan_log.stdout"), 'wb') as f: - f.write(r.stdout) - with open(os.path.join(tmp_dir, "phylophlan_log.stderr"), 'wb') as f: - f.write(r.stderr) - if r.returncode != 0: - error("Error executing PhyloPhlAn") - info('== stdout ==') - print(r.stdout.decode()) - info('== stderr ==') - print(r.stderr.decode()) - info('exiting') - exit(1) - - def execute_treeshrink(input_tree, output_dir, tmp=None, centroid=False, q_value=0.05, m_value='all-genes'): """Executes TreeShrink""" advanced_string = '' @@ -229,21 +160,14 @@ def generate_markers_fasta(database, output_dir): def compose_command(params, check=False, input_file=None, database=None, output_path=None, output_file=None, nproc=1): """Compose a command for further executions. Copied from the PhyloPhlAn 3 code""" - program_name = None stdin = None stdout = None environment = os.environ.copy() r_output_path = None r_output_file = None command_line = params['command_line'] - - if 'program_name' in list(params): - command_line = command_line.replace( - '#program_name#', params['program_name']) - program_name = params['program_name'] - else: - error('Error: something wrong... ' + - program_name+' not found!', exit=True) + program_name = params['program_name'] + command_line = command_line.replace('#program_name#', program_name) if check: command_line = program_name @@ -312,3 +236,38 @@ def compose_command(params, check=False, input_file=None, database=None, output_ return {'command_line': [str(a).replace('#', ' ') for a in re.sub(' +', ' ', command_line.replace('"', '')).split(' ') if a], 'stdin': stdin, 'stdout': stdout, 'env': environment, 'output_path': r_output_path, 'output_file': r_output_file} + + +def run_command(cmd, shell=False, **kwargs): + """ + Runs a command and checks for exit code + + Args: + cmd (str): A command to run + shell (bool): Whether to invoke shell + **kwargs: additional arguments passed to subprocess.run function + + Returns: + + """ + if not shell: + cmd_s = shlex.split(cmd) + else: + cmd_s = cmd + + r = sb.run(cmd_s, capture_output=True, **kwargs) + + if r.returncode != 0: + stdout = r.stdout + stderr = r.stderr + if 'text' not in kwargs or not kwargs['text']: + stdout = stdout.decode() + stderr = stderr.decode() + error('Execution failed for command', cmd) + print('stdout: ') + print(stdout) + print('stderr: ') + print(stderr) + error('Exiting', exit=True) + + return r diff --git a/metaphlan/utils/parallelisation.py b/metaphlan/utils/parallelisation.py index 696b30e..0fcfef0 100755 --- a/metaphlan/utils/parallelisation.py +++ b/metaphlan/utils/parallelisation.py @@ -29,11 +29,13 @@ def init_terminating(terminating_): global terminating terminating = terminating_ + def parallel_execution(arguments): - """Executes each parallelised call of a function + """ + Executes each parallelized call of a function Args: - arguments ((Callable, *Any)): tuple with the function and its arguments + arguments (Tuple[Callable, *Any]): tuple with the function and its arguments Returns: function: the call to the function @@ -50,7 +52,8 @@ def parallel_execution(arguments): def execute_pool(args, nprocs): - """Creates a pool for a parallelised function and returns the results of each execution as a list + """ + Creates a pool for a parallelized function and returns the results of each execution as a list Args: args (Iterable[tuple]): tuple with the function and its arguments @@ -60,7 +63,7 @@ def execute_pool(args, nprocs): list: the list with the results of the parallel executions """ args = list(args) - if nprocs == 1 or len(args) == 1: # no need to initialize pool + if nprocs == 1 or len(args) <= 1: # no need to initialize pool return [function(*a) for function, *a in args] else: terminating = Event() diff --git a/metaphlan/utils/phylophlan_controller.py b/metaphlan/utils/phylophlan_controller.py index 0b7689f..41750bd 100644 --- a/metaphlan/utils/phylophlan_controller.py +++ b/metaphlan/utils/phylophlan_controller.py @@ -1,39 +1,42 @@ -__author__ = 'Aitor Blanco Miguez (aitor.blancomiguez@unitn.it' +__author__ = 'Aitor Blanco Miguez (aitor.blancomiguez@unitn.it),' \ + 'Michal Puncochar (michal.puncochar@unitn.it)' __version__ = '4.1.0' __date__ = '23 Aug 2023' import abc import os -import bz2 -from shutil import move, copy +from shutil import move + try: from .util_fun import info, error - from .external_exec import generate_phylophlan_config_file, create_phylophlan_db, execute_phylophlan, execute_treeshrink + from .external_exec import generate_phylophlan_config_file, execute_treeshrink, run_command except ImportError: from util_fun import info, error - from external_exec import generate_phylophlan_config_file, create_phylophlan_db, execute_phylophlan, execute_treeshrink + from external_exec import generate_phylophlan_config_file, execute_treeshrink, run_command -class PhylophlanController: +class PhylophlanController(abc.ABC): """PhyloPhlAnController interface class""" @abc.abstractmethod - def compute_phylogeny(self): + def compute_phylogeny(self, *args, **kwargs): """Executes PhyloPhlAn to compute phylogeny""" pass - def get_phylophlan_configuration(self): + @staticmethod + def get_phylophlan_configuration(): """Gets PhyloPhlAn configuration Returns: dict: The dictionary with the PhyloPhlAn configuration """ - configuration = {} - configuration['map'] = 'blastn' - configuration['aligner'] = 'mafft' - configuration['trim'] = 'trimal' - configuration['tree1'] = 'raxml' + configuration = { + 'map': 'blastn', + 'aligner': 'mafft', + 'trim': 'trimal', + 'tree1': 'raxml' + } return configuration @@ -54,9 +57,8 @@ def compute_phylogeny(self, samples_markers_dir, num_samples, tmp_dir): self.phylophlan_configuration = generate_phylophlan_config_file( tmp_dir, self.get_phylophlan_configuration()) info("\tDone.") - info("\tProcessing samples...") - execute_phylophlan(samples_markers_dir, self.phylophlan_configuration, tmp_dir, self.output_dir, - self.phylophlan_mode, self.phylophlan_params, self.mutation_rates, self.nprocs) + info("\tExecuting PhyloPhlAn...") + self.execute_phylophlan(samples_markers_dir, tmp_dir) if self.mutation_rates: move(os.path.join(self.output_dir, "mutation_rates.tsv"), os.path.join( self.output_dir, "{}.mutation".format(self.clade))) @@ -64,18 +66,46 @@ def compute_phylogeny(self, samples_markers_dir, num_samples, tmp_dir): self.output_dir, "{}_mutation_rates".format(self.clade))) info("\tDone.") if self.treeshrink: - info("Executing TreeShrink...") + info("\tExecuting TreeShrink...") execute_treeshrink(os.path.join(self.output_dir, 'RAxML_bestTree.{}.StrainPhlAn4.tre'.format( self.clade)), self.output_dir, tmp=tmp_dir, centroid=True if num_samples >= 100 else False) - info("Done.") - info('This StrainPhlAn analysis ran TreeShrink, do not forget to cite:') - info('Mai, Uyen, and Siavash Mirarab. 2018. “TreeShrink: Fast and Accurate Detection of Outlier Long Branches in Collections of Phylogenetic Trees.” BMC Genomics 19 (S5): 272. https://doi.org/10.1186/s12864-018-4620-2.') + info("\tDone.") + info('\tThis StrainPhlAn analysis ran TreeShrink, do not forget to cite:') + info('\tMai, Uyen, and Siavash Mirarab. 2018. “TreeShrink: Fast and Accurate Detection of Outlier Long' + ' Branches in Collections of Phylogenetic Trees.” BMC Genomics 19 (S5): 272.' + ' https://doi.org/10.1186/s12864-018-4620-2.') + + + def execute_phylophlan(self, samples_markers_dir, tmp_dir): + """ + Executes PhyloPhlAn + + Args: + samples_markers_dir: + tmp_dir: + + Returns: + + """ + cmd = f'phylophlan' \ + f' -i {samples_markers_dir} -o . --output_folder {self.output_dir} --nproc {self.nprocs}' \ + f' --strainphlan --{self.phylophlan_mode} --data_folder {tmp_dir} -f {self.phylophlan_configuration}' \ + f' -t n --diversity low --genome_extension fna --min_num_entries 1 --min_num_markers 1' + + if self.phylophlan_params is not None: + cmd += " " + self.phylophlan_params + if self.mutation_rates: + cmd += " --mutation_rates" + + r = run_command(cmd) + with open(os.path.join(tmp_dir, "phylophlan_log.stdout"), 'wb') as f: + f.write(r.stdout) + with open(os.path.join(tmp_dir, "phylophlan_log.stderr"), 'wb') as f: + f.write(r.stderr) + def __init__(self, args): self.samples = args.samples - self.secondary_samples = args.secondary_samples - self.marker_in_n_samples = args.marker_in_n_samples - self.abs_n_samples_thres = args.abs_n_samples_thres self.phylophlan_mode = args.phylophlan_mode self.phylophlan_params = args.phylophlan_params self.phylophlan_configuration = args.phylophlan_configuration