Skip to content

Commit

Permalink
Cigar tools: add lstrip and rstrip functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Donaim committed Nov 7, 2023
1 parent eba4319 commit 711cbcd
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 3 deletions.
41 changes: 40 additions & 1 deletion micall/tests/test_cigar_tools.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import pytest
from typing import List, Tuple
from math import floor
import itertools

from micall.utils.consensus_aligner import CigarActions
from micall.utils.cigar_tools import Cigar, CigarHit
from micall.utils.cigar_tools import Cigar, CigarHit, parse_cigar_operation, CIGAR_OP_MAPPING


cigar_mapping_cases: List[Tuple[Cigar, 'mapping', 'closest_mapping']] = [
Expand Down Expand Up @@ -284,6 +285,44 @@ def test_cigar_hit_ref_cut_add_associativity(hit, cut_point):
assert (a + b) + c == a + (b + c)


@pytest.mark.parametrize('hit', [x[0] for x in cigar_hit_ref_cut_cases])
def test_cigar_hit_lstrip_is_stringlike(hit):
all_chars = CIGAR_OP_MAPPING.keys()

actions_of = lambda s: [x for x in s if x in all_chars]
f = lambda x: ''.join(actions_of(str(x)))

for r in range(len(all_chars) + 1):
for char_set in itertools.combinations(all_chars, r):
actions = set(map(parse_cigar_operation, char_set))
chars = ''.join(char_set)

p = lambda x: x.cigar
g = lambda x: x.lstrip(actions)
h = lambda x: f(x).lstrip(chars)

assert f(p(g(hit))) == f(h(p(hit)))


@pytest.mark.parametrize('hit', [x[0] for x in cigar_hit_ref_cut_cases])
def test_cigar_hit_rstrip_is_stringlike(hit):
all_chars = CIGAR_OP_MAPPING.keys()

actions_of = lambda s: [x for x in s if x in all_chars]
f = lambda x: ''.join(actions_of(str(x)))

for r in range(len(all_chars) + 1):
for char_set in itertools.combinations(all_chars, r):
actions = set(map(parse_cigar_operation, char_set))
chars = ''.join(char_set)

p = lambda x: x.cigar
g = lambda x: x.rstrip(actions)
h = lambda x: f(x).rstrip(chars)

assert f(p(g(hit))) == f(h(p(hit)))


@pytest.mark.parametrize("reference_seq, query_seq, cigar, expected_reference, expected_query", [
('ACTG', 'ACTG', '4M', 'ACTG', 'ACTG'),
('ACTG', '', '4D', 'ACTG', '----'),
Expand Down
28 changes: 28 additions & 0 deletions micall/tests/test_contig_stitcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,31 @@ def test_stitching_of_zero_contigs():
contigs = []
result = list(stitch_contigs(contigs))
assert result == contigs


def test_correct_stitching_of_two_partially_overlapping_different_organism_contigs():
# Scenario: Two partially overlapping contigs, but which come from different organism,
# are not stitched into a single sequence.

ref_seq = 'A' * 100 + 'C' * 100

contigs = [
GenotypedContig(name='a',
seq='A' * 50 + 'C' * 20,
ref_name='testref-1',
ref_seq=ref_seq,
matched_fraction=0.5,
),
GenotypedContig(name='b',
seq='A' * 20 + 'C' * 50,
ref_name='testref-2',
ref_seq=ref_seq,
matched_fraction=0.5,
),
]

result = list(stitch_contigs(contigs))
assert len(result) == 2

assert sorted(map(lambda x: x.seq, contigs)) \
== sorted(map(lambda x: x.seq, result))
41 changes: 39 additions & 2 deletions micall/utils/cigar_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

from math import ceil, floor
import re
from typing import Tuple, Iterable, Optional
from typing import Container, Tuple, Iterable, Optional
from dataclasses import dataclass
from functools import cached_property
from itertools import chain, dropwhile

from micall.utils.consensus_aligner import CigarActions

Expand Down Expand Up @@ -130,7 +131,7 @@ class Cigar(list):
"""


def __init__(self, cigar_lst):
def __init__(self, cigar_lst: Iterable[Tuple[int, CigarActions]]):
super().__init__([])
for x in cigar_lst: self.append(x)

Expand Down Expand Up @@ -239,6 +240,16 @@ def slice_operations(self, start_inclusive, end_noninclusive) -> 'Cigar':
[start_inclusive:end_noninclusive])


def lstrip(self, actions: Container[CigarActions]) -> 'Cigar':
""" Return a copy of the Cigar with leading actions removed. """
return Cigar(dropwhile(lambda tupl: tupl[1] in actions, self))


def rstrip(self, actions: Container[CigarActions]) -> 'Cigar':
""" Return a copy of the Cigar with trailing actions removed. """
return Cigar(reversed(list(dropwhile(lambda tupl: tupl[1] in actions, reversed(self)))))


@cached_property
def coordinate_mapping(self) -> CoordinateMapping:
"""
Expand Down Expand Up @@ -428,6 +439,32 @@ def cut_reference(self, cut_point: float) -> 'CigarHit':
return left, right


def lstrip(self, actions: Container[CigarActions]) -> 'CigarHit':
""" Return a copy of the CigarHit with leading actions removed. """

cigar = self.cigar.lstrip(actions)
reference_delta = cigar.ref_length - self.cigar.ref_length
query_delta = cigar.query_length - self.cigar.query_length
return CigarHit(cigar,
r_st=self.r_st,
r_ei=self.r_ei + reference_delta,
q_st=self.q_st,
q_ei=self.q_ei + query_delta)


def rstrip(self, actions: Container[CigarActions]) -> 'CigarHit':
""" Return a copy of the CigarHit with trailing actions removed. """

cigar = self.cigar.rstrip(actions)
reference_delta = cigar.ref_length - self.cigar.ref_length
query_delta = cigar.query_length - self.cigar.query_length
return CigarHit(cigar,
r_st=self.r_st,
r_ei=self.r_ei + reference_delta,
q_st=self.q_st,
q_ei=self.q_ei + query_delta)


@cached_property
def coordinate_mapping(self) -> CoordinateMapping:
return self.cigar.coordinate_mapping.translate(self.r_st, self.q_st)
Expand Down

0 comments on commit 711cbcd

Please sign in to comment.