Skip to content

Commit

Permalink
feat: add one big table query generator
Browse files Browse the repository at this point in the history
  • Loading branch information
vncsna committed Apr 24, 2024
1 parent aeef353 commit aee3e08
Showing 1 changed file with 100 additions and 0 deletions.
100 changes: 100 additions & 0 deletions bd_api/apps/api/v1/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dataclasses import dataclass
from datetime import datetime
from math import log10
from typing import Optional
from uuid import uuid4

from django.core.exceptions import ValidationError
Expand Down Expand Up @@ -784,6 +785,7 @@ class Table(BaseModel, OrderedModel):
default=0,
help_text="Number of page views by Google Analytics",
)
one_big_table_query = models.TextField(blank=True, null=True)

order_with_respect_to = ("dataset",)
graphql_nested_filter_fields_whitelist = ["id", "dataset"]
Expand Down Expand Up @@ -1210,6 +1212,10 @@ class CloudTable(BaseModel):
def __str__(self):
    """Render the cloud table as ``<project id>.<dataset id>.<table id>``."""
    id_parts = (self.gcp_project_id, self.gcp_dataset_id, self.gcp_table_id)
    return "{}.{}.{}".format(*id_parts)

@property
def gcp_prefix_id(self):
    """Return the ``<project id>.<dataset id>`` prefix of this cloud table."""
    id_parts = (self.gcp_project_id, self.gcp_dataset_id)
    return "{}.{}".format(*id_parts)

def clean(self) -> None:
errors = {}
if self.gcp_project_id and not check_kebab_case(self.gcp_project_id):
Expand Down Expand Up @@ -1791,3 +1797,97 @@ def get_full_coverage(resources: list) -> dict:
return [open_since.as_dict, open_until.as_dict]
if paid_since.str and paid_until.str:
return [paid_since.as_dict, paid_until.as_dict]


def get_one_big_table_query(table: "Table") -> str:
    """Build a denormalized SQL query for ``table``, similar to a one big table.

    Columns are visited following their ``order`` and rendered as follows:

    * columns flagged ``covered_by_dictionary`` are replaced by the matching
      ``valor`` of the dataset's ``dicionario`` table, using one CTE per column;
    * columns whose ``directory_primary_key`` leads to a supported directory
      table keep their raw value and gain an extra ``<column>_<label>`` column
      read from that directory through a ``LEFT JOIN``;
    * every other column is selected as is.

    Every reference to a column of the main table is qualified with the
    ``dados`` alias so it cannot clash with columns of the joined tables.

    Args:
        table: Table whose columns, name and dataset name drive the query.

    Returns:
        The SQL text. The ``WITH`` clause is only emitted when at least one
        dictionary CTE exists, and ``*`` is selected when the table has no
        registered columns.
    """

    def covered_by_directory(column: "Column") -> Optional["CloudTable"]:
        """Return the directory cloud table backing ``column``, if it is usable."""
        if not (directory_column := column.directory_primary_key):
            return None
        if not (cloud_table := directory_column.cloud_tables.first()):
            return None
        # These directories are deliberately left out of the denormalized query
        cloud_table_id = str(cloud_table)
        if "setor_censitario" in cloud_table_id:
            return None
        if "diretorios_data_tempo" in cloud_table_id:
            return None
        return cloud_table

    def get_directory(column: "Column", cloud_table: "CloudTable") -> Optional[str]:
        """Return the label column of the directory table, or None when unknown."""
        map_directory_description_column = {
            "br_bd_diretorios_brasil.area_conhecimento": f"descricao_{column.name}",
            "br_bd_diretorios_brasil.cbo_1994": "descricao",
            "br_bd_diretorios_brasil.cbo_2002": "descricao",
            "br_bd_diretorios_brasil.cep": "centroide",
            "br_bd_diretorios_brasil.cid_10": f"descricao_{column.name}",
            "br_bd_diretorios_brasil.cid_9": "descricao",
            "br_bd_diretorios_brasil.cnae_1": "descricao",
            "br_bd_diretorios_brasil.cnae_2": "descricao",
            "br_bd_diretorios_brasil.curso_superior": "nome_curso",
            "br_bd_diretorios_brasil.distrito": "nome",
            "br_bd_diretorios_brasil.empresa": "nome_fantasia",
            "br_bd_diretorios_brasil.escola": "nome",
            "br_bd_diretorios_brasil.etnia_indigena": "nome",
            "br_bd_diretorios_brasil.instituicao_ensino_superior": "nome",
            "br_bd_diretorios_brasil.municipio": "nome",
            "br_bd_diretorios_brasil.natureza_juridica": "descricao",
            "br_bd_diretorios_brasil.regiao_metropolitana": "nome",
            "br_bd_diretorios_brasil.uf": "nome",
            "br_bd_diretorios_brasil.cnae_2_3_subclasses": "descricao",
        }
        # The map is keyed by "<dataset>.<table>", so the lookup key must be
        # built from those two ids ("<project>.<dataset>" would never match)
        directory_id = f"{cloud_table.gcp_dataset_id}.{cloud_table.gcp_table_id}"
        return map_directory_description_column.get(directory_id)

    def get_components():
        """Collect the CTEs, joins and select expressions of the query."""
        sql_ctes, sql_joins, sql_selects = [], [], []

        for column in table.columns.order_by("order").all():
            if column.covered_by_dictionary:
                sql_cte_table = f"dicionario_{column.name}"
                sql_ctes.append(
                    f"{sql_cte_table} AS (\n"
                    f"    SELECT\n"
                    f"        chave AS chave_{column.name},\n"
                    f"        valor AS descricao_{column.name}\n"
                    f"    FROM `basedosdados.{table.dataset.name}.dicionario`\n"
                    f"    WHERE\n"
                    f"        TRUE\n"
                    f"        AND id_tabela = '{table.name}'\n"
                    f"        AND coluna = '{column.name}'\n"
                    f")"
                )
                sql_joins.append(
                    f"LEFT JOIN {sql_cte_table}\n"
                    f"    ON {sql_cte_table}.chave_{column.name} = dados.{column.name}"
                )
                sql_selects.append(
                    f"{sql_cte_table}.descricao_{column.name} AS {column.name}"
                )
            elif cloud_table := covered_by_directory(column):
                sql_selects.append(f"dados.{column.name} AS {column.name}")
                sql_cte_column = get_directory(column, cloud_table)
                if sql_cte_column is None:
                    # No known label column for this directory: keep only the
                    # raw value instead of selecting a column literally named None
                    continue
                sql_cte_table = f"diretorio_{column.name}"
                # Join the fully qualified table, not just "<project>.<dataset>"
                directory_table = (
                    f"{cloud_table.gcp_project_id}"
                    f".{cloud_table.gcp_dataset_id}"
                    f".{cloud_table.gcp_table_id}"
                )
                # NOTE(review): assumes directory_primary_key is the key column
                # of the directory table, so its name is the join column - confirm
                directory_key = column.directory_primary_key.name
                sql_joins.append(
                    f"LEFT JOIN `{directory_table}` AS {sql_cte_table}\n"
                    f"    ON dados.{column.name} = {sql_cte_table}.{directory_key}"
                )
                sql_selects.append(
                    f"{sql_cte_table}.{sql_cte_column} AS {column.name}_{sql_cte_column}"
                )
            else:
                sql_selects.append(f"dados.{column.name}")

        return sql_ctes, sql_joins, sql_selects

    sql_ctes, sql_joins, sql_selects = get_components()

    # A bare "WITH" without any CTE is invalid SQL, so the clause is optional
    sql_with = ("WITH " + ",\n\n".join(sql_ctes) + "\n\n") if sql_ctes else ""
    # Without registered columns there is nothing to enumerate: select everything
    sql_select = "SELECT " + (",\n\t".join(sql_selects) if sql_selects else "*")
    sql_from = f"\nFROM `basedosdados.{table.dataset.name}.{table.name}` AS dados\n"

    return sql_with + sql_select + sql_from + "\n".join(sql_joins)

0 comments on commit aee3e08

Please sign in to comment.