diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index cb5ceeb..b5167d3 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -60,5 +60,5 @@ jobs:
     - name: Test nextflow workflow
       run: |
         # Run nextflow tests
-        export NXF_DEFAULT_DSL=1
+        export NXF_DEFAULT_DSL=2
         tests/nextflow-tests/run_tests.sh
diff --git a/README.md b/README.md
index 814477d..3d210cc 100644
--- a/README.md
+++ b/README.md
@@ -21,8 +21,8 @@ Example usage:
 # Run everything
 add_target_assembly.py --taxonomy 9031 --target_assembly GCA_016699485.1 --release_version 5
 
-# Run remapping and clustering only, resume and run on a specific instance
-add_target_assembly.py --taxonomy 9031 --target_assembly GCA_016699485.1 --release_version 5 --tasks remap_cluster --instance 3 --resume
+# Run remapping and clustering only, resume
+add_target_assembly.py --taxonomy 9031 --target_assembly GCA_016699485.1 --release_version 5 --tasks remap_cluster --resume
 ```
 
 ### Custom assembly generation
diff --git a/bin/add_target_assembly.py b/bin/add_target_assembly.py
index a4ce8b1..b297d5a 100644
--- a/bin/add_target_assembly.py
+++ b/bin/add_target_assembly.py
@@ -32,8 +32,6 @@ def main():
                           help='Task or set of tasks to perform (defaults to all)')
     argparse.add_argument('--release_version', required=True, type=int,
                           help='Release version this assembly will be processed for')
-    argparse.add_argument('--instance', help="Accessioning instance id for clustering", required=False, default=6,
-                          type=int, choices=range(1, 13))
     argparse.add_argument('--resume', help='If a process has been run already this will resume it.',
                           action='store_true', default=False)
     args = argparse.parse_args()
@@ -45,7 +43,6 @@ def main():
     job.run_all(
         tasks=args.tasks,
-        instance=args.instance,
         source_of_assembly=args.source_of_assembly,
         resume=args.resume
     )
diff --git a/eva_assembly_ingestion/assembly_ingestion_job.py b/eva_assembly_ingestion/assembly_ingestion_job.py
index 006677d..0af5c06 100644
--- a/eva_assembly_ingestion/assembly_ingestion_job.py
+++ b/eva_assembly_ingestion/assembly_ingestion_job.py
@@ -31,6 +31,7 @@
 from ebi_eva_common_pyutils.taxonomy.taxonomy import get_scientific_name_from_taxonomy
 from psycopg2.extras import execute_values
 
+from eva_assembly_ingestion.config import get_nextflow_config_flag
 from eva_assembly_ingestion.parse_counts import count_variants_extracted, count_variants_remapped, \
     count_variants_ingested
 
@@ -67,11 +68,11 @@ def taxonomies(self):
         taxonomy_list = [self.source_taxonomy]
         return taxonomy_list
 
-    def run_all(self, tasks, instance, source_of_assembly, resume):
+    def run_all(self, tasks, source_of_assembly, resume):
         if 'load_tracker' in tasks:
             self.load_tracker()
         if 'remap_cluster' in tasks:
-            self.run_remapping_and_clustering(instance, resume)
+            self.run_remapping_and_clustering(resume)
         if 'update_dbs' in tasks:
             self.update_dbs(source_of_assembly)
 
@@ -151,14 +152,14 @@ def get_source_assemblies_and_num_studies_dbsnp(self):
         )
         return get_all_results_for_query(pg_conn, query)
 
-    def run_remapping_and_clustering(self, instance, resume):
+    def run_remapping_and_clustering(self, resume):
         """Run remapping and clustering for all source assemblies in the tracker marked as not Complete, resuming
         the nextflow process if specified. (Note that this will also resume or rerun anything marked as Failed.)"""
         source_assemblies_and_taxonomies = self.get_incomplete_assemblies_and_taxonomies()
         for source_assembly, taxonomy_list in source_assemblies_and_taxonomies:
             self.info(f'Running remapping and clustering for the following assemblies: {source_assembly} '
                       f'for taxonomy {", ".join([str(t) for t in taxonomy_list])}')
-            self.process_one_assembly(source_assembly, taxonomy_list, instance, resume)
+            self.process_one_assembly(source_assembly, taxonomy_list, resume)
 
     def get_incomplete_assemblies_and_taxonomies(self):
         incomplete_assemblies = []
@@ -170,7 +171,7 @@ def get_incomplete_assemblies_and_taxonomies(self):
             incomplete_assemblies.append((source_assembly, taxonomies))
         return incomplete_assemblies
 
-    def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume):
+    def process_one_assembly(self, source_assembly, taxonomy_list, resume):
         self.set_status_start(source_assembly, taxonomy_list)
         base_directory = cfg['remapping']['base_directory']
         nextflow_pipeline = os.path.join(os.path.dirname(__file__), 'nextflow', 'remap_cluster.nf')
@@ -188,7 +189,6 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume
         )
         clustering_template_file = self.create_clustering_properties(
             output_file_path=os.path.join(assembly_directory, 'clustering_template.properties'),
-            instance=instance,
             source_assembly=source_assembly
         )
 
@@ -207,7 +207,6 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume
             'extraction_properties': extraction_properties_file,
             'ingestion_properties': ingestion_properties_file,
             'clustering_properties': clustering_template_file,
-            'clustering_instance': instance,
             'remapping_config': cfg.config_file,
             'remapping_required': remapping_required
         }
@@ -221,7 +220,8 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume
             '-log', remapping_log,
             'run', nextflow_pipeline,
             '-params-file', remap_cluster_config_file,
-            '-work-dir', work_dir
+            '-work-dir', work_dir,
+            get_nextflow_config_flag()
         ]
         if resume:
             command.append('-resume')
@@ -261,9 +261,8 @@ def create_ingestion_properties(self, output_file_path, source_assembly):
             open_file.write(properties)
         return output_file_path
 
-    def create_clustering_properties(self, output_file_path, instance, source_assembly):
+    def create_clustering_properties(self, output_file_path, source_assembly):
         properties = self.properties_generator.get_clustering_properties(
-            instance=instance,
             source_assembly=source_assembly,
             target_assembly=self.target_assembly,
             rs_report_path=f'{source_assembly}_to_{self.target_assembly}_rs_report.txt'
diff --git a/eva_assembly_ingestion/config.py b/eva_assembly_ingestion/config.py
index 4a87ff1..4e75132 100755
--- a/eva_assembly_ingestion/config.py
+++ b/eva_assembly_ingestion/config.py
@@ -14,3 +14,15 @@ def load_config(*args):
         os.getenv('ASSEMBLYCONFIG'),
         os.path.expanduser('~/.assembly_config.yml'),
     )
+
+
+def get_nextflow_config_flag():
+    """
+    Return the command-line flag for Nextflow to use the config provided in the environment variable ASSEMBLY_NEXTFLOW_CONFIG.
+    If not provided, return an empty string, which allows Nextflow to use the default precedence as described here:
+    https://www.nextflow.io/docs/latest/config.html
+    """
+    env_val = os.getenv('ASSEMBLY_NEXTFLOW_CONFIG')
+    if env_val:
+        return f'-c {env_val}'
+    return ''
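The new helper composes into the `nextflow` command list built in `process_one_assembly`. A minimal sketch of its behaviour, using only names introduced in this patch (the config path itself is hypothetical):

```python
import os

from eva_assembly_ingestion.config import get_nextflow_config_flag

# Hypothetical path, for illustration only
os.environ['ASSEMBLY_NEXTFLOW_CONFIG'] = '/nfs/configs/assembly_nextflow.config'
assert get_nextflow_config_flag() == '-c /nfs/configs/assembly_nextflow.config'

# With the variable unset, the flag is empty and Nextflow falls back to its
# default config-resolution precedence
del os.environ['ASSEMBLY_NEXTFLOW_CONFIG']
assert get_nextflow_config_flag() == ''
```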
diff --git a/eva_assembly_ingestion/nextflow/remap_cluster.nf b/eva_assembly_ingestion/nextflow/remap_cluster.nf
index aaaacd6..228aae9 100644
--- a/eva_assembly_ingestion/nextflow/remap_cluster.nf
+++ b/eva_assembly_ingestion/nextflow/remap_cluster.nf
@@ -16,18 +16,15 @@ def helpMessage() {
         --extraction_properties     path to extraction properties file
         --ingestion_properties      path to ingestion properties file
         --clustering_properties     path to clustering properties file
-        --clustering_instance       instance id to use for clustering
         --output_dir                path to the directory where the output file should be copied.
         --remapping_config          path to the remapping configuration file
         --remapping_required        flag that sets the remapping as required if true otherwise the remapping is skipped and only the clustering can be run
-        --memory                    memory in GB to use for memory-hungry processes (e.g. Java), default 8GB
     """
 }
 
 params.source_assembly_accession = null
 params.target_assembly_accession = null
 params.species_name = null
-params.memory = 8
 
 // help
 params.help = null
@@ -52,6 +49,8 @@
 empty_ch = params.remapping_required ? Channel.empty() : Channel.of(params.genom
 
 process retrieve_source_genome {
+    label 'short_time', 'med_mem'
+
     when:
     source_assembly_accession != params.target_assembly_accession
@@ -72,6 +71,7 @@
 
 process retrieve_target_genome {
+    label 'short_time', 'med_mem'
 
     input:
     val target_assembly_accession
@@ -89,6 +89,7 @@ }
 
 process update_source_genome {
+    label 'short_time', 'med_mem'
 
     input:
     val(source_assembly_accession)
@@ -106,6 +107,7 @@ }
 
 process update_target_genome {
+    label 'short_time', 'med_mem'
 
     input:
     path target_fasta
@@ -126,8 +128,7 @@
 /*
  * Extract the submitted variants to remap from the accessioning warehouse and store them in a VCF file.
  */
 process extract_vcf_from_mongo {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     input:
     path source_fasta
@@ -142,7 +143,7 @@
     publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"
 
     """
-    java -Xmx8G -jar $params.jar.vcf_extractor \
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.vcf_extractor \
         --spring.config.location=file:${params.extraction_properties} \
         --parameters.fasta=${source_fasta} \
         --parameters.assemblyReportUrl=file:${source_report} \
@@ -156,7 +157,7 @@
 /*
  * Variant remapping pipeline
  */
 process remap_variants {
-    memory "${params.memory}GB"
+    label 'long_time', 'med_mem'
 
     input:
     each path(source_vcf)
@@ -195,8 +196,7 @@
 /*
  * Ingest the remapped submitted variants from a VCF file into the accessioning warehouse.
  */
 process ingest_vcf_into_mongo {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     input:
     each path(remapped_vcf)
@@ -217,7 +217,7 @@
         loadTo=DBSNP
     fi
 
-    java -Xmx8G -jar $params.jar.vcf_ingestion \
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.vcf_ingestion \
         --spring.config.location=file:${params.ingestion_properties} \
         --parameters.vcf=${remapped_vcf} \
         --parameters.assemblyReportUrl=file:${target_report} \
@@ -227,8 +227,7 @@
 }
 
 process process_remapped_variants {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     input:
     path ingestion_log
@@ -236,13 +235,12 @@
     output:
     path "${source_to_target}_process_remapped.log", emit: process_remapped_log_filename
-    // TODO this also generates a rs report, for "newly remapped" rs - should we QC this separately?
     path "${source_to_target}_rs_report.txt", optional: true, emit: rs_report_filename
 
     publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"
 
     """
-    java -Xmx8G -jar $params.jar.clustering \
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
         --spring.config.location=file:${params.clustering_properties} \
         --spring.batch.job.names=PROCESS_REMAPPED_VARIANTS_WITH_RS_JOB \
         > ${source_to_target}_process_remapped.log
@@ -250,8 +248,7 @@
 }
 
 process cluster_unclustered_variants {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession/instance-${params.clustering_instance}"
+    label 'long_time', 'med_mem'
 
     input:
     path process_remapped_log
@@ -264,7 +261,7 @@
     publishDir "$params.output_dir/logs", overwrite: true, mode: "copy"
 
     """
-    java -Xmx8G -jar $params.jar.clustering \
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
         --spring.config.location=file:${params.clustering_properties} \
         --spring.batch.job.names=CLUSTER_UNCLUSTERED_VARIANTS_JOB \
         > ${source_to_target}_clustering.log
@@ -275,8 +272,7 @@
 /*
  * Run clustering QC job
  */
 process qc_clustering {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     input:
     path rs_report
@@ -288,7 +284,7 @@
     publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"
 
     """
-    java -Xmx8G -jar $params.jar.clustering \
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
         --spring.config.location=file:${params.clustering_properties} \
         --spring.batch.job.names=NEW_CLUSTERED_VARIANTS_QC_JOB \
         > ${source_to_target}_clustering_qc.log
@@ -300,8 +296,7 @@
 /*
  * Run Back propagation of new clustered RS
 */
 process backpropagate_clusters {
-    memory "${params.memory}GB"
-    clusterOptions "-g /accession"
+    label 'long_time', 'med_mem'
 
     input:
     path "clustering_qc.log"
@@ -312,7 +307,7 @@
     publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"
 
     """
-    java -Xmx8G -jar $params.jar.clustering \
+    java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
         --spring.config.location=file:${params.clustering_properties} \
         --parameters.remappedFrom=${params.source_assembly_accession} \
         --spring.batch.job.names=BACK_PROPAGATE_SPLIT_OR_MERGED_RS_JOB \
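The processes above now declare `short_time`/`long_time` and `med_mem` labels instead of hard-coded `memory` and `clusterOptions` directives, so resource settings can live in whatever config `ASSEMBLY_NEXTFLOW_CONFIG` points to. A minimal sketch of what such a config might look like, selecting resources by label via Nextflow's `withLabel:` selectors — the executor choice and concrete values here are placeholders, not part of this patch:

```groovy
// Hypothetical production config selecting resources by the labels introduced above
process {
    executor = 'lsf'
    clusterOptions = '-g /accession'   // previously hard-coded per process

    withLabel: short_time { time = '1h' }
    withLabel: long_time  { time = '7d' }
    withLabel: med_mem    { memory = '8 GB' }
}
```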
diff --git a/tests/nextflow-tests/nextflow.config b/tests/nextflow-tests/nextflow.config
new file mode 100644
index 0000000..aba8fba
--- /dev/null
+++ b/tests/nextflow-tests/nextflow.config
@@ -0,0 +1,6 @@
+process {
+    executor = 'local'
+
+    time = '30m'
+    memory = '5 GB'
+}
\ No newline at end of file
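With this test config, each Java step sizes its heap from the task's memory allocation instead of a fixed `-Xmx8G`, leaving roughly 1 GB of headroom for non-heap usage. For instance, under the `memory = '5 GB'` setting above:

```groovy
// Evaluation of the interpolated heap size in the script blocks above,
// assuming memory = '5 GB' from this test config
task.memory.toGiga()       // 5
task.memory.toGiga() - 1   // 4  ->  java -Xmx4G ...
```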
diff --git a/tests/nextflow-tests/run_tests.sh b/tests/nextflow-tests/run_tests.sh
index 8baedcb..84d101c 100755
--- a/tests/nextflow-tests/run_tests.sh
+++ b/tests/nextflow-tests/run_tests.sh
@@ -20,7 +20,6 @@ nextflow run ${SOURCE_DIR}/eva_assembly_ingestion/nextflow/remap_cluster.nf -par
     --extraction_properties ${SCRIPT_DIR}/template.properties \
     --ingestion_properties ${SCRIPT_DIR}/template.properties \
     --clustering_properties ${SCRIPT_DIR}/template.properties \
-    --clustering_instance 1 \
    --output_dir ${SCRIPT_DIR}/output \
    --remapping_config ${SCRIPT_DIR}/test_config.yaml \
    --remapping_required 1 \
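To reproduce the CI run locally, the same two lines the GitHub Actions workflow now executes should suffice (the workflow switches the default Nextflow DSL from 1 to 2):

```bash
# Same environment the CI job sets before invoking the tests
export NXF_DEFAULT_DSL=2
tests/nextflow-tests/run_tests.sh
```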