Skip to content

Commit

Permalink
fix(ingest/sql-parsing): ignore processed query_id from temp upstream (
Browse files Browse the repository at this point in the history
…datahub-project#11798)

Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
mayurinehate and hsheth2 authored Nov 8, 2024
1 parent ca063dd commit 8ca8fd9
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,23 @@ def get_workunits_internal(
logger.info(f"Found {self.report.num_unique_queries} unique queries")

with self.report.audit_log_load_timer, queries_deduped:
i = 0
for _, query_instances in queries_deduped.items():
last_log_time = datetime.now()
last_report_time = datetime.now()
for i, (_, query_instances) in enumerate(queries_deduped.items()):
for query in query_instances.values():
if i > 0 and i % 10000 == 0:
now = datetime.now()
if (now - last_log_time).total_seconds() >= 60:
logger.info(
f"Added {i} query log equeries_dedupedntries to SQL aggregator"
f"Added {i} deduplicated query log entries to SQL aggregator"
)
last_log_time = now

if (now - last_report_time).total_seconds() >= 300:
if self.report.sql_aggregator:
logger.info(self.report.sql_aggregator.as_string())
last_report_time = now

self.aggregator.add(query)
i += 1

yield from auto_workunit(self.aggregator.gen_metadata())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,10 @@ def _recurse_into_query(
if upstream_query_ids:
for upstream_query_id in upstream_query_ids:
upstream_query = self._query_map.get(upstream_query_id)
if upstream_query:
if (
upstream_query
and upstream_query.query_id not in composed_of_queries
):
temp_query_lineage_info = _recurse_into_query(
upstream_query, recursion_path
)
Expand Down

0 comments on commit 8ca8fd9

Please sign in to comment.