Skip to content

Commit

Permalink
single-end and expand tests added. fixed issue with empty alignment a…
Browse files Browse the repository at this point in the history
…t R2 in single-end mode that was emerging together with --expand.
  • Loading branch information
agalitsyna committed Oct 4, 2024
1 parent d3624a4 commit 942b490
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 58 deletions.
4 changes: 2 additions & 2 deletions pairtools/cli/parse2.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@
help="""Reported position of alignments in pairs of complex walks (pos columns).
Each alignment in .bam/.sam Hi-C-like data has two ends, and you can report one or another depending of the position of alignment on a read or in a pair.
"junction" - inner ends of sequential alignments in each pair, aka ligation junctions (complex walks default),
"junction" - inner ends of sequential alignments in each pair, aka ligation junctions,
"read" - 5'-end of alignments relative to R1 or R2 read coordinate system (as in traditional Hi-C),
"walk" - 5'-end of alignments relative to the whole walk coordinate system,
"outer" - outer ends of sequential alignments in each pair. """,
"outer" - outer ends of sequential alignments in each pair (parse2 default). """,
)
@click.option(
"--report-orientation",
Expand Down
115 changes: 59 additions & 56 deletions pairtools/lib/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ def parse2_read(
]

algns1 = normalize_alignment_list(algns1, 1, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap)
algns2 = [empty_alignment()] # Empty alignment dummy
algns2 = [] # Empty alignment dummy

if len(algns1) > 1:
# Look for ligation pair, and report linear alignments after deduplication of complex walks:
Expand Down Expand Up @@ -891,11 +891,11 @@ def parse_complex_walk(
**Intramolecular deduplication**
Forward read (left): right read (right):
Forward read (left): right read (right):
5'------------------------->3' 3'<--------------------------5'
algns1 algns2
algns1 algns2
<5---3><5---3><5---3><5---3> <3---5><3---5><3---5><3---5>
l0 l1 l2 l3 r3 r2 r1 r0
l0 l1 l2 l3 r3 r2 r1 r0
Alignment - bwa mem reported hit or alignment after gaps conversion.
Left and right alignments (algns1: [l0, l1, l2, l3], algns2: [r0, r1, r2, r3])
Expand Down Expand Up @@ -929,8 +929,8 @@ def parse_complex_walk(
If comparison is successful, go to 6.
6. Verify.
Check that downstream pairs on the left read overlap with the upstream pairs on the right read.
If yes, exit.
If not, we do not have an overlap, go to step 3.
If yes, exit.
If not, we do not have an overlap, go to step 3.
"""

AVAILABLE_REPORT_POSITION = ["outer", "junction", "read", "walk"]
Expand Down Expand Up @@ -1007,66 +1007,70 @@ def parse_complex_walk(
if not is_overlap:
current_right_pair = 1

# II. Search of partial overlap if there are less than 2 alignments at either sides, or no overlaps found
if current_right_pair == 1:
last_reported_alignment_left = last_reported_alignment_right = 1
if partial_overlap(
algns1[-1],
algns2[-1],
max_insert_size=max_insert_size,
dedup_max_mismatch=dedup_max_mismatch,
):
if (
n_algns1 >= 2
): # single alignment on right read and multiple alignments on left
pair_index = (len(algns1) - 1, "R1")
output_pairs.append(
format_pair(
algns1[-2],
algns1[-1],
pair_index=pair_index,
algn2_pos3=algns2[-1]["pos5"],
report_position=report_position,
report_orientation=report_orientation,
if (n_algns2 == 0):
last_reported_alignment_left = 1
last_reported_alignment_right = 0
else:
# II. Search of partial overlap if there are less than 2 alignments at either sides, or no overlaps found
if (current_right_pair == 1):
last_reported_alignment_left = last_reported_alignment_right = 1
if partial_overlap(
algns1[-1],
algns2[-1],
max_insert_size=max_insert_size,
dedup_max_mismatch=dedup_max_mismatch,
):
if (
n_algns1 >= 2
): # single alignment on right read and multiple alignments on left
pair_index = (len(algns1) - 1, "R1")
output_pairs.append(
format_pair(
algns1[-2],
algns1[-1],
pair_index=pair_index,
algn2_pos3=algns2[-1]["pos5"],
report_position=report_position,
report_orientation=report_orientation,
)
)
)
last_reported_alignment_left = 2 # set the pointer for reporting
last_reported_alignment_left = 2 # set the pointer for reporting

if (
n_algns2 >= 2
): # single alignment on left read and multiple alignments on right
pair_index = (len(algns1), "R2")
output_pairs.append(
format_pair(
algns2[-1],
algns2[-2],
pair_index=pair_index,
algn1_pos3=algns1[-1]["pos5"],
report_position=report_position,
report_orientation=report_orientation,
)
)
last_reported_alignment_right = 2 # set the pointer for reporting

# Note that if n_algns1==n_algns2==1 and alignments overlap, then we don't need to check,
# it's a non-ligated DNA fragment that we don't report.

if (
n_algns2 >= 2
): # single alignment on left read and multiple alignments on right
pair_index = (len(algns1), "R2")
else: # end alignments do not overlap, report regular pair:
pair_index = (len(algns1), "R1-2")
output_pairs.append(
format_pair(
algns1[-1],
algns2[-1],
algns2[-2],
pair_index=pair_index,
algn1_pos3=algns1[-1]["pos5"],
report_position=report_position,
report_orientation=report_orientation,
)
)
last_reported_alignment_right = 2 # set the pointer for reporting

# Note that if n_algns1==n_algns2==1 and alignments overlap, then we don't need to check,
# it's a non-ligated DNA fragment that we don't report.

else: # end alignments do not overlap, report regular pair:
pair_index = (len(algns1), "R1-2")
output_pairs.append(
format_pair(
algns1[-1],
algns2[-1],
pair_index=pair_index,
report_position=report_position,
report_orientation=report_orientation,
)
)

else: # there was an overlap, set some pointers:
last_reported_alignment_left = (
last_reported_alignment_right
) = current_right_pair
else: # there was an overlap, set some pointers:
last_reported_alignment_left = (
last_reported_alignment_right
) = current_right_pair

# III. Report all remaining alignments.
# Report all unique alignments on left read (sequential):
Expand Down Expand Up @@ -1146,7 +1150,6 @@ def expand_pairs(pairs_list, max_expansion_depth=None):
list of expanded pairs
"""

for algn1, _algn1, pair_index1 in pairs_list:
for _algn2, algn2, pair_index2 in pairs_list:
if pair_index1 > pair_index2:
Expand Down
135 changes: 135 additions & 0 deletions tests/test_parse2.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,138 @@ def test_mock_pysam_parse2_pair():
print()

assert assigned_pair == simulated_pair


def test_mock_pysam_parse2_single_end():

"""Testing single-end mode for parse2, no-flip mode.
--report-position is outer (parse2 default)
--report-orientation is pair (parse2 default)
"""

mock_sam_path = os.path.join(testdir, "data", "mock.parse2-single-end.sam")
mock_chroms_path = os.path.join(testdir, "data", "mock.chrom.sizes")
try:
result = subprocess.check_output(
[
"python",
"-m",
"pairtools",
"parse2",
"-c",
mock_chroms_path,
"--single-end",
"--add-pair-index",
"--no-flip",
"--report-position",
"outer",
"--report-orientation",
"pair",
mock_sam_path,
],
).decode("ascii")
except subprocess.CalledProcessError as e:
print(e.output)
print(sys.exc_info())
raise e

# check if the header got transferred correctly
sam_header = [l.strip() for l in open(mock_sam_path, "r") if l.startswith("@")]
pairsam_header = [l.strip() for l in result.split("\n") if l.startswith("#")]
for l in sam_header:
assert any([l in l2 for l2 in pairsam_header])

# check that the pairs got assigned properly
id_counter = 0
prev_id = ""
for l in result.split("\n"):
if l.startswith("#") or not l:
continue

if prev_id == l.split("\t")[0]:
id_counter += 1
else:
id_counter = 0
prev_id = l.split("\t")[0]

assigned_pair = l.split("\t")[1:8] + l.split("\t")[-2:]
print(l.split("SIMULATED:", 1)[1].split("\031", 1)[0].split("|"), id_counter)
simulated_pair = (
l.split("SIMULATED:", 1)[1]
.split("\031", 1)[0]
.split("|")[id_counter]
.split(",")
)
print(assigned_pair)
print(simulated_pair, prev_id)
print()

assert assigned_pair == simulated_pair


def test_mock_pysam_parse2_single_end_expand():

"""Testing single-end mode for parse2, no-flip mode, with --expand.
--report-position is outer (parse2 default)
--report-orientation is pair (parse2 default)
"""

mock_sam_path = os.path.join(testdir, "data", "mock.parse2-single-end.expand.sam")
mock_chroms_path = os.path.join(testdir, "data", "mock.chrom.sizes")
try:
result = subprocess.check_output(
[
"python",
"-m",
"pairtools",
"parse2",
"-c",
mock_chroms_path,
"--single-end",
"--expand",
"--add-pair-index",
"--no-flip",
"--report-position",
"outer",
"--report-orientation",
"pair",
mock_sam_path,
],
).decode("ascii")
except subprocess.CalledProcessError as e:
print(e.output)
print(sys.exc_info())
raise e

# check if the header got transferred correctly
sam_header = [l.strip() for l in open(mock_sam_path, "r") if l.startswith("@")]
pairsam_header = [l.strip() for l in result.split("\n") if l.startswith("#")]
for l in sam_header:
assert any([l in l2 for l2 in pairsam_header])

# check that the pairs got assigned properly
id_counter = 0
prev_id = ""
for l in result.split("\n"):
if l.startswith("#") or not l:
continue

if prev_id == l.split("\t")[0]:
id_counter += 1
else:
id_counter = 0
prev_id = l.split("\t")[0]

assigned_pair = l.split("\t")[1:8] + l.split("\t")[-2:]
print(l.split("SIMULATED:", 1)[1].split("\031", 1)[0].split("|"), id_counter)
simulated_pair = (
l.split("SIMULATED:", 1)[1]
.split("\031", 1)[0]
.split("|")[id_counter]
.split(",")
)
print(assigned_pair)
print(simulated_pair, prev_id)
print()

assert assigned_pair == simulated_pair

0 comments on commit 942b490

Please sign in to comment.