From 24a0cbb6cdac2e894318e91f679620ee800d06fc Mon Sep 17 00:00:00 2001 From: Vitaliy Mysak Date: Tue, 7 Nov 2023 14:46:46 -0800 Subject: [PATCH] Cigar tools: add lstrip and rstrip functions --- micall/tests/test_cigar_tools.py | 39 +++++++++++++++++++++++++- micall/tests/test_contig_stitcher.py | 28 +++++++++++++++++++ micall/utils/cigar_tools.py | 41 ++++++++++++++++++++++++++-- 3 files changed, 105 insertions(+), 3 deletions(-) diff --git a/micall/tests/test_cigar_tools.py b/micall/tests/test_cigar_tools.py index 1c31a7e8e..f23b02cbc 100644 --- a/micall/tests/test_cigar_tools.py +++ b/micall/tests/test_cigar_tools.py @@ -1,9 +1,10 @@ import pytest from typing import List, Tuple from math import floor +import itertools from micall.utils.consensus_aligner import CigarActions -from micall.utils.cigar_tools import Cigar, CigarHit +from micall.utils.cigar_tools import Cigar, CigarHit, parse_cigar_operation, CIGAR_OP_MAPPING cigar_mapping_cases: List[Tuple[Cigar, 'mapping', 'closest_mapping']] = [ @@ -284,6 +285,42 @@ def test_cigar_hit_ref_cut_add_associativity(hit, cut_point): assert (a + b) + c == a + (b + c) +@pytest.mark.parametrize('hit', [x[0] for x in cigar_hit_ref_cut_cases]) +def test_cigar_hit_lstrip_is_stringlike(hit): + all_chars = CIGAR_OP_MAPPING.keys() + + actions_of = lambda s: (x for x in s if x in all_chars) + + for r in range(len(all_chars) + 1): + for char_set in itertools.combinations(all_chars, r): + actions = set(map(parse_cigar_operation, char_set)) + chars = ''.join(char_set) + + p = lambda x: ''.join(actions_of(str(x.cigar))) + g = lambda x: x.lstrip(actions) + h = lambda x: x.lstrip(chars) + + assert p(g(hit)) == h(p(hit)) + + +@pytest.mark.parametrize('hit', [x[0] for x in cigar_hit_ref_cut_cases]) +def test_cigar_hit_rstrip_is_stringlike(hit): + all_chars = CIGAR_OP_MAPPING.keys() + + actions_of = lambda s: (x for x in s if x in all_chars) + + for r in range(len(all_chars) + 1): + for char_set in itertools.combinations(all_chars, r): + actions = set(map(parse_cigar_operation, char_set)) + chars = ''.join(char_set) + + p = lambda x: ''.join(actions_of(str(x.cigar))) + g = lambda x: x.rstrip(actions) + h = lambda x: x.rstrip(chars) + + assert p(g(hit)) == h(p(hit)) + + @pytest.mark.parametrize("reference_seq, query_seq, cigar, expected_reference, expected_query", [ ('ACTG', 'ACTG', '4M', 'ACTG', 'ACTG'), ('ACTG', '', '4D', 'ACTG', '----'), diff --git a/micall/tests/test_contig_stitcher.py b/micall/tests/test_contig_stitcher.py index 7946926ef..eed5c540f 100644 --- a/micall/tests/test_contig_stitcher.py +++ b/micall/tests/test_contig_stitcher.py @@ -221,3 +221,31 @@ def test_stitching_of_zero_contigs(): contigs = [] result = list(stitch_contigs(contigs)) assert result == contigs + + +def test_correct_stitching_of_two_partially_overlapping_different_organism_contigs(): + # Scenario: Two partially overlapping contigs, but which come from different organism, + # are not stitched into a single sequence. + + ref_seq = 'A' * 100 + 'C' * 100 + + contigs = [ + GenotypedContig(name='a', + seq='A' * 50 + 'C' * 20, + ref_name='testref-1', + ref_seq=ref_seq, + matched_fraction=0.5, + ), + GenotypedContig(name='b', + seq='A' * 20 + 'C' * 50, + ref_name='testref-2', + ref_seq=ref_seq, + matched_fraction=0.5, + ), + ] + + result = list(stitch_contigs(contigs)) + assert len(result) == 2 + + assert sorted(map(lambda x: x.seq, contigs)) \ + == sorted(map(lambda x: x.seq, result)) diff --git a/micall/utils/cigar_tools.py b/micall/utils/cigar_tools.py index e3f3f1fdf..370949c35 100644 --- a/micall/utils/cigar_tools.py +++ b/micall/utils/cigar_tools.py @@ -4,9 +4,10 @@ from math import ceil, floor import re -from typing import Tuple, Iterable, Optional +from typing import Container, Tuple, Iterable, Optional from dataclasses import dataclass from functools import cached_property +from itertools import chain, dropwhile from micall.utils.consensus_aligner import CigarActions @@ -130,7 +131,7 @@ class Cigar(list): """ - def __init__(self, cigar_lst): + def __init__(self, cigar_lst: Iterable[Tuple[int, CigarActions]]): super().__init__([]) for x in cigar_lst: self.append(x) @@ -239,6 +240,16 @@ def slice_operations(self, start_inclusive, end_noninclusive) -> 'Cigar': [start_inclusive:end_noninclusive]) + def lstrip(self, actions: Container[CigarActions]) -> 'Cigar': + """ Return a copy of the Cigar with leading actions removed. """ + return Cigar(dropwhile(lambda tupl: tupl[1] in actions, self)) + + + def rstrip(self, actions: Container[CigarActions]) -> 'Cigar': + """ Return a copy of the Cigar with trailing actions removed. """ + return Cigar(reversed(list(dropwhile(lambda tupl: tupl[1] in actions, reversed(self))))) + + @cached_property def coordinate_mapping(self) -> CoordinateMapping: """ @@ -428,6 +439,32 @@ def cut_reference(self, cut_point: float) -> 'CigarHit': return left, right + def lstrip(self, actions: Container[CigarActions]) -> 'CigarHit': + """ Return a copy of the CigarHit with leading actions removed. """ + + cigar = self.cigar.lstrip(actions) + reference_delta = cigar.ref_length - self.cigar.ref_length + query_delta = cigar.query_length - self.cigar.query_length + return CigarHit(cigar, + r_st=self.r_st, + r_ei=self.r_ei + reference_delta, + q_st=self.q_st, + q_ei=self.q_ei + query_delta) + + + def rstrip(self, actions: Container[CigarActions]) -> 'CigarHit': + """ Return a copy of the CigarHit with trailing actions removed. """ + + cigar = self.cigar.rstrip(actions) + reference_delta = cigar.ref_length - self.cigar.ref_length + query_delta = cigar.query_length - self.cigar.query_length + return CigarHit(cigar, + r_st=self.r_st, + r_ei=self.r_ei + reference_delta, + q_st=self.q_st, + q_ei=self.q_ei + query_delta) + + @cached_property def coordinate_mapping(self) -> CoordinateMapping: return self.cigar.coordinate_mapping.translate(self.r_st, self.q_st)