From b9e369e2c7570bf7bb8d9ad33c4815585b7d50b5 Mon Sep 17 00:00:00 2001 From: Steffen Grohsschmiedt Date: Thu, 14 Nov 2024 12:24:21 +0100 Subject: [PATCH] fix(ingestion/airflow-plugin): fix AthenaOperator extraction The GenericSqlExtractor, which is currently used by the DataHub Airflow plugin to extract lineage information, does not properly support the AthenaOperator and crashes with "AttributeError: 'AthenaOperator' object has no attribute 'sql'". This patch introduces an AthenaOperatorExtractor, following the BigQueryInsertJobOperatorExtractor example, to fix support for the AthenaOperator. --- .../src/datahub_airflow_plugin/_extractors.py | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py index de0d4f8711f53..b4619d744d929 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_extractors.py @@ -50,7 +50,6 @@ def __init__(self): "BigQueryOperator", "BigQueryExecuteQueryOperator", # Athena also does something similar. - "AthenaOperator", "AWSAthenaOperator", # Additional types that OL doesn't support. This is only necessary because # on older versions of Airflow, these operators don't inherit from SQLExecuteQueryOperator. 
@@ -59,6 +58,8 @@ def __init__(self): for operator in _sql_operator_overrides: self.task_to_extractor.extractors[operator] = GenericSqlExtractor + self.task_to_extractor.extractors["AthenaOperator"] = AthenaOperatorExtractor + self.task_to_extractor.extractors[ "BigQueryInsertJobOperator" ] = BigQueryInsertJobOperatorExtractor @@ -276,6 +277,27 @@ def extract(self) -> Optional[TaskMetadata]: ) +class AthenaOperatorExtractor(BaseExtractor): + def extract(self) -> Optional[TaskMetadata]: + from airflow.providers.amazon.aws.operators.athena import ( + AthenaOperator, # type: ignore + ) + + operator: "AthenaOperator" = self.operator + sql = operator.query + if not sql: + self.log.warning("No query found in AthenaOperator") + return None + + return _parse_sql_into_task_metadata( + self, + sql, + platform="athena", + default_database=None, + default_schema=None, + ) + + def _snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]: if hasattr(self.operator, "schema") and self.operator.schema is not None: return self.operator.schema