From fab7e02b04603ec50024053e2449c3a075033298 Mon Sep 17 00:00:00 2001 From: Vitaliy Mysak Date: Wed, 15 Nov 2023 10:37:57 -0800 Subject: [PATCH] Small improvements to Cigar tools and Contig stitcher * Extracted the `align_to_reference` method from `GenotypedContig` class and converted it into a standalone function, streamlining how contig alignment is invoked across the application. * Transformed the `Cigar` data structure from a mutable list to an immutable tuple to enforce the immutability of CIGAR sequences after creation for safety and reliability. * Revised the `epsilon` property calculation in `CigarHit` for better accuracy and consistency. --- micall/core/contig_stitcher.py | 24 ++++---- micall/tests/test_contig_stitcher.py | 6 +- micall/utils/cigar_tools.py | 85 +++++++++++++++------------- 3 files changed, 61 insertions(+), 54 deletions(-) diff --git a/micall/core/contig_stitcher.py b/micall/core/contig_stitcher.py index 6e75d3f79..4cfb0f2d1 100644 --- a/micall/core/contig_stitcher.py +++ b/micall/core/contig_stitcher.py @@ -25,17 +25,6 @@ class GenotypedContig(Contig): ref_seq: str matched_fraction: Optional[float] # Approximated overall concordance between `seq` and `ref_seq`. - def align_to_reference(self): - aligner = Aligner(seq=self.ref_seq, preset='map-ont') - alignments = list(aligner.map(self.seq)) - if not alignments: - return self - - hits_array = [CigarHit(x.cigar, x.r_st, x.r_en - 1, x.q_st, x.q_en - 1) for x in alignments] - single_cigar_hit = connect_cigar_hits(hits_array) - return AlignedContig(query=self, alignment=single_cigar_hit) - - def cut_query(self, cut_point: float) -> Tuple['GenotypedContig', 'GenotypedContig']: """ Cuts this alignment in two parts with cut_point between them. @@ -173,6 +162,17 @@ def munge(left: AlignedContig, right: AlignedContig) -> AlignedContig: return AlignedContig(query, alignment) +def align_to_reference(contig): + aligner = Aligner(seq=contig.ref_seq, preset='map-ont') + alignments = list(aligner.map(contig.seq)) + if not alignments: + return contig + + hits_array = [CigarHit(x.cigar, x.r_st, x.r_en - 1, x.q_st, x.q_en - 1) for x in alignments] + single_cigar_hit = connect_cigar_hits(hits_array) + return AlignedContig(query=contig, alignment=single_cigar_hit) + + def align_queries(seq1: str, seq2: str) -> Tuple[str, str]: gap_open_penalty = 15 gap_extend_penalty = 3 @@ -398,7 +398,7 @@ def try_split(contig): def stitch_contigs(contigs: Iterable[GenotypedContig]) -> Iterable[AlignedContig]: - maybe_aligned = list(map(GenotypedContig.align_to_reference, contigs)) + maybe_aligned = list(map(align_to_reference, contigs)) # Contigs that did not align do not need any more processing yield from (x for x in maybe_aligned if not isinstance(x, AlignedContig)) diff --git a/micall/tests/test_contig_stitcher.py b/micall/tests/test_contig_stitcher.py index b5b03fd91..27cffd248 100644 --- a/micall/tests/test_contig_stitcher.py +++ b/micall/tests/test_contig_stitcher.py @@ -1,7 +1,7 @@ import pytest import random -from micall.core.contig_stitcher import split_contigs_with_gaps, stitch_contigs, GenotypedContig, merge_intervals, find_covered_contig, stitch_consensus, calculate_concordance +from micall.core.contig_stitcher import split_contigs_with_gaps, stitch_contigs, GenotypedContig, merge_intervals, find_covered_contig, stitch_consensus, calculate_concordance, align_to_reference from micall.tests.utils import MockAligner, fixed_random_seed @@ -405,7 +405,7 @@ def test_stitching_contig_with_big_covered_gap(exact_aligner): ), ] - contigs = [x.align_to_reference() for x in contigs] + contigs = list(map(align_to_reference, contigs)) assert len(list(contigs[0].alignment.gaps())) == 1 assert len(list(contigs[1].alignment.gaps())) == 0 @@ -434,7 +434,7 @@ def test_stitching_contig_with_small_covered_gap(exact_aligner): ), ] - contigs = [x.align_to_reference() for x in contigs] + contigs = list(map(align_to_reference, contigs)) assert len(list(contigs[0].alignment.gaps())) == 1 assert len(list(contigs[1].alignment.gaps())) == 0 diff --git a/micall/utils/cigar_tools.py b/micall/utils/cigar_tools.py index 6acee2793..1c36e486d 100644 --- a/micall/utils/cigar_tools.py +++ b/micall/utils/cigar_tools.py @@ -133,7 +133,7 @@ def __repr__(self): return f'CoordinateMapping({self.ref_to_op},{self.query_to_op})' -class Cigar(list): +class Cigar(tuple): """ Represents an alignment between a query sequence and a reference sequence using the Compact Idiosyncratic Gapped Alignment Report (CIGAR) string format. @@ -152,9 +152,49 @@ class Cigar(list): CIGAR strings are defined in the SAM specification (https://samtools.github.io/hts-specs/SAMv1.pdf). """ - def __init__(self, cigar_lst: Iterable[Tuple[int, CigarActions]]): - super().__init__([]) - for x in cigar_lst: self.append(x) + def __new__(cls, cigar_lst: Iterable[Tuple[int, CigarActions]]): + return super(Cigar, cls).__new__(cls, Cigar.normalize(cigar_lst)) + + + @staticmethod + def normalize(cigar_lst) -> Iterable[Tuple[int, CigarActions]]: + """ + Goes through the list appending operations to the CIGAR sequence, + checking for type correctness and performing normalization + by merging consecutive identical operations. + """ + + last_item = None + + for item in cigar_lst: + # Type checking + if not isinstance(item, list) and not isinstance(item, tuple): + raise ValueError(f"Invalid CIGAR list: {item!r} is not a tuple.") + if len(item) != 2: + raise ValueError(f"Invalid CIGAR list: {item!r} has a bad length.") + + num, operation = item + if isinstance(operation, int): + operation = CigarActions(operation) + if not isinstance(num, int) or not isinstance(operation, CigarActions): + raise ValueError(f"Invalid CIGAR list: {item!r} is not a number/operation tuple.") + if num < 0: + raise ValueError(f"Invalid CIGAR list: number of operations is negative.") + + # Normalization + if num == 0: + continue + + if last_item: + last_num, last_operation = last_item + if operation == last_operation: + last_item = (last_num + num, operation) + continue + + if last_item: yield (last_item[0], last_item[1]) + last_item = item + + if last_item: yield (last_item[0], last_item[1]) @staticmethod @@ -165,7 +205,7 @@ def coerce(obj): if isinstance(obj, str): return Cigar.parse(obj) - if isinstance(obj, list): + if isinstance(obj, list) or isinstance(obj, tuple): return Cigar(obj) raise TypeError(f"Cannot coerce {obj!r} to CIGAR string.") @@ -219,39 +259,6 @@ def parse(string) -> 'Cigar': return Cigar(data) - def append(self, item: Tuple[int, CigarActions]): - """ - Appends an operation to the CIGAR sequence, checking for type correctness - and performing normalization by merging consecutive identical operations. - """ - - # Type checking - if not isinstance(item, list) and not isinstance(item, tuple): - raise ValueError(f"Invalid CIGAR list: {item!r} is not a tuple.") - if len(item) != 2: - raise ValueError(f"Invalid CIGAR list: {item!r} is has a bad length.") - - num, operation = item - if isinstance(operation, int): - operation = CigarActions(operation) - if not isinstance(num, int) or not isinstance(operation, CigarActions): - raise ValueError(f"Invalid CIGAR list: {item!r} is not a number/operation tuple.") - if num < 0: - raise ValueError(f"Invalid CIGAR list: number of operations is negative.") - - # Normalization - if num == 0: - return - - if self: - last_num, last_operation = self[-1] - if operation == last_operation: - self[-1] = (last_num + num, operation) - return - - super().append((num, operation)) - - def iterate_operations(self) -> Iterable[CigarActions]: """ Yields each operation in the CIGAR sequence as a `CigarActions` enum. @@ -552,7 +559,7 @@ def connect(self, other): @property def epsilon(self): - return Fraction(1, self.query_length + self.ref_length + 100) + return Fraction(1, self.cigar.op_length * 3 + 1) def _ref_cut_to_op_cut(self, cut_point: float):