Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Sep 2, 2024
2 parents daa56e3 + a7fc7f5 commit 367ed2c
Show file tree
Hide file tree
Showing 36 changed files with 1,682 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
)
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.queries_extractor import (
BigQueryQueriesExtractor,
BigQueryQueriesExtractorConfig,
)
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.ingestion.source.state.redundant_run_skip_handler import (
Expand All @@ -51,6 +55,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.ingestion.source_report.ingestion_stage import QUERIES_EXTRACTION
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.utilities.registries.domain_registry import DomainRegistry

Expand Down Expand Up @@ -139,6 +144,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
self.lineage_extractor = BigqueryLineageExtractor(
config,
self.report,
schema_resolver=self.sql_parser_schema_resolver,
identifiers=self.identifiers,
redundant_run_skip_handler=redundant_lineage_run_skip_handler,
)
Expand Down Expand Up @@ -196,7 +202,9 @@ def test_connection(config_dict: dict) -> TestConnectionReport:

def _init_schema_resolver(self) -> SchemaResolver:
schema_resolution_required = (
self.config.lineage_parse_view_ddl or self.config.lineage_use_sql_parser
self.config.use_queries_v2
or self.config.lineage_parse_view_ddl
or self.config.lineage_use_sql_parser
)
schema_ingestion_enabled = (
self.config.include_schema_metadata
Expand Down Expand Up @@ -244,22 +252,54 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
for project in projects:
yield from self.bq_schema_extractor.get_project_workunits(project)

if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
[p.id for p in projects], self.bq_schema_extractor.table_refs
)
if self.config.use_queries_v2:
self.report.set_ingestion_stage("*", "View and Snapshot Lineage")

if self.config.include_table_lineage:
yield from self.lineage_extractor.get_lineage_workunits(
yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.sql_parser_schema_resolver,
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
self.bq_schema_extractor.table_refs,
)

self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)

queries_extractor = BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
window=self.config,
user_email_pattern=self.config.usage.user_email_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_statistics,
include_operations=self.config.usage.include_operational_stats,
top_n_queries=self.config.usage.top_n_queries,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
)
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
else:
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
[p.id for p in projects], self.bq_schema_extractor.table_refs
)

if self.config.include_table_lineage:
yield from self.lineage_extractor.get_lineage_workunits(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
self.bq_schema_extractor.table_refs,
)

def get_report(self) -> BigQueryV2Report:
    """Return the report object that accumulates metrics for this ingestion run."""
    return self.report

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ class BigQueryV2Config(
"enabled.",
)

use_queries_v2: bool = Field(
default=False,
description="If enabled, uses the new queries extractor to extract queries from bigquery.",
)

@property
def have_table_data_read_permission(self) -> bool:
    """Whether table data will be read during ingestion.

    True when the v2 tables-list query is enabled, or when profiling is
    enabled (profiling reads table data). NOTE(review): the name implies a
    permission requirement on the service account — confirm against the
    connection-test code path.
    """
    if self.use_tables_list_query_v2:
        return True
    return self.is_profiling_enabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
BigQueryIdentifierConfig,
)
from datahub.ingestion.source.bigquery_v2.bigquery_report import (
BigQueryQueriesExtractorReport,
BigQuerySchemaApiPerfReport,
)
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigQuerySchemaApi
Expand All @@ -25,7 +26,6 @@
from datahub.ingestion.source.bigquery_v2.queries_extractor import (
BigQueryQueriesExtractor,
BigQueryQueriesExtractorConfig,
BigQueryQueriesExtractorReport,
)

logger = logging.getLogger(__name__)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ class BigQueryProcessingPerfReport(Report):
usage_state_size: Optional[str] = None


@dataclass
class BigQueryQueriesExtractorReport(Report):
    """Timers and counters produced by the BigQuery queries-v2 extractor."""

    # Wall-clock timers for the successive phases of query-log processing:
    # fetching the log, preprocessing audit entries, and loading them.
    query_log_fetch_timer: PerfTimer = field(default_factory=PerfTimer)
    audit_log_preprocessing_timer: PerfTimer = field(default_factory=PerfTimer)
    audit_log_load_timer: PerfTimer = field(default_factory=PerfTimer)
    # None until the SQL aggregator is attached elsewhere during extraction.
    sql_aggregator: Optional[SqlAggregatorReport] = None
    # Query counts keyed by project; presumably truncated to the top-K
    # entries by TopKDict — confirm against its implementation.
    num_queries_by_project: TopKDict[str, int] = field(default_factory=int_top_k_dict)

    # Totals across all projects; unique counts deduplicated queries.
    num_total_queries: int = 0
    num_unique_queries: int = 0


@dataclass
class BigQueryV2Report(
ProfilingSqlReport,
Expand Down Expand Up @@ -143,10 +155,8 @@ class BigQueryV2Report(

snapshots_scanned: int = 0

num_view_definitions_parsed: int = 0
num_view_definitions_failed_parsing: int = 0
num_view_definitions_failed_column_parsing: int = 0
view_definitions_parsing_failures: LossyList[str] = field(default_factory=LossyList)
# view lineage
sql_aggregator: Optional[SqlAggregatorReport] = None

read_reasons_stat: Counter[str] = field(default_factory=collections.Counter)
operation_types_stat: Counter[str] = field(default_factory=collections.Counter)
Expand All @@ -171,8 +181,7 @@ class BigQueryV2Report(
usage_end_time: Optional[datetime] = None
stateful_usage_ingestion_enabled: bool = False

# lineage/usage v2
sql_aggregator: Optional[SqlAggregatorReport] = None
queries_extractor: Optional[BigQueryQueriesExtractorReport] = None

def set_ingestion_stage(self, project_id: str, stage: str) -> None:
    """Record the start of an ingestion stage, labelled ``"<project_id>: <stage>"``."""
    stage_label = f"{project_id}: {stage}"
    self.report_ingestion_stage_start(stage_label)
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ def lineage_capability_test(
report: BigQueryV2Report,
) -> CapabilityReport:
lineage_extractor = BigqueryLineageExtractor(
connection_conf, report, BigQueryIdentifierBuilder(connection_conf, report)
connection_conf,
report,
schema_resolver=SchemaResolver(platform="bigquery"),
identifiers=BigQueryIdentifierBuilder(connection_conf, report),
)
for project_id in project_ids:
try:
Expand Down
Loading

0 comments on commit 367ed2c

Please sign in to comment.