Skip to content

Commit

Permalink
Override datajob external_url.
Browse files Browse the repository at this point in the history
  • Loading branch information
Peng Gao committed Jan 5, 2024
1 parent a593883 commit 14d5a9d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class DatahubLineageConfig(ConfigModel):
# The Airflow plugin behaves as if it were set to True.
graceful_exceptions: bool = True

# Override the datajob's external URL so it points to the Airflow DAG grid page.
override_datajob_url: bool = False

def make_emitter_hook(self) -> "DatahubGenericHook":
# This is necessary to avoid issues with circular imports.
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from datahub.utilities.urns.data_job_urn import DataJobUrn

from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED
from datahub_airflow_plugin.lineage.datahub import get_lineage_backend_config

assert AIRFLOW_PATCHED

Expand Down Expand Up @@ -269,6 +270,11 @@ def generate_datajob(
base_url = conf.get("webserver", "base_url")
datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}"

config = get_lineage_backend_config()

if config.override_datajob_url:
datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}"

if capture_owner and dag.owner:
datajob.owners.add(dag.owner)

Expand Down

0 comments on commit 14d5a9d

Please sign in to comment.