From ac564270c8b1ea0cb3d8f7fa86ee0ceac639b4fc Mon Sep 17 00:00:00 2001 From: Anke Koke <79906866+ankeko@users.noreply.github.com> Date: Wed, 8 Nov 2023 13:55:12 +0100 Subject: [PATCH] fix: Generate mkdocs graphs with multiple dependencies (#86) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 📥 Pull Request Description Implemented option to generate pipeline graphs of dagster jobs with multiple op dependencies. ## 👀 Affected Areas Documentation ## 📝 Checklist Please make sure you've completed the following tasks before submitting this pull request: - [x] Pre-commit hooks were executed - [x] Changes have been reviewed by at least one other developer - [ ] Tests have been added or updated to cover the changes (only necessary if the changes affect the executable code) - [x] All tests ran successfully - [x] All merge conflicts are resolved - [ ] Documentation has been updated to reflect the changes - [ ] Any necessary migrations have been run --- niceml/mkdocs/mdgraph.py | 14 ++++++++++---- niceml/mkdocs/mdjob.py | 8 +++----- niceml/mkdocs/mdop.py | 8 +++----- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/niceml/mkdocs/mdgraph.py b/niceml/mkdocs/mdgraph.py index 872b20ee..f5e45bdb 100644 --- a/niceml/mkdocs/mdgraph.py +++ b/niceml/mkdocs/mdgraph.py @@ -1,14 +1,20 @@ """Module for generating a graph in mkdocs""" -from dagster import JobDefinition +from dagster import JobDefinition, DependencyDefinition, MultiDependencyDefinition def get_graph_md(job: JobDefinition) -> str: """Creates a graph as str with material for mkdocs""" deps = job.graph.dependencies graph_str = "" - for key, value in deps.items(): - for _, val2 in value.items(): - graph_str += f" {val2.node} --> {key.name};\n" + for node, node_dependencies in deps.items(): + for _, dependencies in node_dependencies.items(): + if isinstance(dependencies, DependencyDefinition): + graph_str += f" {dependencies.node} --> {node.name};\n" + elif isinstance(dependencies, MultiDependencyDefinition): + for dependency in dependencies.dependencies: + graph_str += f" {dependency.node} --> {node.name};\n" + else: + raise AttributeError("'dependencies' is not of expected type.") if len(graph_str) == 0: return "" diff --git a/niceml/mkdocs/mdjob.py b/niceml/mkdocs/mdjob.py index 858e4537..5d62e427 100644 --- a/niceml/mkdocs/mdjob.py +++ b/niceml/mkdocs/mdjob.py @@ -1,11 +1,9 @@ """Module for generating mkdocs str for jobs""" from typing import List -from dagster.core.definitions import NodeDefinition - from niceml.mkdocs.mdgraph import get_graph_md from niceml.mkdocs.mdop import get_md_op -from dagster import JobDefinition +from dagster import JobDefinition, OpDefinition def get_job_md(job: JobDefinition, include_graph: bool = True) -> str: @@ -17,13 +15,13 @@ def get_job_md(job: JobDefinition, include_graph: bool = True) -> str: graph_md = get_graph_md(job) if len(graph_md) > 0: job_md += graph_md + "\n\n" - op_list: List[NodeDefinition] = get_ops_from_job(job) + op_list: List[OpDefinition] = get_ops_from_job(job) for cur_op in op_list: job_md += get_md_op(cur_op) return job_md -def get_ops_from_job(job: JobDefinition) -> List[NodeDefinition]: +def get_ops_from_job(job: JobDefinition) -> List[OpDefinition]: """Returns all ops from job""" return job.all_node_defs diff --git a/niceml/mkdocs/mdop.py b/niceml/mkdocs/mdop.py index 92602a98..23f71195 100644 --- a/niceml/mkdocs/mdop.py +++ b/niceml/mkdocs/mdop.py @@ -1,13 +1,11 @@ """Module for generating markdown strings for dagster ops""" from typing import Dict, List -from dagster._core.definitions import NodeDefinition - from niceml.mkdocs.mdtable import get_md_table -from dagster import Field +from dagster import Field, OpDefinition -def get_md_op(op_def: NodeDefinition) -> str: +def get_md_op(op_def: OpDefinition) -> str: """generates markdown strings for dagster ops""" col_widths: List[int] = [80, 120] op_fields = get_op_fields(op_def) @@ -24,7 +22,7 @@ def get_md_op(op_def: NodeDefinition) -> str: return cur_md -def get_op_fields(op_def: NodeDefinition) -> Dict[str, Field]: +def get_op_fields(op_def: OpDefinition) -> Dict[str, Field]: """returns fields from OpDefinition""" try: return op_def.config_schema.config_type.fields