Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/tableau): support column level lineage for custom sql #8466

Merged
merged 21 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
5b17ea4
wip
siddiquebagwan-gslab Jul 18, 2023
c0966f4
Merge branch 'master' into master+tableau-cll
siddiquebagwan-gslab Jul 19, 2023
1a06fb0
lineage working with new parser
siddiquebagwan-gslab Jul 19, 2023
965366d
CLL
siddiquebagwan-gslab Jul 19, 2023
b70ebb4
CLL
siddiquebagwan-gslab Jul 20, 2023
9720834
Merge branch 'master' into master+tableau-cll
siddiquebagwan-gslab Jul 20, 2023
dce60e7
lint fix
siddiquebagwan-gslab Jul 20, 2023
be020a6
Merge branch 'master' into master+tableau-cll
siddiquebagwan-gslab Jul 24, 2023
f785f25
test case
siddiquebagwan-gslab Jul 24, 2023
0c9a275
Merge branch 'master' into master+tableau-cll
siddiquebagwan-gslab Jul 25, 2023
c6af421
test case
siddiquebagwan-gslab Jul 25, 2023
8942921
lint fix
siddiquebagwan-gslab Jul 25, 2023
b774909
Merge branch 'master' into master+tableau-cll
siddiquebagwan Jul 25, 2023
3c53f5f
Merge branch 'master' into master+tableau-cll
siddiquebagwan-gslab Jul 26, 2023
fd561aa
review comments
siddiquebagwan-gslab Jul 26, 2023
bb66232
Merge branch 'master+tableau-cll' of github.com:mohdsiddique/datahub …
siddiquebagwan-gslab Jul 26, 2023
e6dddfc
Merge branch 'master' into master+tableau-cll
siddiquebagwan Jul 26, 2023
aca19cd
resolve merge conflict
siddiquebagwan-gslab Jul 31, 2023
81b98f4
Merge branch 'master+tableau-cll' of github.com:acryldata/datahub-for…
siddiquebagwan-gslab Jul 31, 2023
4855380
Merge branch 'master+tableau-cll' of github.com:mohdsiddique/datahub …
siddiquebagwan-gslab Jul 31, 2023
96c91c1
Merge branch 'master' into master+tableau-cll
hsheth2 Jul 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 216 additions & 54 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
Union,
cast,
)

import dateutil.parser as dp
import tableauserverclient as TSC
from pydantic import root_validator, validator
from pydantic.fields import Field
from requests.adapters import ConnectionError
from sqllineage.runner import LineageRunner
from tableauserverclient import (
PersonalAccessTokenAuth,
Server,
Expand Down Expand Up @@ -71,8 +81,11 @@
dashboard_graphql_query,
database_tables_graphql_query,
embedded_datasource_graphql_query,
get_overridden_info,
get_unique_custom_sql,
make_fine_grained_lineage_class,
make_table_urn,
make_upstream_class,
published_datasource_graphql_query,
query_metadata,
sheet_graphql_query,
Expand Down Expand Up @@ -120,10 +133,15 @@
OwnershipClass,
OwnershipTypeClass,
SubTypesClass,
UpstreamClass,
ViewPropertiesClass,
)
from datahub.utilities import config_clean
from datahub.utilities.sqlglot_lineage import (
ColumnLineageInfo,
SchemaResolver,
SqlParsingResult,
sqlglot_lineage,
)

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -871,14 +889,14 @@ def _create_upstream_table_lineage(
f"A total of {len(upstream_tables)} upstream table edges found for datasource {datasource[tableau_constant.ID]}"
)

if datasource.get(tableau_constant.FIELDS):
datasource_urn = builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datasource[tableau_constant.ID],
platform_instance=self.config.platform_instance,
env=self.config.env,
)
datasource_urn = builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datasource[tableau_constant.ID],
platform_instance=self.config.platform_instance,
env=self.config.env,
)

if datasource.get(tableau_constant.FIELDS):
if self.config.extract_column_level_lineage:
# Find fine grained lineage for datasource column to datasource column edge,
# upstream columns may be from same datasource
Expand Down Expand Up @@ -1140,6 +1158,57 @@ def get_upstream_fields_of_field_in_datasource(self, datasource, datasource_urn)
)
return fine_grained_lineages

def get_upstream_fields_from_custom_sql(
self, datasource: dict, datasource_urn: str
) -> List[FineGrainedLineage]:
fine_grained_lineages: List[FineGrainedLineage] = []

parsed_result = self.parse_custom_sql(
datasource=datasource,
datasource_urn=datasource_urn,
env=self.config.env,
platform=self.platform,
platform_instance=self.config.platform_instance,
func_overridden_info=None, # Here we don't want to override any information from configuration
)

if parsed_result is None:
logger.info(
f"Failed to extract column level lineage from datasource {datasource_urn}"
)
return fine_grained_lineages

cll: List[ColumnLineageInfo] = (
parsed_result.column_lineage
if parsed_result.column_lineage is not None
else []
)
for cll_info in cll:
downstream = (
[
builder.make_schema_field_urn(
datasource_urn, cll_info.downstream.column
)
]
if cll_info.downstream is not None
and cll_info.downstream.column is not None
else []
)
upstreams = [
builder.make_schema_field_urn(column_ref.table, column_ref.column)
for column_ref in cll_info.upstreams
]
fine_grained_lineages.append(
FineGrainedLineage(
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=downstream,
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=upstreams,
)
)

return fine_grained_lineages

def get_transform_operation(self, field):
field_type = field[tableau_constant.TYPE_NAME]
if field_type in (
Expand Down Expand Up @@ -1176,6 +1245,7 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
platform_instance=self.config.platform_instance,
env=self.config.env,
)

dataset_snapshot = DatasetSnapshot(
urn=csql_urn,
aspects=[self.get_data_platform_instance()],
Expand Down Expand Up @@ -1223,14 +1293,20 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
csql_urn, tables, datasource
)
elif self.config.extract_lineage_from_unsupported_custom_sql_queries:
logger.debug("Extracting TLL & CLL from custom sql")
# custom sql tables may contain unsupported sql, causing incomplete lineage
# we extract the lineage from the raw queries
yield from self._create_lineage_from_unsupported_csql(
csql_urn, csql
)

# Schema Metadata
columns = csql.get(tableau_constant.COLUMNS, [])
# if condition is needed as graphQL return "cloumns": None
columns: List[Dict[Any, Any]] = (
cast(List[Dict[Any, Any]], csql.get(tableau_constant.COLUMNS))
if tableau_constant.COLUMNS in csql
and csql.get(tableau_constant.COLUMNS) is not None
else []
)
schema_metadata = self.get_schema_metadata_for_custom_sql(columns)
if schema_metadata is not None:
dataset_snapshot.aspects.append(schema_metadata)
Expand Down Expand Up @@ -1447,52 +1523,138 @@ def _create_lineage_to_upstream_tables(
aspect=upstream_lineage,
)

def _create_lineage_from_unsupported_csql(
self, csql_urn: str, csql: dict
) -> Iterable[MetadataWorkUnit]:
database = csql.get(tableau_constant.DATABASE) or {}
def parse_custom_sql(
self,
datasource: dict,
datasource_urn: str,
platform: str,
env: str,
platform_instance: Optional[str],
func_overridden_info: Optional[
Callable[
[
str,
Optional[str],
Optional[Dict[str, str]],
Optional[TableauLineageOverrides],
],
Tuple[Optional[str], Optional[str], str, str],
]
],
) -> Optional["SqlParsingResult"]:

database_info = datasource.get(tableau_constant.DATABASE) or {}

if datasource.get(tableau_constant.IS_UNSUPPORTED_CUSTOM_SQL) in (None, False):
logger.debug(f"datasource {datasource_urn} is not created from custom sql")
return None

if (
csql.get(tableau_constant.IS_UNSUPPORTED_CUSTOM_SQL, False)
and tableau_constant.NAME in database
and tableau_constant.CONNECTION_TYPE in database
tableau_constant.NAME not in database_info
or tableau_constant.CONNECTION_TYPE not in database_info
):
upstream_tables = []
query = csql.get(tableau_constant.QUERY)
parser = LineageRunner(query)

try:
for table in parser.source_tables:
split_table = str(table).split(".")
if len(split_table) == 2:
datset = make_table_urn(
env=self.config.env,
upstream_db=database.get(tableau_constant.NAME),
connection_type=database.get(
tableau_constant.CONNECTION_TYPE, ""
),
schema=split_table[0],
full_name=split_table[1],
platform_instance_map=self.config.platform_instance_map,
lineage_overrides=self.config.lineage_overrides,
)
upstream_tables.append(
UpstreamClass(
type=DatasetLineageType.TRANSFORMED, dataset=datset
)
)
except Exception as e:
self.report.report_warning(
key="csql-lineage",
reason=f"Unable to retrieve lineage from query. "
f"Query: {query} "
f"Reason: {str(e)} ",
logger.debug(
f"database information is missing from datasource {datasource_urn}"
)
return None

query = datasource.get(tableau_constant.QUERY)
if query is None:
logger.debug(
f"raw sql query is not available for datasource {datasource_urn}"
)
return None

logger.debug(f"Parsing sql={query}")

upstream_db = database_info.get(tableau_constant.NAME)

if func_overridden_info is not None:
# Override the information as per configuration
upstream_db, platform_instance, platform, _ = func_overridden_info(
database_info[tableau_constant.CONNECTION_TYPE],
database_info.get(tableau_constant.NAME),
self.config.platform_instance_map,
self.config.lineage_overrides,
)

parsed_result: Optional["SqlParsingResult"] = None
try:
schema_resolver = (
self.ctx.graph._make_schema_resolver(
platform=platform,
platform_instance=platform_instance,
env=env,
)
upstream_lineage = UpstreamLineage(upstreams=upstream_tables)
yield self.get_metadata_change_proposal(
csql_urn,
aspect_name=tableau_constant.UPSTREAM_LINEAGE,
aspect=upstream_lineage,
if self.ctx.graph is not None
else SchemaResolver(
platform=platform,
platform_instance=platform_instance,
env=env,
graph=None,
)
)

if schema_resolver.graph is None:
logger.warning(
"Column Level Lineage extraction would not work as DataHub graph client is None."
)

parsed_result = sqlglot_lineage(
query,
schema_resolver=schema_resolver,
default_db=upstream_db,
)
except Exception as e:
self.report.report_warning(
key="csql-lineage",
reason=f"Unable to retrieve lineage from query. "
f"Query: {query} "
f"Reason: {str(e)} ",
)

return parsed_result

def _create_lineage_from_unsupported_csql(
self, csql_urn: str, csql: dict
) -> Iterable[MetadataWorkUnit]:

parsed_result = self.parse_custom_sql(
datasource=csql,
datasource_urn=csql_urn,
env=self.config.env,
platform=self.platform,
platform_instance=self.config.platform_instance,
func_overridden_info=get_overridden_info,
)

if parsed_result is None:
logger.info(
f"Failed to extract table level lineage for datasource {csql_urn}"
)
return

upstream_tables = make_upstream_class(parsed_result)

logger.debug(f"Upstream tables = {upstream_tables}")

fine_grained_lineages: List[FineGrainedLineage] = []
if self.config.extract_column_level_lineage:
logger.info("Extracting CLL from custom sql")
fine_grained_lineages = make_fine_grained_lineage_class(
parsed_result, csql_urn
)

upstream_lineage = UpstreamLineage(
upstreams=upstream_tables,
fineGrainedLineages=fine_grained_lineages,
)

yield self.get_metadata_change_proposal(
csql_urn,
aspect_name=tableau_constant.UPSTREAM_LINEAGE,
aspect=upstream_lineage,
)

def _get_schema_metadata_for_datasource(
self, datasource_fields: List[dict]
Expand Down
Loading
Loading