From 8b1d2094aa768d1f795a2d240de888e65c26f6fc Mon Sep 17 00:00:00 2001 From: richenc <125420929+richenc@users.noreply.github.com> Date: Fri, 27 Oct 2023 10:36:47 -0700 Subject: [PATCH] feat(airflow): retry callback, support ExternalTaskSensor subclasses (#8514) Co-authored-by: Richie Chen Co-authored-by: Harshal Sheth --- .../client/airflow_generator.py | 7 +++- .../datahub_plugin_v22.py | 36 ++++++++++++++++++- .../integration/goldens/v1_basic_iolets.json | 7 +++- .../integration/goldens/v1_simple_dag.json | 14 ++++++-- .../integration/goldens/v2_basic_iolets.json | 7 +++- .../v2_basic_iolets_no_dag_listener.json | 7 +++- .../integration/goldens/v2_simple_dag.json | 12 +++++-- .../v2_simple_dag_no_dag_listener.json | 14 ++++++-- .../goldens/v2_snowflake_operator.json | 7 +++- .../goldens/v2_sqlite_operator.json | 27 +++++++++++--- .../v2_sqlite_operator_no_dag_listener.json | 35 +++++++++++++++--- 11 files changed, 151 insertions(+), 22 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index 16585f70e820b..e1d53be7bae6b 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -98,7 +98,7 @@ def _get_dependencies( # It is possible to tie an external sensor to DAG if external_task_id is omitted but currently we can't tie # jobflow to anothet jobflow. external_task_upstreams = [] - if task.task_type == "ExternalTaskSensor": + if isinstance(task, ExternalTaskSensor): task = cast(ExternalTaskSensor, task) if hasattr(task, "external_task_id") and task.external_task_id is not None: external_task_upstreams = [ @@ -155,6 +155,8 @@ def generate_dataflow( "_concurrency", # "_default_view", "catchup", + "description", + "doc_md", "fileloc", "is_paused_upon_creation", "start_date", @@ -431,6 +433,9 @@ def run_datajob( job_property_bag["operator"] = str(ti.operator) job_property_bag["priority_weight"] = str(ti.priority_weight) job_property_bag["log_url"] = ti.log_url + job_property_bag["orchestrator"] = "airflow" + job_property_bag["dag_id"] = str(dag.dag_id) + job_property_bag["task_id"] = str(ti.task_id) dpi.properties.update(job_property_bag) dpi.url = ti.log_url diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py index 046fbb5efaa03..f9a2119f51e32 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py @@ -23,6 +23,7 @@ TASK_ON_FAILURE_CALLBACK = "on_failure_callback" TASK_ON_SUCCESS_CALLBACK = "on_success_callback" +TASK_ON_RETRY_CALLBACK = "on_retry_callback" def get_task_inlets_advanced(task: BaseOperator, context: Any) -> Iterable[Any]: @@ -259,6 +260,28 @@ def custom_on_success_callback(context): return custom_on_success_callback +def _wrap_on_retry_callback(on_retry_callback): + def custom_on_retry_callback(context): + config = get_lineage_config() + if config.enabled: + context["_datahub_config"] = config + try: + datahub_task_status_callback( + context, status=InstanceRunResult.UP_FOR_RETRY + ) + except Exception as e: + if not config.graceful_exceptions: + raise e + else: + print(f"Exception: {traceback.format_exc()}") + + # Call original policy + if on_retry_callback: + on_retry_callback(context) + + return custom_on_retry_callback + + def task_policy(task: Union[BaseOperator, MappedOperator]) -> None: task.log.debug(f"Setting task policy for Dag: {task.dag_id} Task: {task.task_id}") # task.add_inlets(["auto"]) @@ -274,7 +297,14 @@ def task_policy(task: Union[BaseOperator, MappedOperator]) -> None: on_success_callback_prop: property = getattr( MappedOperator, TASK_ON_SUCCESS_CALLBACK ) - if not on_failure_callback_prop.fset or not on_success_callback_prop.fset: + on_retry_callback_prop: property = getattr( + MappedOperator, TASK_ON_RETRY_CALLBACK + ) + if ( + not on_failure_callback_prop.fset + or not on_success_callback_prop.fset + or not on_retry_callback_prop.fset + ): task.log.debug( "Using MappedOperator's partial_kwargs instead of callback properties" ) @@ -284,10 +314,14 @@ def task_policy(task: Union[BaseOperator, MappedOperator]) -> None: task.partial_kwargs[TASK_ON_SUCCESS_CALLBACK] = _wrap_on_success_callback( task.on_success_callback ) + task.partial_kwargs[TASK_ON_RETRY_CALLBACK] = _wrap_on_retry_callback( + task.on_retry_callback + ) return task.on_failure_callback = _wrap_on_failure_callback(task.on_failure_callback) # type: ignore task.on_success_callback = _wrap_on_success_callback(task.on_success_callback) # type: ignore + task.on_retry_callback = _wrap_on_retry_callback(task.on_retry_callback) # type: ignore # task.pre_execute = _wrap_pre_execution(task.pre_execute) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json index 26aa2afaa831a..a4c17c73e9c7e 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json @@ -9,6 +9,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "None", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -373,7 +375,10 @@ "state": "success", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets", + "orchestrator": "airflow", + "dag_id": "basic_iolets", + "task_id": "run_data_task" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets", "name": "basic_iolets_run_data_task_manual_run_test", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json index b2e3a1fe47da7..a0a95716a0993 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json @@ -9,6 +9,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "'A simple DAG that runs a few fake data tasks.'", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -302,7 +304,10 @@ "state": "success", "operator": "BashOperator", "priority_weight": "2", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag", + "orchestrator": "airflow", + "dag_id": "simple_dag", + "task_id": "task_1" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag", "name": "simple_dag_task_1_manual_run_test", @@ -433,6 +438,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "'A simple DAG that runs a few fake data tasks.'", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -654,7 +661,10 @@ "state": "success", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag", + "orchestrator": "airflow", + "dag_id": "simple_dag", + "task_id": "run_another_data_task" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag", "name": "simple_dag_run_another_data_task_manual_run_test", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json index 2e733c2ad40a9..1974f1f085df0 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json @@ -9,6 +9,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "None", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -224,7 +226,10 @@ "state": "running", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", + "orchestrator": "airflow", + "dag_id": "basic_iolets", + "task_id": "run_data_task" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", "name": "basic_iolets_run_data_task_manual_run_test", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json index 44b288efda954..d02951bc9e82d 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json @@ -9,6 +9,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "None", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -224,7 +226,10 @@ "state": "running", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", + "orchestrator": "airflow", + "dag_id": "basic_iolets", + "task_id": "run_data_task" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", "name": "basic_iolets_run_data_task_manual_run_test", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json index 454c509279e11..9acc47ec1321e 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json @@ -9,6 +9,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "'A simple DAG that runs a few fake data tasks.'", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -189,7 +191,10 @@ "state": "running", "operator": "BashOperator", "priority_weight": "2", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", + "orchestrator": "airflow", + "dag_id": "simple_dag", + "task_id": "task_1" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", "name": "simple_dag_task_1_manual_run_test", @@ -523,7 +528,10 @@ "state": "running", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", + "orchestrator": "airflow", + "dag_id": "simple_dag", + "task_id": "run_another_data_task" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", "name": "simple_dag_run_another_data_task_manual_run_test", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json index 73b5765e96b7d..03299c483f57f 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json @@ -9,6 +9,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "'A simple DAG that runs a few fake data tasks.'", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -189,7 +191,10 @@ "state": "running", "operator": "BashOperator", "priority_weight": "2", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", + "orchestrator": "airflow", + "dag_id": "simple_dag", + "task_id": "task_1" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", "name": "simple_dag_task_1_manual_run_test", @@ -435,6 +440,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "'A simple DAG that runs a few fake data tasks.'", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -579,7 +586,10 @@ "state": "running", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", + "orchestrator": "airflow", + "dag_id": "simple_dag", + "task_id": "run_another_data_task" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", "name": "simple_dag_run_another_data_task_manual_run_test", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json index affc395d421da..11a0b17b45b95 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json @@ -9,6 +9,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "None", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/snowflake_operator.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -234,7 +236,10 @@ "state": "running", "operator": "SnowflakeOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=snowflake_operator&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=snowflake_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "snowflake_operator", + "task_id": "transform_cost_table" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=snowflake_operator&map_index=-1", "name": "snowflake_operator_transform_cost_table_manual_run_test", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json index 81d0a71b651d9..19e4aac9fb95e 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json @@ -9,6 +9,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "None", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/sqlite_operator.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -201,7 +203,10 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "5", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "create_cost_table" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_create_cost_table_manual_run_test", @@ -562,7 +567,10 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "4", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "populate_cost_table" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_populate_cost_table_manual_run_test", @@ -922,7 +930,10 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "3", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "transform_cost_table" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_transform_cost_table_manual_run_test", @@ -1364,7 +1375,10 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_costs&dag_id=sqlite_operator&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_costs&dag_id=sqlite_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "cleanup_costs" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_costs&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_cleanup_costs_manual_run_test", @@ -1658,7 +1672,10 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_processed_costs&dag_id=sqlite_operator&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_processed_costs&dag_id=sqlite_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "cleanup_processed_costs" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_processed_costs&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_cleanup_processed_costs_manual_run_test", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json index 96a0f02ccec17..b67464b385335 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json @@ -9,6 +9,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "None", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/sqlite_operator.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -201,7 +203,10 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "5", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "create_cost_table" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_create_cost_table_manual_run_test", @@ -460,6 +465,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "None", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/sqlite_operator.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -617,7 +624,10 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "4", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "populate_cost_table" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_populate_cost_table_manual_run_test", @@ -805,6 +815,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "None", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/sqlite_operator.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -1032,7 +1044,10 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "3", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "transform_cost_table" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_transform_cost_table_manual_run_test", @@ -1370,6 +1385,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "None", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/sqlite_operator.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -1529,7 +1546,10 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_costs&dag_id=sqlite_operator&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_costs&dag_id=sqlite_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "cleanup_costs" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_costs&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_cleanup_costs_manual_run_test", @@ -1719,6 +1739,8 @@ "customProperties": { "_access_control": "None", "catchup": "False", + "description": "None", + "doc_md": "None", "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/sqlite_operator.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", @@ -1878,7 +1900,10 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_processed_costs&dag_id=sqlite_operator&map_index=-1" + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_processed_costs&dag_id=sqlite_operator&map_index=-1", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "cleanup_processed_costs" }, "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_processed_costs&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_cleanup_processed_costs_manual_run_test",