From 36f83e47566eea808254ce970298a4fa3e1a6004 Mon Sep 17 00:00:00 2001 From: Vitaliy Mysak Date: Tue, 9 Jul 2024 14:03:29 -0700 Subject: [PATCH] Add a separate proviral_landscapes.py script --- gene_splicer/proviral_landscapes.py | 205 ++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 gene_splicer/proviral_landscapes.py diff --git a/gene_splicer/proviral_landscapes.py b/gene_splicer/proviral_landscapes.py new file mode 100644 index 0000000..a5f7dc7 --- /dev/null +++ b/gene_splicer/proviral_landscapes.py @@ -0,0 +1,205 @@ +import csv +import logging +import os +import re +import typing +from typing import TextIO, Mapping, Dict, Set, List, Iterable, Tuple +import argparse +import sys + +import json +import shutil +import subprocess as sp +import glob +from pathlib import Path +from csv import DictWriter, DictReader +from itertools import groupby +from operator import itemgetter + +from gene_splicer.utils import ( + iterate_hivintact_verdicts_1, + iterate_hivseqinr_verdicts_1, + LEFT_PRIMER_END, RIGHT_PRIMER_START, +) + + +logger = logging.getLogger(__name__) + + +def generate_proviral_landscape_csv_1_cont(blastn_reader: csv.DictReader, + landscape_writer: csv.DictWriter, + verdicts: Mapping[str, str], + ) -> None: + + for row in blastn_reader: + if row['qseqid'] in ['8E5LAV', 'HXB2']: + # skip the positive control rows + continue + + ref_start = int(row['sstart']) + ref_end = int(row['send']) + if ref_end <= LEFT_PRIMER_END or ref_start >= RIGHT_PRIMER_START: + # skip unspecific matches of LTR at start and end + continue + + qseqid = row['qseqid'] + try: + [run_name, sample_name, _, _] = qseqid.split('::') + except ValueError: + [run_name, sample_name] = [None, qseqid] + + is_inverted = '' + if ref_end < ref_start: + # automatically recognize inverted regions + new_end = ref_start + ref_start = ref_end + ref_end = new_end + is_inverted = 'yes' + + verdict = verdicts.get(sample_name) + landscape_entry = {'ref_start': ref_start, + 'ref_end': ref_end, + 'samp_name': sample_name, + 'run_name': run_name, + 'is_inverted': is_inverted, + 'is_defective': verdict is not None, + 'defect': verdict, + } + + landscape_writer.writerow(landscape_entry) + + +def get_hivintact_verdicts_1_map(details_dir: Path) -> Mapping[str, str]: + ret: Dict[str, str] = {} + + for [qseqid, verdict] in iterate_hivintact_verdicts_1(details_dir): + ret[qseqid] = verdict + + return ret + + +def get_hivseqinr_verdicts_1_map(details_dir: Path) -> Mapping[str, str]: + ret: Dict[str, str] = {} + + for [qseqid, verdict] in iterate_hivseqinr_verdicts_1(details_dir): + ret[qseqid] = verdict + + return ret + + +def generate_proviral_landscape_csv_1(landscape_writer: csv.DictWriter, + details_dir: Path, + ) -> None: + is_hivintact = (details_dir / "holistic.csv").exists() + if is_hivintact: + verdicts = get_hivintact_verdicts_1_map(details_dir) + blastn_path = details_dir / "blast.csv" + else: + verdicts = get_hivseqinr_verdicts_1_map(details_dir) + blastn_path = details_dir / "Results_Intermediate" / "Output_Blastn_HXB2MEGA28_tabdelim.txt" + + with blastn_path.open() as blastn_file: + if is_hivintact: + blastn_reader = DictReader(blastn_file) + else: + blastn_columns = ['qseqid', + 'qlen', + 'sseqid', + 'sgi', + 'slen', + 'qstart', + 'qend', + 'sstart', + 'send', + 'evalue', + 'bitscore', + 'length', + 'pident', + 'nident', + 'btop', + 'stitle', + 'sstrand'] + blastn_reader = DictReader(blastn_file, fieldnames=blastn_columns, delimiter='\t') + + return generate_proviral_landscape_csv_1_cont( + blastn_reader, + landscape_writer, + verdicts, + ) + + + +class UserError(RuntimeError): + def __init__(self, fmt: str, *fmt_args: object): + self.fmt = fmt + self.fmt_args = fmt_args + self.code = 1 + + +def dir_path(string: str) -> Path: + if os.path.exists(string) and os.path.isdir(string): + return Path(string) + else: + raise UserError("Path %r does not exist or is not a directory.", string) + + +def cli_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Generate Proviral Landscape CSV.") + + parser.add_argument("--details_dir", type=dir_path, required=True, + help="Directory containing details files for verdicts.") + + parser.add_argument("--output", type=argparse.FileType("wt"), required=True, + help="Output CSV file for proviral landscape.") + + verbosity_group = parser.add_mutually_exclusive_group() + verbosity_group.add_argument('--verbose', action='store_true', + help='Increase output verbosity.') + verbosity_group.add_argument('--no-verbose', action='store_true', + help='Normal output verbosity.', default=True) + verbosity_group.add_argument('--debug', action='store_true', + help='Maximum output verbosity.') + verbosity_group.add_argument('--quiet', action='store_true', + help='Minimize output verbosity.') + + return parser + + +def main(argv: list) -> int: + parser = cli_parser() + args = parser.parse_args(argv) + if args.quiet: + logger.setLevel(logging.ERROR) + elif args.verbose: + logger.setLevel(logging.INFO) + elif args.debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.WARN) + + logger.debug("Start.") + + fieldnames = ['ref_start', 'ref_end', 'samp_name', 'run_name', 'is_inverted', 'is_defective', 'defect'] + + landscape_writer = csv.DictWriter(args.output, fieldnames=fieldnames) + landscape_writer.writeheader() + generate_proviral_landscape_csv_1(landscape_writer, args.details_dir) + + logger.debug("Done.") + return 0 + + +if __name__ == '__main__': + try: + rc = main(sys.argv[1:]) + except BrokenPipeError: + logger.debug("Broken pipe.") + rc = 0 + except KeyboardInterrupt: + logger.debug("Interrupted.") + rc = 1 + except UserError as e: + logger.fatal(e.fmt, *e.fmt_args) + rc = e.code + + sys.exit(rc)