From 619bcd037b331e91c710da22faa4a4cdc0a33be8 Mon Sep 17 00:00:00 2001 From: Vitaliy Mysak Date: Mon, 5 Jun 2023 15:00:52 -0700 Subject: [PATCH] move blast checks into the main intact.py file --- intact/blastit.py | 135 ----------------------------------------- intact/intact.py | 120 ++++++++++++++++++++++++++++++++++++ tests/test_nonhiv.py | 4 +- tests/test_scramble.py | 12 ++-- 4 files changed, 128 insertions(+), 143 deletions(-) delete mode 100644 intact/blastit.py diff --git a/intact/blastit.py b/intact/blastit.py deleted file mode 100644 index c11057f..0000000 --- a/intact/blastit.py +++ /dev/null @@ -1,135 +0,0 @@ - -import os -import sys -import subprocess -import tempfile -import csv -from dataclasses import dataclass - -import util.subtypes as st -import util.wrappers as wrappers - -@dataclass -class BlastRow: - qseqid: str - qlen: int - sseqid: str - sgi: str - slen: int - qstart: int - qend: int - sstart: int - send: int - evalue: float - bitscore: float - length: int - pident: float - nident: float - btop: int - stitle: str - sstrand: str - - -def init_blast_row(row): - it = iter(row) - return BlastRow( - qseqid=next(it), - qlen=int(next(it)), - sseqid=next(it), - sgi=next(it), - slen=int(next(it)), - qstart=int(next(it)), - qend=int(next(it)), - sstart=int(next(it)), - send=int(next(it)), - evalue=float(next(it)), - bitscore=float(next(it)), - length=int(next(it)), - pident=float(next(it)), - nident=float(next(it)), - btop=next(it), - stitle=next(it), - sstrand=next(it), - ) - - -def iterate_values_from_tsv(file_path): - with open(file_path, 'r') as tsv_file: - reader = csv.reader(tsv_file, delimiter='\t') - for row in reader: - yield row - - -def iterate_blast_rows_from_tsv(file_path): - previous_key = None - values = [] - - for row in iterate_values_from_tsv(file_path): - key = row[0] - typed = init_blast_row(row) - - if key != previous_key and previous_key is not None: - yield values - values = [] - - values.append(typed) - previous_key = key - - if values: - yield values - - -def blast_interate(subtype, input_file): - with tempfile.NamedTemporaryFile() as output_file: - db_file = st.alignment_file(subtype) - wrappers.blast(db_file, input_file, output_file.name) - for seq in iterate_blast_rows_from_tsv(output_file.name): - yield seq - - -def is_sorted(lst): - last = None - for x in lst: - if last is not None and x < last: - return False - last = x - return True - - -def check_scramble(blast_rows): - # HIV 5' region can easily map to its 3' region because they are identical. - # Such a maping would not constitute a scramble, so we ignore the 5' region for this check. - ignored_5_prime = [x for x in blast_rows if x.sstart > 622 and x.send > 622] - - if not ignored_5_prime: - # No alignment. - # It should be an error normally, yet not a scramble error. - return None - - all_same = len(set(x.sstrand for x in ignored_5_prime)) == 1 - if not all_same: - # Some parts of the sequence were aligned - # in forward direction (plus) - # and some in reverse (minus). - # This indicates an internal inversion. - return "mix" - - ignored_5_prime.sort(key=lambda x: x.qstart) - direction = ignored_5_prime[0].sstrand - if direction == "plus" and is_sorted(x.sstart for x in ignored_5_prime): - return None - elif direction == "minus" and is_sorted(x.send for x in reversed(ignored_5_prime)): - return None - else: - return direction + "Scramble" - - -def check_nonhiv(blast_rows): - aligned_length = sum(abs(x.qend - x.qstart) + 1 for x in blast_rows) - total_length = blast_rows[0].qlen if blast_rows else 1 - ratio = aligned_length / total_length - - if ratio < 0.8: - return "CHIMERA" - else: - return None diff --git a/intact/intact.py b/intact/intact.py index a103f26..f693b16 100644 --- a/intact/intact.py +++ b/intact/intact.py @@ -5,6 +5,9 @@ import subprocess import sys import uuid +import tempfile +import csv +from dataclasses import dataclass from Bio import AlignIO, Seq, SeqIO, SeqRecord from scipy.stats import fisher_exact @@ -57,6 +60,123 @@ def __init__(self, start, end, deleted_count, inserted_count): self.deleted_count = deleted_count self.inserted_count = inserted_count +@dataclass +class BlastRow: + qseqid: str + qlen: int + sseqid: str + sgi: str + slen: int + qstart: int + qend: int + sstart: int + send: int + evalue: float + bitscore: float + length: int + pident: float + nident: float + btop: int + stitle: str + sstrand: str + + +def init_blast_row(row): + it = iter(row) + return BlastRow( + qseqid=next(it), + qlen=int(next(it)), + sseqid=next(it), + sgi=next(it), + slen=int(next(it)), + qstart=int(next(it)), + qend=int(next(it)), + sstart=int(next(it)), + send=int(next(it)), + evalue=float(next(it)), + bitscore=float(next(it)), + length=int(next(it)), + pident=float(next(it)), + nident=float(next(it)), + btop=next(it), + stitle=next(it), + sstrand=next(it), + ) + +def iterate_blast_rows_from_tsv(file_path): + previous_key = None + values = [] + + for row in iterate_values_from_tsv(file_path): + key = row[0] + typed = init_blast_row(row) + + if key != previous_key and previous_key is not None: + yield values + values = [] + + values.append(typed) + previous_key = key + + if values: + yield values + + +def blast_interate(subtype, input_file): + with tempfile.NamedTemporaryFile() as output_file: + db_file = st.alignment_file(subtype) + wrappers.blast(db_file, input_file, output_file.name) + for seq in iterate_blast_rows_from_tsv(output_file.name): + yield seq + + +def is_sorted(lst): + last = None + for x in lst: + if last is not None and x < last: + return False + last = x + return True + + +def check_scramble(blast_rows): + # HIV 5' region can easily map to its 3' region because they are identical. + # Such a maping would not constitute a scramble, so we ignore the 5' region for this check. + ignored_5_prime = [x for x in blast_rows if x.sstart > 622 and x.send > 622] + + if not ignored_5_prime: + # No alignment. + # It should be an error normally, yet not a scramble error. + return None + + all_same = len(set(x.sstrand for x in ignored_5_prime)) == 1 + if not all_same: + # Some parts of the sequence were aligned + # in forward direction (plus) + # and some in reverse (minus). + # This indicates an internal inversion. + return "mix" + + ignored_5_prime.sort(key=lambda x: x.qstart) + direction = ignored_5_prime[0].sstrand + if direction == "plus" and is_sorted(x.sstart for x in ignored_5_prime): + return None + elif direction == "minus" and is_sorted(x.send for x in reversed(ignored_5_prime)): + return None + else: + return direction + "Scramble" + + +def check_nonhiv(blast_rows): + aligned_length = sum(abs(x.qend - x.qstart) + 1 for x in blast_rows) + total_length = blast_rows[0].qlen if blast_rows else 1 + ratio = aligned_length / total_length + + if ratio < 0.8: + return "CHIMERA" + else: + return None + def _getPositions(pattern, string): """ Hidden function used to get interator of all intances of pattern in string diff --git a/tests/test_nonhiv.py b/tests/test_nonhiv.py index 8113dab..9f83856 100644 --- a/tests/test_nonhiv.py +++ b/tests/test_nonhiv.py @@ -2,8 +2,8 @@ import pytest import os -import intact.blastit as blastit -from intact.blastit import check_scramble, check_nonhiv +import intact.intact as intact +from intact.intact import check_scramble, check_nonhiv class BlastRow: def __init__(self, qstart, qend, qlen): diff --git a/tests/test_scramble.py b/tests/test_scramble.py index d94cfe5..9039f32 100644 --- a/tests/test_scramble.py +++ b/tests/test_scramble.py @@ -2,8 +2,8 @@ import pytest import os -import intact.blastit as blastit -from intact.blastit import check_scramble, check_nonhiv +import intact.intact as intact +from intact.intact import check_scramble, check_nonhiv @pytest.mark.parametrize("lst, expected", [ ([1, 2, 3, 4, 5], True), @@ -13,24 +13,24 @@ ([1, 1, 2, 2, 3, 3], True), ]) def test_is_sorted(lst, expected): - assert blastit.is_sorted(lst) == expected + assert intact.is_sorted(lst) == expected # def test_blast(tmp_path): # input_file = "tests/data-large.fasta" # output_file = os.path.join(tmp_path, "out.tsv") -# # blastit.blast( +# # intact.blast( # # subtype="B", # # input_file=input_file, # # output_file=output_file, # # ) -# it = blastit.blast_interate(subtype="B", input_file=input_file) +# it = intact.blast_interate(subtype="B", input_file=input_file) # for v in it: # print("\n\n\n") # print("---------------------------") # print(v[0].qseqid) -# print(blastit.check_scramble(v)) +# print(intact.check_scramble(v)) # assert False