Skip to content

Commit

Permalink
add blast checks
Browse files Browse the repository at this point in the history
  • Loading branch information
Donaim committed Jun 2, 2023
1 parent 2f8c44d commit e77222b
Show file tree
Hide file tree
Showing 3 changed files with 334 additions and 0 deletions.
151 changes: 151 additions & 0 deletions intact/blastit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@

import os
import sys
import subprocess
import tempfile
import csv
from dataclasses import dataclass

import util.subtypes as st


@dataclass
class BlastRow:
qseqid: str
qlen: int
sseqid: str
sgi: str
slen: int
qstart: int
qend: int
sstart: int
send: int
evalue: float
bitscore: float
length: int
pident: float
nident: float
btop: int
stitle: str
sstrand: str


def blast(subtype, input_file, output_file):
db_file = st.alignment_file(subtype)

subprocess.call(
["blastn",
"-query", input_file,
"-db", db_file,
"-num_alignments", "1",
"-reward", "1",
"-penalty", "-1",
"-gapopen", "2",
"-gapextend", "1",
"-out", output_file,
"-outfmt", "6 qseqid qlen sseqid sgi slen qstart qend sstart send evalue bitscore length pident nident btop stitle sstrand"],
shell=False)


def init_blast_row(row):
it = iter(row)
return BlastRow(
qseqid=next(it),
qlen=int(next(it)),
sseqid=next(it),
sgi=next(it),
slen=int(next(it)),
qstart=int(next(it)),
qend=int(next(it)),
sstart=int(next(it)),
send=int(next(it)),
evalue=float(next(it)),
bitscore=float(next(it)),
length=int(next(it)),
pident=float(next(it)),
nident=float(next(it)),
btop=next(it),
stitle=next(it),
sstrand=next(it),
)


def iterate_values_from_tsv(file_path):
with open(file_path, 'r') as tsv_file:
reader = csv.reader(tsv_file, delimiter='\t')
for row in reader:
yield row


def iterate_blast_rows_from_tsv(file_path):
previous_key = None
values = []

for row in iterate_values_from_tsv(file_path):
key = row[0]
typed = init_blast_row(row)

if key != previous_key and previous_key is not None:
yield values
values = []

values.append(typed)
previous_key = key

if values:
yield values


def blast_interate(subtype, input_file):
with tempfile.NamedTemporaryFile() as output_file:
blast(subtype, input_file, output_file.name)
for seq in iterate_blast_rows_from_tsv(output_file.name):
yield seq


def is_sorted(lst):
last = None
for x in lst:
if last is not None and x < last:
return False
last = x
return True


def check_scramble(blast_rows):
# HIV 5' region can easily map to its 3' region because they are identical.
# Such a maping would not constitute a scramble, so we ignore the 5' region for this check.
ignored_5_prime = [x for x in blast_rows if x.sstart > 622 and x.send > 622]

if not ignored_5_prime:
# No alignment.
# It should be an error normally, yet not a scramble error.
return None

all_same = len(set(x.sstrand for x in ignored_5_prime)) == 1
if not all_same:
# Some parts of the sequence were aligned
# in forward direction (plus)
# and some in reverse (minus).
# This indicates an internal inversion.
return "mix"

ignored_5_prime.sort(key=lambda x: x.qstart)
direction = ignored_5_prime[0].sstrand
if direction == "plus" and is_sorted(x.sstart for x in ignored_5_prime):
return None
elif direction == "minus" and is_sorted(x.send for x in reversed(ignored_5_prime)):
return None
else:
return direction + "Scramble"


def check_nonhiv(blast_rows):
aligned_length = sum(abs(x.qend - x.qstart) + 1 for x in blast_rows)
total_length = blast_rows[0].qlen if blast_rows else 1
ratio = aligned_length / total_length

if ratio < 0.8:
return "CHIMERA"
else:
return None
49 changes: 49 additions & 0 deletions tests/test_nonhiv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@

import pytest
import os

import intact.blastit as blastit
from intact.blastit import check_scramble, check_nonhiv

class BlastRow:
def __init__(self, qstart, qend, qlen):
self.qstart = qstart
self.qend = qend
self.qlen = qlen


def test_check_nonhiv_empty_blast_rows():
blast_rows = []
assert check_nonhiv(blast_rows) == "CHIMERA"


def test_check_nonhiv_low_ratio():
blast_rows = [
BlastRow(0, 100, 1000),
BlastRow(200, 300, 1000),
BlastRow(400, 500, 1000)
]
assert check_nonhiv(blast_rows) == "CHIMERA"


def test_check_nonhiv_high_ratio():
blast_rows = [
BlastRow(0, 200, 1000),
BlastRow(200, 400, 1000),
BlastRow(400, 900, 1000)
]
assert check_nonhiv(blast_rows) is None


def test_check_nonhiv_high_ratio_reversed():
blast_rows = [
BlastRow(0, 200, 1000),
BlastRow(200, 400, 1000),
BlastRow(900, 400, 1000)
]
assert check_nonhiv(blast_rows) is None


def test_check_nonhiv_single_blast_row():
blast_rows = [BlastRow(0, 200, 1000)]
assert check_nonhiv(blast_rows) == "CHIMERA"
134 changes: 134 additions & 0 deletions tests/test_scramble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@

import pytest
import os

import intact.blastit as blastit
from intact.blastit import check_scramble, check_nonhiv

@pytest.mark.parametrize("lst, expected", [
([1, 2, 3, 4, 5], True),
([1, 3, 2, 4, 5], False),
([], True),
([5], True),
([1, 1, 2, 2, 3, 3], True),
])
def test_is_sorted(lst, expected):
assert blastit.is_sorted(lst) == expected

# def test_blast(tmp_path):
# input_file = "tests/data-large.fasta"
# output_file = os.path.join(tmp_path, "out.tsv")

# # blastit.blast(
# # subtype="B",
# # input_file=input_file,
# # output_file=output_file,
# # )

# it = blastit.blast_interate(subtype="B", input_file=input_file)
# for v in it:
# print("\n\n\n")
# print("---------------------------")
# print(v[0].qseqid)
# print(blastit.check_scramble(v))

# assert False

class BlastRow:
def __init__(self, sstart, send, qstart, sstrand):
self.sstart = sstart
self.send = send
self.qstart = qstart
self.sstrand = sstrand

def test_check_scramble_no_alignment():
# Test case where there is no alignment
blast_rows = []
assert check_scramble(blast_rows) is None

def test_check_scramble_internal_inversion():
# Test case where some parts of the sequence are aligned in forward direction
# and some in reverse, indicating an internal inversion error.
blast_rows = [
BlastRow(sstart=900, send=910, qstart=5, sstrand="plus"),
BlastRow(sstart=930, send=940, qstart=25, sstrand="minus"),
]
assert check_scramble(blast_rows) == "mix"

def test_check_scramble_plus_strand_sorted():
# Test case where the direction is "plus" and the sstart values are sorted.
blast_rows = [
BlastRow(sstart=900, send=910, qstart=5, sstrand="plus"),
BlastRow(sstart=930, send=940, qstart=25, sstrand="plus"),
]
assert check_scramble(blast_rows) is None

def test_check_scramble_minus_strand_sorted():
# Test case where the direction is "minus" and the send values are sorted in reverse order.
blast_rows = [
BlastRow(sstart=910, send=900, qstart=25, sstrand="minus"),
BlastRow(sstart=940, send=930, qstart=5, sstrand="minus"),
]
assert check_scramble(blast_rows) is None

def test_check_scramble_plus_strand_unsorted():
# Test case with mixed directions and inversions
blast_rows = [
BlastRow(sstart=700, send=700, qstart=99, sstrand="plus"),
BlastRow(sstart=880, send=890, qstart=25, sstrand="plus"),
BlastRow(sstart=920, send=930, qstart=45, sstrand="plus"),
BlastRow(sstart=950, send=980, qstart=85, sstrand="plus"),
]
assert check_scramble(blast_rows) == "plusScramble"

def test_check_scramble_plus_strand_unsorted_5prime():
# Test case with mixed directions and inversions
blast_rows = [
BlastRow(sstart=100, send=110, qstart=99, sstrand="plus"),
BlastRow(sstart=880, send=890, qstart=25, sstrand="plus"),
BlastRow(sstart=920, send=930, qstart=45, sstrand="plus"),
BlastRow(sstart=950, send=980, qstart=85, sstrand="plus"),
]
assert check_scramble(blast_rows) == None

def test_check_scramble_minus_strand_unsorted():
# Test case where the direction is "minus" but the send values are not sorted in reverse order.
blast_rows = [
BlastRow(sstart=910, send=900, qstart=5, sstrand="minus"),
BlastRow(sstart=940, send=930, qstart=25, sstrand="minus"),
]
assert check_scramble(blast_rows) == "minusScramble"

def test_check_scramble_mixed_direction():
# Test case where some rows have "plus" direction and some have "minus" direction.
blast_rows = [
BlastRow(sstart=900, send=910, qstart=5, sstrand="plus"),
BlastRow(sstart=930, send=940, qstart=25, sstrand="minus"),
BlastRow(sstart=950, send=990, qstart=45, sstrand="minus"),
]
assert check_scramble(blast_rows) == "mix"

def test_check_scramble_single_row_plus_strand():
# Test case with a single row aligned in the "plus" strand
blast_rows = [
BlastRow(sstart=700, send=710, qstart=5, sstrand="plus"),
]
assert check_scramble(blast_rows) is None

def test_check_scramble_single_row_minus_strand():
# Test case with a single row aligned in the "minus" strand
blast_rows = [
BlastRow(sstart=900, send=910, qstart=5, sstrand="minus"),
]
assert check_scramble(blast_rows) is None

def test_check_scramble_multiple_directions_and_inversions():
# Test case with mixed directions and inversions
blast_rows = [
BlastRow(sstart=900, send=910, qstart=5, sstrand="plus"),
BlastRow(sstart=880, send=890, qstart=25, sstrand="minus"),
BlastRow(sstart=920, send=930, qstart=45, sstrand="minus"),
BlastRow(sstart=850, send=880, qstart=85, sstrand="plus"),
BlastRow(sstart=890, send=880, qstart=85, sstrand="minus"),
]
assert check_scramble(blast_rows) == "mix"

0 comments on commit e77222b

Please sign in to comment.