From 7fff89c4f55a9392a29ae4a77794e1a9f5a565fc Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 31 Oct 2024 21:48:13 -0700 Subject: [PATCH 1/2] fix(ingest/bigquery): increase logging in bigquery-queries extractor --- .../source/bigquery_v2/bigquery_queries.py | 2 +- .../source/bigquery_v2/queries_extractor.py | 13 +++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py index 47f21c9f32353..f1a311c6162c6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_queries.py @@ -64,7 +64,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryQueriesSourceConfig): schema_api=BigQuerySchemaApi( self.report.schema_api_perf, self.connection, - projects_client=self.config.connection.get_projects_client(), + projects_client=self.connection, ), config=self.config, structured_report=self.report, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py index afaaaf51964f8..40273c43f39f6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py @@ -276,18 +276,23 @@ def get_workunits_internal( logger.info(f"Found {self.report.num_unique_queries} unique queries") with self.report.audit_log_load_timer, queries_deduped: - i = 0 + last_log_time = datetime.now() + last_report_time = datetime.now() for _, query_instances in queries_deduped.items(): for query in query_instances.values(): - if i > 0 and i % 10000 == 0: + now = datetime.now() + if (now - last_log_time).total_seconds() >= 60: logger.info( - f"Added {i} query log 
entries to SQL aggregator" + f"Added {i} deduplicated query log entries to SQL aggregator" ) + last_log_time = now + + if (now - last_report_time).total_seconds() >= 300: if self.report.sql_aggregator: logger.info(self.report.sql_aggregator.as_string()) + last_report_time = now self.aggregator.add(query) - i += 1 yield from auto_workunit(self.aggregator.gen_metadata()) From da3f71934322c3ca108a1f60e7596e2231c992b7 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 31 Oct 2024 21:55:32 -0700 Subject: [PATCH 2/2] fix bug --- .../datahub/ingestion/source/bigquery_v2/queries_extractor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py index 40273c43f39f6..497947abe4ef9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py @@ -278,7 +278,7 @@ def get_workunits_internal( with self.report.audit_log_load_timer, queries_deduped: last_log_time = datetime.now() last_report_time = datetime.now() - for _, query_instances in queries_deduped.items(): + for i, (_, query_instances) in enumerate(queries_deduped.items()): for query in query_instances.values(): now = datetime.now() if (now - last_log_time).total_seconds() >= 60: