Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor dbt project parsing and Airflow DAG generation #360

Merged
merged 56 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
6e23d06
Improve running tests locally
tatiana Jun 22, 2023
f4a2a5b
Fix conflict between dbt and airflow
tatiana Jun 23, 2023
3ca8d51
Split integration tests script into two, addressing code review feedback
tatiana Jun 27, 2023
f0ffeb9
E2E working with local executor and DAG
tatiana Jul 4, 2023
dcff003
Remove filter
tatiana Jul 4, 2023
a183764
Generate the DAG as in the past, using internal task groups
tatiana Jul 4, 2023
74f6c1d
Working for all example dags
tatiana Jul 4, 2023
233ea59
Refactor directory structure, grouping airflow stuff
tatiana Jul 4, 2023
4b4db66
Add notes for later
tatiana Jul 4, 2023
8e597a1
Refactor DAG building, with clear airflow and dbt split
tatiana Jul 5, 2023
f2da580
Refactor modules and packages
tatiana Jul 5, 2023
e8f095c
add todo
tatiana Jul 5, 2023
f604f4c
Doc todo
tatiana Jul 5, 2023
4d8ac60
Add LoadMode, so we have control of how things are loaded from outsid…
tatiana Jul 7, 2023
1b8afa8
Fix method name
tatiana Jul 7, 2023
582b74b
Run all integration tests
tatiana Jul 7, 2023
6f51ae2
Fix typing import
tatiana Jul 7, 2023
0bccdfa
Improve test coverage
tatiana Jul 7, 2023
e5dde71
Improve test coverage
tatiana Jul 7, 2023
db4bc40
Reimplement custom filters
tatiana Jul 12, 2023
858055b
Improve test coverage
tatiana Jul 12, 2023
537f27f
Improve test coverage
tatiana Jul 12, 2023
e6e5830
Remove renderer
tatiana Jul 12, 2023
0f7080e
Move aux method from render to new module
tatiana Jul 12, 2023
b313a23
Fix test config after_all
tatiana Jul 12, 2023
1ee78c3
Change unittest to relate to changes to calculate_leaves
tatiana Jul 12, 2023
84b7c4a
Adjust test for Airflow > 2.3
tatiana Jul 12, 2023
1d28447
Fix test
tatiana Jul 12, 2023
7ce59af
Cover multiple behaviours of loading automatically and not automatically
tatiana Jul 12, 2023
3467372
Expose manifest_path and load_mode in DbtToAirflowConverter
tatiana Jul 12, 2023
28405d9
Expose dbt command line error to end-user
tatiana Jul 12, 2023
2354d9c
Handle errors more gracefully when running subprocess
tatiana Jul 12, 2023
16c1936
Add docstrings and type hints
tatiana Jul 12, 2023
1bc0b1f
Fix issue with pipeline path & attempt to deal with nonexistent profiles
tatiana Jul 14, 2023
6c97412
Improve dbt ls error logging
tatiana Jul 14, 2023
5717482
Attempt to solve CI issue
tatiana Jul 17, 2023
80bb987
Try to fix CI issue
tatiana Jul 17, 2023
870530b
Rollback changes in integration tests
tatiana Jul 17, 2023
a323422
Fix integration tests
tatiana Jul 17, 2023
59e1821
Fix parametrize
tatiana Jul 17, 2023
a67aaa3
Add typehints and docstrings
tatiana Jul 17, 2023
8804c90
More type hints and docstrings
tatiana Jul 17, 2023
87d3207
Remove stale comment, we can only remove this class when we stop usi…
tatiana Jul 17, 2023
e09802b
More docstrings and typehints
tatiana Jul 17, 2023
9078af8
Add docstrings and remove creation of artificial profiles.yml
tatiana Jul 17, 2023
6d71189
Fix issue introduced by refactor
tatiana Jul 17, 2023
efee43c
Remove project pipeline_dir as it is redundant
tatiana Jul 17, 2023
8e7577c
Add docstrings to selector
tatiana Jul 17, 2023
60f87ed
Fix validate_arguments so it uses the new select and exclude represen…
tatiana Jul 17, 2023
3070206
Add dbt model name on run tasks
tatiana Jul 17, 2023
1ed023f
Add test related to giving users feedback if dbt command doesnt exist
tatiana Jul 17, 2023
665a6e4
Fix test on CI
tatiana Jul 17, 2023
9b6f51c
Fix running a single model/seed/snapshot per task
tatiana Jul 17, 2023
e87c776
Replace select by models parameter on run/seed/snapshot calls
tatiana Jul 17, 2023
d2a354f
Improve test coverage
tatiana Jul 17, 2023
3f171db
Mark as integration test that relies on dbt
tatiana Jul 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from cosmos.dataset import get_dbt_dataset

# re-export the dag and task group
from cosmos.dag import DbtDag
from cosmos.task_group import DbtTaskGroup
from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup

# re-export the operators
from cosmos.operators.local import (
Expand Down
Empty file added cosmos/airflow/__init__.py
Empty file.
24 changes: 24 additions & 0 deletions cosmos/airflow/dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""
This module contains a function to render a dbt project as an Airflow DAG.
"""
from __future__ import annotations

from typing import Any

from airflow.models.dag import DAG

from cosmos.converter import airflow_kwargs, specific_kwargs, DbtToAirflowConverter


class DbtDag(DAG, DbtToAirflowConverter):
    """
    Render a dbt project as an Airflow DAG.

    Accepts the union of Airflow DAG kwargs and Cosmos converter kwargs; each
    parent class receives only the subset of kwargs it understands.
    """

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # Partition the incoming kwargs: Airflow-native ones go to DAG,
        # Cosmos-specific ones go to DbtToAirflowConverter.
        dag_kwargs = airflow_kwargs(**kwargs)
        converter_kwargs = specific_kwargs(**kwargs)
        DAG.__init__(self, *args, **dag_kwargs)
        DbtToAirflowConverter.__init__(self, *args, dag=self, **converter_kwargs)
196 changes: 196 additions & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
from __future__ import annotations

import logging
from typing import Callable


from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos.core.airflow import get_airflow_task as create_airflow_task
from cosmos.core.graph.entities import Task as TaskMetadata
from cosmos.dataset import get_dbt_dataset
from cosmos.dbt.graph import DbtNode


logger = logging.getLogger(__name__)


def calculate_operator_class(
    execution_mode: str,
    dbt_class: str,
) -> str:
    """
    Given an execution mode and dbt class, return the operator class path to use.

    :param execution_mode: Cosmos execution mode (e.g. local, virtualenv, docker, kubernetes)
    :param dbt_class: The dbt command being used (e.g. DbtSnapshot, DbtRun, DbtSeed)
    :returns: path string to the correspondent Cosmos Airflow operator
        (e.g. ``cosmos.operators.local.DbtSnapshotLocalOperator``)
    """
    return f"cosmos.operators.{execution_mode}.{dbt_class}{execution_mode.capitalize()}Operator"


def calculate_leaves(tasks_ids: list[str], nodes: dict[str, DbtNode]) -> list[str]:
    """
    Return a list of unique_ids for nodes that are not parents (don't have dependencies on other tasks).

    :param tasks_ids: Node/task IDs which are materialized in the Airflow DAG
    :param nodes: Dictionary mapping dbt nodes (node.unique_id to node)
    :returns: List of unique_ids for the nodes that are graph leaves
    """
    # Only nodes that were materialized as Airflow tasks participate in the graph.
    materialized_nodes = [node for node in nodes.values() if node.unique_id in tasks_ids]
    # Every id listed in some node's depends_on is a parent (has a downstream task).
    parents_ids = {parent_id for node in materialized_nodes for parent_id in node.depends_on}
    # Leaves are materialized nodes that no other materialized node depends on.
    return [node.unique_id for node in materialized_nodes if node.unique_id not in parents_ids]


def create_task_metadata(node: DbtNode, execution_mode: str, args: dict) -> TaskMetadata | None:
    """
    Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node.

    :param node: The dbt node which we desired to convert into an Airflow Task
    :param execution_mode: The Cosmos execution mode we're aiming to run the dbt task at (e.g. local)
    :param args: Arguments to be used to instantiate an Airflow Task
    :returns: The metadata necessary to instantiate the source dbt node as an Airflow task,
        or ``None`` if the node's resource type is not supported.
    """
    dbt_resource_to_class = {"model": "DbtRun", "snapshot": "DbtSnapshot", "seed": "DbtSeed", "test": "DbtTest"}

    if node.resource_type not in dbt_resource_to_class:
        # Unsupported resource types are skipped (logged, not raised) so a single
        # unexpected node does not break the whole DAG build.
        logger.error("Unsupported resource type %s (node %s).", node.resource_type, node.unique_id)
        return None

    # Copy args so the caller's dict is not mutated; "models" restricts the dbt
    # command to this single node.
    args = {**args, "models": node.name}
    # dbt models are materialized via `dbt run`, hence the "run" suffix; all
    # other resource types reuse their own name as the suffix.
    task_id_suffix = "run" if node.resource_type == "model" else node.resource_type
    return TaskMetadata(
        id=f"{node.name}_{task_id_suffix}",
        operator_class=calculate_operator_class(
            execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type]
        ),
        arguments=args,
    )


def create_test_task_metadata(
    test_task_name: str,
    execution_mode: str,
    task_args: dict,
    on_warning_callback: Callable | None,
    model_name: str | None = None,
) -> TaskMetadata:
    """
    Create the metadata that will be used to instantiate the Airflow Task that will be used to run the Dbt test node.

    :param test_task_name: Name of the Airflow task to be created
    :param execution_mode: The Cosmos execution mode we're aiming to run the dbt task at (e.g. local)
    :param task_args: Arguments to be used to instantiate an Airflow Task
    :param on_warning_callback: A callback function called on warnings with additional Context variables “test_names”
        and “test_results” of type List.
    :param model_name: If the test relates to a specific model, the name of the model it relates to
    :returns: The metadata necessary to instantiate the source dbt node as an Airflow task.
    """
    # Copy so the caller's task_args dict is not mutated.
    task_args = dict(task_args)
    task_args["on_warning_callback"] = on_warning_callback
    # When a model name is given, restrict the test run to that model;
    # otherwise the task runs the whole project's tests.
    if model_name is not None:
        task_args["models"] = model_name
    return TaskMetadata(
        id=test_task_name,
        operator_class=calculate_operator_class(
            execution_mode=execution_mode,
            dbt_class="DbtTest",
        ),
        arguments=task_args,
    )


def build_airflow_graph(
    nodes: dict[str, DbtNode],
    dag: DAG,  # Airflow-specific - parent DAG where to associate tasks and (optional) task groups
    execution_mode: str,  # Cosmos-specific - decide what which class to use
    task_args: dict[str, str],  # Cosmos/DBT - used to instantiate tasks
    test_behavior: str | None,  # Cosmos-specific: how to inject tests to Airflow DAG
    dbt_project_name: str,  # DBT / Cosmos - used to name test task if mode is after_all,
    conn_id: str,  # Cosmos, dataset URI
    task_group: TaskGroup | None = None,
    on_warning_callback: Callable | None = None,  # argument specific to the DBT test command
    emit_datasets: bool = True,  # Cosmos
) -> None:
    """
    Instantiate dbt `nodes` as Airflow tasks within the given `task_group` (optional) or `dag` (mandatory).

    The following arguments affect how each airflow task is instantiated:
    * `execution_mode`
    * `task_args`

    The parameter `test_behavior` influences how many and where test nodes will be added, while the argument
    `on_warning_callback` allows users to set a callback function to be called depending on the test result.
    If the `test_behavior` is None, no test nodes are added. Otherwise, if the `test_behaviour` is `after_all`,
    a single test task will be added after the Cosmos leave tasks, and it is named using `dbt_project_name`.
    Finally, if the `test_behaviour` is `after_each`, a test will be added after each model.

    If `emit_datasets` is True, tasks will create outlets using:
    * `dbt_project_name`
    * `conn_id`

    :param nodes: Dictionary mapping dbt nodes (node.unique_id to node)
    :param dag: Airflow DAG instance
    :param execution_mode: The Cosmos execution mode we're aiming to run the dbt task at (e.g. local)
    :param task_args: Arguments to be used to instantiate an Airflow Task
    :param test_behavior: Defines how many test dbt nodes and where they will be added
    :param dbt_project_name: Name of the dbt pipeline of interest
    :param conn_id: Airflow connection ID
    :param task_group: Airflow Task Group instance
    :param on_warning_callback: A callback function called on warnings with additional Context variables “test_names”
        and “test_results” of type List.
    :param emit_datasets: Decides if Cosmos should add outlets to model classes or not.
    """
    # Maps node.unique_id -> the Airflow task (or TaskGroup) created for it, so
    # dependencies can be wired up in a second pass below.
    tasks_map = {}

    # In most cases, we'll map one DBT node to one Airflow task
    # The exception are the test nodes, since it would be too slow to run test tasks individually.
    # If test_behaviour=="after_each", each model task will be bundled with a test task, using TaskGroup
    for node_id, node in nodes.items():
        task_meta = create_task_metadata(node=node, execution_mode=execution_mode, args=task_args)
        if emit_datasets:
            # NOTE(review): this assigns into the shared task_args dict, so the outlet set for
            # this node persists into every later iteration (and into the test task args)
            # until popped in the after_all branch below — confirm this is intentional.
            task_args["outlets"] = [get_dbt_dataset(conn_id, dbt_project_name, node.name)]
        # test nodes are never materialized directly; create_task_metadata may also
        # have returned None for unsupported resource types.
        if task_meta and node.resource_type != "test":
            if node.resource_type == "model" and test_behavior == "after_each":
                # Bundle the model's run task and its test task inside a TaskGroup
                # named after the model, with run >> test inside the group.
                with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group:
                    task = create_airflow_task(task_meta, dag, task_group=model_task_group)
                    test_meta = create_test_task_metadata(
                        f"{node.name}_test",
                        execution_mode,
                        task_args=task_args,
                        model_name=node.name,
                        on_warning_callback=on_warning_callback,
                    )
                    test_task = create_airflow_task(test_meta, dag, task_group=model_task_group)
                    task >> test_task
                    task_or_group = model_task_group
            else:
                task_or_group = create_airflow_task(task_meta, dag, task_group=task_group)
            tasks_map[node_id] = task_or_group

    # If test_behaviour=="after_all", there will be one test task, run "by the end" of the DAG
    # The end of a DAG is defined by the DAG leaf tasks (tasks which do not have downstream tasks)
    if test_behavior == "after_all":
        # Drop any outlet left behind by the loop above; the project-wide test task
        # should not emit datasets.
        task_args.pop("outlets", None)
        test_meta = create_test_task_metadata(
            f"{dbt_project_name}_test", execution_mode, task_args=task_args, on_warning_callback=on_warning_callback
        )
        test_task = create_airflow_task(test_meta, dag, task_group=task_group)
        leaves_ids = calculate_leaves(tasks_ids=tasks_map.keys(), nodes=nodes)
        for leaf_node_id in leaves_ids:
            tasks_map[leaf_node_id] >> test_task

    # Create the Airflow task dependencies between non-test nodes
    for node_id, node in nodes.items():
        for parent_node_id in node.depends_on:
            # depending on the node type, it will not have mapped 1:1 to tasks_map
            if (node_id in tasks_map) and (parent_node_id in tasks_map):
                tasks_map[parent_node_id] >> tasks_map[node_id]
24 changes: 24 additions & 0 deletions cosmos/airflow/task_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""
This module contains a function to render a dbt project as an Airflow Task Group.
"""
from __future__ import annotations
from typing import Any

from airflow.utils.task_group import TaskGroup

from cosmos.converter import airflow_kwargs, specific_kwargs, DbtToAirflowConverter


class DbtTaskGroup(TaskGroup, DbtToAirflowConverter):
    """
    Render a dbt project as an Airflow Task Group.

    Accepts the union of Airflow TaskGroup kwargs and Cosmos converter kwargs;
    each parent class receives only the subset of kwargs it understands.
    """

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # Default the group id to the dbt project name, falling back to a
        # generic label when neither is provided.
        default_group_id = kwargs.get("dbt_project_name", "dbt_task_group")
        group_id = kwargs.get("group_id", default_group_id)
        TaskGroup.__init__(self, group_id, *args, **airflow_kwargs(**kwargs))
        DbtToAirflowConverter.__init__(self, *args, task_group=self, **specific_kwargs(**kwargs))
Loading
Loading