From 4fb4d2aa85f4790753f4c2b895990fc5880f1c4f Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 20 Nov 2023 15:21:27 +0100 Subject: [PATCH] Removing unneeded view lineage parsing as it is now part of sql common Fixing empty lineage --- .../datahub/emitter/sql_parsing_builder.py | 3 + .../datahub/ingestion/source/sql/teradata.py | 64 +------------------ 2 files changed, 6 insertions(+), 61 deletions(-) diff --git a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py index cedaa4fbbd7f6..ea5ebf705707a 100644 --- a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py +++ b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py @@ -195,6 +195,9 @@ def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]: upstreams.append(edge.gen_upstream_aspect()) fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects()) + if not upstreams: + continue + upstream_lineage = UpstreamLineageClass( upstreams=sorted(upstreams, key=lambda x: x.dataset), fineGrainedLineages=sorted( diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index ce7dafdd30d7b..429fb0c74515d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -53,14 +53,12 @@ from datahub.ingestion.source.usage.usage_common import BaseUsageConfig from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.ingestion.source_report.time_window import BaseTimeWindowReport -from datahub.metadata._schema_classes import SchemaMetadataClass, ViewPropertiesClass +from datahub.metadata._schema_classes import SchemaMetadataClass from datahub.metadata.com.linkedin.pegasus2avro.schema import ( BytesTypeClass, TimeTypeClass, ) -from datahub.utilities.file_backed_collections import FileBackedDict from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage -from datahub.utilities.urns.dataset_urn import DatasetUrn logger: logging.Logger = logging.getLogger(__name__) @@ -568,8 +566,6 @@ class TeradataSource(TwoTierSQLAlchemySource): _tables_cache: MutableMapping[str, List[TeradataTable]] = defaultdict(list) - _view_definition_cache: MutableMapping[str, str] - def __init__(self, config: TeradataConfig, ctx: PipelineContext): super().__init__(config, ctx, "teradata") @@ -587,11 +583,6 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): self.schema_resolver = self._init_schema_resolver() - if self.config.use_file_backed_cache: - self._view_definition_cache = FileBackedDict[str]() - else: - self._view_definition_cache = {} - if self.config.include_tables or self.config.include_views: self.cache_tables_and_views() logger.info(f"Found {len(self._tables_cache)} tables and views") @@ -785,24 +776,6 @@ def cached_loop_views( # noqa: C901 ) yield from super().loop_views(inspector, schema, sql_config) - def get_view_lineage(self, urns: Set[str]) -> Iterable[MetadataWorkUnit]: - for key in self._view_definition_cache.keys(): - view_definition = self._view_definition_cache[key] - dataset_urn = DatasetUrn.create_from_string(key) - - db_name: Optional[str] = None - # We need to get the default db from the dataset urn otherwise the builder generates the wrong urns - if "." in dataset_urn.get_dataset_name(): - db_name = dataset_urn.get_dataset_name().split(".", 1)[0] - - self.report.num_view_ddl_parsed += 1 - if self.report.num_view_ddl_parsed % 1000 == 0: - logger.info(f"Parsed {self.report.num_view_ddl_parsed} view ddl") - - yield from self.gen_lineage_from_query( - query=view_definition, default_database=db_name, view_urn=key, urns=urns - ) - def cache_tables_and_views(self) -> None: engine = self.get_metadata_engine() for entry in engine.execute(self.TABLES_AND_VIEWS_QUERY): @@ -829,6 +802,7 @@ def get_audit_log_mcps(self, urns: Set[str]) -> Iterable[MetadataWorkUnit]: self.report.num_queries_parsed += 1 if self.report.num_queries_parsed % 1000 == 0: logger.info(f"Parsed {self.report.num_queries_parsed} queries") + yield from self.gen_lineage_from_query( query=entry.query_text, default_database=entry.default_database, @@ -893,47 +867,15 @@ def get_metadata_engine(self) -> Engine: def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: # Add all schemas to the schema resolver + # Sql parser operates on lowercase urns so we need to lowercase the urns for wu in auto_lowercase_urns(super().get_workunits_internal()): urn = wu.get_urn() schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass) if schema_metadata: self.schema_resolver.add_schema_metadata(urn, schema_metadata) - view_properties = wu.get_aspect_of_type(ViewPropertiesClass) - if view_properties and self.config.include_view_lineage: - self._view_definition_cache[urn] = view_properties.viewLogic yield wu - # Sql parser operates on lowercase urns so we need to lowercase the urns urns = self.schema_resolver.get_urns() - if self.config.include_view_lineage: - if not self.config.include_views and self.graph is not None: - d = {} - - entries = self.graph.bulk_fetch_view_definitions( - platform=self.platform, - platform_instance=self.config.platform_instance, - env=self.config.env, - ) - - for urn, view_definition in entries: - self._view_definition_cache[urn] = view_definition - d[urn] = view_definition - - logger.info( - f"Loaded {len(self._view_definition_cache)} view definitions." - ) - logger.info( - f"Sample view urns: {list(self._view_definition_cache.keys())[:10]}" - ) - elif not self.config.include_views: - self.report.report_failure( - "view_lineage", "Missing DataHubGraph to read view definitions." - ) - - self.report.report_ingestion_stage_start("view lineage extraction") - yield from self.get_view_lineage(urns=urns) - yield from self.builder._gen_lineage_workunits() - if self.config.include_table_lineage or self.config.include_usage_statistics: self.report.report_ingestion_stage_start("audit log extraction") yield from self.get_audit_log_mcps(urns=urns)