Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved Memory Efficiency for ppanggolin fasta Command #283

Merged
merged 14 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
355 changes: 352 additions & 3 deletions ppanggolin/formats/readBinaries.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

# default libraries
import logging
from optparse import Option
from pathlib import Path
from typing import Dict, Any, Set, List, Tuple, Optional
from typing import Dict, Any, Iterator, Set, List, Tuple, Optional
from collections import defaultdict

# installed libraries
Expand Down Expand Up @@ -279,7 +278,7 @@ def get_non_redundant_gene_sequences_from_file(pangenome_filename: str, output:
file_obj.write(f'{row["dna"].decode()}\n')


def write_gene_sequences_from_pangenome_file(pangenome_filename: str, output: Path, list_cds: iter = None,
def write_gene_sequences_from_pangenome_file(pangenome_filename: str, output: Path, list_cds: Optional[Iterator] = None,
add: str = '', compress: bool = False, disable_bar: bool = False):
"""
Writes the CDS sequences of the Pangenome object to a File object that can be filtered or not by a list of CDS,
Expand Down Expand Up @@ -309,6 +308,356 @@ def write_gene_sequences_from_pangenome_file(pangenome_filename: str, output: Pa
f"{output.absolute()}{'.gz' if compress else ''}")


def read_rgp_genes_from_pangenome_file(h5f: tables.File) -> Set[bytes]:
"""
Retrieves a list of RGP genes from the pangenome file.

:param h5f: The open HDF5 pangenome file containing RGP gene data.
:return: A list of gene names (as bytes) from the RGP.
"""
rgp_genes = {row["gene"] for row in read_chunks(h5f.root.RGP, chunk=20000)}

return rgp_genes


def get_families_from_genes(h5f: tables.File, genes: Set[bytes]) -> Set[bytes]:
"""
Retrieves gene families associated with a specified set of genes from the pangenome file.

:param h5f: The open HDF5 pangenome file containing gene family data.
:param genes: A set of gene names (as bytes) for which to retrieve the associated families.
:return: A set of gene family names (as bytes) associated with the specified genes.
"""

families = set()
for row in read_chunks(h5f.root.geneFamilies, chunk=20000):
if row['gene'] in genes:
families.add(row["geneFam"])

return families

def read_module_families_from_pangenome_file(h5f: tables.File, module_name: str) -> Set[bytes]:
"""
Retrieves gene families associated with a specified module from the pangenome file.


:param h5f: The open HDF5 pangenome file containing module data.
:param module_name: The name of the module (as a string). The module ID is extracted from
the name by removing the "module_" prefix.
:return: A set of gene family names (as bytes) associated with the specified module.
"""

family_to_write = set()
module_id = int(module_name[len("module_"):])
module_table = h5f.root.modules

for row in read_chunks(module_table, chunk=20000):
if row['module'] == module_id:
family_to_write.add(row['geneFam'])

return family_to_write

def get_families_matching_partition(h5f: tables.File, partition: str) -> Set[bytes]:
"""
Retrieves gene families that match the specified partition.

:param h5f: The open HDF5 pangenome file containing gene family information.
:param partition: The partition name (as a string). If "all", all gene families are included.
Otherwise, it filters by the first letter of the partition.
:return: A set of gene family names (as bytes) that match the partition criteria.
"""

family_to_write = set()

gene_fam_info_table = h5f.root.geneFamiliesInfo
parition_first_letter = partition[0].upper()

for row in read_chunks(gene_fam_info_table, chunk=20000):

if partition == "all" or row['partition'].decode().startswith(parition_first_letter):
family_to_write.add(row['name'])

return family_to_write

def get_genes_from_families(h5f: tables.File, families: List[bytes]) -> Set[bytes]:
"""
Retrieves a set of genes that belong to the specified families.

This function reads the gene family data from an HDF5 pangenome file and returns
a set of genes that are part of the given list of gene families.

:param h5f: The open HDF5 pangenome file containing gene family data.
:param families: A list of gene families (as bytes) to filter genes by.
:return: A set of genes (as bytes) that belong to the specified families.
"""

matching_genes = set()

gene_fam_table = h5f.root.geneFamilies

for row in read_chunks(gene_fam_table, chunk=20000):
if row["geneFam"] in families:
matching_genes.add(row['gene'])

return matching_genes

def get_seqid_to_genes(h5f: tables.File, genes:Set[bytes], get_all_genes:bool = False, disable_bar:bool = False) -> Dict[int, List[str]]:
"""
Creates a mapping of sequence IDs to gene names.

:param h5f: The open HDF5 pangenome file containing gene sequence data.
:param genes: A list of gene names to include in the mapping (if `get_all_genes` is False).
:param get_all_genes: Boolean flag to indicate if all genes should be included in the mapping.
If set to True, all genes will be added regardless of the `genes` parameter.
:param disable_bar: Boolean flag to disable the progress bar if set to True.
:return: A dictionary mapping sequence IDs (integers) to lists of gene names (strings).
"""

seq_id_to_genes = defaultdict(list)
gene_seq_table = h5f.root.annotations.geneSequences
match_count = 0
for row in tqdm(read_chunks(gene_seq_table, chunk=20000), total=gene_seq_table.nrows, unit="gene", disable=disable_bar):

if get_all_genes or row['gene'] in genes:
seq_id_to_genes[row['seqid']].append(row['gene'].decode())
match_count += 1

assert get_all_genes or match_count == len(genes), f"Number of sequences found ({match_count}) does not match the number of expected genes {len(genes)}."

return seq_id_to_genes


def write_genes_seq_from_pangenome_file(h5f: tables.File, outpath: Path, compress: bool, seq_id_to_genes: Dict[int, List[str]], disable_bar:bool):
"""
Writes gene sequences from the pangenome file to an output file.

Only sequences whose IDs match the ones in `seq_id_to_genes` will be written.

:param h5f: The open HDF5 pangenome file containing sequence data.
:param outpath: The path to the output file where sequences will be written.
:param compress: Boolean flag to indicate whether output should be compressed.
:param seq_id_to_genes: A dictionary mapping sequence IDs to lists of gene names.
:param disable_bar: Boolean flag to disable the progress bar if set to True.
"""

with write_compressed_or_not(file_path=outpath, compress=compress) as file_obj:

seq_table = h5f.root.annotations.sequences

with tqdm(total=len(seq_id_to_genes), unit="sequence", disable=disable_bar) as pbar:

for row in read_chunks(table=seq_table, chunk=20000):
if row["seqid"] in seq_id_to_genes:

for seq_name in seq_id_to_genes[row["seqid"]]:
file_obj.write(f">{seq_name}\n")
file_obj.write(row["dna"].decode() + "\n")


pbar.update(1)


def get_gene_to_genome(h5f: tables.File) -> Dict[bytes, bytes]:
"""
Generates a mapping between gene IDs and their corresponding genome.

:param h5f: The open HDF5 pangenome file containing contig and gene annotations.
:return: A dictionary mapping gene IDs to genome names.
"""

contig_id_to_genome = {row["ID"]:row['genome'] for row in read_chunks( h5f.root.annotations.contigs, chunk=20000) }

gene_to_genome = {row["ID"]:contig_id_to_genome[row['contig'] ] for row in read_chunks( h5f.root.annotations.genes, chunk=20000) }

return gene_to_genome


def get_family_to_genome_count(h5f: tables.File) -> Dict[bytes, int]:
"""
Computes the number of unique genomes associated with each gene family.

:param h5f: The open HDF5 pangenome file containing contig, gene, and gene family data.
:return: A dictionary mapping gene family names (as bytes) to the count of unique genomes.
"""

contig_id_to_genome = {row["ID"]:row['genome'] for row in read_chunks( h5f.root.annotations.contigs, chunk=20000) }

gene_to_genome = {row["ID"]:contig_id_to_genome[row['contig'] ] for row in read_chunks( h5f.root.annotations.genes, chunk=20000) }

family_to_genomes = defaultdict(set)
for row in read_chunks( h5f.root.geneFamilies, chunk=20000):
family_to_genomes[row['geneFam']].add(gene_to_genome[row["gene"]])

family_to_genome_count = {fam: len(genomes) for fam, genomes in family_to_genomes.items()}

return family_to_genome_count

def get_soft_core_families(h5f: tables.File, soft_core: float) -> Set[bytes]:
"""
Identifies gene families that are present in at least a specified proportion of genomes.

:param h5f: The open HDF5 pangenome file containing gene family and genome data.
:param soft_core: The proportion of genomes (between 0 and 1) that a gene family must be present in
to be considered a soft core family.
:return: A set of gene family names (as bytes) that are classified as soft core.
"""
family_to_genome_count = get_family_to_genome_count(h5f)
pangenome_info = read_info(h5f)
genome_count = pangenome_info["Content"]["Genomes"]

genome_count_threshold = genome_count * soft_core
soft_core_families = {family for family, fam_genome_count in family_to_genome_count.items() if fam_genome_count >= genome_count_threshold}
return soft_core_families

def write_fasta_gene_fam_from_pangenome_file(pangenome_filename: str, output: Path, family_filter: str, soft_core:float = 0.95,
compress: bool = False, disable_bar=False):
"""
Write representative nucleotide sequences of gene families

:param pangenome: Pangenome object with gene families sequences
:param output: Path to output directory
:param gene_families: Selected partition of gene families
:param soft_core: Soft core threshold to use
:param compress: Compress the file in .gz
:param disable_bar: Disable progress bar
"""

outpath = output / f"{family_filter}_nucleotide_families.fasta"

with tables.open_file(pangenome_filename, "r", driver_core_backing_store=0) as h5f:

if family_filter in ["all", 'persistent', 'shell', 'cloud']:
family_to_write = get_families_matching_partition(h5f, family_filter)


elif family_filter.startswith("module_"):
family_to_write = read_module_families_from_pangenome_file(h5f, module_name=family_filter)

elif family_filter == "rgp":
rgp_genes = read_rgp_genes_from_pangenome_file(h5f)
family_to_write = get_families_from_genes(h5f, rgp_genes)

elif family_filter in ["softcore", "core"]:
if family_filter == "core":
soft_core = 1.0

family_to_write = get_soft_core_families(h5f, soft_core)

if len(family_to_write) == 0:
logging.getLogger("PPanGGOLiN").warning(f"No families matching filter {family_filter}.")
return

seq_id_to_genes = get_seqid_to_genes(h5f, set(family_to_write))

write_genes_seq_from_pangenome_file(h5f, outpath, compress, seq_id_to_genes, disable_bar=disable_bar)

logging.getLogger("PPanGGOLiN").info("Done writing the representative nucleotide sequences "
f"of the gene families : '{outpath}{'.gz' if compress else ''}")


def write_fasta_prot_fam_from_pangenome_file(pangenome_filename: str, output: Path, family_filter: str, soft_core:float=0.95,
compress: bool = False, disable_bar=False):
"""
Write representative amino acid sequences of gene families.

:param pangenome: Pangenome object with gene families sequences
:param output: Path to output directory
:param prot_families: Selected partition of protein families
:param soft_core: Soft core threshold to use
:param compress: Compress the file in .gz
:param disable_bar: Disable progress bar
"""

outpath = output / f"{family_filter}_protein_families.faa"

partition_filter = False
family_to_write = []

with tables.open_file(pangenome_filename, "r", driver_core_backing_store=0) as h5f, write_compressed_or_not(outpath, compress) as fasta:

if family_filter in ["all", 'persistent', 'shell', 'cloud']:
partition_filter = True
parition_first_letter = family_filter[0].upper()

elif family_filter == "rgp":
rgp_genes = read_rgp_genes_from_pangenome_file(h5f)
family_to_write = get_families_from_genes(h5f, rgp_genes)

elif family_filter.startswith("module_"):
family_to_write = read_module_families_from_pangenome_file(h5f, module_name=family_filter)

elif family_filter in ["softcore", "core"]:

soft_core_to_apply = 1.0 if family_filter == "core" else soft_core

family_to_write = get_soft_core_families(h5f, soft_core=soft_core_to_apply)

gene_fam_info_table = h5f.root.geneFamiliesInfo


for row in tqdm(read_chunks(gene_fam_info_table, chunk=20000), total=gene_fam_info_table.nrows, unit="family", disable=disable_bar):

partition_match = partition_filter and (family_filter == "all" or row['partition'].decode().startswith(parition_first_letter))
family_match = row['name'] in family_to_write

if partition_match or family_match:

fasta.write(f">{row['name'].decode()}\n")
fasta.write(row['protein'].decode() + "\n")

logging.getLogger("PPanGGOLiN").info(f"Done writing the representative amino acid sequences of the gene families:"
f"'{outpath}{'.gz' if compress else ''}'")


def write_genes_from_pangenome_file(pangenome_filename: str, output: Path, gene_filter: str, soft_core:float=0.95,
compress: bool = False, disable_bar=False):
"""
Write representative nucleotide sequences of gene families

:param pangenome: Pangenome object with gene families sequences
:param output: Path to output directory
:param gene_families: Selected partition of gene families
:param soft_core: Soft core threshold to use
:param compress: Compress the file in .gz
:param disable_bar: Disable progress bar
"""

outpath = output / f"{gene_filter}_genes.fna"
get_all_genes = False


with tables.open_file(pangenome_filename, "r", driver_core_backing_store=0) as h5f:

if gene_filter in ['persistent', 'shell', 'cloud', "softcore", "core"] or gene_filter.startswith("module_"):

if gene_filter.startswith("module_"):
families = read_module_families_from_pangenome_file(h5f, module_name=gene_filter)

elif gene_filter in ["softcore", "core"]:

soft_core_to_apply = 1.0 if gene_filter == "core" else soft_core
families = get_soft_core_families(h5f, soft_core=soft_core_to_apply)

else:
families = get_families_matching_partition(h5f, gene_filter)

genes_to_write = get_genes_from_families(h5f, families)

elif gene_filter == "rgp":
genes_to_write = read_rgp_genes_from_pangenome_file(h5f)

elif gene_filter == "all":
genes_to_write = set()
get_all_genes = True

seq_id_to_genes = get_seqid_to_genes(h5f, genes_to_write, get_all_genes=get_all_genes, disable_bar=disable_bar)

write_genes_seq_from_pangenome_file(h5f, outpath, compress, seq_id_to_genes, disable_bar=disable_bar)

logging.getLogger("PPanGGOLiN").info("Done writing the representative nucleotide sequences "
f"of the gene families : '{outpath}{'.gz' if compress else ''}")


def read_graph(pangenome: Pangenome, h5f: tables.File, disable_bar: bool = False):
"""
Read information about graph in pangenome hdf5 file to add in pangenome object
Expand Down
Loading