From ffa76c8b5136681a730b27f386a6b538e8ef1313 Mon Sep 17 00:00:00 2001 From: Brendan Smith Date: Thu, 7 Mar 2024 09:38:34 -0600 Subject: [PATCH] Copy to output directory (#3) --- .vscode/launch.json | 4 +- README.md | 24 ----- src/vcf_isec/cli.py | 22 +++- src/vcf_isec/isec.py | 206 ++++++++++++++++++++++++++++++-------- src/vcf_isec/parser.py | 53 +++++++--- tests/pytest/test_isec.py | 8 +- 6 files changed, 227 insertions(+), 90 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index a8b4363..ff0fdcc 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,8 +12,8 @@ "args": [ "tests/resources/cmp-test-a.vcf", "tests/resources/cmp-test-b.vcf", - "-p", - "tmp" + "-o", + "output" ] } ] diff --git a/README.md b/README.md index 5f46ec2..43a649c 100644 --- a/README.md +++ b/README.md @@ -17,27 +17,3 @@ A common task for bioinformaticians is to compare variants, whether to compare V This script takes as input two VCFs and performs a comparison of the variants found in each file. The script outputs 3 VCFs, reflecting those variants that are shared and unique to each individual. **NOTE**: An example VCF is provided at `tests/resources/sample.vcf`. VCFs can grow up to 4 million variants in size, as in the case of whole genome sequencing. - -## Part 2) Problem Solving Challenge (to be discussed in person at 2nd interview) - -Carefully read the following questions. In your next interview, be prepared to discuss each question and scenario. During the discussion, be as specific as possible regarding how the mentioned technology would be used. Our goal is to find out how quickly you grasp novel technologies, with respect to both their benefits and drawbacks. - -### Scenario A: - -In the current system large JSON Lines files (100,000 JSON dictionaries per file) are uploaded to S3. Each file is then read line by line and the JSON dictionaries sent to a MongoDB collection using Apache Airflow. The client would like to remove Airflow from this process. The entire contents of the JSON Lines file must be uploaded to MongoDB. Files that are in the process of being uploaded and those that have errored must be tracked. - -#### Questions: - -1. Take 5-10 minutes to present an architecture that would eliminate the need to use Apache Airflow. Feel free to use any combination of AWS services you wish to solve the problem. Be prepared to justify your technology and architectural decisions. -2. How would you retry the process if MongoDB went offline in the middle of an upload? - -### Scenario B - -Linux nodes continuously watch a RabbitMQ queue to see if any jobs have been submitted. When a job appears in a queue, one node will pick up the job, thereby preventing a different node from picking up the same job. RabbitMQ’s ack late feature is used to ensure jobs are retried when a node abruptly fails. Only one node is supposed to execute a job at one time, however, under some conditions, it is possible for two nodes to pick up the same job which will cause issues. The only shared systems between the nodes are the RabbitMQ queue and a MongoDB database. - -#### Questions: - -1. Come up with a strategy on how to keep two jobs from running at the same time. The strategy can only use the given resources. -2. If a new shared resource or technology could be added to the system, what would you add to ensure that two jobs do not run at the same time? - -### Good luck! diff --git a/src/vcf_isec/cli.py b/src/vcf_isec/cli.py index b571cdc..d31ba8f 100644 --- a/src/vcf_isec/cli.py +++ b/src/vcf_isec/cli.py @@ -20,8 +20,22 @@ writable=True, path_type=Path, ), + default=None, + help="The directory to write isec outputs. Will create a tempory directory if not set.", +) +@click.option( + "-o", + "--output", + type=click.Path( + exists=False, + file_okay=False, + dir_okay=True, + allow_dash=False, # TODO: allow writing to stdout + writable=True, + path_type=Path, + ), default="output", - help="Output directory", + help="The directory to write the output files.", ) @click.argument( "file1", @@ -45,14 +59,14 @@ path_type=Path, ), ) -def main(prefix, file1, file2): +def main(prefix, output, file1, file2): try: with tempfile.TemporaryDirectory() as tmp_dir: vcf1 = parser.VCFPreparer(file1, tmp_dir).prepare() vcf2 = parser.VCFPreparer(file2, tmp_dir).prepare() - isec = VCFIntersection(vcf1, vcf2, Path(tmp_dir) / "isec") - shared, unique1, unique2 = isec.compare_vcf() + isec = VCFIntersection(vcf1, vcf2, output, prefix or Path(tmp_dir) / "isec") + shared, unique1, unique2 = isec.intersect() print(f"Unique to {file1.name}: {len(unique1)}") print(f"Unique to {file1.name}: {len(unique2)}") diff --git a/src/vcf_isec/isec.py b/src/vcf_isec/isec.py index c406a58..69b7108 100644 --- a/src/vcf_isec/isec.py +++ b/src/vcf_isec/isec.py @@ -1,8 +1,10 @@ ## vcf_comparator.py +import shutil import tempfile from pathlib import Path -from typing import List, NamedTuple +from typing import Iterator, List, NamedTuple +import click import pysam from pysam import bcftools from pysam.utils import SamtoolsError @@ -12,45 +14,113 @@ class ISecOutput(NamedTuple): + """ + Represents the output of the intersection operation between two sets of variant records. + """ + intersection: List[pysam.VariantRecord] - complement1: List[pysam.VariantRecord] - complement2: List[pysam.VariantRecord] + setdiff1: List[pysam.VariantRecord] + setdiff2: List[pysam.VariantRecord] class VCFIntersection: + """ + Class for comparing two VCF files and extracting shared and unique variants. + + The provided vcf files need to be compressed and indexed (i.e. csi or tbi files exist in the same directory). + + Args: + vcf1 (Path | str): Path to the first VCF file. + vcf2 (Path | str): Path to the second VCF file. + prefix (Path | str | None, optional): Path to the output directory. Defaults to None. + + Raises: + FileFormatError: If either isec_dir or out_dir are files (expect directories or nonexistant). + """ + def __init__( self, - file1_path: Path | str, - file2_path: Path | str, - prefix: Path | str | None = None, + vcf1: Path | str, + vcf2: Path | str, + out_dir: Path | str, + isec_dir: Path | str | None = None, ) -> None: - self.file1_path = file1_path - self.file2_path = file2_path + self.vcf1 = Path(vcf1) + self.vcf2 = Path(vcf2) - if prefix: - self.prefix = Path(prefix) - if self.prefix.is_file(): - raise FileFormatError(f"{self.prefix} is not a directory.") - else: - self.prefix = None + self.out_dir = Path(out_dir) + if self.out_dir.is_file(): + raise FileFormatError(f"{self.out_dir} is not a directory.") + + self.isec_dir = Path(isec_dir) if isec_dir else None + if self.isec_dir and self.isec_dir.is_file(): + raise FileFormatError(f"{self.isec_dir} is not a directory.") - def compare_vcf(self) -> ISecOutput: + def intersect(self) -> ISecOutput: """ Compares two VCF files and returns a tuple containing lists of shared variants, unique variants in file1, and unique variants in file2. Returns: Tuple[List[str], List[str], List[str]]: shared variants, unique to file1, unique to file2 + + Raises: + IntersectError: If there is an error in the pysam library bcftools.isec function. + FileFormatError: If there is an error reading the file. """ - try: - if self.prefix: - self.prefix.mkdir(parents=True, exist_ok=True) - results = self._compare_vcf(self.prefix) - else: - with tempfile.TemporaryDirectory() as tmpdirname: - results = self._compare_vcf(tmpdirname) + out_exists = self.out_dir.is_dir() + isec_exists = self.isec_dir and self.isec_dir.is_dir() + + if out_exists and isec_exists: + click.confirm( + f"Directories {self.out_dir} and {self.isec_dir} will be overwritten. Continue?", + default=True, + abort=True, + ) + elif out_exists: + click.confirm( + f"Directory {self.out_dir} will be overwritten. Continue?", + default=True, + abort=True, + ) + elif isec_exists: + click.confirm( + f"Directory {self.isec_dir} will be overwritten. Continue?", + default=True, + abort=True, + ) + + if isec_exists: + shutil.rmtree(self.isec_dir) # type: ignore (can't be None) + + if self.isec_dir: + self.isec_dir.mkdir(parents=True, exist_ok=True) + isec_files = self._isec(self.isec_dir) + else: + with tempfile.TemporaryDirectory() as tmpdirname: + isec_files = self._isec(Path(tmpdirname)) + out_files = self._merge_output(isec_files) + + results = self._gather_variant_results(out_files) + + return results + + def _isec(self, isec_dir: Path) -> Iterator[Path]: + """ + Runs the isec operation and produces the intermediary files. + """ + + try: + bcftools.isec( + str(self.vcf1), + str(self.vcf2), + f"--prefix={str(isec_dir)}", + # "--nfiles=2", + "-w 1,2", + capture_stdout=False, + ) except SamtoolsError as e: raise IntersectError( f"Error in pysam library bcftools.isec function: {e}" @@ -58,32 +128,80 @@ def compare_vcf(self) -> ISecOutput: except OSError as e: raise FileFormatError(f"Error reading file: {e.filename}") from e - return results + for i in range(4): + yield isec_dir / f"000{i}.vcf" - def _compare_vcf(self, dirname: Path | str) -> ISecOutput: - dirname = Path(dirname) + def _merge_output(self, isec_files: Iterator[Path]) -> Iterator[Path]: + """ + Copies the output of the intersection operation to the specified directory. + + Returns: + Iterator[Path]: The paths to the output files. + """ + + if self.out_dir.exists(): + shutil.rmtree(self.out_dir) - bcftools.isec( - str(self.file1_path), - str(self.file2_path), - f"--prefix={str(dirname)}", - # "--nfiles=2", - "-w 1,2", - capture_stdout=False, + self.out_dir.mkdir(parents=True, exist_ok=True) + + # copy the two setdiff files to the output directory + setdiff1 = self.out_dir / self.vcf1.stem.replace(".vcf", "_setdiff.vcf") + setdiff2 = self.out_dir / self.vcf2.stem.replace(".vcf", "_setdiff.vcf") + + shutil.copy(next(isec_files), setdiff1) + shutil.copy(next(isec_files), setdiff2) + + intersection = self.out_dir / "intersection.vcf" + intermediate1 = intersection.with_stem(intersection.stem + "-1").with_suffix( + ".vcf.gz" + ) + intermediate2 = intersection.with_stem(intersection.stem + "-2").with_suffix( + ".vcf.gz" + ) + bcftools.view( + str(next(isec_files)), + "-Oz", + "-o", + str(intermediate1), + "--write-index", + catch_stdout=False, ) - complements1_path = dirname / "0000.vcf" - complements2_path = dirname / "0001.vcf" - intersection1_path = dirname / "0002.vcf" - intersection2_path = dirname / "0003.vcf" + bcftools.view( + str(next(isec_files)), + "-Oz", + "-o", + str(intermediate2), + "--write-index", + catch_stdout=False, + ) + + # merge the two intersection files into one + bcftools.merge( + str(intermediate1), + str(intermediate2), + "-Ov", + "-o", + str(intersection), + "--force-samples", + catch_stdout=False, + ) + + for f in self.out_dir.glob(f"{intersection.stem}-*"): + f.unlink() + + yield from [setdiff1, setdiff2, intersection] - # TODO: merge or use sitemap - intersection1 = list(parse_variants(intersection1_path)) - intersection2 = list(parse_variants(intersection2_path)) - # trunk-ignore(bandit/B101) - assert len(intersection1) == len(intersection2) + def _gather_variant_results(self, out_files) -> ISecOutput: + """ + Gathers the results of the intersection operation. + + Returns: + ISecOutput: A named tuple containing the shared variants, unique to file1, and unique to file2. + """ - complement1 = list(parse_variants(complements1_path)) - complement2 = list(parse_variants(complements2_path)) + unique1 = list(parse_variants(next(out_files))) + unique2 = list(parse_variants(next(out_files))) + shared = list(parse_variants(next(out_files))) - return ISecOutput(intersection1, complement1, complement2) + return ISecOutput(shared, unique1, unique2) diff --git a/src/vcf_isec/parser.py b/src/vcf_isec/parser.py index c1d206b..2d25893 100644 --- a/src/vcf_isec/parser.py +++ b/src/vcf_isec/parser.py @@ -8,7 +8,11 @@ class VCFPreparer: - """Builder pattern for preparing VCF files for intersection operation.""" + """ + Builder pattern for preparing VCF files for intersection operation. + + Instantiate the class then call prepare() to compress and index the VCF file. + """ def __init__(self, path: Path | str, prefix: Path | str = "output"): self.provided_path = Path(path) @@ -20,6 +24,19 @@ def __init__(self, path: Path | str, prefix: Path | str = "output"): self.tbi_path = None def prepare(self) -> Path: + """ + Prepares the VCF file for processing. + + If the provided file has a '.vcf' extension, it compresses the file. + If the provided file has a '.gz' extension, it sets the gz_path attribute. + Then, it indexes the file. + + Returns: + The path to the gzipped VCF file. + + Raises: + RuntimeError: If the BGZipped VCF file is not set. + """ if self.provided_path.suffix == ".vcf": self.compress() elif self.provided_path.suffix == ".gz": @@ -32,15 +49,10 @@ def prepare(self) -> Path: return self.gz_path - def compress(self): + def compress(self) -> None: """ Compresses the provided VCF file to a BGZipped VCF file. - - - - - - If the user chooses to overwrite the existing file or if no - existing file is found, a new BGZipped VCF file is created. - Returns: None """ @@ -71,10 +83,18 @@ def compress(self): self.gz_is_new = True def index(self): - # TODO: I think bcftools.isec expects index to be in the same directory, - # so we may need to copy the VCF to the prefix directory and write - # the index there too + """ + Indexes a compressed VCF file. + This method indexes a compressed VCF file using the tabix indexing tool from the pysam library. + It checks if an index file already exists and prompts the user for actions accordingly. + + Raises: + RuntimeError: If `self.gz_path` is not set. + + Returns: + None + """ # Ensure that we have a compressed VCF file to index if self.gz_path is None: import inspect @@ -132,6 +152,15 @@ def index(self): def parse_header(vcf_path: Path | str) -> str: + """ + Parses the header of a VCF file and returns it as a string. + + Args: + vcf_path (Path | str): The path to the VCF file. + + Returns: + str: The header of the VCF file as a string. + """ with pysam.VariantFile(str(vcf_path)) as vcf: return str(vcf.header) @@ -141,10 +170,10 @@ def parse_variants(vcf_path: Path | str) -> Iterable[VariantRecord]: Extracts variants from a VCF file using pysam.VariantFile. Args: - vcf_path (str): Path to the VCF file. + vcf_path (Path | str): Path to the VCF file. Returns: - Iterable[str]: Iterable of variants. + Iterable[VariantRecord]: Iterable of variants. """ with pysam.VariantFile(str(vcf_path)) as vcf: diff --git a/tests/pytest/test_isec.py b/tests/pytest/test_isec.py index f93a46a..05f0f5e 100644 --- a/tests/pytest/test_isec.py +++ b/tests/pytest/test_isec.py @@ -75,7 +75,7 @@ def test_isec_simple(self, monkeypatch: pytest.MonkeyPatch): Test the VCFIntersection class with simple data. """ - monkeypatch.setattr("click.confirm", lambda *args, **kwargs: False) + monkeypatch.setattr("click.confirm", lambda *args, **kwargs: None) file1, file2 = SIMPLE_CMP_VCFS with tempfile.TemporaryDirectory() as tmp_dir: @@ -83,8 +83,8 @@ def test_isec_simple(self, monkeypatch: pytest.MonkeyPatch): vcf2 = VCFPreparer(file2, tmp_dir).prepare() prefix = Path(tmp_dir) / "isec" - isec = VCFIntersection(vcf1, vcf2, prefix) - intersected, unique1, unique2 = isec.compare_vcf() + isec = VCFIntersection(vcf1, vcf2, "output", prefix) + intersected, unique1, unique2 = isec.intersect() assert len(intersected) == 5, "Expected 5 intersected variants." assert len(unique1) == 1, "Expected no unique variants in file1." assert len(unique2) == 1, "Expected no unique variants in file2." @@ -104,7 +104,7 @@ def test_randomized_isec( prefix = Path(tmp_dir) / "isec" isec = VCFIntersection(vcf1, vcf2, prefix) - intersected, unique1, unique2 = isec.compare_vcf() + intersected, unique1, unique2 = isec.intersect() assert ( len(intersected) == vcf_fixture.intersected_count ), f"Expected {vcf_fixture.intersected_count} intersected variants, found {len(intersected)}."