diff --git a/crisprme.py b/crisprme.py index 2a26a26..561d3f6 100755 --- a/crisprme.py +++ b/crisprme.py @@ -9,7 +9,7 @@ import re -version = "2.1.3" # CRISPRme version; TODO: update when required +version = "2.1.4" # CRISPRme version; TODO: update when required __version__ = version script_path = os.path.dirname(os.path.abspath(__file__)) @@ -972,83 +972,123 @@ def target_integration(): ) +def print_help_gnomad_converter(): + """ + Prints the help information for the gnomAD converter functionality, providing + details on the conversion process from gnomAD VCFs to VCFs compatible with + CRISPRme. It outlines the options available for specifying directories, + sample IDs, variant filtering, multiallelic site handling, and thread usage + during the conversion process. + + Raises: + SystemExit: If the help information is displayed to guide users on using + the gnomAD converter functionality. + """ + + # functionality description + sys.stderr.write( + "The gnomAD converter functionality simplifies the conversion process " + "of gnomAD VCFs (versions 3.1 and 4.0) into VCFs supported by CRISPRme. " + "It ensures a seamless transition while maintaining compatibility with " + "CRISPRme's requirements, focusing on the structure and content of " + "precomputed sample IDs file \n\n" + ) + # options + sys.stderr.write( + "Options:\n" + "\t--gnomAD_VCFdir, specifies the directory containing gnomAD VCFs. " + "Files must have the BGZ extension\n" + "\t--samplesID, specifies the precomputed sample IDs file necessary " + "for incorporating population-specific information into the output " + "VCFs\n" + "\t--keep, optional flag to retain all variants, regardless of their " + "filter flag. By default, variants with a filter flag different from " + "PASS are discarded\n" + "\t--multiallelic, optional flag to merge variants mapped to the " + "same position, creating multiallelic sites in the output VCFs. By " + "default, each site remains biallelic\n" + "\t--thread, used to set the number of thread used in the conversion " + "process [default 8]\n" + ) + sys.exit(1) + + def gnomAD_converter(): - if "--help" in input_args: - print( - "This is the VCF gnomAD converter provided to convert all gnomADv3.1 VCFs into CRISPRme supported VCFs" - ) - print("These are the flags that must be used in order to run this function:") - print( - "\t--gnomAD_VCFdir, used to specify the directory containing gnomADv3.1 original VCFs" - ) - print( - "\t--samplesID, used to specify the pre-generated samplesID file necessary to introduce samples into gnomAD variant" + """ + Runs the gnomAD converter functionality based on specified arguments, converting + gnomAD VCF files into formats compatible with CRISPRme. + + Raises: + ValueError: If mandatory arguments are missing or have incorrect values. + FileExistsError: If the specified gnomAD VCF directory cannot be located. + FileNotFoundError: If the specified sample IDs file cannot be found. + subprocess.SubprocessError: If an error occurs during the gnomAD VCF + conversion process. + """ + + args = input_args[2:] # reover gnomAD converter args + if "--help" in args or not args: # print help + print_help_gnomad_converter() + sys.exit(1) + if "--gnomAD_VCFdir" not in args: + raise ValueError( + "--gnomAD_VCFdir is a mandatory argument required for the conversion " + "process. Please specify the directory containing gnomAD VCFs using " + "this option\n" ) - print( - "\t--thread, used to specify the number of core used to process VCFs in parallel (DEFAULT is ALL available minus 2) [OPTIONAL]" + if "--samplesID" not in args: + raise ValueError( + "--samplesID is a mandatory argument required for the conversion " + "process. Please specify the sample IDs file this option\n" ) - exit(0) - - if "--gnomAD_VCFdir" not in input_args: - print("--gnomAD_VCFdir not in input, MANDATORY TO CONVERT DATA") - # vcf_dir = script_path+'vuota/' - exit(1) - else: - try: - vcf_dir = os.path.abspath( - input_args[input_args.index("--gnomAD_VCFdir") + 1] - ) - except IndexError: - print("Please input some parameter for flag --gnomAD_VCFdir") - exit(1) - if not os.path.isdir(vcf_dir): - print("The folder specified for --gnomAD_VCFdir does not exist") - exit(1) - - if "--thread" not in input_args: - # print("--thread must be contained in the input") - # exit(1) - thread = len(os.sched_getaffinity(0)) - 2 - else: - try: - thread = input_args[input_args.index("--thread") + 1] - except IndexError: - print("Please input some parameter for flag --thread") - exit(1) - try: - thread = int(thread) - except: - print("Please input a number for flag thread") - exit(1) - if thread <= 0 or thread > len(os.sched_getaffinity(0)) - 2: - print("thread is set to default (ALL available minus 2)") - thread = len(os.sched_getaffinity(0)) - 2 - # exit(1) - - if "--samplesID" not in input_args: - print("--samplesID not in input, MANDATORY TO CONVERT DATA") - exit(1) - elif "--samplesID" in input_args: + # read gnomAD directory arg + try: + gnomad_dir = args[args.index("--gnomAD_VCFdir") + 1] + if gnomad_dir.startswith("--"): + raise ValueError("Please input some parameter for flag --gnomAD_VCFdir\n") + gnomad_dir = os.path.abspath(gnomad_dir) # first sanity check passed + if not os.path.isdir(gnomad_dir): + raise FileExistsError(f"Unable to locate {gnomad_dir}") + except IndexError as e: + raise ValueError( + "Please input some parameter for flag --gnomAD_VCFdir\n" + ) from e + # read samples ids arg + try: + samples_ids = args[args.index("--samplesID") + 1] + if samples_ids.startswith("--"): + raise ValueError("Please input some parameter for flag --samplesID") + samples_ids = os.path.abspath(samples_ids) # first sanity check passed + if not os.path.isfile(samples_ids): + raise FileNotFoundError(f"Unable to locate {samples_ids}") + except IndexError as e: + raise ValueError("Please input some parameter for flag --samplesID") from e + # read keep arg + keep = "--keep" in args # keep all variants regardless of filter label + # read multiallelic arg + multiallelic = "--multiallelic" in args # merge variants in multiallelic sites + # read threads arg + threads = 8 + if "--threads" in args: try: - samplefile = os.path.abspath( - input_args[input_args.index("--samplesID") + 1] - ) - except IndexError: - print("Please input some parameter for flag --samplesID") - exit(1) - if not os.path.isfile(samplefile): - print("The file specified for --samplesID does not exist") - exit(1) - - os.system( - script_path - + "./convert_gnomAD.py " - + vcf_dir - + " " - + samplefile - + " " - + str(thread) + threads = int(args[args.index("--threads") + 1]) + if threads <= 0: + raise ValueError(f"Forbidden number of threads ({threads})") + except IndexError as e: + raise ValueError("Missing or forbidden threads value") from e + # run gnom AD converter + gnomad_converter_script = os.path.join( + script_path.replace("PostProcess", "src"), "convert_gnomAD_vcfs.py" ) + cmd = ( + f"python {gnomad_converter_script} {gnomad_dir} {samples_ids} {keep} " + f"{multiallelic} {threads}" + ) + code = subprocess.call(cmd, shell=True) + if code != 0: + raise subprocess.SubprocessError( + f"An error occurred while converting gnomAD VCFs in {gnomad_dir}" + ) def personal_card(): @@ -1134,20 +1174,47 @@ def crisprme_version(): sys.stdout.write(f"v{__version__}\n") +def print_help_complete_test(): + """ + Prints the help information for executing comprehensive testing of the + complete-search functionality provided by CRISPRme. + + Raises: + SystemExit: If the help information is displayed to guide users on + executing comprehensive testing. + """ + + # write intro message to stdout + sys.stderr.write( + "Execute comprehensive testing for complete-search functionality " + "provided by CRISPRme\n" + ) + # list functionality options + sys.stderr.write( + "Options:\n" + "\t--chrom, test the complete-search functionality on the specified " + "chromosome (e.g., chr22). By default, the test is conducted on all " + "chromosomes\n" + "\t--vcf_dataset, VCFs dataset to be used during CRISPRme testing. " + "Available options include 1000 Genomes (1000G) and Human Genome " + "Diversity Project (HGDP). The default dataset is 1000 Genomes.\n" + "\t--debug, debug mode\n" + ) + sys.exit(1) + + def complete_test_crisprme(): + """ + Executes comprehensive testing for the complete-search functionality provided + by CRISPRme based on specified arguments. + + Raises: + OSError: If the CRISPRme test fails, indicating an issue with the testing + process. + """ + if "--help" in input_args: - sys.stderr.write( - "Execute comprehensive testing for complete-search functionality " - "provided by CRISPRme\n" - "Options:\n" - "--chrom, test the complete-search functionality on the specified " - "chromosome (e.g., chr22). By default, the test is conducted on all " - "chromosomes\n" - "--vcf_dataset, VCFs dataset to be used during CRISPRme testing. " - "Available options include 1000 Genomes (1000G) and Human Genome " - "Diversity Project (HGDP). The default dataset is 1000 Genomes.\n" - "--debug, debug mode\n" - ) + print_help_complete_test() chrom = "all" if "--chrom" in input_args: # individual chrom to test try: diff --git a/src/convert_gnomAD_vcfs.py b/src/convert_gnomAD_vcfs.py new file mode 100644 index 0000000..efec095 --- /dev/null +++ b/src/convert_gnomAD_vcfs.py @@ -0,0 +1,326 @@ +""" +""" + +from utils import remove # type: ignore + +from functools import partial +from typing import List, Tuple +from glob import glob +from io import TextIOWrapper + +import multiprocessing +import subprocess +import pysam +import gzip +import time +import sys +import os + +GT = ["0/0", "0/1"] +GTLINE = '##FORMAT=' +BCFTOOLSNORM = "bcftools norm" + + +def read_samples_ids(samples_ids: str): + """ + Reads sample IDs from a file and returns a list of sample IDs excluding comments. + + Args: + samples_ids (str): Path to the file containing sample IDs. + + Returns: + list: List of sample IDs extracted from the file. + + Raises: + FileNotFoundError: If the specified samples file is not found. + IOError: If an error occurs while parsing the samples IDs file. + """ + + if not os.path.isfile(samples_ids): + raise FileNotFoundError(f"Unable to locate {samples_ids}") + try: + with open(samples_ids, mode="r") as infile: # parse sample ids file + samplesdata = [line.strip() for line in infile] + except IOError as e: + raise IOError( + f"An error occurred while parsing samples IDs file {samples_ids}" + ) from e + # create samples list + return [sample.split()[0] for sample in samplesdata if not sample.startswith("#")] + + +def tabix_index(vcf_fname: str) -> str: + """ + Creates an index for a VCF file using tabix and returns the path to the + generated index file. + + Args: + vcf_fname (str): Path to the VCF file to be indexed. + + Returns: + str: Path to the generated tabix index file. + + Raises: + FileExistsError: If indexing of the VCF file fails. + """ + + start = time.time() + pysam.tabix_index(vcf_fname, preset="vcf") # index input vcf with tabix + tbi_index = f"{vcf_fname}.tbi" + if not os.path.isfile(tbi_index) and os.stat(tbi_index).st_size <= 0: + raise FileExistsError(f"Indexing {vcf_fname} failed") + sys.stderr.write( + f"Indexing {vcf_fname} completed in {(time.time() - start):.2f}s\n" + ) + return tbi_index + + +def load_vcf(vcf_fname: str) -> pysam.VariantFile: + """ + Loads a VCF file from the specified path, automatically indexing it if the + tabix index is not found. + + Args: + vcf_fname (str): Path to the VCF file to load. + + Returns: + pysam.VariantFile: Loaded VCF file object. + """ + + # search for tabix index, if not found index the vcf + tbi_index = ( + f"{vcf_fname}.tbi" + if os.path.isfile(f"{vcf_fname}.tbi") + else tabix_index(vcf_fname) + ) + return pysam.VariantFile(vcf_fname, index_filename=tbi_index) + + +def update_header(header: pysam.VariantHeader, samples: List[str]) -> str: + """ + Updates the header of a VCF file with the specified samples and additional + metadata fields. + + Args: + header (pysam.VariantHeader): The original VCF header to be updated. + samples (List[str]): List of sample names to be added to the header. + + Returns: + str: Updated VCF header as a string. + """ + + header.add_line(GTLINE) # add FORMAT metadata field + header.add_samples(samples) # add samples to header + return str(header) + + +def variant_observed(allele_count: Tuple[int]) -> bool: + """ + Checks if any allele count in the provided tuple is greater than zero, indicating + the variant is observed. + + Args: + allele_count (Tuple[int]): Tuple of allele counts for the variant. + + Returns: + bool: True if any allele count is greater than zero, False otherwise. + """ + + return any((ac > 0 for ac in allele_count)) + + +def format_variant_record(variant: pysam.VariantRecord, genotypes: str) -> str: + """ + Formats a variant record into a string with specified genotypes and variant + information. + + Args: + variant (pysam.VariantRecord): The variant record to be formatted. + genotypes (str): Genotypes information to be included in the format. + + Returns: + str: Formatted variant record as a tab-separated string. + """ + + try: + af = ",".join(list(map(str, variant.info["AF"]))) + except KeyError: # catch potential AF missing in variant INFO field + af = ",".join(["0.0" for _ in variant.alts]) # type: ignore + variant_format = [ + variant.chrom, + variant.pos, + variant.id, + variant.ref, + ",".join(variant.alts), # handle multiallelic sites # type: ignore + variant.qual, + ";".join(variant.filter.keys()), # type: ignore + f"AF={af}", # keep allele frequencies + "GT", # add genotype to format + genotypes, + ] + return "\t".join([f"{e}" if e is not None else "." for e in variant_format]) + + +def convert_vcf(vcf_fname: str, samples: List[str], keep: bool): + """ + Converts a VCF file by updating the header, filtering variants, and creating + a new compressed VCF file. + + Args: + vcf_fname (str): Path to the input VCF file. + samples (List[str]): List of sample names. + keep (bool): Flag to determine whether to keep variants that do not pass + the filter. + + Returns: + str: Path to the converted and compressed VCF file. + + Raises: + OSError: If an error occurs while loading the input VCF file. + IOError: If an error occurs during the conversion process. + """ + + vcf_outfname = f"{os.path.splitext(vcf_fname)[0]}.gz" # replace bgz with gz + try: + vcf = load_vcf(vcf_fname) # use pysam for optimized VCF loading + except OSError as e: + raise OSError(f"An error occurred while loading {vcf_fname}") from e + samples_ac = [ + f"AC_{sample}" for sample in samples + ] # recover allele count field for each input sample + try: + with gzip.open(vcf_outfname, mode="wt") as outfile: + # write the upated header to the converted vcf + outfile.write(update_header(vcf.header.copy(), samples)) + for variant in vcf: + if not keep and "PASS" not in variant.filter.keys(): + continue + genotypes = "\t".join( + [ + GT[1] if variant_observed(variant.info[sac]) else GT[0] + for sac in samples_ac + ] + ) + outfile.write( + f"{format_variant_record(variant, genotypes)}\n" + ) # write the formatted VCF line to the out vcf + except IOError as e: + raise IOError(f"An error occurred while converting {vcf_fname}") from e + assert os.stat(vcf_outfname).st_size > 0 + return vcf_outfname + + +def bcftools_merge(vcf_fname: str, multiallelic: bool) -> str: + """ + Merges alleles in a VCF file using bcftools and returns the path to the merged + VCF file. + + Args: + vcf_fname (str clean): Path to the input VCF file. + multiallelic (bool): Flag indicating whether to handle multiallelic + variants, determining the output file suffix. + + Returns: + str: Path to the merged VCF file. + + Raises: + subprocess.SubprocessError: If an error occurs during the allele merging + process. + """ + + # recover file prefix + vcf_outfname_prefix = os.path.splitext(os.path.splitext(vcf_fname)[0])[0] + suffix = "multiallelic" if multiallelic else "biallelic" + vcf_outfname = f"{vcf_outfname_prefix}.{suffix}.vcf.gz" + m = "-m+" if multiallelic else "-m-" + code = subprocess.call( + f"{BCFTOOLSNORM} {m} -O z -o {vcf_outfname} {vcf_fname}", + shell=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + ) + if code != 0: + raise subprocess.SubprocessError( + f"An error occurred while merging alleles in {vcf_fname}" + ) + assert os.stat(vcf_outfname).st_size > 0 + remove(vcf_fname) # remove intermediate vcf file + return vcf_outfname + + +def run_conversion_pipeline( + vcf_fname: str, samples: List[str], keep: bool, multiallelic: bool +) -> None: + """ + Runs a conversion pipeline to process a VCF file by adding genotypes and merging + variants based on specified options. + + Args: + vcf_fname (str): Path to the input VCF file. + samples (List[str]): List of sample names. + keep (bool): Flag to determine whether to keep variants that do not pass + the filter. + multiallelic (bool): Flag indicating whether to handle multiallelic variants + during merging. + + Returns: + None + """ + + vcf_genotypes = convert_vcf(vcf_fname, samples, keep) # add genotypes to input VCF + # merge variants into mutlialleic/biallelic sites + vcf_merged = bcftools_merge(vcf_genotypes, multiallelic) + assert os.path.isfile(vcf_merged) and os.stat(vcf_merged).st_size > 0 + + +def convert_gnomad_vcfs(): + """ + Converts gnomAD VCF files based on specified parameters and sample data. + + Raises: + ValueError: If the number of input arguments is incorrect, the gnomAD VCF + directory is invalid, or an invalid number of threads is selected. + FileNotFoundError: If no gnomAD VCF files are found in the specified directory. + OSError: If an error occurs during the gnomAD VCF conversion process. + """ + + argv = sys.argv[1:] + if len(argv) != 5: + raise ValueError( + "Wrong number of input arguments, cannot proceed with gnomAD VCF conversion" + ) + gnomad_vcfs_dir, samples_ids, keep, multiallelic, threads = argv + if not os.path.isdir(gnomad_vcfs_dir): + raise ValueError( + f"The specified gnomAD VCF directory is not a directory ({gnomad_vcfs_dir})" + ) + threads = int(threads) + if threads > multiprocessing.cpu_count() or threads < 0: + raise ValueError(f"Forbidden number of threads selected ({threads})") + keep = keep == "True" + multiallelic = multiallelic == "True" + start = time.time() + # recover gnomAD file within the specified location (compressed with bgz extension) + gnomad_vcfs = glob(os.path.join(gnomad_vcfs_dir, "*.vcf*bgz")) + if not gnomad_vcfs: # no gnomAD vcf found + raise FileNotFoundError(f"No gnomAD VCF file found in {gnomad_vcfs_dir}") + samples = read_samples_ids(samples_ids) # recover samples data from sample file + threads = multiprocessing.cpu_count() if threads == 0 else threads + try: + pool = multiprocessing.Pool(processes=threads) + partial_run_conversion_pipeline = partial( + run_conversion_pipeline, + samples=samples, + keep=keep, + multiallelic=multiallelic, + ) + pool.map(partial_run_conversion_pipeline, gnomad_vcfs) + pool.close() + pool.join() + except OSError as e: + raise OSError("An error occurred during gnomAD VCF conversion") from e + sys.stderr.write(f"Elapsed time {(time.time() - start):.2f}s\n") + + +if __name__ == "__main__": + convert_gnomad_vcfs()