From 9a98e495434a6a73cb2459f2436a2855a987f5b0 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Sun, 20 Oct 2024 23:59:45 -0700
Subject: [PATCH] feat(ingest/fivetran): add safeguards on table/column lineage (#11674)

---
 .../ingestion/source/fivetran/config.py       |  19 +--
 .../ingestion/source/fivetran/data_classes.py |   2 +-
 .../ingestion/source/fivetran/fivetran.py     |  23 ++-
 .../source/fivetran/fivetran_log_api.py       |  86 +++++------
 .../source/fivetran/fivetran_query.py         | 143 +++++++++++-------
 .../integration/fivetran/test_fivetran.py     |   6 +-
 6 files changed, 156 insertions(+), 123 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
index 02eb096b240f52..2fb5ffd16ea34c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
@@ -1,6 +1,6 @@
+import dataclasses
 import logging
-from dataclasses import dataclass, field as dataclass_field
-from typing import Dict, List, Optional
+from typing import Dict, Optional
 
 import pydantic
 from pydantic import Field, root_validator
@@ -23,6 +23,7 @@
 from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionConfigBase,
 )
+from datahub.utilities.lossy_collections import LossyList
 from datahub.utilities.perf_timer import PerfTimer
 
 logger = logging.getLogger(__name__)
@@ -114,24 +115,24 @@ def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict:
         return values
 
 
-@dataclass
+@dataclasses.dataclass
 class MetadataExtractionPerfReport(Report):
-    connectors_metadata_extraction_sec: PerfTimer = dataclass_field(
+    connectors_metadata_extraction_sec: PerfTimer = dataclasses.field(
         default_factory=PerfTimer
     )
-    connectors_lineage_extraction_sec: PerfTimer = dataclass_field(
+    connectors_lineage_extraction_sec: PerfTimer = dataclasses.field(
         default_factory=PerfTimer
     )
-    connectors_jobs_extraction_sec: PerfTimer = dataclass_field(
+    connectors_jobs_extraction_sec: PerfTimer = dataclasses.field(
         default_factory=PerfTimer
     )
 
 
-@dataclass
+@dataclasses.dataclass
 class FivetranSourceReport(StaleEntityRemovalSourceReport):
     connectors_scanned: int = 0
-    filtered_connectors: List[str] = dataclass_field(default_factory=list)
-    metadata_extraction_perf: MetadataExtractionPerfReport = dataclass_field(
+    filtered_connectors: LossyList[str] = dataclasses.field(default_factory=LossyList)
+    metadata_extraction_perf: MetadataExtractionPerfReport = dataclasses.field(
         default_factory=MetadataExtractionPerfReport
     )
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py
index 18de2b01edd3b7..046aa9efe3f59b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py
@@ -24,7 +24,7 @@ class Connector:
     sync_frequency: int
     destination_id: str
     user_id: str
-    table_lineage: List[TableLineage]
+    lineage: List[TableLineage]
     jobs: List["Job"]
 
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
index 334bb58ea84f8e..c27ec57c2e99ec 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
@@ -27,9 +27,10 @@
     PlatformDetail,
 )
 from datahub.ingestion.source.fivetran.data_classes import Connector, Job
-from datahub.ingestion.source.fivetran.fivetran_log_api import (
+from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI
+from datahub.ingestion.source.fivetran.fivetran_query import (
     MAX_JOBS_PER_CONNECTOR,
-    FivetranLogAPI,
+    MAX_TABLE_LINEAGE_PER_CONNECTOR,
 )
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
@@ -106,13 +107,21 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
                 f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity."
             )
 
-        for table_lineage in connector.table_lineage:
+        if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
+            self.report.warning(
+                title="Table lineage truncated",
+                message=f"The connector had {MAX_TABLE_LINEAGE_PER_CONNECTOR} or more table lineage entries. "
+                f"Only the most recent {MAX_TABLE_LINEAGE_PER_CONNECTOR} entries were ingested.",
+                context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
+            )
+
+        for lineage in connector.lineage:
             input_dataset_urn = DatasetUrn.create_from_ids(
                 platform_id=source_platform,
                 table_name=(
-                    f"{source_database.lower()}.{table_lineage.source_table}"
+                    f"{source_database.lower()}.{lineage.source_table}"
                     if source_database
-                    else table_lineage.source_table
+                    else lineage.source_table
                 ),
                 env=source_platform_detail.env,
                 platform_instance=source_platform_detail.platform_instance,
@@ -121,14 +130,14 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
 
             output_dataset_urn = DatasetUrn.create_from_ids(
                 platform_id=self.config.fivetran_log_config.destination_platform,
-                table_name=f"{self.audit_log.fivetran_log_database.lower()}.{table_lineage.destination_table}",
+                table_name=f"{self.audit_log.fivetran_log_database.lower()}.{lineage.destination_table}",
                 env=destination_platform_detail.env,
                 platform_instance=destination_platform_detail.platform_instance,
             )
             output_dataset_urn_list.append(output_dataset_urn)
 
             if self.config.include_column_lineage:
-                for column_lineage in table_lineage.column_lineage:
+                for column_lineage in lineage.column_lineage:
                     fine_grained_lineage.append(
                         FineGrainedLineage(
                             upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py
index 5908efe39e2b40..b55c8bbbd607fa 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py
@@ -1,6 +1,7 @@
 import functools
 import json
 import logging
+from collections import defaultdict
 from typing import Any, Dict, List, Optional, Tuple
 
 import sqlglot
@@ -22,10 +23,6 @@
 
 logger: logging.Logger = logging.getLogger(__name__)
 
-# We don't want to generate a massive number of dataProcesses for a single connector.
-# This is primarily used as a safeguard to prevent performance issues.
-MAX_JOBS_PER_CONNECTOR = 1000
-
 
 class FivetranLogAPI:
     def __init__(self, fivetran_log_config: FivetranLogConfig) -> None:
@@ -91,55 +88,51 @@ def _query(self, query: str) -> List[Dict]:
         resp = self.engine.execute(query)
         return [row for row in resp]
 
-    def _get_column_lineage_metadata(self) -> Dict[str, List]:
+    def _get_column_lineage_metadata(self) -> Dict[Tuple[str, str], List]:
         """
-        Return's dict of column lineage metadata with key as '<SOURCE_TABLE_ID>-<DESTINATION_TABLE_ID>'
+        Returns dict of column lineage metadata with key as (<SOURCE_TABLE_ID>, <DESTINATION_TABLE_ID>)
         """
-        all_column_lineage: Dict[str, List] = {}
+        all_column_lineage = defaultdict(list)
         column_lineage_result = self._query(
             self.fivetran_log_query.get_column_lineage_query()
         )
         for column_lineage in column_lineage_result:
-            key = f"{column_lineage[Constant.SOURCE_TABLE_ID]}-{column_lineage[Constant.DESTINATION_TABLE_ID]}"
-            if key not in all_column_lineage:
-                all_column_lineage[key] = [column_lineage]
-            else:
-                all_column_lineage[key].append(column_lineage)
-        return all_column_lineage
+            key = (
+                column_lineage[Constant.SOURCE_TABLE_ID],
+                column_lineage[Constant.DESTINATION_TABLE_ID],
+            )
+            all_column_lineage[key].append(column_lineage)
+        return dict(all_column_lineage)
 
-    def _get_connectors_table_lineage_metadata(self) -> Dict[str, List]:
+    def _get_table_lineage_metadata(self) -> Dict[str, List]:
         """
-        Return's dict of table lineage metadata with key as 'CONNECTOR_ID'
+        Returns dict of table lineage metadata with key as 'CONNECTOR_ID'
         """
-        connectors_table_lineage_metadata: Dict[str, List] = {}
+        connectors_table_lineage_metadata = defaultdict(list)
         table_lineage_result = self._query(
             self.fivetran_log_query.get_table_lineage_query()
         )
         for table_lineage in table_lineage_result:
-            if (
+            connectors_table_lineage_metadata[
                 table_lineage[Constant.CONNECTOR_ID]
-                not in connectors_table_lineage_metadata
-            ):
-                connectors_table_lineage_metadata[
-                    table_lineage[Constant.CONNECTOR_ID]
-                ] = [table_lineage]
-            else:
-                connectors_table_lineage_metadata[
-                    table_lineage[Constant.CONNECTOR_ID]
-                ].append(table_lineage)
-        return connectors_table_lineage_metadata
+            ].append(table_lineage)
+        return dict(connectors_table_lineage_metadata)
 
-    def _get_table_lineage(
+    def _extract_connector_lineage(
         self,
-        column_lineage_metadata: Dict[str, List],
         table_lineage_result: Optional[List],
+        column_lineage_metadata: Dict[Tuple[str, str], List],
     ) -> List[TableLineage]:
         table_lineage_list: List[TableLineage] = []
         if table_lineage_result is None:
             return table_lineage_list
 
         for table_lineage in table_lineage_result:
+            # Join the column lineage into the table lineage.
             column_lineage_result = column_lineage_metadata.get(
-                f"{table_lineage[Constant.SOURCE_TABLE_ID]}-{table_lineage[Constant.DESTINATION_TABLE_ID]}"
+                (
+                    table_lineage[Constant.SOURCE_TABLE_ID],
+                    table_lineage[Constant.DESTINATION_TABLE_ID],
+                )
             )
             column_lineage_list: List[ColumnLineage] = []
             if column_lineage_result:
@@ -152,6 +145,7 @@ def _get_table_lineage(
                 )
                 for column_lineage in column_lineage_result
             ]
+
             table_lineage_list.append(
                 TableLineage(
                     source_table=f"{table_lineage[Constant.SOURCE_SCHEMA_NAME]}.{table_lineage[Constant.SOURCE_TABLE_NAME]}",
@@ -167,14 +161,9 @@ def _get_all_connector_sync_logs(
     ) -> Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]]:
         sync_logs: Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]] = {}
 
-        # Format connector_ids as a comma-separated string of quoted IDs
-        formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids)
-
-        query = self.fivetran_log_query.get_sync_logs_query().format(
-            db_clause=self.fivetran_log_query.db_clause,
+        query = self.fivetran_log_query.get_sync_logs_query(
             syncs_interval=syncs_interval,
-            max_jobs_per_connector=MAX_JOBS_PER_CONNECTOR,
-            connector_ids=formatted_connector_ids,
+            connector_ids=connector_ids,
         )
 
         for row in self._query(query):
@@ -234,13 +223,13 @@ def get_user_email(self, user_id: str) -> Optional[str]:
             return None
         return self._get_users().get(user_id)
 
-    def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None:
-        table_lineage_metadata = self._get_connectors_table_lineage_metadata()
+    def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
+        table_lineage_metadata = self._get_table_lineage_metadata()
         column_lineage_metadata = self._get_column_lineage_metadata()
         for connector in connectors:
-            connector.table_lineage = self._get_table_lineage(
-                column_lineage_metadata=column_lineage_metadata,
+            connector.lineage = self._extract_connector_lineage(
                 table_lineage_result=table_lineage_metadata.get(connector.connector_id),
+                column_lineage_metadata=column_lineage_metadata,
             )
 
     def _fill_connectors_jobs(
@@ -262,6 +251,7 @@ def get_allowed_connectors_list(
     ) -> List[Connector]:
         connectors: List[Connector] = []
         with report.metadata_extraction_perf.connectors_metadata_extraction_sec:
+            logger.info("Fetching connector list")
            connector_list = self._query(self.fivetran_log_query.get_connectors_query())
             for connector in connector_list:
                 if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
@@ -279,12 +269,20 @@ def get_allowed_connectors_list(
                     sync_frequency=connector[Constant.SYNC_FREQUENCY],
                     destination_id=connector[Constant.DESTINATION_ID],
                     user_id=connector[Constant.CONNECTING_USER_ID],
-                    table_lineage=[],
-                    jobs=[],
+                    lineage=[],  # filled later
+                    jobs=[],  # filled later
                 )
             )
+
+        if not connectors:
+            # Some of our queries don't work well when there are no connectors, since
+            # we push down connector id filters.
+            return []
+
         with report.metadata_extraction_perf.connectors_lineage_extraction_sec:
-            self._fill_connectors_table_lineage(connectors)
+            logger.info("Fetching connector lineage")
+            self._fill_connectors_lineage(connectors)
         with report.metadata_extraction_perf.connectors_jobs_extraction_sec:
+            logger.info("Fetching connector job run history")
             self._fill_connectors_jobs(connectors, syncs_interval)
         return connectors
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
index c4680b4b1037a2..c9e329b706768f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
@@ -1,3 +1,11 @@
+from typing import List
+
+# Safeguards to prevent fetching massive amounts of data.
+MAX_TABLE_LINEAGE_PER_CONNECTOR = 100
+MAX_COLUMN_LINEAGE_PER_CONNECTOR = 3000
+MAX_JOBS_PER_CONNECTOR = 1000
+
+
 class FivetranLogQuery:
     # Note: All queries are written in Snowflake SQL.
     # They will be transpiled to the target database's SQL dialect at runtime.
@@ -24,69 +32,88 @@ def get_connectors_query(self) -> str:
             destination_id
         FROM {self.db_clause}connector
         WHERE
-        _fivetran_deleted = FALSE\
+        _fivetran_deleted = FALSE
 """
 
     def get_users_query(self) -> str:
-        return f"""
-        SELECT id as user_id,
-        given_name,
-        family_name,
-        email
-        FROM {self.db_clause}user"""
+        return f"""\
+SELECT id as user_id,
+given_name,
+family_name,
+email
+FROM {self.db_clause}user
+"""
 
-    def get_sync_logs_query(self) -> str:
-        return """
-        WITH ranked_syncs AS (
-            SELECT
-                connector_id,
-                sync_id,
-                MAX(CASE WHEN message_event = 'sync_start' THEN time_stamp END) as start_time,
-                MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time,
-                MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data,
-                ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn
-            FROM {db_clause}log
-            WHERE message_event in ('sync_start', 'sync_end')
-            AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'
-            AND connector_id IN ({connector_ids})
-            GROUP BY connector_id, sync_id
-        )
-        SELECT
-            connector_id,
-            sync_id,
-            start_time,
-            end_time,
-            end_message_data
-        FROM ranked_syncs
-        WHERE rn <= {max_jobs_per_connector}
-        AND start_time IS NOT NULL
-        AND end_time IS NOT NULL
-        ORDER BY connector_id, end_time DESC
-        """
+    def get_sync_logs_query(
+        self,
+        syncs_interval: int,
+        connector_ids: List[str],
+    ) -> str:
+        # Format connector_ids as a comma-separated string of quoted IDs
+        formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids)
+
+        return f"""\
+WITH ranked_syncs AS (
+    SELECT
+        connector_id,
+        sync_id,
+        MAX(CASE WHEN message_event = 'sync_start' THEN time_stamp END) as start_time,
+        MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time,
+        MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data,
+        ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn
+    FROM {self.db_clause}log
+    WHERE message_event in ('sync_start', 'sync_end')
+      AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'
+      AND connector_id IN ({formatted_connector_ids})
+    GROUP BY connector_id, sync_id
+)
+SELECT
+    connector_id,
+    sync_id,
+    start_time,
+    end_time,
+    end_message_data
+FROM ranked_syncs
+WHERE rn <= {MAX_JOBS_PER_CONNECTOR}
+  AND start_time IS NOT NULL
+  AND end_time IS NOT NULL
+ORDER BY connector_id, end_time DESC
+"""
 
     def get_table_lineage_query(self) -> str:
-        return f"""
-        SELECT stm.connector_id as connector_id,
-        stm.id as source_table_id,
-        stm.name as source_table_name,
-        ssm.name as source_schema_name,
-        dtm.id as destination_table_id,
-        dtm.name as destination_table_name,
-        dsm.name as destination_schema_name
-        FROM {self.db_clause}table_lineage as tl
-        JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
-        JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
-        JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
-        JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id"""
+        return f"""\
+SELECT
+    stm.connector_id as connector_id,
+    stm.id as source_table_id,
+    stm.name as source_table_name,
+    ssm.name as source_schema_name,
+    dtm.id as destination_table_id,
+    dtm.name as destination_table_name,
+    dsm.name as destination_schema_name
+FROM {self.db_clause}table_lineage as tl
+JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
+JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
+JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
+JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
+QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY tl.created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR}
+ORDER BY stm.connector_id, tl.created_at DESC
+"""
 
     def get_column_lineage_query(self) -> str:
-        return f"""
-        SELECT scm.table_id as source_table_id,
-        dcm.table_id as destination_table_id,
-        scm.name as source_column_name,
-        dcm.name as destination_column_name
-        FROM {self.db_clause}column_lineage as cl
-        JOIN {self.db_clause}source_column_metadata as scm
-        on cl.source_column_id = scm.id
-        JOIN {self.db_clause}destination_column_metadata as dcm
-        on cl.destination_column_id = dcm.id"""
+        return f"""\
+SELECT
+    scm.table_id as source_table_id,
+    dcm.table_id as destination_table_id,
+    scm.name as source_column_name,
+    dcm.name as destination_column_name
+FROM {self.db_clause}column_lineage as cl
+JOIN {self.db_clause}source_column_metadata as scm
+  ON cl.source_column_id = scm.id
+JOIN {self.db_clause}destination_column_metadata as dcm
+  ON cl.destination_column_id = dcm.id
+-- Only joining source_table_metadata to get the connector_id.
+JOIN {self.db_clause}source_table_metadata as stm
+  ON scm.table_id = stm.id
+QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY cl.created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR}
+ORDER BY stm.connector_id, cl.created_at DESC
+"""
diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py
index 33ac09e69a3c0a..e72162b12e48fd 100644
--- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py
+++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py
@@ -100,11 +100,9 @@ def default_query_results(
                 "email": "abc.xyz@email.com",
             }
         ]
-    elif query == fivetran_log_query.get_sync_logs_query().format(
-        db_clause=fivetran_log_query.db_clause,
+    elif query == fivetran_log_query.get_sync_logs_query(
         syncs_interval=7,
-        max_jobs_per_connector=1000,
-        connector_ids="'calendar_elected'",
+        connector_ids=["calendar_elected"],
     ):
         return [
             {
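Note: as a quick illustration of the reworked query builder (a minimal sketch, not part of the patch; it assumes FivetranLogQuery can be constructed with no arguments and defaults to an empty db_clause, which this diff does not show), callers now pass real function arguments instead of .format()-ing placeholders into the SQL, and the per-connector job cap is baked into the emitted query:

    from datahub.ingestion.source.fivetran.fivetran_query import (
        MAX_JOBS_PER_CONNECTOR,
        FivetranLogQuery,
    )

    fivetran_log_query = FivetranLogQuery()

    # Connector IDs are quoted inside get_sync_logs_query, so callers no
    # longer pre-format them into a SQL string themselves.
    sql = fivetran_log_query.get_sync_logs_query(
        syncs_interval=7,
        connector_ids=["calendar_elected"],
    )

    assert f"rn <= {MAX_JOBS_PER_CONNECTOR}" in sql  # job-count safeguard
    assert "'calendar_elected'" in sql  # quoted connector id filter

This mirrors the updated call in test_fivetran.py above.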