Skip to content

Commit

Permalink
Add docstrings and remove creation of artificial profiles.yml
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Jul 17, 2023
1 parent a6ea918 commit ccc87f0
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class DbtToAirflowConverter:
Defaults to "after_each"
:param select: A list of dbt select arguments (e.g. 'config.materialized:incremental')
:param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly')
:param execution_mode: The execution mode in which the dbt project should be run.
:param execution_mode: How Cosmos should run each dbt node (local, virtualenv, docker, k8s)
Options are "local", "virtualenv", "docker", and "kubernetes".
Defaults to "local"
:param on_warning_callback: A callback function called on warnings with additional Context variables "test_names"
Expand Down
76 changes: 66 additions & 10 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from subprocess import Popen, PIPE
from typing import Any

Expand All @@ -19,10 +18,18 @@


class CosmosLoadDbtException(Exception):
    """
    Exception raised while trying to load a `dbt` project as a `DbtGraph` instance.
    """


class LoadMode(Enum):
"""
Supported ways to load a `dbt` project into a `DbtGraph` instance.
"""

AUTOMATIC = "automatic"
CUSTOM = "custom"
DBT_LS = "dbt_ls"
Expand All @@ -46,34 +53,57 @@ class DbtNode:

class DbtGraph:
"""
Support loading a dbt project graph (represented by nodes) using different strategies.
A dbt project graph (represented by `nodes` and `filtered_nodes`).
Supports different ways of loading the `dbt` project into this representation.
Different loading methods can result in different `nodes` and `filtered_nodes`.
Example of how to use:
dbt_graph = DbtGraph(
project=DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR),
exclude=["*orders*"],
select=[],
dbt_cmd="/usr/local/bin/dbt",
)
dbt_graph.load(method=LoadMode.DBT_LS, execution_mode="local")
"""

nodes: dict[str, DbtNode] = dict()
filtered_nodes: dict[str, DbtNode] = dict()

def __init__(
    self,
    project: DbtProject,
    exclude: list[str] | None = None,
    select: list[str] | None = None,
    dbt_cmd: str = get_system_dbt(),
) -> None:
    """
    Store the project and the filters that are applied when the project is loaded.

    :param project: The dbt project this graph represents
    :param exclude: dbt exclude arguments (e.g. 'tag:nightly'); `None` is treated as an empty list
    :param select: dbt select arguments (e.g. 'config.materialized:incremental'); `None` is treated
        as an empty list
    :param dbt_cmd: The dbt executable used when loading via `dbt ls`. The default is computed
        once, when this function is defined, not on every call.
    """
    self.project = project
    # Normalise `None` to a fresh list so instances never share a mutable default.
    self.exclude = exclude or []
    self.select = select or []

    # specific to loading using ls
    self.dbt_cmd = dbt_cmd

def is_manifest_available(self):
return self.project.manifest_path and Path(self.project.manifest_path).exists()
def load(self, method: LoadMode = LoadMode.AUTOMATIC, execution_mode: str = "local") -> None:
"""
Load a `dbt` project into a `DbtGraph`, setting `nodes` and `filtered_nodes` accordingly.
def load(self, method=LoadMode.AUTOMATIC, execution_mode="local"):
:param method: How to load `nodes` from a `dbt` project (automatically, using custom parser, using dbt manifest
or dbt ls)
:param execution_mode: How Cosmos should run each dbt node (local, virtualenv, docker, k8s)
"""
load_method = {
LoadMode.CUSTOM: self.load_via_custom_parser,
LoadMode.DBT_LS: self.load_via_dbt_ls,
LoadMode.DBT_MANIFEST: self.load_from_dbt_manifest,
}
if method == LoadMode.AUTOMATIC:
if self.is_manifest_available():
if self.project.is_manifest_available():
self.load_from_dbt_manifest()
return
elif execution_mode in ("local", "virtualenv"):
elif execution_mode in ("local", "virtualenv") and self.is_profile_yml_available():
try:
self.load_via_dbt_ls()
return
Expand All @@ -84,12 +114,20 @@ def load(self, method=LoadMode.AUTOMATIC, execution_mode="local"):
self.load_via_custom_parser()
return

if method == LoadMode.DBT_MANIFEST and not self.is_manifest_available():
if method == LoadMode.DBT_MANIFEST and not self.project.is_manifest_available():
raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}")

load_method[method]()

def load_via_dbt_ls(self):
"""
This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the `dbt` command
line for both parsing and filtering the nodes.
Updates in-place:
* self.nodes
* self.filtered_nodes
"""
logger.info("Trying to parse the dbt project using dbt ls...")
command = [self.dbt_cmd, "ls", "--output", "json", "--profiles-dir", self.project.dir]
if self.exclude:
Expand Down Expand Up @@ -133,7 +171,15 @@ def load_via_dbt_ls(self):

def load_via_custom_parser(self):
"""
Convert from the legacy Cosmos DbtProject to the new list of nodes representation.
This is the least accurate way of loading `dbt` projects and filtering them out, since it uses custom Cosmos
logic, which is usually a subset of what is available in `dbt`.
Internally, it uses the legacy Cosmos DbtProject representation and converts it to the current
nodes list representation.
Updates in-place:
* self.nodes
* self.filtered_nodes
"""
logger.info("Trying to parse the dbt project using a custom Cosmos method...")
project = LegacyDbtProject(
Expand Down Expand Up @@ -164,6 +210,16 @@ def load_via_custom_parser(self):
)

def load_from_dbt_manifest(self):
"""
This approach accurately loads `dbt` projects using the `manifest.json` file.
However, since the Manifest does not represent filters, it relies on the Custom Cosmos implementation
to filter out the nodes relevant to the user (based on self.exclude and self.select).
Updates in-place:
* self.nodes
* self.filtered_nodes
"""
logger.info("Trying to parse the dbt project using a dbt manifest...")
nodes = {}
with open(self.project.manifest_path) as fp:
Expand Down
23 changes: 12 additions & 11 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from __future__ import annotations
import yaml
from dataclasses import dataclass
from pathlib import Path

Expand Down Expand Up @@ -29,18 +28,20 @@ def __post_init__(self):
if self.snapshots_dir is None:
self.snapshots_dir = self.pipeline_dir / "snapshots"
if self.profile_path is None:
self.update_profile_path()
self.profile_path = self.pipeline_dir / "profiles.yml"

@property
def dir(self) -> Path:
"""
The project directory: `root_dir` joined with the project `name`.
"""
return self.root_dir / self.name

def update_profile_path(self):
if not self.profile_path:
self.profile_path = self.pipeline_dir / "profiles.yml"
if not self.profile_path.exists():
with open(self.profile_path, "w") as fp:
profile_content = {
self.name: {"target": "dev", "outputs": {"dev": {"type": "sqlite", "database": "/tmp/dummy.db"}}}
}
yaml.dump(profile_content, fp)
def is_manifest_available(self) -> bool:
    """
    Check whether the `dbt` project manifest is set and the file exists.

    :returns: True only when `manifest_path` is set and points to an existing path.
    """
    # `manifest_path` lives on this object; there is no `self.project` attribute here.
    if not self.manifest_path:
        return False
    return Path(self.manifest_path).exists()

def is_profile_yml_available(self) -> bool:
    """
    Check whether the `dbt` profiles.yml file exists.

    :returns: True only when `profile_path` is set and points to an existing path.
    """
    # `profile_path` lives on this object; there is no `self.project` attribute here.
    if not self.profile_path:
        return False
    return Path(self.profile_path).exists()

0 comments on commit ccc87f0

Please sign in to comment.