Skip to content

Commit

Permalink
Merge pull request #423 from tcezard/EVA3410_persit_counts
Browse files Browse the repository at this point in the history
EVA-3410 - Persistence of the exploded counts to eva_stats
  • Loading branch information
tcezard authored Mar 18, 2024
2 parents 0c2c2f9 + 0edf527 commit bf21073
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 25 deletions.
5 changes: 3 additions & 2 deletions eva-accession-release-automation/gather_clustering_counts/bash/count_rs_for_all_files.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ do
echo "Process $INPUT"
ASSEMBLY=$(basename $(dirname ${INPUT}));
SC_NAME=$(basename $(dirname $(dirname ${INPUT})));
TYPE=$(echo $(basename ${INPUT}) | cut -f 4- -d '_' | awk '{print substr($0,1,length($0)-11)}')
# SPLIT by GCA is to support format that have a taxonomy prefix (release 5+) or the one that do not (release 4-)
TYPE=$(echo $(basename ${INPUT}) | awk -F 'GCA' '{print $2}' |cut -f 3- -d '_' | awk '{print substr($0,1,length($0)-11)}')
OUTPUT=tmp_${SC_NAME}_${ASSEMBLY}_${TYPE}.txt
if [[ ${INPUT} == *.vcf.gz ]]
then
Expand Down Expand Up @@ -42,5 +43,5 @@ cat $ALL_TMP_OUTPUT | sort \
print ""; }' \
| grep -v '^$' | sort | uniq -c | sort -nr > "$OUTPUT_FILE"

rm $ALL_TMP_OUTPUT
rm -f $ALL_TMP_OUTPUT

Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
import glob
import os
from collections import defaultdict, Counter
from functools import lru_cache
from functools import lru_cache, cached_property
from urllib.parse import urlsplit

from ebi_eva_common_pyutils.command_utils import run_command_with_output
from ebi_eva_common_pyutils.common_utils import pretty_print
from ebi_eva_common_pyutils.config_utils import get_metadata_creds_for_profile
from ebi_eva_common_pyutils.logger import logging_config, AppLogger
from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle
from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query

from sqlalchemy import Column, String, Integer, BigInteger, ForeignKey, create_engine, URL, MetaData, TEXT, \
UniqueConstraint, select
from sqlalchemy.orm import declarative_base, relationship, mapped_column, Session

logger = logging_config.get_logger(__name__)

Expand All @@ -36,6 +41,8 @@ def find_link(key_set, dict1, dict2, source_linked_set1=None, source_linked_set2
linked_set2 = source_linked_set2.copy()
for key1 in key_set:
if key1 in dict1:
# first set should at least contain the query
linked_set1.add(key1)
for value1 in dict1.get(key1):
linked_set2.add(value1)
if value1 in dict2:
Expand Down Expand Up @@ -89,14 +96,15 @@ def gather_count_for_set_species(release_directory, set_of_species, output_dir):


def collect_files_to_count(release_directory, set_of_species):
"""Collect all the (final) release files for a set of species"""
all_files = []
for species in set_of_species:
species_dir = os.path.join(release_directory, species)
assembly_directories = glob.glob(os.path.join(species_dir, "GCA_*"))
for assembly_dir in assembly_directories:
vcf_pattern = f'*_GCA_*_ids.vcf.gz'
vcf_pattern = f'*GCA_*_ids.vcf.gz'
vcf_files = glob.glob(os.path.join(assembly_dir, vcf_pattern))
txt_pattern = f'*_GCA_*_ids.txt.gz'
txt_pattern = f'*GCA_*_ids.txt.gz'
txt_files = glob.glob(os.path.join(assembly_dir, txt_pattern))
# I don't have the taxonomy to add to the bash pattern so remove the file that start with eva_ or dbsnp_
vcf_files = [f for f in vcf_files if 'dbsnp_' not in f and 'eva_' not in f]
Expand All @@ -107,7 +115,11 @@ def collect_files_to_count(release_directory, set_of_species):
return all_files


def calculate_all_logs(release_dir, output_dir, species_directories=None):
def run_calculation_script_for_species(release_dir, output_dir, species_directories=None):
"""
Run the bash script that count the number of RS for all the specified species directories (all if not set)
and return the logs
"""
all_assemblies_2_species, all_species_2_assemblies = gather_assemblies_and_species_from_directory(release_dir)
all_sets_of_species = set()
# Determine the species that needs to be counted together because they share assemblies
Expand Down Expand Up @@ -141,36 +153,176 @@ def generate_output_tsv(dict_of_counter, output_file, header):
]) + '\n')


Base = declarative_base(metadata=MetaData(schema="eva_stats"))


class RSCountCategory(Base):
"""
Table that provide the metadata associated with a number of RS id
"""
__tablename__ = 'release_rs_count_category'

count_category_id = Column(Integer, primary_key=True, autoincrement=True)
assembly = Column(String)
taxonomy = Column(Integer)
rs_type = Column(String)
release_version = Column(Integer)
rs_count_id = mapped_column(ForeignKey("release_rs_count.id"))
rs_count = relationship("RSCount", back_populates="count_categories")
__table_args__ = (
UniqueConstraint('assembly', 'taxonomy', 'rs_type', 'release_version', 'rs_count_id', name='uix_1'),
)
schema = 'eva_stats'


class RSCount(Base):
"""
Table that provide the count associated with each category
"""
__tablename__ = 'release_rs_count'
id = Column(Integer, primary_key=True, autoincrement=True)
count = Column(BigInteger)
group_description = Column(TEXT, unique=True)
count_categories = relationship("RSCountCategory", back_populates="rs_count")
schema = 'eva_stats'


class ReleaseCounter(AppLogger):

def __init__(self, private_config_xml_file, release_version, logs):
def __init__(self, private_config_xml_file, config_profile, release_version, logs):
self.private_config_xml_file = private_config_xml_file
self.config_profile = config_profile
self.release_version = release_version
self.all_counts_grouped = []
self.parse_logs(logs)
self.parse_count_script_logs(logs)
self.add_annotations()

@lru_cache
def get_taxonomy_and_scientific_name(self, species_folder):
# TODO: Restore this function to only retrieve the taxonomy and scientific name using the taxonomy table in release 6
query = (
f"select distinct c.taxonomy, t.scientific_name "
f"from eva_progress_tracker.clustering_release_tracker c "
f"join evapro.taxonomy t on c.taxonomy=t.taxonomy_id "
f"where release_version={self.release_version} AND release_folder_name='{species_folder}'"
f"where release_folder_name='{species_folder}'"
)
with get_metadata_connection_handle('production_processing', self.private_config_xml_file) as db_conn:
with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn:
results = get_all_results_for_query(db_conn, query)
if len(results) < 1:
# Rely only on the clustering_release_tracker when taxonomy is not available in EVAPRO
query = (
f"select distinct taxonomy, scientific_name "
f"from eva_progress_tracker.clustering_release_tracker "
f"where release_folder_name='{species_folder}'"
)
with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn:
results = get_all_results_for_query(db_conn, query)
if len(results) < 1:
# Support for directory from release 1
if species_folder.split('_')[-1].isdigit():
taxonomy = int(species_folder.split('_')[-1])
query = (
f"select distinct taxonomy, scientific_name "
f"from eva_progress_tracker.clustering_release_tracker "
f"where taxonomy={taxonomy}"
)
with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn:
results = get_all_results_for_query(db_conn, query)
if len(results) < 1:
logger.warning(f'Failed to get scientific name and taxonomy for {species_folder}')
return None, None
return results[0][0], results[0][1]

@cached_property
def sqlalchemy_engine(self):
pg_url, pg_user, pg_pass = get_metadata_creds_for_profile(self.config_profile, self.private_config_xml_file)
dbtype, host_url, port_and_db = urlsplit(pg_url).path.split(':')
port, db = port_and_db.split('/')
engine = create_engine(URL.create(
dbtype + '+psycopg2',
username=pg_user,
password=pg_pass,
host=host_url.split('/')[-1],
database=db,
port=port
))
RSCount.__table__.create(bind=engine, checkfirst=True)
RSCountCategory.__table__.create(bind=engine, checkfirst=True)
return engine

def add_annotations(self):
for count_groups in self.all_counts_grouped:
for count_dict in count_groups:
taxonomy, scientific_name = self.get_taxonomy_and_scientific_name(count_dict['release_folder'])
count_dict['taxonomy'] = taxonomy
count_dict['scientific_name'] = scientific_name
if taxonomy:
count_dict['taxonomy'] = taxonomy
if scientific_name:
count_dict['scientific_name'] = scientific_name

def count_descriptor(self, count_dict):
"""Description for associated with specific taxonomy, assembly and type of RS"""
if 'taxonomy' in count_dict:
return f"{count_dict['taxonomy']},{count_dict['assembly']},{count_dict['idtype']}"

def group_descriptor(self, count_groups):
"""Description for associated with group of taxonomy, assembly and type of RS that have the same RS count"""
group_descriptor_list = [
self.count_descriptor(count_dict)
for count_dict in count_groups]
if None not in group_descriptor_list:
return '|'.join(sorted(group_descriptor_list)) + f'_release_{self.release_version}'

def write_counts_to_db(self):
"""
For all the counts gathered in this self.all_counts_grouped, write them to the db if they do not exist already.
Warn if the count already exists and are different.
"""
session = Session(self.sqlalchemy_engine)
with session.begin():
for count_groups in self.all_counts_grouped:
# All the part of the group should have the same count
count_set = set((count_dict['count'] for count_dict in count_groups))
assert len(count_set) == 1
count = count_set.pop()
# This is used to uniquely identify the count for this group so that loading the same value twice does
# not result in duplicates
group_description = self.group_descriptor(count_groups)
if not group_description:
# One of the taxonomy annotation is missing, we should not load that count
continue
query = select(RSCount).where(RSCount.group_description == group_description)
result = session.execute(query).fetchone()
if result:
rs_count = result.RSCount
else:
rs_count = RSCount(count=count, group_description=group_description)
session.add(rs_count)
for count_dict in count_groups:
query = select(RSCountCategory).where(
RSCountCategory.assembly == count_dict['assembly'],
RSCountCategory.taxonomy == count_dict['taxonomy'],
RSCountCategory.rs_type == count_dict['idtype'],
RSCountCategory.release_version == self.release_version,
RSCountCategory.rs_count_id == rs_count.id,
)
result = session.execute(query).fetchone()
if not result:
self.info(f"Create persistence for {count_dict['assembly']}, {count_dict['taxonomy']}, {count_dict['idtype']}, {count_dict['count']}")
rs_category = RSCountCategory(
assembly=count_dict['assembly'],
taxonomy=count_dict['taxonomy'],
rs_type=count_dict['idtype'],
release_version=self.release_version,
rs_count=rs_count
)
session.add(rs_category)
else:
rs_category = result.RSCountCategory
# Check if we were just loading the same value as a previous run
if rs_category.rs_count.count != count_dict['count']:
self.error(f"{self.count_descriptor(count_dict)} already has a count entry in the table "
f"({rs_category.rs_count.count}) different from the one being loaded "
f"{count_dict['count']}")

def get_assembly_counts_from_database(self):
"""
Expand All @@ -186,14 +338,14 @@ def get_assembly_counts_from_database(self):
f" FROM {assembly_table_name} "
f"WHERE release_version={self.release_version}"
)
with get_metadata_connection_handle('production_processing', self.private_config_xml_file) as db_conn:
with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn:
for row in get_all_results_for_query(db_conn, query):
assembly = row[0]
for index, metric in enumerate(all_metrics):
results[assembly][metric] = row[index + 1]
return results

def parse_logs(self, all_logs):
def parse_count_script_logs(self, all_logs):
for log_file in all_logs:
with open(log_file) as open_file:
for line in open_file:
Expand All @@ -204,7 +356,8 @@ def parse_logs(self, all_logs):
for annotation in set_of_annotations:
assembly, release_folder, idtype = annotation.split('-')
all_groups.append(
{'count': count, 'release_folder': release_folder, 'assembly': assembly, 'idtype': idtype}
{'count': count, 'release_folder': release_folder, 'assembly': assembly, 'idtype': idtype,
'annotation': annotation}
)
self.all_counts_grouped.append(all_groups)

Expand Down Expand Up @@ -256,7 +409,7 @@ def compare_assembly_counts_with_db(self, threshold=0, output_csv=None):
header = ('Assembly', 'Metric', 'File', 'DB', 'Diff (file-db)')
rows = []
count_per_assembly_from_files = self.generate_per_assembly_counts()
counts_per_assembly_from_db = self.get_counts_assembly_from_database()
counts_per_assembly_from_db = self.get_assembly_counts_from_database()
all_asms = set(count_per_assembly_from_files.keys()).union(counts_per_assembly_from_db.keys())
for asm in all_asms:
asm_counts_from_files = count_per_assembly_from_files.get(asm, {})
Expand Down Expand Up @@ -284,22 +437,32 @@ def main():
help="base directory where all the release was run.", required=True)
parser.add_argument("--output-dir", type=str,
help="Output directory where all count logs will be created.", required=True)
parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True)
parser.add_argument("--release-version", type=int, help="current release version", required=True)
parser.add_argument("--species-directories", type=str, nargs='+',
help="set of directory names to process. It will process all the related one as well. "
"Process all if missing")
parser.add_argument("--private-config-xml-file", help="ex: /path/to/eva-maven-settings.xml", required=True)
parser.add_argument("--config-profile", help="profile to use in the config xml", required=False,
default='production_processing')


args = parser.parse_args()
logging_config.add_stdout_handler()
output_dir = os.path.abspath(args.output_dir)
# Move to the output dir to make sure the bash tmp dir are written their
os.chdir(output_dir)
logger.info(f'Analyse {args.release_root_path}')
logs = calculate_all_logs(args.release_root_path, args.output_dir, args.species_directories)
counter = ReleaseCounter(args.private_config_xml_file, args.release_version, logs)
counter.detect_inconsistent_types()
generate_output_tsv(counter.generate_per_species_counts(), os.path.join(args.output_dir, 'species_counts.tsv'), 'Taxonomy')
generate_output_tsv(counter.generate_per_assembly_counts(), os.path.join(args.output_dir, 'assembly_counts.tsv'), 'Assembly')
generate_output_tsv(counter.generate_per_species_assembly_counts(), os.path.join(args.output_dir, 'species_assembly_counts.tsv'), 'Taxonomy\tAssembly')
counter.compare_assembly_counts_with_db(output_csv=os.path.join(args.output_dir, 'comparison_assembly_counts.csv'))
log_files = run_calculation_script_for_species(args.release_root_path, output_dir, args.species_directories)
counter = ReleaseCounter(args.private_config_xml_file,
config_profile=args.config_profile, release_version=args.release_version, logs=log_files)
counter.write_counts_to_db()

# TODO: Other analysis should be performed on the database counts
# counter.detect_inconsistent_types()
# generate_output_tsv(counter.generate_per_species_counts(), os.path.join(args.output_dir, 'species_counts.tsv'), 'Taxonomy')
# generate_output_tsv(counter.generate_per_assembly_counts(), os.path.join(args.output_dir, 'assembly_counts.tsv'), 'Assembly')
# generate_output_tsv(counter.generate_per_species_assembly_counts(), os.path.join(args.output_dir, 'species_assembly_counts.tsv'), 'Taxonomy\tAssembly')
# counter.compare_assembly_counts_with_db(output_csv=os.path.join(args.output_dir, 'comparison_assembly_counts.csv'))


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ def test_find_links():
'A': ['1', '2'],
'B': ['2', '5'],
'C': ['3', '4'],
'D': ['5']
'D': ['5'],
'E': []
}
d2 = {
'1': ['A', 'B'],
Expand All @@ -19,3 +20,4 @@ def test_find_links():
assert find_link({'B'}, d1, d2) == (frozenset({'A', 'B', 'D'}), frozenset({'1', '2', '5'}))
assert find_link({'C'}, d1, d2) == (frozenset({'C'}), frozenset({'3', '4'}))
assert find_link({'D'}, d1, d2) == (frozenset({'A', 'B', 'D'}), frozenset({'1', '2', '5'}))
assert find_link({'E'}, d1, d2) == (frozenset({'E'}), frozenset({}))
1 change: 1 addition & 0 deletions eva-accession-release-automation/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ pytz==2022.6
pyyaml==5.3.1
requests==2.22.0
retry==0.9.2
sqlalchemy==2.0.22

0 comments on commit bf21073

Please sign in to comment.