Skip to content

Commit

Permalink
Further improve typing in denovo.py
Browse files (browse the repository at this point in the history)
  • Loading branch information
Donaim committed Dec 4, 2024
1 parent 2316ae4 commit f37027d
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 23 deletions.
18 changes: 8 additions & 10 deletions micall/core/denovo.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import argparse
import logging
from typing import Optional, TextIO, cast, BinaryIO
from typing import Optional
from datetime import datetime
from glob import glob
from shutil import rmtree, copyfileobj
import shutil
from subprocess import CalledProcessError
import subprocess
from tempfile import mkdtemp
Expand All @@ -21,17 +21,17 @@ def count_fasta_sequences(file_path: Path):

def denovo(fastq1_path: Path,
fastq2_path: Path,
fasta: TextIO,
fasta: Path,
work_dir: Path = Path('.'),
merged_contigs_csv: Optional[TextIO] = None,
merged_contigs_csv: Optional[Path] = None,
):
""" Use de novo assembly to build contigs from reads.
:param fastq1: FASTQ file for read 1 reads
:param fastq2: FASTQ file for read 2 reads
:param fasta: file to write assembled contigs to
:param work_dir: path for writing temporary files
:param merged_contigs_csv: open file to read contigs that were merged from
:param merged_contigs_csv: file to read contigs that were merged from
amplicon reads
"""

Expand All @@ -41,7 +41,7 @@ def denovo(fastq1_path: Path,

old_tmp_dirs = glob(str(work_dir / 'assembly_*'))
for old_tmp_dir in old_tmp_dirs:
rmtree(old_tmp_dir, ignore_errors=True)
shutil.rmtree(old_tmp_dir, ignore_errors=True)

tmp_dir = Path(mkdtemp(dir=work_dir, prefix='assembly_'))

Expand Down Expand Up @@ -87,9 +87,7 @@ def denovo(fastq1_path: Path,
except CalledProcessError:
logger.warning('Haploflow failed to assemble.', exc_info=True)

with open(contigs_fasta_path) as reader:
copyfileobj(cast(BinaryIO, reader), fasta)
fasta.flush()
shutil.copy(contigs_fasta_path, fasta)

duration = datetime.now() - start_time
contig_count = count_fasta_sequences(contigs_fasta_path)
Expand Down Expand Up @@ -122,4 +120,4 @@ def denovo(fastq1_path: Path,
)

args = parser.parse_args()
denovo(args.fastq1.name, args.fastq2.name, args.fasta)
denovo(args.fastq1.name, args.fastq2.name, args.fasta.name)
14 changes: 6 additions & 8 deletions micall/drivers/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,14 +417,12 @@ def run_denovo(self, excluded_seeds):
logger.info('Running de novo assembly on %s.', self)
scratch_path = self.get_scratch_path()

with open(self.unstitched_contigs_fasta, 'w') as unstitched_contigs_fasta, \
open(self.merged_contigs_csv, 'r') as merged_contigs_csv:
denovo(self.trimmed1_fastq,
self.trimmed2_fastq,
unstitched_contigs_fasta,
self.scratch_path,
merged_contigs_csv,
)
denovo(self.trimmed1_fastq,
self.trimmed2_fastq,
self.unstitched_contigs_fasta,
self.scratch_path,
self.merged_contigs_csv,
)

with open(self.unstitched_contigs_csv, 'w') as unstitched_contigs_csv, \
open(self.merged_contigs_csv, 'r') as merged_contigs_csv, \
Expand Down
9 changes: 4 additions & 5 deletions micall/tests/test_denovo.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from io import StringIO
from pathlib import Path
import re

Expand All @@ -23,7 +22,7 @@ def normalize_fasta(content: str) -> str:
@mark.iva() # skip with -k-iva
def test_denovo_iva(tmpdir, hcv_db):
microtest_path = Path(__file__).parent / 'microtest'
contigs_fasta = StringIO()
contigs_fasta: Path = tmpdir / 'result.fasta'
expected_contigs_fasta = """\
>contig.00001
TGAGGGCCAAAAAGGTAACTTTTGATAGGATGCAAGTGC\
Expand All @@ -35,11 +34,11 @@ def test_denovo_iva(tmpdir, hcv_db):
AGGCGGTGATGGGGGCTTCTTATGGATTCCAGTACTCCC
"""

denovo(str(microtest_path / '2160A-HCV_S19_L001_R1_001.fastq'),
str(microtest_path / '2160A-HCV_S19_L001_R2_001.fastq'),
denovo(microtest_path / '2160A-HCV_S19_L001_R1_001.fastq',
microtest_path / '2160A-HCV_S19_L001_R2_001.fastq',
contigs_fasta,
tmpdir)

result = contigs_fasta.getvalue()
result = contigs_fasta.read_text()
expected = expected_contigs_fasta
assert normalize_fasta(result) == normalize_fasta(expected)

0 comments on commit f37027d

Please sign in to comment.