From eab2ac7a2e708e213d75c245589978d2b2aec246 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Wed, 23 Oct 2024 14:02:08 +0530 Subject: [PATCH] =?UTF-8?q?feat(ingest/snowflake):=20support=20lineage=20v?= =?UTF-8?q?ia=20rename=20and=20swap=20using=20que=E2=80=A6=20(#11600)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ingestion/source/redshift/lineage.py | 19 +- .../ingestion/source/redshift/lineage_v2.py | 6 +- .../source/snowflake/snowflake_queries.py | 75 ++- .../sql_parsing/sql_parsing_aggregator.py | 169 ++++-- .../aggregator_goldens/test_table_rename.json | 236 ++++++++- .../test_table_rename_with_temp.json | 303 +++++++++++ .../aggregator_goldens/test_table_swap.json | 490 ++++++++++++++++++ .../test_table_swap_with_temp.json | 271 ++++++++++ ...t_table_swap_with_temp_with_preparsed.json | 298 +++++++++++ .../unit/sql_parsing/test_sql_aggregator.py | 251 ++++++++- 10 files changed, 2037 insertions(+), 81 deletions(-) create mode 100644 metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename_with_temp.json create mode 100644 metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap.json create mode 100644 metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp.json create mode 100644 metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp_with_preparsed.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py index fe491ccb31850..192c97bbee348 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py @@ -53,6 +53,7 @@ ) from datahub.metadata.urns import DatasetUrn from datahub.sql_parsing.schema_resolver import SchemaResolver +from datahub.sql_parsing.sql_parsing_aggregator import TableRename from datahub.sql_parsing.sqlglot_utils import get_dialect, parse_statement from datahub.utilities import memory_footprint from datahub.utilities.dedup_list import deduplicate_list @@ -504,21 +505,21 @@ def _populate_lineage_map( self.report_status(f"extract-{lineage_type.name}", False) def _update_lineage_map_for_table_renames( - self, table_renames: Dict[str, str] + self, table_renames: Dict[str, TableRename] ) -> None: if not table_renames: return logger.info(f"Updating lineage map for {len(table_renames)} table renames") - for new_table_urn, prev_table_urn in table_renames.items(): + for entry in table_renames.values(): # This table was renamed from some other name, copy in the lineage # for the previous name as well. - prev_table_lineage = self._lineage_map.get(prev_table_urn) + prev_table_lineage = self._lineage_map.get(entry.original_urn) if prev_table_lineage: logger.debug( - f"including lineage for {prev_table_urn} in {new_table_urn} due to table rename" + f"including lineage for {entry.original_urn} in {entry.new_urn} due to table rename" ) - self._lineage_map[new_table_urn].merge_lineage( + self._lineage_map[entry.new_urn].merge_lineage( upstreams=prev_table_lineage.upstreams, cll=prev_table_lineage.cll, ) @@ -672,7 +673,7 @@ def populate_lineage( for db, schemas in all_tables.items() } - table_renames: Dict[str, str] = {} + table_renames: Dict[str, TableRename] = {} if self.config.include_table_rename_lineage: table_renames, all_tables_set = self._process_table_renames( database=database, @@ -851,11 +852,11 @@ def _process_table_renames( database: str, connection: redshift_connector.Connection, all_tables: Dict[str, Dict[str, Set[str]]], - ) -> Tuple[Dict[str, str], Dict[str, Dict[str, Set[str]]]]: + ) -> Tuple[Dict[str, TableRename], Dict[str, Dict[str, Set[str]]]]: logger.info(f"Processing table renames for db {database}") # new urn -> prev urn - table_renames: Dict[str, str] = {} + table_renames: Dict[str, TableRename] = {} query = self.queries.alter_table_rename_query( db_name=database, @@ -893,7 +894,7 @@ def _process_table_renames( env=self.config.env, ) - table_renames[new_urn] = prev_urn + table_renames[new_urn] = TableRename(prev_urn, new_urn, query_text) # We want to generate lineage for the previous name too. all_tables[database][schema].add(prev_name) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py index 53f9383ec02a7..e7cc5fc50cf6e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py @@ -146,10 +146,8 @@ def build( lambda: collections.defaultdict(set) ), ) - for new_urn, original_urn in table_renames.items(): - self.aggregator.add_table_rename( - original_urn=original_urn, new_urn=new_urn - ) + for entry in table_renames.values(): + self.aggregator.add_table_rename(entry) if self.config.table_lineage_mode in { LineageMode.SQL_BASED, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py index e11073d77b46e..33dc1388ff0e1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py @@ -52,6 +52,8 @@ PreparsedQuery, SqlAggregatorReport, SqlParsingAggregator, + TableRename, + TableSwap, ) from datahub.sql_parsing.sql_parsing_common import QueryType from datahub.sql_parsing.sqlglot_lineage import ( @@ -116,6 +118,8 @@ class SnowflakeQueriesExtractorReport(Report): audit_log_load_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer) sql_aggregator: Optional[SqlAggregatorReport] = None + num_ddl_queries_dropped: int = 0 + @dataclass class SnowflakeQueriesSourceReport(SourceReport): @@ -225,7 +229,9 @@ def get_workunits_internal( audit_log_file = self.local_temp_path / "audit_log.sqlite" use_cached_audit_log = audit_log_file.exists() - queries: FileBackedList[Union[KnownLineageMapping, PreparsedQuery]] + queries: FileBackedList[ + Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap] + ] if use_cached_audit_log: logger.info("Using cached audit log") shared_connection = ConnectionWrapper(audit_log_file) @@ -235,7 +241,7 @@ def get_workunits_internal( shared_connection = ConnectionWrapper(audit_log_file) queries = FileBackedList(shared_connection) - entry: Union[KnownLineageMapping, PreparsedQuery] + entry: Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap] with self.report.copy_history_fetch_timer: for entry in self.fetch_copy_history(): @@ -296,7 +302,7 @@ def fetch_copy_history(self) -> Iterable[KnownLineageMapping]: def fetch_query_log( self, - ) -> Iterable[PreparsedQuery]: + ) -> Iterable[Union[PreparsedQuery, TableRename, TableSwap]]: query_log_query = _build_enriched_query_log_query( start_time=self.config.window.start_time, end_time=self.config.window.end_time, @@ -324,12 +330,16 @@ def fetch_query_log( exc=e, ) else: - yield entry + if entry: + yield entry - def _parse_audit_log_row(self, row: Dict[str, Any]) -> PreparsedQuery: + def _parse_audit_log_row( + self, row: Dict[str, Any] + ) -> Optional[Union[TableRename, TableSwap, PreparsedQuery]]: json_fields = { "DIRECT_OBJECTS_ACCESSED", "OBJECTS_MODIFIED", + "OBJECT_MODIFIED_BY_DDL", } res = {} @@ -341,6 +351,17 @@ def _parse_audit_log_row(self, row: Dict[str, Any]) -> PreparsedQuery: direct_objects_accessed = res["direct_objects_accessed"] objects_modified = res["objects_modified"] + object_modified_by_ddl = res["object_modified_by_ddl"] + + if object_modified_by_ddl and not objects_modified: + ddl_entry: Optional[Union[TableRename, TableSwap]] = None + with self.structured_reporter.report_exc( + "Error fetching ddl lineage from Snowflake" + ): + ddl_entry = self.parse_ddl_query( + res["query_text"], object_modified_by_ddl + ) + return ddl_entry upstreams = [] column_usage = {} @@ -437,6 +458,45 @@ def _parse_audit_log_row(self, row: Dict[str, Any]) -> PreparsedQuery: ) return entry + def parse_ddl_query( + self, query: str, object_modified_by_ddl: dict + ) -> Optional[Union[TableRename, TableSwap]]: + if object_modified_by_ddl[ + "operationType" + ] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"): + urn1 = self.identifiers.gen_dataset_urn( + self.identifiers.get_dataset_identifier_from_qualified_name( + object_modified_by_ddl["objectName"] + ) + ) + + urn2 = self.identifiers.gen_dataset_urn( + self.identifiers.get_dataset_identifier_from_qualified_name( + object_modified_by_ddl["properties"]["swapTargetName"]["value"] + ) + ) + + return TableSwap(urn1, urn2, query) + elif object_modified_by_ddl[ + "operationType" + ] == "RENAME_TABLE" and object_modified_by_ddl["properties"].get("objectName"): + original_un = self.identifiers.gen_dataset_urn( + self.identifiers.get_dataset_identifier_from_qualified_name( + object_modified_by_ddl["objectName"] + ) + ) + + new_urn = self.identifiers.gen_dataset_urn( + self.identifiers.get_dataset_identifier_from_qualified_name( + object_modified_by_ddl["properties"]["objectName"]["value"] + ) + ) + + return TableRename(original_un, new_urn, query) + else: + self.report.num_ddl_queries_dropped += 1 + return None + def close(self) -> None: self._exit_stack.close() @@ -542,6 +602,7 @@ def _build_enriched_query_log_query( user_name, direct_objects_accessed, objects_modified, + object_modified_by_ddl FROM snowflake.account_usage.access_history WHERE @@ -563,8 +624,9 @@ def _build_enriched_query_log_query( ) as direct_objects_accessed, -- TODO: Drop the columns.baseSources subfield. FILTER(objects_modified, o -> o:objectDomain IN {SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER}) as objects_modified, + case when object_modified_by_ddl:objectDomain IN {SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER} then object_modified_by_ddl else null end as object_modified_by_ddl FROM raw_access_history - WHERE ( array_size(direct_objects_accessed) > 0 or array_size(objects_modified) > 0 ) + WHERE ( array_size(direct_objects_accessed) > 0 or array_size(objects_modified) > 0 or object_modified_by_ddl is not null ) ) , query_access_history AS ( SELECT @@ -586,6 +648,7 @@ def _build_enriched_query_log_query( q.role_name AS "ROLE_NAME", a.direct_objects_accessed, a.objects_modified, + a.object_modified_by_ddl FROM deduplicated_queries q JOIN filtered_access_history a USING (query_id) ) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index 0b7ad14a8c1b4..9aab33bf09f05 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -174,6 +174,26 @@ class KnownLineageMapping: lineage_type: str = models.DatasetLineageTypeClass.COPY +@dataclasses.dataclass +class TableRename: + original_urn: UrnStr + new_urn: UrnStr + query: Optional[str] = None + session_id: str = _MISSING_SESSION_ID + + +@dataclasses.dataclass +class TableSwap: + urn1: UrnStr + urn2: UrnStr + query: Optional[str] = None + session_id: str = _MISSING_SESSION_ID + + def id(self) -> str: + # TableSwap(A,B) is same as TableSwap(B,A) + return str(hash(frozenset([self.urn1, self.urn2]))) + + @dataclasses.dataclass class PreparsedQuery: # If not provided, we will generate one using the fast fingerprint generator. @@ -237,6 +257,7 @@ class SqlAggregatorReport(Report): num_preparsed_queries: int = 0 num_known_mapping_lineage: int = 0 num_table_renames: int = 0 + num_table_swaps: int = 0 # Temp tables. num_temp_sessions: Optional[int] = None @@ -442,6 +463,12 @@ def __init__( ) self._exit_stack.push(self._table_renames) + # Map of table swaps, from unique swap id to TableSwap + self._table_swaps = FileBackedDict[TableSwap]( + shared_connection=self._shared_connection, tablename="table_swaps" + ) + self._exit_stack.push(self._table_swaps) + # Usage aggregator. This will only be initialized if usage statistics are enabled. # TODO: Replace with FileBackedDict. # TODO: The BaseUsageConfig class is much too broad for our purposes, and has a number of @@ -533,7 +560,12 @@ def is_allowed_table(self, urn: UrnStr) -> bool: def add( self, item: Union[ - KnownQueryLineageInfo, KnownLineageMapping, PreparsedQuery, ObservedQuery + KnownQueryLineageInfo, + KnownLineageMapping, + PreparsedQuery, + ObservedQuery, + TableRename, + TableSwap, ], ) -> None: if isinstance(item, KnownQueryLineageInfo): @@ -544,6 +576,10 @@ def add( self.add_preparsed_query(item) elif isinstance(item, ObservedQuery): self.add_observed_query(item) + elif isinstance(item, TableRename): + self.add_table_rename(item) + elif isinstance(item, TableSwap): + self.add_table_swap(item) else: raise ValueError(f"Cannot add unknown item type: {type(item)}") @@ -629,19 +665,11 @@ def add_known_lineage_mapping( query_id = self._known_lineage_query_id() # Generate CLL if schema of downstream is known - column_lineage: List[ColumnLineageInfo] = [] - if self._schema_resolver.has_urn(downstream_urn): - schema = self._schema_resolver._resolve_schema_info(downstream_urn) - if schema: - column_lineage = [ - ColumnLineageInfo( - downstream=DownstreamColumnRef( - table=downstream_urn, column=field_path - ), - upstreams=[ColumnRef(table=upstream_urn, column=field_path)], - ) - for field_path in schema - ] + column_lineage: List[ + ColumnLineageInfo + ] = self._generate_identity_column_lineage( + upstream_urn=upstream_urn, downstream_urn=downstream_urn + ) # Register the query. self._add_to_query_map( @@ -663,6 +691,25 @@ def add_known_lineage_mapping( # Register the lineage. self._lineage_map.for_mutation(downstream_urn, OrderedSet()).add(query_id) + def _generate_identity_column_lineage( + self, *, upstream_urn: UrnStr, downstream_urn: UrnStr + ) -> List[ColumnLineageInfo]: + column_lineage: List[ColumnLineageInfo] = [] + if self._schema_resolver.has_urn(downstream_urn): + schema = self._schema_resolver._resolve_schema_info(downstream_urn) + if schema: + column_lineage = [ + ColumnLineageInfo( + downstream=DownstreamColumnRef( + table=downstream_urn, column=field_path + ), + upstreams=[ColumnRef(table=upstream_urn, column=field_path)], + ) + for field_path in schema + ] + + return column_lineage + def add_view_definition( self, view_urn: Union[DatasetUrn, UrnStr], @@ -849,12 +896,6 @@ def add_preparsed_query( return out_table = parsed.downstream - # Handle table renames. - is_renamed_table = False - if out_table in self._table_renames: - out_table = self._table_renames[out_table] - is_renamed_table = True - # Register the query's lineage. if ( is_known_temp_table @@ -863,13 +904,10 @@ def add_preparsed_query( and parsed.query_type_props.get("temporary") ) or ( - not is_renamed_table - and ( - self.is_temp_table(out_table) - or ( - require_out_table_schema - and not self._schema_resolver.has_urn(out_table) - ) + self.is_temp_table(out_table) + or ( + require_out_table_schema + and not self._schema_resolver.has_urn(out_table) ) ) ): @@ -896,26 +934,81 @@ def add_preparsed_query( def add_table_rename( self, - original_urn: UrnStr, - new_urn: UrnStr, + table_rename: TableRename, ) -> None: """Add a table rename to the aggregator. - This will so that all _future_ observed queries that reference the original urn - will instead generate usage and lineage for the new urn. + This will make all observed queries that reference the original urn + will instead generate lineage for the new urn. + """ + + self.report.num_table_renames += 1 + + # This will not work if the table is renamed multiple times. + self._table_renames[table_rename.original_urn] = table_rename.new_urn - Currently, this does not affect any queries that have already been observed. - TODO: Add a mechanism to update the lineage for queries that have already been observed. + original_table = self._name_from_urn(table_rename.original_urn) + new_table = self._name_from_urn(table_rename.new_urn) + + self.add_preparsed_query( + PreparsedQuery( + query_id=None, + query_text=table_rename.query + or f"--Datahub generated query text--\n" + f"alter table {original_table} rename to {new_table}", + upstreams=[table_rename.original_urn], + downstream=table_rename.new_urn, + column_lineage=self._generate_identity_column_lineage( + downstream_urn=table_rename.new_urn, + upstream_urn=table_rename.original_urn, + ), + ) + ) + + def add_table_swap(self, table_swap: TableSwap) -> None: + """Add a table swap to the aggregator. Args: - original_urn: The original dataset URN. - new_urn: The new dataset URN. + table_swap.urn1, table_swap.urn2: The dataset URNs to swap. """ - self.report.num_table_renames += 1 + if table_swap.id() in self._table_swaps: + # We have already processed this table swap once + return - # This will not work if the table is renamed multiple times. - self._table_renames[original_urn] = new_urn + self.report.num_table_swaps += 1 + self._table_swaps[table_swap.id()] = table_swap + table1 = self._name_from_urn(table_swap.urn1) + table2 = self._name_from_urn(table_swap.urn2) + + # NOTE: Both queries are different on purpose. Currently, we can not + # store (A->B) and (B->A) lineage against same query. + + # NOTE: we do not store upstreams for temp table on purpose, as that would + # otherwise overwrite original upstream query of temp table because + # currently a temporay table can have only one upstream query. + + if not self.is_temp_table(table_swap.urn2): + self.add_preparsed_query( + PreparsedQuery( + query_id=None, + query_text=f"--Datahub generated query text--" + f"\nalter table {table1} swap with {table2}", + upstreams=[table_swap.urn1], + downstream=table_swap.urn2, + ) + ) + + if not self.is_temp_table(table_swap.urn1): + self.add_preparsed_query( + PreparsedQuery( + query_id=None, + query_text=f"--Datahub generated query text--\n" + f"alter table {table2} swap with {table1}", + upstreams=[table_swap.urn2], + downstream=table_swap.urn1, + ) + ) def _make_schema_resolver_for_session( self, session_id: str diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json index 9a4d405e50a7a..19d19d1f56ae9 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename.json @@ -119,6 +119,198 @@ "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", "changeType": "UPSERT", "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:a30d42497a737321ece461fa17344c3ba3588fdee736016acb59a00cec955a0c" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:a30d42497a737321ece461fa17344c3ba3588fdee736016acb59a00cec955a0c", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "ALTER TABLE dev.public.foo_staging RENAME TO foo", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:a30d42497a737321ece461fa17344c3ba3588fdee736016acb59a00cec955a0c", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:234a2904c367a6cc02d76cf358cd86937ec9e14af03e5539b5edb0b6df5db3dc", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "CREATE TABLE foo_staging AS\nSELECT\n a,\n b\nFROM foo_dep", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:234a2904c367a6cc02d76cf358cd86937ec9e14af03e5539b5edb0b6df5db3dc", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD),b)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD),a)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD),a)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD),b)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:a30d42497a737321ece461fa17344c3ba3588fdee736016acb59a00cec955a0c", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:redshift" + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:234a2904c367a6cc02d76cf358cd86937ec9e14af03e5539b5edb0b6df5db3dc", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:redshift" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:e4b3b60ab99e0f0bc1629ea82a5d7705a30dbd98a3923d599b39fb68624ea58d" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD),a)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD),a)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:e4b3b60ab99e0f0bc1629ea82a5d7705a30dbd98a3923d599b39fb68624ea58d" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD),b)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD),b)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:e4b3b60ab99e0f0bc1629ea82a5d7705a30dbd98a3923d599b39fb68624ea58d" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", "aspect": { "json": { "upstreams": [ @@ -144,7 +336,7 @@ ], "downstreamType": "FIELD", "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD),a)" ], "confidenceScore": 0.2, "query": "urn:li:query:234a2904c367a6cc02d76cf358cd86937ec9e14af03e5539b5edb0b6df5db3dc" @@ -156,7 +348,7 @@ ], "downstreamType": "FIELD", "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD),b)" ], "confidenceScore": 0.2, "query": "urn:li:query:234a2904c367a6cc02d76cf358cd86937ec9e14af03e5539b5edb0b6df5db3dc" @@ -167,13 +359,13 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:234a2904c367a6cc02d76cf358cd86937ec9e14af03e5539b5edb0b6df5db3dc", + "entityUrn": "urn:li:query:e4b3b60ab99e0f0bc1629ea82a5d7705a30dbd98a3923d599b39fb68624ea58d", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { "json": { "statement": { - "value": "CREATE TABLE foo_staging AS\nSELECT\n a,\n b\nFROM foo_dep", + "value": "CREATE TABLE foo_downstream AS\nSELECT\n a,\n b\nFROM foo_staging", "language": "SQL" }, "source": "SYSTEM", @@ -190,43 +382,43 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:234a2904c367a6cc02d76cf358cd86937ec9e14af03e5539b5edb0b6df5db3dc", + "entityUrn": "urn:li:query:e4b3b60ab99e0f0bc1629ea82a5d7705a30dbd98a3923d599b39fb68624ea58d", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:redshift" + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:e4b3b60ab99e0f0bc1629ea82a5d7705a30dbd98a3923d599b39fb68624ea58d", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { "json": { "subjects": [ { - "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD)" + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD)" }, { - "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD),a)" + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD),b)" }, { - "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD),b)" + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_staging,PROD),a)" }, { - "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)" + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD)" }, { - "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)" + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD),a)" }, { - "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)" + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD),b)" } ] } } -}, -{ - "entityType": "query", - "entityUrn": "urn:li:query:234a2904c367a6cc02d76cf358cd86937ec9e14af03e5539b5edb0b6df5db3dc", - "changeType": "UPSERT", - "aspectName": "dataPlatformInstance", - "aspect": { - "json": { - "platform": "urn:li:dataPlatform:redshift" - } - } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename_with_temp.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename_with_temp.json new file mode 100644 index 0000000000000..4e2eba778ab9a --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_rename_with_temp.json @@ -0,0 +1,303 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.baz,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.baz,PROD),a)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.baz,PROD),b)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "CREATE TABLE bar AS\nSELECT\n a,\n b\nFROM baz", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.baz,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.baz,PROD),b)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.baz,PROD),a)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:redshift" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:composite_4b8845f9fa02123e7501a9baf421175923514b4a4a85158b2df36f930bcdc10e" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_4b8845f9fa02123e7501a9baf421175923514b4a4a85158b2df36f930bcdc10e", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "CREATE TABLE foo_staging AS\nSELECT\n a,\n b\nFROM foo_dep;\n\nALTER TABLE dev.public.foo_staging RENAME TO foo", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:composite_ff126f9f14af4f1871d685a332dc4d71a507a6ca8b7a13e38e46cb58e0b1ecb9" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD),a)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD),a)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:composite_ff126f9f14af4f1871d685a332dc4d71a507a6ca8b7a13e38e46cb58e0b1ecb9" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD),b)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD),b)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:composite_ff126f9f14af4f1871d685a332dc4d71a507a6ca8b7a13e38e46cb58e0b1ecb9" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_4b8845f9fa02123e7501a9baf421175923514b4a4a85158b2df36f930bcdc10e", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_4b8845f9fa02123e7501a9baf421175923514b4a4a85158b2df36f930bcdc10e", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:redshift" + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_ff126f9f14af4f1871d685a332dc4d71a507a6ca8b7a13e38e46cb58e0b1ecb9", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "CREATE TABLE foo_staging AS\nSELECT\n a,\n b\nFROM foo_dep;\n\nCREATE TABLE foo_downstream AS\nSELECT\n a,\n b\nFROM foo_staging", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_ff126f9f14af4f1871d685a332dc4d71a507a6ca8b7a13e38e46cb58e0b1ecb9", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_dep,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD),a)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_downstream,PROD),b)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_ff126f9f14af4f1871d685a332dc4d71a507a6ca8b7a13e38e46cb58e0b1ecb9", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:redshift" + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap.json new file mode 100644 index 0000000000000..382bc8ee0281d --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap.json @@ -0,0 +1,490 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD),a)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD),a)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD),b)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD),b)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "CREATE TABLE bar AS\nSELECT\n a,\n b\nFROM baz", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD),a)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD),b)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD),a)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD),b)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:3865108263e5f0670e6506f5747392f8315a72039cbfde1c4be4dd9a71bdd500" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_backup,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:3865108263e5f0670e6506f5747392f8315a72039cbfde1c4be4dd9a71bdd500", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "ALTER TABLE dev.public.person_info_swap SWAP WITH dev.public.person_info", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "CREATE TABLE person_info_backup AS\nSELECT\n *\nFROM person_info_swap", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:3865108263e5f0670e6506f5747392f8315a72039cbfde1c4be4dd9a71bdd500", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_backup,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:3865108263e5f0670e6506f5747392f8315a72039cbfde1c4be4dd9a71bdd500", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_incremental,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_dep,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:4b1fad909083e1ed5c47c146bd01247ed4d6295d175c34f9065b8fc6000fc7ae" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_incremental,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:4b1fad909083e1ed5c47c146bd01247ed4d6295d175c34f9065b8fc6000fc7ae", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "CREATE TABLE person_info_incremental AS\nSELECT\n *\nFROM person_info_dep", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:d29a1c8ed6d4d77efb290260234e5eee56f98311a5631d0a12213798077d1a68" + }, + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_incremental,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:4b1fad909083e1ed5c47c146bd01247ed4d6295d175c34f9065b8fc6000fc7ae", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_dep,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_incremental,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "INSERT INTO person_info_swap\nSELECT\n *\nFROM person_info_incremental", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:4b1fad909083e1ed5c47c146bd01247ed4d6295d175c34f9065b8fc6000fc7ae", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:d29a1c8ed6d4d77efb290260234e5eee56f98311a5631d0a12213798077d1a68", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "ALTER TABLE dev.public.person_info SWAP WITH dev.public.person_info_swap", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:d29a1c8ed6d4d77efb290260234e5eee56f98311a5631d0a12213798077d1a68", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:d29a1c8ed6d4d77efb290260234e5eee56f98311a5631d0a12213798077d1a68", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)" + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp.json new file mode 100644 index 0000000000000..73084f49a8928 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp.json @@ -0,0 +1,271 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD),a)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD),a)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD),b)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD),b)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "CREATE TABLE bar AS\nSELECT\n a,\n b\nFROM baz", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD),a)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.baz,PROD),b)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD),a)" + }, + { + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.bar,PROD),b)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:e2629e6fd3a70a223cb3e2c9e5bd3416763782de3ec32124bc56cb835b60978a", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_dep,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:composite_333307d51724d89c6c2761f83c9e54006c78565154857272f02c7c0849920970" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_333307d51724d89c6c2761f83c9e54006c78565154857272f02c7c0849920970", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "CREATE TABLE person_info_incremental AS\nSELECT\n *\nFROM person_info_dep;\n\nINSERT INTO person_info_swap\nSELECT\n *\nFROM person_info_incremental;\n\nALTER TABLE dev.public.person_info_swap SWAP WITH dev.public.person_info", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_333307d51724d89c6c2761f83c9e54006c78565154857272f02c7c0849920970", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_dep,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_333307d51724d89c6c2761f83c9e54006c78565154857272f02c7c0849920970", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_backup,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_dep,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:composite_d5d07752cbe5b894085996dc22582334b88b5726c049a5a533c02c49a427766d" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_d5d07752cbe5b894085996dc22582334b88b5726c049a5a533c02c49a427766d", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "CREATE TABLE person_info_incremental AS\nSELECT\n *\nFROM person_info_dep;\n\nINSERT INTO person_info_swap\nSELECT\n *\nFROM person_info_incremental;\n\nCREATE TABLE person_info_backup AS\nSELECT\n *\nFROM person_info_swap", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_d5d07752cbe5b894085996dc22582334b88b5726c049a5a533c02c49a427766d", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_dep,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_backup,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_d5d07752cbe5b894085996dc22582334b88b5726c049a5a533c02c49a427766d", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp_with_preparsed.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp_with_preparsed.json new file mode 100644 index 0000000000000..24ed6a3b54d8c --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_swap_with_temp_with_preparsed.json @@ -0,0 +1,298 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:3865108263e5f0670e6506f5747392f8315a72039cbfde1c4be4dd9a71bdd500" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:3865108263e5f0670e6506f5747392f8315a72039cbfde1c4be4dd9a71bdd500", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "ALTER TABLE dev.public.person_info_swap SWAP WITH dev.public.person_info", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:3865108263e5f0670e6506f5747392f8315a72039cbfde1c4be4dd9a71bdd500", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:3865108263e5f0670e6506f5747392f8315a72039cbfde1c4be4dd9a71bdd500", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_backup,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "CREATE TABLE person_info_backup AS\nSELECT\n *\nFROM person_info_swap", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_backup,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:6f71602f39d01a39b3f8bd411c74c5ac08dc4b90bc3d49b257089acb19fa8559", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:d29a1c8ed6d4d77efb290260234e5eee56f98311a5631d0a12213798077d1a68" + }, + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_incremental,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:d29a1c8ed6d4d77efb290260234e5eee56f98311a5631d0a12213798077d1a68", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "ALTER TABLE dev.public.person_info SWAP WITH dev.public.person_info_swap", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:d29a1c8ed6d4d77efb290260234e5eee56f98311a5631d0a12213798077d1a68", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:d29a1c8ed6d4d77efb290260234e5eee56f98311a5631d0a12213798077d1a68", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "INSERT INTO person_info_swap\nSELECT\n *\nFROM person_info_incremental", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + } + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_incremental,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.person_info_swap,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:481d0392ffeffdafd198d94e0a9f778dd722b60daa47083a32800b99ea21f86f", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:snowflake" + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py index 849d550ef69c5..45bbc19a2124f 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py @@ -13,8 +13,11 @@ from datahub.sql_parsing.sql_parsing_aggregator import ( KnownQueryLineageInfo, ObservedQuery, + PreparsedQuery, QueryLogSetting, SqlParsingAggregator, + TableRename, + TableSwap, ) from datahub.sql_parsing.sql_parsing_common import QueryType from datahub.sql_parsing.sqlglot_lineage import ( @@ -522,8 +525,10 @@ def test_table_rename(pytestconfig: pytest.Config) -> None: # Register that foo_staging is renamed to foo. aggregator.add_table_rename( - original_urn=DatasetUrn("redshift", "dev.public.foo_staging").urn(), - new_urn=DatasetUrn("redshift", "dev.public.foo").urn(), + TableRename( + original_urn=DatasetUrn("redshift", "dev.public.foo_staging").urn(), + new_urn=DatasetUrn("redshift", "dev.public.foo").urn(), + ) ) # Add an unrelated query. @@ -544,6 +549,15 @@ def test_table_rename(pytestconfig: pytest.Config) -> None: ) ) + # Add the query that created the downstream from foo_staging table. + aggregator.add_observed_query( + ObservedQuery( + query="create table foo_downstream as select a, b from foo_staging", + default_db="dev", + default_schema="public", + ) + ) + mcps = list(aggregator.gen_metadata()) mce_helpers.check_goldens_stream( @@ -553,6 +567,226 @@ def test_table_rename(pytestconfig: pytest.Config) -> None: ) +@freeze_time(FROZEN_TIME) +def test_table_rename_with_temp(pytestconfig: pytest.Config) -> None: + aggregator = SqlParsingAggregator( + platform="redshift", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=False, + is_temp_table=lambda x: "staging" in x.lower(), + ) + + # Register that foo_staging is renamed to foo. + aggregator.add_table_rename( + TableRename( + original_urn=DatasetUrn("redshift", "dev.public.foo_staging").urn(), + new_urn=DatasetUrn("redshift", "dev.public.foo").urn(), + query="alter table dev.public.foo_staging rename to dev.public.foo", + ) + ) + + # Add an unrelated query. + aggregator.add_observed_query( + ObservedQuery( + query="create table bar as select a, b from baz", + default_db="dev", + default_schema="public", + ) + ) + + # Add the query that created the staging table. + aggregator.add_observed_query( + ObservedQuery( + query="create table foo_staging as select a, b from foo_dep", + default_db="dev", + default_schema="public", + ) + ) + + # Add the query that created the downstream from foo_staging table. + aggregator.add_observed_query( + ObservedQuery( + query="create table foo_downstream as select a, b from foo_staging", + default_db="dev", + default_schema="public", + ) + ) + + mcps = list(aggregator.gen_metadata()) + + mce_helpers.check_goldens_stream( + pytestconfig, + outputs=mcps, + golden_path=RESOURCE_DIR / "test_table_rename_with_temp.json", + ) + + +@freeze_time(FROZEN_TIME) +def test_table_swap(pytestconfig: pytest.Config) -> None: + aggregator = SqlParsingAggregator( + platform="snowflake", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=False, + ) + + # Add an unrelated query. + aggregator.add_observed_query( + ObservedQuery( + query="create table bar as select a, b from baz", + default_db="dev", + default_schema="public", + ) + ) + + # Add the query that created the swap table initially. + aggregator.add_preparsed_query( + PreparsedQuery( + query_id=None, + query_text="CREATE TABLE person_info_swap CLONE person_info;", + upstreams=[DatasetUrn("snowflake", "dev.public.person_info").urn()], + downstream=DatasetUrn("snowflake", "dev.public.person_info_swap").urn(), + ) + ) + + # Add the query that created the incremental table. + aggregator.add_preparsed_query( + PreparsedQuery( + query_id=None, + query_text="CREATE TABLE person_info_incremental AS SELECT * from person_info_dep;", + upstreams=[ + DatasetUrn("snowflake", "dev.public.person_info_dep").urn(), + ], + downstream=DatasetUrn( + "snowflake", "dev.public.person_info_incremental" + ).urn(), + ) + ) + + # Add the query that updated the swap table. + aggregator.add_preparsed_query( + PreparsedQuery( + query_id=None, + query_text="INSERT INTO person_info_swap SELECT * from person_info_incremental;", + upstreams=[ + DatasetUrn("snowflake", "dev.public.person_info_incremental").urn(), + ], + downstream=DatasetUrn("snowflake", "dev.public.person_info_swap").urn(), + ) + ) + + aggregator.add_table_swap( + TableSwap( + urn1=DatasetUrn("snowflake", "dev.public.person_info").urn(), + urn2=DatasetUrn("snowflake", "dev.public.person_info_swap").urn(), + ) + ) + + # Add the query that is created from swap table. + aggregator.add_preparsed_query( + PreparsedQuery( + query_id=None, + query_text="create table person_info_backup as select * from person_info_swap", + upstreams=[ + DatasetUrn("snowflake", "dev.public.person_info_swap").urn(), + ], + downstream=DatasetUrn("snowflake", "dev.public.person_info_backup").urn(), + ) + ) + + mcps = list(aggregator.gen_metadata()) + + mce_helpers.check_goldens_stream( + pytestconfig, + outputs=mcps, + golden_path=RESOURCE_DIR / "test_table_swap.json", + ) + + +@freeze_time(FROZEN_TIME) +def test_table_swap_with_temp(pytestconfig: pytest.Config) -> None: + aggregator = SqlParsingAggregator( + platform="snowflake", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=False, + is_temp_table=lambda x: "swap" in x.lower() or "incremental" in x.lower(), + ) + + # Add an unrelated query. + aggregator.add_observed_query( + ObservedQuery( + query="create table bar as select a, b from baz", + default_db="dev", + default_schema="public", + ) + ) + + # Add the query that created the swap table initially. + aggregator.add_preparsed_query( + PreparsedQuery( + query_id=None, + query_text="CREATE TABLE person_info_swap CLONE person_info;", + upstreams=[DatasetUrn("snowflake", "dev.public.person_info").urn()], + downstream=DatasetUrn("snowflake", "dev.public.person_info_swap").urn(), + ) + ) + + # Add the query that created the incremental table. + aggregator.add_preparsed_query( + PreparsedQuery( + query_id=None, + query_text="CREATE TABLE person_info_incremental AS SELECT * from person_info_dep;", + upstreams=[ + DatasetUrn("snowflake", "dev.public.person_info_dep").urn(), + ], + downstream=DatasetUrn( + "snowflake", "dev.public.person_info_incremental" + ).urn(), + ) + ) + + # Add the query that updated the swap table. + aggregator.add_preparsed_query( + PreparsedQuery( + query_id=None, + query_text="INSERT INTO person_info_swap SELECT * from person_info_incremental;", + upstreams=[ + DatasetUrn("snowflake", "dev.public.person_info_incremental").urn(), + ], + downstream=DatasetUrn("snowflake", "dev.public.person_info_swap").urn(), + ) + ) + + aggregator.add_table_swap( + TableSwap( + urn1=DatasetUrn("snowflake", "dev.public.person_info").urn(), + urn2=DatasetUrn("snowflake", "dev.public.person_info_swap").urn(), + ) + ) + + # Add the query that is created from swap table. + aggregator.add_preparsed_query( + PreparsedQuery( + query_id=None, + query_text="create table person_info_backup as select * from person_info_swap", + upstreams=[ + DatasetUrn("snowflake", "dev.public.person_info_swap").urn(), + ], + downstream=DatasetUrn("snowflake", "dev.public.person_info_backup").urn(), + ) + ) + + mcps = list(aggregator.gen_metadata()) + + mce_helpers.check_goldens_stream( + pytestconfig, + outputs=mcps, + golden_path=RESOURCE_DIR / "test_table_swap_with_temp.json", + ) + + @freeze_time(FROZEN_TIME) def test_create_table_query_mcps(pytestconfig: pytest.Config) -> None: aggregator = SqlParsingAggregator( @@ -665,6 +899,19 @@ def test_basic_usage(pytestconfig: pytest.Config) -> None: ) +def test_table_swap_id() -> None: + assert ( + TableSwap( + urn1=DatasetUrn("snowflake", "dev.public.foo").urn(), + urn2=DatasetUrn("snowflake", "dev.public.foo_staging").urn(), + ).id() + == TableSwap( + urn1=DatasetUrn("snowflake", "dev.public.foo_staging").urn(), + urn2=DatasetUrn("snowflake", "dev.public.foo").urn(), + ).id() + ) + + def test_sql_aggreator_close_cleans_tmp(tmp_path): frozen_timestamp = parse_user_datetime(FROZEN_TIME) with patch("tempfile.tempdir", str(tmp_path)):