diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index 30ffe11f8..e6ecc8c65 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -17,7 +17,7 @@ from sqlalchemy.orm import Session from gather_clustering_counts.release_count_models import RSCountCategory, RSCount, get_sql_alchemy_engine, \ - RSCountPerTaxonomy, RSCountPerAssembly + RSCountPerTaxonomy, RSCountPerAssembly, RSCountPerTaxonomyAssembly logger = logging_config.get_logger(__name__) @@ -275,6 +275,7 @@ def _write_per_taxonomy_counts(self, session): result = session.execute(query).fetchone() if result: taxonomy_row = result.RSCountPerTaxonomy + self.info(f"Update counts for aggregate per taxonomy {taxonomy_id}") else: self.info(f"Create persistence for aggregate per taxonomy {taxonomy_id}") taxonomy_row = RSCountPerTaxonomy( @@ -294,11 +295,11 @@ def _write_per_taxonomy_counts(self, session): else: prev_count_for_taxonomy = None for rs_type in taxonomy_counts.get(taxonomy_id): + count_prev = 0 if prev_count_for_taxonomy: - count_new = taxonomy_counts.get(taxonomy_id).get(rs_type) - getattr(prev_count_for_taxonomy, - self._type_to_column(rs_type)) - else: - count_new = 0 + count_prev = getattr(prev_count_for_taxonomy, self._type_to_column(rs_type)) + + count_new = taxonomy_counts.get(taxonomy_id).get(rs_type) - count_prev setattr(taxonomy_row, self._type_to_column(rs_type), taxonomy_counts.get(taxonomy_id).get(rs_type)) setattr(taxonomy_row, self._type_to_column(rs_type, is_new=True), count_new) session.add(taxonomy_row) @@ -314,6 +315,7 @@ def _write_per_assembly_counts(self, session): result = session.execute(query).fetchone() if result: assembly_row = result.RSCountPerAssembly + self.info(f"Update counts for aggregate per assembly {assembly}") else: self.info(f"Create persistence for aggregate per assembly {assembly}") assembly_row = RSCountPerAssembly( @@ -333,15 +335,57 @@ def _write_per_assembly_counts(self, session): else: prev_count_for_assembly = None for rs_type in assembly_counts.get(assembly): + count_prev = 0 if prev_count_for_assembly: - count_new = assembly_counts.get(assembly).get(rs_type) - getattr(prev_count_for_assembly, - self._type_to_column(rs_type)) - else: - count_new = 0 + count_prev = getattr(prev_count_for_assembly, self._type_to_column(rs_type)) + count_new = assembly_counts.get(assembly).get(rs_type) -count_prev setattr(assembly_row, self._type_to_column(rs_type), assembly_counts.get(assembly).get(rs_type)) setattr(assembly_row, self._type_to_column(rs_type, is_new=True), count_new) session.add(assembly_row) + def _write_per_taxonomy_and_assembly_counts(self, session): + """Load the aggregated count per assembly (assume previous version of the release was loaded already)""" + species_assembly_counts, species_assembly_annotations = self.generate_per_taxonomy_and_assembly_counts() + for taxonomy, assembly in species_assembly_counts: + query = select(RSCountPerTaxonomyAssembly).where( + RSCountPerTaxonomyAssembly.taxonomy_id == taxonomy, + RSCountPerTaxonomyAssembly.assembly_accession == assembly, + RSCountPerTaxonomyAssembly.release_version == self.release_version + ) + result = session.execute(query).fetchone() + if result: + taxonomy_assembly_row = result.RSCountPerTaxonomyAssembly + self.info(f"Update counts for aggregate per taxonomy {taxonomy} and assembly {assembly}") + else: + self.info(f"Create persistence for aggregate per taxonomy {taxonomy} and assembly {assembly}") + taxonomy_assembly_row = RSCountPerTaxonomyAssembly( + taxonomy_id=taxonomy, + assembly_accession=assembly, + release_folder=species_assembly_annotations.get((taxonomy, assembly)).get('release_folder'), + release_version=self.release_version, + ) + # Retrieve the count for the previous release + query = select(RSCountPerTaxonomyAssembly).where( + RSCountPerTaxonomyAssembly.taxonomy_id == taxonomy, + RSCountPerTaxonomyAssembly.assembly_accession == assembly, + RSCountPerTaxonomyAssembly.release_version == self.release_version - 1 + ) + result = session.execute(query).fetchone() + if result: + prev_count_for_taxonomy_assembly = result.RSCountPerTaxonomyAssembly + else: + prev_count_for_taxonomy_assembly = None + for rs_type in species_assembly_counts.get((taxonomy, assembly)): + count_prev = 0 + if prev_count_for_taxonomy_assembly: + count_prev = getattr(prev_count_for_taxonomy_assembly, self._type_to_column(rs_type)) + + count_new = species_assembly_counts.get((taxonomy, assembly)).get(rs_type) - count_prev + setattr(taxonomy_assembly_row, self._type_to_column(rs_type), + species_assembly_counts.get((taxonomy, assembly)).get(rs_type)) + setattr(taxonomy_assembly_row, self._type_to_column(rs_type, is_new=True), count_new) + session.add(taxonomy_assembly_row) + def write_counts_to_db(self): """ For all the counts gathered in this self.all_counts_grouped, write them to the db if they do not exist already. @@ -352,6 +396,7 @@ def write_counts_to_db(self): self._write_exploded_counts(session) self._write_per_taxonomy_counts(session) self._write_per_assembly_counts(session) + self._write_per_taxonomy_and_assembly_counts(session) def get_assembly_counts_from_database(self): """ @@ -400,6 +445,7 @@ def generate_per_taxonomy_counts(self): species_annotations = defaultdict(dict) for count_groups in self.all_counts_grouped: taxonomy_and_types = set([(count_dict['taxonomy'], count_dict['idtype']) for count_dict in count_groups]) + release_folder_map = dict((count_dict['taxonomy'], count_dict['release_folder']) for count_dict in count_groups) for taxonomy, rstype in taxonomy_and_types: if taxonomy not in species_annotations: species_annotations[taxonomy] = {'assemblies': set(), 'release_folder': None} @@ -412,7 +458,7 @@ def generate_per_taxonomy_counts(self): if count_dict['taxonomy'] is taxonomy and count_dict['idtype'] is rstype ]) ) - species_annotations[taxonomy]['release_folder'] = count_groups[0]['release_folder'] + species_annotations[taxonomy]['release_folder'] = release_folder_map[taxonomy] return species_counts, species_annotations def generate_per_assembly_counts(self): @@ -431,16 +477,26 @@ def generate_per_assembly_counts(self): for count_dict in count_groups if count_dict['assembly'] is assembly_accession and count_dict['idtype'] is rstype ])) - assembly_annotations[assembly_accession]['release_folder'] = assembly_accession return assembly_counts, assembly_annotations - # def generate_per_species_assembly_counts(self): - # species_assembly_counts = defaultdict(Counter) - # for count_groups in self.all_counts_grouped: - # for count_dict in count_groups: - # species_assembly_counts[count_dict['assembly'] + '\t' + count_dict['assembly']][count_dict['idtype'] + '_rs'] += count_dict['count'] - # return species_assembly_counts + def generate_per_taxonomy_and_assembly_counts(self): + species_assembly_counts = defaultdict(Counter) + species_assembly_annotations = defaultdict(dict) + for count_groups in self.all_counts_grouped: + taxonomy_assembly_and_types = set([(count_dict['taxonomy'], count_dict['assembly'], count_dict['idtype']) for count_dict in count_groups]) + release_folder_map = dict((count_dict['taxonomy'], count_dict['release_folder']) for count_dict in count_groups) + for taxonomy, assembly, rstype in taxonomy_assembly_and_types: + if (taxonomy, assembly) not in species_assembly_annotations: + species_assembly_annotations[(taxonomy, assembly)] = {'release_folder': None} + # All count_dict have the same count in a group + species_assembly_counts[(taxonomy, assembly)][rstype] += count_groups[0]['count'] + if assembly != 'Unmapped': + species_assembly_annotations[(taxonomy, assembly)]['release_folder'] = \ + release_folder_map[taxonomy] + '/' + assembly + else: + species_assembly_annotations[(taxonomy, assembly)]['release_folder'] = release_folder_map[taxonomy] + return species_assembly_counts, species_assembly_annotations def detect_inconsistent_types(self): inconsistent_types = [] diff --git a/eva-accession-release-automation/gather_clustering_counts/release_count_models.py b/eva-accession-release-automation/gather_clustering_counts/release_count_models.py index b38f242f2..440783016 100644 --- a/eva-accession-release-automation/gather_clustering_counts/release_count_models.py +++ b/eva-accession-release-automation/gather_clustering_counts/release_count_models.py @@ -89,6 +89,31 @@ class RSCountPerAssembly(Base): schema = 'eva_stats' +class RSCountPerTaxonomyAssembly(Base): + """ + Table that provide the aggregated count per taxonomy and assembly + """ + __tablename__ = 'release_rs_count_per_taxonomy_assembly' + + taxonomy_id = Column(Integer, primary_key=True) + assembly_accession = Column(String, primary_key=True) + release_version = Column(Integer, primary_key=True) + release_folder = Column(String) + current_rs = Column(BigInteger, default=0) + multimap_rs = Column(BigInteger, default=0) + merged_rs = Column(BigInteger, default=0) + deprecated_rs = Column(BigInteger, default=0) + merged_deprecated_rs = Column(BigInteger, default=0) + unmapped_rs = Column(BigInteger, default=0) + new_current_rs = Column(BigInteger, default=0) + new_multimap_rs = Column(BigInteger, default=0) + new_merged_rs = Column(BigInteger, default=0) + new_deprecated_rs = Column(BigInteger, default=0) + new_merged_deprecated_rs = Column(BigInteger, default=0) + new_unmapped_rs = Column(BigInteger, default=0) + schema = 'eva_stats' + + def get_sql_alchemy_engine(dbtype, username, password, host_url, database, port): engine = create_engine(URL.create( dbtype + '+psycopg2', @@ -104,6 +129,7 @@ def get_sql_alchemy_engine(dbtype, username, password, host_url, database, port) RSCountCategory.__table__.create(bind=engine, checkfirst=True) RSCountPerAssembly.__table__.create(bind=engine, checkfirst=True) RSCountPerTaxonomy.__table__.create(bind=engine, checkfirst=True) + RSCountPerTaxonomyAssembly.__table__.create(bind=engine, checkfirst=True) return engine diff --git a/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py index 747616c63..f0a3f4df9 100644 --- a/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/tests/test_gather_release_counts.py @@ -1,16 +1,15 @@ import os -from itertools import cycle from unittest import TestCase from unittest.mock import patch -from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle -from ebi_eva_common_pyutils.pg_utils import execute_query +from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle +from ebi_eva_internal_pyutils.pg_utils import execute_query from sqlalchemy import select from sqlalchemy.orm import Session from gather_clustering_counts.gather_release_counts import find_link, ReleaseCounter from gather_clustering_counts.release_count_models import RSCountPerTaxonomy, RSCountPerAssembly, RSCountCategory, \ - RSCount + RSCount, RSCountPerTaxonomyAssembly def test_find_links(): @@ -35,7 +34,6 @@ def test_find_links(): assert find_link({'E'}, d1, d2) == (frozenset({'E'}), frozenset({})) - class TestReleaseCounter(TestCase): resource_folder = os.path.dirname(__file__) @@ -43,10 +41,15 @@ class TestReleaseCounter(TestCase): def setUp(self): self.private_config_xml_file = os.path.join(self.resource_folder, 'config_xml_file.xml') self.config_profile = "localhost" + + def tearDown(self): with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn: - for sqlalchemy_class in [RSCountCategory, RSCount, RSCountPerTaxonomy, RSCountPerAssembly]: - query = f'DROP TABLE {sqlalchemy_class.schema}.{sqlalchemy_class.__tablename__}' - execute_query(db_conn, query) + table_names = ','.join(f'{sqlalchemy_class.schema}.{sqlalchemy_class.__tablename__}' for sqlalchemy_class in + [RSCountCategory, RSCount, RSCountPerTaxonomy, RSCountPerAssembly, + RSCountPerTaxonomyAssembly]) + query = f'DROP TABLE IF EXISTS {table_names};' + print(query) + execute_query(db_conn, query) def test_write_counts_to_db(self): """This test require a postgres database running on localhost. See config_xml_file.xml for detail.""" @@ -93,7 +96,19 @@ def test_write_counts_to_db(self): assert rs_assembly_count.new_current_rs == 0 assert rs_assembly_count.release_folder == 'GCA_000003205.6' + query = select(RSCountPerTaxonomyAssembly).where( + RSCountPerTaxonomyAssembly.assembly_accession == 'GCA_000003205.6', + RSCountPerTaxonomyAssembly.taxonomy_id == 9913, + RSCountPerTaxonomyAssembly.release_version == 1 + ) + result = session.execute(query).fetchone() + rs_count_per_taxonomy_assembly = result.RSCountPerTaxonomyAssembly + assert rs_count_per_taxonomy_assembly.current_rs == 61038394 + assert rs_count_per_taxonomy_assembly.new_current_rs == 0 + assert rs_count_per_taxonomy_assembly.release_folder == 'Cow_9913/GCA_000003205.6' + def test_write_counts_to_db2(self): + """This test require a postgres database running on localhost. See config_xml_file.xml for detail.""" log_files_release = [os.path.join(self.resource_folder, 'count_for_haplochromini_oreochromis_niloticus.log')] folder_to_taxonomy = {'oreochromis_niloticus': 8128, 'haplochromini': 319058} diff --git a/eva-accession-release-automation/publish_release_to_ftp/publish_release_files_to_ftp.py b/eva-accession-release-automation/publish_release_to_ftp/publish_release_files_to_ftp.py index f4e195dcf..8faa59feb 100644 --- a/eva-accession-release-automation/publish_release_to_ftp/publish_release_files_to_ftp.py +++ b/eva-accession-release-automation/publish_release_to_ftp/publish_release_files_to_ftp.py @@ -27,6 +27,7 @@ from ebi_eva_common_pyutils.logger import logging_config from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle from ebi_eva_internal_pyutils.pg_utils import get_all_results_for_query + from run_release_in_embassy.release_common_utils import get_release_folder_name from run_release_in_embassy.run_release_for_species import load_config, get_release_folder from run_release_in_embassy.release_metadata import release_vcf_file_categories, release_text_file_categories