Skip to content

Commit

Permalink
Add CIGAR tools module
Browse files Browse the repository at this point in the history
  • Loading branch information
Donaim committed Nov 6, 2023
1 parent d62ac24 commit 951ca56
Show file tree
Hide file tree
Showing 5 changed files with 949 additions and 1 deletion.
133 changes: 133 additions & 0 deletions micall/core/contig_stitcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import argparse
import logging
import os
from typing import Iterable, Optional
from collections import Counter
from csv import DictWriter, DictReader
from dataclasses import dataclass
from datetime import datetime
from glob import glob
from io import StringIO
from itertools import chain
from operator import itemgetter
from shutil import rmtree
from subprocess import run, PIPE, CalledProcessError, STDOUT
from tempfile import mkdtemp
from mappy import Aligner

from micall.utils.cigar_tools import connect_cigar_hits, CigarHit


@dataclass
class Contig:
name: str
seq: str


@dataclass
class GenotypedContig(Contig):
ref_name: str
ref_seq: str
matched_fraction: Optional[float] # Approximated overall concordance between `seq` and `ref_seq`.


@dataclass
class AlignedContig:
contig: GenotypedContig
alignment: CigarHit

def reference_slice(self, r_st, r_en):
""" Narrows this alignment to a more specific region in reference. """

alignment = self.alignment.reference_slice(r_st, r_en)
contig = self.contig
return AlignedContig(contig, alignment)


def align_to_reference(contig: GenotypedContig):
aligner = Aligner(seq=contig.ref_seq, bw=500, bw_long=500, preset='map-ont')
alignments = list(aligner.map(contig.seq))
if not alignments:
return AlignedContig(contig=contig, alignment=None)

hits_array = [CigarHit(x.cigar, x.r_st, x.r_en - 1, x.q_st, x.q_en - 1) for x in alignments]
single_cigar_hit = connect_cigar_hits(hits_array)
return AlignedContig(contig=contig, alignment=single_cigar_hit)


def intervals_overlap(x, y):
""" Check if two intervals (x0, x1) and (y0, y1) overlap. """

if x[0] > y[1] or y[0] > x[1]:
return False
else:
return True


def interval_contains(x, y):
""" Check if interval (x0, x1) contains interval (y0, y1). """

return x[0] <= y[0] and x[1] >= y[1]


def find_all_overlapping_contigs(self, aligned_contigs):
for other in aligned_contigs:
if self.contig.ref_name != other.contig.ref_name:
continue

if self.alignment.overlaps(other.alignment):
yield other


def find_overlapping_contig(self, aligned_contigs):
every = find_all_overlapping_contigs(self, aligned_contigs)
return max(chain(every, [None]),
key=lambda other: other.alignment.r_ei - other.alignment.r_st if other else 0)


def calculate_overlap(left, right):
left_seq_0 = left.contig.seq
right_seq_0 = right.contig.seq

left_mapping = left.alignment.coordinate_mapping
right_mapping = right.alignment.coordinate_mapping

overlap_interval = (right.alignment.r_st, left.alignment.r_ei)
# left_overlap_seq = [left.contig.ref_seq[i] for i in ]

left_overlap = left.reference_slice(overlap_interval[0], overlap_interval[1])
right_overlap = right.reference_slice(overlap_interval[0], overlap_interval[1])




def stitch_contigs(contigs: Iterable[GenotypedContig]):
aligned = list(map(align_to_reference, contigs))

# Contigs that did not align do not need any more processing
stitched = [x.contig for x in aligned if x.alignment is None]
aligned = [x for x in aligned if x.alignment is not None]

# Going left-to-right through aligned parts.
aligned = list(sorted(aligned, key=lambda x: x.alignment.r_st))
while aligned:
current = aligned.pop(0)

# Filter out all contigs that are contained within the current one.
# TODO: actually filter out if covered by multiple contigs
aligned = [x for x in aligned if not \
interval_contains((current.alignment.r_st, current.alignment.r_ei),
(x.alignment.r_st, x.alignment.r_ei))]

# Find overlap. If there isn't one - we are done with the current contig.
overlapping_contig = find_overlapping_contig(current, aligned)
if not overlapping_contig:
stitched.append(current.contig)
continue

# Get overlaping regions
overlap = calculate_overlap(current, overlapping_contig)

# aligned.append(combined)

return stitched
Loading

0 comments on commit 951ca56

Please sign in to comment.