From 446b83c1a6f868bc633439e3a6a72a70d8abae27 Mon Sep 17 00:00:00 2001 From: Harshad Hegde Date: Mon, 5 Aug 2024 19:23:46 -0500 Subject: [PATCH] rerranged code and made a linkml model for merge --- Makefile | 2 +- kg_microbe_merge/run.py | 49 +--- kg_microbe_merge/schema/merge_datamodel.py | 289 +++++++++++---------- kg_microbe_merge/schema/merge_schema.yaml | 147 +++++------ kg_microbe_merge/utils/file_utils.py | 74 +++++- 5 files changed, 308 insertions(+), 253 deletions(-) diff --git a/Makefile b/Makefile index d7d8c09..3b01a45 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ # poetry run kg merge -y merge_yamls/merge.yaml -m duckdb -base-n '/Users/brooksantangelo/Documents/LozuponeLab/FRMS_2024/duckdb/merged-kg_kg-microbe-base/merged-kg_nodes.tsv' -base-e '/Users/brooksantangelo/Documents/LozuponeLab/FRMS_2024/duckdb/merged-kg_kg-microbe-base/merged-kg_edges.tsv' -subset-n '/Users/brooksantangelo/Documents/Repositories/kg-microbe/data/transformed/uniprot_genome_features/nodes.tsv' -subset-e '/Users/brooksantangelo/Documents/Repositories/kg-microbe/data/transformed/uniprot_genome_features/edges.tsv' datamodel: - poetry run gen-python $(PWD)/kg_microbe_merge/schema/merge_schema.yaml > $(PWD)/kg_microbe_merge/schema/merge_datamodel.py + poetry run gen-python kg_microbe_merge/schema/merge_schema.yaml > kg_microbe_merge/schema/merge_datamodel.py subset-merge: diff --git a/kg_microbe_merge/run.py b/kg_microbe_merge/run.py index 1340665..c96dc41 100644 --- a/kg_microbe_merge/run.py +++ b/kg_microbe_merge/run.py @@ -8,7 +8,11 @@ import click from kg_microbe_merge.constants import MERGED_DATA_DIR, RAW_DATA_DIR -from kg_microbe_merge.utils.file_utils import unzip_files_in_dir +from kg_microbe_merge.utils.file_utils import ( + collect_all_kg_paths, + collect_subset_kg_paths, + unzip_files_in_dir, +) try: from kg_chat.app import create_app @@ -98,7 +102,7 @@ def merge( processes: int, merge_tool: str, data_dir: str, - subset_transforms: list, + subset_transforms: tuple, nodes_batch_size: int, edges_batch_size: int, # base_nodes: str, @@ -126,46 +130,9 @@ def merge( edge_paths = [] if subset_transforms: - transforms_lower = { - transform.strip().lower() for transform in subset_transforms[0].split(",") - } - transform_dirs = [ - dir - for dir in data_dir_path.iterdir() - if dir.is_dir() and dir.name.lower() in transforms_lower - ] - ontology_transforms = transforms_lower - {dir.name.lower() for dir in transform_dirs} - - for directory in transform_dirs: - node_paths.append(directory / "nodes.tsv") - edge_paths.append(directory / "edges.tsv") - - if ontology_transforms: - ontology_dir = data_dir_path / "ontologies" - for file in ontology_dir.iterdir(): - if file.is_file() and file.suffix == ".tsv" and not file.name.startswith("._"): - if any(transform in file.name.lower() for transform in ontology_transforms): - if "nodes" in file.name: - node_paths.append(file) - elif "edges" in file.name: - edge_paths.append(file) + node_paths, edge_paths = collect_subset_kg_paths(subset_transforms, data_dir_path) else: - for directory in data_dir_path.iterdir(): - if directory.is_dir(): - if directory.name != "ontologies": - node_paths.append(directory / "nodes.tsv") - edge_paths.append(directory / "edges.tsv") - else: - for file in directory.iterdir(): - if ( - file.is_file() - and file.suffix == ".tsv" - and not file.name.startswith("._") - ): - if "nodes" in file.name: - node_paths.append(file) - elif "edges" in file.name: - edge_paths.append(file) + node_paths, edge_paths = collect_all_kg_paths(data_dir_path) duckdb_merge( node_paths, diff --git a/kg_microbe_merge/schema/merge_datamodel.py b/kg_microbe_merge/schema/merge_datamodel.py index f1d06f1..9f61dd0 100644 --- a/kg_microbe_merge/schema/merge_datamodel.py +++ b/kg_microbe_merge/schema/merge_datamodel.py @@ -1,5 +1,5 @@ # Auto generated from merge_schema.yaml by pythongen.py version: 0.0.1 -# Generation date: 2024-07-31T17:23:28 +# Generation date: 2024-08-05T19:23:17 # Schema: KGMergeSchema # # id: http://example.org/kg-merge-schema @@ -22,8 +22,7 @@ from linkml_runtime.utils.enumerations import EnumDefinitionImpl from rdflib import Namespace, URIRef from linkml_runtime.utils.curienamespace import CurieNamespace -from linkml_runtime.linkml_model.types import Boolean, String -from linkml_runtime.utils.metamodelcore import Bool +from linkml_runtime.linkml_model.types import String metamodel_version = "1.7.0" version = None @@ -42,10 +41,35 @@ +@dataclass +class MergeKG(YAMLRoot): + """ + Configuration for merging knowledge graphs + """ + _inherited_slots: ClassVar[List[str]] = [] + + class_class_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/MergeKG") + class_class_curie: ClassVar[str] = None + class_name: ClassVar[str] = "MergeKG" + class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/MergeKG") + + configuration: Optional[Union[dict, "Configuration"]] = None + merged_graph: Optional[Union[dict, "MergedGraph"]] = None + + def __post_init__(self, *_: List[str], **kwargs: Dict[str, Any]): + if self.configuration is not None and not isinstance(self.configuration, Configuration): + self.configuration = Configuration(**as_dict(self.configuration)) + + if self.merged_graph is not None and not isinstance(self.merged_graph, MergedGraph): + self.merged_graph = MergedGraph(**as_dict(self.merged_graph)) + + super().__post_init__(**kwargs) + + @dataclass class Configuration(YAMLRoot): """ - Configuration settings for the merged graph. + Configuration for the merge operation """ _inherited_slots: ClassVar[List[str]] = [] @@ -55,54 +79,106 @@ class Configuration(YAMLRoot): class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/Configuration") output_directory: Optional[str] = None - checkpoint: Optional[Union[bool, Bool]] = None + checkpoint: Optional[str] = None + curie_map: Optional[str] = None + node_properties: Optional[str] = None + predicate_mappings: Optional[str] = None + property_types: Optional[str] = None def __post_init__(self, *_: List[str], **kwargs: Dict[str, Any]): if self.output_directory is not None and not isinstance(self.output_directory, str): self.output_directory = str(self.output_directory) - if self.checkpoint is not None and not isinstance(self.checkpoint, Bool): - self.checkpoint = Bool(self.checkpoint) + if self.checkpoint is not None and not isinstance(self.checkpoint, str): + self.checkpoint = str(self.checkpoint) + + if self.curie_map is not None and not isinstance(self.curie_map, str): + self.curie_map = str(self.curie_map) + + if self.node_properties is not None and not isinstance(self.node_properties, str): + self.node_properties = str(self.node_properties) + + if self.predicate_mappings is not None and not isinstance(self.predicate_mappings, str): + self.predicate_mappings = str(self.predicate_mappings) + + if self.property_types is not None and not isinstance(self.property_types, str): + self.property_types = str(self.property_types) + + super().__post_init__(**kwargs) + + +@dataclass +class MergedGraph(YAMLRoot): + """ + Details about graphs to be merged. + """ + _inherited_slots: ClassVar[List[str]] = [] + + class_class_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/MergedGraph") + class_class_curie: ClassVar[str] = None + class_name: ClassVar[str] = "MergedGraph" + class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/MergedGraph") + + name: Optional[str] = None + source: Optional[Union[Union[dict, "SourceGraph"], List[Union[dict, "SourceGraph"]]]] = empty_list() + operations: Optional[Union[Union[dict, "Operations"], List[Union[dict, "Operations"]]]] = empty_list() + destination: Optional[Union[Union[dict, "Destination"], List[Union[dict, "Destination"]]]] = empty_list() + + def __post_init__(self, *_: List[str], **kwargs: Dict[str, Any]): + if self.name is not None and not isinstance(self.name, str): + self.name = str(self.name) + + if not isinstance(self.source, list): + self.source = [self.source] if self.source is not None else [] + self.source = [v if isinstance(v, SourceGraph) else SourceGraph(**as_dict(v)) for v in self.source] + + if not isinstance(self.operations, list): + self.operations = [self.operations] if self.operations is not None else [] + self.operations = [v if isinstance(v, Operations) else Operations(**as_dict(v)) for v in self.operations] + + if not isinstance(self.destination, list): + self.destination = [self.destination] if self.destination is not None else [] + self.destination = [v if isinstance(v, Destination) else Destination(**as_dict(v)) for v in self.destination] super().__post_init__(**kwargs) @dataclass -class Source(YAMLRoot): +class SourceGraph(YAMLRoot): """ - Source information for the graphs. + Details of a source graph to be merged """ _inherited_slots: ClassVar[List[str]] = [] - class_class_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/Source") + class_class_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/SourceGraph") class_class_curie: ClassVar[str] = None - class_name: ClassVar[str] = "Source" - class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/Source") + class_name: ClassVar[str] = "SourceGraph" + class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/SourceGraph") name: Optional[str] = None - input: Optional[Union[dict, "Input"]] = None + input: Optional[Union[dict, "InputFiles"]] = None def __post_init__(self, *_: List[str], **kwargs: Dict[str, Any]): if self.name is not None and not isinstance(self.name, str): self.name = str(self.name) - if self.input is not None and not isinstance(self.input, Input): - self.input = Input(**as_dict(self.input)) + if self.input is not None and not isinstance(self.input, InputFiles): + self.input = InputFiles(**as_dict(self.input)) super().__post_init__(**kwargs) @dataclass -class Input(YAMLRoot): +class InputFiles(YAMLRoot): """ - Input file details. + Input files for the source graph """ _inherited_slots: ClassVar[List[str]] = [] - class_class_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/Input") + class_class_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/InputFiles") class_class_curie: ClassVar[str] = None - class_name: ClassVar[str] = "Input" - class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/Input") + class_name: ClassVar[str] = "InputFiles" + class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/InputFiles") format: Optional[str] = None filename: Optional[Union[str, List[str]]] = empty_list() @@ -119,41 +195,41 @@ def __post_init__(self, *_: List[str], **kwargs: Dict[str, Any]): @dataclass -class Operation(YAMLRoot): +class Operations(YAMLRoot): """ - Operations to be performed on the graph. + Details of an operation to perform on the merged graph """ _inherited_slots: ClassVar[List[str]] = [] - class_class_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/Operation") + class_class_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/Operations") class_class_curie: ClassVar[str] = None - class_name: ClassVar[str] = "Operation" - class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/Operation") + class_name: ClassVar[str] = "Operations" + class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/Operations") name: Optional[str] = None - args: Optional[Union[dict, "Args"]] = None + args: Optional[Union[dict, "OperationArgs"]] = None def __post_init__(self, *_: List[str], **kwargs: Dict[str, Any]): if self.name is not None and not isinstance(self.name, str): self.name = str(self.name) - if self.args is not None and not isinstance(self.args, Args): - self.args = Args(**as_dict(self.args)) + if self.args is not None and not isinstance(self.args, OperationArgs): + self.args = OperationArgs(**as_dict(self.args)) super().__post_init__(**kwargs) @dataclass -class Args(YAMLRoot): +class OperationArgs(YAMLRoot): """ - Arguments for operations. + Arguments for an operation """ _inherited_slots: ClassVar[List[str]] = [] - class_class_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/Args") + class_class_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/OperationArgs") class_class_curie: ClassVar[str] = None - class_name: ClassVar[str] = "Args" - class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/Args") + class_name: ClassVar[str] = "OperationArgs" + class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/OperationArgs") graph_name: Optional[str] = None filename: Optional[str] = None @@ -181,7 +257,7 @@ def __post_init__(self, *_: List[str], **kwargs: Dict[str, Any]): @dataclass class Destination(YAMLRoot): """ - Destination details for the merged graph. + Details of a destination for the merged graph """ _inherited_slots: ClassVar[List[str]] = [] @@ -207,42 +283,6 @@ def __post_init__(self, *_: List[str], **kwargs: Dict[str, Any]): super().__post_init__(**kwargs) -@dataclass -class MergedGraph(YAMLRoot): - """ - Details of the merged graph. - """ - _inherited_slots: ClassVar[List[str]] = [] - - class_class_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/MergedGraph") - class_class_curie: ClassVar[str] = None - class_name: ClassVar[str] = "MergedGraph" - class_model_uri: ClassVar[URIRef] = URIRef("http://example.org/kg-merge-schema/MergedGraph") - - name: Optional[str] = None - source: Optional[Union[Union[dict, Source], List[Union[dict, Source]]]] = empty_list() - operations: Optional[Union[Union[dict, Operation], List[Union[dict, Operation]]]] = empty_list() - destination: Optional[Union[Union[dict, Destination], List[Union[dict, Destination]]]] = empty_list() - - def __post_init__(self, *_: List[str], **kwargs: Dict[str, Any]): - if self.name is not None and not isinstance(self.name, str): - self.name = str(self.name) - - if not isinstance(self.source, list): - self.source = [self.source] if self.source is not None else [] - self.source = [v if isinstance(v, Source) else Source(**as_dict(v)) for v in self.source] - - if not isinstance(self.operations, list): - self.operations = [self.operations] if self.operations is not None else [] - self.operations = [v if isinstance(v, Operation) else Operation(**as_dict(v)) for v in self.operations] - - if not isinstance(self.destination, list): - self.destination = [self.destination] if self.destination is not None else [] - self.destination = [v if isinstance(v, Destination) else Destination(**as_dict(v)) for v in self.destination] - - super().__post_init__(**kwargs) - - # Enumerations @@ -250,80 +290,71 @@ def __post_init__(self, *_: List[str], **kwargs: Dict[str, Any]): class slots: pass -slots.output_directory = Slot(uri=DEFAULT_.output_directory, name="output_directory", curie=DEFAULT_.curie('output_directory'), - model_uri=DEFAULT_.output_directory, domain=None, range=Optional[str]) +slots.mergeKG__configuration = Slot(uri=DEFAULT_.configuration, name="mergeKG__configuration", curie=DEFAULT_.curie('configuration'), + model_uri=DEFAULT_.mergeKG__configuration, domain=None, range=Optional[Union[dict, Configuration]]) -slots.checkpoint = Slot(uri=DEFAULT_.checkpoint, name="checkpoint", curie=DEFAULT_.curie('checkpoint'), - model_uri=DEFAULT_.checkpoint, domain=None, range=Optional[str]) +slots.mergeKG__merged_graph = Slot(uri=DEFAULT_.merged_graph, name="mergeKG__merged_graph", curie=DEFAULT_.curie('merged_graph'), + model_uri=DEFAULT_.mergeKG__merged_graph, domain=None, range=Optional[Union[dict, MergedGraph]]) -slots.name = Slot(uri=DEFAULT_.name, name="name", curie=DEFAULT_.curie('name'), - model_uri=DEFAULT_.name, domain=None, range=Optional[str]) - -slots.input = Slot(uri=DEFAULT_.input, name="input", curie=DEFAULT_.curie('input'), - model_uri=DEFAULT_.input, domain=None, range=Optional[str]) - -slots.format = Slot(uri=DEFAULT_.format, name="format", curie=DEFAULT_.curie('format'), - model_uri=DEFAULT_.format, domain=None, range=Optional[str]) - -slots.filename = Slot(uri=DEFAULT_.filename, name="filename", curie=DEFAULT_.curie('filename'), - model_uri=DEFAULT_.filename, domain=None, range=Optional[str]) +slots.configuration__output_directory = Slot(uri=DEFAULT_.output_directory, name="configuration__output_directory", curie=DEFAULT_.curie('output_directory'), + model_uri=DEFAULT_.configuration__output_directory, domain=None, range=Optional[str]) -slots.source = Slot(uri=DEFAULT_.source, name="source", curie=DEFAULT_.curie('source'), - model_uri=DEFAULT_.source, domain=None, range=Optional[str]) +slots.configuration__checkpoint = Slot(uri=DEFAULT_.checkpoint, name="configuration__checkpoint", curie=DEFAULT_.curie('checkpoint'), + model_uri=DEFAULT_.configuration__checkpoint, domain=None, range=Optional[str]) -slots.operations = Slot(uri=DEFAULT_.operations, name="operations", curie=DEFAULT_.curie('operations'), - model_uri=DEFAULT_.operations, domain=None, range=Optional[str]) +slots.configuration__curie_map = Slot(uri=DEFAULT_.curie_map, name="configuration__curie_map", curie=DEFAULT_.curie('curie_map'), + model_uri=DEFAULT_.configuration__curie_map, domain=None, range=Optional[str]) -slots.destination = Slot(uri=DEFAULT_.destination, name="destination", curie=DEFAULT_.curie('destination'), - model_uri=DEFAULT_.destination, domain=None, range=Optional[str]) +slots.configuration__node_properties = Slot(uri=DEFAULT_.node_properties, name="configuration__node_properties", curie=DEFAULT_.curie('node_properties'), + model_uri=DEFAULT_.configuration__node_properties, domain=None, range=Optional[str]) -slots.graph_name = Slot(uri=DEFAULT_.graph_name, name="graph_name", curie=DEFAULT_.curie('graph_name'), - model_uri=DEFAULT_.graph_name, domain=None, range=Optional[str]) +slots.configuration__predicate_mappings = Slot(uri=DEFAULT_.predicate_mappings, name="configuration__predicate_mappings", curie=DEFAULT_.curie('predicate_mappings'), + model_uri=DEFAULT_.configuration__predicate_mappings, domain=None, range=Optional[str]) -slots.node_facet_properties = Slot(uri=DEFAULT_.node_facet_properties, name="node_facet_properties", curie=DEFAULT_.curie('node_facet_properties'), - model_uri=DEFAULT_.node_facet_properties, domain=None, range=Optional[str]) +slots.configuration__property_types = Slot(uri=DEFAULT_.property_types, name="configuration__property_types", curie=DEFAULT_.curie('property_types'), + model_uri=DEFAULT_.configuration__property_types, domain=None, range=Optional[str]) -slots.edge_facet_properties = Slot(uri=DEFAULT_.edge_facet_properties, name="edge_facet_properties", curie=DEFAULT_.curie('edge_facet_properties'), - model_uri=DEFAULT_.edge_facet_properties, domain=None, range=Optional[str]) +slots.mergedGraph__name = Slot(uri=DEFAULT_.name, name="mergedGraph__name", curie=DEFAULT_.curie('name'), + model_uri=DEFAULT_.mergedGraph__name, domain=None, range=Optional[str]) -slots.compression = Slot(uri=DEFAULT_.compression, name="compression", curie=DEFAULT_.curie('compression'), - model_uri=DEFAULT_.compression, domain=None, range=Optional[str]) +slots.mergedGraph__source = Slot(uri=DEFAULT_.source, name="mergedGraph__source", curie=DEFAULT_.curie('source'), + model_uri=DEFAULT_.mergedGraph__source, domain=None, range=Optional[Union[Union[dict, SourceGraph], List[Union[dict, SourceGraph]]]]) -slots.configuration__output_directory = Slot(uri=DEFAULT_.output_directory, name="configuration__output_directory", curie=DEFAULT_.curie('output_directory'), - model_uri=DEFAULT_.configuration__output_directory, domain=None, range=Optional[str]) +slots.mergedGraph__operations = Slot(uri=DEFAULT_.operations, name="mergedGraph__operations", curie=DEFAULT_.curie('operations'), + model_uri=DEFAULT_.mergedGraph__operations, domain=None, range=Optional[Union[Union[dict, Operations], List[Union[dict, Operations]]]]) -slots.configuration__checkpoint = Slot(uri=DEFAULT_.checkpoint, name="configuration__checkpoint", curie=DEFAULT_.curie('checkpoint'), - model_uri=DEFAULT_.configuration__checkpoint, domain=None, range=Optional[Union[bool, Bool]]) +slots.mergedGraph__destination = Slot(uri=DEFAULT_.destination, name="mergedGraph__destination", curie=DEFAULT_.curie('destination'), + model_uri=DEFAULT_.mergedGraph__destination, domain=None, range=Optional[Union[Union[dict, Destination], List[Union[dict, Destination]]]]) -slots.source__name = Slot(uri=DEFAULT_.name, name="source__name", curie=DEFAULT_.curie('name'), - model_uri=DEFAULT_.source__name, domain=None, range=Optional[str]) +slots.sourceGraph__name = Slot(uri=DEFAULT_.name, name="sourceGraph__name", curie=DEFAULT_.curie('name'), + model_uri=DEFAULT_.sourceGraph__name, domain=None, range=Optional[str]) -slots.source__input = Slot(uri=DEFAULT_.input, name="source__input", curie=DEFAULT_.curie('input'), - model_uri=DEFAULT_.source__input, domain=None, range=Optional[Union[dict, Input]]) +slots.sourceGraph__input = Slot(uri=DEFAULT_.input, name="sourceGraph__input", curie=DEFAULT_.curie('input'), + model_uri=DEFAULT_.sourceGraph__input, domain=None, range=Optional[Union[dict, InputFiles]]) -slots.input__format = Slot(uri=DEFAULT_.format, name="input__format", curie=DEFAULT_.curie('format'), - model_uri=DEFAULT_.input__format, domain=None, range=Optional[str]) +slots.inputFiles__format = Slot(uri=DEFAULT_.format, name="inputFiles__format", curie=DEFAULT_.curie('format'), + model_uri=DEFAULT_.inputFiles__format, domain=None, range=Optional[str]) -slots.input__filename = Slot(uri=DEFAULT_.filename, name="input__filename", curie=DEFAULT_.curie('filename'), - model_uri=DEFAULT_.input__filename, domain=None, range=Optional[Union[str, List[str]]]) +slots.inputFiles__filename = Slot(uri=DEFAULT_.filename, name="inputFiles__filename", curie=DEFAULT_.curie('filename'), + model_uri=DEFAULT_.inputFiles__filename, domain=None, range=Optional[Union[str, List[str]]]) -slots.operation__name = Slot(uri=DEFAULT_.name, name="operation__name", curie=DEFAULT_.curie('name'), - model_uri=DEFAULT_.operation__name, domain=None, range=Optional[str]) +slots.operations__name = Slot(uri=DEFAULT_.name, name="operations__name", curie=DEFAULT_.curie('name'), + model_uri=DEFAULT_.operations__name, domain=None, range=Optional[str]) -slots.operation__args = Slot(uri=DEFAULT_.args, name="operation__args", curie=DEFAULT_.curie('args'), - model_uri=DEFAULT_.operation__args, domain=None, range=Optional[Union[dict, Args]]) +slots.operations__args = Slot(uri=DEFAULT_.args, name="operations__args", curie=DEFAULT_.curie('args'), + model_uri=DEFAULT_.operations__args, domain=None, range=Optional[Union[dict, OperationArgs]]) -slots.args__graph_name = Slot(uri=DEFAULT_.graph_name, name="args__graph_name", curie=DEFAULT_.curie('graph_name'), - model_uri=DEFAULT_.args__graph_name, domain=None, range=Optional[str]) +slots.operationArgs__graph_name = Slot(uri=DEFAULT_.graph_name, name="operationArgs__graph_name", curie=DEFAULT_.curie('graph_name'), + model_uri=DEFAULT_.operationArgs__graph_name, domain=None, range=Optional[str]) -slots.args__filename = Slot(uri=DEFAULT_.filename, name="args__filename", curie=DEFAULT_.curie('filename'), - model_uri=DEFAULT_.args__filename, domain=None, range=Optional[str]) +slots.operationArgs__filename = Slot(uri=DEFAULT_.filename, name="operationArgs__filename", curie=DEFAULT_.curie('filename'), + model_uri=DEFAULT_.operationArgs__filename, domain=None, range=Optional[str]) -slots.args__node_facet_properties = Slot(uri=DEFAULT_.node_facet_properties, name="args__node_facet_properties", curie=DEFAULT_.curie('node_facet_properties'), - model_uri=DEFAULT_.args__node_facet_properties, domain=None, range=Optional[Union[str, List[str]]]) +slots.operationArgs__node_facet_properties = Slot(uri=DEFAULT_.node_facet_properties, name="operationArgs__node_facet_properties", curie=DEFAULT_.curie('node_facet_properties'), + model_uri=DEFAULT_.operationArgs__node_facet_properties, domain=None, range=Optional[Union[str, List[str]]]) -slots.args__edge_facet_properties = Slot(uri=DEFAULT_.edge_facet_properties, name="args__edge_facet_properties", curie=DEFAULT_.curie('edge_facet_properties'), - model_uri=DEFAULT_.args__edge_facet_properties, domain=None, range=Optional[Union[str, List[str]]]) +slots.operationArgs__edge_facet_properties = Slot(uri=DEFAULT_.edge_facet_properties, name="operationArgs__edge_facet_properties", curie=DEFAULT_.curie('edge_facet_properties'), + model_uri=DEFAULT_.operationArgs__edge_facet_properties, domain=None, range=Optional[Union[str, List[str]]]) slots.destination__format = Slot(uri=DEFAULT_.format, name="destination__format", curie=DEFAULT_.curie('format'), model_uri=DEFAULT_.destination__format, domain=None, range=Optional[str]) @@ -333,15 +364,3 @@ class slots: slots.destination__filename = Slot(uri=DEFAULT_.filename, name="destination__filename", curie=DEFAULT_.curie('filename'), model_uri=DEFAULT_.destination__filename, domain=None, range=Optional[str]) - -slots.mergedGraph__name = Slot(uri=DEFAULT_.name, name="mergedGraph__name", curie=DEFAULT_.curie('name'), - model_uri=DEFAULT_.mergedGraph__name, domain=None, range=Optional[str]) - -slots.mergedGraph__source = Slot(uri=DEFAULT_.source, name="mergedGraph__source", curie=DEFAULT_.curie('source'), - model_uri=DEFAULT_.mergedGraph__source, domain=None, range=Optional[Union[Union[dict, Source], List[Union[dict, Source]]]]) - -slots.mergedGraph__operations = Slot(uri=DEFAULT_.operations, name="mergedGraph__operations", curie=DEFAULT_.curie('operations'), - model_uri=DEFAULT_.mergedGraph__operations, domain=None, range=Optional[Union[Union[dict, Operation], List[Union[dict, Operation]]]]) - -slots.mergedGraph__destination = Slot(uri=DEFAULT_.destination, name="mergedGraph__destination", curie=DEFAULT_.curie('destination'), - model_uri=DEFAULT_.mergedGraph__destination, domain=None, range=Optional[Union[Union[dict, Destination], List[Union[dict, Destination]]]]) diff --git a/kg_microbe_merge/schema/merge_schema.yaml b/kg_microbe_merge/schema/merge_schema.yaml index 7a811f5..728b745 100644 --- a/kg_microbe_merge/schema/merge_schema.yaml +++ b/kg_microbe_merge/schema/merge_schema.yaml @@ -5,111 +5,108 @@ prefixes: linkml: https://w3id.org/linkml/ imports: - linkml:types + default_range: string classes: + MergeKG: + description: Configuration for merging knowledge graphs + tree_root: true + attributes: + configuration: + description: Configuration for the merge operation + range: Configuration + merged_graph: + description: Details about graphs to be merged. + range: MergedGraph + Configuration: - description: Configuration settings for the merged graph. + description: Configuration for the merge operation attributes: output_directory: - description: Directory where the output will be stored. - + description: Directory to write output files checkpoint: - description: Whether to use checkpoints. - range: boolean + description: Whether to checkpoint intermediate results + curie_map: + description: Mapping of CURIE prefixes to base IRIs + node_properties: + description: List of node properties to include in the merged graph + predicate_mappings: + description: Mapping of predicates to use in the merged graph + property_types: + description: Mapping of property types to use in the merged graph - Source: - description: Source information for the graphs. + MergedGraph: + description: Details about graphs to be merged. attributes: name: - description: Name of the source. - - input: - description: Input details for the source. - range: Input + description: Name of the merged graph + source: + description: Source graphs to be merged + multivalued: true + inlined_as_list: true + range: SourceGraph + operations: + description: Operations to perform on the merged graph + range: Operations + multivalued: true + inlined_as_list: true + destination: + description: Destination for the merged graph + range: Destination + multivalued: true + inlined_as_list: true - Input: - description: Input file details. + SourceGraph: + description: Details of a source graph to be merged + attributes: + name: + description: Name of the source graph + input: + description: Input files for the source graph + range: InputFiles + + InputFiles: + description: Input files for the source graph attributes: format: - description: Format of the input file. - + description: Format of the input files filename: - description: List of filenames for the input files. + description: List of filenames for the input files multivalued: true - + inlined_as_list: true - Operation: - description: Operations to be performed on the graph. + Operations: + description: Details of an operation to perform on the merged graph attributes: name: - description: Name of the operation. - + description: Name of the operation args: - description: Arguments for the operation. - range: Args + description: Arguments for the operation + range: OperationArgs - Args: - description: Arguments for operations. + OperationArgs: + description: Arguments for an operation attributes: graph_name: - description: Name of the graph. - + description: Name of the graph filename: - description: Filename for the output. - + description: Filename for the output node_facet_properties: - description: Node facet properties. + description: List of node facet properties multivalued: true + inlined_as_list: true edge_facet_properties: - description: Edge facet properties. + description: List of edge facet properties multivalued: true - + inlined_as_list: true Destination: - description: Destination details for the merged graph. + description: Details of a destination for the merged graph attributes: format: - description: Format of the output file. - + description: Format of the destination compression: - description: Compression type for the output file. - + description: Compression format for the destination filename: - description: Filename for the output file. - - - MergedGraph: - description: Details of the merged graph. - attributes: - name: - description: Name of the merged graph. - - source: - description: Source graphs. - range: Source - multivalued: true - operations: - description: Operations to be performed on the graph. - range: Operation - multivalued: true - destination: - description: Destination details for the merged graph. - range: Destination - multivalued: true - -slots: - output_directory: - checkpoint: - name: - input: - format: - filename: - source: - operations: - destination: - graph_name: - node_facet_properties: - edge_facet_properties: - compression: - + description: Filename for the destination \ No newline at end of file diff --git a/kg_microbe_merge/utils/file_utils.py b/kg_microbe_merge/utils/file_utils.py index 7a93ae8..eebab38 100644 --- a/kg_microbe_merge/utils/file_utils.py +++ b/kg_microbe_merge/utils/file_utils.py @@ -3,7 +3,9 @@ # Given a path to a directory, look for all files with the extension tar.zip and unzip them all import tarfile from pathlib import Path -from typing import Union +from typing import List, Union + +from click import Tuple def unzip_files_in_dir(dir_path: Union[str, Path]) -> None: @@ -40,3 +42,73 @@ def tarball_files_in_dir(dir_path: Union[str, Path], filename: str) -> None: tar.add(file, arcname=file.name) print(f"Added {file.name} to {dir_path}/{filename}.tar.gz") tar.close() + + +def collect_paths_from_directory( + directory: Path, node_paths: List[Path], edge_paths: List[Path] +) -> None: + """ + Collect node and edge paths from a given directory. + + :param directory: Path to the directory. + :param node_paths: List of node paths. + :param edge_paths: List of edge paths. + :return: None + """ + for file in directory.iterdir(): + if file.is_file() and file.suffix == ".tsv" and not file.name.startswith("._"): + if "nodes" in file.name: + node_paths.append(file) + elif "edges" in file.name: + edge_paths.append(file) + + +def collect_subset_kg_paths( + subset_transforms: Tuple[str], data_dir_path: Path +) -> Tuple[List[Path], List[Path]]: + """ + Get the paths to the nodes and edges files for the subset transforms. + + :param subset_transforms: Tuple of subset transforms. + :param data_dir_path: Path to the data directory. + :return: List of node and edge paths. + """ + node_paths = [] + edge_paths = [] + transforms_lower = {transform.strip().lower() for transform in subset_transforms[0].split(",")} + transform_dirs = [ + dir + for dir in data_dir_path.iterdir() + if dir.is_dir() and dir.name.lower() in transforms_lower + ] + ontology_transforms = transforms_lower - {dir.name.lower() for dir in transform_dirs} + + for directory in transform_dirs: + node_paths.append(directory / "nodes.tsv") + edge_paths.append(directory / "edges.tsv") + + if ontology_transforms: + ontology_dir = data_dir_path / "ontologies" + collect_paths_from_directory(ontology_dir, node_paths, edge_paths) + + return node_paths, edge_paths + + +def collect_all_kg_paths(data_dir_path: Path) -> Tuple[List[Path], List[Path]]: + """ + Get the paths to the nodes and edges files for all KGs. + + :param data_dir_path: Path to the data directory. + :return: List of node and edge paths. + """ + node_paths = [] + edge_paths = [] + for directory in data_dir_path.iterdir(): + if directory.is_dir(): + if directory.name != "ontologies": + node_paths.append(directory / "nodes.tsv") + edge_paths.append(directory / "edges.tsv") + else: + collect_paths_from_directory(directory, node_paths, edge_paths) + + return node_paths, edge_paths