From 763838278d3ede4edc5b0d30f7deb251369ebcd5 Mon Sep 17 00:00:00 2001 From: April Shen Date: Fri, 22 Mar 2024 13:19:31 +0000 Subject: [PATCH 1/4] move create progress table to release automation, delete clustering automation folder --- eva-accession-clustering-automation/README.md | 36 ---- .../clustering_automation/__init__.py | 0 .../cluster_from_mongo.py | 147 -------------- .../clustering_automation/cluster_from_vcf.py | 125 ------------ .../create_clustering_properties.py | 189 ------------------ .../update_clustering_status.py | 58 ------ .../requirements.txt | 2 - eva-accession-clustering-automation/setup.py | 19 -- .../create_clustering_progress_table.py | 0 9 files changed, 576 deletions(-) delete mode 100644 eva-accession-clustering-automation/README.md delete mode 100644 eva-accession-clustering-automation/clustering_automation/__init__.py delete mode 100644 eva-accession-clustering-automation/clustering_automation/cluster_from_mongo.py delete mode 100644 eva-accession-clustering-automation/clustering_automation/cluster_from_vcf.py delete mode 100644 eva-accession-clustering-automation/clustering_automation/create_clustering_properties.py delete mode 100644 eva-accession-clustering-automation/clustering_automation/update_clustering_status.py delete mode 100644 eva-accession-clustering-automation/requirements.txt delete mode 100644 eva-accession-clustering-automation/setup.py rename {eva-accession-clustering-automation/clustering_automation => eva-accession-release-automation}/create_clustering_progress_table.py (100%) diff --git a/eva-accession-clustering-automation/README.md b/eva-accession-clustering-automation/README.md deleted file mode 100644 index b97f48673..000000000 --- a/eva-accession-clustering-automation/README.md +++ /dev/null @@ -1,36 +0,0 @@ -# Pre-requisites -* Install the **ebi_eva_common_pyutils** module in your local python environment - ```bash - pip3 install -r requirements.txt - ``` - -# Usage -## Cluster multiple assemblies -The clustering automation script have the following parameters: -* **source\*:** The possible sources are Mongo or VCF -* **asm-vcf-prj-list:** Is a list of one or many assembly#vcf#project combinations. This is required if the source is VCF -* **assembly-list:** Is a list of assemblies to process. This is required if the source is Mongo -* **private-config-xml-file\*:** Maven settings.xml file with the profiles that hold database connection data -* **profile\*:** Profile to run the pipeline. e.g. production -* **output-directory:** Directory where the generated files will be stored -* **logs-directory:** Directory where the logs will be stored -* **clustering-artifact\*:** Clustering artifact path is the latest version of the clustering pipeline -* **only-printing:** Is a flag to only get the commands but not run them -* **memory:** Amount of memory to use when running the clustering jobs - - -## Examples -* Example using Mongo as source - ```bash - python3 path/to/eva-accession/eva-accession-clustering-automation/cluster_multiple_assemblies.py --source MONGO --assembly-list GCA_000233375.4 GCA_000002285.2 --output-directory /output/clustering_automation --logs-directory /output/logs --only-printing --clustering-artifact cluster.jar --profile production --private-config-xml-file /configuration/eva-maven-settings.xml - ``` - -* Example using VCF as source - ```bash - python3 path/to/eva-accession/eva-accession-clustering-automation/cluster_multiple_assemblies.py --source VCF --asm-vcf-prj-list GCA_000233375.4#/nfs/eva/accessioned.vcf.gz#PRJEB1111 GCA_000002285.2#/nfs/eva/file.vcf.gz#PRJEB2222 --output-directory /output/clustering_automation --logs-directory /output/logs --only-printing --clustering-artifact cluster.jar --profile production --private-config-xml-file /configuration/eva-maven-settings.xml - ``` - - -## Notes -* The **settings xml file** should be passed using the parameter --private-config-xml-file. If it is being run from the - EBI cluster deploy the configuration repository and point to the eva settings xml file. \ No newline at end of file diff --git a/eva-accession-clustering-automation/clustering_automation/__init__.py b/eva-accession-clustering-automation/clustering_automation/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/eva-accession-clustering-automation/clustering_automation/cluster_from_mongo.py b/eva-accession-clustering-automation/clustering_automation/cluster_from_mongo.py deleted file mode 100644 index 3e992ada9..000000000 --- a/eva-accession-clustering-automation/clustering_automation/cluster_from_mongo.py +++ /dev/null @@ -1,147 +0,0 @@ -# Copyright 2020 EMBL - European Bioinformatics Institute -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import argparse -import sys -import logging -import datetime -import yaml - -from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle -from ebi_eva_common_pyutils.nextflow import LinearNextFlowPipeline, NextFlowProcess -from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query - -from clustering_automation.create_clustering_properties import create_properties_file -from ebi_eva_common_pyutils.taxonomy.taxonomy import normalise_taxon_scientific_name - -logger = logging.getLogger(__name__) -timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - - -def get_assemblies_and_scientific_name_from_taxonomy(taxonomy_id, metadata_connection_handle, clustering_tracking_table, release_version): - query = (f"SELECT assembly_accession, scientific_name FROM {clustering_tracking_table} " - f"WHERE taxonomy = '{taxonomy_id}' " - f"and release_version = {release_version} " - f"and assembly_accession <> 'Unmapped' " - f"and should_be_clustered = 't'") - results = get_all_results_for_query(metadata_connection_handle, query) - if len(results) == 0: - raise Exception("Could not find assemblies pertaining to taxonomy ID: " + taxonomy_id) - return [result[0] for result in results], results[0][1] - - -def get_common_clustering_properties(common_clustering_properties_file): - return yaml.load(open(common_clustering_properties_file), Loader=yaml.FullLoader) - - -def generate_linear_pipeline(taxonomy_id, scientific_name, assembly_list, common_properties, memory, instance, enable_retryable): - private_config_xml_file = common_properties["private-config-xml-file"] - profile = common_properties["profile"] - clustering_artifact = common_properties["clustering-jar-path"] - python = common_properties["python3-path"] - release_version = common_properties['release-version'] - clustering_folder = common_properties['clustering-folder'] - clustering_tracking_table = common_properties['clustering-release-tracker'] - - pipeline = LinearNextFlowPipeline() - species_directory = os.path.join(clustering_folder, f"{normalise_taxon_scientific_name(scientific_name)}_{taxonomy_id}") - for assembly in assembly_list: - output_directory = os.path.join(species_directory, assembly) - os.makedirs(output_directory, exist_ok=True) - properties_path = create_properties_file('MONGO', None, None, assembly, - private_config_xml_file, profile, output_directory, instance, enable_retryable) - status_update_template = (f'{python} -m clustering_automation.update_clustering_status ' - f'--private-config-xml-file {private_config_xml_file} ' - f'--clustering-tracking-table {clustering_tracking_table} ' - f'--release {release_version} ' - f'--assembly {assembly} ' - f'--taxonomy {taxonomy_id} ' - '--status {status}') # will be filled in later - - suffix = assembly.replace('.', '_') - pipeline.add_process( - process_name=f'start_{suffix}', - command_to_run=status_update_template.format(status='Started'), - ) - - process_directives_for_java_pipelines = {'memory': f'{memory} MB', - 'clusterOptions': (f'-o {output_directory}/cluster_{timestamp}.log ' - f'-e {output_directory}/cluster_{timestamp}.err')} - # Refer to ProcessRemappedVariantsWithRSJobConfiguration.java and ClusterUnclusteredVariantsJobConfiguration.java - # for descriptions and rationale for 2 separate jobs - # Access to internal method _add_new_process needed for process_directives - pipeline._add_new_process(NextFlowProcess( - process_name=f'process_remapped_variants_with_rs_{suffix}', - command_to_run=f'java -Xmx{memory}m -jar {clustering_artifact} --spring.config.location=file:{properties_path} ' - f'--spring.batch.job.names=PROCESS_REMAPPED_VARIANTS_WITH_RS_JOB', - process_directives='clusterOptions': f"{process_directives_for_java_pipelines['clusterOptions']}" - f" -g /accession "} # Limits the overall number of jobs using job tracker - )) - pipeline._add_new_process(NextFlowProcess( - process_name=f'cluster_{suffix}', - command_to_run=f'java -Xmx{memory}m -jar {clustering_artifact} --spring.config.location=file:{properties_path} ' - f'--spring.batch.job.names=CLUSTER_UNCLUSTERED_VARIANTS_JOB', - process_directives={'memory': process_directives_for_java_pipelines['memory'], - 'clusterOptions': f"{process_directives_for_java_pipelines['clusterOptions']}" - f" -g /accession/instance-{instance} "} # needed to serialize accessioning - )) - pipeline.add_process( - process_name=f'end_{suffix}', - command_to_run=status_update_template.format(status='Completed') # TODO: how to choose completed/failed? - ) - # TODO add QA process - return pipeline, species_directory - - -def cluster_multiple_from_mongo(taxonomy_id, common_clustering_properties_file, memory, instance, enable_retryable): - """ - Generates and runs a Nextflow pipeline to cluster all assemblies for a given taxonomy. - """ - common_properties = get_common_clustering_properties(common_clustering_properties_file) - clustering_tracking_table = common_properties["clustering-release-tracker"] - release_version = common_properties["release-version"] - clustering_folder = common_properties['clustering-folder'] - with get_metadata_connection_handle("production_processing", common_properties["private-config-xml-file"]) as metadata_connection_handle: - assembly_list, scientific_name = get_assemblies_and_scientific_name_from_taxonomy(taxonomy_id, metadata_connection_handle, clustering_tracking_table, release_version) - pipeline, output_directory = generate_linear_pipeline(taxonomy_id, scientific_name, assembly_list, common_properties, memory, instance, enable_retryable) - pipeline.run_pipeline( - workflow_file_path=os.path.join(clustering_folder, f'{taxonomy_id}_clustering_workflow_{timestamp}.nf'), - nextflow_binary_path=common_properties['nextflow-binary-path'], - nextflow_config_path=common_properties['nextflow-config-path'], - working_dir=output_directory - ) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Cluster multiple assemblies', add_help=False) - parser.add_argument("--taxonomy-id", help="Taxonomy id", required=True) - parser.add_argument("--common-clustering-properties-file", help="ex: /path/to/clustering/properties.yml", required=True) - parser.add_argument("--memory", help="Amount of memory jobs will use", required=False, default=8192) - parser.add_argument("--instance", help="Accessioning instance id", required=False, default=6, - type=int, choices=range(1, 13)) - parser.add_argument("--enable-retryable", help="Set the clustering to use the retryable reader", default=False, - action='store_true') - parser.add_argument('--help', action='help', help='Show this help message and exit') - - args = {} - try: - args = parser.parse_args() - cluster_multiple_from_mongo(args.taxonomy_id, args.common_clustering_properties_file, args.memory, - args.instance, args.enable_retryable) - except Exception as ex: - logger.exception(ex) - sys.exit(1) - - sys.exit(0) diff --git a/eva-accession-clustering-automation/clustering_automation/cluster_from_vcf.py b/eva-accession-clustering-automation/clustering_automation/cluster_from_vcf.py deleted file mode 100644 index b4fd117e5..000000000 --- a/eva-accession-clustering-automation/clustering_automation/cluster_from_vcf.py +++ /dev/null @@ -1,125 +0,0 @@ -# Copyright 2020 EMBL - European Bioinformatics Institute -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import argparse -import sys -import logging -import datetime -from ebi_eva_common_pyutils.command_utils import run_command_with_output - -from create_clustering_properties import create_properties_file - - -logger = logging.getLogger(__name__) -timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - - -def generate_bsub_command(assembly_accession, properties_path, logs_directory, clustering_artifact, memory, dependency): - job_name = get_job_name(assembly_accession) - log_file = '{assembly_accession}_cluster_{timestamp}.log'.format(assembly_accession=assembly_accession, - timestamp=timestamp) - error_file = '{assembly_accession}_cluster_{timestamp}.err'.format(assembly_accession=assembly_accession, - timestamp=timestamp) - if logs_directory: - log_file = os.path.join(logs_directory, log_file) - error_file = os.path.join(logs_directory, error_file) - - memory_amount = 8192 - if memory: - memory_amount = memory - - dependency_param = '' - if dependency: - dependency_param = '-w {dependency} '.format(dependency=dependency) - - command = 'bsub {dependency_param}-J {job_name} -o {log_file} -e {error_file} -M {memory_amount} ' \ - '-R "rusage[mem={memory_amount}]" java -jar {clustering_artifact} ' \ - '--spring.config.location=file:{properties_path} --spring.batch.job.names=CLUSTERING_FROM_VCF_JOB'\ - .format(dependency_param=dependency_param, job_name=job_name, log_file=log_file, error_file=error_file, - memory_amount=memory_amount, clustering_artifact=clustering_artifact, properties_path=properties_path) - - print(command) - add_to_command_file(properties_path, command) - return command - - -def get_job_name(assembly_accession): - return '{timestamp}_cluster_{assembly_accession}'.format(assembly_accession=assembly_accession, timestamp=timestamp) - - -def add_to_command_file(properties_path, command): - """ - This method writes the commands to a text file in the output folder - """ - commands_path = os.path.dirname(properties_path) + '/commands_' + timestamp + '.txt' - with open(commands_path, 'a+') as commands: - commands.write(command + '\n') - - -def cluster_one(vcf_file, project_accession, assembly_accession, private_config_xml_file, profile, - output_directory, logs_directory, clustering_artifact, only_printing, memory, instance, dependency): - properties_path = create_properties_file('VCF', vcf_file, project_accession, assembly_accession, - private_config_xml_file, profile, output_directory, instance) - command = generate_bsub_command(assembly_accession, properties_path, logs_directory, clustering_artifact, memory, - dependency) - if not only_printing: - run_command_with_output('Run clustering command', command, return_process_output=True) - - -def cluster_multiple_from_vcf(asm_vcf_prj_list, private_config_xml_file, profile, - output_directory, logs_directory, clustering_artifact, only_printing, memory, instance): - """ - The list will be of the form: GCA_000000001.1#/file1.vcf.gz#PRJEB1111 GCA_000000002.2#/file2.vcf.gz#PRJEB2222 ... - This method splits the triplets and then call the run_clustering method for each one - """ - dependency = None - for triplet in asm_vcf_prj_list: - data = triplet.split('#') - assembly_accession = data[0] - vcf_file = data[1] - project_accession = data[2] - cluster_one(vcf_file, project_accession, assembly_accession, private_config_xml_file, profile, - output_directory, logs_directory, clustering_artifact, only_printing, memory, instance, dependency) - dependency = get_job_name(assembly_accession) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Cluster multiple assemblies', add_help=False) - parser.add_argument("--asm-vcf-prj-list", help="List of Assembly, VCF, project to be clustered, " - "e.g. GCA_000233375.4#/nfs/eva/accessioned.vcf.gz#PRJEB1111 " - "GCA_000002285.2#/nfs/eva/file.vcf.gz#PRJEB2222. " - "Required when the source is VCF", required=True, nargs='+') - parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True) - parser.add_argument("--profile", help="Profile to get the properties, e.g.production", required=True) - parser.add_argument("--output-directory", help="Output directory for the properties file", required=False) - parser.add_argument("--logs-directory", help="Directory for logs files", required=False) - parser.add_argument("--clustering-artifact", help="Artifact of the clustering pipeline", required=True) - parser.add_argument("--only-printing", help="Prepare and write the commands, but don't run them", - action='store_true', required=False) - parser.add_argument("--memory", help="Amount of memory jobs will use", required=False, default=8192) - parser.add_argument("--instance", help="Accessioning instance id", required=False, default=1, choices=range(1, 13)) - parser.add_argument('--help', action='help', help='Show this help message and exit') - - args = {} - try: - args = parser.parse_args() - cluster_multiple_from_vcf(args.asm_vcf_prj_list, args.private_config_xml_file, - args.profile, args.output_directory, args.logs_directory, args.clustering_artifact, - args.only_printing, args.memory, args.instance) - except Exception as ex: - logger.exception(ex) - sys.exit(1) - - sys.exit(0) diff --git a/eva-accession-clustering-automation/clustering_automation/create_clustering_properties.py b/eva-accession-clustering-automation/clustering_automation/create_clustering_properties.py deleted file mode 100644 index c9f66736f..000000000 --- a/eva-accession-clustering-automation/clustering_automation/create_clustering_properties.py +++ /dev/null @@ -1,189 +0,0 @@ -# Copyright 2020 EMBL - European Bioinformatics Institute -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys -import argparse -import logging -from ebi_eva_common_pyutils.config_utils import get_properties_from_xml_file - -logger = logging.getLogger(__name__) - - -def create_properties_file(source, vcf_file, project_accession, assembly_accession, private_config_xml_file, profile, - output_directory, instance, enable_retryable=False): - """ - This method creates the application properties file - """ - check_vcf_source_requirements(source, vcf_file, project_accession) - properties = get_properties_from_xml_file(profile, private_config_xml_file) - path = get_properties_path(source, vcf_file, project_accession, assembly_accession, output_directory) - with open(path, 'w') as properties_file: - add_clustering_properties(properties_file, assembly_accession, project_accession, source, enable_retryable) - add_accessioning_properties(properties_file, instance) - add_count_service_properties(properties_file, properties) - add_mongo_properties(properties_file, properties) - add_job_tracker_properties(properties_file, properties) - add_spring_properties(properties_file) - return path - - -def get_properties_path(source, vcf_file, project_accession, assembly_accession, output_directory): - path = output_directory + '/' + assembly_accession - if source.upper() == 'VCF': - path += '_' + os.path.basename(vcf_file) + '_' + project_accession - path += '.properties' - return path - - -def add_clustering_properties(properties_file, assembly_accession, project_accession, vcf_file, enable_retryable): - vcf = vcf_file or '' - project = project_accession or '' - - clustering_properties = (f""" -parameters.assemblyAccession={assembly_accession} -parameters.remappedFrom= -parameters.vcf={vcf} -parameters.projectAccession={project} -parameters.allowRetry={str(enable_retryable).lower()} -parameters.projects= -parameters.rsReportPath={assembly_accession}_rs_report.txt -""") - properties_file.write(clustering_properties) - - -def add_accessioning_properties(properties_file, instance): - properties_file.write(f""" -parameters.chunkSize=200 - -accessioning.instanceId=instance-{instance} -accessioning.submitted.categoryId=ss -accessioning.clustered.categoryId=rs - -accessioning.monotonic.ss.blockSize=100000 -accessioning.monotonic.ss.blockStartValue=5000000000 -accessioning.monotonic.ss.nextBlockInterval=1000000000 -accessioning.monotonic.rs.blockSize=100000 -accessioning.monotonic.rs.blockStartValue=3000000000 -accessioning.monotonic.rs.nextBlockInterval=1000000000 -""") - - -def add_spring_properties(properties_file): - properties_file.write(""" -#See https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-2.1-Release-Notes#bean-overriding -spring.main.allow-bean-definition-overriding=true -#As this is a spring batch application, disable the embedded tomcat. This is the new way to do that for spring 2. -spring.main.web-application-type=none - -# This entry is put just to avoid a warning message in the logs when you start the spring-boot application. -# This bug is from hibernate which tries to retrieve some metadata from postgresql db and failed to find that and logs as a warning -# It doesnt cause any issue though. -spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation = true""") - - -def add_mongo_properties(properties_file, properties): - mongo_hosts_and_ports = str(properties['eva.mongo.host']) - mongo_host, mongo_port = get_mongo_primary_host_and_port(mongo_hosts_and_ports) - mongo_database = str(properties['eva.accession.mongo.database']) - mongo_username = str(properties['eva.mongo.user']) - mongo_password = str(properties['eva.mongo.passwd']) - - mongo_properties = (""" -spring.data.mongodb.host={host} -spring.data.mongodb.port={port} -spring.data.mongodb.database={database} -spring.data.mongodb.username={username} -spring.data.mongodb.password={password} -spring.data.mongodb.authentication-database=admin -mongodb.read-preference=primary - """).format(database=mongo_database, username=mongo_username, password=mongo_password, host=mongo_host, - port=mongo_port) - properties_file.write(mongo_properties) - -def add_count_service_properties(properties_file, properties): - count_service_url = str(properties['eva.count-stats.url']) - count_service_username = str(properties['eva.count-stats.username']) - count_service_password = str(properties['eva.count-stats.password']) - - count_service_properties = (""" -eva.count-stats.url={url} -eva.count-stats.username={username} -eva.count-stats.password={password} - """).format(url=count_service_url, username=count_service_username, password=count_service_password) - properties_file.write(count_service_properties) - - -def get_mongo_primary_host_and_port(mongo_hosts_and_ports): - """ - :param mongo_hosts_and_ports: All host and ports stored in the private settings xml - :return: mongo primary host and port - """ - for host_and_port in mongo_hosts_and_ports.split(','): - if '001' in host_and_port: - properties = host_and_port.split(':') - return properties[0], properties[1] - - -def add_job_tracker_properties(properties_file, properties): - postgres_url = str(properties['eva.accession.jdbc.url']) - postgres_username = str(properties['eva.accession.user']) - postgres_password = str(properties['eva.accession.password']) - - postgres_properties = (""" -spring.datasource.driver-class-name=org.postgresql.Driver -spring.datasource.url={postgres_url} -spring.datasource.username={postgres_username} -spring.datasource.password={postgres_password} -spring.datasource.tomcat.max-active=3 - """).format(postgres_url=postgres_url, postgres_username=postgres_username, postgres_password=postgres_password) - properties_file.write(postgres_properties) - - -def check_vcf_source_requirements(source, vcf_file, project_accession): - """ - This method checks that if the source is VCF the VCF file and project accession are provided - """ - if source == 'VCF' and not (vcf_file and project_accession): - raise ValueError('If the source is VCF the file path and project accession must be provided') - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Create clustering properties file', add_help=False) - parser.add_argument("--source", help="mongo database or VCF", required=True, choices=['VCF', 'MONGO']) - parser.add_argument("--vcf-file", help="Path to the VCF file, required when the source is VCF", required=False) - parser.add_argument("--project-accession", help="Project accession, required when the source is VCF", - required=False) - parser.add_argument("--assembly-accession", help="Assembly for which the process has to be run, " - "e.g. GCA_000002285.2", required=True) - parser.add_argument("--instance", help="Accessioning instance id", required=False, default=1, - type=int, choices=range(1, 13)) - parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True) - parser.add_argument("--profile", help="Profile to get the properties, e.g.production", required=True) - parser.add_argument("--output-directory", help="Output directory for the properties file", required=False) - parser.add_argument("--enable-retryable", help="Set the clustering to use the retryable reader", default=False, - action='store_true') - parser.add_argument('--help', action='help', help='Show this help message and exit') - - args = {} - try: - args = parser.parse_args() - create_properties_file(args.source, args.vcf_file, args.project_accession, args.assembly_accession, - args.private_config_xml_file, args.profile, args.output_directory, args.instance, - args.enable_retryable) - except Exception as ex: - logger.exception(ex) - sys.exit(1) - - sys.exit(0) diff --git a/eva-accession-clustering-automation/clustering_automation/update_clustering_status.py b/eva-accession-clustering-automation/clustering_automation/update_clustering_status.py deleted file mode 100644 index 19bc73ddc..000000000 --- a/eva-accession-clustering-automation/clustering_automation/update_clustering_status.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright 2021 EMBL - European Bioinformatics Institute -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys -import argparse -import datetime -import logging - -from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle -from ebi_eva_common_pyutils.pg_utils import execute_query - - -logger = logging.getLogger(__name__) -timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - - -def set_clustering_status(private_config_xml_file, clustering_tracking_table, assembly, tax_id, release_version, status): - now = datetime.datetime.now().isoformat() - update_status_query = f"UPDATE {clustering_tracking_table} " - update_status_query += f"SET clustering_status='{status}'" - if status == 'Started': - update_status_query += f", clustering_start='{now}'" - elif status == 'Completed': - update_status_query += f", clustering_end='{now}'" - update_status_query += (f" WHERE assembly_accession='{assembly}' AND taxonomy='{tax_id}' " - f"AND release_version={release_version}") - with get_metadata_connection_handle("production_processing", private_config_xml_file) as metadata_connection_handle: - execute_query(metadata_connection_handle, update_status_query) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Update clustering progress', add_help=False) - parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True) - parser.add_argument("--clustering-tracking-table", help="", required=True) - parser.add_argument("--release", help="Release version", required=True) - parser.add_argument("--assembly", help="Assembly accession", required=True) - parser.add_argument("--taxonomy", help="Taxonomy id", required=True) - parser.add_argument("--status", help="Status to set", required=True, choices=["Started", "Completed", "Failed"]) - args = {} - try: - args = parser.parse_args() - set_clustering_status(args.private_config_xml_file, args.clustering_tracking_table, args.assembly, args.taxonomy, args.release, args.status) - except Exception as ex: - logger.exception(ex) - sys.exit(1) - - sys.exit(0) diff --git a/eva-accession-clustering-automation/requirements.txt b/eva-accession-clustering-automation/requirements.txt deleted file mode 100644 index 3e2b31ef6..000000000 --- a/eva-accession-clustering-automation/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -ebi_eva_common_pyutils==0.3.16 -retry diff --git a/eva-accession-clustering-automation/setup.py b/eva-accession-clustering-automation/setup.py deleted file mode 100644 index 8de946510..000000000 --- a/eva-accession-clustering-automation/setup.py +++ /dev/null @@ -1,19 +0,0 @@ -import os -from setuptools import find_packages, setup - - -def get_requires(): - requires = [] - with open(os.path.join(os.path.dirname(__file__), "requirements.txt"), "rt") as req_file: - for line in req_file: - requires.append(line.rstrip()) - return requires - - -setup(name='clustering_automation', - version='0.0.1', - packages=find_packages(), - install_requires=get_requires(), - tests_require=get_requires(), - setup_requires=get_requires() -) diff --git a/eva-accession-clustering-automation/clustering_automation/create_clustering_progress_table.py b/eva-accession-release-automation/create_clustering_progress_table.py similarity index 100% rename from eva-accession-clustering-automation/clustering_automation/create_clustering_progress_table.py rename to eva-accession-release-automation/create_clustering_progress_table.py From 4e096b0c9549a201011367d494b508d548d8aeea Mon Sep 17 00:00:00 2001 From: April Shen Date: Mon, 25 Mar 2024 13:40:09 +0000 Subject: [PATCH 2/4] refactor --- .../create_clustering_progress_table.py | 318 ------------------ .../create_release_tracking_table.py | 304 +++++++++++++++++ 2 files changed, 304 insertions(+), 318 deletions(-) delete mode 100644 eva-accession-release-automation/create_clustering_progress_table.py create mode 100644 eva-accession-release-automation/create_release_tracking_table.py diff --git a/eva-accession-release-automation/create_clustering_progress_table.py b/eva-accession-release-automation/create_clustering_progress_table.py deleted file mode 100644 index f4fe1f19a..000000000 --- a/eva-accession-release-automation/create_clustering_progress_table.py +++ /dev/null @@ -1,318 +0,0 @@ -#!/usr/bin/env python -# Copyright 2020 EMBL - European Bioinformatics Institute -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import argparse -from collections import defaultdict -from itertools import cycle - -from ebi_eva_common_pyutils.assembly import NCBIAssembly -from ebi_eva_common_pyutils.config_utils import get_mongo_uri_for_eva_profile -from ebi_eva_common_pyutils.logger import logging_config -from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle -from ebi_eva_common_pyutils.mongodb import MongoDatabase -from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query, execute_query -from ebi_eva_common_pyutils.taxonomy.taxonomy import normalise_taxon_scientific_name, get_scientific_name_from_ensembl - -logger = logging_config.get_logger(__name__) - -# round-robin through the instances from 1 to 10 -tempmongo_instances = cycle([f'tempmongo-{instance}' for instance in range(1, 11)]) - -all_tasks = ['fill_release_entries', 'fill_should_be_released'] - -def create_table_if_not_exists(private_config_xml_file): - query_create_table = ( - 'create table if not exists eva_progress_tracker.clustering_release_tracker(' - 'taxonomy int4 not null, ' - 'scientific_name text not null, ' - 'assembly_accession text not null, ' - 'release_version int8 not null, ' - 'sources text not null,' - 'clustering_status text null, ' - 'clustering_start timestamp null, ' - 'clustering_end timestamp null, ' - 'should_be_clustered boolean null, ' - 'fasta_path text null, ' - 'report_path text null, ' - 'tempmongo_instance text null, ' - 'should_be_released boolean null, ' - 'num_rs_to_release int8 null, ' - 'total_num_variants int8 null, ' - 'release_folder_name text null, ' - 'release_status text null, ' - 'primary key (taxonomy, assembly_accession, release_version))' - ) - with get_metadata_connection_handle("production_processing", private_config_xml_file) as pg_conn: - execute_query(pg_conn, query_create_table) - - -def fill_in_from_previous_release(private_config_xml_file, profile, curr_release_version, ref_dir): - query = f"""select taxonomy, scientific_name, assembly_accession, sources, fasta_path, report_path, - release_folder_name from eva_progress_tracker.clustering_release_tracker - where release_version = {curr_release_version - 1}""" - with get_metadata_connection_handle("production_processing", private_config_xml_file) as pg_conn: - for tax, sc_name, asm_acc, src, fs_path, rpt_path, rls_folder_name in get_all_results_for_query(pg_conn, query): - insert_entry_for_taxonomy_and_assembly(private_config_xml_file, profile, ref_dir, tax, asm_acc, - curr_release_version, src, sc_name, fs_path, rpt_path, - rls_folder_name) - - -def fill_in_from_eva(private_config_xml_file, profile, release_version, ref_dir): - query = f"""select distinct pt.taxonomy_id as taxonomy, asm.assembly_accession as assembly_accession - from evapro.project_taxonomy pt - join evapro.project_analysis pa on pt.project_accession = pa.project_accession - join evapro.analysis a on a.analysis_accession = pa.analysis_accession - join evapro.assembly asm on asm.assembly_set_id = a.assembly_set_id - and asm.assembly_accession is not null and assembly_accession like 'GCA%'""" - - with get_metadata_connection_handle("production_processing", private_config_xml_file) as pg_conn: - sources = 'EVA' - for tax, asm_acc in get_all_results_for_query(pg_conn, query): - insert_entry_for_taxonomy_and_assembly(private_config_xml_file, profile, ref_dir, tax, asm_acc, - release_version, sources) - - -def fill_in_from_supported_assembly_tracker(private_config_xml_file, profile, release_version, ref_dir): - query = f"""select distinct taxonomy_id as taxonomy, assembly_id as assembly_accession - from evapro.supported_assembly_tracker sat""" - - with get_metadata_connection_handle("production_processing", private_config_xml_file) as pg_conn: - sources = 'DBSNP, EVA' - for tax, asm_acc in get_all_results_for_query(pg_conn, query): - insert_entry_for_taxonomy_and_assembly(private_config_xml_file, profile, ref_dir, tax, asm_acc, - release_version, sources) - - -def insert_entry_for_taxonomy_and_assembly(private_config_xml_file, profile, ref_dir, tax, asm_acc, release_version, - sources, sc_name=None, fasta_path=None, report_path=None, - release_folder_name=None): - with get_metadata_connection_handle(profile, private_config_xml_file) as pg_conn: - sc_name = sc_name if sc_name else get_scientific_name_from_ensembl(tax) - if asm_acc != 'Unmapped': - ncbi_assembly = NCBIAssembly(asm_acc, sc_name, ref_dir) - fasta_path = fasta_path if fasta_path else ncbi_assembly.assembly_fasta_path - report_path = report_path if report_path else ncbi_assembly.assembly_report_path - release_folder_name = release_folder_name if release_folder_name else normalise_taxon_scientific_name(sc_name) - - tempongo_instance = next(tempmongo_instances) - src_in_db = get_sources_for_taxonomy_assembly(private_config_xml_file, profile, release_version, tax, asm_acc) - - if not src_in_db: - # entry does not exist for tax and asm - insert_query = f"""INSERT INTO eva_progress_tracker.clustering_release_tracker( - taxonomy, scientific_name, assembly_accession, release_version, sources, - fasta_path, report_path, tempmongo_instance, release_folder_name) - VALUES ({tax}, '{sc_name}', '{asm_acc}', {release_version}, '{sources}', - '{fasta_path}', '{report_path}', '{tempongo_instance}', '{release_folder_name}') - ON CONFLICT DO NOTHING""" - - execute_query(pg_conn, insert_query) - else: - # if DB source is equal to what we are trying to insert or if the DB source already contains both EVA and DBSNP - # no need to insert again - if src_in_db == sources or ('EVA' in src_in_db and 'DBSNP' in src_in_db): - logger.info(f"Entry already present for taxonomy {tax} and assembly {asm_acc} with sources {sources}") - else: - # We have different sources which means we need to update entry to have both DBNSP and EVA in sources - update_query = f"""update eva_progress_tracker.clustering_release_tracker set sources='DBSNP, EVA' - where taxonomy={tax} and assembly_accession='{asm_acc}' and - release_version={release_version}""" - - execute_query(pg_conn, update_query) - - -def get_assembly_list_for_taxonomy_for_release(private_config_xml_file, profile, release_version, taxonomy): - assembly_source = {} - with get_metadata_connection_handle(profile, private_config_xml_file) as pg_conn: - query = f"""SELECT distinct assembly_accession, sources from eva_progress_tracker.clustering_release_tracker - where taxonomy = {taxonomy} and release_version = {release_version}""" - for assembly, sources in get_all_results_for_query(pg_conn, query): - assembly_source[assembly] = sources - - return assembly_source - - -def get_taxonomy_list_for_release(private_config_xml_file, profile, release_version): - tax_asm = defaultdict(defaultdict) - query = f"""select distinct taxonomy, assembly_accession, sources - from eva_progress_tracker.clustering_release_tracker - where release_version={release_version}""" - with get_metadata_connection_handle(profile, private_config_xml_file) as pg_conn: - for tax, asm_acc, sources in get_all_results_for_query(pg_conn, query): - tax_asm[tax][asm_acc] = sources - return tax_asm - - -def get_sources_for_taxonomy_assembly(private_config_xml_file, profile, release_version, taxonomy, assembly): - with get_metadata_connection_handle(profile, private_config_xml_file) as pg_conn: - query = f"""SELECT sources from eva_progress_tracker.clustering_release_tracker - where taxonomy = {taxonomy} and assembly_accession='{assembly}' - and release_version = {release_version}""" - - result = get_all_results_for_query(pg_conn, query) - if not result: - return None - else: - return result[0][0] - - -def determine_should_be_released_for_coll(mongo_source, tax, asm, ss_coll, rs_coll, ss_query, rs_query): - logger.info(f"Looking for SS with RS for Taxonomy {tax} and Assembly {asm} in collection {ss_coll}") - collection = mongo_source.mongo_handle[mongo_source.db_name][ss_coll] - ss_with_rs = collection.find_one(ss_query) - if ss_with_rs: - logger.info(f'Found SS with RS for Taxonomy {tax} and Assembly {asm} in collection {ss_coll}, SS: {ss_with_rs}') - return True - else: - logger.warning(f'No SS with RS found for Taxonomy {tax} and Assembly {asm} in collection {ss_coll}') - - # Looking for RS if no SS with RS is found, for cases where there might not be a variant in SS, but there might be a - # RS in corresponding CVE collection. - # (For release we will look up against both dbsnpSVE and SVE for records in a given EVA or dbSNP CVE collection but - # only if we mark the sources in the release table, see below - # https://github.com/EBIvariation/eva-accession/blob/5f827ae8f062ae923a83c16070f6ebf08c544e31/eva-accession-release/src/main/java/uk/ac/ebi/eva/accession/release/batch/io/active/AccessionedVariantMongoReader.java#L83)) - logger.info(f"Looking for RS with Taxonomy {tax} and Assembly {asm} in collection {rs_coll}") - collection = mongo_source.mongo_handle[mongo_source.db_name][rs_coll] - rs_with_tax_asm = collection.find_one(rs_query) - if rs_with_tax_asm: - logger.info(f'Found RS with Taxonomy {tax} and Assembly {asm} in collection {rs_coll}, RS: {rs_with_tax_asm}') - return True - else: - logger.warning(f'No RS found for Taxonomy {tax} and Assembly {asm} in collection {ss_coll}') - return False - - -def determine_release_for_taxonomy_and_assembly(private_config_xml_file, tax, asm, src, profile, release_version, - mongo_source): - should_be_released_eva = should_be_released_dbsnp = False - if asm != 'Unmapped': - ss_query = {'tax': tax, 'seq': asm, 'rs': {'$exists': True}} - rs_query = {'tax': tax, 'asm': asm} - if 'EVA' in src: - eva_ss_coll = 'submittedVariantEntity' - eva_rs_coll = 'clusteredVariantEntity' - should_be_released_eva = determine_should_be_released_for_coll(mongo_source, tax, asm, eva_ss_coll, - eva_rs_coll, ss_query, rs_query) - if 'DBSNP' in src: - dbsnp_ss_coll = 'dbsnpSubmittedVariantEntity' - dbsnp_rs_coll = 'dbsnpClusteredVariantEntity' - should_be_released_dbsnp = determine_should_be_released_for_coll(mongo_source, tax, asm, dbsnp_ss_coll, - dbsnp_rs_coll, ss_query, rs_query) - - should_be_released = should_be_released_eva or should_be_released_dbsnp - else: - should_be_released = False - - logger.info(f"For taxonomy {tax} and assembly {asm}, should_be_released is {should_be_released} " - f"(should_be_released_eva = {should_be_released_eva}, " - f"should_be_released_dbsnp = {should_be_released_dbsnp})") - num_rs_to_release = 1 if should_be_released else 0 - - with get_metadata_connection_handle(profile, private_config_xml_file) as pg_conn: - update_should_be_released_query = f"""update eva_progress_tracker.clustering_release_tracker - set should_be_released={should_be_released}, num_rs_to_release={num_rs_to_release} - where taxonomy={tax} and assembly_accession='{asm}' and release_version={release_version}""" - - execute_query(pg_conn, update_should_be_released_query) - - # for any taxonomy and assembly, if sources have both DBSNP and EVA but one of them does not have any variants - # to release, then remove that from the sources - if should_be_released and ('DBSNP' in src and 'EVA' in src): - if not should_be_released_dbsnp or not should_be_released_eva: - if should_be_released_eva: - logger.info(f"For taxonomy {tax} and assembly {asm}, " - f"putting the source as EVA as DBSNP does not have any variants to release") - sources = 'EVA' - elif should_be_released_dbsnp: - logger.info(f"For taxonomy {tax} and assembly {asm}, " - f"putting the source as DBSNP as EVA does not have any variants to release") - sources = 'DBSNP' - - logger.info(f"For tax {tax} and assembly {asm} Updating sources to {sources}") - - update_sources_query = f"""update eva_progress_tracker.clustering_release_tracker - set sources='{sources}' where taxonomy={tax} and assembly_accession='{asm}' - and release_version={release_version}""" - execute_query(pg_conn, update_sources_query) - - -def fill_should_be_released_for_taxonomy(private_config_xml_file, tax, asm_sources, profile, release_version, - mongo_source): - for asm_acc in asm_sources: - determine_release_for_taxonomy_and_assembly(private_config_xml_file, tax, asm_acc, asm_sources[asm_acc], - profile, release_version, mongo_source) - - -def fill_should_be_released_for_all_in_release(private_config_xml_file, profile, release_version, mongo_source): - tax_asm = get_taxonomy_list_for_release(private_config_xml_file, profile, release_version) - for tax in tax_asm: - fill_should_be_released_for_taxonomy(private_config_xml_file, tax, tax_asm[tax], profile, release_version, - mongo_source) - - -def main(): - parser = argparse.ArgumentParser(description='Create and load the clustering and release tracking table', - add_help=False) - parser.add_argument("--private-config-xml-file", required=True, help="ex: /path/to/eva-maven-settings.xml") - parser.add_argument("--release-version", required=True, type=int, help="version of the release") - parser.add_argument("--reference-directory", required=True, - help="Directory where the reference genomes exists or should be downloaded") - parser.add_argument("--profile", required=True, help="profile where entries should be made e.g. development") - parser.add_argument('--tasks', required=False, type=str, nargs='+', choices=all_tasks, - help='Task or set of tasks to perform') - parser.add_argument("--taxonomy", required=False, type=int, - help="taxonomy id for which should be released needs to be updated") - parser.add_argument("--assembly", required=False, - help="assembly accession for which should be released needs to be updated") - parser.add_argument('--help', action='help', help='Show this help message and exit') - args = parser.parse_args() - - logging_config.add_stdout_handler() - - if not args.tasks: - args.tasks = ['fill_release_entries'] - - create_table_if_not_exists(args.private_config_xml_file) - - mongo_source_uri = get_mongo_uri_for_eva_profile('production', args.private_config_xml_file) - mongo_source = MongoDatabase(uri=mongo_source_uri, db_name="eva_accession_sharded") - - if 'fill_release_entries' in args.tasks: - fill_in_from_previous_release(args.private_config_xml_file, args.profile, args.release_version, - args.reference_directory) - fill_in_from_eva(args.private_config_xml_file, args.profile, args.release_version, args.reference_directory) - fill_in_from_supported_assembly_tracker(args.private_config_xml_file, args.profile, args.release_version, - args.reference_directory) - - fill_should_be_released_for_all_in_release(args.private_config_xml_file, args.profile, args.release_version, - mongo_source) - - if 'fill_should_be_released' in args.tasks: - if not args.taxonomy: - raise Exception("For running task 'fill_should_be_released', it is mandatory to provide --taxonomy") - if not args.assembly: - asm_list = get_assembly_list_for_taxonomy_for_release(args.private_config_xml_file, args.profile, - args.release_version, args.taxonomy) - fill_should_be_released_for_taxonomy(args.private_config_xml_file, args.taxonomy, asm_list, args.profile, - args.release_version, mongo_source) - else: - sources = get_sources_for_taxonomy_assembly(args.private_config_xml_file, args.profile, - args.release_version, args.taxonomy, args.assembly) - determine_release_for_taxonomy_and_assembly(args.private_config_xml_file, args.taxonomy, - args.assembly, sources, args.profile, - args.release_version, mongo_source) - - -if __name__ == '__main__': - main() diff --git a/eva-accession-release-automation/create_release_tracking_table.py b/eva-accession-release-automation/create_release_tracking_table.py new file mode 100644 index 000000000..d5aa6b70a --- /dev/null +++ b/eva-accession-release-automation/create_release_tracking_table.py @@ -0,0 +1,304 @@ +#!/usr/bin/env python +# Copyright 2020 EMBL - European Bioinformatics Institute +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import argparse +from collections import defaultdict +from functools import cached_property +from itertools import cycle + +from ebi_eva_common_pyutils.assembly import NCBIAssembly +from ebi_eva_common_pyutils.config_utils import get_mongo_uri_for_eva_profile +from ebi_eva_common_pyutils.logger import logging_config, AppLogger +from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle +from ebi_eva_common_pyutils.mongodb import MongoDatabase +from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query, execute_query +from ebi_eva_common_pyutils.taxonomy.taxonomy import normalise_taxon_scientific_name, get_scientific_name_from_ensembl + + +# round-robin through the instances from 1 to 10 +tempmongo_instances = cycle([f'tempmongo-{instance}' for instance in range(1, 11)]) + +all_tasks = ['fill_release_entries', 'fill_should_be_released'] + + +class ReleaseTracker(AppLogger): + + def __init__(self, private_config_xml_file, maven_profile, release_version, reference_directory): + self.private_config_xml_file = private_config_xml_file + self.maven_profile = maven_profile + self.release_version = release_version + self.ref_dir = reference_directory + + @cached_property + def metadata_conn(self): + return get_metadata_connection_handle(self.maven_profile, self.private_config_xml_file) + + @cached_property + def mongo_conn(self): + mongo_uri = get_mongo_uri_for_eva_profile(self.maven_profile, self.private_config_xml_file) + return MongoDatabase(uri=mongo_uri, db_name="eva_accession_sharded") + + def create_table_if_not_exists(self): + query_create_table = ( + 'create table if not exists eva_progress_tracker.clustering_release_tracker(' + 'taxonomy int4 not null, ' + 'scientific_name text not null, ' + 'assembly_accession text not null, ' + 'release_version int8 not null, ' + 'sources text not null,' + 'clustering_status text null, ' # unused + 'clustering_start timestamp null, ' # unused + 'clustering_end timestamp null, ' # unused + 'should_be_clustered boolean null, ' # unused + 'fasta_path text null, ' + 'report_path text null, ' + 'tempmongo_instance text null, ' + 'should_be_released boolean null, ' + 'num_rs_to_release int8 null, ' # not computed but still used by release automation + 'total_num_variants int8 null, ' # not computed and unused + 'release_folder_name text null, ' + 'release_status text null, ' + 'primary key (taxonomy, assembly_accession, release_version))' + ) + execute_query(self.metadata_conn, query_create_table) + + def fill_release_entries(self): + """Fill in release table based on previous release data, EVA metadata, and supported assembly tracker. + Also fills in should_be_released values.""" + self._fill_from_previous_release() + self._fill_from_eva_metadata() + self._fill_from_supported_assembly_tracker() + self.fill_should_be_released_for_all() + + def _fill_from_previous_release(self): + query = f"""select taxonomy, scientific_name, assembly_accession, sources, fasta_path, report_path, + release_folder_name from eva_progress_tracker.clustering_release_tracker + where release_version = {self.release_version - 1}""" + for tax, sc_name, asm_acc, src, fs_path, rpt_path, rls_folder_name in get_all_results_for_query( + self.metadata_conn, query): + self._insert_entry_for_taxonomy_and_assembly(tax, asm_acc, src, sc_name, fs_path, rpt_path, + rls_folder_name) + + def _fill_from_eva_metadata(self): + query = f"""select distinct pt.taxonomy_id as taxonomy, asm.assembly_accession as assembly_accession + from evapro.project_taxonomy pt + join evapro.project_analysis pa on pt.project_accession = pa.project_accession + join evapro.analysis a on a.analysis_accession = pa.analysis_accession + join evapro.assembly asm on asm.assembly_set_id = a.assembly_set_id + and asm.assembly_accession is not null and assembly_accession like 'GCA%'""" + sources = 'EVA' + for tax, asm_acc in get_all_results_for_query(self.metadata_conn, query): + self._insert_entry_for_taxonomy_and_assembly(tax, asm_acc, sources) + + def _fill_from_supported_assembly_tracker(self): + query = f"""select distinct taxonomy_id as taxonomy, assembly_id as assembly_accession + from evapro.supported_assembly_tracker""" + sources = 'DBSNP, EVA' + for tax, asm_acc in get_all_results_for_query(self.metadata_conn, query): + self._insert_entry_for_taxonomy_and_assembly(tax, asm_acc, sources) + + def _insert_entry_for_taxonomy_and_assembly(self, tax, asm_acc, sources, sc_name=None, fasta_path=None, + report_path=None, release_folder_name=None): + sc_name = sc_name if sc_name else get_scientific_name_from_ensembl(tax) + if asm_acc != 'Unmapped': + ncbi_assembly = NCBIAssembly(asm_acc, sc_name, self.ref_dir) + fasta_path = fasta_path if fasta_path else ncbi_assembly.assembly_fasta_path + report_path = report_path if report_path else ncbi_assembly.assembly_report_path + release_folder_name = release_folder_name if release_folder_name else normalise_taxon_scientific_name(sc_name) + + tempongo_instance = next(tempmongo_instances) + src_in_db = self.get_sources_for_taxonomy_assembly(tax, asm_acc) + + if not src_in_db: + # entry does not exist for tax and asm + insert_query = f"""INSERT INTO eva_progress_tracker.clustering_release_tracker( + taxonomy, scientific_name, assembly_accession, release_version, sources, + fasta_path, report_path, tempmongo_instance, release_folder_name) + VALUES ({tax}, '{sc_name}', '{asm_acc}', {self.release_version}, '{sources}', + '{fasta_path}', '{report_path}', '{tempongo_instance}', '{release_folder_name}') + ON CONFLICT DO NOTHING""" + execute_query(self.metadata_conn, insert_query) + else: + # if DB source is equal to what we are trying to insert or if the DB source already contains + # both EVA and DBSNP, no need to insert again + if src_in_db == sources or ('EVA' in src_in_db and 'DBSNP' in src_in_db): + self.info(f"Entry already present for taxonomy {tax} and assembly {asm_acc} with sources {sources}") + else: + # We have different sources which means we need to update entry to have both DBNSP and EVA in sources + update_query = f"""update eva_progress_tracker.clustering_release_tracker set sources='DBSNP, EVA' + where taxonomy={tax} and assembly_accession='{asm_acc}' and + release_version={self.release_version}""" + execute_query(self.metadata_conn, update_query) + + def fill_should_be_released_for_all(self): + tax_asm = self.get_taxonomy_list_for_release() + for tax in tax_asm: + self.fill_should_be_released_for_taxonomy(tax, tax_asm[tax]) + + def fill_should_be_released_for_taxonomy(self, tax, asm_sources): + for asm_acc in asm_sources: + self.fill_should_be_released_for_taxonomy_and_assembly(tax, asm_acc, asm_sources[asm_acc]) + + def fill_should_be_released_for_taxonomy_and_assembly(self, tax, asm, src): + """Fills should_be_released for a taxonomy/assembly pair based on whether there are current RS IDs, + and updates the sources column as well. + TODO Also check for deprecated (https://www.ebi.ac.uk/panda/jira/browse/EVA-3402)""" + should_be_released_eva = should_be_released_dbsnp = False + if asm != 'Unmapped': + ss_query = {'tax': tax, 'seq': asm, 'rs': {'$exists': True}} + rs_query = {'tax': tax, 'asm': asm} + if 'EVA' in src: + eva_ss_coll = 'submittedVariantEntity' + eva_rs_coll = 'clusteredVariantEntity' + should_be_released_eva = self._determine_should_be_released_for_collection( + tax, asm, eva_ss_coll, eva_rs_coll, ss_query, rs_query) + if 'DBSNP' in src: + dbsnp_ss_coll = 'dbsnpSubmittedVariantEntity' + dbsnp_rs_coll = 'dbsnpClusteredVariantEntity' + should_be_released_dbsnp = self._determine_should_be_released_for_collection( + tax, asm, dbsnp_ss_coll, dbsnp_rs_coll, ss_query, rs_query) + should_be_released = should_be_released_eva or should_be_released_dbsnp + else: + should_be_released = False + + self.info(f"For taxonomy {tax} and assembly {asm}, should_be_released is {should_be_released} " + f"(should_be_released_eva = {should_be_released_eva}, " + f"should_be_released_dbsnp = {should_be_released_dbsnp})") + num_rs_to_release = 1 if should_be_released else 0 + + update_should_be_released_query = f"""update eva_progress_tracker.clustering_release_tracker + set should_be_released={should_be_released}, num_rs_to_release={num_rs_to_release} + where taxonomy={tax} and assembly_accession='{asm}' and release_version={self.release_version}""" + execute_query(self.metadata_conn, update_should_be_released_query) + + # for any taxonomy and assembly, if sources have both DBSNP and EVA but one of them does not have any variants + # to release, then remove that from the sources + if should_be_released and ('DBSNP' in src and 'EVA' in src): + if not should_be_released_dbsnp or not should_be_released_eva: + if should_be_released_eva: + self.info(f"For taxonomy {tax} and assembly {asm}, " + f"putting the source as EVA as DBSNP does not have any variants to release") + sources = 'EVA' + elif should_be_released_dbsnp: + self.info(f"For taxonomy {tax} and assembly {asm}, " + f"putting the source as DBSNP as EVA does not have any variants to release") + sources = 'DBSNP' + + self.info(f"For tax {tax} and assembly {asm} Updating sources to {sources}") + update_sources_query = f"""update eva_progress_tracker.clustering_release_tracker + set sources='{sources}' where taxonomy={tax} and assembly_accession='{asm}' + and release_version={self.release_version}""" + execute_query(self.metadata_conn, update_sources_query) + + def _determine_should_be_released_for_collection(self, tax, asm, ss_coll, rs_coll, ss_query, rs_query): + self.info(f"Looking for SS with RS for Taxonomy {tax} and Assembly {asm} in collection {ss_coll}") + collection = self.mongo_conn.mongo_handle[self.mongo_conn.db_name][ss_coll] + ss_with_rs = collection.find_one(ss_query) + if ss_with_rs: + self.info(f'Found SS with RS for Taxonomy {tax} and Assembly {asm} in collection {ss_coll}, SS: {ss_with_rs}') + return True + else: + self.warning(f'No SS with RS found for Taxonomy {tax} and Assembly {asm} in collection {ss_coll}') + + # Looking for RS if no SS with RS is found, for cases where there might not be a variant in SS, but there might + # be a RS in corresponding CVE collection. + # (For release we will look up against both dbsnpSVE and SVE for records in a given EVA or dbSNP CVE collection + # but only if we mark the sources in the release table, see below + # https://github.com/EBIvariation/eva-accession/blob/5f827ae8f062ae923a83c16070f6ebf08c544e31/eva-accession-release/src/main/java/uk/ac/ebi/eva/accession/release/batch/io/active/AccessionedVariantMongoReader.java#L83)) + self.info(f"Looking for RS with Taxonomy {tax} and Assembly {asm} in collection {rs_coll}") + collection = self.mongo_conn.mongo_handle[self.mongo_conn.db_name][rs_coll] + rs_with_tax_asm = collection.find_one(rs_query) + if rs_with_tax_asm: + self.info(f'Found RS with Taxonomy {tax} and Assembly {asm} in collection {rs_coll}, RS: {rs_with_tax_asm}') + return True + else: + self.warning(f'No RS found for Taxonomy {tax} and Assembly {asm} in collection {ss_coll}') + return False + + def get_taxonomy_list_for_release(self): + """Get all taxonomies with assemblies and sources for the current release version.""" + tax_asm = defaultdict(defaultdict) + query = f"""select distinct taxonomy, assembly_accession, sources + from eva_progress_tracker.clustering_release_tracker + where release_version={self.release_version}""" + for tax, asm_acc, sources in get_all_results_for_query(self.metadata_conn, query): + tax_asm[tax][asm_acc] = sources + return tax_asm + + def get_assemblies_and_sources_for_taxonomy(self, taxonomy): + assembly_source = {} + query = f"""SELECT distinct assembly_accession, sources from eva_progress_tracker.clustering_release_tracker + where taxonomy = {taxonomy} and release_version = {self.release_version}""" + for assembly, sources in get_all_results_for_query(self.metadata_conn, query): + assembly_source[assembly] = sources + return assembly_source + + def get_sources_for_taxonomy_assembly(self, taxonomy, assembly): + query = f"""SELECT sources from eva_progress_tracker.clustering_release_tracker + where taxonomy = {taxonomy} and assembly_accession='{assembly}' + and release_version = {self.release_version}""" + result = get_all_results_for_query(self.metadata_conn, query) + if not result: + return None + else: + return result[0][0] + + +def main(): + parser = argparse.ArgumentParser(description='Create and load the clustering and release tracking table', + add_help=False) + parser.add_argument("--private-config-xml-file", required=True, help="ex: /path/to/eva-maven-settings.xml") + parser.add_argument("--release-version", required=True, type=int, help="version of the release") + parser.add_argument("--reference-directory", required=True, + help="Directory where the reference genomes exists or should be downloaded") + parser.add_argument("--profile", required=True, help="Profile where entries should be made e.g. development") + parser.add_argument('--tasks', required=False, type=str, nargs='+', choices=all_tasks, + help='Task or set of tasks to perform') + parser.add_argument("--taxonomy", required=False, type=int, + help="taxonomy id for which should be released needs to be updated") + parser.add_argument("--assembly", required=False, + help="assembly accession for which should be released needs to be updated") + parser.add_argument('--help', action='help', help='Show this help message and exit') + args = parser.parse_args() + + logging_config.add_stdout_handler() + + if not args.tasks: + args.tasks = ['fill_release_entries'] + + release_tracker = ReleaseTracker( + private_config_xml_file=args.private_config_xml_file, + maven_profile=args.profile, + release_version=args.release_version, + reference_directory=args.reference_directory + ) + + release_tracker.create_table_if_not_exists() + + if 'fill_release_entries' in args.tasks: + release_tracker.fill_release_entries() + + if 'fill_should_be_released' in args.tasks: + if not args.taxonomy: + raise Exception("For running task 'fill_should_be_released', it is mandatory to provide --taxonomy") + if not args.assembly: + asm_and_sources = release_tracker.get_assemblies_and_sources_for_taxonomy(args.taxonomy) + release_tracker.fill_should_be_released_for_taxonomy(args.taxonomy, asm_and_sources) + else: + sources = release_tracker.get_sources_for_taxonomy_assembly(args.taxonomy, args.assembly) + release_tracker.fill_should_be_released_for_taxonomy_and_assembly(args.taxonomy, args.assembly, sources) + + +if __name__ == '__main__': + main() From 249847d3cee38c129193b1f5565adfccf6e6cf38 Mon Sep 17 00:00:00 2001 From: April Shen Date: Tue, 26 Mar 2024 13:51:43 +0000 Subject: [PATCH 3/4] fix on test --- .../create_release_tracking_table.py | 2 ++ 1 file changed, 2 insertions(+) rename eva-accession-release-automation/{ => run_release_in_embassy}/create_release_tracking_table.py (99%) diff --git a/eva-accession-release-automation/create_release_tracking_table.py b/eva-accession-release-automation/run_release_in_embassy/create_release_tracking_table.py similarity index 99% rename from eva-accession-release-automation/create_release_tracking_table.py rename to eva-accession-release-automation/run_release_in_embassy/create_release_tracking_table.py index d5aa6b70a..e02fbdb06 100644 --- a/eva-accession-release-automation/create_release_tracking_table.py +++ b/eva-accession-release-automation/run_release_in_embassy/create_release_tracking_table.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import argparse +import re from collections import defaultdict from functools import cached_property from itertools import cycle @@ -111,6 +112,7 @@ def _fill_from_supported_assembly_tracker(self): def _insert_entry_for_taxonomy_and_assembly(self, tax, asm_acc, sources, sc_name=None, fasta_path=None, report_path=None, release_folder_name=None): sc_name = sc_name if sc_name else get_scientific_name_from_ensembl(tax) + sc_name = re.sub("'", "\\'", sc_name) # a special fix to insert Ambystoma 'unisexual hybrid' verbatim if asm_acc != 'Unmapped': ncbi_assembly = NCBIAssembly(asm_acc, sc_name, self.ref_dir) fasta_path = fasta_path if fasta_path else ncbi_assembly.assembly_fasta_path From 0e3f90e1dc4183db46238c18b0999efaf8640220 Mon Sep 17 00:00:00 2001 From: April Shen Date: Tue, 26 Mar 2024 15:54:27 +0000 Subject: [PATCH 4/4] actual fix for single quote --- .../run_release_in_embassy/create_release_tracking_table.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/eva-accession-release-automation/run_release_in_embassy/create_release_tracking_table.py b/eva-accession-release-automation/run_release_in_embassy/create_release_tracking_table.py index e02fbdb06..8329f2c2d 100644 --- a/eva-accession-release-automation/run_release_in_embassy/create_release_tracking_table.py +++ b/eva-accession-release-automation/run_release_in_embassy/create_release_tracking_table.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import argparse -import re from collections import defaultdict from functools import cached_property from itertools import cycle @@ -112,7 +111,7 @@ def _fill_from_supported_assembly_tracker(self): def _insert_entry_for_taxonomy_and_assembly(self, tax, asm_acc, sources, sc_name=None, fasta_path=None, report_path=None, release_folder_name=None): sc_name = sc_name if sc_name else get_scientific_name_from_ensembl(tax) - sc_name = re.sub("'", "\\'", sc_name) # a special fix to insert Ambystoma 'unisexual hybrid' verbatim + sc_name = sc_name.replace("'", "\''") if asm_acc != 'Unmapped': ncbi_assembly = NCBIAssembly(asm_acc, sc_name, self.ref_dir) fasta_path = fasta_path if fasta_path else ncbi_assembly.assembly_fasta_path