Skip to content

Commit

Permalink
Small improvements to Cigar tools and Contig stitcher
Browse files Browse the repository at this point in the history
* Extracted the `align_to_reference` method from `GenotypedContig`
  class and converted it into a standalone function, streamlining how
  contig alignment is invoked across the application.

* Transformed the `Cigar` data structure from a mutable list to an
  immutable tuple to enforce the immutability of CIGAR sequences after
  creation for safety and reliability.

* Revised the `epsilon` property calculation in `CigarHit` for better
  accuracy and consistency.
  • Loading branch information
Donaim committed Nov 15, 2023
1 parent 87d2d4d commit 5b13ac1
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 55 deletions.
28 changes: 15 additions & 13 deletions micall/core/contig_stitcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,6 @@ class GenotypedContig(Contig):
ref_seq: str
matched_fraction: Optional[float] # Approximated overall concordance between `seq` and `ref_seq`.

def align_to_reference(self):
aligner = Aligner(seq=self.ref_seq, preset='map-ont')
alignments = list(aligner.map(self.seq))
if not alignments:
return self

hits_array = [CigarHit(x.cigar, x.r_st, x.r_en - 1, x.q_st, x.q_en - 1) for x in alignments]
single_cigar_hit = connect_cigar_hits(hits_array)
return AlignedContig(query=self, alignment=single_cigar_hit)


def cut_query(self, cut_point: float) -> Tuple['GenotypedContig', 'GenotypedContig']:
"""
Cuts this alignment in two parts with cut_point between them.
Expand All @@ -59,6 +48,8 @@ def cut_query(self, cut_point: float) -> Tuple['GenotypedContig', 'GenotypedCont

@dataclass
class AlignedContig(GenotypedContig):
query: GenotypedContig
alignment: CigarHit

def __init__(self, query: GenotypedContig, alignment: CigarHit):
self.query = query
Expand Down Expand Up @@ -173,6 +164,17 @@ def munge(left: AlignedContig, right: AlignedContig) -> AlignedContig:
return AlignedContig(query, alignment)


def align_to_reference(contig):
aligner = Aligner(seq=contig.ref_seq, preset='map-ont')
alignments = list(aligner.map(contig.seq))
if not alignments:
return contig

hits_array = [CigarHit(x.cigar, x.r_st, x.r_en - 1, x.q_st, x.q_en - 1) for x in alignments]
single_cigar_hit = connect_cigar_hits(hits_array)
return AlignedContig(query=contig, alignment=single_cigar_hit)


def align_queries(seq1: str, seq2: str) -> Tuple[str, str]:
gap_open_penalty = 15
gap_extend_penalty = 3
Expand Down Expand Up @@ -376,7 +378,7 @@ def try_split(contig):
continue

Check warning on line 378 in micall/core/contig_stitcher.py

View check run for this annotation

Codecov / codecov/patch

micall/core/contig_stitcher.py#L378

Added line #L378 was not covered by tests

if covered(contig, gap):
midpoint = gap.r_st + (gap.r_ei - gap.r_st) / 2 + contig.alignment.epsilon
midpoint = gap.r_st + (gap.r_ei - gap.r_st) // 2 + contig.alignment.epsilon
left_part, right_part = contig.cut_reference(midpoint)
left_part = left_part.rstrip_query()
right_part = right_part.lstrip_query()
Expand All @@ -398,7 +400,7 @@ def try_split(contig):


def stitch_contigs(contigs: Iterable[GenotypedContig]) -> Iterable[AlignedContig]:
maybe_aligned = list(map(GenotypedContig.align_to_reference, contigs))
maybe_aligned = list(map(align_to_reference, contigs))

# Contigs that did not align do not need any more processing
yield from (x for x in maybe_aligned if not isinstance(x, AlignedContig))
Expand Down
6 changes: 3 additions & 3 deletions micall/tests/test_contig_stitcher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

import pytest
import random
from micall.core.contig_stitcher import split_contigs_with_gaps, stitch_contigs, GenotypedContig, merge_intervals, find_covered_contig, stitch_consensus, calculate_concordance
from micall.core.contig_stitcher import split_contigs_with_gaps, stitch_contigs, GenotypedContig, merge_intervals, find_covered_contig, stitch_consensus, calculate_concordance, align_to_reference
from micall.tests.utils import MockAligner, fixed_random_seed


Expand Down Expand Up @@ -405,7 +405,7 @@ def test_stitching_contig_with_big_covered_gap(exact_aligner):
),
]

contigs = [x.align_to_reference() for x in contigs]
contigs = list(map(align_to_reference, contigs))
assert len(list(contigs[0].alignment.gaps())) == 1
assert len(list(contigs[1].alignment.gaps())) == 0

Expand Down Expand Up @@ -434,7 +434,7 @@ def test_stitching_contig_with_small_covered_gap(exact_aligner):
),
]

contigs = [x.align_to_reference() for x in contigs]
contigs = list(map(align_to_reference, contigs))
assert len(list(contigs[0].alignment.gaps())) == 1
assert len(list(contigs[1].alignment.gaps())) == 0

Expand Down
85 changes: 46 additions & 39 deletions micall/utils/cigar_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def __repr__(self):
return f'CoordinateMapping({self.ref_to_op},{self.query_to_op})'


class Cigar(list):
class Cigar(tuple):
"""
Represents an alignment between a query sequence and a reference sequence using the
Compact Idiosyncratic Gapped Alignment Report (CIGAR) string format.
Expand All @@ -152,9 +152,49 @@ class Cigar(list):
CIGAR strings are defined in the SAM specification (https://samtools.github.io/hts-specs/SAMv1.pdf).
"""

def __init__(self, cigar_lst: Iterable[Tuple[int, CigarActions]]):
super().__init__([])
for x in cigar_lst: self.append(x)
def __new__(cls, cigar_lst: Iterable[Tuple[int, CigarActions]]):
return super(Cigar, cls).__new__(cls, Cigar.normalize(cigar_lst))


@staticmethod
def normalize(cigar_lst) -> Iterable[Tuple[int, CigarActions]]:
"""
Goes through the list appending operations to the CIGAR sequence,
checking for type correctness and performing normalization
by merging consecutive identical operations.
"""

last_item = None

for item in cigar_lst:
# Type checking
if not isinstance(item, list) and not isinstance(item, tuple):
raise ValueError(f"Invalid CIGAR list: {item!r} is not a tuple.")
if len(item) != 2:
raise ValueError(f"Invalid CIGAR list: {item!r} has a bad length.")

num, operation = item
if isinstance(operation, int):
operation = CigarActions(operation)
if not isinstance(num, int) or not isinstance(operation, CigarActions):
raise ValueError(f"Invalid CIGAR list: {item!r} is not a number/operation tuple.")
if num < 0:
raise ValueError(f"Invalid CIGAR list: number of operations is negative.")

# Normalization
if num == 0:
continue

if last_item:
last_num, last_operation = last_item
if operation == last_operation:
last_item = (last_num + num, operation)
continue

if last_item: yield (last_item[0], last_item[1])
last_item = item

if last_item: yield (last_item[0], last_item[1])


@staticmethod
Expand All @@ -165,7 +205,7 @@ def coerce(obj):
if isinstance(obj, str):
return Cigar.parse(obj)

if isinstance(obj, list):
if isinstance(obj, list) or isinstance(obj, tuple):
return Cigar(obj)

raise TypeError(f"Cannot coerce {obj!r} to CIGAR string.")
Expand Down Expand Up @@ -219,39 +259,6 @@ def parse(string) -> 'Cigar':
return Cigar(data)


def append(self, item: Tuple[int, CigarActions]):
"""
Appends an operation to the CIGAR sequence, checking for type correctness
and performing normalization by merging consecutive identical operations.
"""

# Type checking
if not isinstance(item, list) and not isinstance(item, tuple):
raise ValueError(f"Invalid CIGAR list: {item!r} is not a tuple.")
if len(item) != 2:
raise ValueError(f"Invalid CIGAR list: {item!r} is has a bad length.")

num, operation = item
if isinstance(operation, int):
operation = CigarActions(operation)
if not isinstance(num, int) or not isinstance(operation, CigarActions):
raise ValueError(f"Invalid CIGAR list: {item!r} is not a number/operation tuple.")
if num < 0:
raise ValueError(f"Invalid CIGAR list: number of operations is negative.")

# Normalization
if num == 0:
return

if self:
last_num, last_operation = self[-1]
if operation == last_operation:
self[-1] = (last_num + num, operation)
return

super().append((num, operation))


def iterate_operations(self) -> Iterable[CigarActions]:
"""
Yields each operation in the CIGAR sequence as a `CigarActions` enum.
Expand Down Expand Up @@ -552,7 +559,7 @@ def connect(self, other):

@property
def epsilon(self):
return Fraction(1, self.query_length + self.ref_length + 100)
return Fraction(1, self.cigar.op_length * 3 + 1)


def _ref_cut_to_op_cut(self, cut_point: float):
Expand Down

0 comments on commit 5b13ac1

Please sign in to comment.