Skip to content

Commit

Permalink
[dagster-airlift] Airflow 1 content
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 21, 2024
1 parent b09f9b0 commit 4e4b51f
Show file tree
Hide file tree
Showing 4 changed files with 407 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,10 @@
{
"title": "DAG-level migration",
"path": "/integrations/airlift/full_dag"
},
{
"title": "Migration with Airflow 1",
"path": "/integrations/airlift/airlift-1-migration"
}
]
},
Expand Down
264 changes: 264 additions & 0 deletions docs/content/integrations/airlift/airflow-1-migration.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
# Airflow `1.x` Migration Tutorial

## Overview

This guide covers using `dagster-airlift` to migrate an Airflow DAG to Dagster on `apache-airflow<2.0.0`.

Many APIs within the `dagster-airlift` package make use of Airflow's stable REST API, which was added in Airflow 2.0. However, we still enable a migration process for Airflow 1.x users.

This guide will cover the migration process using the same base example as the [tutorial](integrations/airlift/tutorial/overview).

We recommend following the tutorial in order to understand the concepts and steps involved in the migration process, and then using this guide to apply those steps to an Airflow 1.x environment.

### Setup

\-- turn this into a flipper If you previously ran the Airlift tutorial, you can follow along by doing the following:

- clear `tutorial_example/dagster_defs/definitions.py`, and mark all tasks as unproxied in the proxied state YAML file.

Start by following the [setup](integrations/airlift/tutorial/setup) step of the migration tutorial, and we'll diverge from there.

With Airflow 1.x, we won't [peer](integrations/airlift/tutorial/peer) or [observe](integrations/airlift/tutorial/observe) Airflow DAGs first - we'll immediately skip to the migration step and proxy execution to Dagster.

### Scaffolding proxied state

To begin proxying tasks in a DAG, first you will need a file to track proxying state. In your Airflow DAG directory, create a `proxied_state` folder, and in it create a yaml file with the same name as your DAG. The included example at `airflow_dags/proxied_state` is used by `make airflow_run`, and can be used as a template for your own proxied state files.

Given our example DAG `rebuild_customers_list` with three tasks, `load_raw_customers`, `run_dbt_model`, and `export_customers`, `proxied_state/rebuild_customers_list.yaml` should look like the following:

```yaml file=../../airlift-migration-tutorial/tutorial_example/airflow_dags/proxied_state/rebuild_customers_list.yaml
tasks:
- id: load_raw_customers
proxied: False
- id: build_dbt_models
proxied: False
- id: export_customers
proxied: False
```
Next, you will need to modify your Airflow DAG to make it aware of the proxied state. This is already done in the example DAG:
```python file=../../airlift-migration-tutorial/tutorial_example/snippets/dags_truncated.py
# Dags file can be found at tutorial_example/airflow_dags/dags.py
from pathlib import Path

from airflow import DAG
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

dag = DAG("rebuild_customers_list", ...)

...

# Set this to True to begin the proxying process
PROXYING = False

if PROXYING:
proxying_to_dagster(
global_vars=globals(),
proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
)
```
Set `PROXYING` to `True` or eliminate the `if` statement.

The DAG will now display its proxied state in the Airflow UI. (There is some latency as Airflow evaluates the Python file periodically.)

<p align="center">

<Image
alt="Migration state rendering in Airflow UI"
src="/images/integrations/airlift/state_in_airflow.png"
width={528}
height={102}
/>

</p>

### Migrating `build_dbt_models`

We'll now create Dagster assets that correspond to each Airflow task. First, since Dagster provides out of the box integration with dbt, we'll use `dagster-dbt` to create assets for the `build_dbt_models` task in our `tutorial_example/dagster_defs/definitions.py` file:

```python file=../../airlift-migration-tutorial/snippets/airflow_1_dbt.py startafter=start_dbt_project_assets endbefore=end_dbt_project_assets
import os
from pathlib import Path
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
def dbt_project_path() -> Path:
env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
return Path(env_val)
@dbt_assets(
manifest=dbt_project_path() / "target" / "manifest.json",
project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
```

Now, we'll mark our `dbt_project_assets` as being mapped from Airflow:

```python file=../../airlift-migration-tutorial/snippets/airflow_1_dbt.py startafter=start_mapping endbefore=end_mapping
from dagster_airlift.core import assets_with_task_mappings
mapped_assets = assets_with_task_mappings(
dag_id="rebuild_customers_list",
task_mappings={
"build_dbt_models":
# load rich set of assets from dbt project
[dbt_project_assets],
},
)
```

The `assets_with_task_mappings` function adds some metadata to each passed-in asset which, over the wire in Airflow, we'll use to determine which assets to execute in Dagster.

We'll provide the mapped assets to a `Definitions` object in our `tutorial_example/dagster_defs/definitions.py` file:

```python file=../../airlift-migration-tutorial/snippets/airflow_1_dbt.py startafter=start_defs endbefore=end_defs
from dagster import Definitions
defs = Definitions(
assets=mapped_assets, resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}
)
```

Note how this differs from the original migration tutorial; we're not using `build_defs_from_airflow_instance`, which relies on the REST API.

Finally, we'll mark the `build_dbt_models` task as proxied in the proxied state YAML file:

```yaml file=../../airlift-migration-tutorial/tutorial_example/snippets/dbt_proxied.yaml
tasks:
- id: load_raw_customers
proxied: False
- id: build_dbt_models
proxied: True
- id: export_customers
proxied: False
```

**Important**: It may take up to 30 seconds for the proxied state in the Airflow UI to reflect this change. You must subsequently reload the definitions in Dagster via the UI or by restarting `dagster dev`.

You can now run the `rebuild_customers_list` DAG in Airflow, and the `build_dbt_models` task will be executed in a Dagster run:

<p align="center">

<Image
alt="dbt build executing in Dagster"
src="/images/integrations/airlift/proxied_dag.png"
width={1314}
height={178}
/>

</p>

### Completed code

Migrating the other tasks should follow the same pattern as in the [migration tutorial](integrations/airlift/tutorial/migrate#migrating-the-remaining-custom-operators). When you're done, your code should look like this:

```python file=../../airlift-migration-tutorial/snippets/airflow_1_migrated.py
import os
from pathlib import Path
from dagster import (
AssetExecutionContext,
AssetsDefinition,
AssetSpec,
DailyPartitionsDefinition,
Definitions,
multi_asset,
)
from dagster._time import get_current_datetime_midnight
from dagster_airlift.core import assets_with_task_mappings
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
# Code also invoked from Airflow
from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv
from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb
PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())
def dbt_project_path() -> Path:
env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
return Path(env_val)
def airflow_dags_path() -> Path:
return Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "tutorial_example" / "airflow_dags"
def load_csv_to_duckdb_asset(spec: AssetSpec, args: LoadCsvToDuckDbArgs) -> AssetsDefinition:
@multi_asset(name=f"load_{args.table_name}", specs=[spec])
def _multi_asset() -> None:
load_csv_to_duckdb(args)
return _multi_asset
def export_duckdb_to_csv_defs(spec: AssetSpec, args: ExportDuckDbToCsvArgs) -> AssetsDefinition:
@multi_asset(name=f"export_{args.table_name}", specs=[spec])
def _multi_asset() -> None:
export_duckdb_to_csv(args)
return _multi_asset
@dbt_assets(
manifest=dbt_project_path() / "target" / "manifest.json",
project=DbtProject(dbt_project_path()),
partitions_def=PARTITIONS_DEF,
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
mapped_assets = assets_with_task_mappings(
dag_id="rebuild_customers_list",
task_mappings={
"load_raw_customers": [
load_csv_to_duckdb_asset(
AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF),
LoadCsvToDuckDbArgs(
table_name="raw_customers",
csv_path=airflow_dags_path() / "raw_customers.csv",
duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
names=["id", "first_name", "last_name"],
duckdb_schema="raw_data",
duckdb_database_name="jaffle_shop",
),
)
],
"build_dbt_models":
# load rich set of assets from dbt project
[dbt_project_assets],
"export_customers": [
export_duckdb_to_csv_defs(
AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF),
ExportDuckDbToCsvArgs(
table_name="customers",
csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv",
duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
duckdb_database_name="jaffle_shop",
),
)
],
},
)
defs = Definitions(
assets=mapped_assets,
resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
)
```

### Conclusion

To recap, we've covered the process of migrating an Airflow 1.x DAG to Dagster using `dagster-airlift`. We've made clearer what functionality works wth Airflow < 2.0, and what does not. We've shown how to create Dagster assets that correspond to Airflow tasks, and how to mark those tasks as proxied in the proxied state YAML file.
44 changes: 44 additions & 0 deletions examples/airlift-migration-tutorial/snippets/airflow_1_dbt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# start_dbt_project_assets
import os
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


def dbt_project_path() -> Path:
env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
return Path(env_val)


@dbt_assets(
manifest=dbt_project_path() / "target" / "manifest.json",
project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()


# end_dbt_project_assets

# start_mapping
from dagster_airlift.core import assets_with_task_mappings

mapped_assets = assets_with_task_mappings(
dag_id="rebuild_customers_list",
task_mappings={
"build_dbt_models":
# load rich set of assets from dbt project
[dbt_project_assets],
},
)
# end_mapping

# start_defs
from dagster import Definitions

defs = Definitions(
assets=mapped_assets, resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}
)
# end_defs
Loading

0 comments on commit 4e4b51f

Please sign in to comment.