Skip to content

Commit

Permalink
merge related changes
Browse files: browse the repository at this point in the history
From this fa0c43c
  • Loading branch information
mayurinehate committed Sep 5, 2023
1 parent c77a9ab commit 6a2a3d4
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
yield from self.lineage_extractor.get_lineage_workunits(
[p.id for p in projects],
self.sql_parser_schema_resolver,
self.view_definition_ids,
self.view_refs_by_project,
self.view_definitions,
self.table_refs,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def get_lineage_workunits(
self,
projects: List[str],
sql_parser_schema_resolver: SchemaResolver,
view_definition_ids: Dict[str, Dict[str, str]],
view_refs_by_project: Dict[str, Set[str]],
view_definitions: FileBackedDict[str],
table_refs: Set[str],
) -> Iterable[MetadataWorkUnit]:
Expand All @@ -265,7 +265,7 @@ def get_lineage_workunits(
for project in projects:
self.populate_view_lineage_with_sql_parsing(
view_lineage,
view_definition_ids[project],
view_refs_by_project[project],
view_definitions,
sql_parser_schema_resolver,
project,
Expand Down Expand Up @@ -348,13 +348,13 @@ def generate_lineage(
def populate_view_lineage_with_sql_parsing(
self,
view_lineage: Dict[str, Set[LineageEdge]],
view_definition_ids: Dict[str, str],
view_refs: Set[str],
view_definitions: FileBackedDict[str],
sql_parser_schema_resolver: SchemaResolver,
default_project: str,
) -> None:
for view, view_definition_id in view_definition_ids.items():
view_definition = view_definitions[view_definition_id]
for view in view_refs:
view_definition = view_definitions[view]
raw_view_lineage = sqlglot_lineage(
view_definition,
schema_resolver=sql_parser_schema_resolver,
Expand Down
10 changes: 7 additions & 3 deletions metadata-ingestion/tests/unit/test_bigquery_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest

import datahub.emitter.mce_builder as builder
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
BigQueryTableRef,
QueryEvent,
Expand Down Expand Up @@ -81,7 +82,9 @@ def lineage_entries() -> List[QueryEvent]:
def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None:
config = BigQueryV2Config()
report = BigQueryV2Report()
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report, lambda x: "")
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
config, report, lambda x: builder.make_dataset_urn("bigquery", str(x))
)

bq_table = BigQueryTableRef.from_string_name(
"projects/my_project/datasets/my_dataset/tables/my_table"
Expand All @@ -104,7 +107,9 @@ def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None:
def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None:
config = BigQueryV2Config(extract_column_lineage=True, incremental_lineage=False)
report = BigQueryV2Report()
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report, lambda x: "")
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
config, report, lambda x: builder.make_dataset_urn("bigquery", str(x))
)

bq_table = BigQueryTableRef.from_string_name(
"projects/my_project/datasets/my_dataset/tables/my_table"
Expand All @@ -119,7 +124,6 @@ def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None:
bq_table=bq_table,
bq_table_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)",
lineage_metadata=lineage_map,
platform="bigquery",
)
assert upstream_lineage
assert len(upstream_lineage.upstreams) == 2
Expand Down

0 comments on commit 6a2a3d4

Please sign in to comment.