Skip to content

Commit

Permalink
[dagster-airlift] [api-docs] proxy state
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 1, 2024
1 parent b891cd8 commit ce2496a
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 1 deletion.
13 changes: 12 additions & 1 deletion docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ Proxying Function

.. autofunction:: proxying_to_dagster

Proxying State
~~~~~~~~~~~~~~

.. autofunction:: load_proxied_state_from_yaml

.. autoclass:: AirflowProxiedState

.. autoclass:: DagProxiedState

.. autoclass:: TaskProxiedState

Operators
~~~~~~~~~

Expand All @@ -75,7 +86,7 @@ Task-level proxying
.. autoclass:: BaseProxyTaskToDagsterOperator

.. autoclass:: DefaultProxyTaskToDagsterOperator

Dag-level proxying
~~~~~~~~~~~~~~~~~~

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def ensure_airflow_installed() -> None:
)
from .proxied_state import (
AirflowProxiedState as AirflowProxiedState,
DagProxiedState as DagProxiedState,
TaskProxiedState as TaskProxiedState,
load_proxied_state_from_yaml as load_proxied_state_from_yaml,
)
from .proxying_fn import proxying_to_dagster as proxying_to_dagster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@


class TaskProxiedState(NamedTuple):
"""A class to store the proxied state of a task.
Args:
task_id (str): The id of the task.
proxied (bool): A boolean indicating whether the task is proxied.
"""

task_id: str
proxied: bool

Expand All @@ -23,6 +30,15 @@ def to_dict(self) -> Dict[str, Any]:


class DagProxiedState(NamedTuple):
"""A class to store the proxied state of tasks in a dag.
Args:
tasks (Dict[str, TaskProxiedState]): A dictionary of task_id to TaskProxiedState. If the entire dag is proxied, or proxied state
is not set for a task, the task_id will not be present in this dictionary.
proxied (Optional[bool]): A boolean indicating whether the entire dag is proxied. If this is None, then the dag proxies at the task level (or
proxying state has not been set at all).
"""

proxied: Optional[bool]
tasks: Dict[str, TaskProxiedState]

Expand Down Expand Up @@ -75,6 +91,13 @@ def dag_proxies_at_dag_level(self) -> bool:


class AirflowProxiedState(NamedTuple):
"""A class to store the proxied state of dags and tasks in Airflow.
Typically, this is constructed by :py:func:`load_proxied_state_from_yaml`.
Args:
dags (Dict[str, DagProxiedState]): A dictionary of dag_id to DagProxiedState.
"""

dags: Dict[str, DagProxiedState]

def get_task_proxied_state(self, *, dag_id: str, task_id: str) -> Optional[bool]:
Expand Down Expand Up @@ -128,6 +151,31 @@ class ProxiedStateParsingError(Exception):


def load_proxied_state_from_yaml(proxied_yaml_path: Path) -> AirflowProxiedState:
"""Loads the proxied state from a directory of yaml files.
Expects the directory to contain yaml files, where each file corresponds to the id of a dag (i.e., `dag_id.yaml`).
This directory is typically constructed using the `dagster-airlift` CLI:
.. code-block:: bash
AIRFLOW_HOME=... dagster-airlift proxy scaffold
Each file should have one of the following structures.
In the case of task-level proxying:
.. code-block:: yaml
tasks:
- id: task_id
proxied: true
- id: task_id
proxied: false
In the case of dag-level proxying:
.. code-block:: yaml
proxied: true
Args:
proxied_yaml_path (Path): The path to the directory containing the yaml files.
Returns:
AirflowProxiedState: The proxied state of the dags and tasks in Airflow.
"""
# Expect proxied_yaml_path to be a directory, where each file represents a dag. For task-level
# proxying, each entry under `tasks` in that file represents a task and should contain two keys:
# id: the task id, and proxied: a boolean indicating whether the task has been proxied.
Expand Down

0 comments on commit ce2496a

Please sign in to comment.