Skip to content

Commit

Permalink
Contig stitcher: reimplement complete coverage check
Browse files Browse the repository at this point in the history
* add merge_intervals helper function
* check if contig is covered by multiple other contigs
  • Loading branch information
Donaim committed Nov 14, 2023
1 parent 66fcafd commit 224fbd0
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 13 deletions.
69 changes: 58 additions & 11 deletions micall/core/contig_stitcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,21 +273,68 @@ def combine_overlaps(contigs: List[AlignedContig]) -> Iterable[AlignedContig]:
contigs.insert(0, new_contig)


def drop_completely_covered(contigs: List[AlignedContig]) -> List[AlignedContig]:
""" Filter out all contigs that are contained within other contigs. """
def merge_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
"""
Merge overlapping and adjacent intervals.
Note that intervals are inclusive.
:param intervals: A list of intervals [start, end] where 'start' and 'end' are integers.
:eturn: A list of merged intervals.
"""

if not intervals:
return []

# TODO: filter out if covered by multiple contigs
# TODO: split contigs that have big gaps in them first, otherwise they will cover too much.
# Sort intervals by their starting values
sorted_intervals = sorted(intervals, key=lambda x: x[0])

def find_most_covered(contigs) -> Optional[AlignedContig]:
for current in contigs:
if any(x for x in contigs if x != current and x.contains(current)):
return current
merged_intervals = [sorted_intervals[0]]
for current in sorted_intervals[1:]:
current_start, current_end = current
last_start, last_end = merged_intervals[-1]
if current_start <= last_end + 1:
# Extend the last interval if there is an overlap or if they are adjacent
merged_intervals[-1] = (min(last_start, current_start), max(last_end, current_end))
else:
# Add the current interval if there is no overlap
merged_intervals.append(current)

return merged_intervals


def find_covered_contig(contigs: List[AlignedContig]) -> Optional[AlignedContig]:
"""
Find and return the first contig that is completely covered by other contigs.
:param contigs: List of all aligned contigs to be considered.
:return: An AlignedContig if there is one completely covered by others, None otherwise.
"""

def calculate_cumulative_coverage(contigs) -> List[Tuple[int, int]]:
intervals = [(contig.alignment.r_st, contig.alignment.r_ei) for contig in contigs]
merged_intervals = merge_intervals(intervals)
return merged_intervals

for current in contigs:
current_interval = (current.alignment.r_st, current.alignment.r_ei)

# Create a map of cumulative coverage for contigs
other_contigs = [x for x in contigs if x != current and x.ref_name == current.ref_name]
cumulative_coverage = calculate_cumulative_coverage(other_contigs)

# Check if the current contig is covered by the cumulative coverage intervals
if any((cover_interval[0] <= current_interval[0] and cover_interval[1] >= current_interval[1])
for cover_interval in cumulative_coverage):
return current


def drop_completely_covered(contigs: List[AlignedContig]) -> List[AlignedContig]:
""" Filter out all contigs that are contained within other contigs. """

while contigs:
most_covered = find_most_covered(contigs)
if most_covered:
contigs.remove(most_covered)
covered = find_covered_contig(contigs)
if covered:
contigs.remove(covered)
else:
break

Check warning on line 339 in micall/core/contig_stitcher.py

View check run for this annotation

Codecov / codecov/patch

micall/core/contig_stitcher.py#L339

Added line #L339 was not covered by tests

Expand Down
170 changes: 168 additions & 2 deletions micall/tests/test_contig_stitcher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

import pytest
from micall.core.contig_stitcher import split_contigs_with_gaps, stitch_contigs, GenotypedContig
from micall.core.contig_stitcher import split_contigs_with_gaps, stitch_contigs, GenotypedContig, merge_intervals, find_covered_contig
from micall.tests.utils import MockAligner


Expand Down Expand Up @@ -255,7 +255,6 @@ def test_correct_processing_complex_nogaps(exact_aligner):
# Scenario: There are two reference organisms.
# Each with 4 contigs.
# For each, three overlapping contigs are stitched together, the non-overlapping is kept separate.
# This seems like the most general scenario if no gaps or complete goverage is involved.

ref_seq = 'A' * 100 + 'C' * 100 + 'T' * 100 + 'G' * 100

Expand Down Expand Up @@ -438,3 +437,170 @@ def test_stitching_contig_with_small_covered_gap(exact_aligner):

assert sorted(map(lambda x: x.seq, contigs)) \
== sorted(map(lambda x: x.seq, result))



# _ _ _ _ _ _
# | | | |_ __ (_) |_ | |_ ___ ___| |_ ___
# | | | | '_ \| | __| | __/ _ \/ __| __/ __|
# | |_| | | | | | |_ | || __/\__ \ |_\__ \
# \___/|_| |_|_|\__| \__\___||___/\__|___/
#

@pytest.mark.parametrize("intervals, expected", [
([], []),
([(1, 3)], [(1, 3)]),
# Non-overlapping intervals
([(1, 3), (5, 6)], [(1, 3), (5, 6)]),
# Directly overlapping intervals
([(1, 3), (2, 5)], [(1, 5)]),
# Adjacent intervals that exactly touch each other
([(1, 2), (3, 4)], [(1, 4)]),
# Nested intervals
([(1, 10), (2, 5)], [(1, 10)]),
# Multiple merged intervals
([(1, 3), (2, 4), (6, 8), (10, 11), (11, 12)],
[(1, 4), (6, 8), (10, 12)]),
# Intervals out of initial order
([(4, 6), (1, 2)],
[(1, 2), (4, 6)]),
# Overlapping intervals with out of order inputs
([(1, 4), (3, 5), (2, 3), (7, 10), (9, 12)],
[(1, 5), (7, 12)]),
# Large set of intervals with various overlaps
([(1, 4), (2, 6), (5, 8), (7, 8), (10, 15), (11, 12), (13, 14), (17, 18)],
[(1, 8), (10, 15), (17, 18)]),
# Intervals where end is less than start should return as is or be handled explicitly depending on implementation
([(5, 3), (1, 2)],
[(1, 2), (5, 3)]),
# Intervals that are exactly one after the other in sequence / Intervals that are completely disjoint
([(1, 2), (4, 5), (7, 8)],
[(1, 2), (4, 5), (7, 8)]),
# Overlapping intervals that merge into one large interval
([(2, 6), (4, 10), (5, 15), (14, 20)],
[(2, 20)]),
# Same interval repeated multiple times
([(1, 5), (1, 5), (1, 5)],
[(1, 5)]),
# Single point intervals
([(1, 1), (5, 5), (3, 3)],
[(1, 1), (3, 3), (5, 5)]),
([(1, 1), (5, 5), (3, 3), (1, 1), (1, 1)],
[(1, 1), (3, 3), (5, 5)]),
([(1, 1), (2, 3)],
[(1, 3)]),
# Intervals that start with negative numbers
([(-5, 0), (-2, 3), (1, 7), (9, 12)],
[(-5, 7), (9, 12)]),
])
def test_merge_intervals(intervals, expected):
assert merge_intervals(intervals) == expected


class MockAlignedContig:
def __init__(self, ref_name, r_st, r_ei, name="contig"):
self.ref_name = ref_name
self.alignment = MockAlignment(r_st, r_ei)
self.name = name


class MockAlignment:
def __init__(self, r_st, r_ei):
self.r_st = r_st
self.r_ei = r_ei


# Simple function to create mock AlignedContig objects for testing, including ref_name.
def create_mock_aligned_contig(ref_name, r_st, r_ei, name="contig"):
return MockAlignedContig(ref_name, r_st, r_ei, name)


@pytest.mark.parametrize("contigs, expected_covered_name", [
# No contigs are completely covered.
([('ref1', 0, 100), ('ref1', 101, 200)], None),
([('ref1', 0, 50), ('ref1', 51, 100)], None),
# A single contig is completely covered by one other contig.
([('ref1', 0, 100), ('ref1', 0, 200)], 'contig1'),
([('ref1', 50, 150), ('ref1', 0, 200)], 'contig1'),
# A single contig completely covers another, but with different reference names.
([('ref1', 0, 50), ('ref2', 0, 100)], None),
# Single coverage with exact match.
([('ref1', 0, 100), ('ref1', 0, 100)], 'contig1'),
# A single contig is completely covered at the beginning by one and at the end by another contig.
([('ref1', 0, 50), ('ref1', 50, 100), ('ref1', 25, 75)], 'contig3'),
# Contigs overlap but none are completely covered.
([('ref1', 0, 50), ('ref1', 40, 90), ('ref1', 80, 120)], None),
# Multiple contigs with some covered completely by a single other contig.
([('ref1', 0, 200), ('ref1', 10, 30), ('ref1', 170, 190)], 'contig2'),
# Multiple contigs with complex overlaps and one completely covered.
([('ref1', 30, 60), ('ref1', 0, 50), ('ref1', 20, 70), ('ref1', 60, 90)], 'contig1'),
# Edge case where a contig starts where another ends.
([('ref1', 0, 50), ('ref1', 50, 100)], None),
# Contigs are completely covered in a nested fashion.
([('ref1', 0, 200), ('ref1', 50, 150), ('ref1', 100, 125)], 'contig2'),
# Contigs are adjacent and cover each other completely.
([('ref1', 0, 100), ('ref1', 101, 200), ('ref1', 0, 200)], 'contig1'),
# Single large contig covers several smaller non-adjacent contigs.
([('ref1', 0, 500), ('ref1', 50, 100), ('ref1', 200, 250), ('ref1', 300, 350)], 'contig2'),
# Single large contig covers several smaller adjacent contigs.
([('ref1', 50, 100), ('ref1', 70, 300), ('ref1', 101, 199), ('ref1', 200, 350)], 'contig2'),
# Single small contig is covered by several larger contigs.
([('ref1', 0, 250), ('ref1', 200, 300), ('ref1', 600, 800), ('ref1', 250, 700)], 'contig2'),
# Complex case with multiple contigs and complete coverage by combinations.
([('ref1', 0, 100), ('ref1', 30, 130), ('ref1', 60, 160), ('ref1', 90, 190), ('ref1', 120, 220)], 'contig2'),
# Contigs with same start but different end, where one is covered.
([('ref1', 0, 100), ('ref1', 0, 50)], 'contig2'),
# Contigs with same end but different start, where one is covered.
([('ref1', 50, 100), ('ref1', 0, 100)], 'contig1'),
# Contig covered by two overlapping contigs that don't individually cover the whole range.
([('ref1', 0, 75), ('ref1', 25, 100), ('ref1', 0, 100)], 'contig1'),
# Two contigs are covered completely by one large contig.
([('ref1', 0, 300), ('ref1', 50, 100), ('ref1', 200, 250)], 'contig2'),
# No contigs at all.
([], None),
])
def test_find_covered(contigs, expected_covered_name):
mock_contigs = [create_mock_aligned_contig(ref_name, r_st, r_ei, f'contig{i+1}')
for i, (ref_name, r_st, r_ei) in enumerate(contigs)]
covered = find_covered_contig(mock_contigs)
if expected_covered_name is None:
assert covered is None
else:
assert covered is not None
assert covered.name == expected_covered_name

0 comments on commit 224fbd0

Please sign in to comment.