diff --git a/demo-project/src/demo_project/pipeline_registry.py b/demo-project/src/demo_project/pipeline_registry.py index d415265723..18d1903f11 100644 --- a/demo-project/src/demo_project/pipeline_registry.py +++ b/demo-project/src/demo_project/pipeline_registry.py @@ -47,19 +47,19 @@ def create_pipeline(**kwargs) -> Pipeline: outputs={"dataset_3","dataset_4"} ) - # other = pipeline([ - # node(lambda x: x, - # inputs="dataset_3", - # outputs="dataset_5", - # name="step5" - # ) - # ], - # namespace="other_pipeline", - # inputs={"dataset_3"}, - # outputs={"dataset_5"} - # ) + other = pipeline([ + node(lambda x: x, + inputs="dataset_3", + outputs="dataset_5", + name="step5" + ) + ], + namespace="other_pipeline", + inputs={"dataset_3"}, + outputs={"dataset_5"} + ) - return new_pipeline + return new_pipeline + other def register_pipelines() -> Dict[str, Pipeline]: diff --git a/package/kedro_viz/api/rest/responses.py b/package/kedro_viz/api/rest/responses.py index 13aaa05a6d..1bba3c2cfb 100644 --- a/package/kedro_viz/api/rest/responses.py +++ b/package/kedro_viz/api/rest/responses.py @@ -43,7 +43,7 @@ class BaseGraphNodeAPIResponse(BaseAPIResponse): type: str # If a node is a ModularPipeline node, this value will be None, hence Optional. - modular_pipelines: Optional[List[str]] = None + modular_pipeline: Optional[str] = None class TaskNodeAPIResponse(BaseGraphNodeAPIResponse): diff --git a/package/kedro_viz/data_access/managers.py b/package/kedro_viz/data_access/managers.py index 833e4946f8..8f0b0b0e02 100644 --- a/package/kedro_viz/data_access/managers.py +++ b/package/kedro_viz/data_access/managers.py @@ -136,8 +136,6 @@ def get_stats_for_data_node(self, data_node_name: str) -> Union[Dict, None]: return self.dataset_stats.get(data_node_name, None) - - def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline): """Iterate through all the nodes and datasets in a "registered" pipeline and add them to relevant repositories. 
Take care of extracting other relevant information @@ -158,12 +156,10 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline): free_inputs = pipeline.inputs() for node in pipeline.nodes: - modular_pipelines_tree.extract_from_node(node) - node_modular_pipeline_id = modular_pipelines_tree.get_modular_pipeline_from_node(node) - task_node = self.add_node(registered_pipeline_id, node, node_modular_pipeline_id) + + task_node = self.add_node(registered_pipeline_id, node, modular_pipelines_tree) self.registered_pipelines.add_node(registered_pipeline_id, task_node.id) - # Add node's inputs as DataNode to the graph for input_ in node.inputs: @@ -171,9 +167,8 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline): # Mark it as a transcoded dataset unless it's a free input # because free inputs to the pipeline can't be transcoded. is_free_input = input_ in free_inputs - input_modular_pipeline_id = modular_pipelines_tree.get_modular_pipeline_from_node(input_) input_node = self.add_node_input( - registered_pipeline_id, input_, task_node, is_free_input, input_modular_pipeline_id + registered_pipeline_id, input_, task_node, modular_pipelines_tree, is_free_input ) self.registered_pipelines.add_node( registered_pipeline_id, input_node.id @@ -186,9 +181,8 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline): # Add node outputs as DataNode to the graph. # It follows similar logic to adding inputs. 
for output in node.outputs: - output_modular_pipeline_id = modular_pipelines_tree.get_modular_pipeline_from_node(output) output_node = self.add_node_output( - registered_pipeline_id, output, task_node, output_modular_pipeline_id + registered_pipeline_id, output, task_node, modular_pipelines_tree ) self.registered_pipelines.add_node( registered_pipeline_id, output_node.id @@ -199,7 +193,7 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline): # modular_pipelines_repo_obj.extract_from_node(output_node) - def add_node(self, registered_pipeline_id: str, node: KedroNode, node_mod_pipeline) -> TaskNode: + def add_node(self, registered_pipeline_id: str, node: KedroNode, modular_pipeline_tree: ModularPipelinesRepository) -> TaskNode: """Add a Kedro node as a TaskNode to the NodesRepository for a given registered pipeline ID. @@ -209,7 +203,7 @@ def add_node(self, registered_pipeline_id: str, node: KedroNode, node_mod_pipeli Returns: The GraphNode instance representing the Kedro node that was added to the graph. """ - task_node: TaskNode = self.nodes.add_node(GraphNode.create_task_node(node, node_mod_pipeline)) + task_node: TaskNode = self.nodes.add_node(GraphNode.create_task_node(node, modular_pipeline_tree.get_modular_pipeline_for_node(node))) task_node.add_pipeline(registered_pipeline_id) self.tags.add_tags(task_node.tags) return task_node @@ -219,8 +213,8 @@ def add_node_input( registered_pipeline_id: str, input_dataset: str, task_node: TaskNode, + modular_pipeline_tree: ModularPipelinesRepository, is_free_input: bool = False, - node_mod_pipeline = None ) -> Union[DataNode, TranscodedDataNode, ParametersNode]: """Add a Kedro node's input as a DataNode, TranscodedDataNode or ParametersNode to the NodesRepository for a given registered pipeline ID. 
@@ -235,7 +229,7 @@ def add_node_input( """ graph_node = self.add_dataset( - registered_pipeline_id, input_dataset, is_free_input=is_free_input, node_mod_pipeline=node_mod_pipeline + registered_pipeline_id, input_dataset, modular_pipeline_tree, is_free_input=is_free_input, ) graph_node.tags.update(task_node.tags) self.edges[registered_pipeline_id].add_edge( @@ -250,7 +244,7 @@ def add_node_input( return graph_node def add_node_output( - self, registered_pipeline_id: str, output_dataset: str, task_node: TaskNode, node_mod_pipeline = None + self, registered_pipeline_id: str, output_dataset: str, task_node: TaskNode, modular_pipeline_tree: ModularPipelinesRepository, ) -> Union[DataNode, TranscodedDataNode, ParametersNode]: """Add a Kedro node's output as a DataNode, TranscodedDataNode or ParametersNode to the NodesRepository for a given registered pipeline ID. @@ -262,7 +256,7 @@ def add_node_output( Returns: The GraphNode instance representing the node's output that was added to the graph. """ - graph_node = self.add_dataset(registered_pipeline_id, output_dataset, node_mod_pipeline=node_mod_pipeline) + graph_node = self.add_dataset(registered_pipeline_id, output_dataset, modular_pipeline_tree) graph_node.tags.update(task_node.tags) self.edges[registered_pipeline_id].add_edge( GraphEdge(source=task_node.id, target=graph_node.id) @@ -274,8 +268,8 @@ def add_dataset( self, registered_pipeline_id: str, dataset_name: str, + modular_pipeline_tree: ModularPipelinesRepository, is_free_input: bool = False, - node_mod_pipeline = None ) -> Union[DataNode, TranscodedDataNode, ParametersNode]: """Add a Kedro dataset as a DataNode, TranscodedDataNode or ParametersNode to the NodesRepository for a given registered pipeline ID. 
@@ -304,8 +298,9 @@ def add_dataset( tags=set(), dataset=obj, stats=self.get_stats_for_data_node(_strip_transcoding(dataset_name)), - is_free_input=is_free_input, - node_mod_pipeline=node_mod_pipeline + modular_pipeline_id=modular_pipeline_tree.get_modular_pipeline_for_node(dataset_name), + is_free_input=is_free_input + ) graph_node = self.nodes.add_node(graph_node) graph_node.add_pipeline(registered_pipeline_id) @@ -494,17 +489,4 @@ def create_modular_pipelines_tree_for_registered_pipeline( ) node_dependencies[bad_input].remove(modular_pipeline_id) - for node_id, node in self.nodes.as_dict().items(): - if ( - node.type == GraphNodeType.MODULAR_PIPELINE - or not node.belongs_to_pipeline(registered_pipeline_id) - ): - continue - if not node.modular_pipelines or node_id in root_parameters: - modular_pipelines_tree[ROOT_MODULAR_PIPELINE_ID].children.add( - ModularPipelineChild( - id=node_id, type=self.nodes.get_node_by_id(node_id).type - ) - ) - return modular_pipelines_tree diff --git a/package/kedro_viz/data_access/repositories/modular_pipelines.py b/package/kedro_viz/data_access/repositories/modular_pipelines.py index 3cd1e3d363..684116b2c2 100644 --- a/package/kedro_viz/data_access/repositories/modular_pipelines.py +++ b/package/kedro_viz/data_access/repositories/modular_pipelines.py @@ -121,13 +121,8 @@ def explode(nested_namespace: str) -> list[str]: self.add_inputs(modular_pipeline_id, free_inputs_to_sub_pipeline) self.add_outputs(modular_pipeline_id, free_outputs_from_sub_pipeline) - internal_inputs = sub_pipeline.all_inputs() - free_inputs_to_sub_pipeline - internal_outputs = sub_pipeline.all_outputs() - free_outputs_from_sub_pipeline - - self.add_child_datasets(modular_pipeline_id, internal_inputs, internal_outputs) - task_nodes = sub_pipeline.nodes - # self.add_child_tasks(modular_pipeline_id, task_nodes) + self.add_children(modular_pipeline_id, task_nodes) def get_or_create_modular_pipeline( self, modular_pipeline_id: str @@ -181,29 +176,8 @@ def 
add_outputs( GraphNode._hash(output) for output in outputs } - def add_child_datasets(self, modular_pipeline_id: str, inputs: Set[str], outputs: Set[str]): - print(modular_pipeline_id) - parent_modular_pipeline_id, _ = modular_pipeline_id.partition('.') - if parent_modular_pipeline_id: - parent_modular_pipeline = self.get_or_create_modular_pipeline(parent_modular_pipeline_id) - modular_pipeline = self.get_or_create_modular_pipeline(modular_pipeline_id) - for input in inputs: - input_id = GraphNode._hash(input) - print("input_id", input_id) - if parent_modular_pipeline_id and input_id in parent_modular_pipeline.children: - print("parent_modular_pipeline_id", parent_modular_pipeline_id) - print("parent_modular_pipeline.children", parent_modular_pipeline.children) - parent_modular_pipeline.inputs.remove(input_id) - modular_pipeline.children.add(ModularPipelineChild(id= input_id , type=GraphNodeType.DATA)) - for output in outputs: - output_id = GraphNode._hash(output) - if parent_modular_pipeline_id and output_id in parent_modular_pipeline.children: - parent_modular_pipeline.inputs.remove(output_id) - modular_pipeline.children.add(ModularPipelineChild(id= output_id, type=GraphNodeType.DATA)) - - - def add_child_task(self, modular_pipeline_id: str, child: ModularPipelineChild): + def add_children(self, modular_pipeline_id: str, task_nodes: List[GraphNode]): """Add a child to a modular pipeline. Args: modular_pipeline_id: ID of the modular pipeline to add the child to. @@ -220,50 +194,20 @@ def add_child_task(self, modular_pipeline_id: str, child: ModularPipelineChild): ... ) >>> assert data_science_pipeline.children == {modular_pipeline_child} """ - modular_pipeline = self.get_or_create_modular_pipeline(modular_pipeline_id) - modular_pipeline.children.add(child) - - - def extract_from_node(self, node) -> Optional[str]: - """Extract the namespace from a graph node and add it as a modular pipeline node - to the modular pipeline repository. 
- - Args: - node: The GraphNode from which to extract modular pipeline. - Returns: - ID of the modular pipeline node added to the modular pipeline repository if found. - Example: - >>> modular_pipelines = ModularPipelinesRepository() - >>> model_output_node = GraphNode.create_data_node( - ... "data_science.model_output", layer=None, tags=set(), dataset=None - ... ) - >>> modular_pipelines.extract_from_node(model_output_node) - 'data_science' - >>> assert modular_pipelines.has_modular_pipeline("data_science") - """ - - # There is no need to extract modular pipeline from parameters - # because all valid modular pipelines are encoded in either a TaskNode or DataNode. - if isinstance(node, ParametersNode): - return None - - modular_pipeline_id = node.namespace - if not modular_pipeline_id: - return None - - # Add the node's registered pipelines to the modular pipeline's registered pipelines. - # Basically this means if the node belongs to the "__default__" pipeline, for example, - # so does the modular pipeline. - # modular_pipeline.pipelines.update(node.pipelines) - - # Since we extract the modular pipeline from the node's namespace, - # the node is by definition a child of the modular pipeline. 
- self.add_child_task( - modular_pipeline_id, - ModularPipelineChild(id=GraphNode._hash(str(node)), type=GraphNodeType.TASK), - ) - return modular_pipeline_id + modular_pipeline = self.get_or_create_modular_pipeline(modular_pipeline_id) + for task_node in task_nodes: + if task_node.namespace == modular_pipeline_id: + modular_pipeline.children.add(ModularPipelineChild(id=GraphNode._hash(str(task_node)), type=GraphNodeType.TASK)) + for input in task_node.inputs: + input_id = GraphNode._hash(input) + if input_id not in modular_pipeline.inputs: + modular_pipeline.children.add(ModularPipelineChild(id=input_id, type=GraphNodeType.DATA)) + for output in task_node.outputs: + output_id = GraphNode._hash(output) + if output_id not in modular_pipeline.outputs: + modular_pipeline.children.add(ModularPipelineChild(id=output_id, type=GraphNodeType.DATA)) + def has_modular_pipeline(self, modular_pipeline_id: str) -> bool: """Return whether this modular pipeline repository has a given modular pipeline ID. @@ -280,8 +224,8 @@ def has_modular_pipeline(self, modular_pipeline_id: str) -> bool: """ return modular_pipeline_id in self.tree - def get_modular_pipeline_from_node(self, node) -> Optional[str]: - """Get the name of the modular pipeline to which the given node_id belongs. + def get_modular_pipeline_for_node(self, node) -> Optional[str]: + """Get the name of the modular pipeline to which the given node belongs. Args: node: The node to check for its parent modular pipeline. @@ -290,42 +234,14 @@ def get_modular_pipeline_from_node(self, node) -> Optional[str]: The name of the modular pipeline if the node belongs to any modular pipeline, otherwise returns None. 
""" + print(self.tree.items()) node_id = GraphNode._hash(str(node)) - - - mod_id = None - for modular_pipeline_id, modular_pipeline_node in self.tree.items(): - if any(child.id == node_id for child in modular_pipeline_node.children): - mod_id= modular_pipeline_id - if mod_id: - return [mod_id] - else: - return [] - - - # def resolve_children(self): - # # Iterate over each pipeline in the tree - # print(self.tree) - # for pipeline_name, pipeline_node in self.tree.items(): - # # Check if the pipeline has a parent - # if '.' in pipeline_name: - # # Extract parent pipeline name and child pipeline name - # parent_name, child_name = pipeline_name.split('.') - - # # Get the children of parent pipeline and child pipeline - # parent_children = self.tree[parent_name].children - # child_children = pipeline_node.children - - # # Get the set of child IDs from the child pipeline - # child_ids = {child.id for child in child_children} - - # # Remove duplicate children from the parent pipeline - # parent_children = {child for child in parent_children if child.id not in child_ids} + modular_pipeline_id = None + for id, node in self.tree.items(): + if any(child.id == node_id for child in node.children): + modular_pipeline_id = id + return modular_pipeline_id - # # Update the children attribute of the parent pipeline - # self.tree[parent_name].children = parent_children - - # print(self.tree) def as_dict(self) -> Dict[str, ModularPipelineNode]: """Return the repository as a dictionary.""" diff --git a/package/kedro_viz/models/flowchart.py b/package/kedro_viz/models/flowchart.py index 487ad50df2..c3d72334dd 100644 --- a/package/kedro_viz/models/flowchart.py +++ b/package/kedro_viz/models/flowchart.py @@ -126,7 +126,7 @@ class GraphNode(BaseModel, abc.ABC): pipelines (Set[str]): The set of registered pipeline IDs this node belongs to. Defaults to `set()`. namespace (Optional[str]): The original namespace on this node. Defaults to `None`. 
- modular_pipelines (Optional[List[str]]): The list of modular pipeline this node belongs to. + modular_pipeline (Optional[str]): Modular pipeline this node belongs to. """ @@ -143,15 +143,7 @@ class GraphNode(BaseModel, abc.ABC): set(), description="The set of registered pipeline IDs this node belongs to" ) - # In Kedro, modular pipeline is implemented by declaring namespace on a node. - # For example, node(func, namespace="uk.de") means this node belongs - # to the modular pipeline "uk" and "uk.de" - namespace: Optional[str] = Field( - default=None, - validate_default=True, - description="The original namespace on this node", - ) - modular_pipelines: Optional[List[str]] = Field( + modular_pipeline: Optional[str] = Field( default=None, validate_default=True, description="The modular_pipelines this node belongs to", ) @@ -162,49 +154,8 @@ class GraphNode(BaseModel, abc.ABC): def _hash(value: str): return hashlib.sha1(value.encode("UTF-8")).hexdigest()[:8] - @staticmethod - def _get_namespace(dataset_name: str) -> Optional[str]: - """Extract the namespace from the dataset/parameter name. - Args: - dataset_name: The name of the dataset. - Returns: - The namespace of this dataset, if available. - Example: - >>> GraphNode._get_namespace("pipeline.dataset") - 'pipeline' - """ - if "." not in dataset_name: - return None - - return dataset_name.rsplit(".", 1)[0] - - @staticmethod - def _expand_namespaces(namespace: Optional[str]) -> List[str]: - """Expand a node's namespace to the list of modular pipelines - that this node belongs to. - Args: - namespace: The namespace of the node. - Returns: - The list of modular pipelines that this node belongs to.
- Example: - >>> GraphNode._expand_namespaces("pipeline1.data_science") - ['pipeline1', 'pipeline1.data_science'] - """ - if not namespace: - return [] - namespace_list = [] - namespace_chunks = namespace.split(".") - prefix = "" - for chunk in namespace_chunks: - if prefix: - prefix = f"{prefix}.{chunk}" - else: - prefix = chunk - namespace_list.append(prefix) - return namespace_list - - @classmethod - def create_task_node(cls, node: KedroNode, node_mod_pipeline) -> "TaskNode": + @classmethod + def create_task_node(cls, node: KedroNode, modular_pipeline_id: Optional[str]) -> "TaskNode": """Create a graph node of type task for a given Kedro Node instance. Args: node: A node in a Kedro pipeline. @@ -217,7 +168,7 @@ def create_task_node(cls, node: KedroNode, node_mod_pipeline) -> "TaskNode": name=node_name, tags=set(node.tags), kedro_obj=node, - modular_pipelines=node_mod_pipeline, + modular_pipeline=modular_pipeline_id, ) @classmethod @@ -228,8 +179,8 @@ def create_data_node( tags: Set[str], dataset: AbstractDataset, stats: Optional[Dict], + modular_pipeline_id: Optional[str], is_free_input: bool = False, - node_mod_pipeline=None, ) -> Union["DataNode", "TranscodedDataNode"]: """Create a graph node of type data for a given Kedro Dataset instance. 
Args: @@ -255,7 +206,7 @@ def create_data_node( layer=layer, is_free_input=is_free_input, stats=stats, - modular_pipelines=node_mod_pipeline, + modular_pipeline=modular_pipeline_id, ) return DataNode( @@ -266,7 +217,7 @@ def create_data_node( kedro_obj=dataset, is_free_input=is_free_input, stats=stats, - modular_pipelines=node_mod_pipeline, + modular_pipeline=modular_pipeline_id, ) @classmethod @@ -355,16 +306,6 @@ def check_kedro_obj_exists(cls, values): assert "kedro_obj" in values return values - @field_validator("namespace") - @classmethod - def set_namespace(cls, _, info: ValidationInfo): - return info.data["kedro_obj"].namespace - - # @field_validator("modular_pipelines") - # @classmethod - # def set_modular_pipelines(cls, _, info: ValidationInfo): - # return cls._expand_namespaces(info.data["kedro_obj"].namespace) - def _extract_wrapped_func(func: FunctionType) -> FunctionType: """Extract a wrapped decorated function to inspect the source code if available. @@ -547,13 +488,7 @@ class DataNode(GraphNode): validate_default=True, description="The concrete type of the underlying kedro_obj", ) - - modular_pipelines: List[str] = Field( - default=[], - validate_default=True, - description="The modular pipelines this node belongs to", - ) - + viz_metadata: Optional[Dict] = Field( default=None, validate_default=True, description="The metadata for data node" ) @@ -577,25 +512,6 @@ def set_dataset_type(cls, _, info: ValidationInfo): kedro_obj = cast(AbstractDataset, info.data.get("kedro_obj")) return get_dataset_type(kedro_obj) - @field_validator("namespace") - @classmethod - def set_namespace(cls, _, info: ValidationInfo): - assert "name" in info.data - - # the modular pipelines that a data node belongs to - # are derived from its namespace, which in turn - # is derived from the dataset's name.
- name = info.data.get("name") - return cls._get_namespace(str(name)) - - # @field_validator("modular_pipelines") - # @classmethod - # def set_modular_pipelines(cls, _, info: ValidationInfo): - # assert "name" in info.data - - # name = info.data.get("name") - # namespace = cls._get_namespace(str(name)) - # return cls._expand_namespaces(namespace) @field_validator("viz_metadata") @classmethod @@ -665,26 +581,6 @@ class TranscodedDataNode(GraphNode): # The type for data node type: str = GraphNodeType.DATA.value - @field_validator("namespace") - @classmethod - def set_namespace(cls, _, info: ValidationInfo): - assert "name" in info.data - - # the modular pipelines that a data node belongs to - # are derived from its namespace, which in turn - # is derived from the dataset's name. - name = info.data.get("name") - return cls._get_namespace(str(name)) - - @field_validator("modular_pipelines") - @classmethod - def set_modular_pipelines(cls, _, info: ValidationInfo): - assert "name" in info.data - - name = info.data.get("name") - namespace = cls._get_namespace(str(name)) - return cls._expand_namespaces(namespace) - def has_metadata(self) -> bool: return True