From c86ccf0d6349fdf0badd54fa50b01104312b2a86 Mon Sep 17 00:00:00 2001
From: tcezard
Date: Mon, 18 Dec 2023 23:13:20 +0000
Subject: [PATCH 1/8] First pass at adding multi-taxonomy support

---
 bin/add_target_assembly.py                    |   3 -
 .../assembly_ingestion_job.py                 | 116 ++++++++++++------
 2 files changed, 81 insertions(+), 38 deletions(-)

diff --git a/bin/add_target_assembly.py b/bin/add_target_assembly.py
index bf1cd52..a4ce8b1 100644
--- a/bin/add_target_assembly.py
+++ b/bin/add_target_assembly.py
@@ -40,9 +40,6 @@ def main():
     load_config()
 
-    if not args.taxonomy or not args.target_assembly or not args.release_version:
-        raise ArgumentError(None, 'Must provide --taxonomy, --target_assembly, and --release_version')
-
     job = AssemblyIngestionJob(args.taxonomy, args.target_assembly, args.release_version)
     logging_config.add_stdout_handler()
 
diff --git a/eva_assembly_ingestion/assembly_ingestion_job.py b/eva_assembly_ingestion/assembly_ingestion_job.py
index 4307be2..69133b7 100644
--- a/eva_assembly_ingestion/assembly_ingestion_job.py
+++ b/eva_assembly_ingestion/assembly_ingestion_job.py
@@ -14,6 +14,7 @@
 import datetime
 import os
 import subprocess
+from functools import lru_cache
 
 import yaml
 from cached_property import cached_property
@@ -33,22 +34,38 @@ from eva_assembly_ingestion.parse_counts import count_variants_extracted, count_variants_remapped, \
     count_variants_ingested
 
+SUPPORTED_ASSEMBLY_TRACKER_TABLE = "evapro.supported_assembly_tracker"
+
 
 class AssemblyIngestionJob(AppLogger):
     all_tasks = ['load_tracker', 'remap_cluster', 'update_dbs']
     tracking_table = 'eva_progress_tracker.remapping_tracker'
 
     def __init__(self, taxonomy, target_assembly, release_version):
-        self.taxonomy = taxonomy
         self.target_assembly = target_assembly
         self.release_version = release_version
         self.private_settings_file = cfg['maven']['settings_file']
         self.maven_profile = cfg['maven']['environment']
         self.properties_generator = SpringPropertiesGenerator(self.maven_profile, self.private_settings_file)
+        self.source_taxonomy = taxonomy
+
+    @lru_cache
+    def scientific_name(self, taxonomy):
+        return get_scientific_name_from_taxonomy(taxonomy)
 
     @cached_property
-    def scientific_name(self):
-        return get_scientific_name_from_taxonomy(self.taxonomy)
+    def taxonomies(self):
+        # The taxonomy involved are all the taxonomies that have the same current target assembly
+        taxonomy_query = (
+            f"select taxonomy from SUPPORTED_ASSEMBLY_TRACKER_TABLE where current=true AND assembly_id in ("
+            f"  SELECT assembly_id FROM {SUPPORTED_ASSEMBLY_TRACKER_TABLE} "
+            f"  WHERE taxonomy_id={self.source_taxonomy} AND current=true"
+            f");"
+        )
+        with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
+            taxonomies = (t for t, in get_all_results_for_query(pg_conn, taxonomy_query))
+        if not taxonomies:
+            taxonomies = (self.source_taxonomy,)
+        return taxonomies
 
     def run_all(self, tasks, instance, source_of_assembly, resume):
         if 'load_tracker' in tasks:
@@ -59,13 +76,13 @@ def run_all(self, tasks, instance, source_of_assembly, resume):
             self.update_dbs(source_of_assembly)
 
     def load_tracker(self):
-        """Load the tracking table with the source assemblies for this taxonomy. Will not load anything if jobs in
+        """Load the tracking table with the source assemblies for these taxonomies. 
Will not load anything if jobs in the tracker already exist for this taxonomy/target assembly pair.""" header_to_print = ( - 'Sources', 'Taxonomy', 'Scientific Name', 'Assembly', 'Target Assembly', 'Num Studies', 'Status') + 'Sources', 'Taxonomies', 'Scientific Name', 'Assembly', 'Target Assembly', 'Num Studies', 'Status') existing_jobs = self.get_job_information_from_tracker() if existing_jobs: - self.warning(f'Jobs already exist for taxonomy {self.taxonomy} and target assembly {self.target_assembly}, ' + self.warning(f'Jobs already exist for taxonomies {self.taxonomies} and target assembly {self.target_assembly}, ' f'not loading anything new.') pretty_print(header_to_print, existing_jobs) return @@ -75,18 +92,18 @@ def load_tracker(self): 'remapping_status') rows = [] rows_to_print = [] - for source_assembly, projects in self.get_source_assemblies_and_projects(): - rows.append(('EVA', self.taxonomy, self.scientific_name, source_assembly, self.target_assembly, + for source_assembly, taxonomy, projects in self.get_source_assemblies_and_projects(): + rows.append(('EVA', taxonomy, self.scientific_name, source_assembly, self.target_assembly, 1, self.release_version, len(projects), 1, None, 'Pending')) - rows_to_print.append(('EVA', self.taxonomy, self.scientific_name, source_assembly, self.target_assembly, + rows_to_print.append(('EVA', taxonomy, self.scientific_name, source_assembly, self.target_assembly, len(projects), 'Pending')) - for source_assembly, num_studies in self.get_source_assemblies_and_num_studies_dbsnp(): - rows.append(('DBSNP', self.taxonomy, self.scientific_name, source_assembly, self.target_assembly, + for source_assembly, taxonomy, num_studies in self.get_source_assemblies_and_num_studies_dbsnp(): + rows.append(('DBSNP', taxonomy, self.scientific_name, source_assembly, self.target_assembly, 1, self.release_version, num_studies, 1, None, 'Pending')) - rows_to_print.append(('DBSNP', self.taxonomy, self.scientific_name, source_assembly, self.target_assembly, + rows_to_print.append(('DBSNP', taxonomy, self.scientific_name, source_assembly, self.target_assembly, num_studies, 'Pending')) if len(rows) == 0: - self.warning(f'Nothing to process for taxonomy {self.taxonomy} and target assembly {self.target_assembly}') + self.warning(f'Nothing to process for taxonomy {self.taxonomies} and target assembly {self.target_assembly}') return with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn: with pg_conn.cursor() as cursor: @@ -95,29 +112,32 @@ def load_tracker(self): pretty_print(header_to_print, rows_to_print) def get_job_information_from_tracker(self): - """Gets jobs from tracker by target assembly, taxonomy, and release version""" + """Gets jobs from tracker by target assembly, taxonomies, and release version""" query = ( - f"SELECT source, taxonomy, scientific_name, origin_assembly_accession, assembly_accession, " - f"num_studies, remapping_status " + f"SELECT source, string_agg(taxonomy::text, ','), scientific_name, origin_assembly_accession, " + f"assembly_accession, num_studies, remapping_status " f"FROM {self.tracking_table} " f"WHERE release_version={self.release_version} " - f"AND assembly_accession='{self.target_assembly}' AND taxonomy={self.taxonomy}" + f"AND assembly_accession='{self.target_assembly}' " + f"AND taxonomy in ({', '.join([str(t) for t in self.taxonomies])})" + f"GROUP BY taxonomy" ) with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn: return get_all_results_for_query(pg_conn, 
query)

     def get_source_assemblies_and_projects(self):
-        """Query metadata for all public projects with this taxonomy, of these getting all reference accessions
+        """Query metadata for all public projects with these taxonomies, then collect their reference accessions
         for all analyses."""
         with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
             query = (
-                f"SELECT DISTINCT vcf_reference_accession, ARRAY_AGG(project_accession) "
+                f"SELECT DISTINCT vcf_reference_accession, taxonomy_id, ARRAY_AGG(project_accession) "
                 f"FROM evapro.project "
                 f"LEFT OUTER JOIN evapro.project_taxonomy USING (project_accession) "
                 f"LEFT OUTER JOIN evapro.project_analysis USING (project_accession) "
                 f"LEFT OUTER JOIN evapro.analysis USING (analysis_accession) "
-                f"WHERE taxonomy_id={self.taxonomy} AND ena_status=4 AND hidden_in_eva=0 "
-                f"GROUP BY vcf_reference_accession"
+                f"WHERE taxonomy_id in ({', '.join([str(t) for t in self.taxonomies])}) "
+                f"AND ena_status=4 AND hidden_in_eva=0 "
+                f"GROUP BY vcf_reference_accession, taxonomy_id"
             )
             return get_all_results_for_query(pg_conn, query)
 
     def get_source_assemblies_and_num_studies_dbsnp(self):
         # Source assemblies for dbSNP are not in metadata, so instead we get them from release 3 in the tracker
         with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
             query = (
-                f"SELECT origin_assembly_accession, num_studies FROM {self.tracking_table} "
-                f"WHERE release_version=3 AND taxonomy={self.taxonomy} AND source='DBSNP'"
+                f"SELECT origin_assembly_accession, taxonomy, num_studies FROM {self.tracking_table} "
+                f"WHERE release_version=3 AND taxonomy in ({', '.join([str(t) for t in self.taxonomies])}) "
+                f"AND source='DBSNP'"
             )
             return get_all_results_for_query(pg_conn, query)
 
     def run_remapping_and_clustering(self, instance, resume):
         """Run remapping and clustering for all source assemblies in the tracker marked as not Complete, resuming
         the nextflow process if specified. 
(Note that this will also resume or rerun anything marked as Failed.)""" - source_assemblies = self.get_incomplete_assemblies() + source_assemblies_and_taxonomies = self.get_incomplete_assemblies_and_taxonomies() self.info(f'Running remapping and clustering for the following assemblies: {source_assemblies}') - for source_assembly in source_assemblies: - self.process_one_assembly(source_assembly, instance, resume) + for source_assembly, taxonomies in source_assemblies_and_taxonomies: + self.process_one_assembly(source_assembly, taxonomies, instance, resume) - def get_incomplete_assemblies(self): + def get_incomplete_assemblies_and_taxonomies(self): incomplete_assemblies = [] for row in self.get_job_information_from_tracker(): + taxonomies = row[1] source_assembly = row[3] status = row[6] if status != 'Completed': - incomplete_assemblies.append(source_assembly) + incomplete_assemblies.append(source_assembly, taxonomies) return incomplete_assemblies - def process_one_assembly(self, source_assembly, instance, resume): + def process_one_assembly(self, source_assembly, taxonomies, instance, resume): self.set_status_start(source_assembly) base_directory = cfg['remapping']['base_directory'] nextflow_pipeline = os.path.join(os.path.dirname(__file__), 'nextflow', 'remap_cluster.nf') - assembly_directory = os.path.join(base_directory, str(self.taxonomy), source_assembly) + assembly_directory = os.path.join(base_directory, str(taxonomies), source_assembly) work_dir = os.path.join(assembly_directory, 'work') os.makedirs(work_dir, exist_ok=True) @@ -173,7 +195,7 @@ def process_one_assembly(self, source_assembly, instance, resume): remap_cluster_config_file = os.path.join(assembly_directory, 'remap_cluster_config.yaml') remapping_required = self.check_remapping_required(source_assembly) remap_cluster_config = { - 'taxonomy_id': self.taxonomy, + 'taxonomies': taxonomies, 'source_assembly_accession': source_assembly, 'target_assembly_accession': self.target_assembly, 'species_name': self.scientific_name, @@ -221,7 +243,6 @@ def check_remapping_required(self, source_assembly): def create_extraction_properties(self, output_file_path, source_assembly): properties = self.properties_generator.get_remapping_extraction_properties( - taxonomy=self.taxonomy, source_assembly=source_assembly, output_folder='.', ) @@ -337,9 +358,9 @@ def count_variants_from_logs(self, assembly_directory, source_assembly): def update_dbs(self, source_of_assembly): """Update all relevant databases to reflect the new assembly.""" - incomplete_assemblies = self.get_incomplete_assemblies() - if incomplete_assemblies: - self.warning(f'Processing for the following source assemblies is not yet complete: {incomplete_assemblies}') + incomplete_assemblies_taxonomies = self.get_incomplete_assemblies_and_taxonomies() + if incomplete_assemblies_taxonomies: + self.warning(f'Processing for the following source assemblies is not yet complete: {incomplete_assemblies_taxonomies}') self.warning('Not updating databases.') return with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn: @@ -358,3 +379,28 @@ def add_to_contig_alias(self): self.maven_profile, self.private_settings_file) client = ContigAliasClient(contig_alias_url, contig_alias_user, contig_alias_pass) client.insert_assembly(self.target_assembly) + + + +def get_taxonomy_involved(taxonomy_id: int): + # First check if the current assembly is already target - if so don't do anything + + if len(results) > 0 and results[0][0] == target_assembly: + 
logger.warning(f'Current assembly for taxonomy {taxonomy_id} is already {target_assembly}!') + return + + # Deprecate the last current assembly + update_query = ( + f"UPDATE {SUPPORTED_ASSEMBLY_TRACKER_TABLE} " + f"SET current=false, end_date='{today}' " + f"WHERE taxonomy_id={taxonomy_id} AND current=true;" + ) + execute_query(metadata_connection_handle, update_query) + + # Then insert the new assembly + insert_query = ( + f"INSERT INTO {SUPPORTED_ASSEMBLY_TRACKER_TABLE} " + f"(taxonomy_id, source, assembly_id, current, start_date) " + f"VALUES({taxonomy_id}, '{source_of_assembly}', '{target_assembly}', true, '{today}');" + ) + execute_query(metadata_connection_handle, insert_query) \ No newline at end of file From 7baff5e8db1b6df6d8697ce36cd0d456bbe88f7e Mon Sep 17 00:00:00 2001 From: tcezard Date: Fri, 5 Jan 2024 17:33:34 +0000 Subject: [PATCH 2/8] Nextflow pipeline remap for multiple taxonomies --- .../assembly_ingestion_job.py | 23 +++++++++++-------- .../nextflow/remap_cluster.nf | 14 +++++++---- .../java/FakeExtractionPipeline.java | 15 ++++++++---- tests/nextflow-tests/run_tests.sh | 1 - tests/nextflow-tests/test_config.yaml | 1 + 5 files changed, 35 insertions(+), 19 deletions(-) diff --git a/eva_assembly_ingestion/assembly_ingestion_job.py b/eva_assembly_ingestion/assembly_ingestion_job.py index 69133b7..6c681dc 100644 --- a/eva_assembly_ingestion/assembly_ingestion_job.py +++ b/eva_assembly_ingestion/assembly_ingestion_job.py @@ -155,25 +155,26 @@ def run_remapping_and_clustering(self, instance, resume): """Run remapping and clustering for all source assemblies in the tracker marked as not Complete, resuming the nextflow process if specified. (Note that this will also resume or rerun anything marked as Failed.)""" source_assemblies_and_taxonomies = self.get_incomplete_assemblies_and_taxonomies() - self.info(f'Running remapping and clustering for the following assemblies: {source_assemblies}') - for source_assembly, taxonomies in source_assemblies_and_taxonomies: - self.process_one_assembly(source_assembly, taxonomies, instance, resume) + for source_assembly, taxonomy_list in source_assemblies_and_taxonomies: + self.info(f'Running remapping and clustering for the following assemblies: {source_assembly} ' + f'for taxonomy {", ".join([str(t) for t in taxonomy_list])}') + self.process_one_assembly(source_assembly, taxonomy_list, instance, resume) def get_incomplete_assemblies_and_taxonomies(self): incomplete_assemblies = [] for row in self.get_job_information_from_tracker(): - taxonomies = row[1] + taxonomies = row[1].split(',') # Comma separated list of taxonomies source_assembly = row[3] status = row[6] if status != 'Completed': incomplete_assemblies.append(source_assembly, taxonomies) return incomplete_assemblies - def process_one_assembly(self, source_assembly, taxonomies, instance, resume): + def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume): self.set_status_start(source_assembly) base_directory = cfg['remapping']['base_directory'] nextflow_pipeline = os.path.join(os.path.dirname(__file__), 'nextflow', 'remap_cluster.nf') - assembly_directory = os.path.join(base_directory, str(taxonomies), source_assembly) + assembly_directory = os.path.join(base_directory, ",".join([str(t) for t in taxonomy_list]), source_assembly) work_dir = os.path.join(assembly_directory, 'work') os.makedirs(work_dir, exist_ok=True) @@ -195,7 +196,7 @@ def process_one_assembly(self, source_assembly, taxonomies, instance, resume): remap_cluster_config_file = 
os.path.join(assembly_directory, 'remap_cluster_config.yaml')
         remapping_required = self.check_remapping_required(source_assembly)
         remap_cluster_config = {
-            'taxonomies': taxonomies,
+            'taxonomy_list': taxonomy_list,
             'source_assembly_accession': source_assembly,
             'target_assembly_accession': self.target_assembly,
             'species_name': self.scientific_name,
@@ -364,15 +365,17 @@ def update_dbs(self, source_of_assembly):
             self.warning('Not updating databases.')
             return
         with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
-            add_to_supported_assemblies(metadata_connection_handle=pg_conn, source_of_assembly=source_of_assembly,
-                                        target_assembly=self.target_assembly, taxonomy_id=self.taxonomy)
+            for taxonomy in self.taxonomies:
+                add_to_supported_assemblies(metadata_connection_handle=pg_conn, source_of_assembly=source_of_assembly,
+                                            target_assembly=self.target_assembly, taxonomy_id=taxonomy)
             self.add_to_metadata()
             self.add_to_contig_alias()
             self.info('Metadata database updates complete.')
 
     def add_to_metadata(self):
         with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
-            insert_new_assembly_and_taxonomy(pg_conn, self.target_assembly, self.taxonomy)
+            for taxonomy in self.taxonomies:
+                insert_new_assembly_and_taxonomy(pg_conn, self.target_assembly, taxonomy)
 
     def add_to_contig_alias(self):
         contig_alias_url, contig_alias_user, contig_alias_pass = get_contig_alias_db_creds_for_profile(
diff --git a/eva_assembly_ingestion/nextflow/remap_cluster.nf b/eva_assembly_ingestion/nextflow/remap_cluster.nf
index 3f7a8ed..b73f242 100644
--- a/eva_assembly_ingestion/nextflow/remap_cluster.nf
+++ b/eva_assembly_ingestion/nextflow/remap_cluster.nf
@@ -8,7 +8,7 @@ def helpMessage() {
         Remap one assembly version to another, cluster, and QC.
         Inputs:
-                --taxonomy_id                   taxonomy id of submitted variants that needs to be remapped.
+                --taxonomy_list                 list of taxonomy ids of the submitted variants that need to be remapped.
                 --source_assembly_accession     assembly accession of the submitted variants are currently mapped to.
                 --target_assembly_accession     assembly accession the submitted variants will be remapped to.
                 --species_name                  scientific name to be used for the species. 
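The update_dbs hunk above is the core of the multi-taxonomy change on the metadata side: instead of registering the new target assembly once, the job now registers it once per taxonomy in the group, and likewise inserts one assembly/taxonomy link per taxonomy. A minimal sketch of the resulting behaviour, assuming made-up taxonomy ids (1233/1234, as in the Nextflow tests later in the series) and a stand-in for the real add_to_supported_assemblies helper:

    # Hedged sketch, not part of the patch itself.
    def register_supported_assembly(conn, source_of_assembly, target_assembly, taxonomy_id):
        # Stand-in for add_to_supported_assemblies: per the leftover snippet at the
        # end of patch 1, the real helper skips taxonomies whose current assembly is
        # already the target, otherwise closes the previous row and inserts a new one.
        print(f'taxonomy {taxonomy_id}: {target_assembly} ({source_of_assembly}) is now current')

    taxonomies = [1233, 1234]  # hypothetical group sharing one current assembly
    conn = None                # stand-in for get_metadata_connection_handle(...)
    for taxonomy in taxonomies:
        register_supported_assembly(conn, 'Ensembl', 'GCA_0000002', taxonomy)

Since the helper is expected to be a no-op for taxonomies already pointing at the target, rerunning update_dbs for the same group should stay safe.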
@@ -35,8 +35,8 @@ params.help = null
 if (params.help) exit 0, helpMessage()
 
 // Test input files
-if (!params.taxonomy_id || !params.source_assembly_accession || !params.target_assembly_accession || !params.species_name || !params.genome_assembly_dir ) {
-    if (!params.taxonomy_id) log.warn('Provide the taxonomy id of the source submitted variants using --taxonomy_id')
+if (!params.taxonomy_list || !params.source_assembly_accession || !params.target_assembly_accession || !params.species_name || !params.genome_assembly_dir ) {
+    if (!params.taxonomy_list) log.warn('Provide the taxonomy ids of the source submitted variants using --taxonomy_list')
     if (!params.source_assembly_accession) log.warn('Provide the source assembly using --source_assembly_accession')
     if (!params.target_assembly_accession) log.warn('Provide the target assembly using --target_assembly_accession')
     if (!params.species_name) log.warn('Provide a species name using --species_name')
@@ -132,6 +132,7 @@ process extract_vcf_from_mongo {
     input:
         path source_fasta
         path source_report
+        each taxonomy
 
     output:
         // Store both vcfs (eva and dbsnp), emit: one channel
@@ -145,6 +146,7 @@ process extract_vcf_from_mongo {
         --spring.config.location=file:${params.extraction_properties} \
         --parameters.fasta=${source_fasta} \
         --parameters.assemblyReportUrl=file:${source_report} \
+        --parameters.taxonomy=${taxonomy} \
         > ${params.source_assembly_accession}_vcf_extractor.log
     """
 }
@@ -330,7 +332,11 @@ workflow {
         update_source_genome(params.source_assembly_accession, retrieve_source_genome.out.source_fasta,
                              retrieve_source_genome.out.source_report, params.remapping_config)
         update_target_genome(retrieve_target_genome.out.target_fasta, retrieve_target_genome.out.target_report, params.remapping_config)
-        extract_vcf_from_mongo(update_source_genome.out.updated_source_fasta, update_source_genome.out.updated_source_report)
+        extract_vcf_from_mongo(
+            update_source_genome.out.updated_source_fasta,
+            update_source_genome.out.updated_source_report,
+            params.taxonomy_list
+        )
         remap_variants(extract_vcf_from_mongo.out.source_vcfs.flatten(), update_source_genome.out.updated_source_fasta,
                        update_target_genome.out.updated_target_fasta)
         ingest_vcf_into_mongo(remap_variants.out.remapped_vcfs, update_target_genome.out.updated_target_report)
diff --git a/tests/nextflow-tests/java/FakeExtractionPipeline.java b/tests/nextflow-tests/java/FakeExtractionPipeline.java
index 79ff9be..eb69721 100644
--- a/tests/nextflow-tests/java/FakeExtractionPipeline.java
+++ b/tests/nextflow-tests/java/FakeExtractionPipeline.java
@@ -6,14 +6,21 @@ public class FakeExtractionPipeline {
     public static void main(String[] args) {
         String outString = "java -jar extraction.jar";
-        String inFile = null;
+        String accession = null;
+        String taxonomy = null;
         for (String arg: args) {
             outString += " " + arg;
-            if (arg.startsWith("--parameters.fasta="))
-                inFile = arg.substring("--parameters.fasta=".length(), arg.length()-"_custom.fa".length());
+            if (arg.startsWith("--parameters.fasta=")){
+                accession = arg.substring("--parameters.fasta=".length(), arg.length()-"_custom.fa".length());
+            }
+            if (arg.startsWith("--parameters.taxonomy=")){
+                taxonomy = arg.substring("--parameters.taxonomy=".length(), arg.length());
+            }
         }
         System.out.println(outString);
-        System.out.println(inFile);
+        System.out.println(accession);
+        System.out.println(taxonomy);
+
+        String inFile = accession + "_" + taxonomy; // real pipeline gets this from properties
 
         String outFile1 = inFile + "_dbsnp.vcf";
diff --git 
a/tests/nextflow-tests/run_tests.sh b/tests/nextflow-tests/run_tests.sh index 5660edf..e3870b4 100755 --- a/tests/nextflow-tests/run_tests.sh +++ b/tests/nextflow-tests/run_tests.sh @@ -13,7 +13,6 @@ PATH=${SCRIPT_DIR}/bin:$PATH printf "\e[32m===== REMAPPING AND CLUSTERING PIPELINE =====\e[0m\n" nextflow run ${SOURCE_DIR}/eva_assembly_ingestion/nextflow/remap_cluster.nf -params-file test_config.yaml \ - --taxonomy_id 1234 \ --source_assembly_accession GCA_0000001 \ --target_assembly_accession GCA_0000002 \ --species_name "Thingy thungus" \ diff --git a/tests/nextflow-tests/test_config.yaml b/tests/nextflow-tests/test_config.yaml index ae7bfba..660f574 100644 --- a/tests/nextflow-tests/test_config.yaml +++ b/tests/nextflow-tests/test_config.yaml @@ -1,3 +1,4 @@ +taxonomy_list: [1233, 1234] executable: genome_downloader: ../../../bin/fake_genome_downloader.py From 2aeb9dc6990f99246be0f9dd2ae8c2304aadb03f Mon Sep 17 00:00:00 2001 From: tcezard Date: Fri, 5 Jan 2024 17:37:51 +0000 Subject: [PATCH 3/8] Remove unused function --- .../assembly_ingestion_job.py | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/eva_assembly_ingestion/assembly_ingestion_job.py b/eva_assembly_ingestion/assembly_ingestion_job.py index 6c681dc..e7813ef 100644 --- a/eva_assembly_ingestion/assembly_ingestion_job.py +++ b/eva_assembly_ingestion/assembly_ingestion_job.py @@ -382,28 +382,3 @@ def add_to_contig_alias(self): self.maven_profile, self.private_settings_file) client = ContigAliasClient(contig_alias_url, contig_alias_user, contig_alias_pass) client.insert_assembly(self.target_assembly) - - - -def get_taxonomy_involved(taxonomy_id: int): - # First check if the current assembly is already target - if so don't do anything - - if len(results) > 0 and results[0][0] == target_assembly: - logger.warning(f'Current assembly for taxonomy {taxonomy_id} is already {target_assembly}!') - return - - # Deprecate the last current assembly - update_query = ( - f"UPDATE {SUPPORTED_ASSEMBLY_TRACKER_TABLE} " - f"SET current=false, end_date='{today}' " - f"WHERE taxonomy_id={taxonomy_id} AND current=true;" - ) - execute_query(metadata_connection_handle, update_query) - - # Then insert the new assembly - insert_query = ( - f"INSERT INTO {SUPPORTED_ASSEMBLY_TRACKER_TABLE} " - f"(taxonomy_id, source, assembly_id, current, start_date) " - f"VALUES({taxonomy_id}, '{source_of_assembly}', '{target_assembly}', true, '{today}');" - ) - execute_query(metadata_connection_handle, insert_query) \ No newline at end of file From c904ad557bff99c41bfa5877f5aa88cf8998e724 Mon Sep 17 00:00:00 2001 From: tcezard Date: Fri, 5 Jan 2024 17:58:35 +0000 Subject: [PATCH 4/8] Add taxonomy in the extraction log file name --- .../nextflow/remap_cluster.nf | 4 ++-- tests/nextflow-tests/run_tests.sh | 24 ++++++++++++------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/eva_assembly_ingestion/nextflow/remap_cluster.nf b/eva_assembly_ingestion/nextflow/remap_cluster.nf index b73f242..861126a 100644 --- a/eva_assembly_ingestion/nextflow/remap_cluster.nf +++ b/eva_assembly_ingestion/nextflow/remap_cluster.nf @@ -137,7 +137,7 @@ process extract_vcf_from_mongo { output: // Store both vcfs (eva and dbsnp), emit: one channel path '*.vcf', emit: source_vcfs - path "${params.source_assembly_accession}_vcf_extractor.log", emit: log_filename + path "${params.source_assembly_accession}_${taxonomy}_vcf_extractor.log", emit: log_filename publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*" @@ -147,7 +147,7 @@ 
process extract_vcf_from_mongo {
         --parameters.fasta=${source_fasta} \
         --parameters.assemblyReportUrl=file:${source_report} \
         --parameters.taxonomy=${taxonomy} \
-        > ${params.source_assembly_accession}_vcf_extractor.log
+        > ${params.source_assembly_accession}_${taxonomy}_vcf_extractor.log
     """
 }
 
diff --git a/tests/nextflow-tests/run_tests.sh b/tests/nextflow-tests/run_tests.sh
index e3870b4..8baedcb 100755
--- a/tests/nextflow-tests/run_tests.sh
+++ b/tests/nextflow-tests/run_tests.sh
@@ -26,15 +26,21 @@ nextflow run ${SOURCE_DIR}/eva_assembly_ingestion/nextflow/remap_cluster.nf -par
     --remapping_required 1 \
     --memory 2
 
-ls ${SCRIPT_DIR}/output/dbsnp/GCA_0000001_dbsnp_remapped.vcf \
-   ${SCRIPT_DIR}/output/dbsnp/GCA_0000001_dbsnp_remapped_unmapped.vcf \
-   ${SCRIPT_DIR}/output/dbsnp/GCA_0000001_dbsnp_remapped_counts.yml \
-   ${SCRIPT_DIR}/output/eva/GCA_0000001_eva_remapped.vcf \
-   ${SCRIPT_DIR}/output/eva/GCA_0000001_eva_remapped_unmapped.vcf \
-   ${SCRIPT_DIR}/output/eva/GCA_0000001_eva_remapped_counts.yml
-
-# Test we have 7 log files in the logs directory (1 extraction, 2 ingestion, 3 clustering, 1 backpropagate)
-[[ $(find ${SCRIPT_DIR}/output/logs/ -type f -name "*.log" | wc -l) -eq 7 ]]
+ls ${SCRIPT_DIR}/output/dbsnp/GCA_0000001_1233_dbsnp_remapped.vcf \
+   ${SCRIPT_DIR}/output/dbsnp/GCA_0000001_1233_dbsnp_remapped_unmapped.vcf \
+   ${SCRIPT_DIR}/output/dbsnp/GCA_0000001_1233_dbsnp_remapped_counts.yml \
+   ${SCRIPT_DIR}/output/eva/GCA_0000001_1233_eva_remapped.vcf \
+   ${SCRIPT_DIR}/output/eva/GCA_0000001_1233_eva_remapped_unmapped.vcf \
+   ${SCRIPT_DIR}/output/eva/GCA_0000001_1233_eva_remapped_counts.yml \
+   ${SCRIPT_DIR}/output/dbsnp/GCA_0000001_1234_dbsnp_remapped.vcf \
+   ${SCRIPT_DIR}/output/dbsnp/GCA_0000001_1234_dbsnp_remapped_unmapped.vcf \
+   ${SCRIPT_DIR}/output/dbsnp/GCA_0000001_1234_dbsnp_remapped_counts.yml \
+   ${SCRIPT_DIR}/output/eva/GCA_0000001_1234_eva_remapped.vcf \
+   ${SCRIPT_DIR}/output/eva/GCA_0000001_1234_eva_remapped_unmapped.vcf \
+   ${SCRIPT_DIR}/output/eva/GCA_0000001_1234_eva_remapped_counts.yml
+
+# Test we have 10 log files in the logs directory (2 extraction, 4 ingestion, 3 clustering, 1 backpropagate)
+[[ $(find ${SCRIPT_DIR}/output/logs/ -type f -name "*.log" | wc -l) -eq 10 ]]
 
 # Test we have 1 rs_report in the logs directory
 [[ $(find ${SCRIPT_DIR}/output/logs/ -type f -name "*.txt" | wc -l) -eq 1 ]]

From 405cdf62aee2498748ada7124c238fe96c98d45d Mon Sep 17 00:00:00 2001
From: tcezard
Date: Mon, 8 Jan 2024 15:48:02 +0000
Subject: [PATCH 5/8] Fixes from testing

---
 .../assembly_ingestion_job.py                 | 70 ++++++++++---------
 .../nextflow/remap_cluster.nf                 |  2 +-
 2 files changed, 37 insertions(+), 35 deletions(-)

diff --git a/eva_assembly_ingestion/assembly_ingestion_job.py b/eva_assembly_ingestion/assembly_ingestion_job.py
index e7813ef..77ec077 100644
--- a/eva_assembly_ingestion/assembly_ingestion_job.py
+++ b/eva_assembly_ingestion/assembly_ingestion_job.py
@@ -56,16 +56,16 @@ def scientific_name(self, taxonomy):
     def taxonomies(self):
         # The taxonomy involved are all the taxonomies that have the same current target assembly
         taxonomy_query = (
-            f"select taxonomy from SUPPORTED_ASSEMBLY_TRACKER_TABLE where current=true AND assembly_id in ("
+            f"select taxonomy_id from {SUPPORTED_ASSEMBLY_TRACKER_TABLE} where current=true AND assembly_id in ("
             f"  SELECT assembly_id FROM {SUPPORTED_ASSEMBLY_TRACKER_TABLE} "
             f"  WHERE taxonomy_id={self.source_taxonomy} AND current=true"
             f");"
         )
         with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn:
-            
taxonomies = (t for t, in get_all_results_for_query(pg_conn, taxonomy_query)) - if not taxonomies: - taxonomies = (self.source_taxonomy,) - return taxonomies + taxonomy_list = list(set([t for t, in get_all_results_for_query(pg_conn, taxonomy_query)])) + if not taxonomy_list: + taxonomy_list = [self.source_taxonomy] + return taxonomy_list def run_all(self, tasks, instance, source_of_assembly, resume): if 'load_tracker' in tasks: @@ -93,15 +93,15 @@ def load_tracker(self): rows = [] rows_to_print = [] for source_assembly, taxonomy, projects in self.get_source_assemblies_and_projects(): - rows.append(('EVA', taxonomy, self.scientific_name, source_assembly, self.target_assembly, + rows.append(('EVA', taxonomy, self.scientific_name(taxonomy), source_assembly, self.target_assembly, 1, self.release_version, len(projects), 1, None, 'Pending')) - rows_to_print.append(('EVA', taxonomy, self.scientific_name, source_assembly, self.target_assembly, - len(projects), 'Pending')) + rows_to_print.append(('EVA', taxonomy, self.scientific_name(taxonomy), source_assembly, + self.target_assembly, len(projects), 'Pending')) for source_assembly, taxonomy, num_studies in self.get_source_assemblies_and_num_studies_dbsnp(): - rows.append(('DBSNP', taxonomy, self.scientific_name, source_assembly, self.target_assembly, + rows.append(('DBSNP', taxonomy, self.scientific_name(taxonomy), source_assembly, self.target_assembly, 1, self.release_version, num_studies, 1, None, 'Pending')) - rows_to_print.append(('DBSNP', taxonomy, self.scientific_name, source_assembly, self.target_assembly, - num_studies, 'Pending')) + rows_to_print.append(('DBSNP', taxonomy, self.scientific_name(taxonomy), source_assembly, + self.target_assembly, num_studies, 'Pending')) if len(rows) == 0: self.warning(f'Nothing to process for taxonomy {self.taxonomies} and target assembly {self.target_assembly}') return @@ -120,7 +120,7 @@ def get_job_information_from_tracker(self): f"WHERE release_version={self.release_version} " f"AND assembly_accession='{self.target_assembly}' " f"AND taxonomy in ({', '.join([str(t) for t in self.taxonomies])})" - f"GROUP BY taxonomy" + f"GROUP BY source, scientific_name, origin_assembly_accession, assembly_accession, num_studies, remapping_status" ) with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn: return get_all_results_for_query(pg_conn, query) @@ -167,11 +167,11 @@ def get_incomplete_assemblies_and_taxonomies(self): source_assembly = row[3] status = row[6] if status != 'Completed': - incomplete_assemblies.append(source_assembly, taxonomies) + incomplete_assemblies.append((source_assembly, taxonomies)) return incomplete_assemblies def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume): - self.set_status_start(source_assembly) + self.set_status_start(source_assembly, taxonomy_list) base_directory = cfg['remapping']['base_directory'] nextflow_pipeline = os.path.join(os.path.dirname(__file__), 'nextflow', 'remap_cluster.nf') assembly_directory = os.path.join(base_directory, ",".join([str(t) for t in taxonomy_list]), source_assembly) @@ -199,7 +199,9 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume 'taxonomy_list': taxonomy_list, 'source_assembly_accession': source_assembly, 'target_assembly_accession': self.target_assembly, - 'species_name': self.scientific_name, + # the actual species name does not need to match the taxonomy + # since it is only here to locate the fasta/report files + 'species_name': 
self.scientific_name(taxonomy_list[0]), 'output_dir': assembly_directory, 'genome_assembly_dir': cfg['genome_downloader']['output_directory'], 'extraction_properties': extraction_properties_file, @@ -209,7 +211,6 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume 'remapping_config': cfg.config_file, 'remapping_required': remapping_required } - for part in ['executable', 'nextflow', 'jar']: remap_cluster_config[part] = cfg[part] with open(remap_cluster_config_file, 'w') as open_file: @@ -271,27 +272,28 @@ def create_clustering_properties(self, output_file_path, instance, source_assemb open_file.write(properties) return output_file_path - def set_status(self, source_assembly, status, start_time=None, end_time=None): - query = f"UPDATE {self.tracking_table} SET remapping_status='{status}' " - if start_time: - query += f", remapping_start='{start_time.isoformat()}' " - if end_time: - query += f", remapping_end='{end_time.isoformat()}' " - query += ( - f"WHERE release_version={self.release_version} " - f"AND origin_assembly_accession='{source_assembly}' AND taxonomy={self.taxonomy}" - ) - with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn: - execute_query(pg_conn, query) + def set_status(self, source_assembly, taxonomy_list, status, start_time=None, end_time=None): + for taxonomy in taxonomy_list: + query = f"UPDATE {self.tracking_table} SET remapping_status='{status}' " + if start_time: + query += f", remapping_start='{start_time.isoformat()}' " + if end_time: + query += f", remapping_end='{end_time.isoformat()}' " + query += ( + f"WHERE release_version={self.release_version} " + f"AND origin_assembly_accession='{source_assembly}' AND taxonomy={taxonomy}" + ) + with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn: + execute_query(pg_conn, query) - def set_status_start(self, source_assembly): - self.set_status(source_assembly, 'Started', start_time=datetime.datetime.now()) + def set_status_start(self, source_assembly, taxonomy_list): + self.set_status(source_assembly, taxonomy_list, 'Started', start_time=datetime.datetime.now()) - def set_status_end(self, source_assembly): - self.set_status(source_assembly, 'Completed', end_time=datetime.datetime.now()) + def set_status_end(self, source_assembly, taxonomy_list): + self.set_status(source_assembly, taxonomy_list, 'Completed', end_time=datetime.datetime.now()) - def set_status_failed(self, source_assembly): - self.set_status(source_assembly, 'Failed') + def set_status_failed(self, source_assembly, taxonomy_list): + self.set_status(source_assembly, taxonomy_list, 'Failed') def set_counts(self, source_assembly, source, nb_variant_extracted=None, nb_variant_remapped=None, nb_variant_ingested=None): diff --git a/eva_assembly_ingestion/nextflow/remap_cluster.nf b/eva_assembly_ingestion/nextflow/remap_cluster.nf index 861126a..aaaacd6 100644 --- a/eva_assembly_ingestion/nextflow/remap_cluster.nf +++ b/eva_assembly_ingestion/nextflow/remap_cluster.nf @@ -123,7 +123,7 @@ process update_target_genome { /* - * Extract the submitted variants to remap from the accesioning warehouse and store them in a VCF file. + * Extract the submitted variants to remap from the accessioning warehouse and store them in a VCF file. 
*/ process extract_vcf_from_mongo { memory "${params.memory}GB" From 9f1e8a459790bc1ecb6feebaa23c6b751af48c53 Mon Sep 17 00:00:00 2001 From: tcezard Date: Mon, 8 Jan 2024 17:15:12 +0000 Subject: [PATCH 6/8] Add counts specific to taxonomy --- .../assembly_ingestion_job.py | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/eva_assembly_ingestion/assembly_ingestion_job.py b/eva_assembly_ingestion/assembly_ingestion_job.py index 77ec077..dae4356 100644 --- a/eva_assembly_ingestion/assembly_ingestion_job.py +++ b/eva_assembly_ingestion/assembly_ingestion_job.py @@ -230,13 +230,13 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume run_command_with_output('Nextflow remapping process', ' '.join(command)) except subprocess.CalledProcessError as e: self.error('Nextflow remapping pipeline failed') - self.set_status_failed(source_assembly) + self.set_status_failed(source_assembly, taxonomy_list) raise e finally: os.chdir(curr_working_dir) - self.set_status_end(source_assembly) + self.set_status_end(source_assembly, taxonomy_list) if remapping_required: - self.count_variants_from_logs(assembly_directory, source_assembly) + self.count_variants_from_logs(assembly_directory, source_assembly, taxonomy_list) else: self.info(f"No remapping required. Skipping variant counts from logs") @@ -295,13 +295,13 @@ def set_status_end(self, source_assembly, taxonomy_list): def set_status_failed(self, source_assembly, taxonomy_list): self.set_status(source_assembly, taxonomy_list, 'Failed') - def set_counts(self, source_assembly, source, nb_variant_extracted=None, nb_variant_remapped=None, + def set_counts(self, source_assembly, taxonomy, source, nb_variant_extracted=None, nb_variant_remapped=None, nb_variant_ingested=None): set_statements = [] query = ( f"SELECT * FROM {self.tracking_table} " f"WHERE release_version={self.release_version} AND origin_assembly_accession='{source_assembly}' " - f"AND taxonomy='{self.taxonomy}' AND source='{source}'" + f"AND taxonomy='{taxonomy}' AND source='{source}'" ) with get_metadata_connection_handle(self.maven_profile, self.private_settings_file) as pg_conn: # Check that this row exists @@ -323,38 +323,44 @@ def set_counts(self, source_assembly, source, nb_variant_extracted=None, nb_vari with get_metadata_connection_handle(cfg['maven']['environment'], cfg['maven']['settings_file']) as pg_conn: execute_query(pg_conn, query) - def count_variants_from_logs(self, assembly_directory, source_assembly): - vcf_extractor_log = os.path.join(assembly_directory, 'logs', source_assembly + '_vcf_extractor.log') - eva_remapping_count = os.path.join(assembly_directory, 'eva', source_assembly + '_eva_remapped_counts.yml') - dbsnp_remapping_count = os.path.join(assembly_directory, 'dbsnp', source_assembly + '_dbsnp_remapped_counts.yml') - eva_ingestion_log = os.path.join(assembly_directory, 'logs', source_assembly + '_eva_remapped.vcf_ingestion.log') - dbsnp_ingestion_log = os.path.join(assembly_directory, 'logs', source_assembly + '_dbsnp_remapped.vcf_ingestion.log') - - eva_total, eva_written, dbsnp_total, dbsnp_written = count_variants_extracted(vcf_extractor_log) - eva_candidate, eva_remapped, eva_unmapped = count_variants_remapped(eva_remapping_count) - dbsnp_candidate, dbsnp_remapped, dbsnp_unmapped = count_variants_remapped(dbsnp_remapping_count) - # Use the number of variant read rather than the number of variant ingested to get the total number of variant - # when some might have been written in previous execution. 
-        eva_ingestion_candidate, eva_ingested, eva_duplicates = count_variants_ingested(eva_ingestion_log)
-        dbsnp_ingestion_candidate, dbsnp_ingested, dbsnp_duplicates = count_variants_ingested(dbsnp_ingestion_log)
+            eva_ingestion_candidate, eva_ingested, eva_duplicates = count_variants_ingested(eva_ingestion_log)
+            dbsnp_ingestion_candidate, dbsnp_ingested, dbsnp_duplicates = count_variants_ingested(dbsnp_ingestion_log)
 
-        self.set_counts(
-            source_assembly, 'EVA',
-            nb_variant_extracted=eva_written,
-            nb_variant_remapped=eva_remapped,
-            nb_variant_ingested=eva_ingestion_candidate
-        )
-        self.set_counts(
-            source_assembly, 'DBSNP',
-            nb_variant_extracted=dbsnp_written,
-            nb_variant_remapped=dbsnp_remapped,
-            nb_variant_ingested=dbsnp_ingestion_candidate
-        )
-        self.info(f'For Taxonomy: {self.taxonomy} and Assembly: {source_assembly} Source: EVA ')
-        self.info(f'Number of variant read:{eva_total}, written:{eva_written}, attempt remapping: {eva_candidate}, '
-                  f'remapped: {eva_remapped}, failed remapped {eva_unmapped}')
-        self.info(f'For Taxonomy: {self.taxonomy} and Assembly: {source_assembly} Source: DBSNP ')
-        self.info(
-            f'Number of variant read:{dbsnp_total}, written:{dbsnp_written}, attempt remapping: {dbsnp_candidate}, '
-            f'remapped: {dbsnp_remapped}, failed remapped {dbsnp_unmapped}')
+            self.set_counts(
+                source_assembly, taxonomy, 'EVA',
+                nb_variant_extracted=eva_written,
+                nb_variant_remapped=eva_remapped,
+                nb_variant_ingested=eva_ingestion_candidate
+            )
+            self.set_counts(
+                source_assembly, taxonomy, 'DBSNP',
+                nb_variant_extracted=dbsnp_written,
+                nb_variant_remapped=dbsnp_remapped,
+                nb_variant_ingested=dbsnp_ingestion_candidate
+            )
+            self.info(f'For Taxonomy: {taxonomy} and Assembly: {source_assembly} Source: EVA ')
+            self.info(f'Number of variants read:{eva_total}, written:{eva_written}, attempted remapping: {eva_candidate}, '
+                      f'remapped: {eva_remapped}, failed remapping {eva_unmapped}')
+            self.info(f'For Taxonomy: {taxonomy} and Assembly: {source_assembly} Source: DBSNP ')
+            self.info(
+                f'Number of variants read:{dbsnp_total}, written:{dbsnp_written}, attempted remapping: {dbsnp_candidate}, '
+                f'remapped: {dbsnp_remapped}, failed remapping {dbsnp_unmapped}')

From d654c0817d9811e7a6808df71ee2f6da4a9d0e4e Mon Sep 17 00:00:00 2001
From: tcezard
Date: Mon, 8 Jan 2024 21:02:56 +0000
Subject: [PATCH 7/8] use the taxonomy parameter

---
 eva_assembly_ingestion/assembly_ingestion_job.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/eva_assembly_ingestion/assembly_ingestion_job.py b/eva_assembly_ingestion/assembly_ingestion_job.py
index dae4356..3a0da7f 100644
--- a/eva_assembly_ingestion/assembly_ingestion_job.py
+++ b/eva_assembly_ingestion/assembly_ingestion_job.py
@@ -318,7 +318,7 @@ def set_counts(self, source_assembly, taxonomy, source, nb_variant_extracted=Non
         query = (
             f"UPDATE {self.tracking_table} SET {', '.join(set_statements)} "
             f"WHERE release_version={self.release_version} AND origin_assembly_accession='{source_assembly}' "
-            f"AND taxonomy='{self.taxonomy}' AND source='{source}'"
+            f"AND taxonomy='{taxonomy}' AND source='{source}'"
         )
         with get_metadata_connection_handle(cfg['maven']['environment'], cfg['maven']['settings_file']) as pg_conn:
             execute_query(pg_conn, query)

From c39f74c1710a79991d75bbe9b7f7bb289b58fcd8 Mon Sep 17 00:00:00 2001
From: tcezard
Date: Mon, 15 Jan 2024 09:22:38 +0000
Subject: [PATCH 8/8] switch to using the source taxonomy

---
 eva_assembly_ingestion/assembly_ingestion_job.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/eva_assembly_ingestion/assembly_ingestion_job.py b/eva_assembly_ingestion/assembly_ingestion_job.py
index 3a0da7f..006677d 100644
--- a/eva_assembly_ingestion/assembly_ingestion_job.py
+++ b/eva_assembly_ingestion/assembly_ingestion_job.py
@@ -201,7 +201,7 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume
             'target_assembly_accession': self.target_assembly,
             # the actual species name does not need to match the taxonomy
             # since it is only here to locate the fasta/report files
-            'species_name': self.scientific_name(taxonomy_list[0]),
+            'species_name': self.scientific_name(self.source_taxonomy),
             'output_dir': assembly_directory,
             'genome_assembly_dir': cfg['genome_downloader']['output_directory'],
             'extraction_properties': extraction_properties_file,
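Taken together, the series turns the single --taxonomy job into one keyed on a taxonomy group: the group is resolved once from the supported-assembly tracker (patch 1, corrected in patch 5), drives the tracker rows and the per-taxonomy Nextflow extraction (patches 2 and 4), the per-taxonomy counts and status updates (patches 5 to 7), and finally the fasta/report lookup is pinned back to the source taxonomy (patch 8). The grouping rule itself is: every taxonomy whose current supported assembly equals the source taxonomy's current assembly, falling back to the source taxonomy alone when the tracker has no entry for it. A self-contained sketch of that rule, against a made-up in-memory stand-in for evapro.supported_assembly_tracker:

    # Rows are (taxonomy_id, assembly_id, current); values are invented for illustration.
    TRACKER = [
        (1233, 'GCA_0000001', True),
        (1234, 'GCA_0000001', True),   # same current assembly -> same group
        (9031, 'GCA_0000009', True),
    ]

    def taxonomy_group(source_taxonomy):
        # Assemblies currently assigned to the source taxonomy
        current = {a for t, a, cur in TRACKER if cur and t == source_taxonomy}
        # All taxonomies whose current assembly is one of those
        group = sorted({t for t, a, cur in TRACKER if cur and a in current})
        return group or [source_taxonomy]  # fall back to a group of one

    assert taxonomy_group(1233) == [1233, 1234]  # grouped, as in the Nextflow test config
    assert taxonomy_group(4577) == [4577]        # unknown taxonomy: group of one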