Skip to content

Commit

Permalink
Good progress
Browse files Browse the repository at this point in the history
  • Loading branch information
rashidakanchwala committed May 8, 2024
1 parent d2188bc commit fac44e0
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 265 deletions.
24 changes: 12 additions & 12 deletions demo-project/src/demo_project/pipeline_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@ def create_pipeline(**kwargs) -> Pipeline:
outputs={"dataset_3","dataset_4"}
)

# other = pipeline([
# node(lambda x: x,
# inputs="dataset_3",
# outputs="dataset_5",
# name="step5"
# )
# ],
# namespace="other_pipeline",
# inputs={"dataset_3"},
# outputs={"dataset_5"}
# )
other = pipeline([
node(lambda x: x,
inputs="dataset_3",
outputs="dataset_5",
name="step5"
)
],
namespace="other_pipeline",
inputs={"dataset_3"},
outputs={"dataset_5"}
)

return new_pipeline
return new_pipeline + other


def register_pipelines() -> Dict[str, Pipeline]:
Expand Down
2 changes: 1 addition & 1 deletion package/kedro_viz/api/rest/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class BaseGraphNodeAPIResponse(BaseAPIResponse):
type: str

# If a node is a ModularPipeline node, this value will be None, hence Optional.
modular_pipelines: Optional[List[str]] = None
modular_pipeline: Optional[str] = None


class TaskNodeAPIResponse(BaseGraphNodeAPIResponse):
Expand Down
46 changes: 14 additions & 32 deletions package/kedro_viz/data_access/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ def get_stats_for_data_node(self, data_node_name: str) -> Union[Dict, None]:

return self.dataset_stats.get(data_node_name, None)



def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline):
"""Iterate through all the nodes and datasets in a "registered" pipeline
and add them to relevant repositories. Take care of extracting other relevant information
Expand All @@ -158,22 +156,19 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline):
free_inputs = pipeline.inputs()

for node in pipeline.nodes:
modular_pipelines_tree.extract_from_node(node)
node_modular_pipeline_id = modular_pipelines_tree.get_modular_pipeline_from_node(node)
task_node = self.add_node(registered_pipeline_id, node, node_modular_pipeline_id)

task_node = self.add_node(registered_pipeline_id, node, modular_pipelines_tree)

self.registered_pipelines.add_node(registered_pipeline_id, task_node.id)


# Add node's inputs as DataNode to the graph
for input_ in node.inputs:
# Add the input as an input to the task_node
# Mark it as a transcoded dataset unless it's a free input
# because free inputs to the pipeline can't be transcoded.
is_free_input = input_ in free_inputs
input_modular_pipeline_id = modular_pipelines_tree.get_modular_pipeline_from_node(input_)
input_node = self.add_node_input(
registered_pipeline_id, input_, task_node, is_free_input, input_modular_pipeline_id
registered_pipeline_id, input_, task_node, modular_pipelines_tree, is_free_input
)
self.registered_pipelines.add_node(
registered_pipeline_id, input_node.id
Expand All @@ -186,9 +181,8 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline):
# Add node outputs as DataNode to the graph.
# It follows similar logic to adding inputs.
for output in node.outputs:
output_modular_pipeline_id = modular_pipelines_tree.get_modular_pipeline_from_node(output)
output_node = self.add_node_output(
registered_pipeline_id, output, task_node, output_modular_pipeline_id
registered_pipeline_id, output, task_node, modular_pipelines_tree
)
self.registered_pipelines.add_node(
registered_pipeline_id, output_node.id
Expand All @@ -199,7 +193,7 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline):

# modular_pipelines_repo_obj.extract_from_node(output_node)

def add_node(self, registered_pipeline_id: str, node: KedroNode, node_mod_pipeline) -> TaskNode:
def add_node(self, registered_pipeline_id: str, node: KedroNode, modular_pipeline_tree: ModularPipelinesRepository) -> TaskNode:
"""Add a Kedro node as a TaskNode to the NodesRepository
for a given registered pipeline ID.
Expand All @@ -209,7 +203,7 @@ def add_node(self, registered_pipeline_id: str, node: KedroNode, node_mod_pipeli
Returns:
The GraphNode instance representing the Kedro node that was added to the graph.
"""
task_node: TaskNode = self.nodes.add_node(GraphNode.create_task_node(node, node_mod_pipeline))
task_node: TaskNode = self.nodes.add_node(GraphNode.create_task_node(node, modular_pipeline_tree.get_modular_pipeline_for_node(node)))
task_node.add_pipeline(registered_pipeline_id)
self.tags.add_tags(task_node.tags)
return task_node
Expand All @@ -219,8 +213,8 @@ def add_node_input(
registered_pipeline_id: str,
input_dataset: str,
task_node: TaskNode,
modular_pipeline_tree: ModularPipelinesRepository,
is_free_input: bool = False,
node_mod_pipeline = None
) -> Union[DataNode, TranscodedDataNode, ParametersNode]:
"""Add a Kedro node's input as a DataNode, TranscodedDataNode or ParametersNode
to the NodesRepository for a given registered pipeline ID.
Expand All @@ -235,7 +229,7 @@ def add_node_input(
"""

graph_node = self.add_dataset(
registered_pipeline_id, input_dataset, is_free_input=is_free_input, node_mod_pipeline=node_mod_pipeline
registered_pipeline_id, input_dataset, modular_pipeline_tree, is_free_input=is_free_input,
)
graph_node.tags.update(task_node.tags)
self.edges[registered_pipeline_id].add_edge(
Expand All @@ -250,7 +244,7 @@ def add_node_input(
return graph_node

def add_node_output(
self, registered_pipeline_id: str, output_dataset: str, task_node: TaskNode, node_mod_pipeline = None
self, registered_pipeline_id: str, output_dataset: str, task_node: TaskNode, modular_pipeline_tree: ModularPipelinesRepository,
) -> Union[DataNode, TranscodedDataNode, ParametersNode]:
"""Add a Kedro node's output as a DataNode, TranscodedDataNode or ParametersNode
to the NodesRepository for a given registered pipeline ID.
Expand All @@ -262,7 +256,7 @@ def add_node_output(
Returns:
The GraphNode instance representing the node's output that was added to the graph.
"""
graph_node = self.add_dataset(registered_pipeline_id, output_dataset, node_mod_pipeline=node_mod_pipeline)
graph_node = self.add_dataset(registered_pipeline_id, output_dataset, modular_pipeline_tree)
graph_node.tags.update(task_node.tags)
self.edges[registered_pipeline_id].add_edge(
GraphEdge(source=task_node.id, target=graph_node.id)
Expand All @@ -274,8 +268,8 @@ def add_dataset(
self,
registered_pipeline_id: str,
dataset_name: str,
modular_pipeline_tree: ModularPipelinesRepository,
is_free_input: bool = False,
node_mod_pipeline = None
) -> Union[DataNode, TranscodedDataNode, ParametersNode]:
"""Add a Kedro dataset as a DataNode, TranscodedDataNode or ParametersNode
to the NodesRepository for a given registered pipeline ID.
Expand Down Expand Up @@ -304,8 +298,9 @@ def add_dataset(
tags=set(),
dataset=obj,
stats=self.get_stats_for_data_node(_strip_transcoding(dataset_name)),
is_free_input=is_free_input,
node_mod_pipeline=node_mod_pipeline
modular_pipeline_id=modular_pipeline_tree.get_modular_pipeline_for_node(dataset_name),
is_free_input=is_free_input

)
graph_node = self.nodes.add_node(graph_node)
graph_node.add_pipeline(registered_pipeline_id)
Expand Down Expand Up @@ -494,17 +489,4 @@ def create_modular_pipelines_tree_for_registered_pipeline(
)
node_dependencies[bad_input].remove(modular_pipeline_id)

for node_id, node in self.nodes.as_dict().items():
if (
node.type == GraphNodeType.MODULAR_PIPELINE
or not node.belongs_to_pipeline(registered_pipeline_id)
):
continue
if not node.modular_pipelines or node_id in root_parameters:
modular_pipelines_tree[ROOT_MODULAR_PIPELINE_ID].children.add(
ModularPipelineChild(
id=node_id, type=self.nodes.get_node_by_id(node_id).type
)
)

return modular_pipelines_tree
130 changes: 23 additions & 107 deletions package/kedro_viz/data_access/repositories/modular_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,8 @@ def explode(nested_namespace: str) -> list[str]:
self.add_inputs(modular_pipeline_id, free_inputs_to_sub_pipeline)
self.add_outputs(modular_pipeline_id, free_outputs_from_sub_pipeline)

internal_inputs = sub_pipeline.all_inputs() - free_inputs_to_sub_pipeline
internal_outputs = sub_pipeline.all_outputs() - free_outputs_from_sub_pipeline

self.add_child_datasets(modular_pipeline_id, internal_inputs, internal_outputs)

task_nodes = sub_pipeline.nodes
# self.add_child_tasks(modular_pipeline_id, task_nodes)
self.add_children(modular_pipeline_id, task_nodes)

def get_or_create_modular_pipeline(
self, modular_pipeline_id: str
Expand Down Expand Up @@ -181,29 +176,8 @@ def add_outputs(
GraphNode._hash(output) for output in outputs
}

def add_child_datasets(self, modular_pipeline_id: str, inputs: Set[str], outputs: Set[str]):
print(modular_pipeline_id)
parent_modular_pipeline_id, _ = modular_pipeline_id.partition('.')
if parent_modular_pipeline_id:
parent_modular_pipeline = self.get_or_create_modular_pipeline(parent_modular_pipeline_id)
modular_pipeline = self.get_or_create_modular_pipeline(modular_pipeline_id)
for input in inputs:
input_id = GraphNode._hash(input)
print("input_id", input_id)
if parent_modular_pipeline_id and input_id in parent_modular_pipeline.children:
print("parent_modular_pipeline_id", parent_modular_pipeline_id)
print("parent_modular_pipeline.children", parent_modular_pipeline.children)
parent_modular_pipeline.inputs.remove(input_id)
modular_pipeline.children.add(ModularPipelineChild(id= input_id , type=GraphNodeType.DATA))
for output in outputs:
output_id = GraphNode._hash(output)
if parent_modular_pipeline_id and output_id in parent_modular_pipeline.children:
parent_modular_pipeline.inputs.remove(output_id)
modular_pipeline.children.add(ModularPipelineChild(id= output_id, type=GraphNodeType.DATA))



def add_child_task(self, modular_pipeline_id: str, child: ModularPipelineChild):
def add_children(self, modular_pipeline_id: str, task_nodes: List[GraphNode]):
"""Add a child to a modular pipeline.
Args:
modular_pipeline_id: ID of the modular pipeline to add the child to.
Expand All @@ -220,50 +194,20 @@ def add_child_task(self, modular_pipeline_id: str, child: ModularPipelineChild):
... )
>>> assert data_science_pipeline.children == {modular_pipeline_child}
"""
modular_pipeline = self.get_or_create_modular_pipeline(modular_pipeline_id)
modular_pipeline.children.add(child)


def extract_from_node(self, node) -> Optional[str]:
"""Extract the namespace from a graph node and add it as a modular pipeline node
to the modular pipeline repository.
Args:
node: The GraphNode from which to extract modular pipeline.
Returns:
ID of the modular pipeline node added to the modular pipeline repository if found.
Example:
>>> modular_pipelines = ModularPipelinesRepository()
>>> model_output_node = GraphNode.create_data_node(
... "data_science.model_output", layer=None, tags=set(), dataset=None
... )
>>> modular_pipelines.extract_from_node(model_output_node)
'data_science'
>>> assert modular_pipelines.has_modular_pipeline("data_science")
"""

# There is no need to extract modular pipeline from parameters
# because all valid modular pipelines are encoded in either a TaskNode or DataNode.

if isinstance(node, ParametersNode):
return None

modular_pipeline_id = node.namespace
if not modular_pipeline_id:
return None

# Add the node's registered pipelines to the modular pipeline's registered pipelines.
# Basically this means if the node belongs to the "__default__" pipeline, for example,
# so does the modular pipeline.
# modular_pipeline.pipelines.update(node.pipelines)

# Since we extract the modular pipeline from the node's namespace,
# the node is by definition a child of the modular pipeline.
self.add_child_task(
modular_pipeline_id,
ModularPipelineChild(id=GraphNode._hash(str(node)), type=GraphNodeType.TASK),
)
return modular_pipeline_id
modular_pipeline = self.get_or_create_modular_pipeline(modular_pipeline_id)
for task_node in task_nodes:
if task_node.namespace == modular_pipeline_id:
modular_pipeline.children.add(ModularPipelineChild(id=GraphNode._hash(str(task_node)), type=GraphNodeType.TASK))
for input in task_node.inputs:
input_id = GraphNode._hash(input)
if input_id not in modular_pipeline.inputs:
modular_pipeline.children.add(ModularPipelineChild(id=input_id, type=GraphNodeType.DATA))
for output in task_node.outputs:
output_id = GraphNode._hash(output)
if output_id not in modular_pipeline.outputs:
modular_pipeline.children.add(ModularPipelineChild(id=output_id, type=GraphNodeType.DATA))


def has_modular_pipeline(self, modular_pipeline_id: str) -> bool:
"""Return whether this modular pipeline repository has a given modular pipeline ID.
Expand All @@ -280,8 +224,8 @@ def has_modular_pipeline(self, modular_pipeline_id: str) -> bool:
"""
return modular_pipeline_id in self.tree

def get_modular_pipeline_from_node(self, node) -> Optional[str]:
"""Get the name of the modular pipeline to which the given node_id belongs.
def get_modular_pipeline_for_node(self, node) -> Optional[str]:
"""Get the name of the modular pipeline to which the given node belongs.
Args:
node: The node to check for its parent modular pipeline.
Expand All @@ -290,42 +234,14 @@ def get_modular_pipeline_from_node(self, node) -> Optional[str]:
The name of the modular pipeline if the node belongs to any modular pipeline,
otherwise returns None.
"""
print(self.tree.items())
node_id = GraphNode._hash(str(node))


mod_id = None
for modular_pipeline_id, modular_pipeline_node in self.tree.items():
if any(child.id == node_id for child in modular_pipeline_node.children):
mod_id= modular_pipeline_id
if mod_id:
return [mod_id]
else:
return []


# def resolve_children(self):
# # Iterate over each pipeline in the tree
# print(self.tree)
# for pipeline_name, pipeline_node in self.tree.items():
# # Check if the pipeline has a parent
# if '.' in pipeline_name:
# # Extract parent pipeline name and child pipeline name
# parent_name, child_name = pipeline_name.split('.')

# # Get the children of parent pipeline and child pipeline
# parent_children = self.tree[parent_name].children
# child_children = pipeline_node.children

# # Get the set of child IDs from the child pipeline
# child_ids = {child.id for child in child_children}

# # Remove duplicate children from the parent pipeline
# parent_children = {child for child in parent_children if child.id not in child_ids}
modular_pipeline_id = None
for id, node in self.tree.items():
if any(child.id == node_id for child in node.children):
modular_pipeline_id = id
return modular_pipeline_id

# # Update the children attribute of the parent pipeline
# self.tree[parent_name].children = parent_children

# print(self.tree)

def as_dict(self) -> Dict[str, ModularPipelineNode]:
"""Return the repository as a dictionary."""
Expand Down
Loading

0 comments on commit fac44e0

Please sign in to comment.