Merge fastqs: support arbitrary number of inputs
Also improve the compression handling.
Donaim committed Feb 15, 2024
1 parent 6840f80 commit ba55472
Showing 3 changed files with 428 additions and 169 deletions.
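The interface change is easiest to see from the caller's side: instead of exactly two samples (four FASTQ paths plus two result paths), the reworked merge_fastqs below takes a trim plan, a merge plan, and optional sets naming compressed files and bad-cycle lists. A minimal sketch of a direct call, assuming the module is importable as micall.core.merge_fastqs (the file path in this commit); all file names are hypothetical:

    from micall.core.merge_fastqs import merge_fastqs

    # Two samples here, but any number of inputs would do. Each tuple in the
    # trim plan is trimmed together; the merge plan maps each output file to
    # the list of inputs concatenated into it.
    trimplan = {("a_R1.fastq", "a_R2.fastq"),
                ("b_R1.fastq", "b_R2.fastq")}
    mergeplan = {"out/merged_R1.fastq": ["a_R1.fastq", "b_R1.fastq"],
                 "out/merged_R2.fastq": ["a_R2.fastq", "b_R2.fastq"]}
    merge_fastqs(trimplan, mergeplan,
                 zipped=set(),  # nothing gzipped in this sketch
                 bad_cycles={"a_R1.fastq": "bad_cycles_a.csv",
                             "a_R2.fastq": "bad_cycles_a.csv"})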
223 changes: 134 additions & 89 deletions micall/core/merge_fastqs.py
@@ -1,127 +1,172 @@
-from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import argparse
 import logging
 import shutil
 import sys
 import os
 import tempfile
 import gzip
+import csv
+from typing import TextIO, List, Optional, Dict, IO, Set, Tuple
 
 from micall.core.trim_fastqs import TrimSteps, trim
 
 
 logging.basicConfig(level=logging.INFO,
                     format='%(asctime)s[%(levelname)s]%(name)s.%(funcName)s(): %(message)s')
-logger = logging.getLogger('micall')
+logger = logging.getLogger(__name__)
 
 
-def concatenate_files(input_file1, input_file2, output_file):
-    with open(input_file1, 'rb') as src1, \
-            open(input_file2, 'rb') as src2, \
-            open(output_file, 'wb') as dst:
-
-        shutil.copyfileobj(src1, dst)
-        shutil.copyfileobj(src2, dst)
+def concatenate_files(input_files: List[str], output_file: str) -> None:
+    with open(output_file, 'wb') as dst:
+        for input_file in input_files:
+            with open(input_file, 'rb') as src:
+                shutil.copyfileobj(src, dst)
 
 
-def merge_fastqs(args):
-    with tempfile.NamedTemporaryFile() as trimmed_fastq1_a, \
-            tempfile.NamedTemporaryFile() as trimmed_fastq2_a, \
-            tempfile.NamedTemporaryFile() as trimmed_fastq1_b, \
-            tempfile.NamedTemporaryFile() as trimmed_fastq2_b:
-
-        logger.info('Processing reads of Sample A.')
-
-        trim((args.fastq1_a, args.fastq2_a),
-             args.bad_cycles_a_csv,
-             (trimmed_fastq1_a.name, trimmed_fastq2_a.name),
-             use_gzip=not args.unzipped)
-
-        logger.info('Processing reads of Sample B.')
-
-        trim((args.fastq1_b, args.fastq2_b),
-             args.bad_cycles_b_csv,
-             (trimmed_fastq1_b.name, trimmed_fastq2_b.name),
-             use_gzip=not args.unzipped)
-
-        logger.info('Merging resulting reads files.')
-
-        os.makedirs(os.path.dirname(args.fastq1_result), exist_ok=True)
-        os.makedirs(os.path.dirname(args.fastq2_result), exist_ok=True)
-
-        if args.unzipped:
-            concatenate_files(trimmed_fastq1_a.name, trimmed_fastq1_b.name,
-                              args.fastq1_result)
-            concatenate_files(trimmed_fastq2_a.name, trimmed_fastq2_b.name,
-                              args.fastq2_result)
-
-        else:
-            with tempfile.NamedTemporaryFile() as temp_fastq1, \
-                    tempfile.NamedTemporaryFile() as temp_fastq2:
-
-                temp_fastq1.close()
-                temp_fastq2.close()
-
-                concatenate_files(trimmed_fastq1_a.name, trimmed_fastq1_b.name,
-                                  temp_fastq1.name)
-                concatenate_files(trimmed_fastq2_a.name, trimmed_fastq2_b.name,
-                                  temp_fastq2.name)
-
-                logger.info('Compressing final outputs.')
-
-                with open(temp_fastq1.name, 'rb') as f_in, gzip.open(args.fastq1_result, 'wb') as f_out:
-                    shutil.copyfileobj(f_in, f_out)
-
-                with open(temp_fastq2.name, 'rb') as f_in, gzip.open(args.fastq2_result, 'wb') as f_out:
-                    shutil.copyfileobj(f_in, f_out)
-
-        logger.info('Done.')
+def parse_inputs_and_merge_fastqs(trim_file: TextIO, mergeplan_file: TextIO, zip_file: Optional[TextIO]) -> None:
+    mergeplan: Dict[str, List[str]] = {}
+    trimplan: Set[Tuple[str, ...]] = set()
+    zipped: Set[str] = set()
+    bad_cycles: Dict[str, str] = {}
+
+    mergeplan_reader = csv.DictReader(mergeplan_file)
+    trim_reader = csv.DictReader(trim_file)
+    zip_reader = csv.DictReader(zip_file) if zip_file is not None else None
+
+    for row in mergeplan_reader:
+        input_path = row["input"]
+        output_path = row["output"]
+
+        if output_path not in mergeplan:
+            mergeplan[output_path] = []
+        mergeplan[output_path].append(input_path)
+
+    no_badcycles_fields = list(sorted(
+        (field for field in trim_reader.fieldnames or [] if field != "bad_cycles"),
+        key=lambda field: field.lower()))
+    expected_no_badcycles_fields = [f"r{i + 1}" for i in range(len(no_badcycles_fields))]
+
+    if [field.lower() for field in no_badcycles_fields] \
+            != expected_no_badcycles_fields:
+        raise ValueError(f"Bad field names: {no_badcycles_fields}, expected {expected_no_badcycles_fields}")
+
+    for row in trim_reader:
+        input_paths = tuple(row[field] for field in no_badcycles_fields)
+        trimplan.add(input_paths)
+        bad_cycles_path = row.get("bad_cycles", "")
+        if bad_cycles_path:
+            for input_path in input_paths:
+                bad_cycles[input_path] = bad_cycles_path
+
+    if zip_reader is not None:
+        for row in zip_reader:
+            zipped.add(row["file"])
+
+    return merge_fastqs(trimplan, mergeplan, zipped, bad_cycles)
+
+
+def compress_file(input_path: str, output_path: str) -> None:
+    with open(input_path, 'rb') as input_file, \
+            open(output_path, 'wb') as output_file:
+        with gzip.GzipFile(fileobj=output_file, mode='wb') as gzip_file:
+            shutil.copyfileobj(input_file, gzip_file)
+
+
+def uncompress_file(input_path: str, output_path: str) -> None:
+    with open(input_path, 'rb') as compressed_file, \
+            open(output_path, 'w+b') as ret:
+        with gzip.GzipFile(fileobj=compressed_file, mode='rb') as gzip_file:
+            shutil.copyfileobj(gzip_file, ret)
+
+
+def get_transitive(graph: Dict[str, str], key: str) -> str:
+    if key in graph:
+        return get_transitive(graph, graph[key])
+    else:
+        return key
+
+
+def merge_fastqs(trimplan: Set[Tuple[str, ...]],
+                 mergeplan: Dict[str, List[str]],
+                 zipped: Set[str] = set(),
+                 bad_cycles: Dict[str, str] = {},
+                 ) -> None:
+
+    inputs = [value for values in mergeplan.values() for value in values]
+    outputs = list(mergeplan.keys())
+    name_mapping: Dict[str, str] = {}
+    temporary: List[IO] = []
+
+    for output_path in outputs:
+        os.makedirs(os.path.dirname(output_path), exist_ok=True)
+
+    for input_path in inputs:
+        if input_path in zipped:
+            logger.info('Uncompressing %s.', input_path)
+            uncompressed = tempfile.NamedTemporaryFile(mode="w+b")
+            uncompressed_path = uncompressed.name
+            uncompress_file(input_path, uncompressed_path)
+            temporary.append(uncompressed)
+            name_mapping[input_path] = uncompressed_path
+
+    for to_trim in trimplan:
+        assert len(to_trim) > 0
+
+        trim_outputs: List[str] = []
+        trim_inputs: List[str] = []
+
+        all_bad_cycles_paths = set(bad_cycles[path] for path in to_trim if path in bad_cycles)
+        if len(all_bad_cycles_paths) == 0:
+            bad_cycles_path = None
+        elif len(all_bad_cycles_paths) == 1:
+            bad_cycles_path = next(iter(all_bad_cycles_paths))
+        else:
+            raise ValueError(f"Ambiguous bad_cycles for {to_trim}: {all_bad_cycles_paths}.")
+
+        for path in to_trim:
+            path = get_transitive(name_mapping, path)
+            tmp = tempfile.NamedTemporaryFile()
+            trim_inputs.append(path)
+            trim_outputs.append(tmp.name)
+            temporary.append(tmp)
+            name_mapping[path] = tmp.name
+
+        logger.info('Trimming samples %s.', ','.join(map(repr, to_trim)))
+        trim(trim_inputs, bad_cycles_path, trim_outputs, use_gzip=False)
+
+    for output_path in mergeplan:
+        input_files = mergeplan[output_path]
+        logger.info('Merging results %s to %s.', ','.join(map(repr, input_files)), output_path)
+        input_files = [get_transitive(name_mapping, path) for path in input_files]
+        tmp = tempfile.NamedTemporaryFile()
+        temporary.append(tmp)
+        name_mapping[output_path] = tmp.name
+        output_path = tmp.name
+        concatenate_files(input_files, output_path)
+
+    for output_path in mergeplan:
+        concatenated = name_mapping[output_path]
+        if output_path in zipped:
+            logger.info('Compressing output %s.', output_path)
+            compress_file(concatenated, output_path)
+        else:
+            os.rename(concatenated, output_path)
+
+    for toclose in temporary:
+        try: toclose.close()
+        except: pass
+
+    logger.info('Done.')
 
 
-def main(argv) -> int:
-    parser = ArgumentParser(
-        description="Combine and filter the FASTQ files from two samples into a single output file.",
-        formatter_class=ArgumentDefaultsHelpFormatter)
-
-    parser.add_argument(
-        "fastq1_a",
-        help="FASTQ file containing forward reads of sample A",
-    )
-    parser.add_argument(
-        "fastq2_a",
-        help="FASTQ file containing reverse reads of sample A",
-    )
-    parser.add_argument(
-        "fastq1_b",
-        help="FASTQ file containing forward reads of sample B",
-    )
-    parser.add_argument(
-        "fastq2_b",
-        help="FASTQ file containing reverse reads of sample B",
-    )
-    parser.add_argument(
-        "fastq1_result",
-        help="Resulting combined FASTQ file containing forward reads",
-    )
-    parser.add_argument(
-        "fastq2_result",
-        help="Resulting combined FASTQ file containing reverse reads",
-    )
-    parser.add_argument(
-        "--bad_cycles_a_csv",
-        help="list of tiles and cycles rejected for poor quality in sample A",
-    )
-    parser.add_argument(
-        "--bad_cycles_b_csv",
-        help="list of tiles and cycles rejected for poor quality in sample B",
-    )
-    parser.add_argument(
-        '--unzipped', '-u',
-        action='store_true',
-        help='Set if the original FASTQ files are not compressed',
-    )
+def main(argv: List[str]) -> int:
+    parser = argparse.ArgumentParser(description="Combine and filter the FASTQ files from multiple samples into single output files.")
+    parser.add_argument("trimplan", type=argparse.FileType('rt'), help="A CSV file containing the lists of files to be trimmed.")
+    parser.add_argument("mergeplan", type=argparse.FileType('rt'), help="A CSV file containing merge plan.")
+    parser.add_argument("--zipfile", type=argparse.FileType('rt'), help="A CSV file containing a list of files that are compressed or need to be compressed.")
     args = parser.parse_args(argv)
-    merge_fastqs(args)
 
+    parse_inputs_and_merge_fastqs(args.trimplan, args.mergeplan, args.zipfile)
     return 0
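For the command line, the same plans are expressed as CSV files. The column names (input/output for the merge plan; r1, r2, ... plus an optional bad_cycles for the trim plan; file for the zip list) come from the readers in parse_inputs_and_merge_fastqs above, while the paths are hypothetical. Internally, name_mapping chains each original path to its uncompressed and trimmed temporary copies, and get_transitive follows that chain to the most recent copy before merging. A sketch driving the parser directly with in-memory CSVs:

    import io
    from micall.core.merge_fastqs import parse_inputs_and_merge_fastqs

    # Roughly equivalent to: merge_fastqs.py trimplan.csv mergeplan.csv --zipfile zipped.csv
    trimplan = io.StringIO(
        "r1,r2,bad_cycles\n"
        "a_R1.fastq.gz,a_R2.fastq.gz,bad_cycles_a.csv\n"
        "b_R1.fastq.gz,b_R2.fastq.gz,\n")
    mergeplan = io.StringIO(
        "input,output\n"
        "a_R1.fastq.gz,out/merged_R1.fastq.gz\n"
        "b_R1.fastq.gz,out/merged_R1.fastq.gz\n"
        "a_R2.fastq.gz,out/merged_R2.fastq.gz\n"
        "b_R2.fastq.gz,out/merged_R2.fastq.gz\n")
    zipped = io.StringIO(
        "file\n"
        "a_R1.fastq.gz\n"
        "a_R2.fastq.gz\n"
        "b_R1.fastq.gz\n"
        "b_R2.fastq.gz\n"
        "out/merged_R1.fastq.gz\n"
        "out/merged_R2.fastq.gz\n")
    parse_inputs_and_merge_fastqs(trimplan, mergeplan, zipped)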


9 changes: 5 additions & 4 deletions micall/core/trim_fastqs.py
@@ -16,6 +16,7 @@
 from pathlib import Path
 
 import typing
+from typing import Optional
 
 from Bio import SeqIO
 
@@ -57,12 +58,12 @@ def parse_args():
 
 
 def trim(original_fastq_filenames: typing.Sequence[str],
-         bad_cycles_filename: str,
+         bad_cycles_filename: Optional[str],
          trimmed_fastq_filenames: typing.Sequence[str],
          use_gzip: bool = True,
-         summary_file: typing.TextIO = None,
-         skip: typing.Tuple[str] = (),
-         project_code: str = None):
+         summary_file: Optional[typing.TextIO] = None,
+         skip: typing.Tuple[str, ...] = (),
+         project_code: Optional[str] = None):
     """
     :param original_fastq_filenames: sequence of two filenames, containing
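The trim signature is only loosened here, not reworked: bad_cycles_filename, summary_file and project_code may now be None, which is how the reworked merge_fastqs.py calls it. A minimal sketch of such a call, with hypothetical paths:

    from micall.core.trim_fastqs import trim

    # No bad-cycles list for this pair; use_gzip=False means plain FASTQ in and out.
    trim(("a_R1.fastq", "a_R2.fastq"),
         None,
         ("trimmed_R1.fastq", "trimmed_R2.fastq"),
         use_gzip=False)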
