From b7c915cf1289bd7c33e16cdb5e141ab62556082c Mon Sep 17 00:00:00 2001 From: Keshav Mohta Date: Thu, 28 Nov 2024 15:10:27 +0530 Subject: [PATCH 1/7] Fix #17888: implemented cross database lineage for trino --- .../src/metadata/ingestion/ometa/ometa_api.py | 3 +- .../source/database/lineage_source.py | 75 +++++++++++- .../source/database/trino/lineage.py | 111 ++++++++++++++++++ .../databaseServiceQueryLineagePipeline.json | 14 +++ .../json/schema/type/entityLineage.json | 2 +- 5 files changed, 200 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index 0a99ab655228..e9286ab782b3 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -396,8 +396,9 @@ def list_entities( url_limit = f"?limit={limit}" url_after = f"&after={after}" if after else "" url_fields = f"&fields={','.join(fields)}" if fields else "" + url_params = "".join([f"&{k}={v}," for k, v in params.items()])[:-1] resp = self.client.get( - path=f"{suffix}{url_limit}{url_after}{url_fields}", data=params + path=f"{suffix}{url_limit}{url_after}{url_fields}{url_params}", data=params ) if self._use_raw_data: diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index 581467faf779..d2922c7a418f 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -17,15 +17,27 @@ from abc import ABC from concurrent.futures import ThreadPoolExecutor, as_completed from functools import partial -from typing import Callable, Iterable, Iterator, Union +from typing import Callable, Iterable, Iterator, List, Optional, Union from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.type.basic import ( + FullyQualifiedEntityName, + SqlQuery, + Uuid, +) +from metadata.generated.schema.type.entityLineage import ( + ColumnLineage, + EntitiesEdge, + LineageDetails, + Source, +) +from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.tableQuery import TableQuery from metadata.ingestion.api.models import Either from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect -from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query +from metadata.ingestion.lineage.sql_lineage import get_column_fqn, get_lineage_by_query from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest from metadata.ingestion.source.database.query_parser_source import QueryParserSource from metadata.ingestion.source.models import TableView @@ -239,6 +251,58 @@ def yield_procedure_lineage( f"Processing Procedure Lineage not supported for {str(self.service_connection.type.value)}" ) + def _get_column_lineage( + self, from_table: Table, to_table: Table, columns_list: List[str] + ) -> List[ColumnLineage]: + """ + Get the column lineage from the fields + """ + try: + column_lineage = [] + for field in columns_list or []: + from_column = get_column_fqn(table_entity=from_table, column=field) + to_column = get_column_fqn(table_entity=to_table, column=field) + if from_column and to_column: + column_lineage.append( + ColumnLineage(fromColumns=[from_column], toColumn=to_column) + ) + return column_lineage + except Exception as exc: + logger.debug(f"Error to get column lineage: {exc}") + logger.debug(traceback.format_exc()) + + @staticmethod + def _get_add_lineage_request( + from_entity: Table, to_entity: Table, column_lineage: List[ColumnLineage] = None + ) -> Optional[Either[AddLineageRequest]]: + if from_entity and to_entity: + return Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=Uuid(from_entity.id.root), type="table" + ), + toEntity=EntityReference( + id=Uuid(to_entity.id.root), type="table" + ), + lineageDetails=LineageDetails( + source=Source.CrossDatabaseLineage, + columnsLineage=column_lineage, + ), + ) + ) + ) + + return None + + def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: + """ + By default cross database lineage is not supported. + """ + logger.info( + f"Processing Cross Database Lineage not supported for {str(self.service_connection.type.value)}" + ) + def _iter( self, *_, **__ ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: @@ -257,3 +321,8 @@ def _iter( logger.warning( f"Lineage extraction is not supported for {str(self.service_connection.type.value)} connection" ) + if self.source_config.processCrossDatabaseLineage: + yield from self.yield_cross_database_lineage() or [] + + # additional lineage + # hive service name - diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index e97fd407d1fa..7c4abe3b2d11 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -11,7 +11,17 @@ """ Trino lineage module """ +import traceback +from typing import Iterable, List, Tuple +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.ingestionPipelines.status import ( + StackTraceError, +) +from metadata.ingestion.api.models import Either from metadata.ingestion.source.database.lineage_source import LineageSource from metadata.ingestion.source.database.trino.queries import TRINO_SQL_STATEMENT from metadata.ingestion.source.database.trino.query_parser import TrinoQueryParserSource @@ -32,3 +42,104 @@ class TrinoLineageSource(TrinoQueryParserSource, LineageSource): OR lower("query") LIKE '%%merge%%' ) """ + + # TODO: Need to fix params issue + def get_cross_database_fqn_from_service_names(self): + database_service_names = self.source_config.crossDatabaseServiceNames + return [ + database.fullyQualifiedName.root + for service in database_service_names + for database in self.metadata.list_all_entities( + entity=Database, params={"service": service} + ) + ] + + # TODO: Need to discuss column datatype issue (varchar-string, int-integer). Using column names for now + def check_same_table(self, table1, table2): + # table1_column_mapping = {column.name.root: (column.dataType, column.arrayDataType) for column in table1.columns} + # table2_column_mapping = {column.name.root: (column.dataType, column.arrayDataType) for column in table2.columns} + # return table1_column_mapping==table2_column_mapping + return {column.name.root for column in table1.columns} == { + column.name.root for column in table2.columns + } + + def get_lineage_details(self) -> List[Tuple[Table, Table]]: + all_cross_database_fqns = self.get_cross_database_fqn_from_service_names() + cross_database_table_fqn_mapping = {} + lineage_details = [] + + # Get all databases for the specified Trino service + trino_databases = self.metadata.list_all_entities( + entity=Database, params={"service": self.config.serviceName} + ) + for trino_database in trino_databases: + trino_database_fqn = trino_database.fullyQualifiedName.root + + # Get all schemas for the specified Trino database + trino_schemas = self.metadata.list_all_entities( + entity=DatabaseSchema, + params={"database": trino_database.fullyQualifiedName.root}, + ) + for trino_schema in trino_schemas: + if trino_schema.name.root == "information_schema": + continue + + # Get all tables for the specified Trino database schema + trino_tables = self.metadata.list_all_entities( + entity=Table, + params={"databaseSchema": trino_schema.fullyQualifiedName.root}, + ) + for trino_table in trino_tables: + trino_table_fqn = trino_table.fullyQualifiedName.root + + for cross_database_fqn in all_cross_database_fqns: + # Construct the FQN for cross-database tables + cross_database_table_fqn = trino_table_fqn.replace( + trino_database_fqn, cross_database_fqn + ) + # Cache cross-database table against its FQN to avoid repeated API calls + cross_database_table = cross_database_table_fqn_mapping[ + cross_database_table_fqn + ] = cross_database_table_fqn_mapping.get( + cross_database_table_fqn, + self.metadata.get_by_name( + Table, fqn=cross_database_table_fqn + ), + ) + if self.check_same_table(trino_table, cross_database_table): + lineage_details.append((trino_table, cross_database_table)) + break + + return lineage_details + + def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: + if not ( + self.source_config.processCrossDatabaseLineage + and self.source_config.crossDatabaseServiceNames + ): + return + + try: + lineage_details = self.get_lineage_details() + for target, source in lineage_details: + column_lineage = None + if source and source.columns and target and target.columns: + columns_list = [column.name.root for column in source.columns] + column_lineage = self._get_column_lineage( + from_table=source, to_table=target, columns_list=columns_list + ) + + yield self._get_add_lineage_request( + from_entity=source, to_entity=target, column_lineage=column_lineage + ) + except Exception as exc: + yield Either( + left=StackTraceError( + name=f"{self.config.serviceName} Cross Database Lineage", + error=( + "Error to yield cross database lineage details " + f"service name [{self.config.serviceName}]: {exc}" + ), + stackTrace=traceback.format_exc(), + ) + ) diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json index 490a49d1822b..13ba6e286df2 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json @@ -91,6 +91,20 @@ "default": 1, "title": "Number of Threads", "minimum": 1 + }, + "processCrossDatabaseLineage": { + "title": "Process Cross Database Lineage", + "description": "Set the 'Process Cross Database Lineage' toggle to control whether to process table lineage across different databases.", + "type": "boolean", + "default": false + }, + "crossDatabaseServiceNames": { + "title": "Cross Database Service Names", + "description": "Set 'Cross Database Service Names' to process lineage with the database.", + "type": "array", + "items": { + "type": "string" + } } }, "additionalProperties": false diff --git a/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json b/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json index 9fb1062ad894..e917b80fa415 100644 --- a/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json +++ b/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json @@ -52,7 +52,7 @@ "source": { "description": "Lineage type describes how a lineage was created.", "type": "string", - "enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage", "ExternalTableLineage"], + "enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage", "ExternalTableLineage", "CrossDatabaseLineage"], "default": "Manual" } } From 793a21053200dc3767421671d2be10b6e65d0546 Mon Sep 17 00:00:00 2001 From: Keshav Mohta Date: Thu, 28 Nov 2024 15:13:58 +0530 Subject: [PATCH 2/7] Fix #17888: added docstring in get_lineage_details method --- .../src/metadata/ingestion/source/database/trino/lineage.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 7c4abe3b2d11..098eec5178fc 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -64,6 +64,9 @@ def check_same_table(self, table1, table2): } def get_lineage_details(self) -> List[Tuple[Table, Table]]: + """ + Method to fetch database lineage details. + """ all_cross_database_fqns = self.get_cross_database_fqn_from_service_names() cross_database_table_fqn_mapping = {} lineage_details = [] From 73081f4faab20139fb6af7b29cd7a5435f455b13 Mon Sep 17 00:00:00 2001 From: Keshav Mohta Date: Thu, 28 Nov 2024 15:46:56 +0530 Subject: [PATCH 3/7] Fix #17888: added cross database lineage type in entityLineage.md and Lineage.Constants.ts --- .../metadata-standard/schemas/type/entityLineage.md | 2 +- .../src/main/resources/ui/src/constants/Lineage.constants.ts | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/main-concepts/metadata-standard/schemas/type/entityLineage.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/main-concepts/metadata-standard/schemas/type/entityLineage.md index d82c199c8977..4fbfb6a87264 100644 --- a/openmetadata-docs/content/v1.6.x-SNAPSHOT/main-concepts/metadata-standard/schemas/type/entityLineage.md +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/main-concepts/metadata-standard/schemas/type/entityLineage.md @@ -29,7 +29,7 @@ slug: /main-concepts/metadata-standard/schemas/type/entitylineage - **Items**: Refer to *#/definitions/columnLineage*. - **`pipeline`**: Pipeline where the sqlQuery is periodically run. Refer to *../type/entityReference.json*. - **`description`** *(string)*: description of lineage. - - **`source`** *(string)*: Lineage type describes how a lineage was created. Must be one of: `['Manual', 'ViewLineage', 'QueryLineage', 'PipelineLineage', 'DashboardLineage', 'DbtLineage']`. Default: `Manual`. + - **`source`** *(string)*: Lineage type describes how a lineage was created. Must be one of: `['Manual', 'ViewLineage', 'QueryLineage', 'PipelineLineage', 'DashboardLineage', 'DbtLineage', 'CrossDatabaseLineage']`. Default: `Manual`. - **`edge`** *(object)*: Edge in the lineage graph from one entity to another by entity IDs. Cannot contain additional properties. - **`fromEntity`**: From entity that is upstream of lineage edge. Refer to *basic.json#/definitions/uuid*. - **`toEntity`**: To entity that is downstream of lineage edge. Refer to *basic.json#/definitions/uuid*. diff --git a/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts b/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts index 3bf79b3fa7cc..3a35e21e5949 100644 --- a/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts +++ b/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts @@ -105,6 +105,7 @@ export const LINEAGE_SOURCE: { [key in Source]: string } = { [Source.SparkLineage]: 'Spark Lineage', [Source.ViewLineage]: 'View Lineage', [Source.OpenLineage]: 'OpenLineage', + [Source.CrossDatabaseLineage]: 'Cross Database Lineage', [Source.ExternalTableLineage]: 'External Table Lineage', }; From d23ba2688b2334b73ee96c5a9a9980c1160f1884 Mon Sep 17 00:00:00 2001 From: Keshav Mohta Date: Thu, 28 Nov 2024 17:20:30 +0530 Subject: [PATCH 4/7] Fix #17888: revert ometa_api changes & removed commented code --- ingestion/src/metadata/ingestion/ometa/ometa_api.py | 3 +-- .../ingestion/source/database/trino/lineage.py | 11 ++++------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index a365e588ac8f..1217d8950aff 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -398,9 +398,8 @@ def list_entities( url_after = f"&after={after}" if after else "" url_before = f"&before={before}" if before else "" url_fields = f"&fields={','.join(fields)}" if fields else "" - url_params = "".join([f"&{k}={v}," for k, v in params.items()])[:-1] resp = self.client.get( - path=f"{suffix}{url_limit}{url_after}{url_before}{url_fields}{url_params}", data=params + path=f"{suffix}{url_limit}{url_after}{url_before}{url_fields}", data=params ) if self._use_raw_data: diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 098eec5178fc..045a8265de15 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -43,7 +43,6 @@ class TrinoLineageSource(TrinoQueryParserSource, LineageSource): ) """ - # TODO: Need to fix params issue def get_cross_database_fqn_from_service_names(self): database_service_names = self.source_config.crossDatabaseServiceNames return [ @@ -54,12 +53,11 @@ def get_cross_database_fqn_from_service_names(self): ) ] - # TODO: Need to discuss column datatype issue (varchar-string, int-integer). Using column names for now def check_same_table(self, table1, table2): - # table1_column_mapping = {column.name.root: (column.dataType, column.arrayDataType) for column in table1.columns} - # table2_column_mapping = {column.name.root: (column.dataType, column.arrayDataType) for column in table2.columns} - # return table1_column_mapping==table2_column_mapping - return {column.name.root for column in table1.columns} == { + """ + Method to check whether the table1 and table2 are same + """ + return table1.name.root==table2.name.root and {column.name.root for column in table1.columns} == { column.name.root for column in table2.columns } @@ -131,7 +129,6 @@ def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]: column_lineage = self._get_column_lineage( from_table=source, to_table=target, columns_list=columns_list ) - yield self._get_add_lineage_request( from_entity=source, to_entity=target, column_lineage=column_lineage ) From db87907f40b6b8f05c9b7031f3e3ba3f1831386d Mon Sep 17 00:00:00 2001 From: Keshav Mohta Date: Thu, 28 Nov 2024 17:31:35 +0530 Subject: [PATCH 5/7] Fix #17888: add table name check in check_same_table function --- .../src/metadata/ingestion/source/database/trino/lineage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 045a8265de15..0cf105fbf122 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -57,9 +57,9 @@ def check_same_table(self, table1, table2): """ Method to check whether the table1 and table2 are same """ - return table1.name.root==table2.name.root and {column.name.root for column in table1.columns} == { - column.name.root for column in table2.columns - } + return table1.name.root == table2.name.root and { + column.name.root for column in table1.columns + } == {column.name.root for column in table2.columns} def get_lineage_details(self) -> List[Tuple[Table, Table]]: """ From 9f080d32cac249d3d93bc105ad25d075269519d1 Mon Sep 17 00:00:00 2001 From: Keshav Mohta Date: Fri, 29 Nov 2024 16:15:51 +0530 Subject: [PATCH 6/7] Fix #17888: check cross_database_table is not None before check_same_table --- .../src/metadata/ingestion/source/database/trino/lineage.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 0cf105fbf122..31855febf441 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -107,7 +107,9 @@ def get_lineage_details(self) -> List[Tuple[Table, Table]]: Table, fqn=cross_database_table_fqn ), ) - if self.check_same_table(trino_table, cross_database_table): + if cross_database_table and self.check_same_table( + trino_table, cross_database_table + ): lineage_details.append((trino_table, cross_database_table)) break From 537c3fc55d2f288fd48239c6fd78d69da34c0dae Mon Sep 17 00:00:00 2001 From: Keshav Mohta Date: Fri, 29 Nov 2024 19:25:34 +0530 Subject: [PATCH 7/7] Fix #17888: method typehint and removed comments --- .../src/metadata/ingestion/source/database/lineage_source.py | 3 --- .../src/metadata/ingestion/source/database/trino/lineage.py | 4 ++-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index d2922c7a418f..6b42cd377158 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -323,6 +323,3 @@ def _iter( ) if self.source_config.processCrossDatabaseLineage: yield from self.yield_cross_database_lineage() or [] - - # additional lineage - # hive service name - diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py index 31855febf441..c3cf4096ec81 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -43,7 +43,7 @@ class TrinoLineageSource(TrinoQueryParserSource, LineageSource): ) """ - def get_cross_database_fqn_from_service_names(self): + def get_cross_database_fqn_from_service_names(self) -> List[str]: database_service_names = self.source_config.crossDatabaseServiceNames return [ database.fullyQualifiedName.root @@ -53,7 +53,7 @@ def get_cross_database_fqn_from_service_names(self): ) ] - def check_same_table(self, table1, table2): + def check_same_table(self, table1: Table, table2: Table) -> bool: """ Method to check whether the table1 and table2 are same """