Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #17888 - Implemented Cross Database Lineage #18831

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,27 @@
from abc import ABC
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from typing import Callable, Iterable, Iterator, Union
from typing import Callable, Iterable, Iterator, List, Optional, Union

from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.basic import (
FullyQualifiedEntityName,
SqlQuery,
Uuid,
)
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
Source,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.lineage.sql_lineage import get_column_fqn, get_lineage_by_query
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.ingestion.source.models import TableView
Expand Down Expand Up @@ -239,6 +251,58 @@ def yield_procedure_lineage(
f"Processing Procedure Lineage not supported for {str(self.service_connection.type.value)}"
)

def _get_column_lineage(
self, from_table: Table, to_table: Table, columns_list: List[str]
) -> List[ColumnLineage]:
"""
Get the column lineage from the fields
"""
try:
column_lineage = []
for field in columns_list or []:
from_column = get_column_fqn(table_entity=from_table, column=field)
to_column = get_column_fqn(table_entity=to_table, column=field)
if from_column and to_column:
column_lineage.append(
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
)
return column_lineage
except Exception as exc:
logger.debug(f"Error to get column lineage: {exc}")
logger.debug(traceback.format_exc())

@staticmethod
def _get_add_lineage_request(
    from_entity: Table, to_entity: Table, column_lineage: List[ColumnLineage] = None
) -> Optional[Either[AddLineageRequest]]:
    """
    Wrap a table-to-table edge into an ``AddLineageRequest``.

    Args:
        from_entity: table placed on the ``fromEntity`` side of the edge
        to_entity: table placed on the ``toEntity`` side of the edge
        column_lineage: optional column-level lineage stored in the edge's
            lineage details

    Returns:
        ``Either`` carrying the request on its ``right`` side, tagged with
        ``Source.CrossDatabaseLineage``; ``None`` when either entity is
        missing (falsy).
    """
    if not (from_entity and to_entity):
        return None

    upstream_ref = EntityReference(id=Uuid(from_entity.id.root), type="table")
    downstream_ref = EntityReference(id=Uuid(to_entity.id.root), type="table")
    details = LineageDetails(
        source=Source.CrossDatabaseLineage,
        columnsLineage=column_lineage,
    )
    edge = EntitiesEdge(
        fromEntity=upstream_ref,
        toEntity=downstream_ref,
        lineageDetails=details,
    )
    return Either(right=AddLineageRequest(edge=edge))

def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]:
    """
    Default hook for cross database lineage.

    The base implementation only logs that the current connection type does
    not support it; it produces no lineage requests and returns ``None``.
    """
    connection_type = str(self.service_connection.type.value)
    logger.info(
        f"Processing Cross Database Lineage not supported for {connection_type}"
    )

def _iter(
self, *_, **__
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
Expand All @@ -257,3 +321,5 @@ def _iter(
logger.warning(
f"Lineage extraction is not supported for {str(self.service_connection.type.value)} connection"
)
if self.source_config.processCrossDatabaseLineage:
yield from self.yield_cross_database_lineage() or []
113 changes: 113 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/trino/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,17 @@
"""
Trino lineage module
"""
import traceback
from typing import Iterable, List, Tuple

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.trino.queries import TRINO_SQL_STATEMENT
from metadata.ingestion.source.database.trino.query_parser import TrinoQueryParserSource
Expand All @@ -32,3 +42,106 @@ class TrinoLineageSource(TrinoQueryParserSource, LineageSource):
OR lower("query") LIKE '%%merge%%'
)
"""

def get_cross_database_fqn_from_service_names(self) -> List[str]:
    """
    Collect the fully qualified names of every database that belongs to the
    services listed in ``source_config.crossDatabaseServiceNames``.

    Returns:
        One FQN string per database returned by
        ``self.metadata.list_all_entities`` for each configured service, in
        service order. An unset (``None``) or empty service list yields an
        empty list without calling the metadata API.
    """
    # The option may be left unset in the pipeline config; treat that as
    # "no services" instead of failing on iteration over None.
    database_service_names = self.source_config.crossDatabaseServiceNames or []
    return [
        database.fullyQualifiedName.root
        for service in database_service_names
        for database in self.metadata.list_all_entities(
            entity=Database, params={"service": service}
        )
    ]

def check_same_table(self, table1: Table, table2: Table) -> bool:
    """
    Tell whether two table entities describe the same table.

    Tables are considered the same when their names are equal and they
    expose exactly the same set of column names (order is irrelevant).
    Both comparisons are exact, case-sensitive string matches.
    """
    if table1.name.root != table2.name.root:
        return False

    first_columns = {column.name.root for column in table1.columns}
    second_columns = {column.name.root for column in table2.columns}
    return first_columns == second_columns

def get_lineage_details(self) -> List[Tuple[Table, Table]]:
    """
    Method to fetch database lineage details.

    Walks every database, schema (except ``information_schema``) and table
    of the configured Trino service. For each Trino table it derives a
    candidate FQN in each cross database by swapping the Trino database FQN
    prefix for the cross-database FQN, then looks that table up.

    Returns:
        A list of ``(trino_table, cross_database_table)`` pairs for which
        ``check_same_table`` holds. At most one pair is produced per Trino
        table: the first matching cross database wins.
    """
    all_cross_database_fqns = self.get_cross_database_fqn_from_service_names()
    if not all_cross_database_fqns:
        # Nothing to match against: skip walking the whole Trino catalog.
        return []

    # Candidate FQN -> Table entity (or None when the lookup found nothing).
    cross_database_table_fqn_mapping = {}
    lineage_details = []

    # Get all databases for the specified Trino service
    trino_databases = self.metadata.list_all_entities(
        entity=Database, params={"service": self.config.serviceName}
    )
    for trino_database in trino_databases:
        trino_database_fqn = trino_database.fullyQualifiedName.root

        # Get all schemas for the specified Trino database
        trino_schemas = self.metadata.list_all_entities(
            entity=DatabaseSchema,
            params={"database": trino_database_fqn},
        )
        for trino_schema in trino_schemas:
            if trino_schema.name.root == "information_schema":
                continue

            # Get all tables for the specified Trino database schema
            trino_tables = self.metadata.list_all_entities(
                entity=Table,
                params={"databaseSchema": trino_schema.fullyQualifiedName.root},
            )
            for trino_table in trino_tables:
                trino_table_fqn = trino_table.fullyQualifiedName.root

                for cross_database_fqn in all_cross_database_fqns:
                    # Swap only the first occurrence of the database FQN so a
                    # schema or table whose name contains the same text is
                    # left untouched.
                    cross_database_table_fqn = trino_table_fqn.replace(
                        trino_database_fqn, cross_database_fqn, 1
                    )
                    # Look the table up only on a cache miss. A
                    # dict.get(key, <call>) default is evaluated on every
                    # call, which would defeat the cache.
                    if cross_database_table_fqn not in cross_database_table_fqn_mapping:
                        cross_database_table_fqn_mapping[
                            cross_database_table_fqn
                        ] = self.metadata.get_by_name(
                            Table, fqn=cross_database_table_fqn
                        )
                    cross_database_table = cross_database_table_fqn_mapping[
                        cross_database_table_fqn
                    ]
                    if cross_database_table and self.check_same_table(
                        trino_table, cross_database_table
                    ):
                        lineage_details.append((trino_table, cross_database_table))
                        break

    return lineage_details

def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]:
    """
    Yield lineage requests between Trino tables and their matching tables
    in the configured cross-database services.

    Nothing is produced unless both ``processCrossDatabaseLineage`` and
    ``crossDatabaseServiceNames`` are set in the source config. For each
    pair returned by ``get_lineage_details`` the cross-database table is the
    origin and the Trino table the destination; column lineage is added
    when both tables have columns. Any exception is reported as a single
    ``StackTraceError`` on the ``left`` side and ends the iteration.
    """
    settings = self.source_config
    if not settings.processCrossDatabaseLineage:
        return
    if not settings.crossDatabaseServiceNames:
        return

    try:
        for target, source in self.get_lineage_details():
            both_have_columns = (
                source and source.columns and target and target.columns
            )
            if both_have_columns:
                column_lineage = self._get_column_lineage(
                    from_table=source,
                    to_table=target,
                    columns_list=[column.name.root for column in source.columns],
                )
            else:
                column_lineage = None
            yield self._get_add_lineage_request(
                from_entity=source, to_entity=target, column_lineage=column_lineage
            )
    except Exception as exc:
        failure = StackTraceError(
            name=f"{self.config.serviceName} Cross Database Lineage",
            error=(
                "Error to yield cross database lineage details "
                f"service name [{self.config.serviceName}]: {exc}"
            ),
            stackTrace=traceback.format_exc(),
        )
        yield Either(left=failure)
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ slug: /main-concepts/metadata-standard/schemas/type/entitylineage
- **Items**: Refer to *#/definitions/columnLineage*.
- **`pipeline`**: Pipeline where the sqlQuery is periodically run. Refer to *../type/entityReference.json*.
- **`description`** *(string)*: description of lineage.
- **`source`** *(string)*: Lineage type describes how a lineage was created. Must be one of: `['Manual', 'ViewLineage', 'QueryLineage', 'PipelineLineage', 'DashboardLineage', 'DbtLineage']`. Default: `Manual`.
- **`source`** *(string)*: Lineage type describes how a lineage was created. Must be one of: `['Manual', 'ViewLineage', 'QueryLineage', 'PipelineLineage', 'DashboardLineage', 'DbtLineage', 'CrossDatabaseLineage']`. Default: `Manual`.
- **`edge`** *(object)*: Edge in the lineage graph from one entity to another by entity IDs. Cannot contain additional properties.
- **`fromEntity`**: From entity that is upstream of lineage edge. Refer to *basic.json#/definitions/uuid*.
- **`toEntity`**: To entity that is downstream of lineage edge. Refer to *basic.json#/definitions/uuid*.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@
"default": 1,
"title": "Number of Threads",
"minimum": 1
},
"processCrossDatabaseLineage": {
"title": "Process Cross Database Lineage",
"description": "Set the 'Process Cross Database Lineage' toggle to control whether to process table lineage across different databases.",
"type": "boolean",
"default": false
},
"crossDatabaseServiceNames": {
"title": "Cross Database Service Names",
"description": "Set 'Cross Database Service Names' to process lineage with the database.",
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"source": {
"description": "Lineage type describes how a lineage was created.",
"type": "string",
"enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage", "ExternalTableLineage"],
"enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage", "ExternalTableLineage", "CrossDatabaseLineage"],
"default": "Manual"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ export const LINEAGE_SOURCE: { [key in Source]: string } = {
[Source.SparkLineage]: 'Spark Lineage',
[Source.ViewLineage]: 'View Lineage',
[Source.OpenLineage]: 'OpenLineage',
[Source.CrossDatabaseLineage]: 'Cross Database Lineage',
[Source.ExternalTableLineage]: 'External Table Lineage',
};

Expand Down
Loading