Skip to content

Commit

Permalink
Copy to output directory (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
brendancsmith authored Mar 7, 2024
1 parent 14089c1 commit ffa76c8
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 90 deletions.
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
"args": [
"tests/resources/cmp-test-a.vcf",
"tests/resources/cmp-test-b.vcf",
"-p",
"tmp"
"-o",
"output"
]
}
]
Expand Down
24 changes: 0 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,3 @@ A common task for bioinformaticians is to compare variants, whether to compare V
This script takes as input two VCFs and performs a comparison of the variants found in each file. The script outputs 3 VCFs, reflecting those variants that are shared and unique to each individual.

**NOTE**: An example VCF is provided at `tests/resources/sample.vcf`. VCFs can grow up to 4 million variants in size, as in the case of whole genome sequencing.

## Part 2) Problem Solving Challenge (to be discussed in person at 2nd interview)

Carefully read the following questions. In your next interview, be prepared to discuss each question and scenario. During the discussion, be as specific as possible regarding how the mentioned technology would be used. Our goal is to find out how quickly you grasp novel technologies, with respect to both their benefits and drawbacks.

### Scenario A:

In the current system large JSON Lines files (100,000 JSON dictionaries per file) are uploaded to S3. Each file is then read line by line and the JSON dictionaries sent to a MongoDB collection using Apache Airflow. The client would like to remove Airflow from this process. The entire contents of the JSON Lines file must be uploaded to MongoDB. Files that are in the process of being uploaded and those that have errored must be tracked.

#### Questions:

1. Take 5-10 minutes to present an architecture that would eliminate the need to use Apache Airflow. Feel free to use any combination of AWS services you wish to solve the problem. Be prepared to justify your technology and architectural decisions.
2. How would you retry the process if MongoDB went offline in the middle of an upload?

### Scenario B

Linux nodes continuously watch a RabbitMQ queue to see if any jobs have been submitted. When a job appears in a queue, one node will pick up the job, thereby preventing a different node from picking up the same job. RabbitMQ’s ack late feature is used to ensure jobs are retried when a node abruptly fails. Only one node is supposed to execute a job at one time, however, under some conditions, it is possible for two nodes to pick up the same job which will cause issues. The only shared systems between the nodes are the RabbitMQ queue and a MongoDB database.

#### Questions:

1. Come up with a strategy on how to keep two jobs from running at the same time. The strategy can only use the given resources.
2. If a new shared resource or technology could be added to the system, what would you add to ensure that two jobs do not run at the same time?

### Good luck!
22 changes: 18 additions & 4 deletions src/vcf_isec/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,22 @@
writable=True,
path_type=Path,
),
default=None,
help="The directory to write isec outputs. Will create a tempory directory if not set.",
)
@click.option(
"-o",
"--output",
type=click.Path(
exists=False,
file_okay=False,
dir_okay=True,
allow_dash=False, # TODO: allow writing to stdout
writable=True,
path_type=Path,
),
default="output",
help="Output directory",
help="The directory to write the output files.",
)
@click.argument(
"file1",
Expand All @@ -45,14 +59,14 @@
path_type=Path,
),
)
def main(prefix, file1, file2):
def main(prefix, output, file1, file2):
try:
with tempfile.TemporaryDirectory() as tmp_dir:
vcf1 = parser.VCFPreparer(file1, tmp_dir).prepare()
vcf2 = parser.VCFPreparer(file2, tmp_dir).prepare()

isec = VCFIntersection(vcf1, vcf2, Path(tmp_dir) / "isec")
shared, unique1, unique2 = isec.compare_vcf()
isec = VCFIntersection(vcf1, vcf2, output, prefix or Path(tmp_dir) / "isec")
shared, unique1, unique2 = isec.intersect()

print(f"Unique to {file1.name}: {len(unique1)}")
print(f"Unique to {file1.name}: {len(unique2)}")
Expand Down
206 changes: 162 additions & 44 deletions src/vcf_isec/isec.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
## vcf_comparator.py
import shutil
import tempfile
from pathlib import Path
from typing import List, NamedTuple
from typing import Iterator, List, NamedTuple

import click
import pysam
from pysam import bcftools
from pysam.utils import SamtoolsError
Expand All @@ -12,78 +14,194 @@


class ISecOutput(NamedTuple):
"""
Represents the output of the intersection operation between two sets of variant records.
"""

intersection: List[pysam.VariantRecord]
complement1: List[pysam.VariantRecord]
complement2: List[pysam.VariantRecord]
setdiff1: List[pysam.VariantRecord]
setdiff2: List[pysam.VariantRecord]


class VCFIntersection:
"""
Class for comparing two VCF files and extracting shared and unique variants.
The provided vcf files need to be compressed and indexed (i.e. csi or tbi files exist in the same directory).
Args:
vcf1 (Path | str): Path to the first VCF file.
vcf2 (Path | str): Path to the second VCF file.
prefix (Path | str | None, optional): Path to the output directory. Defaults to None.
Raises:
FileFormatError: If either isec_dir or out_dir are files (expect directories or nonexistant).
"""

def __init__(
self,
file1_path: Path | str,
file2_path: Path | str,
prefix: Path | str | None = None,
vcf1: Path | str,
vcf2: Path | str,
out_dir: Path | str,
isec_dir: Path | str | None = None,
) -> None:
self.file1_path = file1_path
self.file2_path = file2_path
self.vcf1 = Path(vcf1)
self.vcf2 = Path(vcf2)

if prefix:
self.prefix = Path(prefix)
if self.prefix.is_file():
raise FileFormatError(f"{self.prefix} is not a directory.")
else:
self.prefix = None
self.out_dir = Path(out_dir)
if self.out_dir.is_file():
raise FileFormatError(f"{self.out_dir} is not a directory.")

self.isec_dir = Path(isec_dir) if isec_dir else None
if self.isec_dir and self.isec_dir.is_file():
raise FileFormatError(f"{self.isec_dir} is not a directory.")

def compare_vcf(self) -> ISecOutput:
def intersect(self) -> ISecOutput:
"""
Compares two VCF files and returns a tuple containing lists of shared variants,
unique variants in file1, and unique variants in file2.
Returns:
Tuple[List[str], List[str], List[str]]: shared variants, unique to file1, unique to file2
Raises:
IntersectError: If there is an error in the pysam library bcftools.isec function.
FileFormatError: If there is an error reading the file.
"""

try:
if self.prefix:
self.prefix.mkdir(parents=True, exist_ok=True)
results = self._compare_vcf(self.prefix)
else:
with tempfile.TemporaryDirectory() as tmpdirname:
results = self._compare_vcf(tmpdirname)
out_exists = self.out_dir.is_dir()
isec_exists = self.isec_dir and self.isec_dir.is_dir()

if out_exists and isec_exists:
click.confirm(
f"Directories {self.out_dir} and {self.isec_dir} will be overwritten. Continue?",
default=True,
abort=True,
)
elif out_exists:
click.confirm(
f"Directory {self.out_dir} will be overwritten. Continue?",
default=True,
abort=True,
)
elif isec_exists:
click.confirm(
f"Directory {self.isec_dir} will be overwritten. Continue?",
default=True,
abort=True,
)

if isec_exists:
shutil.rmtree(self.isec_dir) # type: ignore (can't be None)

if self.isec_dir:
self.isec_dir.mkdir(parents=True, exist_ok=True)
isec_files = self._isec(self.isec_dir)
else:
with tempfile.TemporaryDirectory() as tmpdirname:
isec_files = self._isec(Path(tmpdirname))

out_files = self._merge_output(isec_files)

results = self._gather_variant_results(out_files)

return results

def _isec(self, isec_dir: Path) -> Iterator[Path]:
"""
Runs the isec operation and produces the intermediary files.
"""

try:
bcftools.isec(
str(self.vcf1),
str(self.vcf2),
f"--prefix={str(isec_dir)}",
# "--nfiles=2",
"-w 1,2",
capture_stdout=False,
)
except SamtoolsError as e:
raise IntersectError(
f"Error in pysam library bcftools.isec function: {e}"
) from e
except OSError as e:
raise FileFormatError(f"Error reading file: {e.filename}") from e

return results
for i in range(4):
yield isec_dir / f"000{i}.vcf"

def _compare_vcf(self, dirname: Path | str) -> ISecOutput:
dirname = Path(dirname)
def _merge_output(self, isec_files: Iterator[Path]) -> Iterator[Path]:
"""
Copies the output of the intersection operation to the specified directory.
Returns:
Iterator[Path]: The paths to the output files.
"""

if self.out_dir.exists():
shutil.rmtree(self.out_dir)

bcftools.isec(
str(self.file1_path),
str(self.file2_path),
f"--prefix={str(dirname)}",
# "--nfiles=2",
"-w 1,2",
capture_stdout=False,
self.out_dir.mkdir(parents=True, exist_ok=True)

# copy the two setdiff files to the output directory
setdiff1 = self.out_dir / self.vcf1.stem.replace(".vcf", "_setdiff.vcf")
setdiff2 = self.out_dir / self.vcf2.stem.replace(".vcf", "_setdiff.vcf")

shutil.copy(next(isec_files), setdiff1)
shutil.copy(next(isec_files), setdiff2)

intersection = self.out_dir / "intersection.vcf"
intermediate1 = intersection.with_stem(intersection.stem + "-1").with_suffix(
".vcf.gz"
)
intermediate2 = intersection.with_stem(intersection.stem + "-2").with_suffix(
".vcf.gz"
)
bcftools.view(
str(next(isec_files)),
"-Oz",
"-o",
str(intermediate1),
"--write-index",
catch_stdout=False,
)

complements1_path = dirname / "0000.vcf"
complements2_path = dirname / "0001.vcf"
intersection1_path = dirname / "0002.vcf"
intersection2_path = dirname / "0003.vcf"
bcftools.view(
str(next(isec_files)),
"-Oz",
"-o",
str(intermediate2),
"--write-index",
catch_stdout=False,
)

# merge the two intersection files into one
bcftools.merge(
str(intermediate1),
str(intermediate2),
"-Ov",
"-o",
str(intersection),
"--force-samples",
catch_stdout=False,
)

for f in self.out_dir.glob(f"{intersection.stem}-*"):
f.unlink()

yield from [setdiff1, setdiff2, intersection]

# TODO: merge or use sitemap
intersection1 = list(parse_variants(intersection1_path))
intersection2 = list(parse_variants(intersection2_path))
# trunk-ignore(bandit/B101)
assert len(intersection1) == len(intersection2)
def _gather_variant_results(self, out_files) -> ISecOutput:
"""
Gathers the results of the intersection operation.
Returns:
ISecOutput: A named tuple containing the shared variants, unique to file1, and unique to file2.
"""

complement1 = list(parse_variants(complements1_path))
complement2 = list(parse_variants(complements2_path))
unique1 = list(parse_variants(next(out_files)))
unique2 = list(parse_variants(next(out_files)))
shared = list(parse_variants(next(out_files)))

return ISecOutput(intersection1, complement1, complement2)
return ISecOutput(shared, unique1, unique2)
Loading

0 comments on commit ffa76c8

Please sign in to comment.