Skip to content

Commit

Permalink
Contig stitcher: make mypy-compliant
Browse files Browse the repository at this point in the history
To verify that the modules are correctly typed, run:

```shell
mypy micall/core/contig_stitcher.py --check-untyped-defs \
   | grep -e tools.py: -e stitcher.py:
```
  • Loading branch information
Donaim committed Nov 20, 2023
1 parent e067032 commit 7a153c0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 22 deletions.
21 changes: 12 additions & 9 deletions micall/core/contig_stitcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,16 @@ def __init__(self, parts: List[AlignedContig]):
raise ValueError("Empty Frankenstei do not exist")

Check warning on line 139 in micall/core/contig_stitcher.py

View check run for this annotation

Codecov / codecov/patch

micall/core/contig_stitcher.py#L139

Added line #L139 was not covered by tests

# Flatten any possible Frankenstein parts
self.parts = [subpart for part in parts for subpart in
(part.parts if isinstance(part, FrankensteinContig) else [part])]
self.parts: List[AlignedContig] = \
[subpart for part in parts for subpart in
(part.parts if isinstance(part, FrankensteinContig) else [part])]

aligned = reduce(FrankensteinContig.munge, self.parts)

super().__init__(aligned.query, aligned.alignment)


def cut_reference(self, cut_point: float) -> 'FrankensteinContig':
def cut_reference(self, cut_point: float) -> Tuple['FrankensteinContig', 'FrankensteinContig']:
# Search for the part that needs to be cut
left_parts = list(takewhile(lambda part: cut_point >= part.alignment.r_ei + 1, self.parts))
target_part = self.parts[len(left_parts)]
Expand Down Expand Up @@ -266,7 +267,7 @@ def calculate_concordance(left: str, right: str) -> List[float]:
if len(left) != len(right):
raise ValueError("Can only calculate concordance for same sized sequences")

result = [0] * len(left)
result: List[float] = [0] * len(left)

def slide(left, right):
window_size = 30
Expand Down Expand Up @@ -388,6 +389,8 @@ def calculate_cumulative_coverage(contigs) -> List[Tuple[int, int]]:
for cover_interval in cumulative_coverage):
return current

return None


def drop_completely_covered(contigs: List[AlignedContig]) -> List[AlignedContig]:
""" Filter out all contigs that are contained within other contigs. """
Expand All @@ -402,7 +405,7 @@ def drop_completely_covered(contigs: List[AlignedContig]) -> List[AlignedContig]
return contigs


def split_contigs_with_gaps(contigs: List[AlignedContig]) -> Iterable[AlignedContig]:
def split_contigs_with_gaps(contigs: List[AlignedContig]) -> List[AlignedContig]:
def covered_by(gap, other):
# Check whether at least one reference coordinate in gap is mapped in other.
gap_coords = gap.coordinate_mapping.ref_to_query.domain
Expand Down Expand Up @@ -438,7 +441,7 @@ def try_split(contig):
process_queue.put(right_part)
return

process_queue = LifoQueue()
process_queue: LifoQueue = LifoQueue()
for contig in contigs: process_queue.put(contig)

while not process_queue.empty():
Expand All @@ -453,12 +456,12 @@ def stitch_contigs(contigs: Iterable[GenotypedContig]) -> Iterable[AlignedContig

# Contigs that did not align do not need any more processing
yield from (x for x in maybe_aligned if not isinstance(x, AlignedContig))
aligned = [x for x in maybe_aligned if isinstance(x, AlignedContig)]
aligned: List[AlignedContig] = \
[x for x in maybe_aligned if isinstance(x, AlignedContig)]

aligned = split_contigs_with_gaps(aligned)
aligned = drop_completely_covered(aligned)
aligned = combine_overlaps(aligned)
yield from aligned
yield from combine_overlaps(aligned)


def stitch_consensus(contigs: Iterable[GenotypedContig]) -> Iterable[GenotypedContig]:
Expand Down
26 changes: 13 additions & 13 deletions micall/utils/cigar_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class Cigar(tuple):
"""

def __new__(cls, cigar_lst: Iterable[Tuple[int, CigarActions]]):
return super(Cigar, cls).__new__(cls, Cigar.normalize(cigar_lst))
return super(Cigar, cls).__new__(cls, Cigar.normalize(cigar_lst)) # type: ignore


@staticmethod
Expand Down Expand Up @@ -370,7 +370,7 @@ def parse(string) -> 'Cigar':
match = re.match(r'([0-9]+)([^0-9])', string)
if match:
num, operation = match.groups()
data.append([int(num), Cigar.parse_operation(operation)])
data.append((int(num), Cigar.parse_operation(operation)))
string = string[match.end():]
else:
raise ValueError(f"Invalid CIGAR string. Invalid part: {string[:20]}")
Expand Down Expand Up @@ -573,7 +573,7 @@ def epsilon(self):
return Fraction(1, self.cigar.op_length * 3 + 1)


def _ref_cut_to_op_cut(self, cut_point: float):
def _ref_cut_to_op_cut(self, cut_point):
mapping = self.coordinate_mapping

left_op_cut_point = mapping.ref_to_op.left_max(floor(cut_point))
Expand Down Expand Up @@ -615,17 +615,17 @@ def cut_reference(self, cut_point: float) -> Tuple['CigarHit', 'CigarHit']:
The two parts do not share any elements, and no element is "lost".
"""

cut_point = Fraction(cut_point)
if cut_point.denominator == 1:
fcut_point: Fraction = Fraction(cut_point)
if fcut_point.denominator == 1:
raise ValueError("Cut accepts fractions, not integers")

if self.ref_length == 0 or \
not (self.r_st - 1 < cut_point < self.r_ei + 1):
not (self.r_st - 1 < fcut_point < self.r_ei + 1):
raise IndexError("Cut point out of reference bounds")

op_cut_point = self._ref_cut_to_op_cut(cut_point)
left = self._slice(self.r_st, self.q_st, 0, floor(op_cut_point))
right = self._slice(left.r_ei + 1, left.q_ei + 1, ceil(op_cut_point), self.cigar.op_length)
op_fcut_point = self._ref_cut_to_op_cut(fcut_point)
left = self._slice(self.r_st, self.q_st, 0, floor(op_fcut_point))
right = self._slice(left.r_ei + 1, left.q_ei + 1, ceil(op_fcut_point), self.cigar.op_length)

return left, right

Expand Down Expand Up @@ -677,7 +677,7 @@ def __repr__(self):
return f'CigarHit({str(self.cigar)!r}, r_st={self.r_st!r}, r_ei={self.r_ei!r}, q_st={self.q_st!r}, q_ei={self.q_ei!r})'


def connect_cigar_hits(cigar_hits: Iterable[CigarHit]) -> List[CigarHit]:
def connect_cigar_hits(cigar_hits: List[CigarHit]) -> List[CigarHit]:
"""
This function exists to deal with the fact that mappy does not always
connect big gaps, and returns surrounding parts as two separate alignment hits.
Expand All @@ -688,10 +688,10 @@ def connect_cigar_hits(cigar_hits: Iterable[CigarHit]) -> List[CigarHit]:
that overlap with previously found alignments.
"""

if not len(cigar_hits) > 0:
if len(cigar_hits) == 0:
raise ValueError("Expected a non-empty list of cigar hits")

accumulator = []
accumulator: List[CigarHit] = []

# Collect non-overlapping parts.
# Earlier matches have priority over ones that come after.
Expand All @@ -705,7 +705,7 @@ def connect_cigar_hits(cigar_hits: Iterable[CigarHit]) -> List[CigarHit]:
sorted_parts = sorted(accumulator, key=lambda p: p.r_st)

# Segregate independent matches.
sorted_groups = []
sorted_groups: List[List[CigarHit]] = []

def find_group(hit):
for group in sorted_groups:
Expand Down

0 comments on commit 7a153c0

Please sign in to comment.