Skip to content

Commit

Permalink
parse: refactor alignment list sorting/tagging / insertion of empty a…
Browse files Browse the repository at this point in the history
…lgns
  • Loading branch information
golobor committed Mar 25, 2024
1 parent 414db72 commit 5d08a41
Showing 1 changed file with 66 additions and 55 deletions.
121 changes: 66 additions & 55 deletions pairtools/lib/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,66 @@ def flip_alignment(hic_algn):
return hic_algn



def _convert_gaps_into_alignments(sorted_algns, max_inter_align_gap):
"""
Inplace conversion of gaps longer than max_inter_align_gap into alignments
"""
if (len(sorted_algns) == 1) and (not sorted_algns[0]["is_mapped"]):
return

last_5_pos = 0
for i in range(len(sorted_algns)):
algn = sorted_algns[i]
if algn["dist_to_5"] - last_5_pos > max_inter_align_gap:
new_algn = empty_alignment()
new_algn["dist_to_5"] = last_5_pos
new_algn["algn_read_span"] = algn["dist_to_5"] - last_5_pos
new_algn["read_len"] = algn["read_len"]
new_algn["dist_to_3"] = new_algn["read_len"] - algn["dist_to_5"]

last_5_pos = algn["dist_to_5"] + algn["algn_read_span"]

sorted_algns.insert(i, new_algn)
i += 2
else:
last_5_pos = max(last_5_pos, algn["dist_to_5"] + algn["algn_read_span"])
i += 1


def normalize_alignment_list(algns, side, sort_by="dist_to_5", max_inter_align_gap=None):
"""
Normalize the alignment list: insert empty alignments in gaps between alignments,
sort by distance to the 5' end, add read side, alignment index.
Args:
algns (list): The list of alignments.
side (str): The side of the alignment.
sort_by (str, optional): The key to sort the alignments by. Defaults to "dist_to_5".
max_inter_align_gap (int, optional): The maximum allowed gap between alignments. Defaults to None.
Returns:
list: The normalized alignment list.
"""
if len(algns) == 0:
algns = [empty_alignment()]

if max_inter_align_gap is not None:
_convert_gaps_into_alignments(algns, max_inter_align_gap)

if sort_by:
algns = sorted(algns, key=lambda algn: algn[sort_by])


for i, algn in enumerate(algns):
algn["read_side"] = side
algn["align_idx"] = i
algn["tot_align_count"] = len(algns)

return algns


def flip_orientation(hic_algn):
"""
Flip orientation of a single alignment
Expand Down Expand Up @@ -475,18 +535,9 @@ def parse_read(
for sam in sams2
]

if len(algns1) > 0:
algns1 = sorted(algns1, key=lambda algn: algn["dist_to_5"])
else:
algns1 = [empty_alignment()] # Empty alignment dummy
if len(algns2) > 0:
algns2 = sorted(algns2, key=lambda algn: algn["dist_to_5"])
else:
algns2 = [empty_alignment()] # Empty alignment dummy
algns1 = normalize_alignment_list(algns1, 1, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap)
algns2 = normalize_alignment_list(algns2, 2, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap)

if max_inter_align_gap is not None:
_convert_gaps_into_alignments(algns1, max_inter_align_gap)
_convert_gaps_into_alignments(algns2, max_inter_align_gap)

# By default, assume each molecule is a single pair with single unconfirmed pair:
hic_algn1 = algns1[0]
Expand Down Expand Up @@ -619,10 +670,8 @@ def parse2_read(
)
for sam in sams2 # note sams2, that's how these reads are typically parsed
]
algns1 = sorted(algns1, key=lambda algn: algn["dist_to_5"])
if max_inter_align_gap is not None:
_convert_gaps_into_alignments(algns1, max_inter_align_gap)


algns1 = normalize_alignment_list(algns1, 1, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap)
algns2 = [empty_alignment()] # Empty alignment dummy

if len(algns1) > 1:
Expand Down Expand Up @@ -678,20 +727,8 @@ def parse2_read(
for sam in sams2
]

# Sort alignments by the distance to the 5'-end:
if len(algns1) > 0:
algns1 = sorted(algns1, key=lambda algn: algn["dist_to_5"])
else:
algns1 = [empty_alignment()] # Empty alignment dummy
if len(algns2) > 0:
algns2 = sorted(algns2, key=lambda algn: algn["dist_to_5"])
else:
algns2 = [empty_alignment()] # Empty alignment dummy

# Convert alignment gaps to alignments:
if max_inter_align_gap is not None:
_convert_gaps_into_alignments(algns1, max_inter_align_gap)
_convert_gaps_into_alignments(algns2, max_inter_align_gap)
algns1 = normalize_alignment_list(algns1, 1, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap)
algns2 = normalize_alignment_list(algns2, 2, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap)

is_chimeric_1 = len(algns1) > 1
is_chimeric_2 = len(algns2) > 1
Expand Down Expand Up @@ -829,32 +866,6 @@ def rescue_walk(algns1, algns2, max_molecule_size):
return None


def _convert_gaps_into_alignments(sorted_algns, max_inter_align_gap):
"""
Inplace conversion of gaps longer than max_inter_align_gap into alignments
"""
if (len(sorted_algns) == 1) and (not sorted_algns[0]["is_mapped"]):
return

last_5_pos = 0
for i in range(len(sorted_algns)):
algn = sorted_algns[i]
if algn["dist_to_5"] - last_5_pos > max_inter_align_gap:
new_algn = empty_alignment()
new_algn["dist_to_5"] = last_5_pos
new_algn["algn_read_span"] = algn["dist_to_5"] - last_5_pos
new_algn["read_len"] = algn["read_len"]
new_algn["dist_to_3"] = new_algn["read_len"] - algn["dist_to_5"]

last_5_pos = algn["dist_to_5"] + algn["algn_read_span"]

sorted_algns.insert(i, new_algn)
i += 2
else:
last_5_pos = max(last_5_pos, algn["dist_to_5"] + algn["algn_read_span"])
i += 1


def parse_complex_walk(
algns1,
algns2,
Expand Down

0 comments on commit 5d08a41

Please sign in to comment.