From f37027d0d2859dd6c6442a0bfb7d295676b4b3f2 Mon Sep 17 00:00:00 2001 From: Vitaliy Mysak Date: Wed, 4 Dec 2024 14:11:01 -0800 Subject: [PATCH] Further improve typing in denovo.py --- micall/core/denovo.py | 18 ++++++++---------- micall/drivers/sample.py | 14 ++++++-------- micall/tests/test_denovo.py | 9 ++++----- 3 files changed, 18 insertions(+), 23 deletions(-) diff --git a/micall/core/denovo.py b/micall/core/denovo.py index cd06be2c9..43173df08 100644 --- a/micall/core/denovo.py +++ b/micall/core/denovo.py @@ -1,9 +1,9 @@ import argparse import logging -from typing import Optional, TextIO, cast, BinaryIO +from typing import Optional from datetime import datetime from glob import glob -from shutil import rmtree, copyfileobj +import shutil from subprocess import CalledProcessError import subprocess from tempfile import mkdtemp @@ -21,9 +21,9 @@ def count_fasta_sequences(file_path: Path): def denovo(fastq1_path: Path, fastq2_path: Path, - fasta: TextIO, + fasta: Path, work_dir: Path = Path('.'), - merged_contigs_csv: Optional[TextIO] = None, + merged_contigs_csv: Optional[Path] = None, ): """ Use de novo assembly to build contigs from reads. @@ -31,7 +31,7 @@ def denovo(fastq1_path: Path, :param fastq2: FASTQ file for read 2 reads :param fasta: file to write assembled contigs to :param work_dir: path for writing temporary files - :param merged_contigs_csv: open file to read contigs that were merged from + :param merged_contigs_csv: file to read contigs that were merged from amplicon reads """ @@ -41,7 +41,7 @@ def denovo(fastq1_path: Path, old_tmp_dirs = glob(str(work_dir / 'assembly_*')) for old_tmp_dir in old_tmp_dirs: - rmtree(old_tmp_dir, ignore_errors=True) + shutil.rmtree(old_tmp_dir, ignore_errors=True) tmp_dir = Path(mkdtemp(dir=work_dir, prefix='assembly_')) @@ -87,9 +87,7 @@ def denovo(fastq1_path: Path, except CalledProcessError: logger.warning('Haploflow failed to assemble.', exc_info=True) - with open(contigs_fasta_path) as reader: - copyfileobj(cast(BinaryIO, reader), fasta) - fasta.flush() + shutil.copy(contigs_fasta_path, fasta) duration = datetime.now() - start_time contig_count = count_fasta_sequences(contigs_fasta_path) @@ -122,4 +120,4 @@ def denovo(fastq1_path: Path, ) args = parser.parse_args() - denovo(args.fastq1.name, args.fastq2.name, args.fasta) + denovo(args.fastq1.name, args.fastq2.name, args.fasta.name) diff --git a/micall/drivers/sample.py b/micall/drivers/sample.py index c63c3353c..987b88525 100644 --- a/micall/drivers/sample.py +++ b/micall/drivers/sample.py @@ -417,14 +417,12 @@ def run_denovo(self, excluded_seeds): logger.info('Running de novo assembly on %s.', self) scratch_path = self.get_scratch_path() - with open(self.unstitched_contigs_fasta, 'w') as unstitched_contigs_fasta, \ - open(self.merged_contigs_csv, 'r') as merged_contigs_csv: - denovo(self.trimmed1_fastq, - self.trimmed2_fastq, - unstitched_contigs_fasta, - self.scratch_path, - merged_contigs_csv, - ) + denovo(self.trimmed1_fastq, + self.trimmed2_fastq, + self.unstitched_contigs_fasta, + self.scratch_path, + self.merged_contigs_csv, + ) with open(self.unstitched_contigs_csv, 'w') as unstitched_contigs_csv, \ open(self.merged_contigs_csv, 'r') as merged_contigs_csv, \ diff --git a/micall/tests/test_denovo.py b/micall/tests/test_denovo.py index 12e2d791d..8c2aa5e1f 100644 --- a/micall/tests/test_denovo.py +++ b/micall/tests/test_denovo.py @@ -1,4 +1,3 @@ -from io import StringIO from pathlib import Path import re @@ -23,7 +22,7 @@ def normalize_fasta(content: str) -> str: @mark.iva() # skip with -k-iva def test_denovo_iva(tmpdir, hcv_db): microtest_path = Path(__file__).parent / 'microtest' - contigs_fasta = StringIO() + contigs_fasta: Path = tmpdir / 'result.fasta' expected_contigs_fasta = """\ >contig.00001 TGAGGGCCAAAAAGGTAACTTTTGATAGGATGCAAGTGC\ @@ -35,11 +34,11 @@ def test_denovo_iva(tmpdir, hcv_db): AGGCGGTGATGGGGGCTTCTTATGGATTCCAGTACTCCC """ - denovo(str(microtest_path / '2160A-HCV_S19_L001_R1_001.fastq'), - str(microtest_path / '2160A-HCV_S19_L001_R2_001.fastq'), + denovo(microtest_path / '2160A-HCV_S19_L001_R1_001.fastq', + microtest_path / '2160A-HCV_S19_L001_R2_001.fastq', contigs_fasta, tmpdir) - result = contigs_fasta.getvalue() + result = contigs_fasta.read_text() expected = expected_contigs_fasta assert normalize_fasta(result) == normalize_fasta(expected)