# Methods of ``LineageSource`` (lineage_source.py), shown dedented; indent them
# one level when placing them inside the class body.
def get_column_lineage(
    self, from_table: Table, to_table: Table
) -> List[ColumnLineage]:
    """
    Build column level lineage between two tables by matching column names.

    For every column of ``from_table`` the column FQN is resolved on both
    tables with ``get_column_fqn``. A ``ColumnLineage`` entry is added only
    when both lookups return a value.

    Args:
        from_table: upstream table whose columns drive the matching
        to_table: downstream table searched for columns with the same name

    Returns:
        Always a list. If an error interrupts the matching it is logged at
        debug level and the entries collected so far (possibly none) are
        returned, instead of implicitly returning None.
    """
    column_lineage: List[ColumnLineage] = []
    try:
        # A table entity may carry no columns; treat that as "nothing to match"
        for column in from_table.columns or []:
            field = column.name.root
            from_column = get_column_fqn(table_entity=from_table, column=field)
            to_column = get_column_fqn(table_entity=to_table, column=field)
            if from_column and to_column:
                column_lineage.append(
                    ColumnLineage(fromColumns=[from_column], toColumn=to_column)
                )
    except Exception as exc:
        logger.debug(f"Error to get column lineage: {exc}")
        logger.debug(traceback.format_exc())

    return column_lineage


def get_add_cross_database_lineage_request(
    self,
    from_entity: Table,
    to_entity: Table,
    column_lineage: Optional[List[ColumnLineage]] = None,
) -> Optional[Either[AddLineageRequest]]:
    """
    Wrap a table-to-table edge in an ``AddLineageRequest`` tagged as
    ``Source.CrossDatabaseLineage``.

    Args:
        from_entity: upstream table of the edge
        to_entity: downstream table of the edge
        column_lineage: optional column level lineage attached to the edge

    Returns:
        ``Either`` holding the request on its right side, or None when either
        table is missing.
    """
    if not (from_entity and to_entity):
        return None

    return Either(
        right=AddLineageRequest(
            edge=EntitiesEdge(
                fromEntity=EntityReference(
                    id=Uuid(from_entity.id.root), type="table"
                ),
                toEntity=EntityReference(
                    id=Uuid(to_entity.id.root), type="table"
                ),
                lineageDetails=LineageDetails(
                    source=Source.CrossDatabaseLineage,
                    columnsLineage=column_lineage,
                ),
            )
        )
    )


def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]:
    """
    By default cross database lineage is not supported.

    This base implementation only logs that fact and yields nothing (it
    returns None); sources that support the feature override it.
    """
    logger.info(
        f"Processing Cross Database Lineage not supported for {str(self.service_connection.type.value)}"
    )
# Methods of ``TrinoLineageSource`` (trino/lineage.py), shown dedented; indent
# them one level when placing them inside the class body.
def get_cross_database_fqn_from_service_names(self) -> List[str]:
    """
    Collect the fully qualified name of every database listed for each
    service named in ``source_config.crossDatabaseServiceNames``.
    """
    database_service_names = self.source_config.crossDatabaseServiceNames or []
    return [
        database.fullyQualifiedName.root
        for service in database_service_names
        for database in self.metadata.list_all_entities(
            entity=Database, params={"service": service}
        )
    ]


def check_same_table(self, table1: Table, table2: Table) -> bool:
    """
    Method to check whether the table1 and table2 are same

    Two tables are considered the same when their names match and they
    expose the same set of column names (order is ignored). A table without
    columns is treated as having an empty column set.
    """
    if table1.name.root != table2.name.root:
        return False
    columns1 = {column.name.root for column in table1.columns or []}
    columns2 = {column.name.root for column in table2.columns or []}
    return columns1 == columns2


def get_cross_database_lineage(
    self, from_table: Table, to_table: Table
) -> Either[AddLineageRequest]:
    """
    Method to return cross database lineage request object

    Column lineage is only computed when both tables have columns;
    otherwise the request is built with no column lineage.
    """
    column_lineage = None
    if from_table and from_table.columns and to_table and to_table.columns:
        column_lineage = self.get_column_lineage(
            from_table=from_table, to_table=to_table
        )
    return self.get_add_cross_database_lineage_request(
        from_entity=from_table, to_entity=to_table, column_lineage=column_lineage
    )


def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]:
    """
    Yield lineage from tables of the configured cross database services to
    the Trino tables that mirror them.

    For every table of every database of this Trino service, the table FQN
    is re-rooted onto each cross database FQN. When a table exists under
    that FQN and ``check_same_table`` accepts it, one lineage request
    (cross database table -> Trino table) is yielded and the remaining
    cross databases are skipped for that table.

    Any exception stops the iteration and is reported as a single
    ``StackTraceError`` on the left side of an ``Either``.
    """
    try:
        all_cross_database_fqns = self.get_cross_database_fqn_from_service_names()
        cross_database_table_fqn_mapping = {}

        # Get all databases for the specified Trino service
        trino_databases = self.metadata.list_all_entities(
            entity=Database, params={"service": self.config.serviceName}
        )
        for trino_database in trino_databases:
            trino_database_fqn = trino_database.fullyQualifiedName.root

            # Get all tables for the specified Trino database schema
            trino_tables = self.metadata.list_all_entities(
                entity=Table, params={"database": trino_database_fqn}
            )
            # NOTE: Currently, tables in system-defined schemas will also be checked for lineage.
            for trino_table in trino_tables:
                trino_table_fqn = trino_table.fullyQualifiedName.root
                for cross_database_fqn in all_cross_database_fqns:
                    # Construct the FQN for cross-database tables. Only the
                    # first occurrence is swapped so that a schema or table
                    # name containing the database FQN is left untouched.
                    cross_database_table_fqn = trino_table_fqn.replace(
                        trino_database_fqn, cross_database_fqn, 1
                    )
                    # Cache cross-database table against its FQN to avoid
                    # repeated API calls. The lookup must be guarded:
                    # passing the call as a ``dict.get`` default would
                    # execute it on every iteration, cached or not.
                    if cross_database_table_fqn not in cross_database_table_fqn_mapping:
                        cross_database_table_fqn_mapping[
                            cross_database_table_fqn
                        ] = self.metadata.get_by_name(
                            Table, fqn=cross_database_table_fqn
                        )
                    cross_database_table = cross_database_table_fqn_mapping[
                        cross_database_table_fqn
                    ]
                    # Create cross database lineage request if both tables are same
                    if cross_database_table and self.check_same_table(
                        trino_table, cross_database_table
                    ):
                        yield self.get_cross_database_lineage(
                            cross_database_table, trino_table
                        )
                        break
    except Exception as exc:
        yield Either(
            left=StackTraceError(
                name=f"{self.config.serviceName} Cross Database Lineage",
                error=(
                    "Error to yield cross database lineage details "
                    f"service name [{self.config.serviceName}]: {exc}"
                ),
                stackTrace=traceback.format_exc(),
            )
        )
- - **`source`** *(string)*: Lineage type describes how a lineage was created. Must be one of: `['Manual', 'ViewLineage', 'QueryLineage', 'PipelineLineage', 'DashboardLineage', 'DbtLineage']`. Default: `Manual`. + - **`source`** *(string)*: Lineage type describes how a lineage was created. Must be one of: `['Manual', 'ViewLineage', 'QueryLineage', 'PipelineLineage', 'DashboardLineage', 'DbtLineage', 'SparkLineage', 'OpenLineage', 'ExternalTableLineage', 'CrossDatabaseLineage']`. Default: `Manual`. - **`edge`** *(object)*: Edge in the lineage graph from one entity to another by entity IDs. Cannot contain additional properties. - **`fromEntity`**: From entity that is upstream of lineage edge. Refer to *basic.json#/definitions/uuid*. - **`toEntity`**: To entity that is downstream of lineage edge. Refer to *basic.json#/definitions/uuid*. diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json index 490a49d1822b..13ba6e286df2 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json @@ -91,6 +91,20 @@ "default": 1, "title": "Number of Threads", "minimum": 1 + }, + "processCrossDatabaseLineage": { + "title": "Process Cross Database Lineage", + "description": "Set the 'Process Cross Database Lineage' toggle to control whether to process table lineage across different databases.", + "type": "boolean", + "default": false + }, + "crossDatabaseServiceNames": { + "title": "Cross Database Service Names", + "description": "List of database service names whose tables are compared with the tables of this service to build cross database lineage.", + "type": "array", + "items": { + "type": "string" + } } }, "additionalProperties": false diff --git a/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json 
b/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json index 9fb1062ad894..e917b80fa415 100644 --- a/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json +++ b/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json @@ -52,7 +52,7 @@ "source": { "description": "Lineage type describes how a lineage was created.", "type": "string", - "enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage", "ExternalTableLineage"], + "enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage", "ExternalTableLineage", "CrossDatabaseLineage"], "default": "Manual" } } diff --git a/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts b/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts index 3bf79b3fa7cc..3a35e21e5949 100644 --- a/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts +++ b/openmetadata-ui/src/main/resources/ui/src/constants/Lineage.constants.ts @@ -105,6 +105,7 @@ export const LINEAGE_SOURCE: { [key in Source]: string } = { [Source.SparkLineage]: 'Spark Lineage', [Source.ViewLineage]: 'View Lineage', [Source.OpenLineage]: 'OpenLineage', + [Source.CrossDatabaseLineage]: 'Cross Database Lineage', [Source.ExternalTableLineage]: 'External Table Lineage', }; diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/addLineage.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/addLineage.ts index a846cb53ee73..4c4da2ea5750 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/addLineage.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/addLineage.ts @@ -12,7 +12,7 @@ */ - /** +/** * Add lineage details between two entities */ export interface AddLineage { @@ -150,6 +150,7 @@ export interface ColumnLineage { * Lineage type describes 
how a lineage was created. */ export enum Source { + CrossDatabaseLineage = "CrossDatabaseLineage", DashboardLineage = "DashboardLineage", DbtLineage = "DbtLineage", ExternalTableLineage = "ExternalTableLineage", diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/type/entityLineage.ts b/openmetadata-ui/src/main/resources/ui/src/generated/type/entityLineage.ts index 2977b3c059ec..b81b9118e645 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/type/entityLineage.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/type/entityLineage.ts @@ -19,8 +19,8 @@ export interface EntityLineage { /** * Primary entity for which this lineage graph is created. */ - entity: EntityReference; - nodes?: EntityReference[]; + entity: EntityReference; + nodes?: EntityReference[]; upstreamEdges?: Edge[]; } @@ -148,6 +148,7 @@ export interface EntityReference { * Lineage type describes how a lineage was created. */ export enum Source { + CrossDatabaseLineage = "CrossDatabaseLineage", DashboardLineage = "DashboardLineage", DbtLineage = "DbtLineage", ExternalTableLineage = "ExternalTableLineage",