Skip to content

Commit

Permalink
Merge pull request #458 from tcezard/taxonomy_and_assembly
Browse files Browse the repository at this point in the history
Fix release folder assignation and add table to store aggregate per taxonomy and assembly
  • Loading branch information
tcezard authored Sep 23, 2024
2 parents 9f966bf + 19a65b0 commit fc7f343
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from sqlalchemy.orm import Session

from gather_clustering_counts.release_count_models import RSCountCategory, RSCount, get_sql_alchemy_engine, \
RSCountPerTaxonomy, RSCountPerAssembly
RSCountPerTaxonomy, RSCountPerAssembly, RSCountPerTaxonomyAssembly

logger = logging_config.get_logger(__name__)

Expand Down Expand Up @@ -275,6 +275,7 @@ def _write_per_taxonomy_counts(self, session):
result = session.execute(query).fetchone()
if result:
taxonomy_row = result.RSCountPerTaxonomy
self.info(f"Update counts for aggregate per taxonomy {taxonomy_id}")
else:
self.info(f"Create persistence for aggregate per taxonomy {taxonomy_id}")
taxonomy_row = RSCountPerTaxonomy(
Expand All @@ -294,11 +295,11 @@ def _write_per_taxonomy_counts(self, session):
else:
prev_count_for_taxonomy = None
for rs_type in taxonomy_counts.get(taxonomy_id):
count_prev = 0
if prev_count_for_taxonomy:
count_new = taxonomy_counts.get(taxonomy_id).get(rs_type) - getattr(prev_count_for_taxonomy,
self._type_to_column(rs_type))
else:
count_new = 0
count_prev = getattr(prev_count_for_taxonomy, self._type_to_column(rs_type))

count_new = taxonomy_counts.get(taxonomy_id).get(rs_type) - count_prev
setattr(taxonomy_row, self._type_to_column(rs_type), taxonomy_counts.get(taxonomy_id).get(rs_type))
setattr(taxonomy_row, self._type_to_column(rs_type, is_new=True), count_new)
session.add(taxonomy_row)
Expand All @@ -314,6 +315,7 @@ def _write_per_assembly_counts(self, session):
result = session.execute(query).fetchone()
if result:
assembly_row = result.RSCountPerAssembly
self.info(f"Update counts for aggregate per assembly {assembly}")
else:
self.info(f"Create persistence for aggregate per assembly {assembly}")
assembly_row = RSCountPerAssembly(
Expand All @@ -333,15 +335,57 @@ def _write_per_assembly_counts(self, session):
else:
prev_count_for_assembly = None
for rs_type in assembly_counts.get(assembly):
count_prev = 0
if prev_count_for_assembly:
count_new = assembly_counts.get(assembly).get(rs_type) - getattr(prev_count_for_assembly,
self._type_to_column(rs_type))
else:
count_new = 0
count_prev = getattr(prev_count_for_assembly, self._type_to_column(rs_type))
count_new = assembly_counts.get(assembly).get(rs_type) -count_prev
setattr(assembly_row, self._type_to_column(rs_type), assembly_counts.get(assembly).get(rs_type))
setattr(assembly_row, self._type_to_column(rs_type, is_new=True), count_new)
session.add(assembly_row)

def _write_per_taxonomy_and_assembly_counts(self, session):
    """Load the aggregated counts per taxonomy and assembly into the database.

    For every (taxonomy, assembly) pair returned by
    ``generate_per_taxonomy_and_assembly_counts``, the ``RSCountPerTaxonomyAssembly``
    row of the current release is fetched (or created when absent). Each count
    column is set to the aggregated value and the matching "new" column to the
    difference with the same pair in the previous release
    (``self.release_version - 1``). The row is then added to ``session``.

    Assumes the previous version of the release was loaded already: when no row
    exists for the previous release, the previous count is taken as 0.

    :param session: SQLAlchemy session used to run the queries and to register
        the rows. This method does not commit.
    """
    def fetch_row(taxonomy, assembly, release_version):
        # Return the persisted row for this taxonomy/assembly/release, or None.
        query = select(RSCountPerTaxonomyAssembly).where(
            RSCountPerTaxonomyAssembly.taxonomy_id == taxonomy,
            RSCountPerTaxonomyAssembly.assembly_accession == assembly,
            RSCountPerTaxonomyAssembly.release_version == release_version
        )
        result = session.execute(query).fetchone()
        return result.RSCountPerTaxonomyAssembly if result else None

    species_assembly_counts, species_assembly_annotations = self.generate_per_taxonomy_and_assembly_counts()
    for (taxonomy, assembly), counts_per_type in species_assembly_counts.items():
        taxonomy_assembly_row = fetch_row(taxonomy, assembly, self.release_version)
        if taxonomy_assembly_row is not None:
            self.info(f"Update counts for aggregate per taxonomy {taxonomy} and assembly {assembly}")
        else:
            self.info(f"Create persistence for aggregate per taxonomy {taxonomy} and assembly {assembly}")
            taxonomy_assembly_row = RSCountPerTaxonomyAssembly(
                taxonomy_id=taxonomy,
                assembly_accession=assembly,
                release_folder=species_assembly_annotations.get((taxonomy, assembly)).get('release_folder'),
                release_version=self.release_version,
            )

        # Retrieve the count for the previous release
        prev_count_for_taxonomy_assembly = fetch_row(taxonomy, assembly, self.release_version - 1)

        for rs_type, count in counts_per_type.items():
            column = self._type_to_column(rs_type)
            count_prev = 0
            if prev_count_for_taxonomy_assembly is not None:
                count_prev = getattr(prev_count_for_taxonomy_assembly, column)

            setattr(taxonomy_assembly_row, column, count)
            # The "new" column holds what was gained (or lost) since the previous release.
            setattr(taxonomy_assembly_row, self._type_to_column(rs_type, is_new=True), count - count_prev)
        session.add(taxonomy_assembly_row)

def write_counts_to_db(self):
"""
For all the counts gathered in this self.all_counts_grouped, write them to the db if they do not exist already.
Expand All @@ -352,6 +396,7 @@ def write_counts_to_db(self):
self._write_exploded_counts(session)
self._write_per_taxonomy_counts(session)
self._write_per_assembly_counts(session)
self._write_per_taxonomy_and_assembly_counts(session)

def get_assembly_counts_from_database(self):
"""
Expand Down Expand Up @@ -400,6 +445,7 @@ def generate_per_taxonomy_counts(self):
species_annotations = defaultdict(dict)
for count_groups in self.all_counts_grouped:
taxonomy_and_types = set([(count_dict['taxonomy'], count_dict['idtype']) for count_dict in count_groups])
release_folder_map = dict((count_dict['taxonomy'], count_dict['release_folder']) for count_dict in count_groups)
for taxonomy, rstype in taxonomy_and_types:
if taxonomy not in species_annotations:
species_annotations[taxonomy] = {'assemblies': set(), 'release_folder': None}
Expand All @@ -412,7 +458,7 @@ def generate_per_taxonomy_counts(self):
if count_dict['taxonomy'] is taxonomy and count_dict['idtype'] is rstype
])
)
species_annotations[taxonomy]['release_folder'] = count_groups[0]['release_folder']
species_annotations[taxonomy]['release_folder'] = release_folder_map[taxonomy]
return species_counts, species_annotations

def generate_per_assembly_counts(self):
Expand All @@ -431,16 +477,26 @@ def generate_per_assembly_counts(self):
for count_dict in count_groups
if count_dict['assembly'] is assembly_accession and count_dict['idtype'] is rstype
]))

assembly_annotations[assembly_accession]['release_folder'] = assembly_accession
return assembly_counts, assembly_annotations

# def generate_per_species_assembly_counts(self):
# species_assembly_counts = defaultdict(Counter)
# for count_groups in self.all_counts_grouped:
# for count_dict in count_groups:
# species_assembly_counts[count_dict['assembly'] + '\t' + count_dict['assembly']][count_dict['idtype'] + '_rs'] += count_dict['count']
# return species_assembly_counts
def generate_per_taxonomy_and_assembly_counts(self):
    """Aggregate the grouped counts per (taxonomy, assembly) pair.

    Reads ``self.all_counts_grouped``, an iterable of groups where each group is
    a list of dicts with the keys ``taxonomy``, ``assembly``, ``idtype``,
    ``count`` and ``release_folder``.

    :return: a tuple ``(species_assembly_counts, species_assembly_annotations)``,
        both keyed by ``(taxonomy, assembly)``. The first maps each key to a
        ``Counter`` of count per idtype. The second maps each key to
        ``{'release_folder': ...}``: the taxonomy's release folder followed by
        ``/<assembly>``, or the taxonomy's release folder alone when the
        assembly is ``'Unmapped'``.
    """
    species_assembly_counts = defaultdict(Counter)
    species_assembly_annotations = defaultdict(dict)
    for count_groups in self.all_counts_grouped:
        # De-duplicate so a (taxonomy, assembly, idtype) triple is counted once per group.
        taxonomy_assembly_and_types = {
            (count_dict['taxonomy'], count_dict['assembly'], count_dict['idtype'])
            for count_dict in count_groups
        }
        release_folder_map = {
            count_dict['taxonomy']: count_dict['release_folder'] for count_dict in count_groups
        }
        for taxonomy, assembly, rstype in taxonomy_assembly_and_types:
            key = (taxonomy, assembly)
            # All count_dict have the same count in a group
            species_assembly_counts[key][rstype] += count_groups[0]['count']
            release_folder = release_folder_map[taxonomy]
            if assembly != 'Unmapped':
                release_folder = release_folder + '/' + assembly
            species_assembly_annotations[key]['release_folder'] = release_folder
    return species_assembly_counts, species_assembly_annotations

def detect_inconsistent_types(self):
inconsistent_types = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,31 @@ class RSCountPerAssembly(Base):
schema = 'eva_stats'


class RSCountPerTaxonomyAssembly(Base):
"""
Table that provides the aggregated RS counts per taxonomy and assembly for a given release.

Each row is identified by the (taxonomy_id, assembly_accession, release_version) triple.
"""
__tablename__ = 'release_rs_count_per_taxonomy_assembly'

# Composite primary key: one row per taxonomy, assembly and release version.
taxonomy_id = Column(Integer, primary_key=True)
assembly_accession = Column(String)
release_version = Column(Integer, primary_key=True)
# Folder of the release this row belongs to, e.g. 'Cow_9913/GCA_000003205.6' in the test of this change.
release_folder = Column(String)
# Count columns, one per RS category; each defaults to 0 when not set explicitly.
current_rs = Column(BigInteger, default=0)
multimap_rs = Column(BigInteger, default=0)
merged_rs = Column(BigInteger, default=0)
deprecated_rs = Column(BigInteger, default=0)
merged_deprecated_rs = Column(BigInteger, default=0)
unmapped_rs = Column(BigInteger, default=0)
# `new_` counterparts of the columns above, also defaulting to 0.
# NOTE(review): presumably filled with the difference to the previous release by the writer
# (via `_type_to_column(..., is_new=True)`) - confirm against that helper.
new_current_rs = Column(BigInteger, default=0)
new_multimap_rs = Column(BigInteger, default=0)
new_merged_rs = Column(BigInteger, default=0)
new_deprecated_rs = Column(BigInteger, default=0)
new_merged_deprecated_rs = Column(BigInteger, default=0)
new_unmapped_rs = Column(BigInteger, default=0)
# Plain class attribute holding the schema name; the test teardown reads it to build the
# schema-qualified table name.
# NOTE(review): SQLAlchemy usually takes the schema from `__table_args__` or the MetaData -
# confirm where the schema of this table is actually configured.
schema = 'eva_stats'


def get_sql_alchemy_engine(dbtype, username, password, host_url, database, port):
engine = create_engine(URL.create(
dbtype + '+psycopg2',
Expand All @@ -104,6 +129,7 @@ def get_sql_alchemy_engine(dbtype, username, password, host_url, database, port)
RSCountCategory.__table__.create(bind=engine, checkfirst=True)
RSCountPerAssembly.__table__.create(bind=engine, checkfirst=True)
RSCountPerTaxonomy.__table__.create(bind=engine, checkfirst=True)
RSCountPerTaxonomyAssembly.__table__.create(bind=engine, checkfirst=True)
return engine


Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import os
from itertools import cycle
from unittest import TestCase
from unittest.mock import patch

from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle
from ebi_eva_common_pyutils.pg_utils import execute_query
from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle
from ebi_eva_internal_pyutils.pg_utils import execute_query
from sqlalchemy import select
from sqlalchemy.orm import Session

from gather_clustering_counts.gather_release_counts import find_link, ReleaseCounter
from gather_clustering_counts.release_count_models import RSCountPerTaxonomy, RSCountPerAssembly, RSCountCategory, \
RSCount
RSCount, RSCountPerTaxonomyAssembly


def test_find_links():
Expand All @@ -35,18 +34,22 @@ def test_find_links():
assert find_link({'E'}, d1, d2) == (frozenset({'E'}), frozenset({}))



class TestReleaseCounter(TestCase):

resource_folder = os.path.dirname(__file__)

def setUp(self):
self.private_config_xml_file = os.path.join(self.resource_folder, 'config_xml_file.xml')
self.config_profile = "localhost"

def tearDown(self):
with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn:
for sqlalchemy_class in [RSCountCategory, RSCount, RSCountPerTaxonomy, RSCountPerAssembly]:
query = f'DROP TABLE {sqlalchemy_class.schema}.{sqlalchemy_class.__tablename__}'
execute_query(db_conn, query)
table_names = ','.join(f'{sqlalchemy_class.schema}.{sqlalchemy_class.__tablename__}' for sqlalchemy_class in
[RSCountCategory, RSCount, RSCountPerTaxonomy, RSCountPerAssembly,
RSCountPerTaxonomyAssembly])
query = f'DROP TABLE IF EXISTS {table_names};'
print(query)
execute_query(db_conn, query)

def test_write_counts_to_db(self):
"""This test require a postgres database running on localhost. See config_xml_file.xml for detail."""
Expand Down Expand Up @@ -93,7 +96,19 @@ def test_write_counts_to_db(self):
assert rs_assembly_count.new_current_rs == 0
assert rs_assembly_count.release_folder == 'GCA_000003205.6'

query = select(RSCountPerTaxonomyAssembly).where(
RSCountPerTaxonomyAssembly.assembly_accession == 'GCA_000003205.6',
RSCountPerTaxonomyAssembly.taxonomy_id == 9913,
RSCountPerTaxonomyAssembly.release_version == 1
)
result = session.execute(query).fetchone()
rs_count_per_taxonomy_assembly = result.RSCountPerTaxonomyAssembly
assert rs_count_per_taxonomy_assembly.current_rs == 61038394
assert rs_count_per_taxonomy_assembly.new_current_rs == 0
assert rs_count_per_taxonomy_assembly.release_folder == 'Cow_9913/GCA_000003205.6'

def test_write_counts_to_db2(self):
"""This test require a postgres database running on localhost. See config_xml_file.xml for detail."""
log_files_release = [os.path.join(self.resource_folder, 'count_for_haplochromini_oreochromis_niloticus.log')]
folder_to_taxonomy = {'oreochromis_niloticus': 8128, 'haplochromini': 319058}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from ebi_eva_common_pyutils.logger import logging_config
from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle
from ebi_eva_internal_pyutils.pg_utils import get_all_results_for_query

from run_release_in_embassy.release_common_utils import get_release_folder_name
from run_release_in_embassy.run_release_for_species import load_config, get_release_folder
from run_release_in_embassy.release_metadata import release_vcf_file_categories, release_text_file_categories
Expand Down

0 comments on commit fc7f343

Please sign in to comment.