diff --git a/kg_microbe/transform_utils/bacdive/bacdive.py b/kg_microbe/transform_utils/bacdive/bacdive.py index 53948b23..0cddf99a 100644 --- a/kg_microbe/transform_utils/bacdive/bacdive.py +++ b/kg_microbe/transform_utils/bacdive/bacdive.py @@ -105,7 +105,6 @@ MUREIN, NAME_COLUMN, NCBI_CATEGORY, - NCBI_TO_ENZYME_EDGE, NCBI_TO_MEDIUM_EDGE, NCBI_TO_METABOLITE_PRODUCTION_EDGE, NCBI_TO_METABOLITE_UTILIZATION_EDGE, @@ -158,6 +157,7 @@ def __init__( source_name = "BacDive" super().__init__(source_name, input_dir, output_dir) self.ncbi_impl = get_adapter("sqlite:obo:ncbitaxon") + self.ncbitaxon_info = {} # To accumulate data for each NCBITaxon def _flatten_to_dicts(self, obj): if isinstance(obj, dict): @@ -594,6 +594,85 @@ def run(self, data_file: Union[Optional[Path], Optional[str]] = None, show_statu if not all(item is None for item in phys_and_meta_data[1:]): writer_2.writerow(phys_and_meta_data) + if ncbitaxon_id: + if ncbitaxon_id not in self.ncbitaxon_info: + self.ncbitaxon_info[ncbitaxon_id] = { + "media": set(), + "assays": set(), + # Add other fields as necessary + } + if medium_id: + self.ncbitaxon_info[ncbitaxon_id]["media"].add(medium_id) + if phys_and_metabolism_metabolite_utilization: + positive_chebi_activity = None + if isinstance(phys_and_metabolism_metabolite_utilization, list): + positive_chebi_activity = [] + for metabolite in phys_and_metabolism_metabolite_utilization: + if ( + METABOLITE_CHEBI_KEY in metabolite + and metabolite.get(UTILIZATION_ACTIVITY) == PLUS_SIGN + ): + chebi_key = ( + f"{CHEBI_PREFIX}{metabolite[METABOLITE_CHEBI_KEY]}" + ) + positive_chebi_activity.append( + ( + chebi_key, + metabolite[METABOLITE_KEY], + metabolite.get(UTILIZATION_TYPE_TESTED), + ) + ) + + elif isinstance(phys_and_metabolism_metabolite_utilization, dict): + utilization_activity = ( + phys_and_metabolism_metabolite_utilization.get( + UTILIZATION_ACTIVITY + ) + ) + if ( + utilization_activity == PLUS_SIGN + and phys_and_metabolism_metabolite_utilization.get( + METABOLITE_CHEBI_KEY + ) + ): + chebi_key = ( + f"{CHEBI_PREFIX}" + f"{phys_and_metabolism_metabolite_utilization.get(METABOLITE_CHEBI_KEY)}" + ) + metabolite_value = ( + phys_and_metabolism_metabolite_utilization.get( + METABOLITE_KEY + ) + ) + positive_chebi_activity = [(chebi_key, metabolite_value)] + + else: + print( + f"{phys_and_metabolism_metabolite_utilization} data not recorded." + ) + if positive_chebi_activity: + for item in positive_chebi_activity: + self.ncbitaxon_info[ncbitaxon_id]["assays"].add(item) + # Repeat for other data types like assays, enzyme activities, etc. + + # Uncomment and handle isolation_source code + if isolation and isinstance(isolation, str): + isolation_cleaned = isolation.replace(" ", "_").replace("-", "_") + isolation_source_curie = ISOLATION_SOURCE_COLUMN + isolation_cleaned + node_writer.writerow( + [isolation_source_curie, ISOLATION_SOURCE_COLUMN, isolation] + + [None] * (len(self.node_header) - 3) + ) + edge_writer.writerow( + [ + ncbitaxon_id, + NCBI_TO_ISOLATION_SOURCE_EDGE, + isolation_source_curie, + LOCATION_OF, + BACDIVE_PREFIX + key, + ] + ) + if ncbitaxon_id and medium_id: # Combine list creation and extension nodes_data_to_write = [ @@ -667,7 +746,7 @@ def run(self, data_file: Union[Optional[Path], Optional[str]] = None, show_statu postive_activity_enzymes = None if isinstance(phys_and_metabolism_enzymes, list): postive_activity_enzymes = [ - {f"{EC_PREFIX}{enzyme.get(EC_KEY)}": f"{enzyme.get('value')}"} + (f"{EC_PREFIX}{enzyme.get(EC_KEY)}", f"{enzyme.get('value')}") for enzyme in phys_and_metabolism_enzymes if enzyme.get(ACTIVITY_KEY) == PLUS_SIGN and enzyme.get(EC_KEY) ] @@ -676,64 +755,30 @@ def run(self, data_file: Union[Optional[Path], Optional[str]] = None, show_statu if activity == PLUS_SIGN and phys_and_metabolism_enzymes.get(EC_KEY): ec_value = f"{EC_PREFIX}{phys_and_metabolism_enzymes.get(EC_KEY)}" value = phys_and_metabolism_enzymes.get("value") - postive_activity_enzymes = [{ec_value: value}] + postive_activity_enzymes = [(ec_value, value)] else: print(f"{phys_and_metabolism_enzymes} data not recorded.") if postive_activity_enzymes: - enzyme_nodes_to_write = [ - [k, PHENOTYPIC_CATEGORY, v] + [None] * (len(self.node_header) - 3) - for inner_dict in postive_activity_enzymes - for k, v in inner_dict.items() - ] - enzyme_nodes_to_write.append( - [ncbitaxon_id, NCBI_CATEGORY, ncbi_label] - + [None] * (len(self.node_header) - 3) - ) - node_writer.writerows(enzyme_nodes_to_write) - - for inner_dict in postive_activity_enzymes: - for k, _ in inner_dict.items(): - enzyme_edges_to_write = [ - ncbitaxon_id, - NCBI_TO_ENZYME_EDGE, - k, - HAS_PHENOTYPE, - BACDIVE_PREFIX + key, - ] - edge_writer.writerow(enzyme_edges_to_write) + for item in postive_activity_enzymes: + self.ncbitaxon_info[ncbitaxon_id]["assays"].add(item) + # Replace this section inside the loop processing each strain: if phys_and_metabolism_metabolite_utilization: - positive_chebi_activity = None + positive_chebi_activity = [] if isinstance(phys_and_metabolism_metabolite_utilization, list): - positive_chebi_activity = [] - # no_chebi_activity = defaultdict(list) for metabolite in phys_and_metabolism_metabolite_utilization: - # ! NO CURIE associated to metabolite. - # if ( - # METABOLITE_CHEBI_KEY not in metabolite - # and metabolite.get(UTILIZATION_ACTIVITY) == PLUS_SIGN - # ): - # no_chebi_activity.setdefault("NO_CURIE", []).append( - # [ - # metabolite[METABOLITE_KEY], - # metabolite.get(UTILIZATION_TYPE_TESTED), - # ] - # ) - # positive_chebi_activity.append(no_chebi_activity) - if ( METABOLITE_CHEBI_KEY in metabolite and metabolite.get(UTILIZATION_ACTIVITY) == PLUS_SIGN ): chebi_key = f"{CHEBI_PREFIX}{metabolite[METABOLITE_CHEBI_KEY]}" positive_chebi_activity.append( - { - chebi_key: [ - metabolite[METABOLITE_KEY], - metabolite.get(UTILIZATION_TYPE_TESTED), - ] - } + ( + chebi_key, + metabolite[METABOLITE_KEY], + metabolite.get(UTILIZATION_TYPE_TESTED), + ) ) elif isinstance(phys_and_metabolism_metabolite_utilization, dict): @@ -746,43 +791,50 @@ def run(self, data_file: Union[Optional[Path], Optional[str]] = None, show_statu METABOLITE_CHEBI_KEY ) ): - chebi_key = ( - f"{CHEBI_PREFIX}" - f"{phys_and_metabolism_metabolite_utilization.get(METABOLITE_CHEBI_KEY)}" - ) + chebi_key = f"{CHEBI_PREFIX}{phys_and_metabolism_metabolite_utilization.get(METABOLITE_CHEBI_KEY)}" metabolite_value = phys_and_metabolism_metabolite_utilization.get( METABOLITE_KEY ) - positive_chebi_activity = [{chebi_key: metabolite_value}] + positive_chebi_activity = [ + ( + chebi_key, + metabolite_value, + phys_and_metabolism_metabolite_utilization.get( + UTILIZATION_TYPE_TESTED + ), + ) + ] + else: print( f"{phys_and_metabolism_metabolite_utilization} data not recorded." ) + if positive_chebi_activity: + for item in positive_chebi_activity: + self.ncbitaxon_info[ncbitaxon_id]["assays"].add(item) + + # Also modify the corresponding code for writing nodes and edges at the end of processing if positive_chebi_activity: meta_util_nodes_to_write = [ - [k, METABOLITE_CATEGORY, v[0]] - + [None] * (len(self.node_header) - 3) - for inner_dict in positive_chebi_activity - for k, v in inner_dict.items() + [k, METABOLITE_CATEGORY, v] + [None] * (len(self.node_header) - 3) + for k, v, _ in positive_chebi_activity ] node_writer.writerows(meta_util_nodes_to_write) - for inner_dict in positive_chebi_activity: - for k, _ in inner_dict.items(): - meta_util_edges_to_write = [ - ncbitaxon_id, - NCBI_TO_METABOLITE_UTILIZATION_EDGE, - k, - HAS_PARTICIPANT, - BACDIVE_PREFIX + key, - ] - edge_writer.writerow(meta_util_edges_to_write) + for k, _, _ in positive_chebi_activity: + meta_util_edges_to_write = [ + ncbitaxon_id, + NCBI_TO_METABOLITE_UTILIZATION_EDGE, + k, + HAS_PARTICIPANT, + BACDIVE_PREFIX + key, + ] + edge_writer.writerow(meta_util_edges_to_write) if phys_and_metabolism_metabolite_production: positive_chebi_production = None if isinstance(phys_and_metabolism_metabolite_production, list): positive_chebi_production = [] - # no_chebi_production = defaultdict(list) for metabolite in phys_and_metabolism_metabolite_production: if ( METABOLITE_CHEBI_KEY in metabolite @@ -790,14 +842,8 @@ def run(self, data_file: Union[Optional[Path], Optional[str]] = None, show_statu ): chebi_key = f"{CHEBI_PREFIX}{metabolite[METABOLITE_CHEBI_KEY]}" positive_chebi_production.append( - {chebi_key: metabolite[METABOLITE_KEY]} + (chebi_key, metabolite[METABOLITE_KEY]) ) - # ! NO CURIE associated to metabolite. - # if ( - # METABOLITE_CHEBI_KEY not in metabolite and metabolite.get(PRODUCTION_KEY) == "yes" - # ): - # no_chebi_production.setdefault("NO_CURIE", []).append(metabolite[METABOLITE_KEY]) - # positive_chebi_production.append(no_chebi_production) elif isinstance(phys_and_metabolism_metabolite_production, dict): production = phys_and_metabolism_metabolite_production.get( @@ -816,7 +862,7 @@ def run(self, data_file: Union[Optional[Path], Optional[str]] = None, show_statu metabolite_value = phys_and_metabolism_metabolite_production.get( METABOLITE_KEY ) - positive_chebi_production = [{chebi_key: metabolite_value}] + positive_chebi_production = [(chebi_key, metabolite_value)] else: print(f"{phys_and_metabolism_metabolite_production} data not recorded.") @@ -824,21 +870,19 @@ def run(self, data_file: Union[Optional[Path], Optional[str]] = None, show_statu if positive_chebi_production: metabolite_production_nodes_to_write = [ [k, METABOLITE_CATEGORY, v] + [None] * (len(self.node_header) - 3) - for inner_dict in positive_chebi_production - for k, v in inner_dict.items() + for k, v in positive_chebi_production ] node_writer.writerows(metabolite_production_nodes_to_write) - for inner_dict in positive_chebi_production: - for k, _ in inner_dict.items(): - metabolite_production_edges_to_write = [ - ncbitaxon_id, - NCBI_TO_METABOLITE_PRODUCTION_EDGE, - k, - BIOLOGICAL_PROCESS, - BACDIVE_PREFIX + key, - ] - edge_writer.writerow(metabolite_production_edges_to_write) + for k, _ in positive_chebi_production: + metabolite_production_edges_to_write = [ + ncbitaxon_id, + NCBI_TO_METABOLITE_PRODUCTION_EDGE, + k, + BIOLOGICAL_PROCESS, + BACDIVE_PREFIX + key, + ] + edge_writer.writerow(metabolite_production_edges_to_write) if phys_and_metabolism_API: values = self._flatten_to_dicts(list(phys_and_metabolism_API.values())) @@ -881,5 +925,25 @@ def run(self, data_file: Union[Optional[Path], Optional[str]] = None, show_statu # After each iteration, call the update method to advance the progress bar. progress.update() + # At the end of the `run` method, inside the loop writing accumulated data for each NCBITAXON + for ncbitaxon_id, info in self.ncbitaxon_info.items(): + for medium_id in info["media"]: + edge_writer.writerow( + [ncbitaxon_id, NCBI_TO_MEDIUM_EDGE, medium_id, IS_GROWN_IN, ""] + ) + for assay_id in info["assays"]: + # Unpacking the assay information stored as tuples + assay_curie, assay_value, utilization_type = assay_id + edge_writer.writerow( + [ + ncbitaxon_id, + NCBI_TO_METABOLITE_UTILIZATION_EDGE, + assay_curie, + HAS_PARTICIPANT, + "", + ] + ) + # Repeat for other accumulated data + drop_duplicates(self.output_node_file, consolidation_columns=[ID_COLUMN, NAME_COLUMN]) drop_duplicates(self.output_edge_file, consolidation_columns=[OBJECT_ID_COLUMN])