From b99dde49f7163256e523160d31ddf02a43f7100a Mon Sep 17 00:00:00 2001 From: tcezard Date: Tue, 31 Oct 2023 13:44:32 +0000 Subject: [PATCH 01/13] Initial persistence to db --- .../gather_release_counts.py | 86 +++++++++++++++++-- .../requirements.txt | 1 + 2 files changed, 80 insertions(+), 7 deletions(-) diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index 1ef0cc3cd..f9a274750 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -3,14 +3,18 @@ import glob import os from collections import defaultdict, Counter -from functools import lru_cache +from functools import lru_cache, cached_property +from typing import List +from urllib.parse import urlsplit from ebi_eva_common_pyutils.command_utils import run_command_with_output from ebi_eva_common_pyutils.common_utils import pretty_print +from ebi_eva_common_pyutils.config_utils import get_metadata_creds_for_profile from ebi_eva_common_pyutils.logger import logging_config, AppLogger from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query - +from sqlalchemy import Column, String, Integer, BigInteger, ForeignKey, Engine, create_engine, URL +from sqlalchemy.orm import declarative_base, Mapped, relationship, mapped_column, Session logger = logging_config.get_logger(__name__) @@ -141,10 +145,37 @@ def generate_output_tsv(dict_of_counter, output_file, header): ]) + '\n') +Base = declarative_base() + + +class RSCountCategory(Base): + """ + Table that provide the metadata associated with a number of RS id + """ + __tablename__ = 'rscountcatergory' + + assembly = Column(String, primary_key=True) + taxonomy = Column(Integer, primary_key=True) + rs_type = Column(String, 
primary_key=True) + release_version = Column(Integer, primary_key=True) + rs_count_id: Mapped[int] = mapped_column(ForeignKey("rscount.id")) + rs_count = Mapped["RSCount"] = relationship(back_populates="count_categories") + schema = 'eva_stats' + + +class RSCount(Base): + __tablename__ = 'eva_stats.rscount' + id = Column(Integer, primary_key=True, autoincrement=True) + count = Column(BigInteger) + count_categories: Mapped[List["RSCountCategory"]] = relationship(back_populates="parent") + schema = 'eva_stats' + + class ReleaseCounter(AppLogger): - def __init__(self, private_config_xml_file, release_version, logs): + def __init__(self, private_config_xml_file, config_profile, release_version, logs): self.private_config_xml_file = private_config_xml_file + self.config_profile = config_profile self.release_version = release_version self.all_counts_grouped = [] self.parse_logs(logs) @@ -158,13 +189,27 @@ def get_taxonomy_and_scientific_name(self, species_folder): f"join evapro.taxonomy t on c.taxonomy=t.taxonomy_id " f"where release_version={self.release_version} AND release_folder_name='{species_folder}'" ) - with get_metadata_connection_handle('production_processing', self.private_config_xml_file) as db_conn: + with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: results = get_all_results_for_query(db_conn, query) if len(results) < 1: logger.warning(f'Failed to get scientific name and taxonomy for {species_folder}') return None, None return results[0][0], results[0][1] + @cached_property + def sqlalchemy_engine(self): + pg_url, pg_user, pg_pass = get_metadata_creds_for_profile(self.config_profile, self.private_config_xml_file) + dbtype, host_url, port_and_db = urlsplit(pg_url).path.split(':') + port, db = port_and_db.split('/') + return create_engine(URL.create( + dbtype + '+psycopg2', + username=pg_user, + password=pg_pass, + host=host_url.split('/')[-1], + database=db, + port=port + )) + def add_annotations(self): for 
count_groups in self.all_counts_grouped: for count_dict in count_groups: @@ -172,6 +217,28 @@ def add_annotations(self): count_dict['taxonomy'] = taxonomy count_dict['scientific_name'] = scientific_name + def write_to_db(self): + session = Session(self.sqlalchemy_engine) + with session.begin(): + try: + for count_groups in self.all_counts_grouped: + for count_dict in count_groups: + session.add(RSCountCategory( + assembly=count_dict['assembly'], + taxonomy=count_dict['taxonomy'], + rs_type=count_dict['idtype'], + rs_count=count_dict['count'] + )) + session.commit() + session.flush() + except: + session.rollback() + finally: + session.close() + + + + def get_assembly_counts_from_database(self): """ DB counts are loaded to the per-assembly counts table by gather_clustering_counts_from_mongo, @@ -186,7 +253,7 @@ def get_assembly_counts_from_database(self): f" FROM {assembly_table_name} " f"WHERE release_version={self.release_version}" ) - with get_metadata_connection_handle('production_processing', self.private_config_xml_file) as db_conn: + with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: for row in get_all_results_for_query(db_conn, query): assembly = row[0] for index, metric in enumerate(all_metrics): @@ -284,17 +351,22 @@ def main(): help="base directory where all the release was run.", required=True) parser.add_argument("--output-dir", type=str, help="Output directory where all count logs will be created.", required=True) - parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True) parser.add_argument("--release-version", type=int, help="current release version", required=True) parser.add_argument("--species-directories", type=str, nargs='+', help="set of directory names to process. It will process all the related one as well. 
" "Process all if missing") + parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True) + parser.add_argument("--config-profile", help="profile to use in the config xml", required=False, + default='production_processing') + args = parser.parse_args() logging_config.add_stdout_handler() logger.info(f'Analyse {args.release_root_path}') logs = calculate_all_logs(args.release_root_path, args.output_dir, args.species_directories) - counter = ReleaseCounter(args.private_config_xml_file, args.release_version, logs) + counter = ReleaseCounter(args.private_config_xml_file, + config_profile=args.config_profile, release_version=args.release_version, logs=logs) + counter.write_to_db() counter.detect_inconsistent_types() generate_output_tsv(counter.generate_per_species_counts(), os.path.join(args.output_dir, 'species_counts.tsv'), 'Taxonomy') generate_output_tsv(counter.generate_per_assembly_counts(), os.path.join(args.output_dir, 'assembly_counts.tsv'), 'Assembly') diff --git a/eva-accession-release-automation/requirements.txt b/eva-accession-release-automation/requirements.txt index 2bf2101e8..1546ff9f5 100644 --- a/eva-accession-release-automation/requirements.txt +++ b/eva-accession-release-automation/requirements.txt @@ -4,3 +4,4 @@ pytz==2022.6 pyyaml==5.3.1 requests==2.22.0 retry==0.9.2 +sqlalchemy==2.0.22 From bfb6959ee11eeb411d7b3b0a833342ed63b7e836 Mon Sep 17 00:00:00 2001 From: tcezard Date: Wed, 1 Nov 2023 14:47:06 +0000 Subject: [PATCH 02/13] Fix persistence Add unique constraint Support loading the same data multiple times --- .../gather_release_counts.py | 117 ++++++++++++------ 1 file changed, 79 insertions(+), 38 deletions(-) diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index f9a274750..f15a2d132 100644 --- 
a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -4,7 +4,6 @@ import os from collections import defaultdict, Counter from functools import lru_cache, cached_property -from typing import List from urllib.parse import urlsplit from ebi_eva_common_pyutils.command_utils import run_command_with_output @@ -13,8 +12,10 @@ from ebi_eva_common_pyutils.logger import logging_config, AppLogger from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query -from sqlalchemy import Column, String, Integer, BigInteger, ForeignKey, Engine, create_engine, URL -from sqlalchemy.orm import declarative_base, Mapped, relationship, mapped_column, Session + +from sqlalchemy import Column, String, Integer, BigInteger, ForeignKey, create_engine, URL, MetaData, TEXT, \ + UniqueConstraint, select +from sqlalchemy.orm import declarative_base, relationship, mapped_column, Session logger = logging_config.get_logger(__name__) @@ -111,7 +112,7 @@ def collect_files_to_count(release_directory, set_of_species): return all_files -def calculate_all_logs(release_dir, output_dir, species_directories=None): +def run_calculation_script_for_species(release_dir, output_dir, species_directories=None): all_assemblies_2_species, all_species_2_assemblies = gather_assemblies_and_species_from_directory(release_dir) all_sets_of_species = set() # Determine the species that needs to be counted together because they share assemblies @@ -145,29 +146,37 @@ def generate_output_tsv(dict_of_counter, output_file, header): ]) + '\n') -Base = declarative_base() +Base = declarative_base(metadata=MetaData(schema="eva_stats")) class RSCountCategory(Base): """ Table that provide the metadata associated with a number of RS id - """ - __tablename__ = 'rscountcatergory' - - assembly = Column(String, primary_key=True) - 
taxonomy = Column(Integer, primary_key=True) - rs_type = Column(String, primary_key=True) - release_version = Column(Integer, primary_key=True) - rs_count_id: Mapped[int] = mapped_column(ForeignKey("rscount.id")) - rs_count = Mapped["RSCount"] = relationship(back_populates="count_categories") + """ + __tablename__ = 'release_rs_count_category' + + count_category_id = Column(Integer, primary_key=True, autoincrement=True) + assembly = Column(String) + taxonomy = Column(Integer) + rs_type = Column(String) + release_version = Column(Integer) + rs_count_id = mapped_column(ForeignKey("release_rs_count.id")) + rs_count = relationship("RSCount", back_populates="count_categories") + __table_args__ = ( + UniqueConstraint('assembly', 'taxonomy', 'rs_type', 'release_version', 'rs_count_id', name='uix_1'), + ) schema = 'eva_stats' class RSCount(Base): - __tablename__ = 'eva_stats.rscount' + """ + Table that provide the count associated with each category + """ + __tablename__ = 'release_rs_count' id = Column(Integer, primary_key=True, autoincrement=True) count = Column(BigInteger) - count_categories: Mapped[List["RSCountCategory"]] = relationship(back_populates="parent") + group_description = Column(TEXT, unique=True) + count_categories = relationship("RSCountCategory", back_populates="rs_count") schema = 'eva_stats' @@ -178,7 +187,7 @@ def __init__(self, private_config_xml_file, config_profile, release_version, log self.config_profile = config_profile self.release_version = release_version self.all_counts_grouped = [] - self.parse_logs(logs) + self.parse_count_script_logs(logs) self.add_annotations() @lru_cache @@ -201,7 +210,7 @@ def sqlalchemy_engine(self): pg_url, pg_user, pg_pass = get_metadata_creds_for_profile(self.config_profile, self.private_config_xml_file) dbtype, host_url, port_and_db = urlsplit(pg_url).path.split(':') port, db = port_and_db.split('/') - return create_engine(URL.create( + engine = create_engine(URL.create( dbtype + '+psycopg2', username=pg_user, 
password=pg_pass, @@ -209,6 +218,9 @@ def sqlalchemy_engine(self): database=db, port=port )) + RSCount.__table__.create(bind=engine, checkfirst=True) + RSCountCategory.__table__.create(bind=engine, checkfirst=True) + return engine def add_annotations(self): for count_groups in self.all_counts_grouped: @@ -220,24 +232,52 @@ def add_annotations(self): def write_to_db(self): session = Session(self.sqlalchemy_engine) with session.begin(): - try: - for count_groups in self.all_counts_grouped: - for count_dict in count_groups: - session.add(RSCountCategory( + for count_groups in self.all_counts_grouped: + # All the part of the group should have the same count + count_set = set((count_dict['count'] for count_dict in count_groups)) + assert len(count_set) == 1 + count = count_set.pop() + + def count_descriptor(count_dict): + return f"{count_dict['assembly']},{count_dict['taxonomy']},{count_dict['idtype']}" + # This is used to uniquely identify the count for this group so that loading the same value twice does + # not results in duplicates + group_description = '_'.join([ + count_descriptor(count_dict) + for count_dict in sorted(count_groups, key=count_descriptor)]) + f'_release_{self.release_version}' + query = select(RSCount).where(RSCount.group_description == group_description) + result = session.execute(query).fetchone() + if result: + rs_count = result.RSCount + else: + rs_count = RSCount(count=count, group_description=group_description) + session.add(rs_count) + for count_dict in count_groups: + query = select(RSCountCategory).where( + RSCountCategory.assembly == count_dict['assembly'], + RSCountCategory.taxonomy == count_dict['taxonomy'], + RSCountCategory.rs_type == count_dict['idtype'], + RSCountCategory.release_version == self.release_version, + RSCountCategory.rs_count_id == rs_count.id, + ) + result = session.execute(query).fetchone() + if not result: + self.info(f"Create persistence for {count_dict['assembly']}, {count_dict['taxonomy']}, {count_dict['idtype']}, 
{count_dict['count']}") + rs_category = RSCountCategory( assembly=count_dict['assembly'], taxonomy=count_dict['taxonomy'], rs_type=count_dict['idtype'], - rs_count=count_dict['count'] - )) - session.commit() - session.flush() - except: - session.rollback() - finally: - session.close() - - - + release_version=self.release_version, + rs_count=rs_count + ) + session.add(rs_category) + else: + rs_category = result.RSCountCategory + # Check if we were just loading the same value as a previous run + if rs_category.rs_count.count != count_dict['count']: + self.warning(f"{count_descriptor(count_dict)} already has a count entry in the table " + f"({rs_category.rs_count.count}) different from the one being loaded " + f"{count_dict['count']}") def get_assembly_counts_from_database(self): """ @@ -260,7 +300,7 @@ def get_assembly_counts_from_database(self): results[assembly][metric] = row[index + 1] return results - def parse_logs(self, all_logs): + def parse_count_script_logs(self, all_logs): for log_file in all_logs: with open(log_file) as open_file: for line in open_file: @@ -271,7 +311,8 @@ def parse_logs(self, all_logs): for annotation in set_of_annotations: assembly, release_folder, idtype = annotation.split('-') all_groups.append( - {'count': count, 'release_folder': release_folder, 'assembly': assembly, 'idtype': idtype} + {'count': count, 'release_folder': release_folder, 'assembly': assembly, 'idtype': idtype, + 'annotation': annotation} ) self.all_counts_grouped.append(all_groups) @@ -323,7 +364,7 @@ def compare_assembly_counts_with_db(self, threshold=0, output_csv=None): header = ('Assembly', 'Metric', 'File', 'DB', 'Diff (file-db)') rows = [] count_per_assembly_from_files = self.generate_per_assembly_counts() - counts_per_assembly_from_db = self.get_counts_assembly_from_database() + counts_per_assembly_from_db = self.get_assembly_counts_from_database() all_asms = set(count_per_assembly_from_files.keys()).union(counts_per_assembly_from_db.keys()) for asm in 
all_asms: asm_counts_from_files = count_per_assembly_from_files.get(asm, {}) @@ -363,9 +404,9 @@ def main(): args = parser.parse_args() logging_config.add_stdout_handler() logger.info(f'Analyse {args.release_root_path}') - logs = calculate_all_logs(args.release_root_path, args.output_dir, args.species_directories) + log_files = run_calculation_script_for_species(args.release_root_path, args.output_dir, args.species_directories) counter = ReleaseCounter(args.private_config_xml_file, - config_profile=args.config_profile, release_version=args.release_version, logs=logs) + config_profile=args.config_profile, release_version=args.release_version, logs=log_files) counter.write_to_db() counter.detect_inconsistent_types() generate_output_tsv(counter.generate_per_species_counts(), os.path.join(args.output_dir, 'species_counts.tsv'), 'Taxonomy') From bec0695d53b1d85c3388b8aded74ad2b26a6c21c Mon Sep 17 00:00:00 2001 From: tcezard Date: Wed, 1 Nov 2023 16:25:36 +0000 Subject: [PATCH 03/13] Make count_rs_for_all_files.sh executable --- .../gather_clustering_counts/bash/count_rs_for_all_files.sh | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh diff --git a/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh b/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh old mode 100644 new mode 100755 From df38b618420a76659fd45eb16f4312349b75861f Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 2 Nov 2023 09:06:23 +0000 Subject: [PATCH 04/13] Make the counting script compatible with previous releases --- .../gather_clustering_counts/bash/count_rs_for_all_files.sh | 4 ++-- .../gather_clustering_counts/gather_release_counts.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh 
b/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh index 1409a9d91..c37df2ee7 100755 --- a/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh +++ b/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh @@ -14,7 +14,7 @@ do echo "Process $INPUT" ASSEMBLY=$(basename $(dirname ${INPUT})); SC_NAME=$(basename $(dirname $(dirname ${INPUT}))); - TYPE=$(echo $(basename ${INPUT}) | cut -f 4- -d '_' | awk '{print substr($0,1,length($0)-11)}') + TYPE=$(echo $(basename ${INPUT}) | awk -F '_' '{print $(NF-1)"_"$NF}' | awk '{print substr($0,1,length($0)-11)}') OUTPUT=tmp_${SC_NAME}_${ASSEMBLY}_${TYPE}.txt if [[ ${INPUT} == *.vcf.gz ]] then @@ -42,5 +42,5 @@ cat $ALL_TMP_OUTPUT | sort \ print ""; }' \ | grep -v '^$' | sort | uniq -c | sort -nr > "$OUTPUT_FILE" -rm $ALL_TMP_OUTPUT +rm -f $ALL_TMP_OUTPUT diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index f15a2d132..1875b074a 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -99,9 +99,9 @@ def collect_files_to_count(release_directory, set_of_species): species_dir = os.path.join(release_directory, species) assembly_directories = glob.glob(os.path.join(species_dir, "GCA_*")) for assembly_dir in assembly_directories: - vcf_pattern = f'*_GCA_*_ids.vcf.gz' + vcf_pattern = f'*GCA_*_ids.vcf.gz' vcf_files = glob.glob(os.path.join(assembly_dir, vcf_pattern)) - txt_pattern = f'*_GCA_*_ids.txt.gz' + txt_pattern = f'*GCA_*_ids.txt.gz' txt_files = glob.glob(os.path.join(assembly_dir, txt_pattern)) # I don't have the taxonomy to add to the bash pattern so remove the file that start with eva_ or dbsnp_ vcf_files = [f for f in vcf_files if 'dbsnp_' not in f and 'eva_' 
not in f] From be9ec07af80188e73246a10711408f1bcb31329c Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 2 Nov 2023 11:59:25 +0000 Subject: [PATCH 05/13] Fix the counting script to include species that have no assembly at all (only unmapped) --- .../gather_clustering_counts/bash/count_rs_for_release.sh | 0 .../gather_clustering_counts/gather_release_counts.py | 4 ++++ .../tests/test_gather_release_counts.py | 4 +++- 3 files changed, 7 insertions(+), 1 deletion(-) mode change 100644 => 100755 eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_release.sh diff --git a/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_release.sh b/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_release.sh old mode 100644 new mode 100755 diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index 1875b074a..f7267a97f 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -41,6 +41,8 @@ def find_link(key_set, dict1, dict2, source_linked_set1=None, source_linked_set2 linked_set2 = source_linked_set2.copy() for key1 in key_set: if key1 in dict1: + # first set should at least contain the query + linked_set1.update(key1) for value1 in dict1.get(key1): linked_set2.add(value1) if value1 in dict2: @@ -126,8 +128,10 @@ def run_calculation_script_for_species(release_dir, output_dir, species_director for species in species_to_search: if species not in all_species_added: set_of_species, set_of_assemblies = find_link({species}, all_species_2_assemblies, all_assemblies_2_species) + all_sets_of_species.add(set_of_species) all_species_added.update(set_of_species) + set_of_species logger.info(f'Aggregate species in {len(all_sets_of_species)} groups') all_logs = [] for 
set_of_species in all_sets_of_species: diff --git a/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py index b57d2fae6..33d3149bd 100644 --- a/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py @@ -6,7 +6,8 @@ def test_find_links(): 'A': ['1', '2'], 'B': ['2', '5'], 'C': ['3', '4'], - 'D': ['5'] + 'D': ['5'], + 'E': [] } d2 = { '1': ['A', 'B'], @@ -19,3 +20,4 @@ def test_find_links(): assert find_link({'B'}, d1, d2) == (frozenset({'A', 'B', 'D'}), frozenset({'1', '2', '5'})) assert find_link({'C'}, d1, d2) == (frozenset({'C'}), frozenset({'3', '4'})) assert find_link({'D'}, d1, d2) == (frozenset({'A', 'B', 'D'}), frozenset({'1', '2', '5'})) + assert find_link({'E'}, d1, d2) == (frozenset({'E'}), frozenset({})) From 56ce9cf4409f5bf613fd4e47b77d1ab37442c6f4 Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 2 Nov 2023 12:09:54 +0000 Subject: [PATCH 06/13] Should use add not update --- .../gather_clustering_counts/gather_release_counts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index f7267a97f..9873ff12b 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -42,7 +42,7 @@ def find_link(key_set, dict1, dict2, source_linked_set1=None, source_linked_set2 for key1 in key_set: if key1 in dict1: # first set should at least contain the query - linked_set1.update(key1) + linked_set1.add(key1) for value1 in dict1.get(key1): linked_set2.add(value1) if value1 in dict2: From 
ed1430b8f610032bf05d5ed1d379446c8132e16a Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 2 Nov 2023 12:29:56 +0000 Subject: [PATCH 07/13] Do not load counts for directories for which we could not resolve the taxonomy --- .../gather_release_counts.py | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index 9873ff12b..309715b3f 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -230,8 +230,21 @@ def add_annotations(self): for count_groups in self.all_counts_grouped: for count_dict in count_groups: taxonomy, scientific_name = self.get_taxonomy_and_scientific_name(count_dict['release_folder']) - count_dict['taxonomy'] = taxonomy - count_dict['scientific_name'] = scientific_name + if taxonomy: + count_dict['taxonomy'] = taxonomy + if scientific_name: + count_dict['scientific_name'] = scientific_name + + def count_descriptor(self, count_dict): + if 'taxonomy' in count_dict: + return f"{count_dict['assembly']},{count_dict['taxonomy']},{count_dict['idtype']}" + + def group_descriptor(self, count_groups): + group_descriptor_list = [ + self.count_descriptor(count_dict) + for count_dict in sorted(count_groups, key=self.count_descriptor)] + if None not in group_descriptor_list: + return '_'.join(group_descriptor_list) + f'_release_{self.release_version}' def write_to_db(self): session = Session(self.sqlalchemy_engine) @@ -241,14 +254,12 @@ def write_to_db(self): count_set = set((count_dict['count'] for count_dict in count_groups)) assert len(count_set) == 1 count = count_set.pop() - - def count_descriptor(count_dict): - return f"{count_dict['assembly']},{count_dict['taxonomy']},{count_dict['idtype']}" # This is used to uniquely identify 
the count for this group so that loading the same value twice does - # not results in duplicates - group_description = '_'.join([ - count_descriptor(count_dict) - for count_dict in sorted(count_groups, key=count_descriptor)]) + f'_release_{self.release_version}' + # not result in duplicates + group_description = self.group_descriptor(count_groups) + if not group_description: + # One of the taxonomy annotation is missing, we should not load that count + continue query = select(RSCount).where(RSCount.group_description == group_description) result = session.execute(query).fetchone() if result: @@ -412,11 +423,13 @@ def main(): counter = ReleaseCounter(args.private_config_xml_file, config_profile=args.config_profile, release_version=args.release_version, logs=log_files) counter.write_to_db() - counter.detect_inconsistent_types() - generate_output_tsv(counter.generate_per_species_counts(), os.path.join(args.output_dir, 'species_counts.tsv'), 'Taxonomy') - generate_output_tsv(counter.generate_per_assembly_counts(), os.path.join(args.output_dir, 'assembly_counts.tsv'), 'Assembly') - generate_output_tsv(counter.generate_per_species_assembly_counts(), os.path.join(args.output_dir, 'species_assembly_counts.tsv'), 'Taxonomy\tAssembly') - counter.compare_assembly_counts_with_db(output_csv=os.path.join(args.output_dir, 'comparison_assembly_counts.csv')) + + # TODO: Other analysis should be performed on the database counts + # counter.detect_inconsistent_types() + # generate_output_tsv(counter.generate_per_species_counts(), os.path.join(args.output_dir, 'species_counts.tsv'), 'Taxonomy') + # generate_output_tsv(counter.generate_per_assembly_counts(), os.path.join(args.output_dir, 'assembly_counts.tsv'), 'Assembly') + # generate_output_tsv(counter.generate_per_species_assembly_counts(), os.path.join(args.output_dir, 'species_assembly_counts.tsv'), 'Taxonomy\tAssembly') + # counter.compare_assembly_counts_with_db(output_csv=os.path.join(args.output_dir, 
'comparison_assembly_counts.csv')) if __name__ == "__main__": From fbb7cf49a61bf63936253fa664bfa9f1b6ac85c1 Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 2 Nov 2023 13:44:22 +0000 Subject: [PATCH 08/13] Search for the taxonomy and scientific name in the release tracker when cannot be found in EVAPRO --- .../bash/count_rs_for_all_files.sh | 3 ++- .../gather_release_counts.py | 15 +++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh b/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh index c37df2ee7..4f8ee1b82 100755 --- a/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh +++ b/eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh @@ -14,7 +14,8 @@ do echo "Process $INPUT" ASSEMBLY=$(basename $(dirname ${INPUT})); SC_NAME=$(basename $(dirname $(dirname ${INPUT}))); - TYPE=$(echo $(basename ${INPUT}) | awk -F '_' '{print $(NF-1)"_"$NF}' | awk '{print substr($0,1,length($0)-11)}') + # SPLIT by GCA is to support format that have a taxonomy prefix (release 5+) or the one that do not (release 4-) + TYPE=$(echo $(basename ${INPUT}) | awk -F 'GCA' '{print $2}' |cut -f 3- -d '_' | awk '{print substr($0,1,length($0)-11)}') OUTPUT=tmp_${SC_NAME}_${ASSEMBLY}_${TYPE}.txt if [[ ${INPUT} == *.vcf.gz ]] then diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index 309715b3f..3ed23691c 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -128,10 +128,8 @@ def run_calculation_script_for_species(release_dir, output_dir, species_director for species in species_to_search: if species not in 
all_species_added: set_of_species, set_of_assemblies = find_link({species}, all_species_2_assemblies, all_assemblies_2_species) - all_sets_of_species.add(set_of_species) all_species_added.update(set_of_species) - set_of_species logger.info(f'Aggregate species in {len(all_sets_of_species)} groups') all_logs = [] for set_of_species in all_sets_of_species: @@ -204,6 +202,15 @@ def get_taxonomy_and_scientific_name(self, species_folder): ) with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: results = get_all_results_for_query(db_conn, query) + if len(results) < 1: + # Rely only on the clustering_release_tracker when taxonomy is not available in EVAPRO + query = ( + f"select distinct taxonomy, scientific_name " + f"from eva_progress_tracker.clustering_release_tracker " + f"where release_version={self.release_version} AND release_folder_name='{species_folder}'" + ) + with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: + results = get_all_results_for_query(db_conn, query) if len(results) < 1: logger.warning(f'Failed to get scientific name and taxonomy for {species_folder}') return None, None @@ -242,9 +249,9 @@ def count_descriptor(self, count_dict): def group_descriptor(self, count_groups): group_descriptor_list = [ self.count_descriptor(count_dict) - for count_dict in sorted(count_groups, key=self.count_descriptor)] + for count_dict in count_groups] if None not in group_descriptor_list: - return '_'.join(group_descriptor_list) + f'_release_{self.release_version}' + return '_'.join(sorted(group_descriptor_list)) + f'_release_{self.release_version}' def write_to_db(self): session = Session(self.sqlalchemy_engine) From ca87322e75af0a0783f0999c0bff6bffcd58c590 Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 2 Nov 2023 13:46:44 +0000 Subject: [PATCH 09/13] Make the group descriptor pipe (|) separated --- .../gather_clustering_counts/gather_release_counts.py | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index 3ed23691c..d840296a0 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -251,7 +251,7 @@ def group_descriptor(self, count_groups): self.count_descriptor(count_dict) for count_dict in count_groups] if None not in group_descriptor_list: - return '_'.join(sorted(group_descriptor_list)) + f'_release_{self.release_version}' + return '|'.join(sorted(group_descriptor_list)) + f'_release_{self.release_version}' def write_to_db(self): session = Session(self.sqlalchemy_engine) From 666c492caa6325fd4889678a52c8cc1118d1abb6 Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 2 Nov 2023 13:49:35 +0000 Subject: [PATCH 10/13] taxonomy comes first in count descriptor --- .../gather_clustering_counts/gather_release_counts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index d840296a0..190443d9d 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -244,7 +244,7 @@ def add_annotations(self): def count_descriptor(self, count_dict): if 'taxonomy' in count_dict: - return f"{count_dict['assembly']},{count_dict['taxonomy']},{count_dict['idtype']}" + return f"{count_dict['taxonomy']}, {count_dict['assembly']},{count_dict['idtype']}" def group_descriptor(self, count_groups): group_descriptor_list = [ From 9095e69cafdecee00b0a69eb66291b4eafb61973 Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 2 Nov 2023 13:59:57 +0000 Subject: 
[PATCH 11/13] Add documentation --- .../gather_release_counts.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index 190443d9d..9cd5d304a 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -96,6 +96,7 @@ def gather_count_for_set_species(release_directory, set_of_species, output_dir): def collect_files_to_count(release_directory, set_of_species): + """Collect all the (final) release files for a set of species""" all_files = [] for species in set_of_species: species_dir = os.path.join(release_directory, species) @@ -115,6 +116,10 @@ def collect_files_to_count(release_directory, set_of_species): def run_calculation_script_for_species(release_dir, output_dir, species_directories=None): + """ + Run the bash script that counts the number of RS for all the specified species directories (all if not set) + and return the logs + """ all_assemblies_2_species, all_species_2_assemblies = gather_assemblies_and_species_from_directory(release_dir) all_sets_of_species = set() # Determine the species that needs to be counted together because they share assemblies @@ -243,17 +248,23 @@ def add_annotations(self): count_dict['scientific_name'] = scientific_name def count_descriptor(self, count_dict): + """Description associated with a specific taxonomy, assembly and type of RS""" if 'taxonomy' in count_dict: - return f"{count_dict['taxonomy']}, {count_dict['assembly']},{count_dict['idtype']}" + return f"{count_dict['taxonomy']},{count_dict['assembly']},{count_dict['idtype']}" def group_descriptor(self, count_groups): + """Description associated with a group of taxonomy, assembly and type of RS that have the same RS count"""
group_descriptor_list = [ self.count_descriptor(count_dict) for count_dict in count_groups] if None not in group_descriptor_list: return '|'.join(sorted(group_descriptor_list)) + f'_release_{self.release_version}' - def write_to_db(self): + def write_counts_to_db(self): + """ + For all the counts gathered in this self.all_counts_grouped, write them to the db if they do not exist already. + Warn if the count already exists and is different. + """ session = Session(self.sqlalchemy_engine) with session.begin(): for count_groups in self.all_counts_grouped: @@ -297,9 +308,9 @@ def write_to_db(self): rs_category = result.RSCountCategory # Check if we were just loading the same value as a previous run if rs_category.rs_count.count != count_dict['count']: - self.warning(f"{count_descriptor(count_dict)} already has a count entry in the table " - f"({rs_category.rs_count.count}) different from the one being loaded " - f"{count_dict['count']}") + self.error(f"{self.count_descriptor(count_dict)} already has a count entry in the table " + f"({rs_category.rs_count.count}) different from the one being loaded " + f"{count_dict['count']}") def get_assembly_counts_from_database(self): """ @@ -429,7 +440,7 @@ def main(): log_files = run_calculation_script_for_species(args.release_root_path, args.output_dir, args.species_directories) counter = ReleaseCounter(args.private_config_xml_file, config_profile=args.config_profile, release_version=args.release_version, logs=log_files) - counter.write_to_db() + counter.write_counts_to_db() # TODO: Other analysis should be performed on the database counts # counter.detect_inconsistent_types() From f07e38d57e48a928ff1ce276e8a770cb47806055 Mon Sep 17 00:00:00 2001 From: tcezard Date: Thu, 2 Nov 2023 14:28:45 +0000 Subject: [PATCH 12/13] Ensure the current dir is the output dir to ensure the tmp files land in there --- .../gather_clustering_counts/gather_release_counts.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git
a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index 9cd5d304a..bbba29ba0 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -436,8 +436,11 @@ def main(): args = parser.parse_args() logging_config.add_stdout_handler() + output_dir = os.path.abspath(args.output_dir) + # Move to the output dir to make sure the bash tmp dirs are written there + os.chdir(output_dir) logger.info(f'Analyse {args.release_root_path}') - log_files = run_calculation_script_for_species(args.release_root_path, args.output_dir, args.species_directories) + log_files = run_calculation_script_for_species(args.release_root_path, output_dir, args.species_directories) counter = ReleaseCounter(args.private_config_xml_file, config_profile=args.config_profile, release_version=args.release_version, logs=log_files) counter.write_counts_to_db() From 0edf527dd7d236cb63ce6c6f59ac257cc308b3d2 Mon Sep 17 00:00:00 2001 From: tcezard Date: Fri, 3 Nov 2023 09:22:48 +0000 Subject: [PATCH 13/13] Relax taxonomy retrieval to enable loading data from Release 1 to 5 --- .../gather_release_counts.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index bbba29ba0..5333edb8a 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -199,11 +199,12 @@ def __init__(self, private_config_xml_file, config_profile, release_version, log @lru_cache def get_taxonomy_and_scientific_name(self, species_folder): + # TODO: Restore this function to only
retrieve the taxonomy and scientific name using the taxonomy table in release 6 query = ( f"select distinct c.taxonomy, t.scientific_name " f"from eva_progress_tracker.clustering_release_tracker c " f"join evapro.taxonomy t on c.taxonomy=t.taxonomy_id " - f"where release_version={self.release_version} AND release_folder_name='{species_folder}'" + f"where release_folder_name='{species_folder}'" ) with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: results = get_all_results_for_query(db_conn, query) @@ -212,10 +213,21 @@ def get_taxonomy_and_scientific_name(self, species_folder): query = ( f"select distinct taxonomy, scientific_name " f"from eva_progress_tracker.clustering_release_tracker " - f"where release_version={self.release_version} AND release_folder_name='{species_folder}'" + f"where release_folder_name='{species_folder}'" ) with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: results = get_all_results_for_query(db_conn, query) + if len(results) < 1: + # Support for directory from release 1 + if species_folder.split('_')[-1].isdigit(): + taxonomy = int(species_folder.split('_')[-1]) + query = ( + f"select distinct taxonomy, scientific_name " + f"from eva_progress_tracker.clustering_release_tracker " + f"where taxonomy={taxonomy}" + ) + with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: + results = get_all_results_for_query(db_conn, query) if len(results) < 1: logger.warning(f'Failed to get scientific name and taxonomy for {species_folder}') return None, None