diff --git a/ppanggolin/annotate/annotate.py b/ppanggolin/annotate/annotate.py index f99c1f46..c4359e34 100644 --- a/ppanggolin/annotate/annotate.py +++ b/ppanggolin/annotate/annotate.py @@ -22,7 +22,7 @@ # local libraries from ppanggolin.annotate.synta import ( annotate_organism, - read_fasta, + get_contigs_from_fasta_file, get_dna_sequence, init_contig_counter, contig_counter, @@ -1034,17 +1034,22 @@ def check_chevrons_in_start_and_stop( dbxref_metadata ) - if fields_gff[gff_seqname] in circular_contigs or ( + if ( "IS_CIRCULAR" in attributes and attributes["IS_CIRCULAR"] == "true" ): - # WARNING: In case we have prodigal gff with is_circular attributes. - # This would fail as contig is not defined. However, is_circular should not be found in prodigal gff - logging.getLogger("PPanGGOLiN").debug( - f"Contig {contig.name} is circular." - ) - contig.is_circular = True - assert contig.name == fields_gff[gff_seqname] + contig_name = fields_gff[gff_seqname] + + if contig is not None: + logging.getLogger("PPanGGOLiN").debug( + f"Contig {contig.name} is circular." + ) + contig.is_circular = True + assert contig.name == contig_name + else: + # contig object has not been initialized yet. + # let's keep the circularity info in the circular_contigs list + circular_contigs.append(contig_name) elif fields_gff[gff_type] == "CDS" or "RNA" in fields_gff[gff_type]: @@ -1120,9 +1125,9 @@ def check_chevrons_in_start_and_stop( gene_frame = int(fields_gff[frame]) if ( - gene_id in id_attr_to_gene_id + id_attribute in id_attr_to_gene_id ): # the ID has already been seen at least once in this genome - existing_gene = id_attr_to_gene_id[gene_id] + existing_gene = id_attr_to_gene_id[id_attribute] new_gene_info = { "strand": fields_gff[gff_strand], "type": fields_gff[gff_type], @@ -1141,7 +1146,7 @@ def check_chevrons_in_start_and_stop( gene = Gene(org.name + "_CDS_" + str(gene_counter).zfill(4)) - id_attr_to_gene_id[gene_id] = gene + id_attr_to_gene_id[id_attribute] = gene # here contig is filled in order, so position is the number of genes already stored in the contig. gene.fill_annotations( @@ -1182,9 +1187,6 @@ def check_chevrons_in_start_and_stop( rna_counter += 1 contig.add_rna(rna) - # Correct coordinates of genes that overlap the edge of circulars contig - correct_putative_overlaps(org.contigs) - # Fix partial genes coordinates for contig in org.contigs: for gene in contig.genes: @@ -1201,14 +1203,11 @@ def check_chevrons_in_start_and_stop( has_fasta = False if has_fasta: - contig_sequences = read_fasta( - org, fasta_string.split("\n") - ) # _ is total contig length + contig_sequences = get_contigs_from_fasta_file(org, fasta_string.split("\n")) + + correct_putative_overlaps(org.contigs) + for contig in org.contigs: - if contig.length != len(contig_sequences[contig.name]): - raise ValueError( - "The contig length defined is different than the sequence length" - ) for gene in contig.genes: gene.add_sequence(get_dna_sequence(contig_sequences[contig.name], gene)) @@ -1220,7 +1219,7 @@ def check_chevrons_in_start_and_stop( add_metadata_from_gff_file(contig_name_to_region_info, org, gff_file_path) if used_transl_table_arg: - logging.getLogger("PPanGGOLiN").info( + logging.getLogger("PPanGGOLiN").debug( f"transl_table tag was not found for {used_transl_table_arg} CDS " f"in {gff_file_path}. Provided translation_table argument value was used instead: {translation_table}." ) @@ -1616,7 +1615,15 @@ def get_gene_sequences_from_fastas( f"your fasta file are different." ) with read_compressed_or_not(Path(elements[1])) as currFastaFile: - fasta_dict[org] = read_fasta(org, currFastaFile) + fasta_dict[org] = get_contigs_from_fasta_file(org, currFastaFile) + + # When dealing with GFF files, some genes may have coordinates extending beyond the actual + # length of contigs, especially when they overlap the edges. This usually needs to be split + # into two parts to handle the circular genome wrapping. + # If the GFF file lacks associated FASTA sequences and it was not possible to determine the + # contig length from the GFF file, we must apply this correction while parsing the external FASTA file. + + correct_putative_overlaps(org.contigs) if set(pangenome.organisms) > set(fasta_dict.keys()): missing = pangenome.number_of_organisms - len( diff --git a/ppanggolin/annotate/synta.py b/ppanggolin/annotate/synta.py index 2b65ea40..f3c74a92 100644 --- a/ppanggolin/annotate/synta.py +++ b/ppanggolin/annotate/synta.py @@ -9,7 +9,7 @@ from subprocess import Popen, PIPE import ast from collections import defaultdict -from typing import Dict, List, Optional, Union +from typing import Dict, List, Optional, Union, Generator, Tuple from pathlib import Path # install libraries @@ -245,49 +245,103 @@ def launch_infernal( return gene_objs -def read_fasta(org: Organism, fna_file: Union[TextIOWrapper, list]) -> Dict[str, str]: - """Reads a fna file (or stream, or string) and stores it in a dictionary with contigs as key and sequence as value. +def check_sequence_tuple(name: str, sequence: str): + """ + Checks and validates a sequence name and its corresponding sequence. + + :param name: The name (header) of the sequence, typically extracted from the FASTA file header. + :param sequence: The sequence string corresponding to the name, containing the nucleotide or protein sequence. - :param org: Organism corresponding to fasta file - :param fna_file: Input fasta file with sequences or list of each line as sequence + :return: A tuple containing the validated name and sequence. - :return: Dictionary with contig_name as keys and contig sequence in values + :raises ValueError: + - If the sequence is empty, a ValueError is raised with a message containing the header name. + - If the name is empty, a ValueError is raised with a message containing a preview of the sequence. """ - global contig_counter - try: - contigs = {} - contig_seq = "" - contig = None - for line in fna_file: - if line.startswith(">"): - if len(contig_seq) >= 1: # contig filter = 1 - contigs[contig.name] = contig_seq.upper() - contig.length = len(contig_seq) - contig_seq = "" - try: - contig = org.get(line.split()[0][1:]) - except KeyError: - with contig_counter.get_lock(): - contig = Contig(contig_counter.value, line.split()[0][1:]) - contig_counter.value += 1 - org.add(contig) - else: - contig_seq += line.strip() - if len(contig_seq) >= 1: # processing the last contig - contigs[contig.name] = contig_seq.upper() - contig.length = len(contig_seq) - - except AttributeError as e: - raise AttributeError( - f"{e}\nAn error was raised when reading file: '{fna_file.name}'. " - f"One possibility for this error is that the file did not start with a '>' " - f"as it would be expected from a fna file." - ) - except Exception as err: # To manage other exception which can occur - raise Exception( - f"{err}: Please check your input file and if everything looks fine, " - "please post an issue on our github" + if not sequence: + raise ValueError(f"Found an empty sequence with header '{name}'") + + if not name: + raise ValueError( + f"Found a sequence with empty name (sequence starts as '{sequence[:60]}')" ) + + return name, sequence + + +def parse_fasta( + fna_file: Union[TextIOWrapper, list] +) -> Generator[Tuple[str, str], None, None]: + """Yields each sequence name and sequence from a FASTA file or stream as a tuple. + + :param fna_file: Input FASTA file or list of lines as sequences. + :yield: Tuple with contig header (without '>') and sequence. + :raises ValueError: If the file does not contain valid FASTA format. + """ + name = None + sequence = "" + + for line in fna_file: + line = line.strip() + + if line.startswith(">"): # New header + if name: # Yield previous header and sequence if available + yield check_sequence_tuple(name, sequence) + + name = line[1:].split()[ + 0 + ] # Strip '>' and extract the first word as the name + sequence = "" + + elif line: # Only append non-empty lines + sequence += line + + else: + # You can skip or handle empty lines here if required + pass + + # Yield the final contig if exists + if name: + yield check_sequence_tuple(name, sequence) + + # Check if there was any valid data (at least one header and sequence) + if not name: + raise ValueError("The file does not contain any valid FASTA content.") + + +def get_contigs_from_fasta_file( + org: Organism, fna_file: Union[TextIOWrapper, list] +) -> Dict[str, str]: + """Processes contigs from a parsed FASTA generator and stores in a dictionary. + + :param org: Organism instance to update with contig info. + :param fna_file: Input FASTA file or list of lines as sequences. + :return: Dictionary with contig names as keys and sequences as values. + """ + + global contig_counter + contigs = {} + + for contig_name, sequence in parse_fasta(fna_file): + + # Retrieve or create the contig + try: + contig = org.get(contig_name) + except KeyError: + with contig_counter.get_lock(): + contig = Contig(contig_counter.value, contig_name) + contig_counter.value += 1 + org.add(contig) + + # Update contig information + if contig.length is not None and contig.length != len(sequence): + raise ValueError( + f"Length mismatch for contig {contig_name}: expected {contig.length}, found {len(sequence)} from the fasta sequence." + ) + + contig.length = len(sequence) + contigs[contig_name] = sequence.upper() + return contigs @@ -464,7 +518,7 @@ def annotate_organism( fasta_file = read_compressed_or_not(file_name) - contig_sequences = read_fasta(org, fasta_file) + contig_sequences = get_contigs_from_fasta_file(org, fasta_file) if is_compressed(file_name): # TODO simply copy file with shutil.copyfileobj fasta_file = write_tmp_fasta(contig_sequences, tmpdir) if procedure is None: # prodigal procedure is not force by user diff --git a/ppanggolin/genome.py b/ppanggolin/genome.py index 46cbdcdd..331db45c 100644 --- a/ppanggolin/genome.py +++ b/ppanggolin/genome.py @@ -553,8 +553,6 @@ def __setitem__(self, coordinate: Tuple[int, int, str], gene: Gene): @property def length(self) -> Union[int, None]: """Get the length of the contig""" - if self._length is None: - logging.getLogger("PPanGGOLiN").warning("Contig length is unknown") return self._length @length.setter diff --git a/ppanggolin/projection/projection.py b/ppanggolin/projection/projection.py index d577e840..b0a36c5f 100644 --- a/ppanggolin/projection/projection.py +++ b/ppanggolin/projection/projection.py @@ -20,7 +20,7 @@ import yaml # # local libraries -from ppanggolin.annotate.synta import read_fasta, get_dna_sequence +from ppanggolin.annotate.synta import get_contigs_from_fasta_file, get_dna_sequence from ppanggolin.annotate.annotate import ( init_contig_counter, read_anno_file, @@ -679,7 +679,7 @@ def get_gene_sequences_from_fasta_files(organisms, genome_name_to_annot_path): org_fasta_file = genome_name_to_annot_path[org.name]["path"] with read_compressed_or_not(org_fasta_file) as currFastaFile: - org_contig_to_seq = read_fasta(org, currFastaFile) + org_contig_to_seq = get_contigs_from_fasta_file(org, currFastaFile) for contig in org.contigs: try: @@ -997,7 +997,7 @@ def retrieve_gene_sequences_from_fasta_file(input_organism, fasta_file): """ with read_compressed_or_not(fasta_file) as currFastaFile: - contig_id2seq = read_fasta(input_organism, currFastaFile) + contig_id2seq = get_contigs_from_fasta_file(input_organism, currFastaFile) for contig in input_organism.contigs: try: diff --git a/tests/annotate/test_annotate.py b/tests/annotate/test_annotate.py index cd933eab..377dab8f 100644 --- a/tests/annotate/test_annotate.py +++ b/tests/annotate/test_annotate.py @@ -16,6 +16,8 @@ shift_end_coordinates, ) +from ppanggolin.annotate.synta import check_sequence_tuple, parse_fasta + @pytest.mark.parametrize( "input_string, expected_positions, expected_complement, expected_partialgene_start, expected_partialgene_end", @@ -531,3 +533,39 @@ def test_shift_start_coordinates(coordinates, shift, expected): def test_shift_end_coordinates(coordinates, shift, expected): result = shift_end_coordinates(coordinates, shift) assert result == expected + + +def test_check_sequence_tuple_valid(): + name, sequence = check_sequence_tuple("seq1", "ATGC") + assert name == "seq1" + assert sequence == "ATGC" + + +def test_check_sequence_tuple_empty_name(): + with pytest.raises(ValueError): + check_sequence_tuple("", "ATGC") + + +def test_check_sequence_tuple_empty_sequence(): + with pytest.raises(ValueError): + check_sequence_tuple("seq1", "") + + +def test_parse_fasta_valid(): + fasta_data = ">seq1\nATGC\n>seq2\nGCTA" + + result = list(parse_fasta(fasta_data.split("\n"))) + + assert result == [("seq1", "ATGC"), ("seq2", "GCTA")] + + +def test_parse_fasta_empty_sequence(): + fasta_data = ">seq1\n>seq2\nGCTA" + with pytest.raises(ValueError): + list(parse_fasta(fasta_data.split("\n"))) + + +def test_parse_fasta_no_header(): + fasta_data = "seq1\nATGC\nseq2\nGCTA".split("\n") + with pytest.raises(ValueError): + list(parse_fasta(fasta_data))