From 5d08a415487a66617e5cb3232190547f029543d2 Mon Sep 17 00:00:00 2001 From: Anton Goloborodko Date: Mon, 25 Mar 2024 11:01:45 +0100 Subject: [PATCH] parse: refactor alignment list sorting/tagging / insertion of empty algns --- pairtools/lib/parse.py | 121 ++++++++++++++++++++++------------------- 1 file changed, 66 insertions(+), 55 deletions(-) diff --git a/pairtools/lib/parse.py b/pairtools/lib/parse.py index 99a9006..f6382c2 100644 --- a/pairtools/lib/parse.py +++ b/pairtools/lib/parse.py @@ -392,6 +392,66 @@ def flip_alignment(hic_algn): return hic_algn + +def _convert_gaps_into_alignments(sorted_algns, max_inter_align_gap): + """ + Inplace conversion of gaps longer than max_inter_align_gap into alignments + """ + if (len(sorted_algns) == 1) and (not sorted_algns[0]["is_mapped"]): + return + + last_5_pos = 0 + for i in range(len(sorted_algns)): + algn = sorted_algns[i] + if algn["dist_to_5"] - last_5_pos > max_inter_align_gap: + new_algn = empty_alignment() + new_algn["dist_to_5"] = last_5_pos + new_algn["algn_read_span"] = algn["dist_to_5"] - last_5_pos + new_algn["read_len"] = algn["read_len"] + new_algn["dist_to_3"] = new_algn["read_len"] - algn["dist_to_5"] + + last_5_pos = algn["dist_to_5"] + algn["algn_read_span"] + + sorted_algns.insert(i, new_algn) + i += 2 + else: + last_5_pos = max(last_5_pos, algn["dist_to_5"] + algn["algn_read_span"]) + i += 1 + + +def normalize_alignment_list(algns, side, sort_by="dist_to_5", max_inter_align_gap=None): + """ + Normalize the alignment list: insert empty alignments in gaps between alignments, + sort by distance to the 5' end, add read side, alignment index. + + Args: + algns (list): The list of alignments. + side (str): The side of the alignment. + sort_by (str, optional): The key to sort the alignments by. Defaults to "dist_to_5". + max_inter_align_gap (int, optional): The maximum allowed gap between alignments. Defaults to None. + + Returns: + list: The normalized alignment list. + + """ + if len(algns) == 0: + algns = [empty_alignment()] + + if max_inter_align_gap is not None: + _convert_gaps_into_alignments(algns, max_inter_align_gap) + + if sort_by: + algns = sorted(algns, key=lambda algn: algn[sort_by]) + + + for i, algn in enumerate(algns): + algn["read_side"] = side + algn["align_idx"] = i + algn["tot_align_count"] = len(algns) + + return algns + + def flip_orientation(hic_algn): """ Flip orientation of a single alignment @@ -475,18 +535,9 @@ def parse_read( for sam in sams2 ] - if len(algns1) > 0: - algns1 = sorted(algns1, key=lambda algn: algn["dist_to_5"]) - else: - algns1 = [empty_alignment()] # Empty alignment dummy - if len(algns2) > 0: - algns2 = sorted(algns2, key=lambda algn: algn["dist_to_5"]) - else: - algns2 = [empty_alignment()] # Empty alignment dummy + algns1 = normalize_alignment_list(algns1, 1, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap) + algns2 = normalize_alignment_list(algns2, 2, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap) - if max_inter_align_gap is not None: - _convert_gaps_into_alignments(algns1, max_inter_align_gap) - _convert_gaps_into_alignments(algns2, max_inter_align_gap) # By default, assume each molecule is a single pair with single unconfirmed pair: hic_algn1 = algns1[0] @@ -619,10 +670,8 @@ def parse2_read( ) for sam in sams2 # note sams2, that's how these reads are typically parsed ] - algns1 = sorted(algns1, key=lambda algn: algn["dist_to_5"]) - if max_inter_align_gap is not None: - _convert_gaps_into_alignments(algns1, max_inter_align_gap) - + + algns1 = normalize_alignment_list(algns1, 1, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap) algns2 = [empty_alignment()] # Empty alignment dummy if len(algns1) > 1: @@ -678,20 +727,8 @@ def parse2_read( for sam in sams2 ] - # Sort alignments by the distance to the 5'-end: - if len(algns1) > 0: - algns1 = sorted(algns1, key=lambda algn: algn["dist_to_5"]) - else: - algns1 = [empty_alignment()] # Empty alignment dummy - if len(algns2) > 0: - algns2 = sorted(algns2, key=lambda algn: algn["dist_to_5"]) - else: - algns2 = [empty_alignment()] # Empty alignment dummy - - # Convert alignment gaps to alignments: - if max_inter_align_gap is not None: - _convert_gaps_into_alignments(algns1, max_inter_align_gap) - _convert_gaps_into_alignments(algns2, max_inter_align_gap) + algns1 = normalize_alignment_list(algns1, 1, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap) + algns2 = normalize_alignment_list(algns2, 2, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap) is_chimeric_1 = len(algns1) > 1 is_chimeric_2 = len(algns2) > 1 @@ -829,32 +866,6 @@ def rescue_walk(algns1, algns2, max_molecule_size): return None -def _convert_gaps_into_alignments(sorted_algns, max_inter_align_gap): - """ - Inplace conversion of gaps longer than max_inter_align_gap into alignments - """ - if (len(sorted_algns) == 1) and (not sorted_algns[0]["is_mapped"]): - return - - last_5_pos = 0 - for i in range(len(sorted_algns)): - algn = sorted_algns[i] - if algn["dist_to_5"] - last_5_pos > max_inter_align_gap: - new_algn = empty_alignment() - new_algn["dist_to_5"] = last_5_pos - new_algn["algn_read_span"] = algn["dist_to_5"] - last_5_pos - new_algn["read_len"] = algn["read_len"] - new_algn["dist_to_3"] = new_algn["read_len"] - algn["dist_to_5"] - - last_5_pos = algn["dist_to_5"] + algn["algn_read_span"] - - sorted_algns.insert(i, new_algn) - i += 2 - else: - last_5_pos = max(last_5_pos, algn["dist_to_5"] + algn["algn_read_span"]) - i += 1 - - def parse_complex_walk( algns1, algns2,