Skip to content


Better blast params, fixing bug affecting polymorphic rates and print…
Browse files Browse the repository at this point in the history
… clades only
  • Loading branch information
PuncocharM committed Dec 29, 2023
1 parent 2928fb5 commit de48935
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 21 deletions.
89 changes: 72 additions & 17 deletions metaphlan/
Original file line number Diff line number Diff line change
Expand Up @@ -291,43 +291,81 @@ def extract_markers_from_genome(cls, reference_file, clade_markers_file):
input_seqs = { seq for seq in SeqIO.parse(io.StringIO(input_file_data), 'fasta')}

# we need the additional btop column
columns = 'qseqid sseqid pident length mismatch gapopen qstart qend qlen sstart send evalue bitscore btop'
cmd = f'blastn -query {clade_markers_file} -subject - -num_threads {1} -outfmt "6 {columns}"'
columns = 'qseqid sseqid pident length mismatch gapopen qstart qend qlen sstart send sstrand evalue bitscore btop'
blast_params = '-task megablast ' \
'-word_size 28 ' \
'-reward 1 -penalty -3 ' \
'-gapopen 5 -gapextend 2 ' \
'-perc_identity 90 -qcov_hsp_perc 10'
cmd = f'blastn -query {clade_markers_file} -subject - {blast_params} -num_threads {1} -outfmt "6 {columns}"'

# run blastn and pass the raw data to stdin
r = run_command(cmd, input=input_file_data, text=True)

# load the blast output
df = pd.read_csv(io.StringIO(r.stdout), sep='\t', names=columns.split(' '), dtype={'btop': str})
# df['qcov'] = (df['qend'] - df['qstart'] + 1) / df['qlen']
# df = df.query('qcov >= .1 and pident >= 90').copy()
df = df.sort_values('bitscore', ascending=False)

df['ext_s'] = ''
ext_ss = []
for idx, row in df.iterrows():
sseq = str(input_seqs[row['sseqid']].seq)
ext_s = cls.extract_with_btop(sseq, **row[['btop', 'qstart', 'qend', 'qlen', 'sstart', 'send']])
df.loc[idx, 'ext_s'] = ext_s
df['ext_s'] = ext_ss

def segments_overlap(sa, sb):
"""Intervals are closed, i.e. s = [s0, s1]"""
if sa[0] > sb[1] or sb[0] > sa[1]:
return False
return True

def segments_intersection(sa, sb):
return max(sa[0], sb[0]), min(sa[1], sb[1])

def segment_difference(sa, sb):
si = segments_intersection(sa, sb)
if si[0] > si[1]:
return sa
return (sa[0], sb[0] - 1) if sa[0] < sb[0] else (sb[1] + 1, sa[1])

ext_markers = {}
for query, idx in df.groupby('qseqid').groups.items(): # for each query/marker
df_query = df.loc[idx]
best_ref = df_query.iloc[0]['sseqid'] # take the best hit and consider only that contig/reference
df_query = df_query.query(f'sseqid=="{best_ref}"')

# take the best hit and consider only that contig/reference and that strand
best_ref = df_query.iloc[0]['sseqid']
best_strand = df_query.iloc[0]['sstrand']
df_query = df_query.query(f'sseqid=="{best_ref}" and sstrand=="{best_strand}"')

covered_regions = []
ext_s = ['-'] * df_query.iloc[0]['qlen']
for _, row in df_query.iterrows():
reg = (row['qstart'], row['qend'])
if any(segments_overlap(reg, cr) for cr in covered_regions):

# trim by already covered regions
for cr in covered_regions:
reg = segment_difference(reg, cr)

if reg[0] > reg[1]: # nothing left of the region

# non-overlaping hit => expand
assert all('-' in [a, b] for a, b in zip(ext_s, row['ext_s'])) # make sure they really don't overlap
ext_s = [a if b == '-' else b for a, b in zip(ext_s, row['ext_s'])]

for i, b in enumerate(row['ext_s']):
if i + 1 < reg[0] or i + 1 > reg[1]:
assert ext_s[i] == '-' # make sure the trimming works
ext_s[i] = b

# if any(segments_overlap(reg, cr) for cr in covered_regions):
# continue
# # non-overlaping hit => expand
# covered_regions.append(reg)
# assert all('-' in [a, b] for a, b in zip(ext_s, row['ext_s'])) # make sure they really don't overlap
# ext_s = [a if b == '-' else b for a, b in zip(ext_s, row['ext_s'])]

ext_markers[query] = ''.join(ext_s)

Expand All @@ -338,10 +376,10 @@ def calculate_polymorphic_rates(self):
"""Generates a file with the polymorphic rates of the species for each sample"""
rows = []
consensus_markers = execute_pool(((ConsensusMarkers.from_file, sample_path) for sample_path in self.samples),
nprocs=self.nprocs, return_generator=True)
for sample_path, sample in zip(self.samples, consensus_markers):
nprocs=self.nprocs, return_generator=True, ordered=True)
for sample_path, cm in zip(self.samples, consensus_markers):
p_stats, p_count, m_len = [], 0, 0
for marker in sample.consensus_markers:
for marker in cm.consensus_markers:
if in self.clade_markers_names:
p_count += marker.get_polymorphisms()
m_len += marker.get_sequence_length()
Expand Down Expand Up @@ -404,7 +442,7 @@ def detect_clades(self):
clades_to_check = set()
info('Processing samples...')
consensus_markers = execute_pool(((ConsensusMarkers.from_file, sample_path) for sample_path in self.samples),
nprocs=self.nprocs, return_generator=True)
nprocs=self.nprocs, return_generator=True, ordered=True)
for sample_path, cm in zip(self.samples, consensus_markers):
markers = [ for marker in cm.consensus_markers
if ( in markers2clade and marker.breadth >= self.breadth_thres)]
Expand Down Expand Up @@ -537,12 +575,29 @@ def run_strainphlan(self):

def get_input_samples(args_samples):
samples = []
for s in args_samples:
if os.path.isfile(s):
elif os.path.isdir(s):
dir_files = [os.path.join(s, f) for f in os.listdir(s)]
samples.extend([f for f in dir_files if os.path.isfile(f)])
elif not os.path.exists(s):
error(f'Sample file/folder {s} does not exist', exit=True)
error(f'Neither file nor directory: {s}', exit=True)

return samples

def __init__(self, args):
self.clade_markers_names = None
self.database_controller = MetaphlanDatabaseController(args.database)
self.clade_markers_file = args.clade_markers
self.samples = args.samples
self.references = args.references
self.samples = Strainphlan.get_input_samples(args.samples)
self.references = Strainphlan.get_input_samples(args.references)
self.clade = args.clade
self.output_dir = args.output_dir
self.trim_sequences = args.trim_sequences
Expand Down Expand Up @@ -659,7 +714,7 @@ def check_params(args):
sample_names = [Strainphlan.sample_path_to_name(s) for s in args.samples]
ref_names = [Strainphlan.sample_path_to_name(s) for s in args.references]
if len(sample_names) + len(ref_names) != len(set(sample_names + ref_names)):
error('Some sample or reference names are duplicated')
error('Some sample or reference names are duplicated', exit=True)

def main():
Expand Down
14 changes: 10 additions & 4 deletions metaphlan/utils/
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,31 @@ def iterator_shorter_than(i, ln):
return False

def execute_pool_iter(args, nprocs):
def execute_pool_iter(args, nprocs, ordered):
terminating = Event()
with Pool(initializer=init_terminating, initargs=(terminating,), processes=nprocs) as pool:
for r in pool.imap_unordered(parallel_execution, args, chunksize=CHUNKSIZE):
if ordered:
f = pool.imap
f = pool.imap_unordered

for r in f(parallel_execution, args, chunksize=CHUNKSIZE):
yield r
except Exception as e:
error('Parallel execution fails: {}'.format(e), exit=False)
raise e

def execute_pool(args, nprocs, return_generator=False):
def execute_pool(args, nprocs, return_generator=False, ordered=False):
Creates a pool for a parallelized function and returns the results of each execution as a list
args (Iterable[tuple]): tuple with the function and its arguments
nprocs (int): number of procs to use
return_generator (bool): Whether to return a non-blocking generator instead of list
ordered (bool): Whether the returning results should be in the same order as the input arguments
list: the list with the results of the parallel executions
Expand All @@ -88,7 +94,7 @@ def execute_pool(args, nprocs, return_generator=False):
if nprocs == 1 or iterator_shorter_than(args_tmp, 2): # no need to initialize pool
gen = (function(*a) for function, *a in args)
gen = execute_pool_iter(args, nprocs)
gen = execute_pool_iter(args, nprocs, ordered)

if return_generator:
return gen
Expand Down

0 comments on commit de48935

Please sign in to comment.