diff --git a/micall/core/contig_stitcher.py b/micall/core/contig_stitcher.py index 1f05daa72..76d95afea 100644 --- a/micall/core/contig_stitcher.py +++ b/micall/core/contig_stitcher.py @@ -240,30 +240,52 @@ def stitch_2_contigs(left, right): return FrankensteinContig([left_remainder, overlap_contig, right_remainder]) -def stitch_contigs(contigs: Iterable[GenotypedContig]): - aligned = list(map(align_to_reference, contigs)) - - # Contigs that did not align do not need any more processing - yield from (x for x in aligned if not isinstance(x, AlignedContig)) - aligned = [x for x in aligned if isinstance(x, AlignedContig)] - +def combine_overlaps(contigs: List[AlignedContig]) -> Iterable[AlignedContig]: # Going left-to-right through aligned contigs. - aligned = list(sorted(aligned, key=lambda x: x.alignment.r_st)) - while aligned: - current = aligned.pop(0) - - # Filter out all contigs that are contained within the current one. - # TODO: actually filter out if covered by multiple contigs - # TODO: split contigs that have big gaps in them first, otherwise they will cover too much. - aligned = [x for x in aligned if not current.contains(x)] + contigs = list(sorted(contigs, key=lambda x: x.alignment.r_st)) + while contigs: + current = contigs.pop(0) # Find overlap. If there isn't one - we are done with the current contig. - overlapping_contig = find_overlapping_contig(current, aligned) + overlapping_contig = find_overlapping_contig(current, contigs) if not overlapping_contig: yield current continue # Replace two contigs by their stitched version, then loop with it. new_contig = stitch_2_contigs(current, overlapping_contig) - aligned.remove(overlapping_contig) - aligned.insert(0, new_contig) + contigs.remove(overlapping_contig) + contigs.insert(0, new_contig) + + +def drop_completely_covered(contigs: List[AlignedContig]) -> List[AlignedContig]: + """ Filter out all contigs that are contained within other contigs. """ + + # TODO: filter out if covered by multiple contigs + # TODO: split contigs that have big gaps in them first, otherwise they will cover too much. + + def find_most_covered(contigs) -> Optional[AlignedContig]: + for current in contigs: + if any(x for x in contigs if x != current and x.contains(current)): + return current + + while contigs: + most_covered = find_most_covered(contigs) + if most_covered: + contigs.remove(most_covered) + else: + break + + return contigs + + +def stitch_contigs(contigs: Iterable[GenotypedContig]): + maybe_aligned = list(map(align_to_reference, contigs)) + + # Contigs that did not align do not need any more processing + yield from (x for x in maybe_aligned if not isinstance(x, AlignedContig)) + aligned = [x for x in maybe_aligned if isinstance(x, AlignedContig)] + + aligned = drop_completely_covered(aligned) + + yield from combine_overlaps(aligned) diff --git a/micall/tests/test_contig_stitcher.py b/micall/tests/test_contig_stitcher.py index a00a1cfb5..234a6e753 100644 --- a/micall/tests/test_contig_stitcher.py +++ b/micall/tests/test_contig_stitcher.py @@ -201,19 +201,18 @@ def test_stitching_of_identical_contigs(): # Scenario: The function correctly handles and avoids duplication when identical contigs are stitched together. ref_seq = 'A' * 100 - contig = \ - GenotypedContig(name='a', + contigs = [ + GenotypedContig(name=name, seq='ACTGACTG' * 100, ref_name='testref', ref_seq='ACTGACTG' * 100, matched_fraction=1.0, ) - - contigs = [contig, contig, contig] + for name in ["a", "b", "c"]] result = list(stitch_contigs(contigs)) assert len(result) == 1 - assert result[0].query == contig + assert result[0].query == contigs[2] def test_stitching_of_zero_contigs():