Skip to content

Commit

Permalink
fix(ingest): bigquery - fix for hitting limit if there are too many p…
Browse files Browse the repository at this point in the history
…artitioned tables (datahub-project#4056)
  • Loading branch information
treff7es authored Feb 4, 2022
1 parent c8922b3 commit cc32c30
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,6 @@ def _generate_single_profile(
custom_sql: str = None,
**kwargs: Any,
) -> Optional[DatasetProfileClass]:
logger.info(f"Profiling {pretty_name}")
bigquery_temp_table: Optional[str] = None
ge_config = {
"schema": schema,
Expand Down
44 changes: 36 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,20 @@
is_partitioning_column = 'YES'
-- Filter out special partitions (https://cloud.google.com/bigquery/docs/partitioned-tables#date_timestamp_partitioned_tables)
and p.partition_id not in ('__NULL__', '__UNPARTITIONED__')
and STORAGE_TIER='ACTIVE'
group by
c.table_catalog,
c.table_schema,
c.table_name,
c.column_name,
c.data_type
order by
c.table_catalog,
c.table_schema,
c.table_name,
c.column_name
limit {limit}
offset {offset}
""".strip()

SHARDED_TABLE_REGEX = r"^(.+)[_](\d{4}|\d{6}|\d{8}|\d{10})$"
Expand Down Expand Up @@ -467,19 +475,39 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
return lineage_map

def get_latest_partitions_for_schema(self, schema: str) -> None:
query_limit: int = 500
offset: int = 0
url = self.config.get_sql_alchemy_url()
engine = create_engine(url, **self.config.options)
with engine.connect() as con:
inspector = inspect(con)
sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
project_id=self.get_db_name(inspector),
schema=schema,
)
result = con.execute(sql)
partitions = {}
for row in result:
partition = BigQueryPartitionColumn(**row)
partitions[partition.table_name] = partition

def get_partition_columns(
project_id: str, schema: str, limit: int, offset: int
) -> int:
sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
project_id=project_id,
schema=schema,
limit=limit,
offset=offset,
)
result = con.execute(sql)
row_count: int = 0
for row in result:
partition = BigQueryPartitionColumn(**row)
partitions[partition.table_name] = partition
row_count = row_count + 1
return row_count

res_size = get_partition_columns(
self.get_db_name(inspector), schema, query_limit, offset
)
while res_size == query_limit:
offset = offset + query_limit
res_size = get_partition_columns(
self.get_db_name(inspector), schema, query_limit, offset
)

self.partiton_columns[schema] = partitions

Expand Down

0 comments on commit cc32c30

Please sign in to comment.