Migrate Remapping nextflow pipeline to DSL2
tcezard committed Dec 20, 2023
1 parent 33c3828 commit e6fd39f
Showing 1 changed file with 97 additions and 52 deletions.
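For readers less familiar with the change: the migration follows the standard DSL1-to-DSL2 pattern, where process inputs drop the "from <channel>" syntax, outputs replace "into <channel>" with named "emit:" targets, and the processes are wired together explicitly inside a workflow block. A minimal sketch of that pattern, using a hypothetical process and channel names that are not part of this pipeline:

// DSL1 style: channels are implicit ("from" on inputs, "into" on outputs)
// process count_lines {
//     input:
//     path vcf from input_vcfs
//
//     output:
//     path "counts.txt" into counts_ch
//
//     """
//     wc -l ${vcf} > counts.txt
//     """
// }

// DSL2 equivalent: unnamed input channels, named emits, explicit wiring in a workflow block
nextflow.enable.dsl=2

process count_lines {
    input:
    path vcf

    output:
    path "counts.txt", emit: counts

    """
    wc -l ${vcf} > counts.txt
    """
}

workflow {
    input_vcfs = Channel.fromPath(params.vcf)
    count_lines(input_vcfs)
    count_lines.out.counts.view()
}

The diff below applies this transformation to each process of the remapping pipeline and adds the workflow block at the end of the script.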
149 changes: 97 additions & 52 deletions eva_assembly_ingestion/nextflow/remap_cluster.nf
@@ -1,5 +1,8 @@
#!/usr/bin/env nextflow


nextflow.enable.dsl=2

def helpMessage() {
log.info"""
Remap one assembly version to another, cluster, and QC.
@@ -41,67 +44,77 @@ if (!params.taxonomy_id || !params.source_assembly_accession || !params.target_a
exit 1, helpMessage()
}

species_name = params.species_name.toLowerCase().replace(" ", "_")
source_to_target = "${params.source_assembly_accession}_to_${params.target_assembly_accession}"

// Create a channel that will either be empty if remapping will take place or contain a dummy value if not
// This will allow the clustering to be triggered even if no remapping is required
// We're using params.genome_assembly_dir because the clustering process needs to receive a file object
empty_ch = params.remapping_required ? Channel.empty() : Channel.of(params.genome_assembly_dir)


process retrieve_source_genome {
when:
source_assembly_accession != params.target_assembly_accession

input:
each source_assembly_accession
val species_name

output:
path "${params.source_assembly_accession}.fa" into source_fasta
path "${params.source_assembly_accession}_assembly_report.txt" into source_report
path "${params.source_assembly_accession}.fa", emit: source_fasta
path "${params.source_assembly_accession}_assembly_report.txt", emit: source_report

"""
$params.executable.genome_downloader --assembly-accession ${params.source_assembly_accession} --species ${species_name} --output-directory ${params.genome_assembly_dir}
ln -s ${params.genome_assembly_dir}/${species_name}/${params.source_assembly_accession}/${params.source_assembly_accession}.fa
ln -s ${params.genome_assembly_dir}/${species_name}/${params.source_assembly_accession}/${params.source_assembly_accession}_assembly_report.txt
$params.executable.genome_downloader --assembly-accession ${source_assembly_accession} --species ${species_name} --output-directory ${params.genome_assembly_dir}
ln -s ${params.genome_assembly_dir}/${species_name}/${source_assembly_accession}/${source_assembly_accession}.fa
ln -s ${params.genome_assembly_dir}/${species_name}/${source_assembly_accession}/${source_assembly_accession}_assembly_report.txt
"""
}


process retrieve_target_genome {

input:
each target_assembly_accession
val species_name

output:
path "${params.target_assembly_accession}.fa" into target_fasta
path "${params.target_assembly_accession}_assembly_report.txt" into target_report
path "${target_assembly_accession}.fa", emit: target_fasta
path "${target_assembly_accession}_assembly_report.txt", emit: target_report

"""
$params.executable.genome_downloader --assembly-accession ${params.target_assembly_accession} --species ${species_name} --output-directory ${params.genome_assembly_dir}
ln -s ${params.genome_assembly_dir}/${species_name}/${params.target_assembly_accession}/${params.target_assembly_accession}.fa
ln -s ${params.genome_assembly_dir}/${species_name}/${params.target_assembly_accession}/${params.target_assembly_accession}_assembly_report.txt
ln -s ${params.genome_assembly_dir}/${species_name}/${target_assembly_accession}/${target_assembly_accession}.fa
ln -s ${params.genome_assembly_dir}/${species_name}/${target_assembly_accession}/${target_assembly_accession}_assembly_report.txt
"""
}

process update_source_genome {

input:
path source_fasta from source_fasta
path source_report from source_report
env REMAPPINGCONFIG from params.remapping_config
val(source_assembly_accession)
path(source_fasta)
path(source_report)
env REMAPPINGCONFIG

output:
path "${source_fasta.getBaseName()}_custom.fa" into updated_source_fasta
path "${source_report.getBaseName()}_custom.txt" into updated_source_report
path "${source_fasta.getBaseName()}_custom.fa", emit: updated_source_fasta
path "${source_report.getBaseName()}_custom.txt", emit: updated_source_report

"""
${params.executable.custom_assembly} --assembly-accession ${params.source_assembly_accession} --fasta-file ${source_fasta} --report-file ${source_report}
${params.executable.custom_assembly} --assembly-accession ${source_assembly_accession} --fasta-file ${source_fasta} --report-file ${source_report}
"""
}

process update_target_genome {

input:
path target_fasta from target_fasta
path target_report from target_report
env REMAPPINGCONFIG from params.remapping_config
path target_fasta
path target_report
env REMAPPINGCONFIG

output:
path "${target_fasta.getBaseName()}_custom.fa" into updated_target_fasta
path "${target_report.getBaseName()}_custom.txt" into updated_target_report
path "${target_fasta.getBaseName()}_custom.fa", emit: updated_target_fasta
path "${target_report.getBaseName()}_custom.txt", emit: updated_target_report

"""
${params.executable.custom_assembly} --assembly-accession ${params.target_assembly_accession} --fasta-file ${target_fasta} --report-file ${target_report} --no-rename
@@ -116,17 +129,14 @@ process extract_vcf_from_mongo {
memory "${params.memory}GB"
clusterOptions "-g /accession"

when:
params.remapping_required

input:
path source_fasta from updated_source_fasta
path source_report from updated_source_report
path source_fasta
path source_report

output:
// Store both vcfs (eva and dbsnp) into one channel
path '*.vcf' into source_vcfs
path "${params.source_assembly_accession}_vcf_extractor.log" into log_filename
// Store both vcfs (eva and dbsnp) into one channel
path '*.vcf', emit: source_vcfs
path "${params.source_assembly_accession}_vcf_extractor.log", emit: log_filename

publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

@@ -147,14 +157,14 @@ process remap_variants {
memory "${params.memory}GB"

input:
path source_fasta from updated_source_fasta
path target_fasta from updated_target_fasta
path source_vcf from source_vcfs.flatten()
path source_fasta
path target_fasta
path source_vcf

output:
path "${basename_source_vcf}_remapped.vcf" into remapped_vcfs
path "${basename_source_vcf}_remapped_unmapped.vcf" into unmapped_vcfs
path "${basename_source_vcf}_remapped_counts.yml" into remapped_ymls
path "${basename_source_vcf}_remapped.vcf", emit: remapped_vcfs
path "${basename_source_vcf}_remapped_unmapped.vcf", emit: unmapped_vcfs
path "${basename_source_vcf}_remapped_counts.yml", emit: remapped_ymls

publishDir "$params.output_dir/eva", overwrite: true, mode: "copy", pattern: "*_eva_remapped*"
publishDir "$params.output_dir/dbsnp", overwrite: true, mode: "copy", pattern: "*_dbsnp_remapped*"
@@ -180,24 +190,24 @@ process remap_variants {


/*
* Ingest the remapped submitted variants from a VCF file into the accessioning warehouse.
* Ingest the remapped submitted variants from a VCF file into the accessioning warehouse.
*/
process ingest_vcf_into_mongo {
memory "${params.memory}GB"
clusterOptions "-g /accession"

input:
path remapped_vcf from remapped_vcfs.flatten()
path target_report from updated_target_report
path remapped_vcf
path target_report

output:
path "${remapped_vcf}_ingestion.log" into ingestion_log_filename
path "${remapped_vcf}_ingestion.log", emit: ingestion_log_filename

publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

script:
"""
# Check the file name to know which database to load the variants into
# Check the file name to know which database to load the variants into
if [[ $remapped_vcf == *_eva_remapped.vcf ]]
then
loadTo=EVA
@@ -219,12 +229,13 @@ process process_remapped_variants {
clusterOptions "-g /accession"

input:
path ingestion_log from empty_ch.mix(ingestion_log_filename.collect())
path ingestion_log
val source_to_target

output:
path "${source_to_target}_process_remapped.log" into process_remapped_log_filename
path "${source_to_target}_process_remapped.log", emit: process_remapped_log_filename
// TODO this also generates a rs report, for "newly remapped" rs - should we QC this separately?
// path "${source_to_target}_rs_report.txt" optional true into rs_report_filename
path "${source_to_target}_rs_report.txt", optional: true, emit: rs_report_filename

publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

@@ -241,11 +252,12 @@ process cluster_unclustered_variants {
clusterOptions "-g /accession/instance-${params.clustering_instance}"

input:
path process_remapped_log from process_remapped_log_filename
path process_remapped_log
val source_to_target

output:
path "${source_to_target}_clustering.log" into clustering_log_filename
path "${source_to_target}_rs_report.txt" optional true into rs_report_filename
path "${source_to_target}_clustering.log", emit: clustering_log_filename
path "${source_to_target}_rs_report.txt", optional: true, emit: rs_report_filename

publishDir "$params.output_dir/logs", overwrite: true, mode: "copy"

@@ -265,10 +277,11 @@ process qc_clustering {
clusterOptions "-g /accession"

input:
path rs_report from rs_report_filename
path rs_report
val source_to_target

output:
path "${source_to_target}_clustering_qc.log" into clustering_qc_log_filename
path "${source_to_target}_clustering_qc.log", emit: clustering_qc_log_filename

publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

@@ -289,10 +302,10 @@ process backpropagate_clusters {
clusterOptions "-g /accession"

input:
path "clustering_qc.log" from clustering_qc_log_filename
path "clustering_qc.log"

output:
path "${params.target_assembly_accession}_backpropagate_to_${params.source_assembly_accession}.log" into backpropagate_log_filename
path "${params.target_assembly_accession}_backpropagate_to_${params.source_assembly_accession}.log", emit: backpropagate_log_filename

publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

@@ -303,4 +316,36 @@ process backpropagate_clusters {
--spring.batch.job.names=BACK_PROPAGATE_SPLIT_OR_MERGED_RS_JOB \
> ${params.target_assembly_accession}_backpropagate_to_${params.source_assembly_accession}.log
"""
}
}

workflow {
main:
species_name = params.species_name.toLowerCase().replace(" ", "_")
source_to_target = "${params.source_assembly_accession}_to_${params.target_assembly_accession}"

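// Remapping is only needed when at least one source assembly differs from the target assembly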
params.remapping_required = params.source_assembly_accession.any {it != params.target_assembly_accession}
if (params.remapping_required){
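// Remapping path: download and customise both genomes, extract the source variants from MongoDB, remap and ingest them, then cluster, QC and backpropagate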
retrieve_source_genome(params.source_assembly_accession, species_name)
retrieve_target_genome(params.target_assembly_accession, species_name)
update_source_genome(params.source_assembly_accession, retrieve_source_genome.out.source_fasta,
retrieve_source_genome.out.source_report, params.remapping_config)
update_target_genome(retrieve_target_genome.out.target_fasta, retrieve_target_genome.out.target_report, params.remapping_config)
extract_vcf_from_mongo(update_source_genome.out.updated_source_fasta, update_source_genome.out.updated_source_report)
remap_variants(update_source_genome.out.updated_source_fasta, update_target_genome.out.updated_target_fasta,
extract_vcf_from_mongo.out.source_vcfs.flatten())
ingest_vcf_into_mongo(remap_variants.out.remapped_vcfs.flatten(), update_target_genome.out.updated_target_report)
process_remapped_variants(ingest_vcf_into_mongo.out.ingestion_log_filename.collect(), source_to_target)
cluster_unclustered_variants(process_remapped_variants.out.process_remapped_log_filename, source_to_target)
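// QC the RS reports produced by both the remapped-variant processing and the clustering step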
process_remapped_variants.out.rs_report_filename
.concat(cluster_unclustered_variants.out.rs_report_filename)
.set{ rs_reports }
qc_clustering(rs_reports, source_to_target)
backpropagate_clusters(qc_clustering.out.clustering_qc_log_filename.collect())
}else{
// We're using params.genome_assembly_dir because cluster_unclustered_variants needs to receive a file object
cluster_unclustered_variants(params.genome_assembly_dir, source_to_target)
qc_clustering(cluster_unclustered_variants.out.rs_report_filename, source_to_target)
}

}
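For reference, a hypothetical launch configuration covering the parameters this script validates or reads directly (taxonomy_id, the source and target assembly accessions, species_name, genome_assembly_dir, output_dir, remapping_config, memory and clustering_instance). All values below are illustrative placeholders, and the executable and properties parameters used inside the process scripts are omitted:

// Illustrative nextflow.config sketch; values are placeholders, not taken from the repository
params {
    taxonomy_id               = 9606
    source_assembly_accession = ['GCA_000001405.14']  // may be a list, hence the .any{} check in the workflow
    target_assembly_accession = 'GCA_000001405.15'
    species_name              = 'Homo sapiens'
    genome_assembly_dir       = '/path/to/genome_assemblies'
    output_dir                = '/path/to/remapping_output'
    remapping_config          = '/path/to/remapping_config.yml'
    memory                    = 8
    clustering_instance       = 1
}

With such a config the pipeline would be launched along the lines of "nextflow run eva_assembly_ingestion/nextflow/remap_cluster.nf -c remap_cluster.config".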
