Skip to content

Commit

Permalink
Merge pull request #301 from labgem/gff_parsing_fix
Browse files Browse the repository at this point in the history
Fix Contig Length Detection for GFF Parsing
  • Loading branch information
axbazin authored Nov 19, 2024
2 parents 23c4fd7 + 361db0c commit 583e0cb
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 70 deletions.
55 changes: 31 additions & 24 deletions ppanggolin/annotate/annotate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# local libraries
from ppanggolin.annotate.synta import (
annotate_organism,
read_fasta,
get_contigs_from_fasta_file,
get_dna_sequence,
init_contig_counter,
contig_counter,
Expand Down Expand Up @@ -1034,17 +1034,22 @@ def check_chevrons_in_start_and_stop(
dbxref_metadata
)

if fields_gff[gff_seqname] in circular_contigs or (
if (
"IS_CIRCULAR" in attributes
and attributes["IS_CIRCULAR"] == "true"
):
# WARNING: In case we have prodigal gff with is_circular attributes.
# This would fail as contig is not defined. However, is_circular should not be found in prodigal gff
logging.getLogger("PPanGGOLiN").debug(
f"Contig {contig.name} is circular."
)
contig.is_circular = True
assert contig.name == fields_gff[gff_seqname]
contig_name = fields_gff[gff_seqname]

if contig is not None:
logging.getLogger("PPanGGOLiN").debug(
f"Contig {contig.name} is circular."
)
contig.is_circular = True
assert contig.name == contig_name
else:
# contig object has not been initialized yet.
# let's keep the circularity info in the circular_contigs list
circular_contigs.append(contig_name)

elif fields_gff[gff_type] == "CDS" or "RNA" in fields_gff[gff_type]:

Expand Down Expand Up @@ -1120,9 +1125,9 @@ def check_chevrons_in_start_and_stop(
gene_frame = int(fields_gff[frame])

if (
gene_id in id_attr_to_gene_id
id_attribute in id_attr_to_gene_id
): # the ID has already been seen at least once in this genome
existing_gene = id_attr_to_gene_id[gene_id]
existing_gene = id_attr_to_gene_id[id_attribute]
new_gene_info = {
"strand": fields_gff[gff_strand],
"type": fields_gff[gff_type],
Expand All @@ -1141,7 +1146,7 @@ def check_chevrons_in_start_and_stop(

gene = Gene(org.name + "_CDS_" + str(gene_counter).zfill(4))

id_attr_to_gene_id[gene_id] = gene
id_attr_to_gene_id[id_attribute] = gene

# here contig is filled in order, so position is the number of genes already stored in the contig.
gene.fill_annotations(
Expand Down Expand Up @@ -1182,9 +1187,6 @@ def check_chevrons_in_start_and_stop(
rna_counter += 1
contig.add_rna(rna)

# Correct coordinates of genes that overlap the edge of circulars contig
correct_putative_overlaps(org.contigs)

# Fix partial genes coordinates
for contig in org.contigs:
for gene in contig.genes:
Expand All @@ -1201,14 +1203,11 @@ def check_chevrons_in_start_and_stop(
has_fasta = False

if has_fasta:
contig_sequences = read_fasta(
org, fasta_string.split("\n")
) # _ is total contig length
contig_sequences = get_contigs_from_fasta_file(org, fasta_string.split("\n"))

correct_putative_overlaps(org.contigs)

for contig in org.contigs:
if contig.length != len(contig_sequences[contig.name]):
raise ValueError(
"The contig length defined is different than the sequence length"
)

for gene in contig.genes:
gene.add_sequence(get_dna_sequence(contig_sequences[contig.name], gene))
Expand All @@ -1220,7 +1219,7 @@ def check_chevrons_in_start_and_stop(
add_metadata_from_gff_file(contig_name_to_region_info, org, gff_file_path)

if used_transl_table_arg:
logging.getLogger("PPanGGOLiN").info(
logging.getLogger("PPanGGOLiN").debug(
f"transl_table tag was not found for {used_transl_table_arg} CDS "
f"in {gff_file_path}. Provided translation_table argument value was used instead: {translation_table}."
)
Expand Down Expand Up @@ -1616,7 +1615,15 @@ def get_gene_sequences_from_fastas(
f"your fasta file are different."
)
with read_compressed_or_not(Path(elements[1])) as currFastaFile:
fasta_dict[org] = read_fasta(org, currFastaFile)
fasta_dict[org] = get_contigs_from_fasta_file(org, currFastaFile)

# When dealing with GFF files, some genes may have coordinates extending beyond the actual
# length of contigs, especially when they overlap the edges. This usually needs to be split
# into two parts to handle the circular genome wrapping.
# If the GFF file lacks associated FASTA sequences and it was not possible to determine the
# contig length from the GFF file, we must apply this correction while parsing the external FASTA file.

correct_putative_overlaps(org.contigs)

if set(pangenome.organisms) > set(fasta_dict.keys()):
missing = pangenome.number_of_organisms - len(
Expand Down
136 changes: 95 additions & 41 deletions ppanggolin/annotate/synta.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from subprocess import Popen, PIPE
import ast
from collections import defaultdict
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional, Union, Generator, Tuple
from pathlib import Path

# install libraries
Expand Down Expand Up @@ -245,49 +245,103 @@ def launch_infernal(
return gene_objs


def read_fasta(org: Organism, fna_file: Union[TextIOWrapper, list]) -> Dict[str, str]:
"""Reads a fna file (or stream, or string) and stores it in a dictionary with contigs as key and sequence as value.
def check_sequence_tuple(name: str, sequence: str):
"""
Checks and validates a sequence name and its corresponding sequence.
:param name: The name (header) of the sequence, typically extracted from the FASTA file header.
:param sequence: The sequence string corresponding to the name, containing the nucleotide or protein sequence.
:param org: Organism corresponding to fasta file
:param fna_file: Input fasta file with sequences or list of each line as sequence
:return: A tuple containing the validated name and sequence.
:return: Dictionary with contig_name as keys and contig sequence in values
:raises ValueError:
- If the sequence is empty, a ValueError is raised with a message containing the header name.
- If the name is empty, a ValueError is raised with a message containing a preview of the sequence.
"""
global contig_counter
try:
contigs = {}
contig_seq = ""
contig = None
for line in fna_file:
if line.startswith(">"):
if len(contig_seq) >= 1: # contig filter = 1
contigs[contig.name] = contig_seq.upper()
contig.length = len(contig_seq)
contig_seq = ""
try:
contig = org.get(line.split()[0][1:])
except KeyError:
with contig_counter.get_lock():
contig = Contig(contig_counter.value, line.split()[0][1:])
contig_counter.value += 1
org.add(contig)
else:
contig_seq += line.strip()
if len(contig_seq) >= 1: # processing the last contig
contigs[contig.name] = contig_seq.upper()
contig.length = len(contig_seq)

except AttributeError as e:
raise AttributeError(
f"{e}\nAn error was raised when reading file: '{fna_file.name}'. "
f"One possibility for this error is that the file did not start with a '>' "
f"as it would be expected from a fna file."
)
except Exception as err: # To manage other exception which can occur
raise Exception(
f"{err}: Please check your input file and if everything looks fine, "
"please post an issue on our github"
if not sequence:
raise ValueError(f"Found an empty sequence with header '{name}'")

if not name:
raise ValueError(
f"Found a sequence with empty name (sequence starts as '{sequence[:60]}')"
)

return name, sequence


def parse_fasta(
fna_file: Union[TextIOWrapper, list]
) -> Generator[Tuple[str, str], None, None]:
"""Yields each sequence name and sequence from a FASTA file or stream as a tuple.
:param fna_file: Input FASTA file or list of lines as sequences.
:yield: Tuple with contig header (without '>') and sequence.
:raises ValueError: If the file does not contain valid FASTA format.
"""
name = None
sequence = ""

for line in fna_file:
line = line.strip()

if line.startswith(">"): # New header
if name: # Yield previous header and sequence if available
yield check_sequence_tuple(name, sequence)

name = line[1:].split()[
0
] # Strip '>' and extract the first word as the name
sequence = ""

elif line: # Only append non-empty lines
sequence += line

else:
# You can skip or handle empty lines here if required
pass

# Yield the final contig if exists
if name:
yield check_sequence_tuple(name, sequence)

# Check if there was any valid data (at least one header and sequence)
if not name:
raise ValueError("The file does not contain any valid FASTA content.")


def get_contigs_from_fasta_file(
org: Organism, fna_file: Union[TextIOWrapper, list]
) -> Dict[str, str]:
"""Processes contigs from a parsed FASTA generator and stores in a dictionary.
:param org: Organism instance to update with contig info.
:param fna_file: Input FASTA file or list of lines as sequences.
:return: Dictionary with contig names as keys and sequences as values.
"""

global contig_counter
contigs = {}

for contig_name, sequence in parse_fasta(fna_file):

# Retrieve or create the contig
try:
contig = org.get(contig_name)
except KeyError:
with contig_counter.get_lock():
contig = Contig(contig_counter.value, contig_name)
contig_counter.value += 1
org.add(contig)

# Update contig information
if contig.length is not None and contig.length != len(sequence):
raise ValueError(
f"Length mismatch for contig {contig_name}: expected {contig.length}, found {len(sequence)} from the fasta sequence."
)

contig.length = len(sequence)
contigs[contig_name] = sequence.upper()

return contigs


Expand Down Expand Up @@ -464,7 +518,7 @@ def annotate_organism(

fasta_file = read_compressed_or_not(file_name)

contig_sequences = read_fasta(org, fasta_file)
contig_sequences = get_contigs_from_fasta_file(org, fasta_file)
if is_compressed(file_name): # TODO simply copy file with shutil.copyfileobj
fasta_file = write_tmp_fasta(contig_sequences, tmpdir)
if procedure is None: # prodigal procedure is not force by user
Expand Down
2 changes: 0 additions & 2 deletions ppanggolin/genome.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,6 @@ def __setitem__(self, coordinate: Tuple[int, int, str], gene: Gene):
@property
def length(self) -> Union[int, None]:
"""Get the length of the contig"""
if self._length is None:
logging.getLogger("PPanGGOLiN").warning("Contig length is unknown")
return self._length

@length.setter
Expand Down
6 changes: 3 additions & 3 deletions ppanggolin/projection/projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import yaml

# # local libraries
from ppanggolin.annotate.synta import read_fasta, get_dna_sequence
from ppanggolin.annotate.synta import get_contigs_from_fasta_file, get_dna_sequence
from ppanggolin.annotate.annotate import (
init_contig_counter,
read_anno_file,
Expand Down Expand Up @@ -679,7 +679,7 @@ def get_gene_sequences_from_fasta_files(organisms, genome_name_to_annot_path):
org_fasta_file = genome_name_to_annot_path[org.name]["path"]

with read_compressed_or_not(org_fasta_file) as currFastaFile:
org_contig_to_seq = read_fasta(org, currFastaFile)
org_contig_to_seq = get_contigs_from_fasta_file(org, currFastaFile)

for contig in org.contigs:
try:
Expand Down Expand Up @@ -997,7 +997,7 @@ def retrieve_gene_sequences_from_fasta_file(input_organism, fasta_file):
"""

with read_compressed_or_not(fasta_file) as currFastaFile:
contig_id2seq = read_fasta(input_organism, currFastaFile)
contig_id2seq = get_contigs_from_fasta_file(input_organism, currFastaFile)

for contig in input_organism.contigs:
try:
Expand Down
38 changes: 38 additions & 0 deletions tests/annotate/test_annotate.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
shift_end_coordinates,
)

from ppanggolin.annotate.synta import check_sequence_tuple, parse_fasta


@pytest.mark.parametrize(
"input_string, expected_positions, expected_complement, expected_partialgene_start, expected_partialgene_end",
Expand Down Expand Up @@ -531,3 +533,39 @@ def test_shift_start_coordinates(coordinates, shift, expected):
def test_shift_end_coordinates(coordinates, shift, expected):
result = shift_end_coordinates(coordinates, shift)
assert result == expected


def test_check_sequence_tuple_valid():
name, sequence = check_sequence_tuple("seq1", "ATGC")
assert name == "seq1"
assert sequence == "ATGC"


def test_check_sequence_tuple_empty_name():
with pytest.raises(ValueError):
check_sequence_tuple("", "ATGC")


def test_check_sequence_tuple_empty_sequence():
with pytest.raises(ValueError):
check_sequence_tuple("seq1", "")


def test_parse_fasta_valid():
fasta_data = ">seq1\nATGC\n>seq2\nGCTA"

result = list(parse_fasta(fasta_data.split("\n")))

assert result == [("seq1", "ATGC"), ("seq2", "GCTA")]


def test_parse_fasta_empty_sequence():
fasta_data = ">seq1\n>seq2\nGCTA"
with pytest.raises(ValueError):
list(parse_fasta(fasta_data.split("\n")))


def test_parse_fasta_no_header():
fasta_data = "seq1\nATGC\nseq2\nGCTA".split("\n")
with pytest.raises(ValueError):
list(parse_fasta(fasta_data))

0 comments on commit 583e0cb

Please sign in to comment.