diff --git a/eva_vcf_merge/merge.py b/eva_vcf_merge/merge.py index cc04954..32adcbb 100644 --- a/eva_vcf_merge/merge.py +++ b/eva_vcf_merge/merge.py @@ -17,8 +17,9 @@ from ebi_eva_common_pyutils.nextflow import NextFlowPipeline, NextFlowProcess -from eva_vcf_merge.multistage import get_multistage_vertical_concat_pipeline -from eva_vcf_merge.utils import write_files_to_list, get_valid_filename, validate_aliases +from eva_vcf_merge.detect import MergeType +from eva_vcf_merge.multistage import get_multistage_merge_pipeline +from eva_vcf_merge.utils import get_valid_filename, validate_aliases class VCFMerger: @@ -30,32 +31,17 @@ def __init__(self, bgzip_binary, bcftools_binary, nextflow_binary, nextflow_conf self.output_dir = output_dir self.working_dir = os.path.join(output_dir, 'nextflow') - def horizontal_merge(self, vcf_groups, resume=True): - """ - Merge groups of vcfs horizontally, i.e. by sample, using bcftools. - - :param vcf_groups: dict mapping a string (e.g. an analysis alias) to a group of vcf files to be merged - :param resume: whether to resume pipeline (default true) - :returns: dict of merged filenames - """ - if not validate_aliases(vcf_groups.keys()): - raise ValueError('Aliases must be unique when converted to filenames') - pipeline, merged_filenames = self.generate_horizontal_merge_pipeline(vcf_groups) - workflow_file = os.path.join(self.working_dir, 'horizontal_merge.nf') - os.makedirs(self.working_dir, exist_ok=True) - pipeline.run_pipeline( - workflow_file_path=workflow_file, - working_dir=self.working_dir, - nextflow_binary_path=self.nextflow_binary, - nextflow_config_path=self.nextflow_config, - resume=resume - ) - return merged_filenames + def horizontal_merge(self, vcf_groups, chunk_size=500, resume=True): + return self.common_merge(MergeType.HORIZONTAL, vcf_groups, chunk_size, resume) def vertical_merge(self, vcf_groups, chunk_size=500, resume=True): + return self.common_merge(MergeType.VERTICAL, vcf_groups, chunk_size, resume) + + def common_merge(self, merge_type, vcf_groups, chunk_size=500, resume=True): """ - Merge groups of vcfs vertically, i.e. concatenation. + Merge groups of vcfs horizontally or vertically. + :param merge_type: vertical or horizontal merge :param vcf_groups: dict mapping a string (e.g. an analysis alias) to a group of vcf files to be merged :param chunk_size: number of vcfs to merge at once (default 500) :param resume: whether to resume pipeline (default true) @@ -63,8 +49,9 @@ def vertical_merge(self, vcf_groups, chunk_size=500, resume=True): """ if not validate_aliases(vcf_groups.keys()): raise ValueError('Aliases must be unique when converted to filenames') - pipeline, merged_filenames = self.generate_vertical_merge_pipeline(vcf_groups, chunk_size) - workflow_file = os.path.join(self.working_dir, "vertical_concat.nf") + pipeline, merged_filenames = self.generate_merge_pipeline(merge_type, vcf_groups, chunk_size) + workflow_file = os.path.join(self.working_dir, "merge.nf") + os.makedirs(self.working_dir, exist_ok=True) pipeline.run_pipeline( workflow_file_path=workflow_file, @@ -81,37 +68,11 @@ def vertical_merge(self, vcf_groups, chunk_size=500, resume=True): merged_filenames[alias] = target_filename return merged_filenames - def generate_horizontal_merge_pipeline(self, vcf_groups): + def generate_merge_pipeline(self, merge_type, vcf_groups, chunk_size): """ - Generate horizontal merge pipeline, including compressing and indexing VCFs. - - :param vcf_groups: dict mapping a string to a group of vcf files to be merged - :return: complete NextflowPipeline and dict of merged filenames - """ - dependencies = {} - merged_filenames = {} - for alias_idx, (alias, vcfs) in enumerate(vcf_groups.items()): - deps, index_processes, compressed_vcfs = self.compress_and_index(alias_idx, vcfs) - dependencies.update(deps) - - safe_alias = get_valid_filename(alias) - list_filename = write_files_to_list(compressed_vcfs, safe_alias, self.working_dir) - merged_filename = os.path.join(self.output_dir, f'{safe_alias}_merged.vcf.gz') - merge_process = NextFlowProcess( - process_name=f'merge_{alias_idx}', - command_to_run=f'{self.bcftools_binary} merge --merge all --file-list {list_filename} ' - f'--threads 3 -O z -o {merged_filename}' - ) - # each alias's merge process depends on all index processes - dependencies[merge_process] = index_processes - merged_filenames[alias] = merged_filename - - return NextFlowPipeline(dependencies), merged_filenames - - def generate_vertical_merge_pipeline(self, vcf_groups, chunk_size): - """ - Generate vertical merge (concatenation) pipeline. + Generate merge pipeline, including compressing and indexing VCFs. + :param merge_type: vertical or horizontal merge :param vcf_groups: dict mapping a string to a group of vcf files to be merged :param chunk_size: number of vcfs to merge at once :return: complete NextFlowPipeline and dict of merged filenames @@ -119,16 +80,28 @@ def generate_vertical_merge_pipeline(self, vcf_groups, chunk_size): full_pipeline = NextFlowPipeline() merged_filenames = {} for alias_idx, (alias, vcfs) in enumerate(vcf_groups.items()): - deps, index_processes, compressed_vcfs = self.compress_and_index(alias_idx, vcfs) - compress_pipeline = NextFlowPipeline(deps) - concat_pipeline, merged_filename = get_multistage_vertical_concat_pipeline( - alias=alias_idx, - vcf_files=compressed_vcfs, - concat_chunk_size=chunk_size, - concat_processing_dir=self.working_dir, - bcftools_binary=self.bcftools_binary - ) - pipeline = NextFlowPipeline.join_pipelines(compress_pipeline, concat_pipeline) + compress_pipeline, compressed_vcfs = self.compress_and_index(alias_idx, vcfs) + if merge_type == MergeType.HORIZONTAL: + merge_pipeline, merged_filename = get_multistage_merge_pipeline( + alias=alias_idx, + vcf_files=compressed_vcfs, + chunk_size=chunk_size, + processing_dir=self.working_dir, + bcftools_binary=self.bcftools_binary, + process_name='merge', + process_command=self.merge_command + ) + else: + merge_pipeline, merged_filename = get_multistage_merge_pipeline( + alias=alias_idx, + vcf_files=compressed_vcfs, + chunk_size=chunk_size, + processing_dir=self.working_dir, + bcftools_binary=self.bcftools_binary, + process_name='concat', + process_command=self.concat_command + ) + pipeline = NextFlowPipeline.join_pipelines(compress_pipeline, merge_pipeline) full_pipeline = NextFlowPipeline.join_pipelines(full_pipeline, pipeline) merged_filenames[alias] = merged_filename return full_pipeline, merged_filenames @@ -139,7 +112,7 @@ def compress_and_index(self, alias, vcfs): :param alias: name of group of vcf files (used to name Nextflow processes uniquely) :param vcfs: list of vcf files - :return: dependency map, list of final index processes, and list of final filenames + :return: NextFlow pipeline and list of final filenames """ dependencies = {} index_processes = [] @@ -160,5 +133,12 @@ def compress_and_index(self, alias, vcfs): index_processes.append(index_process) # each file's index depends only on compress (if present) dependencies[index_process] = [compress_process] if compress_process else [] - # TODO preferably return a NextFlowPipeline rather than dependencies & final processes - return dependencies, index_processes, compressed_vcfs + return NextFlowPipeline(dependencies), compressed_vcfs + + def merge_command(self, files_to_merge_list, output_vcf_file): + return (f'{self.bcftools_binary} merge --merge all --file-list {files_to_merge_list} --threads 3 ' + f'-O z -o {output_vcf_file}') + + def concat_command(self, files_to_merge_list, output_vcf_file): + return (f'{self.bcftools_binary} concat --allow-overlaps --remove-duplicates --file-list {files_to_merge_list} ' + f'-O z -o {output_vcf_file}') diff --git a/eva_vcf_merge/multistage.py b/eva_vcf_merge/multistage.py index 72ee8d2..5f1f3e4 100644 --- a/eva_vcf_merge/multistage.py +++ b/eva_vcf_merge/multistage.py @@ -17,22 +17,24 @@ from ebi_eva_common_pyutils.nextflow import NextFlowPipeline, NextFlowProcess -from eva_vcf_merge.utils import write_files_to_list - -def get_multistage_vertical_concat_pipeline( +def get_multistage_merge_pipeline( alias, vcf_files, - concat_processing_dir, - concat_chunk_size, + processing_dir, + chunk_size, bcftools_binary, + process_name, + process_command, stage=0, prev_stage_processes=None, pipeline=None ): """ - # Generate Nextflow pipeline for multi-stage VCF concatenation of 5 VCF files with 2-VCFs concatenated at a time (CONCAT_CHUNK_SIZE=2) - # For illustration purposes only. Usually the CONCAT_CHUNK_SIZE is much higher (ex: 500). + Generate Nextflow pipeline for multi-stage VCF merging or concatenation. + + # As an example, below is multi-stage VCF concatenation of 5 VCF files with 2-VCFs concatenated at a time (CHUNK_SIZE=2) + # For illustration purposes only. Usually the CHUNK_SIZE is much higher (ex: 500). # # vcf1 vcf2 vcf3 vcf4 vcf5 # \ / \ / @@ -51,70 +53,75 @@ def get_multistage_vertical_concat_pipeline( if not pipeline: pipeline = NextFlowPipeline() prev_stage_processes = [] - # If we are left with only one file, this means we have reached the last concat stage + # If we are left with only one file, this means we have reached the last merge stage if len(vcf_files) == 1: return pipeline, vcf_files[0] - num_batches_in_stage = math.ceil(len(vcf_files) / concat_chunk_size) + num_batches_in_stage = math.ceil(len(vcf_files) / chunk_size) curr_stage_processes = [] output_vcf_files_from_stage = [] for batch in range(0, num_batches_in_stage): - # split files in the current stage into chunks based on concat_chunk_size - files_in_batch = vcf_files[(concat_chunk_size * batch):(concat_chunk_size * (batch + 1))] - files_to_concat_list = write_files_to_concat_list(files_in_batch, stage, batch, concat_processing_dir) - output_vcf_file = get_output_vcf_file_name(alias, stage, batch, concat_processing_dir) + # split files in the current stage into chunks based on chunk_size + files_in_batch = vcf_files[(chunk_size * batch):(chunk_size * (batch + 1))] + files_to_merge_list = write_files_to_merge_list(process_name, alias, files_in_batch, stage, batch, processing_dir) + output_vcf_file = get_output_vcf_file_name(process_name, alias, stage, batch, processing_dir) - # separate concat & index processes - concat_process = NextFlowProcess( - process_name=f"concat{alias}_stage{stage}_batch{batch}", - command_to_run=f"{bcftools_binary} concat --allow-overlaps --remove-duplicates " - f"--file-list {files_to_concat_list} -o {output_vcf_file} -O z" + # separate merge & index processes + merge_process = NextFlowProcess( + process_name=f"{process_name}{alias}_stage{stage}_batch{batch}", + command_to_run=process_command(files_to_merge_list, output_vcf_file) ) index_process = NextFlowProcess( process_name=f"index{alias}_stage{stage}_batch{batch}", command_to_run=f"{bcftools_binary} index --csi {output_vcf_file}" ) - # index depends only on this batch's concat - pipeline.add_dependencies({index_process: [concat_process]}) + # index depends only on this batch's merge + pipeline.add_dependencies({index_process: [merge_process]}) # next stage requires indexing to be complete from this stage curr_stage_processes.append(index_process) output_vcf_files_from_stage.append(output_vcf_file) - # Concatenation batch in a given stage will have to wait until the completion of - # n batches in the previous stage where n = concat_chunk_size + # Merge batch in a given stage will have to wait until the completion of + # n batches in the previous stage where n = chunk_size # Ex: In the illustration above stage 1/batch 0 depends on completion of stage 0/batch 0 and stage 0/batch 1 # While output of any n batches from the previous stage can be worked on as they become available, # having a predictable formula simplifies pipeline generation and troubleshooting - prev_stage_dependencies = prev_stage_processes[(concat_chunk_size * batch):(concat_chunk_size * (batch + 1))] - pipeline.add_dependencies({concat_process: prev_stage_dependencies}) + prev_stage_dependencies = prev_stage_processes[(chunk_size * batch):(chunk_size * (batch + 1))] + pipeline.add_dependencies({merge_process: prev_stage_dependencies}) prev_stage_processes = curr_stage_processes - return get_multistage_vertical_concat_pipeline( + return get_multistage_merge_pipeline( alias, output_vcf_files_from_stage, - concat_processing_dir, concat_chunk_size, + processing_dir, chunk_size, bcftools_binary, + process_name, + process_command, stage=stage+1, prev_stage_processes=prev_stage_processes, pipeline=pipeline ) -def write_files_to_concat_list(files_to_concat, concat_stage, concat_batch, concat_processing_dir): +def write_files_to_merge_list(process_name, alias, files_to_merge, stage, batch, processing_dir): """ - Write the list of files to be concatenated for a given stage and batch + Write the list of files to be merged for a given stage and batch """ - output_dir = get_concat_output_dir(concat_stage, concat_processing_dir) - alias = f'batch{concat_batch}' - return write_files_to_list(files_to_concat, alias, output_dir) + output_dir = get_output_dir(process_name, alias, stage, processing_dir) + list_filename = os.path.join(output_dir, f"batch{batch}_files.list") + os.makedirs(os.path.dirname(list_filename), exist_ok=True) + with open(list_filename, "w") as handle: + for filename in files_to_merge: + handle.write(filename + "\n") + return list_filename -def get_concat_output_dir(concat_stage_index: int, concat_processing_dir: str): +def get_output_dir(process_name, alias, stage_index, processing_dir): """ - Get the file name with the list of files to be concatenated for a given stage and batch in the concatenation process + Get the file name with the list of files to be merged for a given stage and batch in the merge process """ - return os.path.join(concat_processing_dir, "vertical_concat", f"stage_{concat_stage_index}") + return os.path.join(processing_dir, f'{process_name}_{alias}', f"stage_{stage_index}") -def get_output_vcf_file_name(alias: str, concat_stage_index: int, concat_batch_index: int, concat_processing_dir: str): - return os.path.join(get_concat_output_dir(concat_stage_index, concat_processing_dir), - f"concat{alias}_output_stage{concat_stage_index}_batch{concat_batch_index}.vcf.gz") +def get_output_vcf_file_name(process_name, alias, stage_index, batch_index, processing_dir): + return os.path.join(get_output_dir(process_name, alias, stage_index, processing_dir), + f"{process_name}{alias}_output_stage{stage_index}_batch{batch_index}.vcf.gz") diff --git a/eva_vcf_merge/utils.py b/eva_vcf_merge/utils.py index 850ec22..53d0b67 100644 --- a/eva_vcf_merge/utils.py +++ b/eva_vcf_merge/utils.py @@ -26,16 +26,6 @@ def are_all_elements_unique(elements): return True -def write_files_to_list(files, alias, output_dir): - """Write the list of files to a path built from alias and output_dir.""" - list_filename = os.path.join(output_dir, f"{alias}_files.list") - os.makedirs(os.path.dirname(list_filename), exist_ok=True) - with open(list_filename, "w") as handle: - for filename in files: - handle.write(filename + "\n") - return list_filename - - def get_valid_filename(s): """Return string with characters not allowed in filenames replaced by underscore.""" return re.sub(r'[^-.0-9a-zA-Z]+', '_', s) diff --git a/tests/test_merge.py b/tests/test_merge.py index 3a1988a..5f50f4f 100644 --- a/tests/test_merge.py +++ b/tests/test_merge.py @@ -63,7 +63,7 @@ def test_concat_uninterrupted(vcf_merger, many_vcfs_to_concat): # Final_merged <-------- Stage 2 vcfs = {'many': many_vcfs_to_concat} vcf_merger.vertical_merge(vcfs, chunk_size=2, resume=False) - stage_dirs = glob.glob(f'{vcf_merger.working_dir}/vertical_concat/stage*') + stage_dirs = glob.glob(f'{vcf_merger.working_dir}/concat_0/stage*') assert len(stage_dirs) == 3 output_vcf_from_multi_stage_concat = os.path.join(vcf_merger.output_dir, 'many_merged.vcf.gz') output_vcf_from_single_stage_concat = f'{vcf_merger.output_dir}/single_stage_concat_result.vcf.gz'