Skip to content

Commit

Permalink
Add support render for versioned dbt models. (#516)
Browse files Browse the repository at this point in the history
Add support to model versioning available since dbt 1.6.

[Model
version](https://docs.getdbt.com/docs/collaborate/govern/model-versions)
is supported by dbt-core >= 1.5.

As cosmos v1.x, it does not support rendering tasks for the model have
versions.

Use `node_dict["alias"]` as `DbtNode.name` instead of `node_dict["name"]` if `node_dict["resource_type"]` ==
`model`.

This PR also helps cosmos render the names of nodes to be correctly the
same as `identifier` if users config alias or custom the
`generate_alias_name` macro. Refer: [alias
document](https://docs.getdbt.com/reference/resource-configs/alias) and
[the `generate_alias_name`
implementation](https://github.com/dbt-labs/dbt-core/blob/ada8860e48b32ac712d92e8b0977b2c3c9749981/core/dbt/include/global_project/macros/get_custom_name/get_custom_alias.sql#L26-L30).

Closes: #260 

## Breaking Change?

When the model has a version config, each model version will create a
database relation with alias `<model_name>_v<v>`, for examples:
`dim_customers_v1`, `dim_customers_v2`...

Cosmos will create run/test tasks for each version,
for example: dim_customers_v1_run, dim_customers_v2_run,
dim_customers_v1_test, dim_customers_v2_test...

The second expectation is the dag will render the right dependencies, for
example: stg_customers_v1 >> customers_v1, stg_customers_v2 >>
customer_v2...
  • Loading branch information
binhnq94 authored Oct 6, 2023
1 parent 435a699 commit 6609f76
Show file tree
Hide file tree
Showing 26 changed files with 15,653 additions and 17 deletions.
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ repos:
name: Run codespell to check for common misspellings in files
language: python
types: [text]
args:
- --exclude-file=tests/sample/manifest_model_version.json
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.10.0
hooks:
Expand Down
23 changes: 15 additions & 8 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
from __future__ import annotations

import itertools
import json
import os
import shutil
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from subprocess import Popen, PIPE
from subprocess import PIPE, Popen
from typing import Any

from cosmos.config import ProfileConfig
from cosmos.constants import (
DbtResourceType,
ExecutionMode,
LoadMode,
DBT_LOG_DIR_NAME,
DBT_LOG_FILENAME,
DBT_LOG_PATH_ENVVAR,
DBT_TARGET_PATH_ENVVAR,
DBT_LOG_DIR_NAME,
DBT_TARGET_DIR_NAME,
DBT_TARGET_PATH_ENVVAR,
DbtResourceType,
ExecutionMode,
LoadMode,
)
from cosmos.dbt.executable import get_system_dbt
from cosmos.dbt.parser.project import DbtProject as LegacyDbtProject
Expand Down Expand Up @@ -137,6 +138,9 @@ def load_via_dbt_ls(self) -> None:
This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the `dbt` command
line for both parsing and filtering the nodes.
Noted that if dbt project contains versioned models, need to use dbt>=1.6.0 instead. Because, as dbt<1.6.0,
dbt cli doesn't support select a specific versioned models as stg_customers_v1, customers_v1, ...
Updates in-place:
* self.nodes
* self.filtered_nodes
Expand Down Expand Up @@ -252,7 +256,7 @@ def load_via_dbt_ls(self) -> None:
logger.debug("Skipped dbt ls line: %s", line)
else:
node = DbtNode(
name=node_dict["name"],
name=node_dict.get("alias", node_dict["name"]),
unique_id=node_dict["unique_id"],
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
Expand Down Expand Up @@ -326,6 +330,9 @@ def load_from_dbt_manifest(self) -> None:
However, since the Manifest does not represent filters, it relies on the Custom Cosmos implementation
to filter out the nodes relevant to the user (based on self.exclude and self.select).
Noted that if dbt project contains versioned models, need to use dbt>=1.6.0 instead. Because, as dbt<1.6.0,
dbt cli doesn't support select a specific versioned models as stg_customers_v1, customers_v1, ...
Updates in-place:
* self.nodes
* self.filtered_nodes
Expand All @@ -337,7 +344,7 @@ def load_from_dbt_manifest(self) -> None:

for unique_id, node_dict in manifest.get("nodes", {}).items():
node = DbtNode(
name=node_dict["name"],
name=node_dict.get("alias", node_dict["name"]),
unique_id=unique_id,
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict["depends_on"].get("nodes", []),
Expand Down
2 changes: 1 addition & 1 deletion dev/dags/cosmos_seed_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
for seed in ["raw_customers", "raw_payments", "raw_orders"]:
DbtRunOperationOperator(
task_id=f"drop_{seed}_if_exists",
macro_name="drop_table",
macro_name="drop_table_by_name",
args={"table_name": seed},
project_dir=DBT_ROOT_PATH / "jaffle_shop",
profile_config=profile_config,
Expand Down
2 changes: 1 addition & 1 deletion dev/dags/dbt/jaffle_shop/macros/drop_table.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{%- macro drop_table(table_name) -%}
{%- macro drop_table_by_name(table_name) -%}
{%- set drop_query -%}
DROP TABLE IF EXISTS {{ target.schema }}.{{ table_name }} CASCADE
{%- endset -%}
Expand Down
6 changes: 0 additions & 6 deletions dev/dags/dbt/jaffle_shop_python/macros/drop_table.sql

This file was deleted.

11 changes: 11 additions & 0 deletions dev/dags/dbt/model_version/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## `jaffle_shop`

`jaffle_shop` is a fictional ecommerce store. This dbt project transforms raw data from an app database into a customers and orders model ready for analytics.

See [dbt's documentation](https://github.com/dbt-labs/jaffle_shop) for more info.

### Modifications

This project has been modified from the original to highlight some of the features of Cosmos. Namely:

- tags have been added to the models
26 changes: 26 additions & 0 deletions dev/dags/dbt/model_version/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: 'jaffle_shop'

config-version: 2
version: '0.1'

profile: 'jaffle_shop'

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
analysis-paths: ["analysis"]
macro-paths: ["macros"]

target-path: "target"
clean-targets:
- "target"
- "dbt_modules"
- "logs"

require-dbt-version: [">=1.0.0", "<2.0.0"]

models:
jaffle_shop:
materialized: table
staging:
materialized: view
70 changes: 70 additions & 0 deletions dev/dags/dbt/model_version/models/customers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
with customers as (

select * from {{ ref('stg_customers') }}

),

orders as (

select * from {{ ref('stg_orders') }}

),

payments as (

select * from {{ ref('stg_payments') }}

),

customer_orders as (

select
customer_id,

min(order_date) as first_order,
max(order_date) as most_recent_order,
count(order_id) as number_of_orders
from orders

group by customer_id

),

customer_payments as (

select
orders.customer_id,
sum(amount) as total_amount

from payments

left join orders on
payments.order_id = orders.order_id

group by orders.customer_id

),

final as (

select
customers.customer_id,
customers.first_name,
customers.last_name,
customers.full_name,
customer_orders.first_order,
customer_orders.most_recent_order,
customer_orders.number_of_orders,
customer_payments.total_amount as customer_lifetime_value

from customers

left join customer_orders
on customers.customer_id = customer_orders.customer_id

left join customer_payments
on customers.customer_id = customer_payments.customer_id

)

select * from final
69 changes: 69 additions & 0 deletions dev/dags/dbt/model_version/models/customers_v1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
with customers as (

select * from {{ ref('stg_customers', v=1) }}

),

orders as (

select * from {{ ref('stg_orders') }}

),

payments as (

select * from {{ ref('stg_payments') }}

),

customer_orders as (

select
customer_id,

min(order_date) as first_order,
max(order_date) as most_recent_order,
count(order_id) as number_of_orders
from orders

group by customer_id

),

customer_payments as (

select
orders.customer_id,
sum(amount) as total_amount

from payments

left join orders on
payments.order_id = orders.order_id

group by orders.customer_id

),

final as (

select
customers.customer_id,
customers.first_name,
customers.last_name,
customer_orders.first_order,
customer_orders.most_recent_order,
customer_orders.number_of_orders,
customer_payments.total_amount as customer_lifetime_value

from customers

left join customer_orders
on customers.customer_id = customer_orders.customer_id

left join customer_payments
on customers.customer_id = customer_payments.customer_id

)

select * from final
14 changes: 14 additions & 0 deletions dev/dags/dbt/model_version/models/docs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% docs orders_status %}

Orders can be one of the following statuses:

| status | description |
|----------------|------------------------------------------------------------------------------------------------------------------------|
| placed | The order has been placed but has not yet left the warehouse |
| shipped | The order has ben shipped to the customer and is currently in transit |
| completed | The order has been received by the customer |
| return_pending | The customer has indicated that they would like to return the order, but it has not yet been received at the warehouse |
| returned | The order has been returned by the customer and received at the warehouse |


{% enddocs %}
56 changes: 56 additions & 0 deletions dev/dags/dbt/model_version/models/orders.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{% set payment_methods = ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] %}

with orders as (

select * from {{ ref('stg_orders') }}

),

payments as (

select * from {{ ref('stg_payments') }}

),

order_payments as (

select
order_id,

{% for payment_method in payment_methods -%}
sum(case when payment_method = '{{ payment_method }}' then amount else 0 end) as {{ payment_method }}_amount,
{% endfor -%}

sum(amount) as total_amount

from payments

group by order_id

),

final as (

select
orders.order_id,
orders.customer_id,
orders.order_date,
orders.status,

{% for payment_method in payment_methods -%}

order_payments.{{ payment_method }}_amount,

{% endfor -%}

order_payments.total_amount as amount

from orders


left join order_payments
on orders.order_id = order_payments.order_id

)

select * from final
Loading

0 comments on commit 6609f76

Please sign in to comment.