Skip to content

Commit

Permalink
Merge pull request #6 from apriltuesday/EVA-2631
Browse files Browse the repository at this point in the history
EVA-2631: fix for multiple analysis concat
  • Loading branch information
apriltuesday authored Nov 4, 2021
2 parents 01a35fc + 6f8465b commit c08d416
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
1 change: 1 addition & 0 deletions eva_vcf_merge/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def generate_vertical_merge_pipeline(self, vcf_groups, chunk_size):
deps, index_processes, compressed_vcfs = self.compress_and_index(alias_idx, vcfs)
compress_pipeline = NextFlowPipeline(deps)
concat_pipeline, merged_filename = get_multistage_vertical_concat_pipeline(
alias=alias_idx,
vcf_files=compressed_vcfs,
concat_chunk_size=chunk_size,
concat_processing_dir=self.working_dir,
Expand Down
13 changes: 7 additions & 6 deletions eva_vcf_merge/multistage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@


def get_multistage_vertical_concat_pipeline(
alias,
vcf_files,
concat_processing_dir,
concat_chunk_size,
Expand Down Expand Up @@ -61,16 +62,16 @@ def get_multistage_vertical_concat_pipeline(
# split files in the current stage into chunks based on concat_chunk_size
files_in_batch = vcf_files[(concat_chunk_size * batch):(concat_chunk_size * (batch + 1))]
files_to_concat_list = write_files_to_concat_list(files_in_batch, stage, batch, concat_processing_dir)
output_vcf_file = get_output_vcf_file_name(stage, batch, concat_processing_dir)
output_vcf_file = get_output_vcf_file_name(alias, stage, batch, concat_processing_dir)

# separate concat & index processes
concat_process = NextFlowProcess(
process_name=f"concat_stage{stage}_batch{batch}",
process_name=f"concat{alias}_stage{stage}_batch{batch}",
command_to_run=f"{bcftools_binary} concat --allow-overlaps --remove-duplicates "
f"--file-list {files_to_concat_list} -o {output_vcf_file} -O z"
)
index_process = NextFlowProcess(
process_name=f"index_stage{stage}_batch{batch}",
process_name=f"index{alias}_stage{stage}_batch{batch}",
command_to_run=f"{bcftools_binary} index --csi {output_vcf_file}"
)
# index depends only on this batch's concat
Expand All @@ -89,7 +90,7 @@ def get_multistage_vertical_concat_pipeline(
prev_stage_processes = curr_stage_processes

return get_multistage_vertical_concat_pipeline(
output_vcf_files_from_stage,
alias, output_vcf_files_from_stage,
concat_processing_dir, concat_chunk_size,
bcftools_binary,
stage=stage+1,
Expand All @@ -114,6 +115,6 @@ def get_concat_output_dir(concat_stage_index: int, concat_processing_dir: str):
return os.path.join(concat_processing_dir, "vertical_concat", f"stage_{concat_stage_index}")


def get_output_vcf_file_name(concat_stage_index: int, concat_batch_index: int, concat_processing_dir: str):
def get_output_vcf_file_name(alias: str, concat_stage_index: int, concat_batch_index: int, concat_processing_dir: str):
return os.path.join(get_concat_output_dir(concat_stage_index, concat_processing_dir),
f"concat_output_stage{concat_stage_index}_batch{concat_batch_index}.vcf.gz")
f"concat{alias}_output_stage{concat_stage_index}_batch{concat_batch_index}.vcf.gz")
10 changes: 10 additions & 0 deletions tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ def test_vertical_merge(vcf_merger, same_samples_vcfs):
assert_all_files_present(filenames.values())


def test_vertical_merge_multiple_groups(vcf_merger, same_samples_vcfs):
vcfs = {'1': same_samples_vcfs, '2': same_samples_vcfs}
filenames = vcf_merger.vertical_merge(vcfs, resume=False)
assert filenames == {
'1': os.path.join(vcf_merger.output_dir, '1_merged.vcf.gz'),
'2': os.path.join(vcf_merger.output_dir, '2_merged.vcf.gz')
}
assert_all_files_present(filenames.values())


def test_concat_uninterrupted(vcf_merger, many_vcfs_to_concat):
# s0.vcf.gz s1.vcf.gz s2.vcf.gz s3.vcf.gz s4.vcf.gz
# \ / \ /
Expand Down

0 comments on commit c08d416

Please sign in to comment.