From e6fd39f46321ef76fa37ed59152bd9b210d1f31e Mon Sep 17 00:00:00 2001 From: tcezard Date: Wed, 20 Dec 2023 12:00:12 +0000 Subject: [PATCH] Migrate Remapping nextflow pipeline to DSL2 --- .../nextflow/remap_cluster.nf | 149 ++++++++++++------ 1 file changed, 97 insertions(+), 52 deletions(-) diff --git a/eva_assembly_ingestion/nextflow/remap_cluster.nf b/eva_assembly_ingestion/nextflow/remap_cluster.nf index 69b859e..13e3ae4 100644 --- a/eva_assembly_ingestion/nextflow/remap_cluster.nf +++ b/eva_assembly_ingestion/nextflow/remap_cluster.nf @@ -1,5 +1,8 @@ #!/usr/bin/env nextflow + +nextflow.enable.dsl=2 + def helpMessage() { log.info""" Remap one assembly version to another, cluster, and QC. @@ -41,67 +44,77 @@ if (!params.taxonomy_id || !params.source_assembly_accession || !params.target_a exit 1, helpMessage() } -species_name = params.species_name.toLowerCase().replace(" ", "_") -source_to_target = "${params.source_assembly_accession}_to_${params.target_assembly_accession}" // Create an channel that will either be empty if remapping will take place or contain a dummy value if not // This will allow to trigger the clustering even if no remapping is required // We're using params.genome_assembly_dir because the clustering process needs to receive a file object empty_ch = params.remapping_required ? Channel.empty() : Channel.of(params.genome_assembly_dir) + process retrieve_source_genome { + when: + source_assembly_accession != params.target_assembly_accession + + input: + each source_assembly_accession + val species_name output: - path "${params.source_assembly_accession}.fa" into source_fasta - path "${params.source_assembly_accession}_assembly_report.txt" into source_report + path "${params.source_assembly_accession}.fa", emit: source_fasta + path "${params.source_assembly_accession}_assembly_report.txt", emit: source_report """ - $params.executable.genome_downloader --assembly-accession ${params.source_assembly_accession} --species ${species_name} --output-directory ${params.genome_assembly_dir} - ln -s ${params.genome_assembly_dir}/${species_name}/${params.source_assembly_accession}/${params.source_assembly_accession}.fa - ln -s ${params.genome_assembly_dir}/${species_name}/${params.source_assembly_accession}/${params.source_assembly_accession}_assembly_report.txt + $params.executable.genome_downloader --assembly-accession ${source_assembly_accession} --species ${species_name} --output-directory ${params.genome_assembly_dir} + ln -s ${params.genome_assembly_dir}/${species_name}/${source_assembly_accession}/${source_assembly_accession}.fa + ln -s ${params.genome_assembly_dir}/${species_name}/${source_assembly_accession}/${source_assembly_accession}_assembly_report.txt """ } process retrieve_target_genome { + input: + each target_assembly_accession + val species_name + output: - path "${params.target_assembly_accession}.fa" into target_fasta - path "${params.target_assembly_accession}_assembly_report.txt" into target_report + path "${target_assembly_accession}.fa", emit: target_fasta + path "${target_assembly_accession}_assembly_report.txt", emit: target_report """ $params.executable.genome_downloader --assembly-accession ${params.target_assembly_accession} --species ${species_name} --output-directory ${params.genome_assembly_dir} - ln -s ${params.genome_assembly_dir}/${species_name}/${params.target_assembly_accession}/${params.target_assembly_accession}.fa - ln -s ${params.genome_assembly_dir}/${species_name}/${params.target_assembly_accession}/${params.target_assembly_accession}_assembly_report.txt + ln -s ${params.genome_assembly_dir}/${species_name}/${target_assembly_accession}/${target_assembly_accession}.fa + ln -s ${params.genome_assembly_dir}/${species_name}/${target_assembly_accession}/${target_assembly_accession}_assembly_report.txt """ } process update_source_genome { input: - path source_fasta from source_fasta - path source_report from source_report - env REMAPPINGCONFIG from params.remapping_config + val(source_assembly_accession) + path(source_fasta) + path(source_report) + env REMAPPINGCONFIG output: - path "${source_fasta.getBaseName()}_custom.fa" into updated_source_fasta - path "${source_report.getBaseName()}_custom.txt" into updated_source_report + path "${source_fasta.getBaseName()}_custom.fa", emit: updated_source_fasta + path "${source_report.getBaseName()}_custom.txt", emit: updated_source_report """ - ${params.executable.custom_assembly} --assembly-accession ${params.source_assembly_accession} --fasta-file ${source_fasta} --report-file ${source_report} + ${params.executable.custom_assembly} --assembly-accession ${source_assembly_accession} --fasta-file ${source_fasta} --report-file ${source_report} """ } process update_target_genome { input: - path target_fasta from target_fasta - path target_report from target_report - env REMAPPINGCONFIG from params.remapping_config + path target_fasta + path target_report + env REMAPPINGCONFIG output: - path "${target_fasta.getBaseName()}_custom.fa" into updated_target_fasta - path "${target_report.getBaseName()}_custom.txt" into updated_target_report + path "${target_fasta.getBaseName()}_custom.fa", emit: updated_target_fasta + path "${target_report.getBaseName()}_custom.txt", emit: updated_target_report """ ${params.executable.custom_assembly} --assembly-accession ${params.target_assembly_accession} --fasta-file ${target_fasta} --report-file ${target_report} --no-rename @@ -116,17 +129,14 @@ process extract_vcf_from_mongo { memory "${params.memory}GB" clusterOptions "-g /accession" - when: - params.remapping_required - input: - path source_fasta from updated_source_fasta - path source_report from updated_source_report + path source_fasta + path source_report output: - // Store both vcfs (eva and dbsnp) into one channel - path '*.vcf' into source_vcfs - path "${params.source_assembly_accession}_vcf_extractor.log" into log_filename + // Store both vcfs (eva and dbsnp), emit: one channel + path '*.vcf', emit: source_vcfs + path "${params.source_assembly_accession}_vcf_extractor.log", emit: log_filename publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*" @@ -147,14 +157,14 @@ process remap_variants { memory "${params.memory}GB" input: - path source_fasta from updated_source_fasta - path target_fasta from updated_target_fasta - path source_vcf from source_vcfs.flatten() + path source_fasta + path target_fasta + path source_vcf output: - path "${basename_source_vcf}_remapped.vcf" into remapped_vcfs - path "${basename_source_vcf}_remapped_unmapped.vcf" into unmapped_vcfs - path "${basename_source_vcf}_remapped_counts.yml" into remapped_ymls + path "${basename_source_vcf}_remapped.vcf", emit: remapped_vcfs + path "${basename_source_vcf}_remapped_unmapped.vcf", emit: unmapped_vcfs + path "${basename_source_vcf}_remapped_counts.yml", emit: remapped_ymls publishDir "$params.output_dir/eva", overwrite: true, mode: "copy", pattern: "*_eva_remapped*" publishDir "$params.output_dir/dbsnp", overwrite: true, mode: "copy", pattern: "*_dbsnp_remapped*" @@ -180,24 +190,24 @@ process remap_variants { /* - * Ingest the remapped submitted variants from a VCF file into the accessioning warehouse. + * Ingest the remapped submitted variants from a VCF file, emit: the accessioning warehouse. */ process ingest_vcf_into_mongo { memory "${params.memory}GB" clusterOptions "-g /accession" input: - path remapped_vcf from remapped_vcfs.flatten() - path target_report from updated_target_report + path remapped_vcf + path target_report output: - path "${remapped_vcf}_ingestion.log" into ingestion_log_filename + path "${remapped_vcf}_ingestion.log", emit: ingestion_log_filename publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*" script: """ - # Check the file name to know which database to load the variants into + # Check the file name to know which database to load the variants, emit: if [[ $remapped_vcf == *_eva_remapped.vcf ]] then loadTo=EVA @@ -219,12 +229,13 @@ process process_remapped_variants { clusterOptions "-g /accession" input: - path ingestion_log from empty_ch.mix(ingestion_log_filename.collect()) + path ingestion_log + val source_to_target output: - path "${source_to_target}_process_remapped.log" into process_remapped_log_filename + path "${source_to_target}_process_remapped.log", emit: process_remapped_log_filename // TODO this also generates a rs report, for "newly remapped" rs - should we QC this separately? - // path "${source_to_target}_rs_report.txt" optional true into rs_report_filename + path "${source_to_target}_rs_report.txt", optional: true, emit: rs_report_filename publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*" @@ -241,11 +252,12 @@ process cluster_unclustered_variants { clusterOptions "-g /accession/instance-${params.clustering_instance}" input: - path process_remapped_log from process_remapped_log_filename + path process_remapped_log + val source_to_target output: - path "${source_to_target}_clustering.log" into clustering_log_filename - path "${source_to_target}_rs_report.txt" optional true into rs_report_filename + path "${source_to_target}_clustering.log", emit: clustering_log_filename + path "${source_to_target}_rs_report.txt", optional: true, emit: rs_report_filename publishDir "$params.output_dir/logs", overwrite: true, mode: "copy" @@ -265,10 +277,11 @@ process qc_clustering { clusterOptions "-g /accession" input: - path rs_report from rs_report_filename + path rs_report + val source_to_target output: - path "${source_to_target}_clustering_qc.log" into clustering_qc_log_filename + path "${source_to_target}_clustering_qc.log", emit: clustering_qc_log_filename publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*" @@ -289,10 +302,10 @@ process backpropagate_clusters { clusterOptions "-g /accession" input: - path "clustering_qc.log" from clustering_qc_log_filename + path "clustering_qc.log" output: - path "${params.target_assembly_accession}_backpropagate_to_${params.source_assembly_accession}.log" into backpropagate_log_filename + path "${params.target_assembly_accession}_backpropagate_to_${params.source_assembly_accession}.log", emit: backpropagate_log_filename publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*" @@ -303,4 +316,36 @@ process backpropagate_clusters { --spring.batch.job.names=BACK_PROPAGATE_SPLIT_OR_MERGED_RS_JOB \ > ${params.target_assembly_accession}_backpropagate_to_${params.source_assembly_accession}.log """ -} \ No newline at end of file +} + +workflow { + main: + species_name = params.species_name.toLowerCase().replace(" ", "_") + source_to_target = "${params.source_assembly_accession}_to_${params.target_assembly_accession}" + + params.remapping_required = params.source_assembly_accession.any {it != params.target_assembly_accession} + if (params.remapping_required){ + retrieve_source_genome(params.source_assembly_accession, species_name) + retrieve_target_genome(params.target_assembly_accession, species_name) + update_source_genome(params.source_assembly_accession, retrieve_source_genome.out.source_fasta, + retrieve_source_genome.out.source_report, params.remapping_config) + update_target_genome(retrieve_target_genome.out.target_fasta, retrieve_target_genome.out.target_report, params.remapping_config) + extract_vcf_from_mongo(update_source_genome.out.updated_source_fasta, update_source_genome.out.updated_source_report) + remap_variants(extract_vcf_from_mongo.out.source_vcfs, update_source_genome.out.updated_source_fasta, + update_target_genome.out.updated_target_fasta) + ingest_vcf_into_mongo(remap_variants.out.remapped_vcfs.flatten(), update_target_genome.out.updated_target_report) + process_remapped_variants(ingest_vcf_into_mongo.out.ingestion_log_filename.collect(), source_to_target) + cluster_unclustered_variants(process_remapped_variants.out.process_remapped_log_filename, source_to_target) + process_remapped_variants.out.rs_report_filename + .concat(cluster_unclustered_variants.out.rs_report_filename) + .set{ rs_reports } + qc_clustering(rs_reports, source_to_target) + backpropagate_clusters(qc_clustering.out.clustering_qc_log_filename.collect()) + }else{ + // We're using params.genome_assembly_dir because cluster_unclustered_variants needs to receive a file object + cluster_unclustered_variants(params.genome_assembly_dir) + qc_clustering(cluster_unclustered_variants.out.rs_report_filename) + } + +} +