
[Feature] Automate the dependency resolution between DbtDags that represent sub-parts of a dbt project #1321

Open
tatiana opened this issue Nov 13, 2024 · 2 comments
Labels: area:datasets, area:selector, customer request, enhancement, triage-needed
Milestone: Cosmos 1.10.0

tatiana commented Nov 13, 2024

Description

Automatically create cross-DAG scheduling dependencies when the user splits a dbt project into multiple DbtDag instances.

Use case

Cosmos 1.7 allows users to map a dbt project into an Airflow DAG or TaskGroup. It also allows users to easily convert a subset of dbt nodes into an Airflow DAG or TaskGroup using selectors. In both cases, it maps dbt resources, such as models, 1:1 into Airflow tasks and resolves their dependencies within the same DAG or TaskGroup.

However, the use case that Cosmos does not yet cover is resolving the dependencies of a dbt project that has been split into multiple Airflow DAGs, which affects how those DAGs are triggered/scheduled. This feature aims to handle that use case by creating cross-DAG dependencies on behalf of the user.

This feature assumes the DAGs are running within the same Airflow installation (deployment, in the case of Astro).

Beyond resolving cross-DAG dependencies automatically, it would also be great to allow users to easily:

  • Assign different schedules per subset of a dbt project while still taking model dependencies into account;
  • Assign different owners to subsets of a dbt project;
  • Assign different sets of credentials to be used by subsets of a dbt project;

while delegating to Cosmos the responsibility of resolving the dbt model dependencies between those DAGs.

Current approach

Users can currently handle cross-DAG dependencies manually with Cosmos 1.7. One approach is to leverage the Airflow Datasets generated by Cosmos together with Airflow's data-aware scheduling.

Below, we illustrate this approach with Airflow Datasets using the Jaffle Shop project:

[Screenshot: lineage graph of the Jaffle Shop dbt project]

Let's say a team is responsible for maintaining the upstream seeds (raw_customers, raw_orders, raw_payments), and another team is responsible for maintaining the downstream transformations (the other downstream nodes). Seeds are expected to be processed hourly, and once they are ready, the transformations are supposed to be triggered. The organisation wants to have separate DAGs per team for accountability.

This use-case could be accomplished by the following DAGs, adapted from the Cosmos Demo project:

(1) Upstream seeds DAG

This DAG only executes the project's dbt seeds:

from datetime import datetime

from cosmos import DbtDag, ProjectConfig, RenderConfig

from include.profiles import airflow_db
from include.constants import jaffle_shop_path, venv_execution_config

only_seeds = DbtDag(
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,

    # only select seed nodes
    render_config=RenderConfig(
        select=["path:seeds"],
    ),

    # Schedule on an hourly basis
    schedule="@hourly",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="only_seeds",
    tags=["filtering"],
)

As of Cosmos 1.7, this first DAG emits three Airflow Datasets once the respective tasks are executed, depending on the Airflow connection (or dbt profile) and the dbt model configuration:

  • Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers")
  • Dataset("postgres://0.0.0.0:5432/postgres.public.raw_orders")
  • Dataset("postgres://0.0.0.0:5432/postgres.public.raw_payments")

(2) Downstream transformations DAG

This second DAG will only be triggered once all the following datasets are updated:

  • Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers")
  • Dataset("postgres://0.0.0.0:5432/postgres.public.raw_orders")
  • Dataset("postgres://0.0.0.0:5432/postgres.public.raw_payments")

from datetime import datetime

from airflow.datasets import Dataset
from cosmos import DbtDag, ProjectConfig, RenderConfig

from include.profiles import airflow_db
from include.constants import jaffle_shop_path, venv_execution_config

only_models = DbtDag(
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,

    # only select model nodes
    render_config=RenderConfig(
        select=["path:models"],
    ),

    #  this DAG is scheduled to run after all Datasets in the list have received at least one update:
    schedule=[
        Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers"),
        Dataset("postgres://0.0.0.0:5432/postgres.public.raw_orders"),
        Dataset("postgres://0.0.0.0:5432/postgres.public.raw_payments")
    ),
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="only_models",
    tags=["filtering"],
)

An alternative schedule for the downstream DAG could be to run it if any of the upstream Datasets is updated, using a conditional dataset expression (Airflow 2.9+):

    schedule=(
        Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers")
        | Dataset("postgres://0.0.0.0:5432/postgres.public.raw_orders")
        | Dataset("postgres://0.0.0.0:5432/postgres.public.raw_payments")
    ),

Or a time-based schedule combined with a dataset expression (Airflow 2.9+), for triggering the downstream DAG either every day at midnight or when the dataset condition is met:

    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        datasets=(
            Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers")
            | Dataset("postgres://0.0.0.0:5432/postgres.public.raw_orders")
            | Dataset("postgres://0.0.0.0:5432/postgres.public.raw_payments")
        ),
    ),
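
For reference, assuming Airflow 2.9+, the classes used in these snippets come from:

    from airflow.datasets import Dataset
    from airflow.timetables.datasets import DatasetOrTimeSchedule
    from airflow.timetables.trigger import CronTriggerTimetable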

Challenges with current approach

While users can accomplish what they need with the current state, it is fragile. Users are expected to understand the syntax of the Cosmos-generated dataset URIs. If the DbtDag RenderConfig.select clause changes, the schedules will likely have to change as well. If the dbt project changes, the schedules must be updated too. This can be painful for end users to manage.

Additionally, as of Cosmos 1.7, the datasets can be specific to each environment, since they rely on properties of the Airflow connection or dbt profile.

Proposed solution

Allow users to have Cosmos inject Dataset scheduling dependencies automatically. This has to be opt-in in Cosmos 1.x so that it remains backwards-compatible.

A possible syntax could be:

    render_config=RenderConfig(
        select=["path:models"],
        auto_schedule=DbtUpstreamUpdated.AND, # only triggers the DAG when all upstream dbt models are updated
    ),

Where DbtUpstreamUpdated is a Python enum.
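
For clarity, a minimal sketch of what that enum could look like (names and values are illustrative, not an existing Cosmos API):

from enum import Enum


class DbtUpstreamUpdated(Enum):
    AND = "and"  # trigger only after all upstream dbt datasets have been updated
    OR = "or"    # trigger when any upstream dbt dataset has been updated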

With this, the downstream DAG previously illustrated in (2) could be represented by:

only_models = DbtDag(
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,

    render_config=RenderConfig(
        select=["path:models"],
        auto_schedule=DbtUpstreamUpdated.AND,
    ),
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="only_models",
    tags=["filtering"],
)

With this, Cosmos would append the following to the DAG schedule:

    (
        Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers")
        & Dataset("postgres://0.0.0.0:5432/postgres.public.raw_orders")
        & Dataset("postgres://0.0.0.0:5432/postgres.public.raw_payments")
    )

If the user had also defined a schedule beforehand, Cosmos would extend that user-defined schedule rather than replace it.
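
As a rough illustration of that merging behaviour (assumed, not implemented; the user dataset below is hypothetical), with DbtUpstreamUpdated.AND Cosmos could combine the user's existing dataset schedule with the generated ones:

from airflow.datasets import Dataset

# Hypothetical dataset the user had already scheduled the DAG on:
user_schedule = [Dataset("s3://other-team/exports/contracts.parquet")]

# Datasets Cosmos derives from the upstream dbt nodes:
cosmos_generated = [
    Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers"),
    Dataset("postgres://0.0.0.0:5432/postgres.public.raw_orders"),
    Dataset("postgres://0.0.0.0:5432/postgres.public.raw_payments"),
]

# A list schedule means all listed datasets must be updated, matching the AND semantics:
dag_schedule = user_schedule + cosmos_generated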

Limitations with the proposed solution

The current proposal does not solve the following scenario:

  • a downstream DAG should run once a day AND only if a particular dataset has been updated since the last DAG run (or within a given offset)

Airflow 2.x does not natively support this use case, since there is no AND equivalent of DatasetOrTimeSchedule.

However, users could work around this limitation by creating a DAG that runs daily and emits a "daily" dataset. The DbtDags that rely on it could then be scheduled on Dataset("daily") in addition to the Cosmos-generated dbt Datasets, as sketched below.
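
A minimal sketch of that workaround, assuming Airflow 2.9+ (the marker dataset URI and DAG name are arbitrary):

from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

daily_marker = Dataset("cosmos://markers/daily")  # arbitrary marker URI


@dag(schedule="@daily", start_date=datetime(2023, 1, 1), catchup=False)
def emit_daily_marker():
    @task(outlets=[daily_marker])
    def mark_day():
        # The dataset event emitted on success is what downstream DAGs react to.
        pass

    mark_day()


emit_daily_marker()

The downstream DbtDags would then list daily_marker in their schedule alongside the Cosmos-generated datasets.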

We may also need to allow users to specify how the Cosmos-generated dependencies combine with other user-defined scheduled datasets (AND or OR).

This approach does not handle the "freshness" of upstream sources.

Alternative implementations

(a) Use TriggerDagRun operators

Another alternative would be to use TriggerDagRun operators. This approach seems less flexible and would not leverage conditional dataset scheduling, which can be very valuable.

The other challenge is that Cosmos would need to be aware of how users define their DAGs and their scope, and it would have to analyse the DAG topology and resolve dependencies in a more complex way.
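
For context, the manual version of (a) would look roughly like the sketch below, attached to the upstream only_seeds DAG from the earlier example; wiring it after the Cosmos-generated leaf tasks is exactly what Cosmos would have to automate:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Sketch only: the upstream DAG explicitly triggers the downstream one.
# Cosmos would need to discover the leaf dbt tasks and place this operator after them.
trigger_downstream = TriggerDagRunOperator(
    task_id="trigger_only_models",
    trigger_dag_id="only_models",
    dag=only_seeds,
)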

(b) Introduce a DbtDagGroup or DbtDagFamily class

We could introduce a new concept for creating a group or family of DAGs for a specific dbt project.

Users would instantiate:

DbtDagGroup(
    project_config=...,
    profile_config=...,
    ...
)

And Cosmos would magically create those DAGs.

There are a few ways of accomplishing this.

(i) Leveraging dbt tags

Let's say the dbt_project.yml contains something like:

models:
  jaffle_shop:
    staging:
      +tags:
        - "hourly"

    marts:
      +tags:
        - "hourly"
        - "published"

    metrics:
      +tags:
        - "daily"
        - "published"

Cosmos could try to automatically split based on "daily" and "hourly" into separate DAGs. Some of the challenges may be:

  • What if there are circular dependencies between the hourly versus daily groups?
  • Do we want a single daily and hourly, or is it okay to mix them?
  • Which exact time would the hourly/daily tasks run?
  • Is this approach granular/flexible enough (to only rely on a pre-defined set of tags)?
  • This would not give end-users the flexibility to use different profiles for each sub-part of the dbt project
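
For illustration, the tag-based split in (i) could roughly amount to Cosmos generating a pair of DAGs like the sketch below (DAG ids, the dataset wiring, and the grouping logic are all assumptions, reusing the configs from the earlier examples):

hourly_dag = DbtDag(
    dag_id="jaffle_shop__hourly",
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,
    render_config=RenderConfig(select=["tag:hourly"]),
    schedule="@hourly",
)

daily_dag = DbtDag(
    dag_id="jaffle_shop__daily",
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,
    render_config=RenderConfig(select=["tag:daily"]),
    # scheduled on the Datasets emitted by the hourly models it depends on, e.g.:
    schedule=[Dataset("postgres://0.0.0.0:5432/postgres.public.orders")],
)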

(ii) Explicitly configure dbt models using a Cosmos config

Let's say users were willing to annotate their models with a dedicated Cosmos configuration:

version: 2
models:
  - name: model_a
    config:
      cosmos:
        group_id:
        schedule:

Like approach (i), Cosmos would decide how to group things using this Cosmos-specific config. Some of the challenges:

  • Are models the best place to describe the schedule?
  • What happens to models that are not annotated?
  • Similar issues to the ones described in (i) apply here

(iii) Allow users to configure multiple DAGs within a bigger DbtDagGroup:

Using Python, users would define their groups, and Cosmos would resolve the dependencies between them.

cosmos_dag_group = DbtDagGroup(
    DbtDag(
        select="tag:daily",
        schedule="@daily",
    ),
    DbtDag(
        select="tag:hourly",
        schedule=DbtUpstreamUpdated.OR,
    ),
)

Are you willing to submit a PR?

  • Yes, I am willing to submit a PR!
@uranusjr (Member) commented:

Random thoughts below… text probably does not make sense unless you have context on this.

The DatasetAndTimeSchedule behaviour can be emulated by putting a custom operator as the first task in the generated Airflow DAG. The task sets the depended-on assets as inlets and uses logic similar to dags_needing_dagruns to determine whether the asset event conditions have been fulfilled. Skip all the tasks if the conditions are not satisfied.

Currently dags_needing_dagruns relies on AssetDagRunQueue so the task can’t just use it (it won’t be properly populated anyway). It needs to process the events directly, and call evaluate on its own. Fortunately it’s not too complicated…

(This is pseudo code)

def asset_condition_satisfied(self, dag_run, inlet_events):
    def _has_events_in_period(events):
        last_event = next(iter(reversed(events)))
        return self._is_later_than_cutoff(last_event.timestamp)  # TODO: Need to implement this.

    inlet_statuses = {
        inlet.uri: _has_events_in_period(inlet_events[inlet])
        for inlet in self.inlets
        if isinstance(inlet, BaseAsset)
    }
    if not self._condition.evaluate(inlet_statuses):
        # Skip all tasks...
        return self.skip(dag_run, ...)


# The operator would be added as the first task of the generated DAG:
DetermineAssetCondition(
    task_id="cosmos__determine_asset_condition",
    inlets=list(dbt_dag.asset_condition.iter_assets()),  # Gets a list of assets from the condition.
    cutoff=timedelta(hours=2),  # This can also be "schedule" to infer from the DAG schedule.
)

tatiana commented Nov 18, 2024

That's very helpful, thanks a lot, @uranusjr!

Some notes from our discussion earlier today:

It would be great if Apache Airflow itself exposed this sort of interface:

    schedule=DatasetAndTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        datasets=Dataset("postgres://0.0.0.0:5432/postgres.public.raw_customers"),
    ),

With the following behaviour:

  • DAG is not run if it is not midnight
  • DAG is not run if the dataset was not updated (including if it is midnight, but the dataset had not been updated since the previous run)

A bonus would be:

  • the possibility of customizing the dataset's last update offset time (which may be different from the DAG schedule)

Summarising the feature in other words:

  • Run DAGs on a schedule, giving flexibility to decide on when to run or not
  • Also, check if the dataset condition was fulfilled in a period before the schedule
  • The schedule interval doesn't have to be necessarily the same as the period we check

While Airflow does not expose this feature, Cosmos could, as @uranusjr detailed, use a branch operator at the beginning of the downstream DAG to emulate DatasetAndTimeSchedule. We'll need to check how much of the Dataset Events information is currently exposed in the Airflow interface.
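
A minimal sketch of that branch/short-circuit idea, assuming a hypothetical condition_met_within_offset helper that inspects the relevant dataset events (nothing here is an existing Cosmos API; only ShortCircuitOperator is a real Airflow operator):

from datetime import timedelta

from airflow.operators.python import ShortCircuitOperator

# Sketch only: Cosmos would prepend this task to the downstream DAG and make all
# dbt tasks depend on it; if the callable returns False, the rest of the DAG is skipped.
# condition_met_within_offset is a hypothetical helper that checks whether the dataset
# condition was satisfied within the given offset.
check_condition = ShortCircuitOperator(
    task_id="cosmos__check_dataset_condition",
    python_callable=lambda **context: condition_met_within_offset(
        context, offset=timedelta(hours=2)
    ),
    dag=only_models,
)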
