diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 1107a54a1896b..ae49a4ba17c11 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -4,7 +4,7 @@ import re import traceback from collections import defaultdict -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from typing import Dict, Iterable, List, Optional, Set, Type, Union, cast from google.cloud import bigquery @@ -44,21 +44,17 @@ from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryColumn, - BigQueryDataDictionary, BigqueryDataset, BigqueryProject, + BigQuerySchemaApi, BigqueryTable, BigqueryView, ) from datahub.ingestion.source.bigquery_v2.common import ( BQ_EXTERNAL_DATASET_URL_TEMPLATE, BQ_EXTERNAL_TABLE_URL_TEMPLATE, - get_bigquery_client, -) -from datahub.ingestion.source.bigquery_v2.lineage import ( - BigqueryLineageExtractor, - make_lineage_edges_from_parsing_result, ) +from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor from datahub.ingestion.source.common.subtypes import ( @@ -83,7 +79,6 @@ StatefulIngestionSourceBase, ) from datahub.ingestion.source_report.ingestion_stage import ( - LINEAGE_EXTRACTION, METADATA_EXTRACTION, PROFILING, ) @@ -94,7 +89,6 @@ ) from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( DatasetProperties, - UpstreamLineage, ViewProperties, ) from datahub.metadata.com.linkedin.pegasus2avro.schema import ( @@ -113,11 +107,9 @@ ) from datahub.metadata.schema_classes import ( DataPlatformInstanceClass, - DatasetLineageTypeClass, GlobalTagsClass, TagAssociationClass, ) -from datahub.specific.dataset import DatasetPatchBuilder from datahub.utilities.file_backed_collections import FileBackedDict from datahub.utilities.hive_schema_to_avro import ( HiveColumnToAvroConverter, @@ -126,7 +118,7 @@ from datahub.utilities.mapping import Constants from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.registries.domain_registry import DomainRegistry -from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage +from datahub.utilities.sqlglot_lineage import SchemaResolver logger: logging.Logger = logging.getLogger(__name__) @@ -228,11 +220,15 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): set_dataset_urn_to_lower(self.config.convert_urns_to_lowercase) - self.redundant_lineage_run_skip_handler: Optional[ + self.bigquery_data_dictionary = BigQuerySchemaApi( + self.report.schema_api_perf, self.config.get_bigquery_client() + ) + + redundant_lineage_run_skip_handler: Optional[ RedundantLineageRunSkipHandler ] = None if self.config.enable_stateful_lineage_ingestion: - self.redundant_lineage_run_skip_handler = RedundantLineageRunSkipHandler( + redundant_lineage_run_skip_handler = RedundantLineageRunSkipHandler( source=self, config=self.config, pipeline_name=self.ctx.pipeline_name, @@ -241,7 +237,10 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): # For database, schema, tables, views, etc self.lineage_extractor = BigqueryLineageExtractor( - config, self.report, 
self.redundant_lineage_run_skip_handler + config, + self.report, + dataset_urn_builder=self.gen_dataset_urn_from_ref, + redundant_run_skip_handler=redundant_lineage_run_skip_handler, ) redundant_usage_run_skip_handler: Optional[RedundantUsageRunSkipHandler] = None @@ -289,6 +288,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): self.sql_parser_schema_resolver = SchemaResolver( platform=self.platform, env=self.config.env ) + self.add_config_to_report() atexit.register(cleanup, config) @@ -314,18 +314,20 @@ def metadata_read_capability_test( for project_id in project_ids: try: logger.info((f"Metadata read capability test for project {project_id}")) - client: bigquery.Client = get_bigquery_client(config) + client: bigquery.Client = config.get_bigquery_client() assert client - result = BigQueryDataDictionary.get_datasets_for_project_id( - client, project_id, 10 + bigquery_data_dictionary = BigQuerySchemaApi( + BigQueryV2Report().schema_api_perf, client + ) + result = bigquery_data_dictionary.get_datasets_for_project_id( + project_id, 10 ) if len(result) == 0: return CapabilityReport( capable=False, failure_reason=f"Dataset query returned empty dataset. It is either empty or no dataset in project {project_id}", ) - tables = BigQueryDataDictionary.get_tables_for_dataset( - conn=client, + tables = bigquery_data_dictionary.get_tables_for_dataset( project_id=project_id, dataset_name=result[0].name, tables={}, @@ -351,7 +353,9 @@ def lineage_capability_test( project_ids: List[str], report: BigQueryV2Report, ) -> CapabilityReport: - lineage_extractor = BigqueryLineageExtractor(connection_conf, report) + lineage_extractor = BigqueryLineageExtractor( + connection_conf, report, lambda ref: "" + ) for project_id in project_ids: try: logger.info(f"Lineage capability test for project {project_id}") @@ -397,7 +401,7 @@ def test_connection(config_dict: dict) -> TestConnectionReport: try: connection_conf = BigQueryV2Config.parse_obj_allow_extras(config_dict) - client: bigquery.Client = get_bigquery_client(connection_conf) + client: bigquery.Client = connection_conf.get_bigquery_client() assert client test_report.basic_connectivity = BigqueryV2Source.connectivity_test(client) @@ -519,54 +523,30 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: ] def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - conn: bigquery.Client = get_bigquery_client(self.config) - - projects = self._get_projects(conn) + projects = self._get_projects() if not projects: return for project_id in projects: self.report.set_ingestion_stage(project_id.id, METADATA_EXTRACTION) logger.info(f"Processing project: {project_id.id}") - yield from self._process_project(conn, project_id) + yield from self._process_project(project_id) if self.config.include_usage_statistics: yield from self.usage_extractor.get_usage_workunits( [p.id for p in projects], self.table_refs ) - if self._should_ingest_lineage(): - for project in projects: - self.report.set_ingestion_stage(project.id, LINEAGE_EXTRACTION) - yield from self.generate_lineage(project.id) - - if self.redundant_lineage_run_skip_handler: - # Update the checkpoint state for this run. 
- self.redundant_lineage_run_skip_handler.update_state( - self.config.start_time, self.config.end_time - ) - - def _should_ingest_lineage(self) -> bool: - if not self.config.include_table_lineage: - return False - - if ( - self.redundant_lineage_run_skip_handler - and self.redundant_lineage_run_skip_handler.should_skip_this_run( - cur_start_time=self.config.start_time, - cur_end_time=self.config.end_time, + if self.config.include_table_lineage: + yield from self.lineage_extractor.get_lineage_workunits( + [p.id for p in projects], + self.sql_parser_schema_resolver, + self.view_refs_by_project, + self.view_definitions, + self.table_refs, ) - ): - # Skip this run - self.report.report_warning( - "lineage-extraction", - "Skip this run as there was already a run for current ingestion window.", - ) - return False - - return True - def _get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]: + def _get_projects(self) -> List[BigqueryProject]: logger.info("Getting projects") if self.config.project_ids or self.config.project_id: project_ids = self.config.project_ids or [self.config.project_id] # type: ignore @@ -575,15 +555,10 @@ def _get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]: for project_id in project_ids ] else: - return list(self._get_project_list(conn)) - - def _get_project_list(self, conn: bigquery.Client) -> Iterable[BigqueryProject]: - try: - projects = BigQueryDataDictionary.get_projects(conn) - except Exception as e: - logger.error(f"Error getting projects. {e}", exc_info=True) - projects = [] + return list(self._query_project_list()) + def _query_project_list(self) -> Iterable[BigqueryProject]: + projects = self.bigquery_data_dictionary.get_projects() if not projects: # Report failure on exception and if empty list is returned self.report.report_failure( "metadata-extraction", @@ -600,7 +575,7 @@ def _get_project_list(self, conn: bigquery.Client) -> Iterable[BigqueryProject]: self.report.report_dropped(project.id) def _process_project( - self, conn: bigquery.Client, bigquery_project: BigqueryProject + self, bigquery_project: BigqueryProject ) -> Iterable[MetadataWorkUnit]: db_tables: Dict[str, List[BigqueryTable]] = {} db_views: Dict[str, List[BigqueryView]] = {} @@ -611,7 +586,7 @@ def _process_project( try: bigquery_project.datasets = ( - BigQueryDataDictionary.get_datasets_for_project_id(conn, project_id) + self.bigquery_data_dictionary.get_datasets_for_project_id(project_id) ) except Exception as e: error_message = f"Unable to get datasets for project {project_id}, skipping. 
The error was: {e}" @@ -645,7 +620,7 @@ def _process_project( try: # db_tables and db_views are populated in the this method yield from self._process_schema( - conn, project_id, bigquery_dataset, db_tables, db_views + project_id, bigquery_dataset, db_tables, db_views ) except Exception as e: @@ -670,73 +645,8 @@ def _process_project( tables=db_tables, ) - def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]: - logger.info(f"Generate lineage for {project_id}") - lineage = self.lineage_extractor.calculate_lineage_for_project( - project_id, - sql_parser_schema_resolver=self.sql_parser_schema_resolver, - ) - - if self.config.lineage_parse_view_ddl: - for view in self.view_refs_by_project[project_id]: - view_definition = self.view_definitions[view] - raw_view_lineage = sqlglot_lineage( - view_definition, - schema_resolver=self.sql_parser_schema_resolver, - default_db=project_id, - ) - if raw_view_lineage.debug_info.table_error: - logger.debug( - f"Failed to parse lineage for view {view}: {raw_view_lineage.debug_info.table_error}" - ) - self.report.num_view_definitions_failed_parsing += 1 - self.report.view_definitions_parsing_failures.append( - f"Table-level sql parsing error for view {view}: {raw_view_lineage.debug_info.table_error}" - ) - continue - elif raw_view_lineage.debug_info.column_error: - self.report.num_view_definitions_failed_column_parsing += 1 - self.report.view_definitions_parsing_failures.append( - f"Column-level sql parsing error for view {view}: {raw_view_lineage.debug_info.column_error}" - ) - else: - self.report.num_view_definitions_parsed += 1 - - # For views, we override the upstreams obtained by parsing audit logs - # as they may contain indirectly referenced tables. - ts = datetime.now(timezone.utc) - lineage[view] = set( - make_lineage_edges_from_parsing_result( - raw_view_lineage, - audit_stamp=ts, - lineage_type=DatasetLineageTypeClass.VIEW, - ) - ) - - for lineage_key in lineage.keys(): - if lineage_key not in self.table_refs: - continue - - table_ref = BigQueryTableRef.from_string_name(lineage_key) - dataset_urn = self.gen_dataset_urn( - project_id=table_ref.table_identifier.project_id, - dataset_name=table_ref.table_identifier.dataset, - table=table_ref.table_identifier.get_table_display_name(), - ) - - lineage_info = self.lineage_extractor.get_lineage_for_table( - bq_table=table_ref, - bq_table_urn=dataset_urn, - platform=self.platform, - lineage_metadata=lineage, - ) - - if lineage_info: - yield from self.gen_lineage(dataset_urn, lineage_info) - def _process_schema( self, - conn: bigquery.Client, project_id: str, bigquery_dataset: BigqueryDataset, db_tables: Dict[str, List[BigqueryTable]], @@ -750,8 +660,7 @@ def _process_schema( columns = None if self.config.include_tables or self.config.include_views: - columns = BigQueryDataDictionary.get_columns_for_dataset( - conn, + columns = self.bigquery_data_dictionary.get_columns_for_dataset( project_id=project_id, dataset_name=dataset_name, column_limit=self.config.column_limit, @@ -760,7 +669,7 @@ def _process_schema( if self.config.include_tables: db_tables[dataset_name] = list( - self.get_tables_for_dataset(conn, project_id, dataset_name) + self.get_tables_for_dataset(project_id, dataset_name) ) for table in db_tables[dataset_name]: @@ -773,7 +682,9 @@ def _process_schema( ) elif self.config.include_table_lineage or self.config.include_usage_statistics: # Need table_refs to calculate lineage and usage - for table_item in conn.list_tables(f"{project_id}.{dataset_name}"): + for table_item in 
self.bigquery_data_dictionary.list_tables( + dataset_name, project_id + ): identifier = BigqueryTableIdentifier( project_id=project_id, dataset=dataset_name, @@ -793,8 +704,8 @@ def _process_schema( if self.config.include_views: db_views[dataset_name] = list( - BigQueryDataDictionary.get_views_for_dataset( - conn, project_id, dataset_name, self.config.is_profiling_enabled() + self.bigquery_data_dictionary.get_views_for_dataset( + project_id, dataset_name, self.config.is_profiling_enabled() ) ) @@ -1065,39 +976,6 @@ def gen_dataset_workunits( domain_config=self.config.domain, ) - def gen_lineage( - self, - dataset_urn: str, - upstream_lineage: Optional[UpstreamLineage] = None, - ) -> Iterable[MetadataWorkUnit]: - if upstream_lineage is None: - return - - if upstream_lineage is not None: - if self.config.incremental_lineage: - patch_builder: DatasetPatchBuilder = DatasetPatchBuilder( - urn=dataset_urn - ) - for upstream in upstream_lineage.upstreams: - patch_builder.add_upstream_lineage(upstream) - - yield from [ - MetadataWorkUnit( - id=f"upstreamLineage-for-{dataset_urn}", - mcp_raw=mcp, - ) - for mcp in patch_builder.build() - ] - else: - if not self.config.extract_column_lineage: - upstream_lineage.fineGrainedLineages = None - - yield from [ - MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=upstream_lineage - ).as_workunit() - ] - def gen_tags_aspect_workunit( self, dataset_urn: str, tags_to_add: List[str] ) -> MetadataWorkUnit: @@ -1212,7 +1090,6 @@ def get_report(self) -> BigQueryV2Report: def get_tables_for_dataset( self, - conn: bigquery.Client, project_id: str, dataset_name: str, ) -> Iterable[BigqueryTable]: @@ -1231,14 +1108,15 @@ def get_tables_for_dataset( # We get the list of tables in the dataset to get core table properties and to be able to process the tables in batches # We collect only the latest shards from sharded tables (tables with _YYYYMMDD suffix) and ignore temporary tables - table_items = self.get_core_table_details(conn, dataset_name, project_id) + table_items = self.get_core_table_details( + dataset_name, project_id, self.config.temp_table_dataset_prefix + ) items_to_get: Dict[str, TableListItem] = {} for table_item in table_items.keys(): items_to_get[table_item] = table_items[table_item] if len(items_to_get) % max_batch_size == 0: - yield from BigQueryDataDictionary.get_tables_for_dataset( - conn, + yield from self.bigquery_data_dictionary.get_tables_for_dataset( project_id, dataset_name, items_to_get, @@ -1247,8 +1125,7 @@ def get_tables_for_dataset( items_to_get.clear() if items_to_get: - yield from BigQueryDataDictionary.get_tables_for_dataset( - conn, + yield from self.bigquery_data_dictionary.get_tables_for_dataset( project_id, dataset_name, items_to_get, @@ -1260,13 +1137,15 @@ def get_tables_for_dataset( ) def get_core_table_details( - self, conn: bigquery.Client, dataset_name: str, project_id: str + self, dataset_name: str, project_id: str, temp_table_dataset_prefix: str ) -> Dict[str, TableListItem]: table_items: Dict[str, TableListItem] = {} # Dict to store sharded table and the last seen max shard id sharded_tables: Dict[str, TableListItem] = {} - for table in conn.list_tables(f"{project_id}.{dataset_name}"): + for table in self.bigquery_data_dictionary.list_tables( + dataset_name, project_id + ): table_identifier = BigqueryTableIdentifier( project_id=project_id, dataset=dataset_name, @@ -1303,9 +1182,7 @@ def get_core_table_details( if stored_shard < shard: sharded_tables[table_name] = table continue - elif 
str(table_identifier).startswith( - self.config.temp_table_dataset_prefix - ): + elif str(table_identifier).startswith(temp_table_dataset_prefix): logger.debug(f"Dropping temporary table {table_identifier.table}") self.report.report_dropped(table_identifier.raw_table_name()) continue diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py index 0f9b37c93feaa..b0ac77201b415 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py @@ -13,48 +13,6 @@ get_first_missing_key_any, ) -BQ_FILTER_RULE_TEMPLATE = "BQ_FILTER_RULE_TEMPLATE" - -BQ_AUDIT_V2 = { - BQ_FILTER_RULE_TEMPLATE: """ -resource.type=("bigquery_project" OR "bigquery_dataset") -AND -timestamp >= "{start_time}" -AND -timestamp < "{end_time}" -AND protoPayload.serviceName="bigquery.googleapis.com" -AND -( - ( - protoPayload.methodName= - ( - "google.cloud.bigquery.v2.JobService.Query" - OR - "google.cloud.bigquery.v2.JobService.InsertJob" - ) - AND protoPayload.metadata.jobChange.job.jobStatus.jobState="DONE" - AND NOT protoPayload.metadata.jobChange.job.jobStatus.errorResult:* - AND protoPayload.metadata.jobChange.job.jobConfig.queryConfig:* - AND - ( - ( - protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables:* - AND NOT protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables =~ "projects/.*/datasets/.*/tables/__TABLES__|__TABLES_SUMMARY__|INFORMATION_SCHEMA.*" - ) - OR - ( - protoPayload.metadata.jobChange.job.jobConfig.queryConfig.destinationTable:* - ) - ) - ) - OR - protoPayload.metadata.tableDataRead.reason = "JOB" -) -""".strip( - "\t \n" - ), -} - AuditLogEntry = Any # BigQueryAuditMetadata is the v2 format in which audit logs are exported to BigQuery @@ -606,7 +564,6 @@ def from_query_event( query_event: QueryEvent, debug_include_full_payloads: bool = False, ) -> "ReadEvent": - readEvent = ReadEvent( actor_email=query_event.actor_email, timestamp=query_event.timestamp, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit_log_api.py new file mode 100644 index 0000000000000..03b12c61ee5c6 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit_log_api.py @@ -0,0 +1,139 @@ +import logging +from datetime import datetime +from typing import Callable, Iterable, List, Optional + +from google.cloud import bigquery +from google.cloud.logging_v2.client import Client as GCPLoggingClient +from ratelimiter import RateLimiter + +from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( + AuditLogEntry, + BigQueryAuditMetadata, +) +from datahub.ingestion.source.bigquery_v2.bigquery_report import ( + BigQueryAuditLogApiPerfReport, +) +from datahub.ingestion.source.bigquery_v2.common import ( + BQ_DATE_SHARD_FORMAT, + BQ_DATETIME_FORMAT, +) + +logger: logging.Logger = logging.getLogger(__name__) + + +# Api interfaces are separated based on functionality they provide +# rather than the underlying bigquery client that is used to +# provide the functionality. 
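+# In this module, BigQueryAuditLogApi exposes the two audit-log access paths used
+# downstream: reading exported BigQueryAuditMetadata tables through a bigquery.Client,
+# and reading entries through the GCP Logging client. Both respect the configured
+# rate limit and record timings in BigQueryAuditLogApiPerfReport.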
+class BigQueryAuditLogApi: + def __init__( + self, + report: BigQueryAuditLogApiPerfReport, + rate_limit: bool, + requests_per_min: int, + ) -> None: + self.report = report + self.rate_limit = rate_limit + self.requests_per_min = requests_per_min + + def get_exported_bigquery_audit_metadata( + self, + bigquery_client: bigquery.Client, + bigquery_audit_metadata_query_template: Callable[ + [ + str, # dataset: str + bool, # use_date_sharded_tables: bool + Optional[int], # limit: Optional[int] = None + ], + str, + ], + bigquery_audit_metadata_datasets: Optional[List[str]], + use_date_sharded_audit_log_tables: bool, + start_time: datetime, + end_time: datetime, + limit: Optional[int] = None, + ) -> Iterable[BigQueryAuditMetadata]: + if bigquery_audit_metadata_datasets is None: + return + + audit_start_time = start_time.strftime(BQ_DATETIME_FORMAT) + audit_start_date = start_time.strftime(BQ_DATE_SHARD_FORMAT) + + audit_end_time = end_time.strftime(BQ_DATETIME_FORMAT) + audit_end_date = end_time.strftime(BQ_DATE_SHARD_FORMAT) + + rate_limiter: Optional[RateLimiter] = None + if self.rate_limit: + rate_limiter = RateLimiter(max_calls=self.requests_per_min, period=60) + + with self.report.get_exported_log_entries as current_timer: + for dataset in bigquery_audit_metadata_datasets: + logger.info( + f"Start loading log entries from BigQueryAuditMetadata in {dataset}" + ) + + query = bigquery_audit_metadata_query_template( + dataset, + use_date_sharded_audit_log_tables, + limit, + ).format( + start_time=audit_start_time, + end_time=audit_end_time, + start_date=audit_start_date, + end_date=audit_end_date, + ) + + query_job = bigquery_client.query(query) + logger.info( + f"Finished loading log entries from BigQueryAuditMetadata in {dataset}" + ) + + for entry in query_job: + with current_timer.pause(): + if rate_limiter: + with rate_limiter: + yield entry + else: + yield entry + + def get_bigquery_log_entries_via_gcp_logging( + self, + client: GCPLoggingClient, + filter: str, + log_page_size: int, + limit: Optional[int] = None, + ) -> Iterable[AuditLogEntry]: + logger.debug(filter) + + list_entries: Iterable[AuditLogEntry] + rate_limiter: Optional[RateLimiter] = None + if self.rate_limit: + # client.list_entries is a generator, does api calls to GCP Logging when it runs out of entries and needs to fetch more from GCP Logging + # to properly ratelimit we multiply the page size by the number of requests per minute + rate_limiter = RateLimiter( + max_calls=self.requests_per_min * log_page_size, + period=60, + ) + + with self.report.list_log_entries as current_timer: + list_entries = client.list_entries( + filter_=filter, + page_size=log_page_size, + max_results=limit, + ) + + for i, entry in enumerate(list_entries): + if i % 1000 == 0: + logger.info( + f"Loaded {i} log entries from GCP Log for {client.project}" + ) + + with current_timer.pause(): + if rate_limiter: + with rate_limiter: + yield entry + else: + yield entry + + logger.info( + f"Finished loading log entries from GCP Log for {client.project}" + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 0f2082c5e53bf..3b06a4699c566 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -4,9 +4,11 @@ from typing import Any, Dict, List, Optional import pydantic -from pydantic import Field, PositiveInt, 
PrivateAttr, root_validator +from google.cloud import bigquery +from google.cloud.logging_v2.client import Client as GCPLoggingClient +from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator -from datahub.configuration.common import AllowDenyPattern +from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.ingestion.source.sql.sql_config import SQLCommonConfig from datahub.ingestion.source.state.stateful_ingestion_base import ( @@ -35,7 +37,52 @@ class BigQueryUsageConfig(BaseUsageConfig): ) +class BigQueryConnectionConfig(ConfigModel): + credential: Optional[BigQueryCredential] = Field( + default=None, description="BigQuery credential informations" + ) + + _credentials_path: Optional[str] = PrivateAttr(None) + + extra_client_options: Dict[str, Any] = Field( + default={}, + description="Additional options to pass to google.cloud.logging_v2.client.Client.", + ) + + project_on_behalf: Optional[str] = Field( + default=None, + description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.", + ) + + def __init__(self, **data: Any): + super().__init__(**data) + + if self.credential: + self._credentials_path = self.credential.create_credential_temp_file() + logger.debug( + f"Creating temporary credential file at {self._credentials_path}" + ) + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path + + def get_bigquery_client(config) -> bigquery.Client: + client_options = config.extra_client_options + return bigquery.Client(config.project_on_behalf, **client_options) + + def make_gcp_logging_client( + self, project_id: Optional[str] = None + ) -> GCPLoggingClient: + # See https://github.com/googleapis/google-cloud-python/issues/2674 for + # why we disable gRPC here. + client_options = self.extra_client_options.copy() + client_options["_use_grpc"] = False + if project_id is not None: + return GCPLoggingClient(**client_options, project=project_id) + else: + return GCPLoggingClient(**client_options) + + class BigQueryV2Config( + BigQueryConnectionConfig, BigQueryBaseConfig, SQLCommonConfig, StatefulUsageConfigMixin, @@ -122,11 +169,6 @@ class BigQueryV2Config( ), ) - project_on_behalf: Optional[str] = Field( - default=None, - description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.", - ) - storage_project_id: None = Field(default=None, hidden_from_docs=True) lineage_use_sql_parser: bool = Field( @@ -180,14 +222,8 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool: default=1000, description="The number of log item will be queried per page for lineage collection", ) - credential: Optional[BigQueryCredential] = Field( - description="BigQuery credential informations" - ) + # extra_client_options, include_table_lineage and max_query_duration are relevant only when computing the lineage. - extra_client_options: Dict[str, Any] = Field( - default={}, - description="Additional options to pass to google.cloud.logging_v2.client.Client.", - ) include_table_lineage: Optional[bool] = Field( default=True, description="Option to enable/disable lineage generation. 
Is enabled by default.", @@ -209,7 +245,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool: default=False, description="Whether to read date sharded tables or time partitioned tables when extracting usage from exported audit logs.", ) - _credentials_path: Optional[str] = PrivateAttr(None) _cache_path: Optional[str] = PrivateAttr(None) @@ -230,16 +265,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool: description="Maximum number of entries for the in-memory caches of FileBacked data structures.", ) - def __init__(self, **data: Any): - super().__init__(**data) - - if self.credential: - self._credentials_path = self.credential.create_credential_temp_file() - logger.debug( - f"Creating temporary credential file at {self._credentials_path}" - ) - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path - @root_validator(pre=False) def profile_default_settings(cls, values: Dict) -> Dict: # Extra default SQLAlchemy option for better connection pooling and threading. @@ -248,6 +273,17 @@ def profile_default_settings(cls, values: Dict) -> Dict: return values + @validator("bigquery_audit_metadata_datasets") + def validate_bigquery_audit_metadata_datasets( + cls, v: Optional[List[str]], values: Dict + ) -> Optional[List[str]]: + if values.get("use_exported_bigquery_audit_metadata"): + assert ( + v and len(v) > 0 + ), "`bigquery_audit_metadata_datasets` should be set if using `use_exported_bigquery_audit_metadata: True`." + + return v + @root_validator(pre=False) def backward_compatibility_configs_set(cls, values: Dict) -> Dict: project_id = values.get("project_id") diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index b2251fbb8ab1f..2d6882caa38ef 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -1,5 +1,4 @@ import collections -import dataclasses import logging from dataclasses import dataclass, field from datetime import datetime @@ -11,11 +10,26 @@ from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.ingestion.source_report.time_window import BaseTimeWindowReport from datahub.utilities.lossy_collections import LossyDict, LossyList +from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.stats_collections import TopKDict, int_top_k_dict logger: logging.Logger = logging.getLogger(__name__) +class BigQuerySchemaApiPerfReport: + list_projects = PerfTimer() + list_datasets = PerfTimer() + get_columns_for_dataset = PerfTimer() + get_tables_for_dataset = PerfTimer() + list_tables = PerfTimer() + get_views_for_dataset = PerfTimer() + + +class BigQueryAuditLogApiPerfReport: + get_exported_log_entries = PerfTimer() + list_log_entries = PerfTimer() + + @dataclass class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport): num_total_lineage_entries: TopKDict[str, int] = field(default_factory=TopKDict) @@ -31,8 +45,12 @@ class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowR num_skipped_lineage_entries_other: TopKDict[str, int] = field( default_factory=int_top_k_dict ) - num_total_log_entries: TopKDict[str, int] = field(default_factory=int_top_k_dict) - num_parsed_log_entries: TopKDict[str, int] = field(default_factory=int_top_k_dict) + num_lineage_total_log_entries: TopKDict[str, 
int] = field( + default_factory=int_top_k_dict + ) + num_lineage_parsed_log_entries: TopKDict[str, int] = field( + default_factory=int_top_k_dict + ) num_lineage_log_parse_failures: TopKDict[str, int] = field( default_factory=int_top_k_dict ) @@ -42,7 +60,14 @@ class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowR lineage_mem_size: Dict[str, str] = field(default_factory=TopKDict) lineage_extraction_sec: Dict[str, float] = field(default_factory=TopKDict) usage_extraction_sec: Dict[str, float] = field(default_factory=TopKDict) + num_usage_total_log_entries: TopKDict[str, int] = field( + default_factory=int_top_k_dict + ) + num_usage_parsed_log_entries: TopKDict[str, int] = field( + default_factory=int_top_k_dict + ) usage_error_count: Dict[str, int] = field(default_factory=int_top_k_dict) + num_usage_resources_dropped: int = 0 num_usage_operations_dropped: int = 0 operation_dropped: LossyList[str] = field(default_factory=LossyList) @@ -53,10 +78,10 @@ class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowR use_date_sharded_audit_log_tables: Optional[bool] = None log_page_size: Optional[pydantic.PositiveInt] = None use_exported_bigquery_audit_metadata: Optional[bool] = None - log_entry_start_time: Optional[str] = None - log_entry_end_time: Optional[str] = None - audit_start_time: Optional[str] = None - audit_end_time: Optional[str] = None + log_entry_start_time: Optional[datetime] = None + log_entry_end_time: Optional[datetime] = None + audit_start_time: Optional[datetime] = None + audit_end_time: Optional[datetime] = None upstream_lineage: LossyDict = field(default_factory=LossyDict) partition_info: Dict[str, str] = field(default_factory=TopKDict) profile_table_selection_criteria: Dict[str, str] = field(default_factory=TopKDict) @@ -89,13 +114,17 @@ class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowR num_view_definitions_failed_column_parsing: int = 0 view_definitions_parsing_failures: LossyList[str] = field(default_factory=LossyList) - read_reasons_stat: Counter[str] = dataclasses.field( - default_factory=collections.Counter + read_reasons_stat: Counter[str] = field(default_factory=collections.Counter) + operation_types_stat: Counter[str] = field(default_factory=collections.Counter) + + usage_state_size: Optional[str] = None + + schema_api_perf: BigQuerySchemaApiPerfReport = field( + default_factory=BigQuerySchemaApiPerfReport ) - operation_types_stat: Counter[str] = dataclasses.field( - default_factory=collections.Counter + audit_log_api_perf: BigQueryAuditLogApiPerfReport = field( + default_factory=BigQueryAuditLogApiPerfReport ) - usage_state_size: Optional[str] = None lineage_start_time: Optional[datetime] = None lineage_end_time: Optional[datetime] = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 47a04c545231b..7edc8656360bb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -13,22 +13,19 @@ ) from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier -from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report +from datahub.ingestion.source.bigquery_v2.bigquery_report import ( + BigQuerySchemaApiPerfReport, + BigQueryV2Report, +) +from datahub.ingestion.source.bigquery_v2.queries import ( + 
BigqueryQuery, + BigqueryTableType, +) from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView logger: logging.Logger = logging.getLogger(__name__) -class BigqueryTableType: - # See https://cloud.google.com/bigquery/docs/information-schema-tables#schema - BASE_TABLE = "BASE TABLE" - EXTERNAL = "EXTERNAL" - VIEW = "VIEW" - MATERIALIZED_VIEW = "MATERIALIZED VIEW" - CLONE = "CLONE" - SNAPSHOT = "SNAPSHOT" - - @dataclass class BigqueryColumn(BaseColumn): field_path: str @@ -129,253 +126,43 @@ class BigqueryProject: datasets: List[BigqueryDataset] = field(default_factory=list) -class BigqueryQuery: - show_datasets: str = ( - "select schema_name from `{project_id}`.INFORMATION_SCHEMA.SCHEMATA" - ) - - datasets_for_project_id: str = """ -select - s.CATALOG_NAME as catalog_name, - s.schema_name as table_schema, - s.location as location, - s.CREATION_TIME as created, - s.LAST_MODIFIED_TIME as last_altered, - o.OPTION_VALUE as comment -from - `{project_id}`.INFORMATION_SCHEMA.SCHEMATA as s - left join `{project_id}`.INFORMATION_SCHEMA.SCHEMATA_OPTIONS as o on o.schema_name = s.schema_name - and o.option_name = "description" -order by - s.schema_name -""" - - # https://cloud.google.com/bigquery/docs/information-schema-table-storage?hl=en - # Note for max_partition_id - - # should we instead pick the partition with latest LAST_MODIFIED_TIME ? - # for range partitioning max may not be latest partition - tables_for_dataset = f""" -SELECT - t.table_catalog as table_catalog, - t.table_schema as table_schema, - t.table_name as table_name, - t.table_type as table_type, - t.creation_time as created, - ts.last_modified_time as last_altered, - tos.OPTION_VALUE as comment, - is_insertable_into, - ddl, - row_count, - size_bytes as bytes, - num_partitions, - max_partition_id, - active_billable_bytes, - long_term_billable_bytes, - REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix, - REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base - -FROM - `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t - join `{{project_id}}`.`{{dataset_name}}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME - left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema - and t.TABLE_NAME = tos.TABLE_NAME - and tos.OPTION_NAME = "description" - left join ( - select - table_name, - sum(case when partition_id not in ('__NULL__', '__UNPARTITIONED__', '__STREAMING_UNPARTITIONED__') then 1 else 0 END) as num_partitions, - max(case when partition_id not in ('__NULL__', '__UNPARTITIONED__', '__STREAMING_UNPARTITIONED__') then partition_id else NULL END) as max_partition_id, - sum(total_rows) as total_rows, - sum(case when storage_tier = 'LONG_TERM' then total_billable_bytes else 0 end) as long_term_billable_bytes, - sum(case when storage_tier = 'ACTIVE' then total_billable_bytes else 0 end) as active_billable_bytes, - from - `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.PARTITIONS - group by - table_name) as p on - t.table_name = p.table_name -WHERE - table_type in ('{BigqueryTableType.BASE_TABLE}', '{BigqueryTableType.EXTERNAL}') -{{table_filter}} -order by - table_schema ASC, - table_base ASC, - table_suffix DESC -""" - - tables_for_dataset_without_partition_data = f""" -SELECT - t.table_catalog as table_catalog, - t.table_schema as table_schema, - t.table_name as table_name, - t.table_type as table_type, - t.creation_time as created, - tos.OPTION_VALUE as comment, - is_insertable_into, - ddl, - 
REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix, - REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base - -FROM - `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t - left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema - and t.TABLE_NAME = tos.TABLE_NAME - and tos.OPTION_NAME = "description" -WHERE - table_type in ('{BigqueryTableType.BASE_TABLE}', '{BigqueryTableType.EXTERNAL}') -{{table_filter}} -order by - table_schema ASC, - table_base ASC, - table_suffix DESC -""" - - views_for_dataset: str = f""" -SELECT - t.table_catalog as table_catalog, - t.table_schema as table_schema, - t.table_name as table_name, - t.table_type as table_type, - t.creation_time as created, - ts.last_modified_time as last_altered, - tos.OPTION_VALUE as comment, - is_insertable_into, - ddl as view_definition, - row_count, - size_bytes -FROM - `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t - join `{{project_id}}`.`{{dataset_name}}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME - left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema - and t.TABLE_NAME = tos.TABLE_NAME - and tos.OPTION_NAME = "description" -WHERE - table_type in ('{BigqueryTableType.VIEW}', '{BigqueryTableType.MATERIALIZED_VIEW}') -order by - table_schema ASC, - table_name ASC -""" - - views_for_dataset_without_data_read: str = f""" -SELECT - t.table_catalog as table_catalog, - t.table_schema as table_schema, - t.table_name as table_name, - t.table_type as table_type, - t.creation_time as created, - tos.OPTION_VALUE as comment, - is_insertable_into, - ddl as view_definition -FROM - `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t - left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema - and t.TABLE_NAME = tos.TABLE_NAME - and tos.OPTION_NAME = "description" -WHERE - table_type in ('{BigqueryTableType.VIEW}', '{BigqueryTableType.MATERIALIZED_VIEW}') -order by - table_schema ASC, - table_name ASC -""" - - columns_for_dataset: str = """ -select - c.table_catalog as table_catalog, - c.table_schema as table_schema, - c.table_name as table_name, - c.column_name as column_name, - c.ordinal_position as ordinal_position, - cfp.field_path as field_path, - c.is_nullable as is_nullable, - CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type, - description as comment, - c.is_hidden as is_hidden, - c.is_partitioning_column as is_partitioning_column, - c.clustering_ordinal_position as clustering_ordinal_position, -from - `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMNS c - join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name - and cfp.column_name = c.column_name -ORDER BY - table_catalog, table_schema, table_name, ordinal_position ASC, data_type DESC""" - - optimized_columns_for_dataset: str = """ -select * from -(select - c.table_catalog as table_catalog, - c.table_schema as table_schema, - c.table_name as table_name, - c.column_name as column_name, - c.ordinal_position as ordinal_position, - cfp.field_path as field_path, - c.is_nullable as is_nullable, - CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type, - description as comment, - c.is_hidden as is_hidden, - c.is_partitioning_column as is_partitioning_column, - c.clustering_ordinal_position as 
clustering_ordinal_position, - -- We count the columns to be able limit it later - row_number() over (partition by c.table_catalog, c.table_schema, c.table_name order by c.ordinal_position asc, c.data_type DESC) as column_num, - -- Getting the maximum shard for each table - row_number() over (partition by c.table_catalog, c.table_schema, ifnull(REGEXP_EXTRACT(c.table_name, r'(.*)_\\d{{8}}$'), c.table_name), cfp.field_path order by c.table_catalog, c.table_schema asc, c.table_name desc) as shard_num -from - `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMNS c - join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name - and cfp.column_name = c.column_name - ) --- We filter column limit + 1 to make sure we warn about the limit being reached but not reading too much data -where column_num <= {column_limit} and shard_num = 1 -ORDER BY - table_catalog, table_schema, table_name, ordinal_position, column_num ASC, data_type DESC""" - - columns_for_table: str = """ -select - c.table_catalog as table_catalog, - c.table_schema as table_schema, - c.table_name as table_name, - c.column_name as column_name, - c.ordinal_position as ordinal_position, - cfp.field_path as field_path, - c.is_nullable as is_nullable, - CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type, - c.is_hidden as is_hidden, - c.is_partitioning_column as is_partitioning_column, - c.clustering_ordinal_position as clustering_ordinal_position, - description as comment -from - `{table_identifier.project_id}`.`{table_identifier.dataset}`.INFORMATION_SCHEMA.COLUMNS as c - join `{table_identifier.project_id}`.`{table_identifier.dataset}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name - and cfp.column_name = c.column_name -where - c.table_name = '{table_identifier.table}' -ORDER BY - table_catalog, table_schema, table_name, ordinal_position ASC, data_type DESC""" - - -class BigQueryDataDictionary: - @staticmethod - def get_query_result(conn: bigquery.Client, query: str) -> RowIterator: +class BigQuerySchemaApi: + def __init__( + self, report: BigQuerySchemaApiPerfReport, client: bigquery.Client + ) -> None: + self.bq_client = client + self.report = report + + def get_query_result(self, query: str) -> RowIterator: logger.debug(f"Query : {query}") - resp = conn.query(query) + resp = self.bq_client.query(query) return resp.result() - @staticmethod - def get_projects(conn: bigquery.Client) -> List[BigqueryProject]: - projects = conn.list_projects() + def get_projects(self) -> List[BigqueryProject]: + with self.report.list_projects: + try: + projects = self.bq_client.list_projects() - return [ - BigqueryProject(id=p.project_id, name=p.friendly_name) for p in projects - ] + return [ + BigqueryProject(id=p.project_id, name=p.friendly_name) + for p in projects + ] + except Exception as e: + logger.error(f"Error getting projects. 
{e}", exc_info=True) + return [] - @staticmethod def get_datasets_for_project_id( - conn: bigquery.Client, project_id: str, maxResults: Optional[int] = None + self, project_id: str, maxResults: Optional[int] = None ) -> List[BigqueryDataset]: - datasets = conn.list_datasets(project_id, max_results=maxResults) - return [BigqueryDataset(name=d.dataset_id, labels=d.labels) for d in datasets] + with self.report.list_datasets: + datasets = self.bq_client.list_datasets(project_id, max_results=maxResults) + return [ + BigqueryDataset(name=d.dataset_id, labels=d.labels) for d in datasets + ] - @staticmethod + # This is not used anywhere def get_datasets_for_project_id_with_information_schema( - conn: bigquery.Client, project_id: str + self, project_id: str ) -> List[BigqueryDataset]: """ This method is not used as of now, due to below limitation. @@ -383,8 +170,7 @@ def get_datasets_for_project_id_with_information_schema( We'll need Region wise separate queries to fetch all datasets https://cloud.google.com/bigquery/docs/information-schema-datasets-schemata """ - schemas = BigQueryDataDictionary.get_query_result( - conn, + schemas = self.get_query_result( BigqueryQuery.datasets_for_project_id.format(project_id=project_id), ) return [ @@ -398,56 +184,67 @@ def get_datasets_for_project_id_with_information_schema( for s in schemas ] - @staticmethod + def list_tables( + self, dataset_name: str, project_id: str + ) -> Iterator[TableListItem]: + with self.report.list_tables as current_timer: + for table in self.bq_client.list_tables(f"{project_id}.{dataset_name}"): + with current_timer.pause(): + yield table + def get_tables_for_dataset( - conn: bigquery.Client, + self, project_id: str, dataset_name: str, tables: Dict[str, TableListItem], with_data_read_permission: bool = False, report: Optional[BigQueryV2Report] = None, ) -> Iterator[BigqueryTable]: - filter: str = ", ".join(f"'{table}'" for table in tables.keys()) - - if with_data_read_permission: - # Tables are ordered by name and table suffix to make sure we always process the latest sharded table - # and skip the others. Sharded tables are tables with suffix _20220102 - cur = BigQueryDataDictionary.get_query_result( - conn, - BigqueryQuery.tables_for_dataset.format( - project_id=project_id, - dataset_name=dataset_name, - table_filter=f" and t.table_name in ({filter})" if filter else "", - ), - ) - else: - # Tables are ordered by name and table suffix to make sure we always process the latest sharded table - # and skip the others. Sharded tables are tables with suffix _20220102 - cur = BigQueryDataDictionary.get_query_result( - conn, - BigqueryQuery.tables_for_dataset_without_partition_data.format( - project_id=project_id, - dataset_name=dataset_name, - table_filter=f" and t.table_name in ({filter})" if filter else "", - ), - ) - - for table in cur: - try: - yield BigQueryDataDictionary._make_bigquery_table( - table, tables.get(table.table_name) + with self.report.get_tables_for_dataset as current_timer: + filter_clause: str = ", ".join(f"'{table}'" for table in tables.keys()) + + if with_data_read_permission: + # Tables are ordered by name and table suffix to make sure we always process the latest sharded table + # and skip the others. 
Sharded tables are tables with suffix _20220102 + cur = self.get_query_result( + BigqueryQuery.tables_for_dataset.format( + project_id=project_id, + dataset_name=dataset_name, + table_filter=f" and t.table_name in ({filter_clause})" + if filter_clause + else "", + ), ) - except Exception as e: - table_name = f"{project_id}.{dataset_name}.{table.table_name}" - logger.warning( - f"Error while processing table {table_name}", - exc_info=True, + else: + # Tables are ordered by name and table suffix to make sure we always process the latest sharded table + # and skip the others. Sharded tables are tables with suffix _20220102 + cur = self.get_query_result( + BigqueryQuery.tables_for_dataset_without_partition_data.format( + project_id=project_id, + dataset_name=dataset_name, + table_filter=f" and t.table_name in ({filter_clause})" + if filter_clause + else "", + ), ) - if report: - report.report_warning( - "metadata-extraction", - f"Failed to get table {table_name}: {e}", + + for table in cur: + try: + with current_timer.pause(): + yield BigQuerySchemaApi._make_bigquery_table( + table, tables.get(table.table_name) + ) + except Exception as e: + table_name = f"{project_id}.{dataset_name}.{table.table_name}" + logger.warning( + f"Error while processing table {table_name}", + exc_info=True, ) + if report: + report.report_warning( + "metadata-extraction", + f"Failed to get table {table_name}: {e}", + ) @staticmethod def _make_bigquery_table( @@ -487,43 +284,42 @@ def _make_bigquery_table( long_term_billable_bytes=table.get("long_term_billable_bytes"), ) - @staticmethod def get_views_for_dataset( - conn: bigquery.Client, + self, project_id: str, dataset_name: str, has_data_read: bool, report: Optional[BigQueryV2Report] = None, ) -> Iterator[BigqueryView]: - if has_data_read: - cur = BigQueryDataDictionary.get_query_result( - conn, - BigqueryQuery.views_for_dataset.format( - project_id=project_id, dataset_name=dataset_name - ), - ) - else: - cur = BigQueryDataDictionary.get_query_result( - conn, - BigqueryQuery.views_for_dataset_without_data_read.format( - project_id=project_id, dataset_name=dataset_name - ), - ) - - for table in cur: - try: - yield BigQueryDataDictionary._make_bigquery_view(table) - except Exception as e: - view_name = f"{project_id}.{dataset_name}.{table.table_name}" - logger.warning( - f"Error while processing view {view_name}", - exc_info=True, + with self.report.get_views_for_dataset as current_timer: + if has_data_read: + cur = self.get_query_result( + BigqueryQuery.views_for_dataset.format( + project_id=project_id, dataset_name=dataset_name + ), + ) + else: + cur = self.get_query_result( + BigqueryQuery.views_for_dataset_without_data_read.format( + project_id=project_id, dataset_name=dataset_name + ), ) - if report: - report.report_warning( - "metadata-extraction", - f"Failed to get view {view_name}: {e}", + + for table in cur: + try: + with current_timer.pause(): + yield BigQuerySchemaApi._make_bigquery_view(table) + except Exception as e: + view_name = f"{project_id}.{dataset_name}.{table.table_name}" + logger.warning( + f"Error while processing view {view_name}", + exc_info=True, ) + if report: + report.report_warning( + "metadata-extraction", + f"Failed to get view {view_name}: {e}", + ) @staticmethod def _make_bigquery_view(view: bigquery.Row) -> BigqueryView: @@ -540,70 +336,68 @@ def _make_bigquery_view(view: bigquery.Row) -> BigqueryView: materialized=view.table_type == BigqueryTableType.MATERIALIZED_VIEW, ) - @staticmethod def get_columns_for_dataset( - conn: 
bigquery.Client, + self, project_id: str, dataset_name: str, column_limit: int, run_optimized_column_query: bool = False, ) -> Optional[Dict[str, List[BigqueryColumn]]]: columns: Dict[str, List[BigqueryColumn]] = defaultdict(list) - try: - cur = BigQueryDataDictionary.get_query_result( - conn, - BigqueryQuery.columns_for_dataset.format( - project_id=project_id, dataset_name=dataset_name - ) - if not run_optimized_column_query - else BigqueryQuery.optimized_columns_for_dataset.format( - project_id=project_id, - dataset_name=dataset_name, - column_limit=column_limit, - ), - ) - except Exception as e: - logger.warning(f"Columns for dataset query failed with exception: {e}") - # Error - Information schema query returned too much data. - # Please repeat query with more selective predicates. - return None - - last_seen_table: str = "" - for column in cur: - if ( - column_limit - and column.table_name in columns - and len(columns[column.table_name]) >= column_limit - ): - if last_seen_table != column.table_name: - logger.warning( - f"{project_id}.{dataset_name}.{column.table_name} contains more than {column_limit} columns, only processing {column_limit} columns" - ) - last_seen_table = column.table_name - else: - columns[column.table_name].append( - BigqueryColumn( - name=column.column_name, - ordinal_position=column.ordinal_position, - field_path=column.field_path, - is_nullable=column.is_nullable == "YES", - data_type=column.data_type, - comment=column.comment, - is_partition_column=column.is_partitioning_column == "YES", - cluster_column_position=column.clustering_ordinal_position, + with self.report.get_columns_for_dataset: + try: + cur = self.get_query_result( + BigqueryQuery.columns_for_dataset.format( + project_id=project_id, dataset_name=dataset_name ) + if not run_optimized_column_query + else BigqueryQuery.optimized_columns_for_dataset.format( + project_id=project_id, + dataset_name=dataset_name, + column_limit=column_limit, + ), ) + except Exception as e: + logger.warning(f"Columns for dataset query failed with exception: {e}") + # Error - Information schema query returned too much data. + # Please repeat query with more selective predicates. 
+ return None + + last_seen_table: str = "" + for column in cur: + if ( + column_limit + and column.table_name in columns + and len(columns[column.table_name]) >= column_limit + ): + if last_seen_table != column.table_name: + logger.warning( + f"{project_id}.{dataset_name}.{column.table_name} contains more than {column_limit} columns, only processing {column_limit} columns" + ) + last_seen_table = column.table_name + else: + columns[column.table_name].append( + BigqueryColumn( + name=column.column_name, + ordinal_position=column.ordinal_position, + field_path=column.field_path, + is_nullable=column.is_nullable == "YES", + data_type=column.data_type, + comment=column.comment, + is_partition_column=column.is_partitioning_column == "YES", + cluster_column_position=column.clustering_ordinal_position, + ) + ) return columns - @staticmethod + # This is not used anywhere def get_columns_for_table( - conn: bigquery.Client, + self, table_identifier: BigqueryTableIdentifier, column_limit: Optional[int], ) -> List[BigqueryColumn]: - cur = BigQueryDataDictionary.get_query_result( - conn, + cur = self.get_query_result( BigqueryQuery.columns_for_table.format(table_identifier=table_identifier), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py index 4ff509858b87d..e38ab07855b8b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py @@ -1,39 +1,5 @@ -from typing import Any, Dict, Optional - -from google.cloud import bigquery -from google.cloud.logging_v2.client import Client as GCPLoggingClient - -from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config - BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" BQ_DATE_SHARD_FORMAT = "%Y%m%d" BQ_EXTERNAL_TABLE_URL_TEMPLATE = "https://console.cloud.google.com/bigquery?project={project}&ws=!1m5!1m4!4m3!1s{project}!2s{dataset}!3s{table}" BQ_EXTERNAL_DATASET_URL_TEMPLATE = "https://console.cloud.google.com/bigquery?project={project}&ws=!1m4!1m3!3m2!1s{project}!2s{dataset}" - - -def _make_gcp_logging_client( - project_id: Optional[str] = None, extra_client_options: Dict[str, Any] = {} -) -> GCPLoggingClient: - # See https://github.com/googleapis/google-cloud-python/issues/2674 for - # why we disable gRPC here. - client_options = extra_client_options.copy() - client_options["_use_grpc"] = False - if project_id is not None: - return GCPLoggingClient(**client_options, project=project_id) - else: - return GCPLoggingClient(**client_options) - - -def get_bigquery_client(config: BigQueryV2Config) -> bigquery.Client: - client_options = config.extra_client_options - return bigquery.Client(config.project_on_behalf, **client_options) - - -def get_sql_alchemy_url(config: BigQueryV2Config) -> str: - if config.project_on_behalf: - return f"bigquery://{config.project_on_behalf}" - # When project_id is not set, we will attempt to detect the project ID - # based on the credentials or environment variables. - # See https://github.com/mxmzdlv/pybigquery#authentication. 
- return "bigquery://" diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 341952d95e7d7..98c8cbaf85eec 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -1,7 +1,6 @@ import collections import itertools import logging -import textwrap from dataclasses import dataclass from datetime import datetime, timezone from typing import ( @@ -18,12 +17,12 @@ ) import humanfriendly -from google.cloud.bigquery import Client as BigQueryClient from google.cloud.datacatalog import lineage_v1 from google.cloud.logging_v2.client import Client as GCPLoggingClient -from ratelimiter import RateLimiter from datahub.emitter import mce_builder +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( AuditLogEntry, BigQueryAuditMetadata, @@ -32,13 +31,16 @@ QueryEvent, ReadEvent, ) +from datahub.ingestion.source.bigquery_v2.bigquery_audit_log_api import ( + BigQueryAuditLogApi, +) from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report -from datahub.ingestion.source.bigquery_v2.common import ( - BQ_DATE_SHARD_FORMAT, - BQ_DATETIME_FORMAT, - _make_gcp_logging_client, - get_bigquery_client, +from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigQuerySchemaApi +from datahub.ingestion.source.bigquery_v2.common import BQ_DATETIME_FORMAT +from datahub.ingestion.source.bigquery_v2.queries import ( + BQ_FILTER_RULE_TEMPLATE_V2_LINEAGE, + bigquery_audit_metadata_query_template_lineage, ) from datahub.ingestion.source.state.redundant_run_skip_handler import ( RedundantLineageRunSkipHandler, @@ -52,7 +54,9 @@ UpstreamClass, UpstreamLineageClass, ) +from datahub.specific.dataset import DatasetPatchBuilder from datahub.utilities import memory_footprint +from datahub.utilities.file_backed_collections import FileBackedDict from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.sqlglot_lineage import ( SchemaResolver, @@ -194,49 +198,21 @@ def make_lineage_edges_from_parsing_result( class BigqueryLineageExtractor: - BQ_FILTER_RULE_TEMPLATE_V2 = """ -resource.type=("bigquery_project") -AND -( - protoPayload.methodName= - ( - "google.cloud.bigquery.v2.JobService.Query" - OR - "google.cloud.bigquery.v2.JobService.InsertJob" - ) - AND - protoPayload.metadata.jobChange.job.jobStatus.jobState="DONE" - AND NOT protoPayload.metadata.jobChange.job.jobStatus.errorResult:* - AND ( - protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables:* - OR - protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedViews:* - ) - AND ( - protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/_.*/tables/anon.*" - AND - protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/.*/tables/INFORMATION_SCHEMA.*" - AND - protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/.*/tables/__TABLES__" - AND - protoPayload.metadata.jobChange.job.jobConfig.queryConfig.destinationTable !~ "projects/.*/datasets/_.*/tables/anon.*" - ) - -) -AND -timestamp >= "{start_time}" -AND -timestamp < "{end_time}" -""".strip() - def __init__( 
self, config: BigQueryV2Config, report: BigQueryV2Report, + dataset_urn_builder: Callable[[BigQueryTableRef], str], redundant_run_skip_handler: Optional[RedundantLineageRunSkipHandler] = None, ): self.config = config self.report = report + self.dataset_urn_builder = dataset_urn_builder + self.audit_log_api = BigQueryAuditLogApi( + report.audit_log_api_perf, + self.config.rate_limit, + self.config.requests_per_min, + ) self.redundant_run_skip_handler = redundant_run_skip_handler self.start_time, self.end_time = ( @@ -256,55 +232,205 @@ def error(self, log: logging.Logger, key: str, reason: str) -> None: self.report.report_warning(key, reason) log.error(f"{key} => {reason}") - @staticmethod - def bigquery_audit_metadata_query_template( - dataset: str, use_date_sharded_tables: bool, limit: Optional[int] = None - ) -> str: - """ - Receives a dataset (with project specified) and returns a query template that is used to query exported - AuditLogs containing protoPayloads of type BigQueryAuditMetadata. - Include only those that: - - have been completed (jobStatus.jobState = "DONE") - - do not contain errors (jobStatus.errorResults is none) - :param dataset: the dataset to query against in the form of $PROJECT.$DATASET - :param use_date_sharded_tables: whether to read from date sharded audit log tables or time partitioned audit log - tables - :param limit: set a limit for the maximum event to return. It is used for connection testing currently - :return: a query template, when supplied start_time and end_time, can be used to query audit logs from BigQuery - """ - limit_text = f"limit {limit}" if limit else "" + def _should_ingest_lineage(self) -> bool: + if ( + self.redundant_run_skip_handler + and self.redundant_run_skip_handler.should_skip_this_run( + cur_start_time=self.config.start_time, + cur_end_time=self.config.end_time, + ) + ): + # Skip this run + self.report.report_warning( + "lineage-extraction", + "Skip this run as there was already a run for current ingestion window.", + ) + return False + + return True + + def get_lineage_workunits( + self, + projects: List[str], + sql_parser_schema_resolver: SchemaResolver, + view_refs_by_project: Dict[str, Set[str]], + view_definitions: FileBackedDict[str], + table_refs: Set[str], + ) -> Iterable[MetadataWorkUnit]: + if not self._should_ingest_lineage(): + return + views_skip_audit_log_lineage: Set[str] = set() + if self.config.lineage_parse_view_ddl: + view_lineage: Dict[str, Set[LineageEdge]] = {} + for project in projects: + self.populate_view_lineage_with_sql_parsing( + view_lineage, + view_refs_by_project[project], + view_definitions, + sql_parser_schema_resolver, + project, + ) - shard_condition = "" - if use_date_sharded_tables: - from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access_*`" - shard_condition = ( - """ AND _TABLE_SUFFIX BETWEEN "{start_date}" AND "{end_date}" """ + views_skip_audit_log_lineage.update(view_lineage.keys()) + for lineage_key in view_lineage.keys(): + yield from self.gen_lineage_workunits_for_table( + view_lineage, BigQueryTableRef.from_string_name(lineage_key) + ) + + if self.config.use_exported_bigquery_audit_metadata: + projects = ["*"] # project_id not used when using exported metadata + + for project in projects: + self.report.set_ingestion_stage(project, "Lineage Extraction") + yield from self.generate_lineage( + project, + sql_parser_schema_resolver, + views_skip_audit_log_lineage, + table_refs, ) - else: - from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access`" - - query = f""" - SELECT - 
timestamp, - logName, - insertId, - protopayload_auditlog AS protoPayload, - protopayload_auditlog.metadataJson AS metadata - FROM - {from_table} - WHERE ( - timestamp >= "{{start_time}}" - AND timestamp < "{{end_time}}" + + if self.redundant_run_skip_handler: + # Update the checkpoint state for this run. + self.redundant_run_skip_handler.update_state( + self.config.start_time, self.config.end_time ) - {shard_condition} - AND protopayload_auditlog.serviceName="bigquery.googleapis.com" - AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE" - AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.errorResults") IS NULL - AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL - {limit_text}; - """ - return textwrap.dedent(query) + def generate_lineage( + self, + project_id: str, + sql_parser_schema_resolver: SchemaResolver, + views_skip_audit_log_lineage: Set[str], + table_refs: Set[str], + ) -> Iterable[MetadataWorkUnit]: + logger.info(f"Generate lineage for {project_id}") + with PerfTimer() as timer: + try: + if self.config.extract_lineage_from_catalog: + lineage = self.lineage_via_catalog_lineage_api(project_id) + else: + events = self._get_parsed_audit_log_events(project_id) + lineage = self._create_lineage_map( + events, sql_parser_schema_resolver + ) + except Exception as e: + if project_id: + self.report.lineage_failed_extraction.append(project_id) + self.error( + logger, + "lineage", + f"{project_id}: {e}", + ) + lineage = {} + + self.report.lineage_metadata_entries[project_id] = len(lineage) + logger.info(f"Built lineage map containing {len(lineage)} entries.") + logger.debug(f"lineage metadata is {lineage}") + self.report.lineage_extraction_sec[project_id] = round( + timer.elapsed_seconds(), 2 + ) + self.report.lineage_mem_size[project_id] = humanfriendly.format_size( + memory_footprint.total_size(lineage) + ) + + for lineage_key in lineage.keys(): + # For views, we do not use the upstreams obtained by parsing audit logs + # as they may contain indirectly referenced tables. 
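The comment above, together with the check that follows it, implements a simple precedence rule: a table ref that already received SQL-parsed view lineage, or that is not among the known table refs, is skipped when emitting audit-log lineage. A minimal sketch of that filtering, using hypothetical string refs rather than real BigQueryTableRef objects:

    # Hypothetical data: audit-log lineage is dropped for any ref that already
    # received SQL-parsed view lineage, mirroring the check that follows.
    audit_log_lineage = {
        "proj.ds.view_a": {"proj.ds.t1"},
        "proj.ds.table_b": {"proj.ds.t2"},
    }
    views_skip_audit_log_lineage = {"proj.ds.view_a"}
    table_refs = {"proj.ds.view_a", "proj.ds.table_b"}

    emitted = {
        ref: upstreams
        for ref, upstreams in audit_log_lineage.items()
        if ref in table_refs and ref not in views_skip_audit_log_lineage
    }
    assert emitted == {"proj.ds.table_b": {"proj.ds.t2"}}
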
+ if ( + lineage_key not in table_refs + or lineage_key in views_skip_audit_log_lineage + ): + continue + + yield from self.gen_lineage_workunits_for_table( + lineage, BigQueryTableRef.from_string_name(lineage_key) + ) + + def populate_view_lineage_with_sql_parsing( + self, + view_lineage: Dict[str, Set[LineageEdge]], + view_refs: Set[str], + view_definitions: FileBackedDict[str], + sql_parser_schema_resolver: SchemaResolver, + default_project: str, + ) -> None: + for view in view_refs: + view_definition = view_definitions[view] + raw_view_lineage = sqlglot_lineage( + view_definition, + schema_resolver=sql_parser_schema_resolver, + default_db=default_project, + ) + if raw_view_lineage.debug_info.table_error: + logger.debug( + f"Failed to parse lineage for view {view}: {raw_view_lineage.debug_info.table_error}" + ) + self.report.num_view_definitions_failed_parsing += 1 + self.report.view_definitions_parsing_failures.append( + f"Table-level sql parsing error for view {view}: {raw_view_lineage.debug_info.table_error}" + ) + continue + elif raw_view_lineage.debug_info.column_error: + self.report.num_view_definitions_failed_column_parsing += 1 + self.report.view_definitions_parsing_failures.append( + f"Column-level sql parsing error for view {view}: {raw_view_lineage.debug_info.column_error}" + ) + else: + self.report.num_view_definitions_parsed += 1 + + ts = datetime.now(timezone.utc) + view_lineage[view] = set( + make_lineage_edges_from_parsing_result( + raw_view_lineage, + audit_stamp=ts, + lineage_type=DatasetLineageTypeClass.VIEW, + ) + ) + + def gen_lineage_workunits_for_table( + self, lineage: Dict[str, Set[LineageEdge]], table_ref: BigQueryTableRef + ) -> Iterable[MetadataWorkUnit]: + dataset_urn = self.dataset_urn_builder(table_ref) + + lineage_info = self.get_lineage_for_table( + bq_table=table_ref, + bq_table_urn=dataset_urn, + lineage_metadata=lineage, + ) + if lineage_info: + yield from self.gen_lineage(dataset_urn, lineage_info) + + def gen_lineage( + self, + dataset_urn: str, + upstream_lineage: Optional[UpstreamLineageClass] = None, + ) -> Iterable[MetadataWorkUnit]: + if upstream_lineage is None: + return + + if upstream_lineage is not None: + if self.config.incremental_lineage: + patch_builder: DatasetPatchBuilder = DatasetPatchBuilder( + urn=dataset_urn + ) + for upstream in upstream_lineage.upstreams: + patch_builder.add_upstream_lineage(upstream) + + yield from [ + MetadataWorkUnit( + id=f"upstreamLineage-for-{dataset_urn}", + mcp_raw=mcp, + ) + for mcp in patch_builder.build() + ] + else: + if not self.config.extract_column_lineage: + upstream_lineage.fineGrainedLineages = None + + yield from [ + MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=upstream_lineage + ).as_workunit() + ] def lineage_via_catalog_lineage_api( self, project_id: str @@ -328,22 +454,28 @@ def lineage_via_catalog_lineage_api( try: lineage_client: lineage_v1.LineageClient = lineage_v1.LineageClient() - bigquery_client: BigQueryClient = get_bigquery_client(self.config) + + data_dictionary = BigQuerySchemaApi( + self.report.schema_api_perf, self.config.get_bigquery_client() + ) + # Filtering datasets - datasets = list(bigquery_client.list_datasets(project_id)) + datasets = list(data_dictionary.get_datasets_for_project_id(project_id)) project_tables = [] for dataset in datasets: # Enables only tables where type is TABLE, VIEW or MATERIALIZED_VIEW (not EXTERNAL) project_tables.extend( [ table - for table in bigquery_client.list_tables(dataset.dataset_id) + for table in 
data_dictionary.list_tables( + dataset.name, project_id + ) if table.table_type in ["TABLE", "VIEW", "MATERIALIZED_VIEW"] ] ) # Convert project tables to .. format - project_tables = list( + project_table_names = list( map( lambda table: "{}.{}.{}".format( table.project, table.dataset_id, table.table_id @@ -354,7 +486,7 @@ def lineage_via_catalog_lineage_api( lineage_map: Dict[str, Set[LineageEdge]] = {} curr_date = datetime.now() - for table in project_tables: + for table in project_table_names: logger.info("Creating lineage map for table %s", table) upstreams = set() downstream_table = lineage_v1.EntityReference() @@ -411,127 +543,73 @@ def lineage_via_catalog_lineage_api( raise e def _get_parsed_audit_log_events(self, project_id: str) -> Iterable[QueryEvent]: + # We adjust the filter values a bit, since we need to make sure that the join + # between query events and read events is complete. For example, this helps us + # handle the case where the read happens within our time range but the query + # completion event is delayed and happens after the configured end time. + corrected_start_time = self.start_time - self.config.max_query_duration + corrected_end_time = self.end_time + -self.config.max_query_duration + self.report.log_entry_start_time = corrected_start_time + self.report.log_entry_end_time = corrected_end_time + parse_fn: Callable[[Any], Optional[Union[ReadEvent, QueryEvent]]] if self.config.use_exported_bigquery_audit_metadata: - logger.info("Populating lineage info via exported GCP audit logs") - bq_client = get_bigquery_client(self.config) - entries = self._get_exported_bigquery_audit_metadata(bq_client) + entries = self.get_exported_log_entries( + corrected_start_time, corrected_end_time + ) parse_fn = self._parse_exported_bigquery_audit_metadata else: - logger.info("Populating lineage info via exported GCP audit logs") - logging_client = _make_gcp_logging_client(project_id) - entries = self._get_bigquery_log_entries(logging_client) + entries = self.get_log_entries_via_gcp_logging( + project_id, corrected_start_time, corrected_end_time + ) parse_fn = self._parse_bigquery_log_entries for entry in entries: - self.report.num_total_log_entries[project_id] += 1 + self.report.num_lineage_total_log_entries[project_id] += 1 try: event = parse_fn(entry) if event: - self.report.num_parsed_log_entries[project_id] += 1 + self.report.num_lineage_parsed_log_entries[project_id] += 1 yield event except Exception as e: logger.warning(f"Unable to parse log entry `{entry}`: {e}") self.report.num_lineage_log_parse_failures[project_id] += 1 - def _get_bigquery_log_entries( - self, client: GCPLoggingClient, limit: Optional[int] = None - ) -> Iterable[AuditLogEntry]: - self.report.num_total_log_entries[client.project] = 0 - # Add a buffer to start and end time to account for delays in logging events. 
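The intent spelled out in the comment above is to widen the audit-log query window on both sides by max_query_duration, so that a query which completes after the configured end time is still joined with read events inside the range; note that the pre-refactor code removed below adds max_query_duration to the end time, whereas the added line above subtracts it. A small illustrative helper (not part of the source) showing the widening the comment describes:

    from datetime import datetime, timedelta, timezone

    def corrected_window(start_time, end_time, max_query_duration):
        # Widen the window on both ends so delayed completion events are not missed.
        return start_time - max_query_duration, end_time + max_query_duration

    start = datetime(2023, 8, 1, tzinfo=timezone.utc)
    end = datetime(2023, 8, 2, tzinfo=timezone.utc)
    window_start, window_end = corrected_window(start, end, timedelta(minutes=15))
    assert window_start < start and window_end > end
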
- start_time = (self.start_time - self.config.max_query_duration).strftime( - BQ_DATETIME_FORMAT - ) - self.report.log_entry_start_time = start_time - - end_time = (self.config.end_time + self.config.max_query_duration).strftime( - BQ_DATETIME_FORMAT - ) - self.report.log_entry_end_time = end_time - - filter = self.BQ_FILTER_RULE_TEMPLATE_V2.format( - start_time=start_time, - end_time=end_time, - ) - - logger.info( - f"Start loading log entries from BigQuery for {client.project} with start_time={start_time} and end_time={end_time}" + def get_exported_log_entries( + self, corrected_start_time, corrected_end_time, limit=None + ): + logger.info("Populating lineage info via exported GCP audit logs") + bq_client = self.config.get_bigquery_client() + entries = self.audit_log_api.get_exported_bigquery_audit_metadata( + bigquery_client=bq_client, + bigquery_audit_metadata_query_template=bigquery_audit_metadata_query_template_lineage, + bigquery_audit_metadata_datasets=self.config.bigquery_audit_metadata_datasets, + use_date_sharded_audit_log_tables=self.config.use_date_sharded_audit_log_tables, + start_time=corrected_start_time, + end_time=corrected_end_time, + limit=limit, ) + return entries - if self.config.rate_limit: - with RateLimiter(max_calls=self.config.requests_per_min, period=60): - entries = client.list_entries( - filter_=filter, - page_size=self.config.log_page_size, - max_results=limit, - ) - else: - entries = client.list_entries( - filter_=filter, page_size=self.config.log_page_size, max_results=limit - ) + def get_log_entries_via_gcp_logging( + self, project_id, corrected_start_time, corrected_end_time + ): + logger.info("Populating lineage info via exported GCP audit logs") + logging_client = self.config.make_gcp_logging_client(project_id) logger.info( - f"Start iterating over log entries from BigQuery for {client.project}" + f"Start loading log entries from BigQuery for {project_id} " + f"with start_time={corrected_start_time} and end_time={corrected_end_time}" ) - for entry in entries: - self.report.num_total_log_entries[client.project] += 1 - if self.report.num_total_log_entries[client.project] % 1000 == 0: - logger.info( - f"{self.report.num_total_log_entries[client.project]} log entries loaded for project {client.project} so far..." 
- ) - yield entry - - logger.info( - f"Finished loading {self.report.num_total_log_entries[client.project]} log entries from BigQuery project {client.project} so far" + entries = self.audit_log_api.get_bigquery_log_entries_via_gcp_logging( + logging_client, + BQ_FILTER_RULE_TEMPLATE_V2_LINEAGE.format( + start_time=corrected_start_time.strftime(BQ_DATETIME_FORMAT), + end_time=corrected_end_time.strftime(BQ_DATETIME_FORMAT), + ), + self.config.log_page_size, ) - - def _get_exported_bigquery_audit_metadata( - self, bigquery_client: BigQueryClient, limit: Optional[int] = None - ) -> Iterable[BigQueryAuditMetadata]: - if self.config.bigquery_audit_metadata_datasets is None: - self.error( - logger, "audit-metadata", "bigquery_audit_metadata_datasets not set" - ) - self.report.bigquery_audit_metadata_datasets_missing = True - return - - corrected_start_time = self.start_time - self.config.max_query_duration - start_time = corrected_start_time.strftime(BQ_DATETIME_FORMAT) - start_date = corrected_start_time.strftime(BQ_DATE_SHARD_FORMAT) - self.report.audit_start_time = start_time - - corrected_end_time = self.end_time + self.config.max_query_duration - end_time = corrected_end_time.strftime(BQ_DATETIME_FORMAT) - end_date = corrected_end_time.strftime(BQ_DATE_SHARD_FORMAT) - self.report.audit_end_time = end_time - - for dataset in self.config.bigquery_audit_metadata_datasets: - logger.info( - f"Start loading log entries from BigQueryAuditMetadata in {dataset}" - ) - - query: str = self.bigquery_audit_metadata_query_template( - dataset=dataset, - use_date_sharded_tables=self.config.use_date_sharded_audit_log_tables, - limit=limit, - ).format( - start_time=start_time, - end_time=end_time, - start_date=start_date, - end_date=end_date, - ) - - query_job = bigquery_client.query(query) - - logger.info( - f"Finished loading log entries from BigQueryAuditMetadata in {dataset}" - ) - - if self.config.rate_limit: - with RateLimiter(max_calls=self.config.requests_per_min, period=60): - yield from query_job - else: - yield from query_job + return entries # Currently we only parse JobCompleted events but in future we would want to parse other # events to also create field level lineage. 
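Both helpers above hand the Cloud Logging filter to BigQueryAuditLogApi with the timestamps already substituted; only the strftime formatting with BQ_DATETIME_FORMAT happens here. A minimal illustration of that substitution, with an abbreviated stand-in for the full filter template defined in queries.py and a datetime format assumed to match common.py:

    from datetime import datetime, timezone

    BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumed value from common.py
    FILTER_TEMPLATE = 'timestamp >= "{start_time}" AND timestamp < "{end_time}"'  # abbreviated

    filter_ = FILTER_TEMPLATE.format(
        start_time=datetime(2023, 8, 1, tzinfo=timezone.utc).strftime(BQ_DATETIME_FORMAT),
        end_time=datetime(2023, 8, 2, tzinfo=timezone.utc).strftime(BQ_DATETIME_FORMAT),
    )
    print(filter_)  # timestamp >= "2023-08-01T00:00:00Z" AND timestamp < "2023-08-02T00:00:00Z"

Only get_exported_log_entries goes through a BigQuery client; the GCP Logging path passes the formatted filter string straight to get_bigquery_log_entries_via_gcp_logging.
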
@@ -674,39 +752,6 @@ def _create_lineage_map( logger.info("Exiting create lineage map function") return lineage_map - def _compute_bigquery_lineage( - self, - project_id: str, - sql_parser_schema_resolver: SchemaResolver, - ) -> Dict[str, Set[LineageEdge]]: - lineage_metadata: Dict[str, Set[LineageEdge]] - try: - if self.config.extract_lineage_from_catalog: - lineage_metadata = self.lineage_via_catalog_lineage_api(project_id) - else: - events = self._get_parsed_audit_log_events(project_id) - lineage_metadata = self._create_lineage_map( - events, sql_parser_schema_resolver - ) - except Exception as e: - if project_id: - self.report.lineage_failed_extraction.append(project_id) - self.error( - logger, - "lineage", - f"{project_id}: {e}", - ) - self.report_status(f"{project_id}-lineage", False) - lineage_metadata = {} - - self.report.lineage_mem_size[project_id] = humanfriendly.format_size( - memory_footprint.total_size(lineage_metadata) - ) - self.report.lineage_metadata_entries[project_id] = len(lineage_metadata) - logger.info(f"Built lineage map containing {len(lineage_metadata)} entries.") - logger.debug(f"lineage metadata is {lineage_metadata}") - return lineage_metadata - def get_upstream_tables( self, bq_table: BigQueryTableRef, @@ -767,28 +812,11 @@ def get_upstream_tables( return set(upstreams.values()) - def calculate_lineage_for_project( - self, - project_id: str, - sql_parser_schema_resolver: SchemaResolver, - ) -> Dict[str, Set[LineageEdge]]: - with PerfTimer() as timer: - lineage = self._compute_bigquery_lineage( - project_id, sql_parser_schema_resolver - ) - - self.report.lineage_extraction_sec[project_id] = round( - timer.elapsed_seconds(), 2 - ) - - return lineage - def get_lineage_for_table( self, bq_table: BigQueryTableRef, bq_table_urn: str, lineage_metadata: Dict[str, Set[LineageEdge]], - platform: str, ) -> Optional[UpstreamLineageClass]: upstream_list: List[UpstreamClass] = [] fine_grained_lineages: List[FineGrainedLineageClass] = [] @@ -796,12 +824,7 @@ def get_lineage_for_table( # even if the lineage is same but the order is different. for upstream in sorted(self.get_upstream_tables(bq_table, lineage_metadata)): upstream_table = BigQueryTableRef.from_string_name(upstream.table) - upstream_table_urn = mce_builder.make_dataset_urn_with_platform_instance( - platform, - upstream_table.table_identifier.get_table_name(), - self.config.platform_instance, - self.config.env, - ) + upstream_table_urn = self.dataset_urn_builder(upstream_table) # Generate table-level lineage. 
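The dropped platform parameter reflects that URN construction is now injected: get_lineage_for_table simply calls the dataset_urn_builder it was given for each upstream ref. The unit tests further down pass a plain lambda built on make_dataset_urn; a standalone callback in the same spirit (a sketch only, since the removed code also folded in the configured platform_instance and env) could look like:

    from datahub.emitter import mce_builder
    from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigQueryTableRef

    def build_dataset_urn(ref: BigQueryTableRef) -> str:
        # Illustrative builder: map a BigQuery table ref to a DataHub dataset URN.
        return mce_builder.make_dataset_urn(
            "bigquery", ref.table_identifier.get_table_name(), env="PROD"
        )
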
upstream_table_class = UpstreamClass( @@ -852,19 +875,27 @@ def get_lineage_for_table( def test_capability(self, project_id: str) -> None: if self.config.use_exported_bigquery_audit_metadata: - bigquery_client: BigQueryClient = BigQueryClient(project=project_id) - entries = self._get_exported_bigquery_audit_metadata( - bigquery_client=bigquery_client, limit=1 - ) - for entry in entries: + for entry in self.get_exported_log_entries( + self.start_time, + self.end_time, + limit=1, + ): logger.debug( f"Connection test got one exported_bigquery_audit_metadata {entry}" ) else: - gcp_logging_client: GCPLoggingClient = _make_gcp_logging_client( - project_id, self.config.extra_client_options + gcp_logging_client: GCPLoggingClient = self.config.make_gcp_logging_client( + project_id ) - for entry in self._get_bigquery_log_entries(gcp_logging_client, limit=1): + for entry in self.audit_log_api.get_bigquery_log_entries_via_gcp_logging( + gcp_logging_client, + filter=BQ_FILTER_RULE_TEMPLATE_V2_LINEAGE.format( + self.start_time.strftime(BQ_DATETIME_FORMAT), + self.end_time.strftime(BQ_DATETIME_FORMAT), + ), + log_page_size=self.config.log_page_size, + limit=1, + ): logger.debug(f"Connection test got one audit metadata entry {entry}") def report_status(self, step: str, status: bool) -> None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries.py new file mode 100644 index 0000000000000..5be7a0a7f6b2f --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries.py @@ -0,0 +1,426 @@ +import textwrap +from typing import Optional + + +class BigqueryTableType: + # See https://cloud.google.com/bigquery/docs/information-schema-tables#schema + BASE_TABLE = "BASE TABLE" + EXTERNAL = "EXTERNAL" + VIEW = "VIEW" + MATERIALIZED_VIEW = "MATERIALIZED VIEW" + CLONE = "CLONE" + SNAPSHOT = "SNAPSHOT" + + +class BigqueryQuery: + show_datasets: str = ( + "select schema_name from `{project_id}`.INFORMATION_SCHEMA.SCHEMATA" + ) + + datasets_for_project_id: str = """ +select + s.CATALOG_NAME as catalog_name, + s.schema_name as table_schema, + s.location as location, + s.CREATION_TIME as created, + s.LAST_MODIFIED_TIME as last_altered, + o.OPTION_VALUE as comment +from + `{project_id}`.INFORMATION_SCHEMA.SCHEMATA as s + left join `{project_id}`.INFORMATION_SCHEMA.SCHEMATA_OPTIONS as o on o.schema_name = s.schema_name + and o.option_name = "description" +order by + s.schema_name +""" + + # https://cloud.google.com/bigquery/docs/information-schema-table-storage?hl=en + tables_for_dataset = f""" +SELECT + t.table_catalog as table_catalog, + t.table_schema as table_schema, + t.table_name as table_name, + t.table_type as table_type, + t.creation_time as created, + ts.last_modified_time as last_altered, + tos.OPTION_VALUE as comment, + is_insertable_into, + ddl, + row_count, + size_bytes as bytes, + num_partitions, + max_partition_id, + active_billable_bytes, + long_term_billable_bytes, + REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix, + REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base + +FROM + `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t + join `{{project_id}}`.`{{dataset_name}}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME + left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema + and t.TABLE_NAME = tos.TABLE_NAME + and tos.OPTION_NAME = "description" + left join ( + select + table_name, + 
sum(case when partition_id not in ('__NULL__', '__UNPARTITIONED__', '__STREAMING_UNPARTITIONED__') then 1 else 0 END) as num_partitions, + max(case when partition_id not in ('__NULL__', '__UNPARTITIONED__', '__STREAMING_UNPARTITIONED__') then partition_id else NULL END) as max_partition_id, + sum(total_rows) as total_rows, + sum(case when storage_tier = 'LONG_TERM' then total_billable_bytes else 0 end) as long_term_billable_bytes, + sum(case when storage_tier = 'ACTIVE' then total_billable_bytes else 0 end) as active_billable_bytes, + from + `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.PARTITIONS + group by + table_name) as p on + t.table_name = p.table_name +WHERE + table_type in ('{BigqueryTableType.BASE_TABLE}', '{BigqueryTableType.EXTERNAL}') +{{table_filter}} +order by + table_schema ASC, + table_base ASC, + table_suffix DESC +""" + + tables_for_dataset_without_partition_data = f""" +SELECT + t.table_catalog as table_catalog, + t.table_schema as table_schema, + t.table_name as table_name, + t.table_type as table_type, + t.creation_time as created, + tos.OPTION_VALUE as comment, + is_insertable_into, + ddl, + REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix, + REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base + +FROM + `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t + left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema + and t.TABLE_NAME = tos.TABLE_NAME + and tos.OPTION_NAME = "description" +WHERE + table_type in ('{BigqueryTableType.BASE_TABLE}', '{BigqueryTableType.EXTERNAL}') +{{table_filter}} +order by + table_schema ASC, + table_base ASC, + table_suffix DESC +""" + + views_for_dataset: str = f""" +SELECT + t.table_catalog as table_catalog, + t.table_schema as table_schema, + t.table_name as table_name, + t.table_type as table_type, + t.creation_time as created, + ts.last_modified_time as last_altered, + tos.OPTION_VALUE as comment, + is_insertable_into, + ddl as view_definition, + row_count, + size_bytes +FROM + `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t + join `{{project_id}}`.`{{dataset_name}}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME + left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema + and t.TABLE_NAME = tos.TABLE_NAME + and tos.OPTION_NAME = "description" +WHERE + table_type in ('{BigqueryTableType.VIEW}', '{BigqueryTableType.MATERIALIZED_VIEW}') +order by + table_schema ASC, + table_name ASC +""" + + views_for_dataset_without_data_read: str = f""" +SELECT + t.table_catalog as table_catalog, + t.table_schema as table_schema, + t.table_name as table_name, + t.table_type as table_type, + t.creation_time as created, + tos.OPTION_VALUE as comment, + is_insertable_into, + ddl as view_definition +FROM + `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t + left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema + and t.TABLE_NAME = tos.TABLE_NAME + and tos.OPTION_NAME = "description" +WHERE + table_type in ('{BigqueryTableType.VIEW}', '{BigqueryTableType.MATERIALIZED_VIEW}') +order by + table_schema ASC, + table_name ASC +""" + + columns_for_dataset: str = """ +select + c.table_catalog as table_catalog, + c.table_schema as table_schema, + c.table_name as table_name, + c.column_name as column_name, + c.ordinal_position as ordinal_position, + cfp.field_path as field_path, + c.is_nullable as 
is_nullable, + CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type, + description as comment, + c.is_hidden as is_hidden, + c.is_partitioning_column as is_partitioning_column, + c.clustering_ordinal_position as clustering_ordinal_position, +from + `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMNS c + join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name + and cfp.column_name = c.column_name +ORDER BY + table_catalog, table_schema, table_name, ordinal_position ASC, data_type DESC""" + + optimized_columns_for_dataset: str = """ +select * from +(select + c.table_catalog as table_catalog, + c.table_schema as table_schema, + c.table_name as table_name, + c.column_name as column_name, + c.ordinal_position as ordinal_position, + cfp.field_path as field_path, + c.is_nullable as is_nullable, + CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type, + description as comment, + c.is_hidden as is_hidden, + c.is_partitioning_column as is_partitioning_column, + c.clustering_ordinal_position as clustering_ordinal_position, + -- We count the columns to be able limit it later + row_number() over (partition by c.table_catalog, c.table_schema, c.table_name order by c.ordinal_position asc, c.data_type DESC) as column_num, + -- Getting the maximum shard for each table + row_number() over (partition by c.table_catalog, c.table_schema, ifnull(REGEXP_EXTRACT(c.table_name, r'(.*)_\\d{{8}}$'), c.table_name), cfp.field_path order by c.table_catalog, c.table_schema asc, c.table_name desc) as shard_num +from + `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMNS c + join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name + and cfp.column_name = c.column_name + ) +-- We filter column limit + 1 to make sure we warn about the limit being reached but not reading too much data +where column_num <= {column_limit} and shard_num = 1 +ORDER BY + table_catalog, table_schema, table_name, ordinal_position, column_num ASC, data_type DESC""" + + columns_for_table: str = """ +select + c.table_catalog as table_catalog, + c.table_schema as table_schema, + c.table_name as table_name, + c.column_name as column_name, + c.ordinal_position as ordinal_position, + cfp.field_path as field_path, + c.is_nullable as is_nullable, + CASE WHEN CONTAINS_SUBSTR(field_path, ".") THEN NULL ELSE c.data_type END as data_type, + c.is_hidden as is_hidden, + c.is_partitioning_column as is_partitioning_column, + c.clustering_ordinal_position as clustering_ordinal_position, + description as comment +from + `{table_identifier.project_id}`.`{table_identifier.dataset}`.INFORMATION_SCHEMA.COLUMNS as c + join `{table_identifier.project_id}`.`{table_identifier.dataset}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS as cfp on cfp.table_name = c.table_name + and cfp.column_name = c.column_name +where + c.table_name = '{table_identifier.table}' +ORDER BY + table_catalog, table_schema, table_name, ordinal_position ASC, data_type DESC""" + + +BQ_FILTER_RULE_TEMPLATE_V2_LINEAGE = """ +resource.type=("bigquery_project") +AND +( + protoPayload.methodName= + ( + "google.cloud.bigquery.v2.JobService.Query" + OR + "google.cloud.bigquery.v2.JobService.InsertJob" + ) + AND + protoPayload.metadata.jobChange.job.jobStatus.jobState="DONE" + AND NOT protoPayload.metadata.jobChange.job.jobStatus.errorResult:* + AND ( + protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables:* 
+ OR + protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedViews:* + ) + AND ( + protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/_.*/tables/anon.*" + AND + protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/.*/tables/INFORMATION_SCHEMA.*" + AND + protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/.*/tables/__TABLES__" + AND + protoPayload.metadata.jobChange.job.jobConfig.queryConfig.destinationTable !~ "projects/.*/datasets/_.*/tables/anon.*" + ) + +) +AND +timestamp >= "{start_time}" +AND +timestamp < "{end_time}" +""".strip() +BQ_FILTER_RULE_TEMPLATE_V2_USAGE = """ +resource.type=("bigquery_project" OR "bigquery_dataset") +AND +timestamp >= "{start_time}" +AND +timestamp < "{end_time}" +AND protoPayload.serviceName="bigquery.googleapis.com" +AND +( + ( + protoPayload.methodName= + ( + "google.cloud.bigquery.v2.JobService.Query" + OR + "google.cloud.bigquery.v2.JobService.InsertJob" + ) + AND protoPayload.metadata.jobChange.job.jobStatus.jobState="DONE" + AND NOT protoPayload.metadata.jobChange.job.jobStatus.errorResult:* + AND protoPayload.metadata.jobChange.job.jobConfig.queryConfig:* + AND + ( + ( + protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables:* + AND NOT protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables =~ "projects/.*/datasets/.*/tables/__TABLES__|__TABLES_SUMMARY__|INFORMATION_SCHEMA.*" + ) + OR + ( + protoPayload.metadata.jobChange.job.jobConfig.queryConfig.destinationTable:* + ) + ) + ) + OR + protoPayload.metadata.tableDataRead.reason = "JOB" +) +""".strip( + "\t \n" +) + + +def bigquery_audit_metadata_query_template_lineage( + dataset: str, use_date_sharded_tables: bool, limit: Optional[int] = None +) -> str: + """ + Receives a dataset (with project specified) and returns a query template that is used to query exported + AuditLogs containing protoPayloads of type BigQueryAuditMetadata. + Include only those that: + - have been completed (jobStatus.jobState = "DONE") + - do not contain errors (jobStatus.errorResults is none) + :param dataset: the dataset to query against in the form of $PROJECT.$DATASET + :param use_date_sharded_tables: whether to read from date sharded audit log tables or time partitioned audit log + tables + :param limit: set a limit for the maximum event to return. 
It is used for connection testing currently + :return: a query template, when supplied start_time and end_time, can be used to query audit logs from BigQuery + """ + limit_text = f"limit {limit}" if limit else "" + + shard_condition = "" + if use_date_sharded_tables: + from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access_*`" + shard_condition = ( + """ AND _TABLE_SUFFIX BETWEEN "{start_date}" AND "{end_date}" """ + ) + else: + from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access`" + + query = f""" + SELECT + timestamp, + logName, + insertId, + protopayload_auditlog AS protoPayload, + protopayload_auditlog.metadataJson AS metadata + FROM + {from_table} + WHERE ( + timestamp >= "{{start_time}}" + AND timestamp < "{{end_time}}" + ) + {shard_condition} + AND protopayload_auditlog.serviceName="bigquery.googleapis.com" + AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE" + AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.errorResults") IS NULL + AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL + QUALIFY ROW_NUMBER() OVER (PARTITION BY insertId, timestamp, logName) = 1 + {limit_text}; + """ + + return textwrap.dedent(query) + + +def bigquery_audit_metadata_query_template_usage( + dataset: str, + use_date_sharded_tables: bool, + limit: Optional[int] = None, +) -> str: + """ + Receives a dataset (with project specified) and returns a query template that is used to query exported + v2 AuditLogs containing protoPayloads of type BigQueryAuditMetadata. + :param dataset: the dataset to query against in the form of $PROJECT.$DATASET + :param use_date_sharded_tables: whether to read from date sharded audit log tables or time partitioned audit log + tables + :param limit: maximum number of events to query for + :return: a query template, when supplied start_time and end_time, can be used to query audit logs from BigQuery + """ + + limit_text = f"limit {limit}" if limit else "" + + shard_condition = "" + if use_date_sharded_tables: + from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access_*`" + shard_condition = ( + """ AND _TABLE_SUFFIX BETWEEN "{start_date}" AND "{end_date}" """ + ) + else: + from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access`" + + # Deduplicates insertId via QUALIFY, see: + # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry, insertId field + query = f""" + SELECT + timestamp, + logName, + insertId, + protopayload_auditlog AS protoPayload, + protopayload_auditlog.metadataJson AS metadata + FROM + {from_table} + WHERE ( + timestamp >= "{{start_time}}" + AND timestamp < "{{end_time}}" + ) + {shard_condition} + AND protopayload_auditlog.serviceName="bigquery.googleapis.com" + AND + ( + ( + protopayload_auditlog.methodName IN + ( + "google.cloud.bigquery.v2.JobService.Query", + "google.cloud.bigquery.v2.JobService.InsertJob" + ) + AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE" + AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.errorResults") IS NULL + AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL + AND ( + JSON_EXTRACT_ARRAY(protopayload_auditlog.metadataJson, + "$.jobChange.job.jobStats.queryStats.referencedTables") IS NOT NULL + OR + JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, 
"$.jobChange.job.jobConfig.queryConfig.destinationTable") IS NOT NULL + ) + ) + OR + JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.tableDataRead.reason") = "JOB" + ) + QUALIFY ROW_NUMBER() OVER (PARTITION BY insertId, timestamp, logName) = 1 + {limit_text}; + """ + + return textwrap.dedent(query) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py index e112db31c5c63..201567e104a51 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py @@ -2,7 +2,6 @@ import json import logging import os -import textwrap import time import uuid from dataclasses import dataclass @@ -21,9 +20,6 @@ ) import humanfriendly -from google.cloud.bigquery import Client as BigQueryClient -from google.cloud.logging_v2.client import Client as GCPLoggingClient -from ratelimiter import RateLimiter from datahub.configuration.time_window_config import ( BaseTimeWindowConfig, @@ -35,8 +31,6 @@ from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( - BQ_AUDIT_V2, - BQ_FILTER_RULE_TEMPLATE, AuditEvent, AuditLogEntry, BigQueryAuditMetadata, @@ -45,13 +39,15 @@ QueryEvent, ReadEvent, ) +from datahub.ingestion.source.bigquery_v2.bigquery_audit_log_api import ( + BigQueryAuditLogApi, +) from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report -from datahub.ingestion.source.bigquery_v2.common import ( - BQ_DATE_SHARD_FORMAT, - BQ_DATETIME_FORMAT, - _make_gcp_logging_client, - get_bigquery_client, +from datahub.ingestion.source.bigquery_v2.common import BQ_DATETIME_FORMAT +from datahub.ingestion.source.bigquery_v2.queries import ( + BQ_FILTER_RULE_TEMPLATE_V2_USAGE, + bigquery_audit_metadata_query_template_usage, ) from datahub.ingestion.source.state.redundant_run_skip_handler import ( RedundantUsageRunSkipHandler, @@ -108,77 +104,6 @@ class OperationalDataMeta: custom_type: Optional[str] = None -def bigquery_audit_metadata_query_template( - dataset: str, - use_date_sharded_tables: bool, - limit: Optional[int] = None, -) -> str: - """ - Receives a dataset (with project specified) and returns a query template that is used to query exported - v2 AuditLogs containing protoPayloads of type BigQueryAuditMetadata. 
- :param dataset: the dataset to query against in the form of $PROJECT.$DATASET - :param use_date_sharded_tables: whether to read from date sharded audit log tables or time partitioned audit log - tables - :param limit: maximum number of events to query for - :return: a query template, when supplied start_time and end_time, can be used to query audit logs from BigQuery - """ - - limit_text = f"limit {limit}" if limit else "" - - shard_condition = "" - if use_date_sharded_tables: - from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access_*`" - shard_condition = ( - """ AND _TABLE_SUFFIX BETWEEN "{start_date}" AND "{end_date}" """ - ) - else: - from_table = f"`{dataset}.cloudaudit_googleapis_com_data_access`" - - # Deduplicates insertId via QUALIFY, see: - # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry, insertId field - query = f""" - SELECT - timestamp, - logName, - insertId, - protopayload_auditlog AS protoPayload, - protopayload_auditlog.metadataJson AS metadata - FROM - {from_table} - WHERE ( - timestamp >= "{{start_time}}" - AND timestamp < "{{end_time}}" - ) - {shard_condition} - AND protopayload_auditlog.serviceName="bigquery.googleapis.com" - AND - ( - ( - protopayload_auditlog.methodName IN - ( - "google.cloud.bigquery.v2.JobService.Query", - "google.cloud.bigquery.v2.JobService.InsertJob" - ) - AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE" - AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.errorResults") IS NULL - AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL - AND ( - JSON_EXTRACT_ARRAY(protopayload_auditlog.metadataJson, - "$.jobChange.job.jobStats.queryStats.referencedTables") IS NOT NULL - OR - JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig.destinationTable") IS NOT NULL - ) - ) - OR - JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.tableDataRead.reason") = "JOB" - ) - QUALIFY ROW_NUMBER() OVER (PARTITION BY insertId, timestamp, logName) = 1 - {limit_text}; - """ - - return textwrap.dedent(query) - - class BigQueryUsageState(Closeable): read_events: FileBackedDict[ReadEvent] query_events: FileBackedDict[QueryEvent] @@ -375,7 +300,8 @@ class BigQueryUsageExtractor: * Aggregation of these statistics into buckets, by day or hour granularity :::note - 1. Depending on the compliance policies setup for the bigquery instance, sometimes logging.read permission is not sufficient. In that case, use either admin or private log viewer permission. + 1. Depending on the compliance policies setup for the bigquery instance, sometimes logging.read permission is not sufficient. + In that case, use either admin or private log viewer permission. 
::: """ @@ -674,109 +600,6 @@ def _store_usage_event( return True return False - def _get_exported_bigquery_audit_metadata( - self, - bigquery_client: BigQueryClient, - limit: Optional[int] = None, - ) -> Iterable[BigQueryAuditMetadata]: - if self.config.bigquery_audit_metadata_datasets is None: - self.report.bigquery_audit_metadata_datasets_missing = True - return - - corrected_start_time = self.start_time - self.config.max_query_duration - start_time = corrected_start_time.strftime(BQ_DATETIME_FORMAT) - start_date = corrected_start_time.strftime(BQ_DATE_SHARD_FORMAT) - self.report.audit_start_time = start_time - - corrected_end_time = self.end_time + self.config.max_query_duration - end_time = corrected_end_time.strftime(BQ_DATETIME_FORMAT) - end_date = corrected_end_time.strftime(BQ_DATE_SHARD_FORMAT) - self.report.audit_end_time = end_time - - for dataset in self.config.bigquery_audit_metadata_datasets: - logger.info( - f"Start loading log entries from BigQueryAuditMetadata in {dataset}" - ) - - query = bigquery_audit_metadata_query_template( - dataset, - self.config.use_date_sharded_audit_log_tables, - limit=limit, - ).format( - start_time=start_time, - end_time=end_time, - start_date=start_date, - end_date=end_date, - ) - - query_job = bigquery_client.query(query) - logger.info( - f"Finished loading log entries from BigQueryAuditMetadata in {dataset}" - ) - if self.config.rate_limit: - with RateLimiter(max_calls=self.config.requests_per_min, period=60): - yield from query_job - else: - yield from query_job - - def _get_bigquery_log_entries_via_gcp_logging( - self, client: GCPLoggingClient, limit: Optional[int] = None - ) -> Iterable[AuditLogEntry]: - filter = self._generate_filter(BQ_AUDIT_V2) - logger.debug(filter) - - list_entries: Iterable[AuditLogEntry] - rate_limiter: Optional[RateLimiter] = None - if self.config.rate_limit: - # client.list_entries is a generator, does api calls to GCP Logging when it runs out of entries and needs to fetch more from GCP Logging - # to properly ratelimit we multiply the page size by the number of requests per minute - rate_limiter = RateLimiter( - max_calls=self.config.requests_per_min * self.config.log_page_size, - period=60, - ) - - list_entries = client.list_entries( - filter_=filter, - page_size=self.config.log_page_size, - max_results=limit, - ) - - for i, entry in enumerate(list_entries): - if i == 0: - logger.info(f"Starting log load from GCP Logging for {client.project}") - if i % 1000 == 0: - logger.info(f"Loaded {i} log entries from GCP Log for {client.project}") - self.report.total_query_log_entries += 1 - - if rate_limiter: - with rate_limiter: - yield entry - else: - yield entry - - logger.info( - f"Finished loading {self.report.total_query_log_entries} log entries from GCP Logging for {client.project}" - ) - - def _generate_filter(self, audit_templates: Dict[str, str]) -> str: - # We adjust the filter values a bit, since we need to make sure that the join - # between query events and read events is complete. For example, this helps us - # handle the case where the read happens within our time range but the query - # completion event is delayed and happens after the configured end time. 
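The fetching logic removed in this hunk, including its optional rate limiting, now lives behind BigQueryAuditLogApi, which both the lineage and usage extractors construct with the same rate_limit and requests_per_min settings. The pattern is the one visible in the removed code; a condensed sketch, assuming the same ratelimiter dependency:

    from typing import Iterable, Iterator, Optional

    from ratelimiter import RateLimiter

    def rate_limited(entries: Iterable, requests_per_min: int, enabled: bool) -> Iterator:
        # Yield entries unchanged, but cap how fast they are consumed when enabled,
        # mirroring the `with rate_limiter:` pattern in the code removed above.
        limiter: Optional[RateLimiter] = (
            RateLimiter(max_calls=requests_per_min, period=60) if enabled else None
        )
        for entry in entries:
            if limiter:
                with limiter:
                    yield entry
            else:
                yield entry
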
- - start_time = (self.start_time - self.config.max_query_duration).strftime( - BQ_DATETIME_FORMAT - ) - self.report.log_entry_start_time = start_time - end_time = (self.end_time + self.config.max_query_duration).strftime( - BQ_DATETIME_FORMAT - ) - self.report.log_entry_end_time = end_time - filter = audit_templates[BQ_FILTER_RULE_TEMPLATE].format( - start_time=start_time, end_time=end_time - ) - return filter - @staticmethod def _get_destination_table(event: AuditEvent) -> Optional[BigQueryTableRef]: if ( @@ -1011,27 +834,54 @@ def _parse_exported_bigquery_audit_metadata( def _get_parsed_bigquery_log_events( self, project_id: str, limit: Optional[int] = None ) -> Iterable[AuditEvent]: + audit_log_api = BigQueryAuditLogApi( + self.report.audit_log_api_perf, + self.config.rate_limit, + self.config.requests_per_min, + ) + # We adjust the filter values a bit, since we need to make sure that the join + # between query events and read events is complete. For example, this helps us + # handle the case where the read happens within our time range but the query + # completion event is delayed and happens after the configured end time. + corrected_start_time = self.start_time - self.config.max_query_duration + corrected_end_time = self.end_time + -self.config.max_query_duration + self.report.audit_start_time = corrected_start_time + self.report.audit_end_time = corrected_end_time + parse_fn: Callable[[Any], Optional[AuditEvent]] if self.config.use_exported_bigquery_audit_metadata: - bq_client = get_bigquery_client(self.config) - entries = self._get_exported_bigquery_audit_metadata( + bq_client = self.config.get_bigquery_client() + + entries = audit_log_api.get_exported_bigquery_audit_metadata( bigquery_client=bq_client, + bigquery_audit_metadata_datasets=self.config.bigquery_audit_metadata_datasets, + bigquery_audit_metadata_query_template=bigquery_audit_metadata_query_template_usage, + use_date_sharded_audit_log_tables=self.config.use_date_sharded_audit_log_tables, + start_time=corrected_start_time, + end_time=corrected_end_time, limit=limit, ) parse_fn = self._parse_exported_bigquery_audit_metadata else: - logging_client = _make_gcp_logging_client( - project_id, self.config.extra_client_options + logging_client = self.config.make_gcp_logging_client(project_id) + logger.info( + f"Start loading log entries from BigQuery for {project_id} " + f"with start_time={corrected_start_time} and end_time={corrected_end_time}" ) - entries = self._get_bigquery_log_entries_via_gcp_logging( - logging_client, limit=limit + entries = audit_log_api.get_bigquery_log_entries_via_gcp_logging( + logging_client, + filter=self._generate_filter(corrected_start_time, corrected_end_time), + log_page_size=self.config.log_page_size, + limit=limit, ) parse_fn = self._parse_bigquery_log_entry for entry in entries: try: + self.report.num_usage_total_log_entries[project_id] += 1 event = parse_fn(entry) if event: + self.report.num_usage_parsed_log_entries[project_id] += 1 yield event except Exception as e: logger.warning( @@ -1042,6 +892,12 @@ def _get_parsed_bigquery_log_events( f"log-parse-{project_id}", e, group="usage-log-parse" ) + def _generate_filter(self, corrected_start_time, corrected_end_time): + return BQ_FILTER_RULE_TEMPLATE_V2_USAGE.format( + start_time=corrected_start_time.strftime(BQ_DATETIME_FORMAT), + end_time=corrected_end_time.strftime(BQ_DATETIME_FORMAT), + ) + def get_tables_from_query( self, default_project: str, query: str ) -> Optional[List[BigQueryTableRef]]: diff --git 
a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py index c8623798f6937..bbe52b5d98ba3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py @@ -365,8 +365,8 @@ def populate_lineage( # Populate table level lineage by getting upstream tables from stl_scan redshift table query = RedshiftQuery.stl_scan_based_lineage_query( self.config.database, - self.config.start_time, - self.config.end_time, + self.start_time, + self.end_time, ) populate_calls.append((query, LineageCollectorType.QUERY_SCAN)) elif self.config.table_lineage_mode == LineageMode.SQL_BASED: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 811ea67981e18..240e0ffa1a0b6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -543,15 +543,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.connection.close() - lru_cache_functions: List[Callable] = [ - self.data_dictionary.get_tables_for_database, - self.data_dictionary.get_views_for_database, - self.data_dictionary.get_columns_for_schema, - self.data_dictionary.get_pk_constraints_for_schema, - self.data_dictionary.get_fk_constraints_for_schema, - ] - for func in lru_cache_functions: - self.report.lru_cache_info[func.__name__] = func.cache_info()._asdict() # type: ignore + self.report_cache_info() # TODO: The checkpoint state for stale entity detection can be committed here. @@ -596,6 +588,17 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) and self.usage_extractor: yield from self.usage_extractor.get_usage_workunits(discovered_datasets) + def report_cache_info(self): + lru_cache_functions: List[Callable] = [ + self.data_dictionary.get_tables_for_database, + self.data_dictionary.get_views_for_database, + self.data_dictionary.get_columns_for_schema, + self.data_dictionary.get_pk_constraints_for_schema, + self.data_dictionary.get_fk_constraints_for_schema, + ] + for func in lru_cache_functions: + self.report.lru_cache_info[func.__name__] = func.cache_info()._asdict() # type: ignore + def report_warehouse_failure(self): if self.config.warehouse is not None: self.report_error( diff --git a/metadata-ingestion/src/datahub/utilities/perf_timer.py b/metadata-ingestion/src/datahub/utilities/perf_timer.py index 3fac1d68c3a9e..18384420bfefb 100644 --- a/metadata-ingestion/src/datahub/utilities/perf_timer.py +++ b/metadata-ingestion/src/datahub/utilities/perf_timer.py @@ -1,26 +1,49 @@ +import logging import time from contextlib import AbstractContextManager from typing import Any, Optional +logger: logging.Logger = logging.getLogger(__name__) + class PerfTimer(AbstractContextManager): """ A context manager that gives easy access to elapsed time for performance measurement. 
+ """ - start_time: Optional[float] = None - end_time: Optional[float] = None + def __init__(self) -> None: + self.start_time: Optional[float] = None + self.end_time: Optional[float] = None + self._past_active_time: float = 0 + self.paused: bool = False + self._error_state = False def start(self) -> None: + if self.end_time is not None: + self._past_active_time = self.elapsed_seconds() + self.start_time = time.perf_counter() self.end_time = None + self.paused = False + + def pause(self) -> "PerfTimer": + self.assert_timer_is_running() + self._past_active_time = self.elapsed_seconds() + self.start_time = None + self.end_time = None + self.paused = True + return self def finish(self) -> None: - assert self.start_time is not None + self.assert_timer_is_running() self.end_time = time.perf_counter() def __enter__(self) -> "PerfTimer": - self.start() + if self.paused: # Entering paused timer context, NO OP + pass + else: + self.start() return self def __exit__( @@ -29,16 +52,46 @@ def __exit__( exc: Any, traceback: Any, ) -> Optional[bool]: - self.finish() + if self.paused: # Exiting paused timer context, resume timer + self.start() + else: + self.finish() return None def elapsed_seconds(self) -> float: """ Returns the elapsed time in seconds. """ + if self.paused or not self.start_time: + return self._past_active_time - assert self.start_time is not None if self.end_time is None: - return time.perf_counter() - self.start_time + return (time.perf_counter() - self.start_time) + (self._past_active_time) + else: + return (self.end_time - self.start_time) + self._past_active_time + + def assert_timer_is_running(self) -> None: + """ + Returns true if timer is in running state. + Timer is in NOT in running state if + 1. it has never been started. + 2. it is in paused state. + 3. it had been started and finished in the past but not started again. 
+ """ + if self.start_time is None or self.paused or self.end_time: + self._error_state = True + logger.warning("Did you forget to start the timer ?") + + def __repr__(self) -> str: + return repr(self.as_obj()) + + def __str__(self) -> str: + return self.__repr__() + + def as_obj(self) -> Optional[str]: + if self.start_time is None: + return None else: - return self.end_time - self.start_time + time_taken = self.elapsed_seconds() + state = " (error)" if self._error_state else "" + return f"{time_taken:.3f} seconds{state}" diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index cc3ee1f6ceaa4..602401134dcd3 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -4,8 +4,10 @@ from freezegun import freeze_time from google.cloud.bigquery.table import TableListItem +from datahub.ingestion.source.bigquery_v2.bigquery import BigqueryV2Source from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryDataset, + BigQuerySchemaApi, BigqueryTable, ) from tests.test_helpers import mce_helpers @@ -15,15 +17,9 @@ @freeze_time(FROZEN_TIME) -@patch( - "datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_tables_for_dataset" -) -@patch( - "datahub.ingestion.source.bigquery_v2.bigquery.BigqueryV2Source.get_core_table_details" -) -@patch( - "datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_datasets_for_project_id" -) +@patch.object(BigQuerySchemaApi, "get_tables_for_dataset") +@patch.object(BigqueryV2Source, "get_core_table_details") +@patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") @patch("google.cloud.bigquery.Client") def test_bigquery_v2_ingest( client, diff --git a/metadata-ingestion/tests/unit/test_bigquery_lineage.py b/metadata-ingestion/tests/unit/test_bigquery_lineage.py index 9b09fa36ba586..e23494963e475 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_lineage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_lineage.py @@ -3,6 +3,7 @@ import pytest +import datahub.emitter.mce_builder as builder from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( BigQueryTableRef, QueryEvent, @@ -81,7 +82,9 @@ def lineage_entries() -> List[QueryEvent]: def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None: config = BigQueryV2Config() report = BigQueryV2Report() - extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report) + extractor: BigqueryLineageExtractor = BigqueryLineageExtractor( + config, report, lambda x: builder.make_dataset_urn("bigquery", str(x)) + ) bq_table = BigQueryTableRef.from_string_name( "projects/my_project/datasets/my_dataset/tables/my_table" @@ -96,7 +99,6 @@ def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None: bq_table=bq_table, bq_table_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)", lineage_metadata=lineage_map, - platform="bigquery", ) assert upstream_lineage assert len(upstream_lineage.upstreams) == 4 @@ -105,7 +107,9 @@ def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None: def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None: config = BigQueryV2Config(extract_column_lineage=True, incremental_lineage=False) report = BigQueryV2Report() - extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report) + extractor: 
BigqueryLineageExtractor = BigqueryLineageExtractor( + config, report, lambda x: builder.make_dataset_urn("bigquery", str(x)) + ) bq_table = BigQueryTableRef.from_string_name( "projects/my_project/datasets/my_dataset/tables/my_table" @@ -120,7 +124,6 @@ def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None: bq_table=bq_table, bq_table_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)", lineage_metadata=lineage_map, - platform="bigquery", ) assert upstream_lineage assert len(upstream_lineage.upstreams) == 2 diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 6907f926249f5..4fc6c31626ba8 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -18,9 +18,10 @@ BigQueryTableRef, ) from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config +from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( - BigQueryDataDictionary, BigqueryProject, + BigQuerySchemaApi, BigqueryView, ) from datahub.ingestion.source.bigquery_v2.lineage import ( @@ -92,15 +93,17 @@ def test_bigquery_uri_with_credential(): raise e -@patch("google.cloud.bigquery.client.Client") -def test_get_projects_with_project_ids(client_mock): +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_get_projects_with_project_ids(get_bq_client_mock): + client_mock = MagicMock() + get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj( { "project_ids": ["test-1", "test-2"], } ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1")) - assert source._get_projects(client_mock) == [ + assert source._get_projects() == [ BigqueryProject("test-1", "test-1"), BigqueryProject("test-2", "test-2"), ] @@ -110,14 +113,17 @@ def test_get_projects_with_project_ids(client_mock): {"project_ids": ["test-1", "test-2"], "project_id": "test-3"} ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test2")) - assert source._get_projects(client_mock) == [ + assert source._get_projects() == [ BigqueryProject("test-1", "test-1"), BigqueryProject("test-2", "test-2"), ] assert client_mock.list_projects.call_count == 0 -def test_get_projects_with_project_ids_overrides_project_id_pattern(): +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_get_projects_with_project_ids_overrides_project_id_pattern( + get_bq_client_mock, +): config = BigQueryV2Config.parse_obj( { "project_ids": ["test-project", "test-project-2"], @@ -125,7 +131,7 @@ def test_get_projects_with_project_ids_overrides_project_id_pattern(): } ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - projects = source._get_projects(MagicMock()) + projects = source._get_projects() assert projects == [ BigqueryProject(id="test-project", name="test-project"), BigqueryProject(id="test-project-2", name="test-project-2"), @@ -143,7 +149,8 @@ def test_platform_instance_config_always_none(): assert config.platform_instance is None -def test_get_dataplatform_instance_aspect_returns_project_id(): +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_get_dataplatform_instance_aspect_returns_project_id(get_bq_client_mock): project_id = "project_id" expected_instance = ( f"urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,{project_id})" @@ -162,7 +169,8 @@ def 
test_get_dataplatform_instance_aspect_returns_project_id(): assert metadata.aspect.instance == expected_instance -def test_get_dataplatform_instance_default_no_instance(): +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_get_dataplatform_instance_default_no_instance(get_bq_client_mock): config = BigQueryV2Config.parse_obj({}) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) @@ -176,18 +184,22 @@ def test_get_dataplatform_instance_default_no_instance(): assert metadata.aspect.instance is None -@patch("google.cloud.bigquery.client.Client") -def test_get_projects_with_single_project_id(client_mock): +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_get_projects_with_single_project_id(get_bq_client_mock): + client_mock = MagicMock() + get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj({"project_id": "test-3"}) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1")) - assert source._get_projects(client_mock) == [ + assert source._get_projects() == [ BigqueryProject("test-3", "test-3"), ] assert client_mock.list_projects.call_count == 0 -@patch("google.cloud.bigquery.client.Client") -def test_get_projects_by_list(client_mock): +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_get_projects_by_list(get_bq_client_mock): + client_mock = MagicMock() + get_bq_client_mock.return_value = client_mock client_mock.list_projects.return_value = [ SimpleNamespace( project_id="test-1", @@ -201,15 +213,16 @@ def test_get_projects_by_list(client_mock): config = BigQueryV2Config.parse_obj({}) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1")) - assert source._get_projects(client_mock) == [ + assert source._get_projects() == [ BigqueryProject("test-1", "one"), BigqueryProject("test-2", "two"), ] assert client_mock.list_projects.call_count == 1 -@patch.object(BigQueryDataDictionary, "get_projects") -def test_get_projects_filter_by_pattern(get_projects_mock): +@patch.object(BigQuerySchemaApi, "get_projects") +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_get_projects_filter_by_pattern(get_bq_client_mock, get_projects_mock): get_projects_mock.return_value = [ BigqueryProject("test-project", "Test Project"), BigqueryProject("test-project-2", "Test Project 2"), @@ -219,31 +232,35 @@ def test_get_projects_filter_by_pattern(get_projects_mock): {"project_id_pattern": {"deny": ["^test-project$"]}} ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - projects = source._get_projects(MagicMock()) + projects = source._get_projects() assert projects == [ BigqueryProject(id="test-project-2", name="Test Project 2"), ] -@patch.object(BigQueryDataDictionary, "get_projects") -def test_get_projects_list_empty(get_projects_mock): +@patch.object(BigQuerySchemaApi, "get_projects") +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_get_projects_list_empty(get_bq_client_mock, get_projects_mock): get_projects_mock.return_value = [] config = BigQueryV2Config.parse_obj( {"project_id_pattern": {"deny": ["^test-project$"]}} ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - projects = source._get_projects(MagicMock()) + projects = source._get_projects() assert len(source.report.failures) == 1 assert projects == [] -@patch.object(BigQueryDataDictionary, "get_projects") +@patch.object(BigQueryV2Config, "get_bigquery_client") def test_get_projects_list_failure( - get_projects_mock: MagicMock, caplog: 
pytest.LogCaptureFixture + get_bq_client_mock: MagicMock, + caplog: pytest.LogCaptureFixture, ) -> None: error_str = "my error" - get_projects_mock.side_effect = GoogleAPICallError(error_str) + bq_client_mock = MagicMock() + get_bq_client_mock.return_value = bq_client_mock + bq_client_mock.list_projects.side_effect = GoogleAPICallError(error_str) config = BigQueryV2Config.parse_obj( {"project_id_pattern": {"deny": ["^test-project$"]}} @@ -251,27 +268,29 @@ def test_get_projects_list_failure( source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) caplog.records.clear() with caplog.at_level(logging.ERROR): - projects = source._get_projects(MagicMock()) + projects = source._get_projects() assert len(caplog.records) == 1 assert error_str in caplog.records[0].msg assert len(source.report.failures) == 1 assert projects == [] -@patch.object(BigQueryDataDictionary, "get_projects") -def test_get_projects_list_fully_filtered(get_projects_mock): +@patch.object(BigQuerySchemaApi, "get_projects") +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_get_projects_list_fully_filtered(get_projects_mock, get_bq_client_mock): get_projects_mock.return_value = [BigqueryProject("test-project", "Test Project")] config = BigQueryV2Config.parse_obj( {"project_id_pattern": {"deny": ["^test-project$"]}} ) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) - projects = source._get_projects(MagicMock()) + projects = source._get_projects() assert len(source.report.failures) == 0 assert projects == [] -def test_simple_upstream_table_generation(): +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_simple_upstream_table_generation(get_bq_client_mock): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( project_id="test-project", dataset="test-dataset", table="a" @@ -302,7 +321,10 @@ def test_simple_upstream_table_generation(): assert list(upstreams)[0].table == str(b) -def test_upstream_table_generation_with_temporary_table_without_temp_upstream(): +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_upstream_table_generation_with_temporary_table_without_temp_upstream( + get_bq_client_mock, +): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( project_id="test-project", dataset="test-dataset", table="a" @@ -332,7 +354,8 @@ def test_upstream_table_generation_with_temporary_table_without_temp_upstream(): assert list(upstreams) == [] -def test_upstream_table_column_lineage_with_temp_table(): +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_upstream_table_column_lineage_with_temp_table(get_bq_client_mock): from datahub.ingestion.api.common import PipelineContext a: BigQueryTableRef = BigQueryTableRef( @@ -406,7 +429,10 @@ def test_upstream_table_column_lineage_with_temp_table(): assert upstream.column_confidence == 0.7 -def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream(): +@patch.object(BigQueryV2Config, "get_bigquery_client") +def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream( + get_bq_client_mock, +): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( project_id="test-project", dataset="test-dataset", table="a" @@ -466,11 +492,11 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr assert sorted_list[1].table == str(e) -@patch( - "datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_tables_for_dataset" -) -@patch("google.cloud.bigquery.client.Client") 
-def test_table_processing_logic(client_mock, data_dictionary_mock):
+@patch.object(BigQuerySchemaApi, "get_tables_for_dataset")
+@patch.object(BigQueryV2Config, "get_bigquery_client")
+def test_table_processing_logic(get_bq_client_mock, data_dictionary_mock):
+    client_mock = MagicMock()
+    get_bq_client_mock.return_value = client_mock
     config = BigQueryV2Config.parse_obj(
         {
             "project_id": "test-project",
@@ -523,7 +549,7 @@ def test_table_processing_logic(client_mock, data_dictionary_mock):
 
     _ = list(
         source.get_tables_for_dataset(
-            conn=client_mock, project_id="test-project", dataset_name="test-dataset"
+            project_id="test-project", dataset_name="test-dataset"
         )
     )
 
@@ -531,17 +557,19 @@ def test_table_processing_logic(client_mock, data_dictionary_mock):
 
     # args only available from python 3.8 and that's why call_args_list is sooo ugly
     tables: Dict[str, TableListItem] = data_dictionary_mock.call_args_list[0][0][
-        3
+        2
     ]  # alternatively
     for table in tables.keys():
         assert table in ["test-table", "test-sharded-table_20220102"]
 
 
-@patch(
-    "datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_tables_for_dataset"
-)
-@patch("google.cloud.bigquery.client.Client")
-def test_table_processing_logic_date_named_tables(client_mock, data_dictionary_mock):
+@patch.object(BigQuerySchemaApi, "get_tables_for_dataset")
+@patch.object(BigQueryV2Config, "get_bigquery_client")
+def test_table_processing_logic_date_named_tables(
+    get_bq_client_mock, data_dictionary_mock
+):
+    client_mock = MagicMock()
+    get_bq_client_mock.return_value = client_mock
     # test that tables with date names are processed correctly
     config = BigQueryV2Config.parse_obj(
         {
@@ -595,7 +623,7 @@ def test_table_processing_logic_date_named_tables(client_mock, data_dictionary_m
 
     _ = list(
         source.get_tables_for_dataset(
-            conn=client_mock, project_id="test-project", dataset_name="test-dataset"
+            project_id="test-project", dataset_name="test-dataset"
         )
     )
 
@@ -603,7 +631,7 @@ def test_table_processing_logic_date_named_tables(client_mock, data_dictionary_m
 
     # args only available from python 3.8 and that's why call_args_list is sooo ugly
     tables: Dict[str, TableListItem] = data_dictionary_mock.call_args_list[0][0][
-        3
+        2
     ]  # alternatively
     for table in tables.keys():
         assert tables[table].table_id in ["test-table", "20220103"]
@@ -644,16 +672,16 @@ def bigquery_view_2() -> BigqueryView:
     )
 
 
-@patch(
-    "datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_query_result"
-)
-@patch("google.cloud.bigquery.client.Client")
+@patch.object(BigQuerySchemaApi, "get_query_result")
+@patch.object(BigQueryV2Config, "get_bigquery_client")
 def test_get_views_for_dataset(
-    client_mock: Mock,
+    get_bq_client_mock: Mock,
     query_mock: Mock,
     bigquery_view_1: BigqueryView,
     bigquery_view_2: BigqueryView,
 ) -> None:
+    client_mock = MagicMock()
+    get_bq_client_mock.return_value = client_mock
     assert bigquery_view_1.last_altered
     row1 = create_row(
         dict(
@@ -675,9 +703,11 @@ def test_get_views_for_dataset(
         )
     )
     query_mock.return_value = [row1, row2]
+    bigquery_data_dictionary = BigQuerySchemaApi(
+        BigQueryV2Report().schema_api_perf, client_mock
+    )
 
-    views = BigQueryDataDictionary.get_views_for_dataset(
-        conn=client_mock,
+    views = bigquery_data_dictionary.get_views_for_dataset(
         project_id="test-project",
         dataset_name="test-dataset",
         has_data_read=False,
     )
@@ -686,7 +716,10 @@
 
 
 @patch.object(BigqueryV2Source, "gen_dataset_workunits", lambda *args, **kwargs: [])
-def test_gen_view_dataset_workunits(bigquery_view_1, bigquery_view_2):
+@patch.object(BigQueryV2Config, "get_bigquery_client")
+def test_gen_view_dataset_workunits(
+    get_bq_client_mock, bigquery_view_1, bigquery_view_2
+):
     project_id = "test-project"
     dataset_name = "test-dataset"
     config = BigQueryV2Config.parse_obj(
diff --git a/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py b/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py
index 6ee1f05f0582c..4cf42da4395f9 100644
--- a/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py
+++ b/metadata-ingestion/tests/unit/test_bigqueryv2_usage_source.py
@@ -4,7 +4,6 @@
 from freezegun import freeze_time
 
 from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
-    BQ_AUDIT_V2,
     BigqueryTableIdentifier,
     BigQueryTableRef,
 )
@@ -111,10 +110,12 @@ def test_bigqueryv2_filters():
     OR
     protoPayload.metadata.tableDataRead.reason = "JOB"
 )"""  # noqa: W293
-    source = BigQueryUsageExtractor(
-        config, BigQueryV2Report(), dataset_urn_builder=lambda _: ""
-    )
-    filter: str = source._generate_filter(BQ_AUDIT_V2)
+
+    corrected_start_time = config.start_time - config.max_query_duration
+    corrected_end_time = config.end_time + config.max_query_duration
+    filter: str = BigQueryUsageExtractor(
+        config, BigQueryV2Report(), lambda x: ""
+    )._generate_filter(corrected_start_time, corrected_end_time)
 
     assert filter == expected_filter
diff --git a/metadata-ingestion/tests/unit/utilities/test_perf_timer.py b/metadata-ingestion/tests/unit/utilities/test_perf_timer.py
new file mode 100644
index 0000000000000..d5fde314c2b57
--- /dev/null
+++ b/metadata-ingestion/tests/unit/utilities/test_perf_timer.py
@@ -0,0 +1,46 @@
+import time
+from functools import partial
+
+import pytest
+
+from datahub.utilities.perf_timer import PerfTimer
+
+approx = partial(pytest.approx, rel=1e-2)
+
+
+def test_perf_timer_simple():
+    with PerfTimer() as timer:
+        time.sleep(1)
+        assert approx(timer.elapsed_seconds()) == 1
+
+    assert approx(timer.elapsed_seconds()) == 1
+
+
+def test_perf_timer_paused_timer():
+    with PerfTimer() as current_timer:
+        time.sleep(1)
+        assert approx(current_timer.elapsed_seconds()) == 1
+        with current_timer.pause():
+            time.sleep(2)
+            assert approx(current_timer.elapsed_seconds()) == 1
+        assert approx(current_timer.elapsed_seconds()) == 1
+        time.sleep(1)
+
+    assert approx(current_timer.elapsed_seconds()) == 2
+
+
+def test_generator_with_paused_timer():
+    def generator_function():
+        with PerfTimer() as inner_timer:
+            time.sleep(1)
+            for i in range(10):
+                time.sleep(0.2)
+                with inner_timer.pause():
+                    time.sleep(0.2)
+                    yield i
+            assert approx(inner_timer.elapsed_seconds()) == 1 + 0.2 * 10
+
+    with PerfTimer() as outer_timer:
+        seq = generator_function()
+        list([i for i in seq])
+        assert approx(outer_timer.elapsed_seconds()) == 1 + 0.2 * 10 + 0.2 * 10
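Note for reviewers (not part of the patch): the new tests in tests/unit/utilities/test_perf_timer.py only pass if PerfTimer.elapsed_seconds() stops accumulating while a pause() block is active and resumes counting afterwards. The sketch below illustrates that pause-aware bookkeeping under those assumptions; PausableTimerSketch and its attributes are hypothetical names chosen for illustration and are not the PerfTimer implementation changed in this diff.

import time
from contextlib import contextmanager
from typing import Any, Iterator, Optional


class PausableTimerSketch:
    """Accumulates wall-clock time, excluding intervals spent inside pause()."""

    def __init__(self) -> None:
        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None
        self.paused: bool = False
        self._past_active_time: float = 0.0  # seconds banked before the current run

    def __enter__(self) -> "PausableTimerSketch":
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, *exc: Any) -> None:
        self.end_time = time.perf_counter()

    @contextmanager
    def pause(self) -> Iterator["PausableTimerSketch"]:
        # Bank the active interval so far; restart the clock when the block exits.
        assert self.start_time is not None, "pause() called before the timer started"
        self._past_active_time += time.perf_counter() - self.start_time
        self.paused = True
        try:
            yield self
        finally:
            self.start_time = time.perf_counter()
            self.paused = False

    def elapsed_seconds(self) -> float:
        # Banked time plus the currently running interval (if not paused).
        if self.start_time is None:
            return 0.0
        if self.paused:
            return self._past_active_time
        end = self.end_time if self.end_time is not None else time.perf_counter()
        return self._past_active_time + (end - self.start_time)

Under this model, test_perf_timer_paused_timer reads naturally: about 1 second is accumulated before the pause, the 2 seconds slept inside pause() are excluded, and roughly 1 more second is added after resuming, so elapsed_seconds() ends near 2.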