From 0f6e731ca01662a9fc8203d74d97ba8c926b24bd Mon Sep 17 00:00:00 2001 From: Vitaliy Mysak Date: Thu, 15 Feb 2024 15:56:31 -0800 Subject: [PATCH] Merge fastqs: support arbitrary number of inputs Also improve the compression handling. --- micall/core/merge_fastqs.py | 223 ++++++++++-------- micall/core/trim_fastqs.py | 9 +- micall/tests/test_merge_fastqs.py | 363 +++++++++++++++++++++++------- 3 files changed, 426 insertions(+), 169 deletions(-) diff --git a/micall/core/merge_fastqs.py b/micall/core/merge_fastqs.py index d39992234..b3d134c5b 100644 --- a/micall/core/merge_fastqs.py +++ b/micall/core/merge_fastqs.py @@ -1,127 +1,172 @@ -from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter +import argparse import logging import shutil import sys import os import tempfile import gzip +import csv +from typing import TextIO, List, Optional, Dict, IO, Set, Tuple from micall.core.trim_fastqs import TrimSteps, trim logging.basicConfig(level=logging.INFO, format='%(asctime)s[%(levelname)s]%(name)s.%(funcName)s(): %(message)s') -logger = logging.getLogger('micall') +logger = logging.getLogger(__name__) -def concatenate_files(input_file1, input_file2, output_file): - with open(input_file1, 'rb') as src1, \ - open(input_file2, 'rb') as src2, \ - open(output_file, 'wb') as dst: +def concatenate_files(input_files: List[str], output_file: str) -> None: + with open(output_file, 'wb') as dst: + for input_file in input_files: + with open(input_file, 'rb') as src: + shutil.copyfileobj(src, dst) - shutil.copyfileobj(src1, dst) - shutil.copyfileobj(src2, dst) +def parse_inputs_and_merge_fastqs(trim_file: TextIO, mergeplan_file: TextIO, zip_file: Optional[TextIO]) -> None: + mergeplan: Dict[str, List[str]] = {} + trimplan: Set[Tuple[str, ...]] = set() + zipped: Set[str] = set() + bad_cycles: Dict[str, str] = {} -def merge_fastqs(args): - with tempfile.NamedTemporaryFile() as trimmed_fastq1_a, \ - tempfile.NamedTemporaryFile() as trimmed_fastq2_a, \ - tempfile.NamedTemporaryFile() as trimmed_fastq1_b, \ - tempfile.NamedTemporaryFile() as trimmed_fastq2_b: + mergeplan_reader = csv.DictReader(mergeplan_file) + trim_reader = csv.DictReader(trim_file) + zip_reader = csv.DictReader(zip_file) if zip_file is not None else None - logger.info('Processing reads of Sample A.') + for row in mergeplan_reader: + input_path = row["input"] + output_path = row["output"] - trim((args.fastq1_a, args.fastq2_a), - args.bad_cycles_a_csv, - (trimmed_fastq1_a.name, trimmed_fastq2_a.name), - use_gzip=not args.unzipped) + if output_path not in mergeplan: + mergeplan[output_path] = [] + mergeplan[output_path].append(input_path) - logger.info('Processing reads of Sample B.') + no_badcycles_fields = list(sorted( + (field for field in trim_reader.fieldnames or [] if field != "bad_cycles"), + key=lambda field: field.lower())) + expected_no_badcycles_fields = [f"r{i + 1}" for i in range(len(no_badcycles_fields))] - trim((args.fastq1_b, args.fastq2_b), - args.bad_cycles_b_csv, - (trimmed_fastq1_b.name, trimmed_fastq2_b.name), - use_gzip=not args.unzipped) + if [field.lower() for field in no_badcycles_fields] \ + != expected_no_badcycles_fields: + raise ValueError(f"Bad field names: {no_badcycles_fields}, expected {expected_no_badcycles_fields}") - logger.info('Merging resulting reads files.') + for row in trim_reader: + input_paths = tuple(row[field] for field in no_badcycles_fields) + trimplan.add(input_paths) + bad_cycles_path = row.get("bad_cycles", "") + if bad_cycles_path: + for input_path in input_paths: + bad_cycles[input_path] = bad_cycles_path - os.makedirs(os.path.dirname(args.fastq1_result), exist_ok=True) - os.makedirs(os.path.dirname(args.fastq2_result), exist_ok=True) + if zip_reader is not None: + for row in zip_reader: + zipped.add(row["file"]) - if args.unzipped: - concatenate_files(trimmed_fastq1_a.name, trimmed_fastq1_b.name, - args.fastq1_result) - concatenate_files(trimmed_fastq2_a.name, trimmed_fastq2_b.name, - args.fastq2_result) + return merge_fastqs(trimplan, mergeplan, zipped, bad_cycles) - else: - with tempfile.NamedTemporaryFile() as temp_fastq1, \ - tempfile.NamedTemporaryFile() as temp_fastq2: - temp_fastq1.close() - temp_fastq2.close() +def compress_file(input_path: str, output_path: str) -> None: + with open(input_path, 'rb') as input_file, \ + open(output_path, 'wb') as output_file: + with gzip.GzipFile(fileobj=output_file, mode='wb') as gzip_file: + shutil.copyfileobj(input_file, gzip_file) - concatenate_files(trimmed_fastq1_a.name, trimmed_fastq1_b.name, - temp_fastq1.name) - concatenate_files(trimmed_fastq2_a.name, trimmed_fastq2_b.name, - temp_fastq2.name) - logger.info('Compressing final outputs.') +def uncompress_file(input_path: str, output_path: str) -> None: + with open(input_path, 'rb') as compressed_file, \ + open(output_path, 'w+b') as ret: + with gzip.GzipFile(fileobj=compressed_file, mode='rb') as gzip_file: + shutil.copyfileobj(gzip_file, ret) - with open(temp_fastq1.name, 'rb') as f_in, gzip.open(args.fastq1_result, 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - with open(temp_fastq2.name, 'rb') as f_in, gzip.open(args.fastq2_result, 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) +def get_transitive(graph: Dict[str, str], key: str) -> str: + if key in graph: + return get_transitive(graph, graph[key]) + else: + return key - logger.info('Done.') +def merge_fastqs(trimplan: Set[Tuple[str, ...]], + mergeplan: Dict[str, List[str]], + zipped: Set[str] = set(), + bad_cycles: Dict[str, str] = {}, + ) -> None: + + inputs = [value for values in mergeplan.values() for value in values] + outputs = list(mergeplan.keys()) + name_mapping: Dict[str, str] = {} + temporary: List[IO] = [] + + for output_path in outputs: + os.makedirs(os.path.dirname(output_path), exist_ok=True) + + for input_path in inputs: + if input_path in zipped: + logger.info('Uncompressing %s.', input_path) + uncompressed = tempfile.NamedTemporaryFile(mode="w+b") + uncompressed_path = uncompressed.name + uncompress_file(input_path, uncompressed_path) + temporary.append(uncompressed) + name_mapping[input_path] = uncompressed_path + + for to_trim in trimplan: + assert len(to_trim) > 0 + + trim_outputs: List[str] = [] + trim_inputs: List[str] = [] -def main(argv) -> int: - parser = ArgumentParser( - description="Combine and filter the FASTQ files from two samples into a single output file.", - formatter_class=ArgumentDefaultsHelpFormatter) - - parser.add_argument( - "fastq1_a", - help="FASTQ file containing forward reads of sample A", - ) - parser.add_argument( - "fastq2_a", - help="FASTQ file containing reverse reads of sample A", - ) - parser.add_argument( - "fastq1_b", - help="FASTQ file containing forward reads of sample B", - ) - parser.add_argument( - "fastq2_b", - help="FASTQ file containing reverse reads of sample B", - ) - parser.add_argument( - "fastq1_result", - help="Resulting combined FASTQ file containing forward reads", - ) - parser.add_argument( - "fastq2_result", - help="Resulting combined FASTQ file containing reverse reads", - ) - parser.add_argument( - "--bad_cycles_a_csv", - help="list of tiles and cycles rejected for poor quality in sample A", - ) - parser.add_argument( - "--bad_cycles_b_csv", - help="list of tiles and cycles rejected for poor quality in sample B", - ) - parser.add_argument( - '--unzipped', '-u', - action='store_true', - help='Set if the original FASTQ files are not compressed', - ) + all_bad_cycles_paths = set(bad_cycles[path] for path in to_trim if path in bad_cycles) + if len(all_bad_cycles_paths) == 0: + bad_cycles_path = None + elif len(all_bad_cycles_paths) == 1: + bad_cycles_path = next(iter(all_bad_cycles_paths)) + else: + raise ValueError(f"Ambiguous bad_cycles for {to_trim}: {all_bad_cycles_paths}.") + + for path in to_trim: + path = get_transitive(name_mapping, path) + tmp = tempfile.NamedTemporaryFile() + trim_inputs.append(path) + trim_outputs.append(tmp.name) + temporary.append(tmp) + name_mapping[path] = tmp.name + + logger.info('Trimming samples %s.', ','.join(map(repr, to_trim))) + trim(trim_inputs, bad_cycles_path, trim_outputs, use_gzip=False) + + for output_path in mergeplan: + input_files = mergeplan[output_path] + logger.info('Merging results %s to %s.', ','.join(map(repr, input_files)), output_path) + input_files = [get_transitive(name_mapping, path) for path in input_files] + tmp = tempfile.NamedTemporaryFile() + temporary.append(tmp) + name_mapping[output_path] = tmp.name + output_path = tmp.name + concatenate_files(input_files, output_path) + + for output_path in mergeplan: + concatenated = name_mapping[output_path] + if output_path in zipped: + logger.info('Compressing output %s.', output_path) + compress_file(concatenated, output_path) + else: + os.rename(concatenated, output_path) + for toclose in temporary: + try: toclose.close() + except: pass + + logger.info('Done.') + + +def main(argv: List[str]) -> int: + parser = argparse.ArgumentParser(description="Combine and filter the FASTQ files from multiple samples into single output files.") + parser.add_argument("trimplan", type=argparse.FileType('rt'), help="A CSV file containing the lists of files to be trimmed.") + parser.add_argument("mergeplan", type=argparse.FileType('rt'), help="A CSV file containing merge plan.") + parser.add_argument("--zipfile", type=argparse.FileType('rt'), help="A CSV file containing a list of files that are compressed or need to be compressed.") args = parser.parse_args(argv) - merge_fastqs(args) + + parse_inputs_and_merge_fastqs(args.trimplan, args.mergeplan, args.zipfile) return 0 diff --git a/micall/core/trim_fastqs.py b/micall/core/trim_fastqs.py index 6248d0f76..4ce85334b 100755 --- a/micall/core/trim_fastqs.py +++ b/micall/core/trim_fastqs.py @@ -16,6 +16,7 @@ from pathlib import Path import typing +from typing import Optional from Bio import SeqIO @@ -57,12 +58,12 @@ def parse_args(): def trim(original_fastq_filenames: typing.Sequence[str], - bad_cycles_filename: str, + bad_cycles_filename: Optional[str], trimmed_fastq_filenames: typing.Sequence[str], use_gzip: bool = True, - summary_file: typing.TextIO = None, - skip: typing.Tuple[str] = (), - project_code: str = None): + summary_file: Optional[typing.TextIO] = None, + skip: typing.Tuple[str, ...] = (), + project_code: Optional[str] = None): """ :param original_fastq_filenames: sequence of two filenames, containing diff --git a/micall/tests/test_merge_fastqs.py b/micall/tests/test_merge_fastqs.py index 7052697a7..7f8e40c3e 100644 --- a/micall/tests/test_merge_fastqs.py +++ b/micall/tests/test_merge_fastqs.py @@ -5,68 +5,138 @@ import csv import gzip -from micall.core.merge_fastqs import main +from micall.core.merge_fastqs import main, merge_fastqs -FASTQ_DATA_A = """@M01234:01:000000000-AAAAA:1:1101:2020:0001 2:N:0:1 +FASTQ_DATA_A_R1 = """@M01234:01:000000000-AAAAA:1:1101:2020:0001 2:N:0:1 AAGTCGACGAATG + ABCDEFGHIJKLM""" -FASTQ_DATA_A2 = """@M01234:01:000000000-AAAAA:1:1101:2020:0001 2:N:0:1 +FASTQ_DATA_A_R2 = """@M01234:01:000000000-AAAAA:1:1101:2020:0001 2:N:0:1 AACCTGAAGGGTT + ABCDEFGHIJKLM""" -FASTQ_DATA_B = """@M01234:02:000000000-AAAAA:1:1101:2020:0001 2:N:0:1 +FASTQ_DATA_B_R1 = """@M01234:02:000000000-AAAAA:1:1101:2020:0001 2:N:0:1 CCCTAAAGGTTTG + ABCDEFGHIJKLM""" -FASTQ_DATA_B2 = """@M01234:02:000000000-AAAAA:1:1101:2020:0001 2:N:0:1 +FASTQ_DATA_B_R2 = """@M01234:02:000000000-AAAAA:1:1101:2020:0001 2:N:0:1 ACGTACGTACGTA + ABCDEFGHIJKLM""" +FASTQ_DATA_C_R1 = """@M01234:03:000000000-AAAAA:1:1101:2020:0001 2:N:0:1 +AAATAAAGGTTTG ++ +ABMDEFGZIJKLM""" +FASTQ_DATA_C_R2 = """@M01234:03:000000000-AAAAA:1:1101:2020:0001 2:N:0:1 +ABGAACGTACGTA ++ +ABCAEFGHIUKLM""" @pytest.fixture def fastq_files(tmp_path): # Create forward and reverse fastq files for two samples: A and B. - fastq1_a = tmp_path / "fastq1_a.fastq" - fastq2_a = tmp_path / "fastq2_a.fastq" - fastq1_b = tmp_path / "fastq1_b.fastq" - fastq2_b = tmp_path / "fastq2_b.fastq" - fastq1_a.write_text(FASTQ_DATA_A) - fastq2_a.write_text(FASTQ_DATA_A2) - fastq1_b.write_text(FASTQ_DATA_B) - fastq2_b.write_text(FASTQ_DATA_B2) + fastq_a_r1 = tmp_path / "fastq_a_r1.fastq" + fastq_a_r2 = tmp_path / "fastq_a_r2.fastq" + fastq_b_r1 = tmp_path / "fastq_b_r1.fastq" + fastq_b_r2 = tmp_path / "fastq_b_r2.fastq" + fastq_a_r1.write_text(FASTQ_DATA_A_R1) + fastq_a_r2.write_text(FASTQ_DATA_A_R2) + fastq_b_r1.write_text(FASTQ_DATA_B_R1) + fastq_b_r2.write_text(FASTQ_DATA_B_R2) # Paths for output files - fastq1_result = tmp_path / "fastq1_result.fastq" - fastq2_result = tmp_path / "fastq2_result.fastq" + fastq_result_r1 = tmp_path / "fastq_result_r1.fastq" + fastq_result_r2 = tmp_path / "fastq_result_r2.fastq" + + return (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_result_r1, fastq_result_r2) + + +def test_basic_merge_fastqs(fastq_files): + (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_result_r1, fastq_result_r2) \ + = fastq_files + + trimplan = {(fastq_a_r1, fastq_a_r2), + (fastq_b_r1, fastq_b_r2), + } + mergeplan = {fastq_result_r1: [fastq_a_r1, fastq_b_r1], + fastq_result_r2: [fastq_a_r2, fastq_b_r2], + } + + merge_fastqs(trimplan, mergeplan) + + # Add some assertions + assert os.path.exists(fastq_result_r1) + assert os.path.exists(fastq_result_r2) + + # New checks: Check if the merged files contain the correct sequences and qualities. + + with open(fastq_result_r1, 'r') as f: + data = f.read() + # should contain the sequences and qualities of the first reads of both samples + assert FASTQ_DATA_A_R1 in data + assert FASTQ_DATA_B_R1 in data + assert len(data.strip()) == len(FASTQ_DATA_A_R1.strip()) + 1 + len(FASTQ_DATA_B_R1.strip()) + + with open(fastq_result_r2, 'r') as f: + data = f.read() + # should contain the sequences and qualities of the second reads of both samples + assert FASTQ_DATA_A_R2 in data + assert FASTQ_DATA_B_R2 in data + assert len(data.strip()) == len(FASTQ_DATA_A_R2.strip()) + 1 + len(FASTQ_DATA_B_R2.strip()) + + +@pytest.fixture +def csvs_with_fastq_files(fastq_files, tmp_path): + (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_result_r1, fastq_result_r2) \ + = fastq_files - return (fastq1_a, fastq2_a, fastq1_b, fastq2_b, fastq1_result, fastq2_result) + trimplan = tmp_path / "trimplan.csv" + mergeplan = tmp_path / "mergeplan.csv" + trimplan.write_text(f""" +r1,r2 +{fastq_a_r1},{fastq_a_r2} +{fastq_b_r1},{fastq_b_r2} + """.strip()) -def test_basic_main(fastq_files): - (fastq1_a, fastq2_a, fastq1_b, fastq2_b, fastq1_result, fastq2_result) = fastq_files - args = [str(fastq1_a), str(fastq2_a), str(fastq1_b), str(fastq2_b), '--unzipped', str(fastq1_result), str(fastq2_result)] + mergeplan.write_text(f""" +input,output +{fastq_a_r1},{fastq_result_r1} +{fastq_b_r1},{fastq_result_r1} +{fastq_a_r2},{fastq_result_r2} +{fastq_b_r2},{fastq_result_r2} + """.strip()) - main(args) + return (str(trimplan), str(mergeplan)) + + +def test_basic_main(csvs_with_fastq_files, fastq_files): + (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_result_r1, fastq_result_r2) \ + = fastq_files + (trimplan, mergeplan) = csvs_with_fastq_files + + main([trimplan, mergeplan]) # Add some assertions - assert os.path.exists(fastq_files[-2]) # assert fastq1_result exists - assert os.path.exists(fastq_files[-1]) # assert fastq2_result exists + assert os.path.exists(fastq_result_r1) + assert os.path.exists(fastq_result_r2) # New checks: Check if the merged files contain the correct sequences and qualities. - with open(fastq1_result, 'r') as f: + with open(fastq_result_r1, 'r') as f: data = f.read() # should contain the sequences and qualities of the first reads of both samples - assert FASTQ_DATA_A in data - assert FASTQ_DATA_B in data + assert FASTQ_DATA_A_R1 in data + assert FASTQ_DATA_B_R1 in data + assert len(data.strip()) == len(FASTQ_DATA_A_R1.strip()) + 1 + len(FASTQ_DATA_B_R1.strip()) - with open(fastq2_result, 'r') as f: + with open(fastq_result_r2, 'r') as f: data = f.read() # should contain the sequences and qualities of the second reads of both samples - assert FASTQ_DATA_A2 in data - assert FASTQ_DATA_B2 in data + assert FASTQ_DATA_A_R2 in data + assert FASTQ_DATA_B_R2 in data + assert len(data.strip()) == len(FASTQ_DATA_A_R2.strip()) + 1 + len(FASTQ_DATA_B_R2.strip()) @pytest.fixture @@ -83,40 +153,64 @@ def bad_cycles_csv(tmp_path): @pytest.fixture def fastq_files_with_bad_cycles(bad_cycles_csv, tmp_path): - fastq1_a = tmp_path / "fastq1_a.fastq" - fastq2_a = tmp_path / "fastq2_a.fastq" - fastq1_b = tmp_path / "fastq1_b.fastq" - fastq2_b = tmp_path / "fastq2_b.fastq" - fastq1_a.write_text(FASTQ_DATA_A) - fastq2_a.write_text(FASTQ_DATA_A2) - fastq1_b.write_text(FASTQ_DATA_B) - fastq2_b.write_text(FASTQ_DATA_B2) + fastq_a_r1 = tmp_path / "fastq_a_r1.fastq" + fastq_a_r2 = tmp_path / "fastq_a_r2.fastq" + fastq_b_r1 = tmp_path / "fastq_b_r1.fastq" + fastq_b_r2 = tmp_path / "fastq_b_r2.fastq" + fastq_a_r1.write_text(FASTQ_DATA_A_R1) + fastq_a_r2.write_text(FASTQ_DATA_A_R2) + fastq_b_r1.write_text(FASTQ_DATA_B_R1) + fastq_b_r2.write_text(FASTQ_DATA_B_R2) + + fastq_result_r1 = tmp_path / "fastq_result_r1.fastq" + fastq_result_r2 = tmp_path / "fastq_result_r2.fastq" - fastq1_result = tmp_path / "fastq1_result.fastq" - fastq2_result = tmp_path / "fastq2_result.fastq" + return (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, + fastq_result_r1, fastq_result_r2, bad_cycles_csv) - return (fastq1_a, fastq2_a, fastq1_b, fastq2_b, - fastq1_result, fastq2_result, bad_cycles_csv) +@pytest.fixture +def csvs_with_fastq_files_with_bad_cycles(fastq_files_with_bad_cycles, bad_cycles_csv, tmp_path): + (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_result_r1, fastq_result_r2, bad_cycles_csv) \ + = fastq_files_with_bad_cycles + + trimplan = tmp_path / "trimplan.csv" + mergeplan = tmp_path / "mergeplan.csv" + + trimplan.write_text(f""" +r1,r2,bad_cycles +{fastq_a_r1},{fastq_a_r2},{bad_cycles_csv} +{fastq_b_r1},{fastq_b_r2},{''} + """.strip()) + + mergeplan.write_text(f""" +input,output +{fastq_a_r1},{fastq_result_r1} +{fastq_b_r1},{fastq_result_r1} +{fastq_a_r2},{fastq_result_r2} +{fastq_b_r2},{fastq_result_r2} + """.strip()) -def test_rejected_reads(fastq_files_with_bad_cycles): - (fastq1_a, fastq2_a, fastq1_b, fastq2_b, fastq1_result, fastq2_result, bad_cycles_csv) = \ - fastq_files_with_bad_cycles + return (str(trimplan), str(mergeplan)) - args = [str(fastq1_a), str(fastq2_a), str(fastq1_b), str(fastq2_b), - '--bad_cycles_a_csv', bad_cycles_csv, - '--unzipped', str(fastq1_result), str(fastq2_result)] - main(args) +def test_rejected_reads(csvs_with_fastq_files_with_bad_cycles, fastq_files_with_bad_cycles): + (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_result_r1, fastq_result_r2, bad_cycles_csv) \ + = fastq_files_with_bad_cycles + (trimplan, mergeplan) = csvs_with_fastq_files_with_bad_cycles - with open(fastq1_result, 'r') as f: + main([trimplan, mergeplan]) + + with open(fastq_result_r1, 'r') as f: data = f.read() - assert FASTQ_DATA_A not in data # first nucleotide filtered out. - assert FASTQ_DATA_B in data - with open(fastq2_result, 'r') as f: + assert FASTQ_DATA_A_R1 not in data # first nucleotide filtered out. + assert FASTQ_DATA_B_R1 in data + assert len(data.strip()) == len(FASTQ_DATA_A_R1.strip()) + 1 + len(FASTQ_DATA_B_R1.strip()) + with open(fastq_result_r2, 'r') as f: data = f.read() - assert FASTQ_DATA_A2 in data - assert FASTQ_DATA_B2 in data + assert FASTQ_DATA_A_R2 in data + assert FASTQ_DATA_B_R2 in data + assert len(data.strip()) == len(FASTQ_DATA_A_R2.strip()) + 1 + len(FASTQ_DATA_B_R2.strip()) # Helper function to write data to a gzipped file @@ -128,39 +222,156 @@ def write_to_gzip(filepath, data): @pytest.fixture def gzipped_fastq_files(tmp_path): # Create gzipped forward and reverse fastq files for two samples: A and B. - fastq1_a = tmp_path / "fastq1_a.fastq.gz" - fastq2_a = tmp_path / "fastq2_a.fastq.gz" - fastq1_b = tmp_path / "fastq1_b.fastq.gz" - fastq2_b = tmp_path / "fastq2_b.fastq.gz" - write_to_gzip(fastq1_a, FASTQ_DATA_A) - write_to_gzip(fastq2_a, FASTQ_DATA_A2) - write_to_gzip(fastq1_b, FASTQ_DATA_B) - write_to_gzip(fastq2_b, FASTQ_DATA_B2) + fastq_a_r1 = tmp_path / "fastq_a_r1.fastq.gz" + fastq_a_r2 = tmp_path / "fastq_a_r2.fastq.gz" + fastq_b_r1 = tmp_path / "fastq_b_r1.fastq" + fastq_b_r2 = tmp_path / "fastq_b_r2.fastq.gz" + write_to_gzip(fastq_a_r1, FASTQ_DATA_A_R1) + write_to_gzip(fastq_a_r2, FASTQ_DATA_A_R2) + fastq_b_r1.write_text(FASTQ_DATA_B_R1) + write_to_gzip(fastq_b_r2, FASTQ_DATA_B_R2) # Paths for output files - fastq1_result = tmp_path / "fastq1_result.fastq.gz" - fastq2_result = tmp_path / "fastq2_result.fastq.gz" + fastq_result_r1 = tmp_path / "fastq_result_r1.fastq.gz" + fastq_result_r2 = tmp_path / "fastq_result_r2.fastq" + + return (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_result_r1, fastq_result_r2) + + +@pytest.fixture +def csvs_with_gzipped_fastq_files(gzipped_fastq_files, tmp_path): + (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_result_r1, fastq_result_r2) \ + = gzipped_fastq_files + + trimplan = tmp_path / "trimplan.csv" + mergeplan = tmp_path / "mergeplan.csv" + zip_file = tmp_path / "zipfile.csv" + + trimplan.write_text(f""" +r1,r2 +{fastq_a_r1},{fastq_a_r2} +{fastq_b_r1},{fastq_b_r2} + """.strip()) + + mergeplan.write_text(f""" +input,output +{fastq_a_r1},{fastq_result_r1} +{fastq_b_r1},{fastq_result_r1} +{fastq_a_r2},{fastq_result_r2} +{fastq_b_r2},{fastq_result_r2} + """.strip()) - return (fastq1_a, fastq2_a, fastq1_b, fastq2_b, fastq1_result, fastq2_result) + zip_file.write_text(f""" +file +{fastq_a_r1} +{fastq_a_r2} +{fastq_b_r2} +{fastq_result_r1} + """.strip()) + return (str(trimplan), str(mergeplan), str(zip_file)) -def test_gzipped_main(gzipped_fastq_files): - (fastq1_a, fastq2_a, fastq1_b, fastq2_b, fastq1_result, fastq2_result) = gzipped_fastq_files - args = [str(fastq1_a), str(fastq2_a), str(fastq1_b), str(fastq2_b), str(fastq1_result), str(fastq2_result)] - main(args) +def test_gzipped_main(csvs_with_gzipped_fastq_files, gzipped_fastq_files): + (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_result_r1, fastq_result_r2) = gzipped_fastq_files + (trimplan, mergeplan, zip_file) = csvs_with_gzipped_fastq_files + + main([trimplan, mergeplan, "--zipfile", zip_file]) # Add some assertions - assert os.path.exists(fastq1_result) # assert gzipped fastq1_result exists - assert os.path.exists(fastq2_result) # assert gzipped fastq2_result exists + assert os.path.exists(fastq_result_r1) # assert gzipped fastq_result_r1 exists + assert os.path.exists(fastq_result_r2) # assert gzipped fastq_result_r2 exists # Check if the merged gzipped files contain the correct sequences and qualities. - with gzip.open(fastq1_result, 'rt') as f: + with gzip.open(fastq_result_r1, 'rt') as f: + data = f.read() + assert FASTQ_DATA_A_R1 in data + assert FASTQ_DATA_B_R1 in data + assert len(data.strip()) == len(FASTQ_DATA_A_R1.strip()) + len(FASTQ_DATA_B_R1.strip()) + 1 + + with open(fastq_result_r2, 'r') as f: + data = f.read() + assert FASTQ_DATA_A_R2 in data + assert FASTQ_DATA_B_R2 in data + assert len(data.strip()) == len(FASTQ_DATA_A_R2.strip()) + len(FASTQ_DATA_B_R2.strip()) + 1 + + +@pytest.fixture +def many_fastq_files(tmp_path): + # Create forward and reverse fastq files for two samples: A and B. + fastq_a_r1 = tmp_path / "fastq_a_r1.fastq" + fastq_a_r2 = tmp_path / "fastq_a_r2.fastq" + fastq_b_r1 = tmp_path / "fastq_b_r1.fastq" + fastq_b_r2 = tmp_path / "fastq_b_r2.fastq" + fastq_c_r1 = tmp_path / "fastq_c_r1.fastq" + fastq_c_r2 = tmp_path / "fastq_c_r2.fastq" + fastq_a_r1.write_text(FASTQ_DATA_A_R1) + fastq_a_r2.write_text(FASTQ_DATA_A_R2) + fastq_b_r1.write_text(FASTQ_DATA_B_R1) + fastq_b_r2.write_text(FASTQ_DATA_B_R2) + fastq_c_r1.write_text(FASTQ_DATA_C_R1) + fastq_c_r2.write_text(FASTQ_DATA_C_R2) + + # Paths for output files + fastq_result_r1 = tmp_path / "fastq_result_r1.fastq" + fastq_result_r2 = tmp_path / "fastq_result_r2.fastq" + + return (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_c_r1, fastq_c_r2, fastq_result_r1, fastq_result_r2) + + +@pytest.fixture +def csvs_with_many_fastq_files(many_fastq_files, tmp_path): + (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_c_r1, fastq_c_r2, fastq_result_r1, fastq_result_r2) \ + = many_fastq_files + + trimplan = tmp_path / "trimplan.csv" + mergeplan = tmp_path / "mergeplan.csv" + + trimplan.write_text(f""" +r1,r2 +{fastq_a_r1},{fastq_a_r2} +{fastq_b_r1},{fastq_b_r2} +{fastq_c_r1},{fastq_c_r2} + """.strip()) + + mergeplan.write_text(f""" +input,output +{fastq_a_r1},{fastq_result_r1} +{fastq_b_r1},{fastq_result_r1} +{fastq_c_r1},{fastq_result_r1} +{fastq_a_r2},{fastq_result_r2} +{fastq_b_r2},{fastq_result_r2} +{fastq_c_r2},{fastq_result_r2} + """.strip()) + + return (str(trimplan), str(mergeplan)) + + +def test_many_main(csvs_with_many_fastq_files, many_fastq_files): + (fastq_a_r1, fastq_a_r2, fastq_b_r1, fastq_b_r2, fastq_c_r1, fastq_c_r2, fastq_result_r1, fastq_result_r2) \ + = many_fastq_files + (trimplan, mergeplan) = csvs_with_many_fastq_files + + main([trimplan, mergeplan]) + + # Add some assertions + assert os.path.exists(fastq_result_r1) + assert os.path.exists(fastq_result_r2) + + # New checks: Check if the merged files contain the correct sequences and qualities. + + with open(fastq_result_r1, 'r') as f: data = f.read() - assert FASTQ_DATA_A in data - assert FASTQ_DATA_B in data + # should contain the sequences and qualities of the first reads of both samples + assert FASTQ_DATA_A_R1 in data + assert FASTQ_DATA_B_R1 in data + assert FASTQ_DATA_C_R1 in data + assert len(data.strip()) == len(FASTQ_DATA_A_R1.strip()) + 1 + len(FASTQ_DATA_B_R1.strip()) + 1 + len(FASTQ_DATA_C_R1.strip()) - with gzip.open(fastq2_result, 'rt') as f: + with open(fastq_result_r2, 'r') as f: data = f.read() - assert FASTQ_DATA_A2 in data - assert FASTQ_DATA_B2 in data + # should contain the sequences and qualities of the second reads of both samples + assert FASTQ_DATA_A_R2 in data + assert FASTQ_DATA_B_R2 in data + assert FASTQ_DATA_C_R2 in data + assert len(data.strip()) == len(FASTQ_DATA_A_R2.strip()) + 1 + len(FASTQ_DATA_B_R2.strip()) + 1 + len(FASTQ_DATA_C_R2.strip())