Merge pull request #7 from apriltuesday/EVA-2645
EVA-2645: hierarchical horizontal merge
apriltuesday authored Nov 22, 2021
2 parents c08d416 + 55b5c45 commit f5cf85b
Showing 4 changed files with 93 additions and 116 deletions.
116 changes: 48 additions & 68 deletions eva_vcf_merge/merge.py
@@ -17,8 +17,9 @@

from ebi_eva_common_pyutils.nextflow import NextFlowPipeline, NextFlowProcess

from eva_vcf_merge.multistage import get_multistage_vertical_concat_pipeline
from eva_vcf_merge.utils import write_files_to_list, get_valid_filename, validate_aliases
from eva_vcf_merge.detect import MergeType
from eva_vcf_merge.multistage import get_multistage_merge_pipeline
from eva_vcf_merge.utils import get_valid_filename, validate_aliases


class VCFMerger:
@@ -30,41 +31,27 @@ def __init__(self, bgzip_binary, bcftools_binary, nextflow_binary, nextflow_conf
self.output_dir = output_dir
self.working_dir = os.path.join(output_dir, 'nextflow')

def horizontal_merge(self, vcf_groups, resume=True):
"""
Merge groups of vcfs horizontally, i.e. by sample, using bcftools.
:param vcf_groups: dict mapping a string (e.g. an analysis alias) to a group of vcf files to be merged
:param resume: whether to resume pipeline (default true)
:returns: dict of merged filenames
"""
if not validate_aliases(vcf_groups.keys()):
raise ValueError('Aliases must be unique when converted to filenames')
pipeline, merged_filenames = self.generate_horizontal_merge_pipeline(vcf_groups)
workflow_file = os.path.join(self.working_dir, 'horizontal_merge.nf')
os.makedirs(self.working_dir, exist_ok=True)
pipeline.run_pipeline(
workflow_file_path=workflow_file,
working_dir=self.working_dir,
nextflow_binary_path=self.nextflow_binary,
nextflow_config_path=self.nextflow_config,
resume=resume
)
return merged_filenames
def horizontal_merge(self, vcf_groups, chunk_size=500, resume=True):
return self.common_merge(MergeType.HORIZONTAL, vcf_groups, chunk_size, resume)

def vertical_merge(self, vcf_groups, chunk_size=500, resume=True):
return self.common_merge(MergeType.VERTICAL, vcf_groups, chunk_size, resume)

def common_merge(self, merge_type, vcf_groups, chunk_size=500, resume=True):
"""
Merge groups of vcfs vertically, i.e. concatenation.
Merge groups of vcfs horizontally or vertically.
:param merge_type: vertical or horizontal merge
:param vcf_groups: dict mapping a string (e.g. an analysis alias) to a group of vcf files to be merged
:param chunk_size: number of vcfs to merge at once (default 500)
:param resume: whether to resume pipeline (default true)
:returns: dict of merged filenames
"""
if not validate_aliases(vcf_groups.keys()):
raise ValueError('Aliases must be unique when converted to filenames')
pipeline, merged_filenames = self.generate_vertical_merge_pipeline(vcf_groups, chunk_size)
workflow_file = os.path.join(self.working_dir, "vertical_concat.nf")
pipeline, merged_filenames = self.generate_merge_pipeline(merge_type, vcf_groups, chunk_size)
workflow_file = os.path.join(self.working_dir, "merge.nf")

os.makedirs(self.working_dir, exist_ok=True)
pipeline.run_pipeline(
workflow_file_path=workflow_file,
@@ -81,54 +68,40 @@ def vertical_merge(self, vcf_groups, chunk_size=500, resume=True):
merged_filenames[alias] = target_filename
return merged_filenames

def generate_horizontal_merge_pipeline(self, vcf_groups):
def generate_merge_pipeline(self, merge_type, vcf_groups, chunk_size):
"""
Generate horizontal merge pipeline, including compressing and indexing VCFs.
:param vcf_groups: dict mapping a string to a group of vcf files to be merged
:return: complete NextflowPipeline and dict of merged filenames
"""
dependencies = {}
merged_filenames = {}
for alias_idx, (alias, vcfs) in enumerate(vcf_groups.items()):
deps, index_processes, compressed_vcfs = self.compress_and_index(alias_idx, vcfs)
dependencies.update(deps)

safe_alias = get_valid_filename(alias)
list_filename = write_files_to_list(compressed_vcfs, safe_alias, self.working_dir)
merged_filename = os.path.join(self.output_dir, f'{safe_alias}_merged.vcf.gz')
merge_process = NextFlowProcess(
process_name=f'merge_{alias_idx}',
command_to_run=f'{self.bcftools_binary} merge --merge all --file-list {list_filename} '
f'--threads 3 -O z -o {merged_filename}'
)
# each alias's merge process depends on all index processes
dependencies[merge_process] = index_processes
merged_filenames[alias] = merged_filename

return NextFlowPipeline(dependencies), merged_filenames

def generate_vertical_merge_pipeline(self, vcf_groups, chunk_size):
"""
Generate vertical merge (concatenation) pipeline.
Generate merge pipeline, including compressing and indexing VCFs.
:param merge_type: vertical or horizontal merge
:param vcf_groups: dict mapping a string to a group of vcf files to be merged
:param chunk_size: number of vcfs to merge at once
:return: complete NextFlowPipeline and dict of merged filenames
"""
full_pipeline = NextFlowPipeline()
merged_filenames = {}
for alias_idx, (alias, vcfs) in enumerate(vcf_groups.items()):
deps, index_processes, compressed_vcfs = self.compress_and_index(alias_idx, vcfs)
compress_pipeline = NextFlowPipeline(deps)
concat_pipeline, merged_filename = get_multistage_vertical_concat_pipeline(
alias=alias_idx,
vcf_files=compressed_vcfs,
concat_chunk_size=chunk_size,
concat_processing_dir=self.working_dir,
bcftools_binary=self.bcftools_binary
)
pipeline = NextFlowPipeline.join_pipelines(compress_pipeline, concat_pipeline)
compress_pipeline, compressed_vcfs = self.compress_and_index(alias_idx, vcfs)
if merge_type == MergeType.HORIZONTAL:
merge_pipeline, merged_filename = get_multistage_merge_pipeline(
alias=alias_idx,
vcf_files=compressed_vcfs,
chunk_size=chunk_size,
processing_dir=self.working_dir,
bcftools_binary=self.bcftools_binary,
process_name='merge',
process_command=self.merge_command
)
else:
merge_pipeline, merged_filename = get_multistage_merge_pipeline(
alias=alias_idx,
vcf_files=compressed_vcfs,
chunk_size=chunk_size,
processing_dir=self.working_dir,
bcftools_binary=self.bcftools_binary,
process_name='concat',
process_command=self.concat_command
)
pipeline = NextFlowPipeline.join_pipelines(compress_pipeline, merge_pipeline)
full_pipeline = NextFlowPipeline.join_pipelines(full_pipeline, pipeline)
merged_filenames[alias] = merged_filename
return full_pipeline, merged_filenames
@@ -139,7 +112,7 @@ def compress_and_index(self, alias, vcfs):
:param alias: name of group of vcf files (used to name Nextflow processes uniquely)
:param vcfs: list of vcf files
:return: dependency map, list of final index processes, and list of final filenames
:return: NextFlow pipeline and list of final filenames
"""
dependencies = {}
index_processes = []
@@ -160,5 +133,12 @@ def compress_and_index(self, alias, vcfs):
index_processes.append(index_process)
# each file's index depends only on compress (if present)
dependencies[index_process] = [compress_process] if compress_process else []
# TODO preferably return a NextFlowPipeline rather than dependencies & final processes
return dependencies, index_processes, compressed_vcfs
return NextFlowPipeline(dependencies), compressed_vcfs

def merge_command(self, files_to_merge_list, output_vcf_file):
return (f'{self.bcftools_binary} merge --merge all --file-list {files_to_merge_list} --threads 3 '
f'-O z -o {output_vcf_file}')

def concat_command(self, files_to_merge_list, output_vcf_file):
return (f'{self.bcftools_binary} concat --allow-overlaps --remove-duplicates --file-list {files_to_merge_list} '
f'-O z -o {output_vcf_file}')
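
For orientation, below is a minimal usage sketch of the unified entry points after this change. The binaries, paths, and constructor keyword names are illustrative assumptions inferred from the hunks above (the constructor signature is truncated in the hunk header); this is not code from the repository.

# Hedged usage sketch: both merge directions now route through common_merge
# and the same chunked, multi-stage Nextflow pipeline.
from eva_vcf_merge.merge import VCFMerger

merger = VCFMerger(
    bgzip_binary='bgzip',                        # illustrative binary paths
    bcftools_binary='bcftools',
    nextflow_binary='nextflow',
    nextflow_config='/path/to/nextflow.config',  # assumed keyword name
    output_dir='/data/merge_output',             # assumed value
)

groups = {
    'analysis_1': ['a.vcf', 'b.vcf'],        # merged horizontally (by sample)
    'analysis_2': ['c.vcf.gz', 'd.vcf.gz'],  # concatenated vertically (by record)
}

merged_by_sample = merger.horizontal_merge({'analysis_1': groups['analysis_1']}, chunk_size=500)
merged_by_record = merger.vertical_merge({'analysis_2': groups['analysis_2']}, chunk_size=500)
# Each call returns a dict mapping the alias to its final merged VCF path.
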
81 changes: 44 additions & 37 deletions eva_vcf_merge/multistage.py
@@ -17,22 +17,24 @@

from ebi_eva_common_pyutils.nextflow import NextFlowPipeline, NextFlowProcess

from eva_vcf_merge.utils import write_files_to_list


def get_multistage_vertical_concat_pipeline(
def get_multistage_merge_pipeline(
alias,
vcf_files,
concat_processing_dir,
concat_chunk_size,
processing_dir,
chunk_size,
bcftools_binary,
process_name,
process_command,
stage=0,
prev_stage_processes=None,
pipeline=None
):
"""
# Generate Nextflow pipeline for multi-stage VCF concatenation of 5 VCF files with 2-VCFs concatenated at a time (CONCAT_CHUNK_SIZE=2)
# For illustration purposes only. Usually the CONCAT_CHUNK_SIZE is much higher (ex: 500).
Generate Nextflow pipeline for multi-stage VCF merging or concatenation.
# As an example, below is multi-stage VCF concatenation of 5 VCF files with 2-VCFs concatenated at a time (CHUNK_SIZE=2)
# For illustration purposes only. Usually the CHUNK_SIZE is much higher (ex: 500).
#
# vcf1 vcf2 vcf3 vcf4 vcf5
# \ / \ /
@@ -51,70 +53,75 @@ def get_multistage_vertical_concat_pipeline(
if not pipeline:
pipeline = NextFlowPipeline()
prev_stage_processes = []
# If we are left with only one file, this means we have reached the last concat stage
# If we are left with only one file, this means we have reached the last merge stage
if len(vcf_files) == 1:
return pipeline, vcf_files[0]

num_batches_in_stage = math.ceil(len(vcf_files) / concat_chunk_size)
num_batches_in_stage = math.ceil(len(vcf_files) / chunk_size)
curr_stage_processes = []
output_vcf_files_from_stage = []
for batch in range(0, num_batches_in_stage):
# split files in the current stage into chunks based on concat_chunk_size
files_in_batch = vcf_files[(concat_chunk_size * batch):(concat_chunk_size * (batch + 1))]
files_to_concat_list = write_files_to_concat_list(files_in_batch, stage, batch, concat_processing_dir)
output_vcf_file = get_output_vcf_file_name(alias, stage, batch, concat_processing_dir)
# split files in the current stage into chunks based on chunk_size
files_in_batch = vcf_files[(chunk_size * batch):(chunk_size * (batch + 1))]
files_to_merge_list = write_files_to_merge_list(process_name, alias, files_in_batch, stage, batch, processing_dir)
output_vcf_file = get_output_vcf_file_name(process_name, alias, stage, batch, processing_dir)

# separate concat & index processes
concat_process = NextFlowProcess(
process_name=f"concat{alias}_stage{stage}_batch{batch}",
command_to_run=f"{bcftools_binary} concat --allow-overlaps --remove-duplicates "
f"--file-list {files_to_concat_list} -o {output_vcf_file} -O z"
# separate merge & index processes
merge_process = NextFlowProcess(
process_name=f"{process_name}{alias}_stage{stage}_batch{batch}",
command_to_run=process_command(files_to_merge_list, output_vcf_file)
)
index_process = NextFlowProcess(
process_name=f"index{alias}_stage{stage}_batch{batch}",
command_to_run=f"{bcftools_binary} index --csi {output_vcf_file}"
)
# index depends only on this batch's concat
pipeline.add_dependencies({index_process: [concat_process]})
# index depends only on this batch's merge
pipeline.add_dependencies({index_process: [merge_process]})
# next stage requires indexing to be complete from this stage
curr_stage_processes.append(index_process)
output_vcf_files_from_stage.append(output_vcf_file)

# Concatenation batch in a given stage will have to wait until the completion of
# n batches in the previous stage where n = concat_chunk_size
# Merge batch in a given stage will have to wait until the completion of
# n batches in the previous stage where n = chunk_size
# Ex: In the illustration above stage 1/batch 0 depends on completion of stage 0/batch 0 and stage 0/batch 1
# While output of any n batches from the previous stage can be worked on as they become available,
# having a predictable formula simplifies pipeline generation and troubleshooting
prev_stage_dependencies = prev_stage_processes[(concat_chunk_size * batch):(concat_chunk_size * (batch + 1))]
pipeline.add_dependencies({concat_process: prev_stage_dependencies})
prev_stage_dependencies = prev_stage_processes[(chunk_size * batch):(chunk_size * (batch + 1))]
pipeline.add_dependencies({merge_process: prev_stage_dependencies})
prev_stage_processes = curr_stage_processes

return get_multistage_vertical_concat_pipeline(
return get_multistage_merge_pipeline(
alias, output_vcf_files_from_stage,
concat_processing_dir, concat_chunk_size,
processing_dir, chunk_size,
bcftools_binary,
process_name,
process_command,
stage=stage+1,
prev_stage_processes=prev_stage_processes,
pipeline=pipeline
)


def write_files_to_concat_list(files_to_concat, concat_stage, concat_batch, concat_processing_dir):
def write_files_to_merge_list(process_name, alias, files_to_merge, stage, batch, processing_dir):
"""
Write the list of files to be concatenated for a given stage and batch
Write the list of files to be merged for a given stage and batch
"""
output_dir = get_concat_output_dir(concat_stage, concat_processing_dir)
alias = f'batch{concat_batch}'
return write_files_to_list(files_to_concat, alias, output_dir)
output_dir = get_output_dir(process_name, alias, stage, processing_dir)
list_filename = os.path.join(output_dir, f"batch{batch}_files.list")
os.makedirs(os.path.dirname(list_filename), exist_ok=True)
with open(list_filename, "w") as handle:
for filename in files_to_merge:
handle.write(filename + "\n")
return list_filename


def get_concat_output_dir(concat_stage_index: int, concat_processing_dir: str):
def get_output_dir(process_name, alias, stage_index, processing_dir):
"""
Get the file name with the list of files to be concatenated for a given stage and batch in the concatenation process
Get the file name with the list of files to be merged for a given stage and batch in the merge process
"""
return os.path.join(concat_processing_dir, "vertical_concat", f"stage_{concat_stage_index}")
return os.path.join(processing_dir, f'{process_name}_{alias}', f"stage_{stage_index}")


def get_output_vcf_file_name(alias: str, concat_stage_index: int, concat_batch_index: int, concat_processing_dir: str):
return os.path.join(get_concat_output_dir(concat_stage_index, concat_processing_dir),
f"concat{alias}_output_stage{concat_stage_index}_batch{concat_batch_index}.vcf.gz")
def get_output_vcf_file_name(process_name, alias, stage_index, batch_index, processing_dir):
return os.path.join(get_output_dir(process_name, alias, stage_index, processing_dir),
f"{process_name}{alias}_output_stage{stage_index}_batch{batch_index}.vcf.gz")
10 changes: 0 additions & 10 deletions eva_vcf_merge/utils.py
@@ -26,16 +26,6 @@ def are_all_elements_unique(elements):
return True


def write_files_to_list(files, alias, output_dir):
"""Write the list of files to a path built from alias and output_dir."""
list_filename = os.path.join(output_dir, f"{alias}_files.list")
os.makedirs(os.path.dirname(list_filename), exist_ok=True)
with open(list_filename, "w") as handle:
for filename in files:
handle.write(filename + "\n")
return list_filename


def get_valid_filename(s):
"""Return string with characters not allowed in filenames replaced by underscore."""
return re.sub(r'[^-.0-9a-zA-Z]+', '_', s)
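
For reference, a quick self-contained check of the retained helper's behaviour; the input strings are illustrative, and the expected outputs follow from the regex above.

import re

def get_valid_filename(s):
    """Return string with characters not allowed in filenames replaced by underscore."""
    return re.sub(r'[^-.0-9a-zA-Z]+', '_', s)

# Runs of disallowed characters collapse into a single underscore:
assert get_valid_filename('analysis alias #1') == 'analysis_alias_1'
assert get_valid_filename('hs37d5.chr1') == 'hs37d5.chr1'  # letters, digits and dots pass through unchanged
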
2 changes: 1 addition & 1 deletion tests/test_merge.py
@@ -63,7 +63,7 @@ def test_concat_uninterrupted(vcf_merger, many_vcfs_to_concat):
# Final_merged <-------- Stage 2
vcfs = {'many': many_vcfs_to_concat}
vcf_merger.vertical_merge(vcfs, chunk_size=2, resume=False)
stage_dirs = glob.glob(f'{vcf_merger.working_dir}/vertical_concat/stage*')
stage_dirs = glob.glob(f'{vcf_merger.working_dir}/concat_0/stage*')
assert len(stage_dirs) == 3
output_vcf_from_multi_stage_concat = os.path.join(vcf_merger.output_dir, 'many_merged.vcf.gz')
output_vcf_from_single_stage_concat = f'{vcf_merger.output_dir}/single_stage_concat_result.vcf.gz'
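
As a cross-check on the asserted number of stage directories, a small standalone sketch of the stage arithmetic, assuming (as in the multistage.py docstring illustration) five input VCFs combined two at a time; this helper is not part of the repository.

import math

def expected_stage_count(num_vcfs, chunk_size):
    # Each stage reduces the file count by ceiling division with chunk_size
    # until a single merged VCF remains.
    stages = 0
    while num_vcfs > 1:
        num_vcfs = math.ceil(num_vcfs / chunk_size)
        stages += 1
    return stages

assert expected_stage_count(5, 2) == 3  # three stage_* directories, matching the assertion above
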
