From aee3e0887fd2e9eaf0eebbc89b06f207a9f97d75 Mon Sep 17 00:00:00 2001
From: Vinicius
Date: Wed, 24 Apr 2024 18:34:32 -0300
Subject: [PATCH] feat: add one big table query generator

---
 bd_api/apps/api/v1/models.py | 128 +++++++++++++++++++++++++++++++++++
 1 file changed, 128 insertions(+)

diff --git a/bd_api/apps/api/v1/models.py b/bd_api/apps/api/v1/models.py
index 63f27784..b6f86a5a 100644
--- a/bd_api/apps/api/v1/models.py
+++ b/bd_api/apps/api/v1/models.py
@@ -3,6 +3,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 from math import log10
+from typing import Optional
 from uuid import uuid4
 
 from django.core.exceptions import ValidationError
@@ -784,6 +785,7 @@ class Table(BaseModel, OrderedModel):
         default=0,
         help_text="Number of page views by Google Analytics",
     )
+    one_big_table_query = models.TextField(blank=True, null=True)
 
     order_with_respect_to = ("dataset",)
     graphql_nested_filter_fields_whitelist = ["id", "dataset"]
@@ -1210,6 +1212,10 @@ class CloudTable(BaseModel):
     def __str__(self):
         return f"{self.gcp_project_id}.{self.gcp_dataset_id}.{self.gcp_table_id}"
 
+    @property
+    def gcp_prefix_id(self):
+        return f"{self.gcp_project_id}.{self.gcp_dataset_id}"
+
     def clean(self) -> None:
         errors = {}
         if self.gcp_project_id and not check_kebab_case(self.gcp_project_id):
@@ -1791,3 +1797,125 @@ def get_full_coverage(resources: list) -> dict:
         return [open_since.as_dict, open_until.as_dict]
     if paid_since.str and paid_until.str:
         return [paid_since.as_dict, paid_until.as_dict]
+
+
+def get_one_big_table_query(table: Table) -> str:
+    """Build a denormalized SQL query for a table, similar to a one big table.
+
+    Columns covered by the dataset dictionary are replaced by their description,
+    and columns linked to a known directory table get a label column next to
+    the raw id. Every other column is selected unchanged.
+
+    Args:
+        table: Table whose ordered columns drive the generated query.
+
+    Returns:
+        The SQL text: optional CTEs, SELECT list, FROM clause and LEFT JOINs.
+    """
+
+    def covered_by_directory(column: Column) -> Optional[CloudTable]:
+        """Return the directory cloud table linked to the column, if any."""
+        if not (directory_column := column.directory_primary_key):
+            return None
+        if not (cloud_table := directory_column.cloud_tables.first()):
+            return None
+        # Directories matching these names are never joined
+        if "setor_censitario" in str(cloud_table):
+            return None
+        if "diretorios_data_tempo" in str(cloud_table):
+            return None
+        return cloud_table
+
+    def get_directory(column: Column, cloud_table: CloudTable) -> Optional[str]:
+        """Return the label column of the directory table, if one is known."""
+        map_directory_description_column = {
+            "br_bd_diretorios_brasil.area_conhecimento": f"descricao_{column.name}",
+            "br_bd_diretorios_brasil.cbo_1994": "descricao",
+            "br_bd_diretorios_brasil.cbo_2002": "descricao",
+            "br_bd_diretorios_brasil.cep": "centroide",
+            "br_bd_diretorios_brasil.cid_10": f"descricao_{column.name}",
+            "br_bd_diretorios_brasil.cid_9": "descricao",
+            "br_bd_diretorios_brasil.cnae_1": "descricao",
+            "br_bd_diretorios_brasil.cnae_2": "descricao",
+            "br_bd_diretorios_brasil.curso_superior": "nome_curso",
+            "br_bd_diretorios_brasil.distrito": "nome",
+            "br_bd_diretorios_brasil.empresa": "nome_fantasia",
+            "br_bd_diretorios_brasil.escola": "nome",
+            "br_bd_diretorios_brasil.etnia_indigena": "nome",
+            "br_bd_diretorios_brasil.instituicao_ensino_superior": "nome",
+            "br_bd_diretorios_brasil.municipio": "nome",
+            "br_bd_diretorios_brasil.natureza_juridica": "descricao",
+            "br_bd_diretorios_brasil.regiao_metropolitana": "nome",
+            "br_bd_diretorios_brasil.uf": "nome",
+            "br_bd_diretorios_brasil.cnae_2_3_subclasses": "descricao",
+        }
+        # Keys are `<dataset>.<table>`, so the project id stays out of the lookup
+        directory_id = f"{cloud_table.gcp_dataset_id}.{cloud_table.gcp_table_id}"
+        return map_directory_description_column.get(directory_id)
+
+    def get_components():
+        """Collect the CTEs, JOINs and SELECT expressions in column order."""
+        sql_ctes, sql_joins, sql_selects = [], [], []
+
+        column: Column
+        for column in table.columns.order_by("order").all():
+            if column.covered_by_dictionary:
+                sql_cte_table = f"dicionario_{column.name}"
+                sql_cte = f"""
+                    {sql_cte_table} AS (
+                        SELECT
+                            chave AS chave_{column.name},
+                            valor AS descricao_{column.name}
+                        FROM `basedosdados.{table.dataset.name}.dicionario`
+                        WHERE
+                            TRUE
+                            AND id_tabela = '{table.name}'
+                            AND coluna = '{column.name}'
+                    )
+                """
+                sql_join = f"""
+                    LEFT JOIN {sql_cte_table}
+                        ON chave_{column.name} = dados.{column.name}
+                """
+                sql_select = f"descricao_{column.name} AS {column.name}"
+                sql_ctes.append(sql_cte)
+                sql_joins.append(sql_join)
+                sql_selects.append(sql_select)
+                continue
+
+            cloud_table = covered_by_directory(column)
+            sql_cte_column = get_directory(column, cloud_table) if cloud_table else None
+            if not sql_cte_column:
+                # No directory, or no known label column: keep the raw value
+                # rather than joining a table that adds nothing to the output
+                sql_selects.append(f"dados.{column.name}")
+                continue
+
+            sql_cte_table = f"diretorio_{column.name}"
+            # NOTE(review): assumes `directory_primary_key.name` is the key
+            # column of the directory table - confirm against the Column model
+            sql_cte_key = column.directory_primary_key.name
+            sql_select_id = f"dados.{column.name} AS {column.name}"
+            sql_select_label = (
+                f"{sql_cte_table}.{sql_cte_column} AS {column.name}_{sql_cte_column}"
+            )
+            # Join the fully qualified table (project.dataset.table), quoted
+            # because project ids are kebab-case and may contain hyphens
+            sql_join = f"""
+                LEFT JOIN `{cloud_table}` AS {sql_cte_table}
+                    ON dados.{column.name} = {sql_cte_table}.{sql_cte_key}
+            """
+            sql_joins.append(sql_join)
+            sql_selects.append(sql_select_id)
+            sql_selects.append(sql_select_label)
+
+        return sql_ctes, sql_joins, sql_selects
+
+    sql_ctes, sql_joins, sql_selects = get_components()
+
+    # A bare `WITH` with no CTE is invalid SQL, so emit it only when needed
+    sql_with = ("WITH " + ",\n\n".join(sql_ctes) + "\n\n") if sql_ctes else ""
+    sql_select = "SELECT " + ",\n\t".join(sql_selects)
+    sql_from = f"\nFROM `basedosdados.{table.dataset.name}.{table.name}` AS dados\n"
+
+    return sql_with + sql_select + sql_from + "\n".join(sql_joins)