Removing unneeded view lineage parsing as it is now part of sql common
Fixing empty lineage
treff7es committed Nov 20, 2023
1 parent 3833ca9 commit 4fb4d2a
Showing 2 changed files with 6 additions and 61 deletions.
metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py (3 additions & 0 deletions)
@@ -195,6 +195,9 @@ def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
                 upstreams.append(edge.gen_upstream_aspect())
                 fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects())

+            if not upstreams:
+                continue
+
             upstream_lineage = UpstreamLineageClass(
                 upstreams=sorted(upstreams, key=lambda x: x.dataset),
                 fineGrainedLineages=sorted(
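The added guard is the whole "empty lineage" fix: a downstream dataset whose edges were all filtered out no longer emits an UpstreamLineageClass aspect with an empty upstream list. A minimal, self-contained sketch of the pattern follows; the types here are hypothetical stand-ins for the real DataHub classes, not the builder itself.

# Sketch of the guard with stand-in types (not the real DataHub classes).
from dataclasses import dataclass
from typing import Dict, Iterator, List, Tuple


@dataclass
class UpstreamLineage:  # stand-in for UpstreamLineageClass
    upstreams: List[str]


def gen_lineage_aspects(
    lineage_map: Dict[str, List[str]]
) -> Iterator[Tuple[str, UpstreamLineage]]:
    for downstream_urn, edges in lineage_map.items():
        upstreams = [e for e in edges if e]
        if not upstreams:
            # The fix: skip downstreams whose edges were all filtered out,
            # instead of emitting an aspect with an empty upstream list
            # (which could overwrite lineage already stored for that urn).
            continue
        yield downstream_urn, UpstreamLineage(upstreams=sorted(upstreams))


# "urn:a" is skipped entirely; "urn:b" still gets its aspect.
aspects = dict(gen_lineage_aspects({"urn:a": [], "urn:b": ["urn:c"]}))
assert "urn:a" not in aspects
assert aspects["urn:b"].upstreams == ["urn:c"]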
metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py (3 additions & 61 deletions)
@@ -53,14 +53,12 @@
 from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
 from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
 from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
-from datahub.metadata._schema_classes import SchemaMetadataClass, ViewPropertiesClass
+from datahub.metadata._schema_classes import SchemaMetadataClass
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
     BytesTypeClass,
     TimeTypeClass,
 )
-from datahub.utilities.file_backed_collections import FileBackedDict
 from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage
-from datahub.utilities.urns.dataset_urn import DatasetUrn

 logger: logging.Logger = logging.getLogger(__name__)

@@ -568,8 +566,6 @@ class TeradataSource(TwoTierSQLAlchemySource):

     _tables_cache: MutableMapping[str, List[TeradataTable]] = defaultdict(list)

-    _view_definition_cache: MutableMapping[str, str]
-
     def __init__(self, config: TeradataConfig, ctx: PipelineContext):
         super().__init__(config, ctx, "teradata")

@@ -587,11 +583,6 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):

         self.schema_resolver = self._init_schema_resolver()

-        if self.config.use_file_backed_cache:
-            self._view_definition_cache = FileBackedDict[str]()
-        else:
-            self._view_definition_cache = {}
-
         if self.config.include_tables or self.config.include_views:
             self.cache_tables_and_views()
             logger.info(f"Found {len(self._tables_cache)} tables and views")
@@ -785,24 +776,6 @@ def cached_loop_views(  # noqa: C901
            )
         yield from super().loop_views(inspector, schema, sql_config)

-    def get_view_lineage(self, urns: Set[str]) -> Iterable[MetadataWorkUnit]:
-        for key in self._view_definition_cache.keys():
-            view_definition = self._view_definition_cache[key]
-            dataset_urn = DatasetUrn.create_from_string(key)
-
-            db_name: Optional[str] = None
-            # We need to get the default db from the dataset urn otherwise the builder generates the wrong urns
-            if "." in dataset_urn.get_dataset_name():
-                db_name = dataset_urn.get_dataset_name().split(".", 1)[0]
-
-            self.report.num_view_ddl_parsed += 1
-            if self.report.num_view_ddl_parsed % 1000 == 0:
-                logger.info(f"Parsed {self.report.num_view_ddl_parsed} view ddl")
-
-            yield from self.gen_lineage_from_query(
-                query=view_definition, default_database=db_name, view_urn=key, urns=urns
-            )
-
     def cache_tables_and_views(self) -> None:
         engine = self.get_metadata_engine()
         for entry in engine.execute(self.TABLES_AND_VIEWS_QUERY):
@@ -829,6 +802,7 @@ def get_audit_log_mcps(self, urns: Set[str]) -> Iterable[MetadataWorkUnit]:
             self.report.num_queries_parsed += 1
             if self.report.num_queries_parsed % 1000 == 0:
                 logger.info(f"Parsed {self.report.num_queries_parsed} queries")
+
             yield from self.gen_lineage_from_query(
                 query=entry.query_text,
                 default_database=entry.default_database,
@@ -893,47 +867,15 @@ def get_metadata_engine(self) -> Engine:

     def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
         # Add all schemas to the schema resolver
+        # Sql parser operates on lowercase urns so we need to lowercase the urns
         for wu in auto_lowercase_urns(super().get_workunits_internal()):
             urn = wu.get_urn()
             schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass)
             if schema_metadata:
                 self.schema_resolver.add_schema_metadata(urn, schema_metadata)
-            view_properties = wu.get_aspect_of_type(ViewPropertiesClass)
-            if view_properties and self.config.include_view_lineage:
-                self._view_definition_cache[urn] = view_properties.viewLogic
             yield wu

-        # Sql parser operates on lowercase urns so we need to lowercase the urns
         urns = self.schema_resolver.get_urns()
-        if self.config.include_view_lineage:
-            if not self.config.include_views and self.graph is not None:
-                d = {}
-
-                entries = self.graph.bulk_fetch_view_definitions(
-                    platform=self.platform,
-                    platform_instance=self.config.platform_instance,
-                    env=self.config.env,
-                )
-
-                for urn, view_definition in entries:
-                    self._view_definition_cache[urn] = view_definition
-                    d[urn] = view_definition
-
-                logger.info(
-                    f"Loaded {len(self._view_definition_cache)} view definitions."
-                )
-                logger.info(
-                    f"Sample view urns: {list(self._view_definition_cache.keys())[:10]}"
-                )
-            elif not self.config.include_views:
-                self.report.report_failure(
-                    "view_lineage", "Missing DataHubGraph to read view definitions."
-                )
-
-            self.report.report_ingestion_stage_start("view lineage extraction")
-            yield from self.get_view_lineage(urns=urns)
-            yield from self.builder._gen_lineage_workunits()
-
         if self.config.include_table_lineage or self.config.include_usage_statistics:
             self.report.report_ingestion_stage_start("audit log extraction")
             yield from self.get_audit_log_mcps(urns=urns)
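For context: the deleted code fed cached view definitions through gen_lineage_from_query one urn at a time; per the commit message, that parsing now lives in the shared sql-common source, and this file keeps only SchemaResolver and sqlglot_lineage for its audit-log path. A rough sketch of how one statement resolves to upstream tables with those utilities is below; the constructor and call signatures are assumptions inferred from the imports kept above, not confirmed by this diff.

# Hedged sketch: argument names below are assumptions, not shown in the diff.
from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage

resolver = SchemaResolver(platform="teradata", env="PROD")  # assumed ctor args

result = sqlglot_lineage(
    "CREATE VIEW sales_v AS SELECT id, amount FROM mydb.sales",
    schema_resolver=resolver,
    default_db="mydb",  # stands in for the removed db_name handling
)
print(result.in_tables)   # upstream dataset urns found by the parser
print(result.out_tables)  # the dataset the statement creates/writes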
