From 61dacfeb42dc6a2df718b8c09e113fd73b8a5c60 Mon Sep 17 00:00:00 2001 From: JeanMainguy Date: Mon, 23 Oct 2023 16:07:06 +0200 Subject: [PATCH] refac launch of projection --- ppanggolin/projection/projection.py | 300 ++++++++++++++++++---------- 1 file changed, 200 insertions(+), 100 deletions(-) diff --git a/ppanggolin/projection/projection.py b/ppanggolin/projection/projection.py index 3bd6a42a..0ad4cd2a 100644 --- a/ppanggolin/projection/projection.py +++ b/ppanggolin/projection/projection.py @@ -51,23 +51,29 @@ class NewSpot(Spot): def __str__(self): return f'new_spot_{str(self.ID)}' -def launch(args: argparse.Namespace): +def check_pangenome_for_projection(pangenome: Pangenome, fast_aln:bool): """ - Command launcher + Check the status of a pangenome and determine whether projection is possible. - :param args: All arguments provide by user - """ + :param pangenome: The pangenome to be checked. + :param fast_aln: Whether to use the fast alignment option for gene projection. - output_dir = Path(args.output) - mk_outdir(output_dir, args.force) + This function checks various attributes of a pangenome to determine whether it is suitable for projecting + features into a provided genome. + + Returns: + A tuple indicating whether RGP prediction, spot projection, and module projection + are possible (True) or not (False) based on the pangenome's status. + + Raises: + NameError: If the pangenome has not been partitioned. + Exception: If the pangenome lacks gene sequences or gene family sequences, and fast alignment is not enabled. + """ - # For the moment these elements of the pangenome are predicted by default project_modules = True predict_rgp = True project_spots = True - pangenome = Pangenome() - pangenome.add_file(args.pangenome) if pangenome.status["partitioned"] not in ["Computed", "Loaded", "inFile"]: raise NameError(f"The provided pangenome has not been partitioned. " @@ -91,7 +97,7 @@ def launch(args: argparse.Namespace): project_modules = False - if pangenome.status["geneSequences"] not in ["Loaded", "Computed", "inFile"] and not args.fast: + if pangenome.status["geneSequences"] not in ["Loaded", "Computed", "inFile"] and not fast_aln: raise Exception("The provided pangenome has no gene sequences. " "Projection is still possible with the --fast option to use representative " "sequences rather than all genes to annotate input genes.") @@ -100,70 +106,52 @@ def launch(args: argparse.Namespace): raise Exception("The provided pangenome has no gene families sequences. " "This is not possible to annotate an input organism to this pangenome.") + return predict_rgp, project_spots, project_modules - check_pangenome_info(pangenome, need_annotations=True, need_families=True, disable_bar=args.disable_prog_bar, - need_rgp=predict_rgp, need_modules=project_modules, need_gene_sequences=False, - need_spots=project_spots) - - logging.getLogger('PPanGGOLiN').info('Retrieving parameters from the provided pangenome file.') - pangenome_params = argparse.Namespace( - **{step: argparse.Namespace(**k_v) for step, k_v in pangenome.parameters.items()}) - - # dup margin value here is specified in argument and is used to compute completeness. - # Thats mean it can be different than dup margin used in spot and RGPS. - - # TODO make this single_copy_fams a method of class Pangenome that should be used in write --stats - single_copy_fams = set() - - for fam in pangenome.gene_families: - if fam.named_partition == "persistent": - dup = len([genes for genes in fam.get_org_dict().values() if - len([gene for gene in genes if not gene.is_fragment]) > 1]) - - if (dup / fam.number_of_organisms) < args.dup_margin: - single_copy_fams.add(fam) - - - # genome_name_to_fasta_path, genome_name_to_annot_path = None, None +def manage_input_genomes_annotation(pangenome, input_mode, anno, fasta, + organism_name, circular_contigs, pangenome_params, + cpu, use_pseudo, disable_bar, tmpdir, config): + """ + """ genome_name_to_path = None - if args.input_mode == "multiple": - if args.anno: + if input_mode == "multiple": + if anno: input_type = "annotation" - genome_name_to_path = parse_input_paths_file(args.anno) + genome_name_to_path = parse_input_paths_file(anno) - elif args.fasta: + elif fasta: input_type = "fasta" - genome_name_to_path = parse_input_paths_file(args.fasta) + genome_name_to_path = parse_input_paths_file(fasta) else: # args.input_mode == "single: - circular_contigs = args.circular_contigs if args.circular_contigs else [] - if args.anno: + circular_contigs = circular_contigs if circular_contigs else [] + if anno: input_type = "annotation" - genome_name_to_path = {args.organism_name: {"path": args.annot, + genome_name_to_path = {organism_name: {"path": anno, "circular_contigs": circular_contigs}} - elif args.fasta: + elif fasta: input_type = "fasta" - genome_name_to_path = {args.organism_name: {"path": args.fasta, + genome_name_to_path = {organism_name: {"path": fasta, "circular_contigs": circular_contigs}} - + if input_type == "annotation": check_input_names(pangenome, genome_name_to_path) - organisms, org_2_has_fasta = read_annotation_files(genome_name_to_path, cpu=args.cpu, pseudo=args.use_pseudo, - disable_bar=args.disable_prog_bar) + organisms, org_2_has_fasta = read_annotation_files(genome_name_to_path, cpu=cpu, pseudo=use_pseudo, + disable_bar=disable_bar) if not all((has_fasta for has_fasta in org_2_has_fasta.values())): organisms_with_no_fasta = {org for org, has_fasta in org_2_has_fasta.items() if not has_fasta} - if args.fasta: + if fasta: get_gene_sequences_from_fasta_files(organisms_with_no_fasta, genome_name_to_path) else: raise ValueError(f"You provided GFF files for {len(organisms_with_no_fasta)} (out of {len(organisms)}) " - "organisms without associated sequence data, and you did not provide " + "organisms without associated sequence data, and you did not provide " "FASTA sequences using the --fasta or --single_fasta_file options. Therefore, it is impossible to project the pangenome onto the input genomes. " f"The following organisms have no associated sequence data: {', '.join(o.name for o in organisms_with_no_fasta)}") @@ -171,56 +159,51 @@ def launch(args: argparse.Namespace): annotate_param_names = ["norna", "kingdom", "allow_overlap", "prodigal_procedure"] - annotate_params = manage_annotate_param(annotate_param_names, pangenome_params.annotate, args.config) + annotate_params = manage_annotate_param(annotate_param_names, pangenome_params.annotate, config) check_input_names(pangenome, genome_name_to_path) - organisms = annotate_fasta_files(genome_name_to_fasta_path=genome_name_to_path, tmpdir=args.tmpdir, cpu=args.cpu, - translation_table=int(pangenome_params.cluster.translation_table), norna=annotate_params.norna, kingdom=annotate_params.kingdom, - allow_overlap=annotate_params.allow_overlap, procedure=annotate_params.prodigal_procedure, disable_bar=args.disable_prog_bar ) - - - input_org_to_lonely_genes_count = annotate_input_genes_with_pangenome_families(pangenome, input_organisms=organisms, - output=output_dir, cpu=args.cpu, use_representatives=args.fast, - no_defrag=args.no_defrag, identity=args.identity, - coverage=args.coverage, tmpdir=args.tmpdir, - translation_table=int(pangenome_params.cluster.translation_table), - keep_tmp=args.keep_tmp, - disable_bar=args.disable_prog_bar) - - - input_org_2_rgps, input_org_to_spots, input_orgs_to_modules = {}, {}, {} - - if predict_rgp: - logging.getLogger('PPanGGOLiN').info('Detecting RGPs in input genomes.') - - multigenics = pangenome.get_multigenics(pangenome_params.rgp.dup_margin) - - input_org_2_rgps = predict_RGP(pangenome, organisms, persistent_penalty=pangenome_params.rgp.persistent_penalty, variable_gain=pangenome_params.rgp.variable_gain, - min_length=pangenome_params.rgp.min_length, min_score=pangenome_params.rgp.min_score, multigenics=multigenics, output_dir=output_dir, - disable_bar=args.disable_prog_bar) - - - if project_spots: - logging.getLogger('PPanGGOLiN').info('Predicting spot of insertion in input genomes.') - input_org_to_spots = predict_spots_in_input_organisms(initial_spots=list(pangenome.spots), - initial_regions=pangenome.regions, - input_org_2_rgps=input_org_2_rgps, - multigenics=multigenics, - output=output_dir, - write_graph_flag=args.spot_graph, - graph_formats=args.graph_formats, - overlapping_match=pangenome_params.spot.overlapping_match, - set_size=pangenome_params.spot.set_size, - exact_match=pangenome_params.spot.exact_match_size) - - + organisms = annotate_fasta_files(genome_name_to_fasta_path=genome_name_to_path, tmpdir=tmpdir, cpu=cpu, + translation_table=int(pangenome_params.cluster.translation_table), norna=annotate_params.norna, kingdom=annotate_params.kingdom, + allow_overlap=annotate_params.allow_overlap, procedure=annotate_params.prodigal_procedure, disable_bar=disable_bar) + return organisms, genome_name_to_path, input_type + + +def write_projection_results(pangenome:Pangenome, organisms:Set[Organism], input_org_2_rgps:Dict[Organism, Set[Region]], + input_org_to_spots:Dict[Organism, Set[Spot]], + input_orgs_to_modules:Dict[Organism, Set[Module]] , + input_org_to_lonely_genes_count:Dict[Organism, int], + write_proksee:bool, write_gff:bool, add_sequences:bool, + genome_name_to_path:Dict[str,dict], input_type:str, + output_dir:Path, dup_margin:float, ): + """ + Write the results of the projection of pangneome onto input genomes. + + :param pangenome: The pangenome onto which the projection is performed. + :param organisms: A set of input organisms for projection. + :param input_org_2_rgps: A dictionary mapping input organisms to sets of regions of genomic plasticity (RGPs). + :param input_org_to_spots: A dictionary mapping input organisms to sets of spots. + :param input_orgs_to_modules: A dictionary mapping input organisms to sets of modules. + :param input_org_to_lonely_genes_count: A dictionary mapping input organisms to the count of lonely genes. + :param write_proksee: Whether to write ProkSee JSON files. + :param write_gff: Whether to write GFF files. + :param add_sequences: Whether to add sequences to the output files. + :param genome_name_to_path: A dictionary mapping genome names to file paths. + :param input_type: The type of input data (e.g., "annotation"). + :param output_dir: The directory where the output files will be written. + :param dup_margin: The duplication margin used to compute completeness. + + Note: + - If `write_proksee` is True and input organisms have modules, module colors for ProkSee are obtained. + - The function calls other functions such as `summarize_projection`, `read_genome_file`, `write_proksee_organism`, + `write_gff_file`, and `write_summaries` to generate various output files and summaries. + """ - if project_modules: + if write_proksee and input_orgs_to_modules: # get module color for proksee module_to_colors = manage_module_colors(set(pangenome.modules)) - - input_orgs_to_modules = project_and_write_modules(pangenome, organisms, output_dir) + + single_copy_families = get_single_copy_families(pangenome, dup_margin) organism_2_summary = {} @@ -229,19 +212,19 @@ def launch(args: argparse.Namespace): org_outdir = output_dir / organism.name # summarize projection for all input organisms - organism_2_summary[organism] = summarize_projection(organism, pangenome, single_copy_fams, + organism_2_summary[organism] = summarize_projection(organism, pangenome, single_copy_families, input_org_2_rgps.get(organism, None), input_org_to_spots.get(organism, None), input_orgs_to_modules.get(organism, None), input_org_to_lonely_genes_count[organism]) - if (args.proksee or args.gff) and args.add_sequences: + if (write_proksee or write_gff) and add_sequences: genome_sequences = read_genome_file(genome_name_to_path[organism.name]['path'], organism) genome_name_to_path[organism.name]['path'] else: genome_sequences = None - if args.proksee: + if write_proksee: org_module_to_color = {org_mod: module_to_colors[org_mod] for org_mod in input_orgs_to_modules.get(organism, [])} output_file = output_dir / organism.name / f"{organism.name}_proksee.json" @@ -252,7 +235,7 @@ def launch(args: argparse.Namespace): genome_sequences=genome_sequences) - if args.gff: + if write_gff: if input_type == "annotation": # if the genome has not been annotated by PPanGGOLiN annotation_sources = {"rRNA": "external", "tRNA": "external", @@ -277,7 +260,7 @@ def launch(args: argparse.Namespace): - write_summaries(organism_2_summary, output_dir) + write_summaries(organism_2_summary, output_dir) def annotate_fasta_files(genome_name_to_fasta_path: Dict[str, dict], tmpdir: str, cpu: int = 1, translation_table: int = 11, @@ -463,15 +446,47 @@ def write_summaries(organism_2_summary: Dict[Organism, Dict[str, Any]], output_d df_summary.to_csv(output_dir / "summary_projection.tsv", sep='\t', index=False) +def get_single_copy_families(pangenome: Pangenome, dup_margin:float): + """ + Get single copy families + + :param pangenome: The pangenome onto which the projection is performed. + :param dup_margin: The duplication margin used to compute single copy families. + + """ + + # TODO make this single_copy_fams a method of class Pangenome that should be used in write --stats + single_copy_families = set() + + for fam in pangenome.gene_families: + if fam.named_partition == "persistent": + dup = len([genes for genes in fam.get_org_dict().values() if + len([gene for gene in genes if not gene.is_fragment]) > 1]) + + if (dup / fam.number_of_organisms) < dup_margin: + single_copy_families.add(fam) + + return single_copy_families + def summarize_projection(input_organism:Organism, pangenome:Pangenome, single_copy_families:Set, input_org_rgps:Region, input_org_spots:Spot, input_org_modules:Module, singleton_gene_count:int): """ + Summarize the projection of an input organism onto a pangenome. - :param singleton_gene_count: Number of genes that do not cluster with any of the gene families of the pangenome. + :param input_organism: The input organism for projection. + :param input_org_rgps: The regions of genomic plasticity (RGPs) in the input organism. + :param input_org_spots: The spots in the input organism. + :param input_org_modules: The modules in the input organism. + :param singleton_gene_count: Number of genes that do not cluster with any gene families in the pangenome. + + Returns: + A dictionary containing summary information about the projection, including organism details, + gene and family counts, completeness, and counts of RGPs, spots, new spots, and modules. """ + partition_to_gene = defaultdict(set) contigs_count = 0 for contig in input_organism.contigs: @@ -485,7 +500,7 @@ def summarize_projection(input_organism:Organism, pangenome:Pangenome, single_c completeness = "NA" - single_copy_markers_count = len(set(input_organism.families) & single_copy_families ) + single_copy_markers_count = len(set(input_organism.families) & single_copy_families) if len(single_copy_families) > 0: completeness = round((single_copy_markers_count / len(single_copy_families)) * 100, 2) @@ -1184,6 +1199,91 @@ def check_projection_arguments(args: argparse.Namespace, parser: argparse.Argume return input_mode + + +def launch(args: argparse.Namespace): + """ + Command launcher + + :param args: All arguments provide by user + """ + + output_dir = Path(args.output) + mk_outdir(output_dir, args.force) + + # For the moment these elements of the pangenome are predicted by default + + pangenome = Pangenome() + pangenome.add_file(args.pangenome) + + predict_rgp, project_spots, project_modules = check_pangenome_for_projection(pangenome, args.fast) + + check_pangenome_info(pangenome, need_annotations=True, need_families=True, disable_bar=args.disable_prog_bar, + need_rgp=predict_rgp, need_modules=project_modules, need_gene_sequences=False, + need_spots=project_spots) + + logging.getLogger('PPanGGOLiN').info('Retrieving parameters from the provided pangenome file.') + pangenome_params = argparse.Namespace( + **{step: argparse.Namespace(**k_v) for step, k_v in pangenome.parameters.items()}) + + organisms, genome_name_to_path, input_type = manage_input_genomes_annotation(pangenome=pangenome, + input_mode=args.input_mode, + anno=args.anno, fasta=args.fasta, + organism_name=args.organism_name, + circular_contigs=args.circular_contigs, + pangenome_params=pangenome_params, + cpu=args.cpu, use_pseudo=args.use_pseudo, + disable_bar=args.disable_prog_bar, + tmpdir= args.tmpdir, config=args.config) + + + + input_org_to_lonely_genes_count = annotate_input_genes_with_pangenome_families(pangenome, input_organisms=organisms, + output=output_dir, cpu=args.cpu, use_representatives=args.fast, + no_defrag=args.no_defrag, identity=args.identity, + coverage=args.coverage, tmpdir=args.tmpdir, + translation_table=int(pangenome_params.cluster.translation_table), + keep_tmp=args.keep_tmp, + disable_bar=args.disable_prog_bar) + + + input_org_2_rgps, input_org_to_spots, input_orgs_to_modules = {}, {}, {} + + if predict_rgp: + + logging.getLogger('PPanGGOLiN').info('Detecting RGPs in input genomes.') + + multigenics = pangenome.get_multigenics(pangenome_params.rgp.dup_margin) + + input_org_2_rgps = predict_RGP(pangenome, organisms, persistent_penalty=pangenome_params.rgp.persistent_penalty, variable_gain=pangenome_params.rgp.variable_gain, + min_length=pangenome_params.rgp.min_length, min_score=pangenome_params.rgp.min_score, multigenics=multigenics, output_dir=output_dir, + disable_bar=args.disable_prog_bar) + + if project_spots: + logging.getLogger('PPanGGOLiN').info('Predicting spot of insertion in input genomes.') + input_org_to_spots = predict_spots_in_input_organisms(initial_spots=list(pangenome.spots), + initial_regions=pangenome.regions, + input_org_2_rgps=input_org_2_rgps, + multigenics=multigenics, + output=output_dir, + write_graph_flag=args.spot_graph, + graph_formats=args.graph_formats, + overlapping_match=pangenome_params.spot.overlapping_match, + set_size=pangenome_params.spot.set_size, + exact_match=pangenome_params.spot.exact_match_size) + + if project_modules: + input_orgs_to_modules = project_and_write_modules(pangenome, organisms, output_dir) + + write_projection_results(pangenome, organisms, input_org_2_rgps, + input_org_to_spots, + input_orgs_to_modules, + input_org_to_lonely_genes_count, + write_proksee=args.proksee, write_gff=args.gff, add_sequences=args.add_sequences, + genome_name_to_path=genome_name_to_path, input_type=input_type, + output_dir=output_dir, dup_margin=args.dup_margin) + + def subparser(sub_parser: argparse._SubParsersAction) -> argparse.ArgumentParser: """ Subparser to launch PPanGGOLiN in Command line