From 2fd20f53c6c2cbad42a98493c7661f2ddc57ebed Mon Sep 17 00:00:00 2001 From: Vitaliy Mysak Date: Wed, 15 Nov 2023 10:37:57 -0800 Subject: [PATCH] Small improvements to Cigar tools and Contig stitcher * Extracted the `align_to_reference` method from `GenotypedContig` class and converted it into a standalone function, streamlining how contig alignment is invoked across the application. * Transformed the `Cigar` data structure from a mutable list to an immutable tuple to enforce the immutability of CIGAR sequences after creation for safety and reliability. * Revised the `epsilon` property calculation in `CigarHit` for better accuracy and consistency. * Revised in-code documentation, removing redundant comments and placing technical details at the end of the file for cleaner, more focused code. * Removed the unused `closest_key` method from `IntDict`, simplifying the interface. * Enhanced comments in `CigarHit` and `Cigar`, making the code easier to understand and clarifying the purpose and functionality of core classes. --- micall/core/contig_stitcher.py | 28 +-- micall/tests/test_contig_stitcher.py | 6 +- micall/utils/cigar_tools.py | 256 ++++++++++++++------------- 3 files changed, 155 insertions(+), 135 deletions(-) diff --git a/micall/core/contig_stitcher.py b/micall/core/contig_stitcher.py index 6e75d3f79..b3daf1222 100644 --- a/micall/core/contig_stitcher.py +++ b/micall/core/contig_stitcher.py @@ -25,17 +25,6 @@ class GenotypedContig(Contig): ref_seq: str matched_fraction: Optional[float] # Approximated overall concordance between `seq` and `ref_seq`. - def align_to_reference(self): - aligner = Aligner(seq=self.ref_seq, preset='map-ont') - alignments = list(aligner.map(self.seq)) - if not alignments: - return self - - hits_array = [CigarHit(x.cigar, x.r_st, x.r_en - 1, x.q_st, x.q_en - 1) for x in alignments] - single_cigar_hit = connect_cigar_hits(hits_array) - return AlignedContig(query=self, alignment=single_cigar_hit) - - def cut_query(self, cut_point: float) -> Tuple['GenotypedContig', 'GenotypedContig']: """ Cuts this alignment in two parts with cut_point between them. @@ -59,6 +48,8 @@ def cut_query(self, cut_point: float) -> Tuple['GenotypedContig', 'GenotypedCont @dataclass class AlignedContig(GenotypedContig): + query: GenotypedContig + alignment: CigarHit def __init__(self, query: GenotypedContig, alignment: CigarHit): self.query = query @@ -173,6 +164,17 @@ def munge(left: AlignedContig, right: AlignedContig) -> AlignedContig: return AlignedContig(query, alignment) +def align_to_reference(contig): + aligner = Aligner(seq=contig.ref_seq, preset='map-ont') + alignments = list(aligner.map(contig.seq)) + if not alignments: + return contig + + hits_array = [CigarHit(x.cigar, x.r_st, x.r_en - 1, x.q_st, x.q_en - 1) for x in alignments] + single_cigar_hit = connect_cigar_hits(hits_array) + return AlignedContig(query=contig, alignment=single_cigar_hit) + + def align_queries(seq1: str, seq2: str) -> Tuple[str, str]: gap_open_penalty = 15 gap_extend_penalty = 3 @@ -376,7 +378,7 @@ def try_split(contig): continue if covered(contig, gap): - midpoint = gap.r_st + (gap.r_ei - gap.r_st) / 2 + contig.alignment.epsilon + midpoint = gap.r_st + (gap.r_ei - gap.r_st) // 2 + contig.alignment.epsilon left_part, right_part = contig.cut_reference(midpoint) left_part = left_part.rstrip_query() right_part = right_part.lstrip_query() @@ -398,7 +400,7 @@ def try_split(contig): def stitch_contigs(contigs: Iterable[GenotypedContig]) -> Iterable[AlignedContig]: - maybe_aligned = list(map(GenotypedContig.align_to_reference, contigs)) + maybe_aligned = list(map(align_to_reference, contigs)) # Contigs that did not align do not need any more processing yield from (x for x in maybe_aligned if not isinstance(x, AlignedContig)) diff --git a/micall/tests/test_contig_stitcher.py b/micall/tests/test_contig_stitcher.py index b5b03fd91..27cffd248 100644 --- a/micall/tests/test_contig_stitcher.py +++ b/micall/tests/test_contig_stitcher.py @@ -1,7 +1,7 @@ import pytest import random -from micall.core.contig_stitcher import split_contigs_with_gaps, stitch_contigs, GenotypedContig, merge_intervals, find_covered_contig, stitch_consensus, calculate_concordance +from micall.core.contig_stitcher import split_contigs_with_gaps, stitch_contigs, GenotypedContig, merge_intervals, find_covered_contig, stitch_consensus, calculate_concordance, align_to_reference from micall.tests.utils import MockAligner, fixed_random_seed @@ -405,7 +405,7 @@ def test_stitching_contig_with_big_covered_gap(exact_aligner): ), ] - contigs = [x.align_to_reference() for x in contigs] + contigs = list(map(align_to_reference, contigs)) assert len(list(contigs[0].alignment.gaps())) == 1 assert len(list(contigs[1].alignment.gaps())) == 0 @@ -434,7 +434,7 @@ def test_stitching_contig_with_small_covered_gap(exact_aligner): ), ] - contigs = [x.align_to_reference() for x in contigs] + contigs = list(map(align_to_reference, contigs)) assert len(list(contigs[0].alignment.gaps())) == 1 assert len(list(contigs[1].alignment.gaps())) == 0 diff --git a/micall/utils/cigar_tools.py b/micall/utils/cigar_tools.py index 6acee2793..4197a12dc 100644 --- a/micall/utils/cigar_tools.py +++ b/micall/utils/cigar_tools.py @@ -18,8 +18,8 @@ class IntDict(dict): An extension of the basic Python dictionary designed for integer-to-integer mappings. The IntDict maintains not just key-value pairs (as in a normal dictionary) but also - tracks additional sets called `domain` and `codomain`. These sets act as supersets - to the keys and values respectively, even including integers that might not be used + tracks additional sets called `domain` and `codomain`. These sets are supersets + of the keys and values respectively, as they include integers that might not be used directly in mappings but are within the range of interest for the domain and codomain. """ @@ -40,10 +40,6 @@ def extend(self, key: Optional[int], value: Optional[int]): self.codomain.add(value) - def closest_key(self, index) -> int: - return min(self.keys(), key=lambda x: abs(x - index)) - - def left_max(self, index) -> Optional[int]: return max((v for (k, v) in self.items() if k <= index), default=None) @@ -78,10 +74,6 @@ class CoordinateMapping: """ Manages bidirectional mappings between reference and query coordinates, as well as operation indices. - A CoordinateMapping object contains mappings which represent the relationships and positions between - elements of a reference sequence, a query sequence, and the corresponding operations as defined in a - CIGAR string. - The mapping enables conversion from reference to query coordinates and vice versa. It also manages the association of these coordinates with their respective operations in the alignment process. """ @@ -133,15 +125,17 @@ def __repr__(self): return f'CoordinateMapping({self.ref_to_op},{self.query_to_op})' -class Cigar(list): +class Cigar(tuple): """ Represents an alignment between a query sequence and a reference sequence using the Compact Idiosyncratic Gapped Alignment Report (CIGAR) string format. A CIGAR string is a sequence of operation codes ('M', 'I', 'D', etc.) each preceded by - the number of bases or residues to which the operation applies. The primary use of a - CIGAR string is to detail areas of alignment and gaps (insertions or deletions) between - the two sequences. + the number of bases or residues to which the operation applies. + + The class abstracts a CIGAR string as a sequence of discrete operations for convenient + manipulation (as seen in self.iterate_operations()), while retaining the compact + form for storage and return purposes (seen in self.__str__()). Instances of this class should be created by calling the Cigar.coerce method. Examples: @@ -152,9 +146,8 @@ class Cigar(list): CIGAR strings are defined in the SAM specification (https://samtools.github.io/hts-specs/SAMv1.pdf). """ - def __init__(self, cigar_lst: Iterable[Tuple[int, CigarActions]]): - super().__init__([]) - for x in cigar_lst: self.append(x) + def __new__(cls, cigar_lst: Iterable[Tuple[int, CigarActions]]): + return super(Cigar, cls).__new__(cls, Cigar.normalize(cigar_lst)) @staticmethod @@ -165,93 +158,12 @@ def coerce(obj): if isinstance(obj, str): return Cigar.parse(obj) - if isinstance(obj, list): + if isinstance(obj, list) or isinstance(obj, tuple): return Cigar(obj) raise TypeError(f"Cannot coerce {obj!r} to CIGAR string.") - OP_MAPPING = { - 'M': CigarActions.MATCH, # Alignment match (can be a sequence match or mismatch) - 'I': CigarActions.INSERT, # Insertion to the reference - 'D': CigarActions.DELETE, # Deletion from the reference - 'N': CigarActions.SKIPPED, # Skipped region from the reference - 'S': CigarActions.SOFT_CLIPPED, # Soft clip on the read (ignored region, not aligned but present in the read) - 'H': CigarActions.HARD_CLIPPED, # Hard clip on the read (ignored region, not present in the read) - 'P': CigarActions.PADDING, # Padding (silent deletion from padded reference, not applicable for our case) - '=': CigarActions.SEQ_MATCH, # Sequence match - 'X': CigarActions.MISMATCH, # Sequence mismatch - } - - - @staticmethod - def parse_operation(operation: str) -> CigarActions: - if operation in Cigar.OP_MAPPING: - return Cigar.OP_MAPPING[operation] - else: - raise ValueError(f"Unexpected CIGAR action: {operation}.") - - - @staticmethod - def operation_to_str(op: CigarActions) -> str: - return [k for (k, v) in Cigar.OP_MAPPING.items() if v == op][0] - - - @staticmethod - def parse(string) -> 'Cigar': - """ - Parses a CIGAR string into a Cigar object. - - :param string: A CIGAR string with the format '(\\d+[MIDNSHPX=])+', where each operation code - is preceded by a number indicating how many times the operation should be applied. - """ - - data = [] - while string: - match = re.match(r'([0-9]+)([^0-9])', string) - if match: - num, operation = match.groups() - data.append([int(num), Cigar.parse_operation(operation)]) - string = string[match.end():] - else: - raise ValueError(f"Invalid CIGAR string. Invalid part: {string[:20]}") - - return Cigar(data) - - - def append(self, item: Tuple[int, CigarActions]): - """ - Appends an operation to the CIGAR sequence, checking for type correctness - and performing normalization by merging consecutive identical operations. - """ - - # Type checking - if not isinstance(item, list) and not isinstance(item, tuple): - raise ValueError(f"Invalid CIGAR list: {item!r} is not a tuple.") - if len(item) != 2: - raise ValueError(f"Invalid CIGAR list: {item!r} is has a bad length.") - - num, operation = item - if isinstance(operation, int): - operation = CigarActions(operation) - if not isinstance(num, int) or not isinstance(operation, CigarActions): - raise ValueError(f"Invalid CIGAR list: {item!r} is not a number/operation tuple.") - if num < 0: - raise ValueError(f"Invalid CIGAR list: number of operations is negative.") - - # Normalization - if num == 0: - return - - if self: - last_num, last_operation = self[-1] - if operation == last_operation: - self[-1] = (last_num + num, operation) - return - - super().append((num, operation)) - - def iterate_operations(self) -> Iterable[CigarActions]: """ Yields each operation in the CIGAR sequence as a `CigarActions` enum. @@ -298,25 +210,6 @@ def iterate_operations_with_pointers(self) -> Iterable[Tuple[CigarActions, Optio yield (operation, None, None) - @cached_property - def op_length(self): - return sum(1 for x in self.iterate_operations()) - - - @cached_property - def query_length(self): - return max((query_pointer + 1 if query_pointer is not None else 0 for (_, _, query_pointer) - in self.iterate_operations_with_pointers()), - default=0) - - - @cached_property - def ref_length(self): - return max((ref_pointer + 1 if ref_pointer is not None else 0 for (_, ref_pointer, _) - in self.iterate_operations_with_pointers()), - default=0) - - def slice_operations(self, start_inclusive, end_noninclusive) -> 'Cigar': """ Creates a new Cigar object by slicing the current one from start_inclusive to @@ -415,6 +308,117 @@ def to_msa(self, reference_seq, query_seq) -> Tuple[str, str]: return reference_msa, query_msa + @cached_property + def op_length(self): + return sum(1 for x in self.iterate_operations()) + + + @cached_property + def query_length(self): + return max((query_pointer + 1 if query_pointer is not None else 0 for (_, _, query_pointer) + in self.iterate_operations_with_pointers()), + default=0) + + + @cached_property + def ref_length(self): + return max((ref_pointer + 1 if ref_pointer is not None else 0 for (_, ref_pointer, _) + in self.iterate_operations_with_pointers()), + default=0) + + # # + # Boring boilerplate code below # + # # + + OP_MAPPING = { + 'M': CigarActions.MATCH, # Alignment match (can be a sequence match or mismatch) + 'I': CigarActions.INSERT, # Insertion to the reference + 'D': CigarActions.DELETE, # Deletion from the reference + 'N': CigarActions.SKIPPED, # Skipped region from the reference + 'S': CigarActions.SOFT_CLIPPED, # Soft clip on the read (ignored region, not aligned but present in the read) + 'H': CigarActions.HARD_CLIPPED, # Hard clip on the read (ignored region, not present in the read) + 'P': CigarActions.PADDING, # Padding (silent deletion from padded reference, not applicable for our case) + '=': CigarActions.SEQ_MATCH, # Sequence match + 'X': CigarActions.MISMATCH, # Sequence mismatch + } + + + @staticmethod + def parse_operation(operation: str) -> CigarActions: + if operation in Cigar.OP_MAPPING: + return Cigar.OP_MAPPING[operation] + else: + raise ValueError(f"Unexpected CIGAR action: {operation}.") + + + @staticmethod + def operation_to_str(op: CigarActions) -> str: + return [k for (k, v) in Cigar.OP_MAPPING.items() if v == op][0] + + + @staticmethod + def parse(string) -> 'Cigar': + """ + Parses a CIGAR string into a Cigar object. + + :param string: A CIGAR string with the format '(\\d+[MIDNSHPX=])+', where each operation code + is preceded by a number indicating how many times the operation should be applied. + """ + + data = [] + while string: + match = re.match(r'([0-9]+)([^0-9])', string) + if match: + num, operation = match.groups() + data.append([int(num), Cigar.parse_operation(operation)]) + string = string[match.end():] + else: + raise ValueError(f"Invalid CIGAR string. Invalid part: {string[:20]}") + + return Cigar(data) + + + @staticmethod + def normalize(cigar_lst) -> Iterable[Tuple[int, CigarActions]]: + """ + Goes through the list appending operations to the CIGAR sequence, + checking for type correctness and performing normalization + by merging consecutive identical operations. + """ + + last_item = None + + for item in cigar_lst: + # Type checking + if not isinstance(item, list) and not isinstance(item, tuple): + raise ValueError(f"Invalid CIGAR list: {item!r} is not a tuple.") + if len(item) != 2: + raise ValueError(f"Invalid CIGAR list: {item!r} has a bad length.") + + num, operation = item + if isinstance(operation, int): + operation = CigarActions(operation) + if not isinstance(num, int) or not isinstance(operation, CigarActions): + raise ValueError(f"Invalid CIGAR list: {item!r} is not a number/operation tuple.") + if num < 0: + raise ValueError(f"Invalid CIGAR list: number of operations is negative.") + + # Normalization + if num == 0: + continue + + if last_item: + last_num, last_operation = last_item + if operation == last_operation: + last_item = (last_num + num, operation) + continue + + if last_item: yield (last_item[0], last_item[1]) + last_item = item + + if last_item: yield (last_item[0], last_item[1]) + + def __repr__(self): return f'Cigar({str(self)!r})' @@ -426,6 +430,20 @@ def __str__(self): @dataclass class CigarHit: + """ + This class provides an abstraction over the complex details involved in working with sequence alignments + expressed as CIGAR strings. It implements operations for alignment handling that are conceptually + straightforward but challenging to implement ad-hoc. + + The main tasks handled by this class are: + - Precisely dividing an alignment into two contiguous segments + at any given reference position (`cut_reference()`), + - Removing portions of the query sequence that do not align with + the reference sequence from either end + while preserving the alignment context (`lstrip_query()` and `rstrip_query()`), + - Enumerating gaps in the alignment (`gaps()`). + """ + cigar: Cigar r_st: int r_ei: int # inclusive @@ -552,7 +570,7 @@ def connect(self, other): @property def epsilon(self): - return Fraction(1, self.query_length + self.ref_length + 100) + return Fraction(1, self.cigar.op_length * 3 + 1) def _ref_cut_to_op_cut(self, cut_point: float):