Skip to content

Commit

Permalink
Merge branch 'main' into add-upstream-model-with-var
Browse files Browse the repository at this point in the history
  • Loading branch information
dojinkimm authored Sep 1, 2023
2 parents 23eecb7 + 13087d5 commit 55448e1
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 44 deletions.
2 changes: 2 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ class RenderConfig:
:param load_method: The parsing method for loading the dbt model. Defaults to AUTOMATIC
:param select: A list of dbt select arguments (e.g. 'config.materialized:incremental')
:param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly')
:param dbt_deps: Whether to run ``dbt deps`` before parsing the DAG when using ``dbt ls``. Defaults to True
"""

emit_datasets: bool = True
test_behavior: TestBehavior = TestBehavior.AFTER_EACH
load_method: LoadMode = LoadMode.AUTOMATIC
select: list[str] = field(default_factory=list)
exclude: list[str] = field(default_factory=list)
dbt_deps: bool = True


@dataclass
Expand Down
2 changes: 2 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def __init__(
test_behavior = render_config.test_behavior
select = render_config.select
exclude = render_config.exclude
dbt_deps = render_config.dbt_deps
execution_mode = execution_config.execution_mode
load_mode = render_config.load_method
manifest_path = project_config.parsed_manifest_path
Expand Down Expand Up @@ -146,6 +147,7 @@ def __init__(
dbt_cmd=dbt_executable_path,
profile_config=profile_config,
operator_args=operator_args,
dbt_deps=dbt_deps,
)
dbt_graph.load(method=load_mode, execution_mode=execution_mode)

Expand Down
79 changes: 54 additions & 25 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,17 @@ def __init__(
dbt_cmd: str = get_system_dbt(),
profile_config: ProfileConfig | None = None,
operator_args: dict[str, Any] | None = None,
dbt_deps: bool | None = True,
):
self.project = project
self.exclude = exclude or []
self.select = select or []
self.profile_config = profile_config
self.operator_args = operator_args or {}
self.dbt_deps = dbt_deps

# specific to loading using ls
self.dbt_deps = dbt_deps
self.dbt_cmd = dbt_cmd

def load(
Expand Down Expand Up @@ -145,14 +148,6 @@ def load_via_dbt_ls(self) -> None:
if not shutil.which(self.dbt_cmd):
raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}")

command = [self.dbt_cmd, "ls", "--output", "json"]

if self.exclude:
command.extend(["--exclude", *self.exclude])

if self.select:
command.extend(["--select", *self.select])

with self.profile_config.ensure_profile(use_mock_values=True) as profile_values:
(profile_path, env_vars) = profile_values
env = os.environ.copy()
Expand All @@ -169,27 +164,56 @@ def load_via_dbt_ls(self) -> None:
if child_name not in ignore_paths:
os.symlink(self.project.dir / child_name, tmpdir_path / child_name)

command.extend(
[
"--project-dir",
str(tmpdir),
"--profiles-dir",
str(profile_path.parent),
"--profile",
self.profile_config.profile_name,
"--target",
self.profile_config.target_name,
]
)
logger.info("Running command: `%s`", " ".join(command))
logger.info("Environment variable keys: %s", env.keys())
local_flags = [
"--project-dir",
str(tmpdir),
"--profiles-dir",
str(profile_path.parent),
"--profile",
self.profile_config.profile_name,
"--target",
self.profile_config.target_name,
]
log_dir = Path(env.get(DBT_LOG_PATH_ENVVAR) or tmpdir_path / DBT_LOG_DIR_NAME)
target_dir = Path(env.get(DBT_TARGET_PATH_ENVVAR) or tmpdir_path / DBT_TARGET_DIR_NAME)
env[DBT_LOG_PATH_ENVVAR] = str(log_dir)
env[DBT_TARGET_PATH_ENVVAR] = str(target_dir)

if self.dbt_deps:
deps_command = [self.dbt_cmd, "deps"]
deps_command.extend(local_flags)
logger.info("Running command: `%s`", " ".join(deps_command))
logger.info("Environment variable keys: %s", env.keys())
process = Popen(
deps_command,
stdout=PIPE,
stderr=PIPE,
cwd=tmpdir,
universal_newlines=True,
env=env,
)
stdout, stderr = process.communicate()
logger.debug("dbt deps output: %s", stdout)

if stderr or "Error" in stdout:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt deps command due to the error:\n{details}")

ls_command = [self.dbt_cmd, "ls", "--output", "json"]

if self.exclude:
ls_command.extend(["--exclude", *self.exclude])

if self.select:
ls_command.extend(["--select", *self.select])

ls_command.extend(local_flags)

logger.info("Running command: `%s`", " ".join(ls_command))
logger.info("Environment variable keys: %s", env.keys())

process = Popen(
command,
ls_command,
stdout=PIPE,
stderr=PIPE,
cwd=tmpdir,
Expand All @@ -208,8 +232,13 @@ def load_via_dbt_ls(self) -> None:
logger.debug(line.strip())

if stderr or "Error" in stdout:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run the command due to the error:\n{details}")
if 'Run "dbt deps" to install package dependencies' in stdout:
raise CosmosLoadDbtException(
"Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True."
)
else:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}")

nodes = {}
for line in stdout.split("\n"):
Expand Down
8 changes: 4 additions & 4 deletions cosmos/profiles/snowflake/user_privatekey.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ class SnowflakePrivateKeyPemProfileMapping(BaseProfileMapping):
"database",
"warehouse",
"schema",
"private_key_content",
"private_key",
]
secret_fields = [
"private_key_content",
"private_key",
]
airflow_param_mapping = {
"account": "extra.account",
Expand All @@ -39,7 +39,7 @@ class SnowflakePrivateKeyPemProfileMapping(BaseProfileMapping):
"warehouse": "extra.warehouse",
"schema": "schema",
"role": "extra.role",
"private_key_content": "extra.private_key_content",
"private_key": "extra.private_key_content",
}

@property
Expand Down Expand Up @@ -68,7 +68,7 @@ def profile(self) -> dict[str, Any | None]:
**self.mapped_params,
**self.profile_args,
# private_key should always get set as env var
"private_key_content": self.get_env_var_format("private_key_content"),
"private_key": self.get_env_var_format("private_key"),
}

# remove any null values
Expand Down
1 change: 1 addition & 0 deletions dev/dags/basic_cosmos_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
operator_args={"install_deps": True},
# normal dag parameters
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
Expand Down
1 change: 1 addition & 0 deletions dev/dags/basic_cosmos_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def basic_cosmos_task_group() -> None:
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
operator_args={"install_deps": True},
profile_config=profile_config,
)

Expand Down
1 change: 1 addition & 0 deletions dev/dags/cosmos_profile_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def cosmos_profile_mapping() -> None:
profile_args={"schema": "public"},
),
),
operator_args={"install_deps": True},
)

post_dbt = EmptyOperator(task_id="post_dbt")
Expand Down
2 changes: 2 additions & 0 deletions dev/dags/cosmos_seed_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@
args={"table_name": seed},
project_dir=DBT_ROOT_PATH / "jaffle_shop",
profile_config=profile_config,
install_deps=True,
)

jaffle_shop_seed = DbtSeedOperator(
task_id="seed_jaffle_shop",
project_dir=DBT_ROOT_PATH / "jaffle_shop",
outlets=[Dataset("SEED://JAFFLE_SHOP")],
profile_config=profile_config,
install_deps=True,
)

drop_seeds >> jaffle_shop_seed
3 changes: 3 additions & 0 deletions dev/dags/dbt/jaffle_shop/packages.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
packages:
- package: dbt-labs/dbt_utils
version: "1.1.1"
1 change: 1 addition & 0 deletions dev/dags/example_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
operator_args={
"py_system_site_packages": False,
"py_requirements": ["dbt-postgres==1.6.0b1"],
"install_deps": True,
},
# normal dag parameters
schedule_interval="@daily",
Expand Down
4 changes: 1 addition & 3 deletions dev/dags/user_defined_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ def user_defined_profile() -> None:
target_name="dev",
profiles_yml_filepath=PROFILES_FILE_PATH,
),
operator_args={
"append_env": True,
},
operator_args={"append_env": True, "install_deps": True},
)

post_dbt = EmptyOperator(task_id="post_dbt")
Expand Down
14 changes: 8 additions & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,27 @@ test = 'pytest -vv --durations=0 . -m "not integration" --ignore=tests/test_exam
test-cov = 'pytest -vv --cov=cosmos --cov-report=term-missing --cov-report=xml --durations=0 -m "not integration" --ignore=tests/test_example_dags.py --ignore=tests/test_example_dags_no_connections.py'
# we install using the following workaround to overcome installation conflicts, such as:
# apache-airflow 2.3.0 and dbt-core [0.13.0 - 1.5.2] and jinja2>=3.0.0 because these package versions have conflicting dependencies
test-integration-setup = """pip uninstall dbt-postgres; \
test-integration-setup = """pip uninstall dbt-postgres dbt-databricks; \
rm -rf airflow.*; \
airflow db init; \
pip install 'dbt-postgres<=1.5.4' 'dbt-databricks<=1.5.4'"""
test-integration = """pytest -vv \
pip install 'dbt-postgres<=1.5.4' 'dbt-databricks<=1.5.5'"""
test-integration = """rm -rf dbt/jaffle_shop/dbt_packages;
pytest -vv \
--cov=cosmos \
--cov-report=term-missing \
--cov-report=xml \
--durations=0 \
-m integration \
-k 'not (test_example_dag[example_cosmos_python_models] or test_example_dag[example_virtualenv])'
-k 'not (test_example_dags_no_connections[example_cosmos_python_models] or test_example_dag[example_cosmos_python_models] or test_example_dag[example_virtualenv])'
"""
test-integration-expensive = """pytest -vv \
test-integration-expensive = """rm -rf dbt/jaffle_shop/dbt_packages;
pytest -vv \
--cov=cosmos \
--cov-report=term-missing \
--cov-report=xml \
--durations=0 \
-m integration \
-k 'test_example_dag[example_cosmos_python_models] or test_example_dag[example_virtualenv]'"""
-k 'test_example_dags_no_connections[example_cosmos_python_models] or test_example_dag[example_cosmos_python_models] or test_example_dag[example_virtualenv]'"""

[tool.pytest.ini_options]
filterwarnings = [
Expand Down
26 changes: 25 additions & 1 deletion tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,30 @@ def test_load_via_dbt_ls_with_invalid_dbt_path():
assert err_info.value.args[0] == expected


@pytest.mark.integration
def test_load_via_dbt_ls_without_dbt_deps():
pipeline_name = "jaffle_shop"
dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR)
dbt_graph = DbtGraph(
dbt_deps=False,
project=dbt_project,
profile_config=ProfileConfig(
profile_name="default",
target_name="default",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="airflow_db",
profile_args={"schema": "public"},
),
),
)

with pytest.raises(CosmosLoadDbtException) as err_info:
dbt_graph.load_via_dbt_ls()

expected = "Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True."
assert err_info.value.args[0] == expected


@pytest.mark.integration
@patch("cosmos.dbt.graph.Popen.communicate", return_value=("Some Runtime Error", ""))
def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate):
Expand All @@ -270,7 +294,7 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate):
)
with pytest.raises(CosmosLoadDbtException) as err_info:
dbt_graph.load_via_dbt_ls()
expected = "Unable to run the command due to the error:\nSome Runtime Error"
expected = "Unable to run dbt deps command due to the error:\nSome Runtime Error"
assert err_info.value.args[0] == expected
mock_popen_communicate.assert_called_once()

Expand Down
10 changes: 5 additions & 5 deletions tests/profiles/snowflake/test_snowflake_user_privatekey.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def test_profile_args(
assert profile_mapping.profile == {
"type": mock_snowflake_conn.conn_type,
"user": mock_snowflake_conn.login,
"private_key_content": "{{ env_var('COSMOS_CONN_SNOWFLAKE_PRIVATE_KEY_CONTENT') }}",
"private_key": "{{ env_var('COSMOS_CONN_SNOWFLAKE_PRIVATE_KEY') }}",
"schema": mock_snowflake_conn.schema,
"account": mock_snowflake_conn.extra_dejson.get("account"),
"database": mock_snowflake_conn.extra_dejson.get("database"),
Expand All @@ -158,7 +158,7 @@ def test_profile_args_overrides(
assert profile_mapping.profile == {
"type": mock_snowflake_conn.conn_type,
"user": mock_snowflake_conn.login,
"private_key_content": "{{ env_var('COSMOS_CONN_SNOWFLAKE_PRIVATE_KEY_CONTENT') }}",
"private_key": "{{ env_var('COSMOS_CONN_SNOWFLAKE_PRIVATE_KEY') }}",
"schema": mock_snowflake_conn.schema,
"account": mock_snowflake_conn.extra_dejson.get("account"),
"database": "my_db_override",
Expand All @@ -176,7 +176,7 @@ def test_profile_env_vars(
mock_snowflake_conn.conn_id,
)
assert profile_mapping.env_vars == {
"COSMOS_CONN_SNOWFLAKE_PRIVATE_KEY_CONTENT": mock_snowflake_conn.extra_dejson.get("private_key_content"),
"COSMOS_CONN_SNOWFLAKE_PRIVATE_KEY": mock_snowflake_conn.extra_dejson.get("private_key_content"),
}


Expand Down Expand Up @@ -204,7 +204,7 @@ def test_old_snowflake_format() -> None:
assert profile_mapping.profile == {
"type": conn.conn_type,
"user": conn.login,
"private_key_content": "{{ env_var('COSMOS_CONN_SNOWFLAKE_PRIVATE_KEY_CONTENT') }}",
"private_key": "{{ env_var('COSMOS_CONN_SNOWFLAKE_PRIVATE_KEY') }}",
"schema": conn.schema,
"account": conn.extra_dejson.get("account"),
"database": conn.extra_dejson.get("database"),
Expand Down Expand Up @@ -237,7 +237,7 @@ def test_appends_region() -> None:
assert profile_mapping.profile == {
"type": conn.conn_type,
"user": conn.login,
"private_key_content": "{{ env_var('COSMOS_CONN_SNOWFLAKE_PRIVATE_KEY_CONTENT') }}",
"private_key": "{{ env_var('COSMOS_CONN_SNOWFLAKE_PRIVATE_KEY') }}",
"schema": conn.schema,
"account": f"{conn.extra_dejson.get('account')}.{conn.extra_dejson.get('region')}",
"database": conn.extra_dejson.get("database"),
Expand Down

0 comments on commit 55448e1

Please sign in to comment.