Skip to content

Commit

Permalink
migrate Nextflow for running on SLURM
Browse files Browse the repository at this point in the history
  • Loading branch information
apriltuesday committed Jun 4, 2024
1 parent ff8384c commit a072bf1
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ jobs:
- name: Test nextflow workflow
run: |
# Run nextflow tests
export NXF_DEFAULT_DSL=1
export NXF_DEFAULT_DSL=2
tests/nextflow-tests/run_tests.sh
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ Example usage:
# Run everything
add_target_assembly.py --taxonomy 9031 --target_assembly GCA_016699485.1 --release_version 5

# Run remapping and clustering only, resume and run on a specific instance
add_target_assembly.py --taxonomy 9031 --target_assembly GCA_016699485.1 --release_version 5 --tasks remap_cluster --instance 3 --resume
# Run remapping and clustering only, resume
add_target_assembly.py --taxonomy 9031 --target_assembly GCA_016699485.1 --release_version 5 --tasks remap_cluster --resume
```

### Custom assembly generation
Expand Down
3 changes: 0 additions & 3 deletions bin/add_target_assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ def main():
help='Task or set of tasks to perform (defaults to all)')
argparse.add_argument('--release_version', required=True, type=int,
help='Release version this assembly will be processed for')
argparse.add_argument('--instance', help="Accessioning instance id for clustering", required=False, default=6,
type=int, choices=range(1, 13))
argparse.add_argument('--resume', help='If a process has been run already this will resume it.',
action='store_true', default=False)
args = argparse.parse_args()
Expand All @@ -45,7 +43,6 @@ def main():

job.run_all(
tasks=args.tasks,
instance=args.instance,
source_of_assembly=args.source_of_assembly,
resume=args.resume
)
Expand Down
19 changes: 9 additions & 10 deletions eva_assembly_ingestion/assembly_ingestion_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from ebi_eva_common_pyutils.taxonomy.taxonomy import get_scientific_name_from_taxonomy
from psycopg2.extras import execute_values

from eva_assembly_ingestion.config import get_nextflow_config_flag
from eva_assembly_ingestion.parse_counts import count_variants_extracted, count_variants_remapped, \
count_variants_ingested

Expand Down Expand Up @@ -67,11 +68,11 @@ def taxonomies(self):
taxonomy_list = [self.source_taxonomy]
return taxonomy_list

def run_all(self, tasks, instance, source_of_assembly, resume):
def run_all(self, tasks, source_of_assembly, resume):
if 'load_tracker' in tasks:
self.load_tracker()
if 'remap_cluster' in tasks:
self.run_remapping_and_clustering(instance, resume)
self.run_remapping_and_clustering(resume)
if 'update_dbs' in tasks:
self.update_dbs(source_of_assembly)

Expand Down Expand Up @@ -151,14 +152,14 @@ def get_source_assemblies_and_num_studies_dbsnp(self):
)
return get_all_results_for_query(pg_conn, query)

def run_remapping_and_clustering(self, instance, resume):
def run_remapping_and_clustering(self, resume):
"""Run remapping and clustering for all source assemblies in the tracker marked as not Complete, resuming
the nextflow process if specified. (Note that this will also resume or rerun anything marked as Failed.)"""
source_assemblies_and_taxonomies = self.get_incomplete_assemblies_and_taxonomies()
for source_assembly, taxonomy_list in source_assemblies_and_taxonomies:
self.info(f'Running remapping and clustering for the following assemblies: {source_assembly} '
f'for taxonomy {", ".join([str(t) for t in taxonomy_list])}')
self.process_one_assembly(source_assembly, taxonomy_list, instance, resume)
self.process_one_assembly(source_assembly, taxonomy_list, resume)

def get_incomplete_assemblies_and_taxonomies(self):
incomplete_assemblies = []
Expand All @@ -170,7 +171,7 @@ def get_incomplete_assemblies_and_taxonomies(self):
incomplete_assemblies.append((source_assembly, taxonomies))
return incomplete_assemblies

def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume):
def process_one_assembly(self, source_assembly, taxonomy_list, resume):
self.set_status_start(source_assembly, taxonomy_list)
base_directory = cfg['remapping']['base_directory']
nextflow_pipeline = os.path.join(os.path.dirname(__file__), 'nextflow', 'remap_cluster.nf')
Expand All @@ -188,7 +189,6 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume
)
clustering_template_file = self.create_clustering_properties(
output_file_path=os.path.join(assembly_directory, 'clustering_template.properties'),
instance=instance,
source_assembly=source_assembly
)

Expand All @@ -207,7 +207,6 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume
'extraction_properties': extraction_properties_file,
'ingestion_properties': ingestion_properties_file,
'clustering_properties': clustering_template_file,
'clustering_instance': instance,
'remapping_config': cfg.config_file,
'remapping_required': remapping_required
}
Expand All @@ -221,7 +220,8 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume
'-log', remapping_log,
'run', nextflow_pipeline,
'-params-file', remap_cluster_config_file,
'-work-dir', work_dir
'-work-dir', work_dir,
get_nextflow_config_flag()
]
if resume:
command.append('-resume')
Expand Down Expand Up @@ -261,9 +261,8 @@ def create_ingestion_properties(self, output_file_path, source_assembly):
open_file.write(properties)
return output_file_path

def create_clustering_properties(self, output_file_path, instance, source_assembly):
def create_clustering_properties(self, output_file_path, source_assembly):
properties = self.properties_generator.get_clustering_properties(
instance=instance,
source_assembly=source_assembly,
target_assembly=self.target_assembly,
rs_report_path=f'{source_assembly}_to_{self.target_assembly}_rs_report.txt'
Expand Down
12 changes: 12 additions & 0 deletions eva_assembly_ingestion/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,15 @@ def load_config(*args):
os.getenv('ASSEMBLYCONFIG'),
os.path.expanduser('~/.assembly_config.yml'),
)


def get_nextflow_config_flag():
    """
    Build the ``-c <config>`` command-line flag telling Nextflow to use the
    configuration file named by the ASSEMBLY_NEXTFLOW_CONFIG environment variable.

    When the variable is unset or empty, return an empty string so Nextflow
    falls back to its default configuration precedence, described here:
    https://www.nextflow.io/docs/latest/config.html
    """
    config_path = os.getenv('ASSEMBLY_NEXTFLOW_CONFIG')
    return f'-c {config_path}' if config_path else ''
41 changes: 18 additions & 23 deletions eva_assembly_ingestion/nextflow/remap_cluster.nf
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,15 @@ def helpMessage() {
--extraction_properties path to extraction properties file
--ingestion_properties path to ingestion properties file
--clustering_properties path to clustering properties file
--clustering_instance instance id to use for clustering
--output_dir path to the directory where the output file should be copied.
--remapping_config path to the remapping configuration file
--remapping_required flag that sets the remapping as required if true otherwise the remapping is skipped and only the clustering can be run
--memory memory in GB to use for memory-hungry processes (e.g. Java), default 8GB
"""
}

params.source_assembly_accession = null
params.target_assembly_accession = null
params.species_name = null
params.memory = 8
// help
params.help = null

Expand All @@ -52,6 +49,8 @@ empty_ch = params.remapping_required ? Channel.empty() : Channel.of(params.genom


process retrieve_source_genome {
label 'short_time', 'med_mem'

when:
source_assembly_accession != params.target_assembly_accession

Expand All @@ -72,6 +71,7 @@ process retrieve_source_genome {


process retrieve_target_genome {
label 'short_time', 'med_mem'

input:
val target_assembly_accession
Expand All @@ -89,6 +89,7 @@ process retrieve_target_genome {
}

process update_source_genome {
label 'short_time', 'med_mem'

input:
val(source_assembly_accession)
Expand All @@ -106,6 +107,7 @@ process update_source_genome {
}

process update_target_genome {
label 'short_time', 'med_mem'

input:
path target_fasta
Expand All @@ -126,8 +128,7 @@ process update_target_genome {
* Extract the submitted variants to remap from the accessioning warehouse and store them in a VCF file.
*/
process extract_vcf_from_mongo {
memory "${params.memory}GB"
clusterOptions "-g /accession"
label 'long_time', 'med_mem'

input:
path source_fasta
Expand All @@ -142,7 +143,7 @@ process extract_vcf_from_mongo {
publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

"""
java -Xmx8G -jar $params.jar.vcf_extractor \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.vcf_extractor \
--spring.config.location=file:${params.extraction_properties} \
--parameters.fasta=${source_fasta} \
--parameters.assemblyReportUrl=file:${source_report} \
Expand All @@ -156,7 +157,7 @@ process extract_vcf_from_mongo {
* Variant remapping pipeline
*/
process remap_variants {
memory "${params.memory}GB"
label 'long_time', 'med_mem'

input:
each path(source_vcf)
Expand Down Expand Up @@ -195,8 +196,7 @@ process remap_variants {
* Ingest the remapped submitted variants from a VCF file into the accessioning warehouse.
*/
process ingest_vcf_into_mongo {
memory "${params.memory}GB"
clusterOptions "-g /accession"
label 'long_time', 'med_mem'

input:
each path(remapped_vcf)
Expand All @@ -217,7 +217,7 @@ process ingest_vcf_into_mongo {
loadTo=DBSNP
fi
java -Xmx8G -jar $params.jar.vcf_ingestion \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.vcf_ingestion \
--spring.config.location=file:${params.ingestion_properties} \
--parameters.vcf=${remapped_vcf} \
--parameters.assemblyReportUrl=file:${target_report} \
Expand All @@ -227,31 +227,28 @@ process ingest_vcf_into_mongo {
}

process process_remapped_variants {
memory "${params.memory}GB"
clusterOptions "-g /accession"
label 'long_time', 'med_mem'

input:
path ingestion_log
val source_to_target

output:
path "${source_to_target}_process_remapped.log", emit: process_remapped_log_filename
// TODO this also generates a rs report, for "newly remapped" rs - should we QC this separately?
path "${source_to_target}_rs_report.txt", optional: true, emit: rs_report_filename

publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

"""
java -Xmx8G -jar $params.jar.clustering \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
--spring.config.location=file:${params.clustering_properties} \
--spring.batch.job.names=PROCESS_REMAPPED_VARIANTS_WITH_RS_JOB \
> ${source_to_target}_process_remapped.log
"""
}

process cluster_unclustered_variants {
memory "${params.memory}GB"
clusterOptions "-g /accession/instance-${params.clustering_instance}"
label 'long_time', 'med_mem'

input:
path process_remapped_log
Expand All @@ -264,7 +261,7 @@ process cluster_unclustered_variants {
publishDir "$params.output_dir/logs", overwrite: true, mode: "copy"

"""
java -Xmx8G -jar $params.jar.clustering \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
--spring.config.location=file:${params.clustering_properties} \
--spring.batch.job.names=CLUSTER_UNCLUSTERED_VARIANTS_JOB \
> ${source_to_target}_clustering.log
Expand All @@ -275,8 +272,7 @@ process cluster_unclustered_variants {
* Run clustering QC job
*/
process qc_clustering {
memory "${params.memory}GB"
clusterOptions "-g /accession"
label 'long_time', 'med_mem'

input:
path rs_report
Expand All @@ -288,7 +284,7 @@ process qc_clustering {
publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

"""
java -Xmx8G -jar $params.jar.clustering \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
--spring.config.location=file:${params.clustering_properties} \
--spring.batch.job.names=NEW_CLUSTERED_VARIANTS_QC_JOB \
> ${source_to_target}_clustering_qc.log
Expand All @@ -300,8 +296,7 @@ process qc_clustering {
* Run Back propagation of new clustered RS
*/
process backpropagate_clusters {
memory "${params.memory}GB"
clusterOptions "-g /accession"
label 'long_time', 'med_mem'

input:
path "clustering_qc.log"
Expand All @@ -312,7 +307,7 @@ process backpropagate_clusters {
publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

"""
java -Xmx8G -jar $params.jar.clustering \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
--spring.config.location=file:${params.clustering_properties} \
--parameters.remappedFrom=${params.source_assembly_accession} \
--spring.batch.job.names=BACK_PROPAGATE_SPLIT_OR_MERGED_RS_JOB \
Expand Down
6 changes: 6 additions & 0 deletions tests/nextflow-tests/nextflow.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Process defaults for the Nextflow test suite: run every process on the
// local executor (no SLURM/cluster scheduler needed) with modest per-process
// time and memory limits so tests stay cheap on CI machines.
process {
    executor = 'local'

    time = '30m'
    memory = '5 GB'
}
1 change: 0 additions & 1 deletion tests/nextflow-tests/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ nextflow run ${SOURCE_DIR}/eva_assembly_ingestion/nextflow/remap_cluster.nf -par
--extraction_properties ${SCRIPT_DIR}/template.properties \
--ingestion_properties ${SCRIPT_DIR}/template.properties \
--clustering_properties ${SCRIPT_DIR}/template.properties \
--clustering_instance 1 \
--output_dir ${SCRIPT_DIR}/output \
--remapping_config ${SCRIPT_DIR}/test_config.yaml \
--remapping_required 1 \
Expand Down

0 comments on commit a072bf1

Please sign in to comment.