From 6a2a3d452437a42189fd7f93701a6c9c5ec0371d Mon Sep 17 00:00:00 2001 From: Mayuri N Date: Tue, 5 Sep 2023 17:57:49 +0530 Subject: [PATCH] merge related changes From this https://github.com/datahub-project/datahub/commit/fa0c43c0313f6239f54879819ffc6c6dc04cbef5 --- .../datahub/ingestion/source/bigquery_v2/bigquery.py | 2 +- .../datahub/ingestion/source/bigquery_v2/lineage.py | 10 +++++----- metadata-ingestion/tests/unit/test_bigquery_lineage.py | 10 +++++++--- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 165a0eea106d1..ff7a47924626d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -541,7 +541,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: yield from self.lineage_extractor.get_lineage_workunits( [p.id for p in projects], self.sql_parser_schema_resolver, - self.view_definition_ids, + self.view_refs_by_project, self.view_definitions, self.table_refs, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 08a3db2bf6503..98c8cbaf85eec 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -253,7 +253,7 @@ def get_lineage_workunits( self, projects: List[str], sql_parser_schema_resolver: SchemaResolver, - view_definition_ids: Dict[str, Dict[str, str]], + view_refs_by_project: Dict[str, Set[str]], view_definitions: FileBackedDict[str], table_refs: Set[str], ) -> Iterable[MetadataWorkUnit]: @@ -265,7 +265,7 @@ def get_lineage_workunits( for project in projects: self.populate_view_lineage_with_sql_parsing( view_lineage, - view_definition_ids[project], + view_refs_by_project[project], view_definitions, sql_parser_schema_resolver, project, @@ -348,13 +348,13 @@ def generate_lineage( def populate_view_lineage_with_sql_parsing( self, view_lineage: Dict[str, Set[LineageEdge]], - view_definition_ids: Dict[str, str], + view_refs: Set[str], view_definitions: FileBackedDict[str], sql_parser_schema_resolver: SchemaResolver, default_project: str, ) -> None: - for view, view_definition_id in view_definition_ids.items(): - view_definition = view_definitions[view_definition_id] + for view in view_refs: + view_definition = view_definitions[view] raw_view_lineage = sqlglot_lineage( view_definition, schema_resolver=sql_parser_schema_resolver, diff --git a/metadata-ingestion/tests/unit/test_bigquery_lineage.py b/metadata-ingestion/tests/unit/test_bigquery_lineage.py index 566d6fc2cb0c3..e23494963e475 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_lineage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_lineage.py @@ -3,6 +3,7 @@ import pytest +import datahub.emitter.mce_builder as builder from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( BigQueryTableRef, QueryEvent, @@ -81,7 +82,9 @@ def lineage_entries() -> List[QueryEvent]: def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None: config = BigQueryV2Config() report = BigQueryV2Report() - extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report, lambda x: "") + extractor: BigqueryLineageExtractor = BigqueryLineageExtractor( + config, report, lambda x: builder.make_dataset_urn("bigquery", str(x)) + ) bq_table = BigQueryTableRef.from_string_name( "projects/my_project/datasets/my_dataset/tables/my_table" @@ -104,7 +107,9 @@ def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None: def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None: config = BigQueryV2Config(extract_column_lineage=True, incremental_lineage=False) report = BigQueryV2Report() - extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report, lambda x: "") + extractor: BigqueryLineageExtractor = BigqueryLineageExtractor( + config, report, lambda x: builder.make_dataset_urn("bigquery", str(x)) + ) bq_table = BigQueryTableRef.from_string_name( "projects/my_project/datasets/my_dataset/tables/my_table" @@ -119,7 +124,6 @@ def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None: bq_table=bq_table, bq_table_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)", lineage_metadata=lineage_map, - platform="bigquery", ) assert upstream_lineage assert len(upstream_lineage.upstreams) == 2