feat(bigquery): add better timers around every API call #8626

Merged · 20 commits · Sep 15, 2023
Commits
b84601c
feat(bigquery): add better timers around every API call
mayurinehate Aug 9, 2023
95bbcbe
wip, timers not added for unused methods - remove these ?
mayurinehate Aug 10, 2023
76ddc3f
refractor in lineage.py
mayurinehate Aug 11, 2023
e57f134
report composition vs inheritance
mayurinehate Aug 14, 2023
38b18bb
more refractor and fixes
mayurinehate Aug 14, 2023
8d049a7
Merge branch 'master' into bq_timers_api_refractor
mayurinehate Aug 14, 2023
31a3be8
fix lint, tests
mayurinehate Aug 14, 2023
eaa72a3
revert rename of bigquery_schema.py to bigquery_schema_api.py
mayurinehate Aug 22, 2023
ac2ab3b
Merge remote-tracking branch 'datahub-oss/master' into bq_timers_api_…
mayurinehate Aug 22, 2023
94077db
Merge branch 'master' into bq_timers_api_refractor
mayurinehate Aug 28, 2023
e500275
Merge remote-tracking branch 'datahub-oss/master' into bq_timers_api_…
mayurinehate Aug 28, 2023
1b3d5b5
move stateful check inside lineage module
mayurinehate Aug 28, 2023
c77a9ab
Merge branch 'master' into bq_timers_api_refractor
mayurinehate Sep 5, 2023
6a2a3d4
merge related changes
mayurinehate Sep 5, 2023
32fedfc
Merge branch 'master' into bq_timers_api_refractor
mayurinehate Sep 8, 2023
9dca7e5
Merge branch 'master' into bq_timers_api_refractor
mayurinehate Sep 11, 2023
79f84ba
address review comments
mayurinehate Sep 13, 2023
25f4f0b
Merge branch 'master' into bq_timers_api_refractor
mayurinehate Sep 13, 2023
0ba4efc
fix tests
mayurinehate Sep 14, 2023
ea2c4a1
Merge branch 'master' into bq_timers_api_refractor
mayurinehate Sep 15, 2023
241 changes: 59 additions & 182 deletions metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py

Large diffs are not rendered by default.

@@ -13,48 +13,6 @@
get_first_missing_key_any,
)

BQ_FILTER_RULE_TEMPLATE = "BQ_FILTER_RULE_TEMPLATE"

BQ_AUDIT_V2 = {
BQ_FILTER_RULE_TEMPLATE: """
resource.type=("bigquery_project" OR "bigquery_dataset")
AND
timestamp >= "{start_time}"
AND
timestamp < "{end_time}"
AND protoPayload.serviceName="bigquery.googleapis.com"
AND
(
(
protoPayload.methodName=
(
"google.cloud.bigquery.v2.JobService.Query"
OR
"google.cloud.bigquery.v2.JobService.InsertJob"
)
AND protoPayload.metadata.jobChange.job.jobStatus.jobState="DONE"
AND NOT protoPayload.metadata.jobChange.job.jobStatus.errorResult:*
AND protoPayload.metadata.jobChange.job.jobConfig.queryConfig:*
AND
(
(
protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables:*
AND NOT protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables =~ "projects/.*/datasets/.*/tables/__TABLES__|__TABLES_SUMMARY__|INFORMATION_SCHEMA.*"
)
OR
(
protoPayload.metadata.jobChange.job.jobConfig.queryConfig.destinationTable:*
)
)
)
OR
protoPayload.metadata.tableDataRead.reason = "JOB"
)
""".strip(
"\t \n"
),
}

AuditLogEntry = Any

# BigQueryAuditMetadata is the v2 format in which audit logs are exported to BigQuery
@@ -606,7 +564,6 @@ def from_query_event(
query_event: QueryEvent,
debug_include_full_payloads: bool = False,
) -> "ReadEvent":

readEvent = ReadEvent(
actor_email=query_event.actor_email,
timestamp=query_event.timestamp,
(diff truncated)
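The filter removed here is a GCP Logging filter template with {start_time}/{end_time} placeholders. For context, a hypothetical sketch of how such a template gets rendered for a concrete window before being handed to the audit-log API added later in this PR; the timestamp format string is an assumption standing in for BQ_DATETIME_FORMAT from bigquery_v2.common.

# Hypothetical sketch: render a GCP Logging filter of the same shape as the
# removed BQ_FILTER_RULE_TEMPLATE for a concrete time window. The timestamp
# format below is an assumption standing in for common.BQ_DATETIME_FORMAT.
from datetime import datetime, timezone

ASSUMED_BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

filter_template = """
resource.type=("bigquery_project" OR "bigquery_dataset")
AND timestamp >= "{start_time}"
AND timestamp < "{end_time}"
AND protoPayload.serviceName="bigquery.googleapis.com"
""".strip()

start_time = datetime(2023, 9, 1, tzinfo=timezone.utc)
end_time = datetime(2023, 9, 2, tzinfo=timezone.utc)

rendered_filter = filter_template.format(
    start_time=start_time.strftime(ASSUMED_BQ_DATETIME_FORMAT),
    end_time=end_time.strftime(ASSUMED_BQ_DATETIME_FORMAT),
)
# rendered_filter would be passed as the `filter` argument of the new
# BigQueryAuditLogApi.get_bigquery_log_entries_via_gcp_logging shown below.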
@@ -0,0 +1,139 @@ (new file)
import logging
from datetime import datetime
from typing import Callable, Iterable, List, Optional

from google.cloud import bigquery
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from ratelimiter import RateLimiter

from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
AuditLogEntry,
BigQueryAuditMetadata,
)
from datahub.ingestion.source.bigquery_v2.bigquery_report import (
BigQueryAuditLogApiPerfReport,
)
from datahub.ingestion.source.bigquery_v2.common import (
BQ_DATE_SHARD_FORMAT,
BQ_DATETIME_FORMAT,
)

logger: logging.Logger = logging.getLogger(__name__)


# Api interfaces are separated based on functionality they provide
# rather than the underlying bigquery client that is used to
# provide the functionality.
class BigQueryAuditLogApi:
def __init__(
self,
report: BigQueryAuditLogApiPerfReport,
rate_limit: bool,
requests_per_min: int,
) -> None:
self.report = report
self.rate_limit = rate_limit
self.requests_per_min = requests_per_min

def get_exported_bigquery_audit_metadata(
self,
bigquery_client: bigquery.Client,
bigquery_audit_metadata_query_template: Callable[
[
str, # dataset: str
bool, # use_date_sharded_tables: bool
Optional[int], # limit: Optional[int] = None
],
str,
],
Comment on lines +41 to +48

Collaborator:
This is fine here, but in general would like to avoid passing complex functions because it's kinda ugly and hard to extend

Collaborator (Author):
agree

bigquery_audit_metadata_datasets: Optional[List[str]],
use_date_sharded_audit_log_tables: bool,
start_time: datetime,
end_time: datetime,
limit: Optional[int] = None,
) -> Iterable[BigQueryAuditMetadata]:
if bigquery_audit_metadata_datasets is None:
return

audit_start_time = start_time.strftime(BQ_DATETIME_FORMAT)
audit_start_date = start_time.strftime(BQ_DATE_SHARD_FORMAT)

audit_end_time = end_time.strftime(BQ_DATETIME_FORMAT)
audit_end_date = end_time.strftime(BQ_DATE_SHARD_FORMAT)

rate_limiter: Optional[RateLimiter] = None
if self.rate_limit:
rate_limiter = RateLimiter(max_calls=self.requests_per_min, period=60)

with self.report.get_exported_log_entries as current_timer:
for dataset in bigquery_audit_metadata_datasets:
logger.info(
f"Start loading log entries from BigQueryAuditMetadata in {dataset}"
)

query = bigquery_audit_metadata_query_template(
dataset,
use_date_sharded_audit_log_tables,
limit,
).format(
start_time=audit_start_time,
end_time=audit_end_time,
start_date=audit_start_date,
end_date=audit_end_date,
)

query_job = bigquery_client.query(query)
logger.info(
f"Finished loading log entries from BigQueryAuditMetadata in {dataset}"
)

for entry in query_job:
with current_timer.pause():
if rate_limiter:
with rate_limiter:
yield entry
else:
yield entry

def get_bigquery_log_entries_via_gcp_logging(
self,
client: GCPLoggingClient,
filter: str,
log_page_size: int,
limit: Optional[int] = None,
) -> Iterable[AuditLogEntry]:
logger.debug(filter)

list_entries: Iterable[AuditLogEntry]
rate_limiter: Optional[RateLimiter] = None
if self.rate_limit:
# client.list_entries is a generator, does api calls to GCP Logging when it runs out of entries and needs to fetch more from GCP Logging
# to properly ratelimit we multiply the page size by the number of requests per minute
rate_limiter = RateLimiter(
max_calls=self.requests_per_min * log_page_size,
period=60,
)

with self.report.list_log_entries as current_timer:
list_entries = client.list_entries(
filter_=filter,
page_size=log_page_size,
max_results=limit,
)

for i, entry in enumerate(list_entries):
if i % 1000 == 0:
logger.info(
f"Loaded {i} log entries from GCP Log for {client.project}"
)

with current_timer.pause():
if rate_limiter:
with rate_limiter:
yield entry
else:
yield entry

logger.info(
f"Finished loading log entries from GCP Log for {client.project}"
)
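The heart of this PR is the timing pattern visible above: each report attribute (for example self.report.get_exported_log_entries) is used as a reusable `with` context, and the generator calls current_timer.pause() while yielding so that time the consumer spends processing each entry is not charged to the API call. The snippet below is a hypothetical, self-contained stand-in for that timer, not DataHub's actual PerfTimer; it only illustrates the pause/resume mechanics.

# Minimal illustrative sketch (not DataHub's actual PerfTimer) of a timer that
# can be paused while a generator yields, so only time spent inside the API
# calls is attributed to the report.
import time
from contextlib import contextmanager


class PausableTimer:
    def __init__(self) -> None:
        self.elapsed = 0.0
        self._start = None

    def __enter__(self):
        self._start = time.monotonic()
        return self

    def __exit__(self, *exc):
        if self._start is not None:
            self.elapsed += time.monotonic() - self._start
            self._start = None

    @contextmanager
    def pause(self):
        # Stop the clock while the caller processes a yielded entry...
        self.__exit__(None, None, None)
        try:
            yield
        finally:
            # ...and resume it before the next API page is fetched.
            self.__enter__()


# Usage mirroring the pattern in get_exported_bigquery_audit_metadata above:
timer = PausableTimer()
with timer as current_timer:
    for _entry in range(3):  # stands in for query_job / list_entries iteration
        with current_timer.pause():
            pass  # downstream processing time is excluded from timer.elapsed
print(f"time attributed to API calls: {timer.elapsed:.6f}s")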
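And a consumer-side sketch of how this class might be driven, under stated assumptions. Note the rate-limit arithmetic from the inline comment in get_bigquery_log_entries_via_gcp_logging: because client.list_entries only hits the API when a page is exhausted, a limiter of requests_per_min * log_page_size yielded entries per minute (for example 60 * 1000 = 60,000) corresponds to roughly requests_per_min page fetches per minute.

# Hypothetical driver for the new BigQueryAuditLogApi. The module path for the
# new file, the report construction, and the GCP project id are assumptions,
# not taken verbatim from this diff.
from google.cloud.logging_v2.client import Client as GCPLoggingClient

from datahub.ingestion.source.bigquery_v2.bigquery_audit_log_api import (  # assumed path
    BigQueryAuditLogApi,
)
from datahub.ingestion.source.bigquery_v2.bigquery_report import (
    BigQueryAuditLogApiPerfReport,
)

report = BigQueryAuditLogApiPerfReport()  # assumed to be default-constructible
api = BigQueryAuditLogApi(report, rate_limit=True, requests_per_min=60)

# gRPC disabled, matching make_gcp_logging_client in the config change below.
client = GCPLoggingClient(project="my-gcp-project", _use_grpc=False)
log_filter = 'protoPayload.serviceName="bigquery.googleapis.com"'  # simplified filter

for entry in api.get_bigquery_log_entries_via_gcp_logging(
    client,
    filter=log_filter,
    log_page_size=1000,
    limit=10_000,
):
    ...  # downstream parsing into usage/lineage events happens elsewhere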
@@ -4,9 +4,11 @@
from typing import Any, Dict, List, Optional

import pydantic
from pydantic import Field, PositiveInt, PrivateAttr, root_validator
from google.cloud import bigquery
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
@@ -35,7 +37,52 @@ class BigQueryUsageConfig(BaseUsageConfig):
)


class BigQueryConnectionConfig(ConfigModel):
credential: Optional[BigQueryCredential] = Field(
default=None, description="BigQuery credential informations"
)

_credentials_path: Optional[str] = PrivateAttr(None)

extra_client_options: Dict[str, Any] = Field(
default={},
description="Additional options to pass to google.cloud.logging_v2.client.Client.",
)

project_on_behalf: Optional[str] = Field(
default=None,
description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.",
)

def __init__(self, **data: Any):
super().__init__(**data)

if self.credential:
self._credentials_path = self.credential.create_credential_temp_file()
logger.debug(
f"Creating temporary credential file at {self._credentials_path}"
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path

def get_bigquery_client(config) -> bigquery.Client:
client_options = config.extra_client_options
return bigquery.Client(config.project_on_behalf, **client_options)

def make_gcp_logging_client(
self, project_id: Optional[str] = None
) -> GCPLoggingClient:
# See https://github.com/googleapis/google-cloud-python/issues/2674 for
# why we disable gRPC here.
client_options = self.extra_client_options.copy()
client_options["_use_grpc"] = False
if project_id is not None:
return GCPLoggingClient(**client_options, project=project_id)
else:
return GCPLoggingClient(**client_options)


class BigQueryV2Config(
BigQueryConnectionConfig,
BigQueryBaseConfig,
SQLCommonConfig,
StatefulUsageConfigMixin,
@@ -122,11 +169,6 @@ class BigQueryV2Config(
),
)

project_on_behalf: Optional[str] = Field(
default=None,
description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.",
)

storage_project_id: None = Field(default=None, hidden_from_docs=True)

lineage_use_sql_parser: bool = Field(
@@ -180,14 +222,8 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
default=1000,
description="The number of log item will be queried per page for lineage collection",
)
credential: Optional[BigQueryCredential] = Field(
description="BigQuery credential informations"
)

# extra_client_options, include_table_lineage and max_query_duration are relevant only when computing the lineage.
extra_client_options: Dict[str, Any] = Field(
default={},
description="Additional options to pass to google.cloud.logging_v2.client.Client.",
)
include_table_lineage: Optional[bool] = Field(
default=True,
description="Option to enable/disable lineage generation. Is enabled by default.",
@@ -209,7 +245,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
default=False,
description="Whether to read date sharded tables or time partitioned tables when extracting usage from exported audit logs.",
)
_credentials_path: Optional[str] = PrivateAttr(None)

_cache_path: Optional[str] = PrivateAttr(None)

@@ -230,16 +265,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
description="Maximum number of entries for the in-memory caches of FileBacked data structures.",
)

def __init__(self, **data: Any):
super().__init__(**data)

if self.credential:
self._credentials_path = self.credential.create_credential_temp_file()
logger.debug(
f"Creating temporary credential file at {self._credentials_path}"
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path

@root_validator(pre=False)
def profile_default_settings(cls, values: Dict) -> Dict:
# Extra default SQLAlchemy option for better connection pooling and threading.
@@ -248,6 +273,17 @@ def profile_default_settings(cls, values: Dict) -> Dict:

return values

@validator("bigquery_audit_metadata_datasets")
def validate_bigquery_audit_metadata_datasets(
cls, v: Optional[List[str]], values: Dict
) -> Optional[List[str]]:
if values.get("use_exported_bigquery_audit_metadata"):
assert (
v and len(v) > 0
), "`bigquery_audit_metadata_datasets` should be set if using `use_exported_bigquery_audit_metadata: True`."

return v

@root_validator(pre=False)
def backward_compatibility_configs_set(cls, values: Dict) -> Dict:
project_id = values.get("project_id")
(diff truncated)
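The config change above splits connection concerns (credentials, client factories, project_on_behalf) into BigQueryConnectionConfig, so the new API wrapper classes can be handed bare clients instead of the whole source config. A minimal usage sketch, under stated assumptions:

# Hypothetical usage of the connection/config split shown above. The import
# path and project ids are placeholders; real GCP credentials are needed for
# the returned clients to do anything.
from datahub.ingestion.source.bigquery_v2.bigquery_config import (  # assumed path
    BigQueryConnectionConfig,
    BigQueryV2Config,
)

conn = BigQueryConnectionConfig(project_on_behalf="my-billing-project")
bq_client = conn.get_bigquery_client()                   # bigquery.Client for queries/metadata
log_client = conn.make_gcp_logging_client("my-project")  # GCPLoggingClient with gRPC disabled

# BigQueryV2Config now inherits these factories from BigQueryConnectionConfig,
# and the new validator rejects use_exported_bigquery_audit_metadata=True
# unless bigquery_audit_metadata_datasets is also set (assuming the remaining
# fields keep their defaults):
config = BigQueryV2Config.parse_obj(
    {
        "use_exported_bigquery_audit_metadata": True,
        "bigquery_audit_metadata_datasets": ["my-project.my_audit_dataset"],
    }
)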