diff --git a/intact/intact.py b/intact/intact.py index ece17af..37358d4 100644 --- a/intact/intact.py +++ b/intact/intact.py @@ -6,7 +6,7 @@ from dataclasses import dataclass from collections import Counter import Bio -from Bio import AlignIO, Seq, SeqIO, SeqRecord +from Bio import Seq, SeqIO, SeqRecord from scipy.stats import fisher_exact import util.constants as const @@ -15,6 +15,7 @@ import util.log as log import util.coordinates as coords import util.detailed_aligner as detailed_aligner +from util.aligned_sequence import AlignedSequence, ReferenceIndex from util.blastrow import BlastRow @@ -44,6 +45,7 @@ class IntactnessError: error: str message: str + @dataclass class ExpectedORF: name: str @@ -54,18 +56,19 @@ class ExpectedORF: aminoacids: str protein: str + @staticmethod - def subtyped(reference, pos_mapping, name, start, end, deletion_tolerence): + def subtyped(aligned_sequence, name, start, end, deletion_tolerence): vpr_defective_insertion_pos = 5772 start = start if start < vpr_defective_insertion_pos else start - 1 end = end if end < vpr_defective_insertion_pos else end - 1 - start_s = pos_mapping[start - 1] - end_s = pos_mapping[end] + start_s = aligned_sequence.map_index(start - 1) # decrement is needed because original "start" is 1-based. + end_s = aligned_sequence.map_index(end) - nucleotides = str(reference.seq[start_s:end_s]) + nucleotides = str(aligned_sequence.this.seq[start_s:end_s]) aminoacids = translate(nucleotides) - has_start_codon = translate(reference.seq[(start - 1):end]).startswith("M") + has_start_codon = translate(aligned_sequence.this.seq[(start - 1):end]).startswith("M") protein = get_biggest_protein(has_start_codon, aminoacids) return ExpectedORF(name=name, @@ -77,6 +80,7 @@ def subtyped(reference, pos_mapping, name, start, end, deletion_tolerence): protein=protein, ) + @dataclass class CandidateORF: name: str @@ -89,6 +93,7 @@ class CandidateORF: protein: str aminoacids: str + @dataclass class FoundORF: name: str @@ -102,6 +107,7 @@ class FoundORF: aminoacids: str nucleotides: str + @dataclass class HolisticInfo: qlen: int = dataclasses.field(default=None) @@ -837,21 +843,23 @@ def intact( working_dir, reference_name = sorted(subtype_choices.keys())[0] reference = subtype_choices[reference_name] - hxb2_alignment = wrappers.mafft([hxb2_reference, reference]) - pos_mapping = coords.map_positions(hxb2_alignment[0], hxb2_alignment[1]) + aligned_subtype = AlignedSequence(this=reference, reference=hxb2_reference) + + # hxb2_alignment = wrappers.mafft([hxb2_reference, reference]) + # pos_mapping = coords.map_positions(hxb2_alignment[0], hxb2_alignment[1]) # convert ORF positions to appropriate subtype forward_orfs, reverse_orfs, small_orfs = [ [ - ExpectedORF.subtyped(reference, pos_mapping, n, s, e, delta) \ + ExpectedORF.subtyped(aligned_subtype, n, s, e, delta) \ for (n, s, e, delta) in orfs ] \ for orfs in [hxb2_forward_orfs, hxb2_reverse_orfs, hxb2_small_orfs] ] # convert PSI locus and RRE locus to appropriate subtype - psi_locus = [pos_mapping[x] for x in hxb2_psi_locus] - rre_locus = [pos_mapping[x] for x in hxb2_rre_locus] + psi_locus = [aligned_subtype.map_index(x) for x in hxb2_psi_locus] + rre_locus = [aligned_subtype.map_index(x) for x in hxb2_rre_locus] sequence_errors = [] holistic = HolisticInfo() @@ -865,7 +873,7 @@ def intact( working_dir, holistic.orfs_start = min(forward_orfs, key=lambda e: e.start).start holistic.orfs_end = max(forward_orfs, key=lambda e: e.end).end - clamp = lambda p: max(holistic.orfs_start, min(holistic.orfs_end, p)) + clamp = lambda p: max(min(p, holistic.orfs_end), holistic.orfs_start) aligned_reference_orfs_length = sum(abs(clamp(x.send + 1) - clamp(x.sstart)) for x in blast_rows) blast_matched_orfs_slen = holistic.orfs_end - holistic.orfs_start holistic.blast_sseq_orfs_coverage = aligned_reference_orfs_length / blast_matched_orfs_slen @@ -936,8 +944,8 @@ def intact( working_dir, if check_major_splice_donor_site: mutated_splice_donor_site = has_mutated_major_splice_donor_site( alignment, - pos_mapping[hxb2_msd_site_locus], - pos_mapping[hxb2_msd_site_locus + 1], + aligned_subtype.map_index(hxb2_msd_site_locus), + aligned_subtype.map_index(hxb2_msd_site_locus + 1), const.DEFAULT_MSD_SEQUENCE) if mutated_splice_donor_site is not None: sequence_errors.append(mutated_splice_donor_site) diff --git a/util/aligned_sequence.py b/util/aligned_sequence.py new file mode 100644 index 0000000..e08e9bd --- /dev/null +++ b/util/aligned_sequence.py @@ -0,0 +1,71 @@ +import dataclasses +from dataclasses import dataclass +from Bio import Seq, SeqRecord + +import util.coordinates as coords +import util.wrappers as wrappers + +@dataclass +class ReferenceIndex: + value: int + + +@dataclass +class AlignedSequence: + this: Seq + reference: Seq + alignment: (str, str) = dataclasses.field(default=None) + coordinates_mapping: list[int] = dataclasses.field(default=None) + + + def get_alignment(self): + if not self.alignment: + self.alignment = wrappers.mafft([self.reference, self.this]) + + return self.alignment + + + def aligned_reference(self): + return self.get_alignment()[0] + + + def aligned_this(self): + return self.get_alignment()[1] + + + def map_index(self, index): + if not self.coordinates_mapping: + self.coordinates_mapping = coords.map_positions(self.aligned_reference(), self.aligned_this()) + + if not isinstance(index, int): + raise TypeError(f"Expected integer as index", index) + + return self.coordinates_mapping[index] + + + def index(self, index): + if isinstance(index, ReferenceIndex): + index = self.map_index(index) + + return self.this[index] + + + def slice(self, first, last): + if isinstance(first, ReferenceIndex): + first = self.map_index(first) + if isinstance(last, ReferenceIndex): + last = self.map_index(last) + + newthis = self.this[first:(last + 1)] + newreference = self.reference[self.map_index(first):(self.map_index(last) + 1)] + # TODO: calculate new "coordinates_mapping" and new "alignment" from these indexes. + return Sequence(this=newthis, reference=newreference) + + + def reverse(self): + newthis = SeqRecord.SeqRecord(Seq.reverse_complement(this.seq), + id = this.id + " [REVERSED]", + name = this.name + ) + + return Sequence(this=newthis, reference=self.reference)