Skip to content

Commit

Permalink
feat: Add alvaras pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
PedroSiqueira1 committed Oct 15, 2024
1 parent f883c7f commit 105950b
Show file tree
Hide file tree
Showing 13 changed files with 394 additions and 0 deletions.
1 change: 1 addition & 0 deletions pipelines/alvaras/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from pipelines.alvaras.dump_db.flows import * # noqa
50 changes: 50 additions & 0 deletions pipelines/alvaras/dump_db/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
Database dumping flows for alvaras.
"""

from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_templates.dump_db.flows import flow as dump_sql_flow
from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.alvaras.dump_db.schedules import (
alvaras_infra_daily_update_schedule,
)
from pipelines.constants import constants

# Connection and destination defaults applied to every run of the alvaras flow.
alvaras_default_parameters = dict(
    db_database="DW_BI_ALVARAS",
    db_host="10.70.15.11",
    db_port="1433",
    db_type="sql_server",
    dataset_id="alvaras",
    infisical_secret_path="/db-alvaras",
)

# Work on a private copy so the shared dump_db template flow is never mutated.
rj_iplanrio_alvaras_flow = deepcopy(dump_sql_flow)
rj_iplanrio_alvaras_flow.state_handlers = [
    handler_inject_bd_credentials,
    handler_initialize_sentry,
]
rj_iplanrio_alvaras_flow.name = "IPLANRIO: Alvaras - Ingerir tabelas de banco SQL"
rj_iplanrio_alvaras_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
rj_iplanrio_alvaras_flow.run_config = KubernetesRun(
    image=constants.DOCKER_IMAGE.value,
    labels=[constants.RJ_IPLANRIO_AGENT_LABEL.value],  # agent label
)

# set_default_parameters hands back the flow, so rebind the name to its result.
rj_iplanrio_alvaras_flow = set_default_parameters(
    rj_iplanrio_alvaras_flow,
    default_parameters=alvaras_default_parameters,
)

# Attach the schedule last, after the defaults have been applied.
rj_iplanrio_alvaras_flow.schedule = alvaras_infra_daily_update_schedule
249 changes: 249 additions & 0 deletions pipelines/alvaras/dump_db/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
"""
Schedules for the database dump pipeline.
"""

from datetime import datetime, timedelta

import pytz
from prefect.schedules import Schedule
from prefeitura_rio.pipelines_utils.io import untuple_clocks as untuple
from prefeitura_rio.pipelines_utils.prefect import generate_dump_db_schedules

from pipelines.constants import constants

#####################################
#
# Alvaras Schedules
#
#####################################

# Dump/materialization settings that are identical for every alvaras table.
_alvaras_common_table_settings = {
    "biglake_table": True,
    "materialize_after_dump": True,
    "materialization_mode": "prod",
    "materialize_to_datario": False,
    "dump_to_gcs": False,
    "dump_mode": "overwrite",
}

# Destination table id -> SQL Server query that extracts its rows.
_alvaras_table_queries = {
    "fact_fatoalvaras": """
SELECT
ID_Alvara,
Quantidade,
ID_AtvProcesso,
ID_CAE, ID_CNAE,
ID_DiaDeferimento,
ID_DiaSolicitacao,
ID_DiaTaxaPagamen,
ID_Direcionamento,
ID_TipoContribuint,
ID_TipoSolicitacao
FROM DW_BI_ALVARAS.dbo.FACT_FatoAlvaras;
""",
    "fact_fatocp": """
SELECT
ID_AtvProcesso,
Quantidade_cp,
ID_CAE,
ID_CNAE,
ID_Consulta,
ID_DiaInicial,
ID_Direcionamento,
ID_TipoContribuint,
ID_TipoSolicitacao
FROM DW_BI_ALVARAS.dbo.FACT_FatoCP;
""",
    "fact_fatodatacarga": """
SELECT
DataCarga
FROM DW_BI_ALVARAS.dbo.FACT_FatoDataCarga;
""",
    "tab_alvara": """
SELECT
ID_Alvara,
DSC_Alvara,
DSC_Endereco,
DSC_Bairro,
DSC_Zoneamento,
DSC_IRLF,
DSC_TipoAnalise,
DSC_TempoRespDia,
DSC_StatusIntermediario,
DSC_StatusCPL,
DSC_TempoRespMinuto,
DSC_TipoAlvara,
DSC_TaxaOriginal,
DSC_TaxaMulta,
DSC_TaxaMora,
DSC_TaxaTotal,
DSC_IsentoTaxa,
DSC_Numero,
DSC_AlvaraLiberado
FROM DW_BI_ALVARAS.dbo.TAB_ALVARA;
""",
    "tab_atvprocesso": """
SELECT
ID_AtvProcesso,
DSC_AtvProcesso,
DSC_RespAtividade,
DSC_RefAtividade
FROM DW_BI_ALVARAS.dbo.TAB_AtvProcesso;
""",
    "tab_cae": """
SELECT
ID_CAE,
DSC_CAE,
ID_TipoAtividade,
DSC_TipoAtividade
FROM DW_BI_ALVARAS.dbo.TAB_CAE;
""",
    "tab_cnae": """
SELECT
ID_CNAE,
DSC_CNAE
FROM DW_BI_ALVARAS.dbo.TAB_CNAE;
""",
    "tab_cnae_tmp": """
SELECT
ID_CNAE,
DSC_CNAE
FROM DW_BI_ALVARAS.dbo.TAB_CNAE_TMP;
""",
    "tab_consulta": """
SELECT
ID_Consulta,
DSC_Consulta,
DSC_Endereco_cp,
DSC_Bairro_cp,
DSC_Zoneamento_cp,
DSC_CodeConsulta,
DSC_IRLF_cp,
DSC_StatusCPL_cp,
DSC_TipoAnalise_cp,
DSC_Status_cp
FROM DW_BI_ALVARAS.dbo.TAB_Consulta;
""",
    "tab_direcionamento": """
SELECT
ID_Direcionamento,
DSC_Direcionamento
FROM DW_BI_ALVARAS.dbo.TAB_Direcionamento;
""",
    "tab_tipocontribuinte_tipocontribuint": """
SELECT
ID_TipoContribuint,
DSC_TipoContribuint
FROM DW_BI_ALVARAS.dbo.TAB_TipoContribuinte_TipoContribuint;
""",
    "tab_tiposolicitacao": """
SELECT
ID_TipoSolicitacao,
DSC_TipoSolicitacao
FROM DW_BI_ALVARAS.dbo.TAB_TipoSolicitacao;
""",
}

# Per-table parameters: the shared settings followed by that table's query.
# Each table gets its own dict object, so entries can be edited independently.
_alvaras_infra_query = {
    table_id: {**_alvaras_common_table_settings, "execute_query": query}
    for table_id, query in _alvaras_table_queries.items()
}

# Clocks for the daily alvaras dump, built from the per-table parameters in
# ``_alvaras_infra_query``.
#
# The anchor time is attached to its zone with ``localize``: pytz documents that
# passing one of its zones through ``tzinfo=`` in the ``datetime`` constructor
# yields the zone's LMT offset (-03:06 for America/Sao_Paulo) rather than the
# civil -03:00 offset, which would put the anchor six minutes off 02:00 local.
alvaras_infra_clocks = generate_dump_db_schedules(
    interval=timedelta(days=1),
    start_date=pytz.timezone("America/Sao_Paulo").localize(datetime(2022, 3, 21, 2, 0)),
    labels=[
        constants.RJ_IPLANRIO_AGENT_LABEL.value,
    ],
    db_database="DW_BI_ALVARAS",
    db_host="10.70.15.11",
    db_port="1433",
    db_type="sql_server",
    dataset_id="alvaras",
    infisical_secret_path="/db-alvaras",
    table_parameters=_alvaras_infra_query,
)

# Schedule consumed by the alvaras dump flow; ``untuple`` adapts the generated
# clocks to what ``Schedule`` accepts.
alvaras_infra_daily_update_schedule = Schedule(clocks=untuple(alvaras_infra_clocks))
14 changes: 14 additions & 0 deletions queries/models/alvaras/fact_fatoalvaras.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Typed projection of the staging table `alvaras_staging.fact_fatoalvaras`.
-- Every column goes through SAFE_CAST, so a value that cannot be converted
-- becomes NULL instead of failing the query.
SELECT
    SAFE_CAST(src.ID_Alvara AS STRING) AS ID_Alvara,
    SAFE_CAST(src.Quantidade AS FLOAT64) AS Quantidade,
    SAFE_CAST(src.ID_AtvProcesso AS STRING) AS ID_AtvProcesso,
    SAFE_CAST(src.ID_CAE AS STRING) AS ID_CAE,
    SAFE_CAST(src.ID_CNAE AS STRING) AS ID_CNAE,
    SAFE_CAST(src.ID_DiaDeferimento AS STRING) AS ID_DiaDeferimento,
    SAFE_CAST(src.ID_DiaSolicitacao AS STRING) AS ID_DiaSolicitacao,
    SAFE_CAST(src.ID_DiaTaxaPagamen AS STRING) AS ID_DiaTaxaPagamen,
    SAFE_CAST(src.ID_Direcionamento AS STRING) AS ID_Direcionamento,
    SAFE_CAST(src.ID_TipoContribuint AS STRING) AS ID_TipoContribuint,
    SAFE_CAST(src.ID_TipoSolicitacao AS STRING) AS ID_TipoSolicitacao
FROM `rj-iplanrio.alvaras_staging.fact_fatoalvaras` AS src
12 changes: 12 additions & 0 deletions queries/models/alvaras/fact_fatocp.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Typed projection of the staging table `alvaras_staging.fact_fatocp`.
-- SAFE_CAST returns NULL for values that cannot be converted.
SELECT
    SAFE_CAST(src.ID_AtvProcesso AS STRING) AS ID_AtvProcesso,
    SAFE_CAST(src.Quantidade_cp AS FLOAT64) AS Quantidade_cp,
    SAFE_CAST(src.ID_CAE AS STRING) AS ID_CAE,
    SAFE_CAST(src.ID_CNAE AS STRING) AS ID_CNAE,
    SAFE_CAST(src.ID_Consulta AS STRING) AS ID_Consulta,
    SAFE_CAST(src.ID_DiaInicial AS STRING) AS ID_DiaInicial,
    SAFE_CAST(src.ID_Direcionamento AS STRING) AS ID_Direcionamento,
    SAFE_CAST(src.ID_TipoContribuint AS STRING) AS ID_TipoContribuint,
    SAFE_CAST(src.ID_TipoSolicitacao AS STRING) AS ID_TipoSolicitacao
FROM `rj-iplanrio.alvaras_staging.fact_fatocp` AS src
22 changes: 22 additions & 0 deletions queries/models/alvaras/tab_alvara.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Typed projection of the staging table `alvaras_staging.tab_alvara`.
-- Response times, the tax amounts and DSC_Numero are exposed as FLOAT64;
-- every other column is exposed as STRING. SAFE_CAST returns NULL for values
-- that cannot be converted.
SELECT
    SAFE_CAST(src.ID_Alvara AS STRING) AS ID_Alvara,
    SAFE_CAST(src.DSC_Alvara AS STRING) AS DSC_Alvara,
    SAFE_CAST(src.DSC_Endereco AS STRING) AS DSC_Endereco,
    SAFE_CAST(src.DSC_Bairro AS STRING) AS DSC_Bairro,
    SAFE_CAST(src.DSC_Zoneamento AS STRING) AS DSC_Zoneamento,
    SAFE_CAST(src.DSC_IRLF AS STRING) AS DSC_IRLF,
    SAFE_CAST(src.DSC_TipoAnalise AS STRING) AS DSC_TipoAnalise,
    SAFE_CAST(src.DSC_TempoRespDia AS FLOAT64) AS DSC_TempoRespDia,
    SAFE_CAST(src.DSC_StatusIntermediario AS STRING) AS DSC_StatusIntermediario,
    SAFE_CAST(src.DSC_StatusCPL AS STRING) AS DSC_StatusCPL,
    SAFE_CAST(src.DSC_TempoRespMinuto AS FLOAT64) AS DSC_TempoRespMinuto,
    SAFE_CAST(src.DSC_TipoAlvara AS STRING) AS DSC_TipoAlvara,
    SAFE_CAST(src.DSC_TaxaOriginal AS FLOAT64) AS DSC_TaxaOriginal,
    SAFE_CAST(src.DSC_TaxaMulta AS FLOAT64) AS DSC_TaxaMulta,
    SAFE_CAST(src.DSC_TaxaMora AS FLOAT64) AS DSC_TaxaMora,
    SAFE_CAST(src.DSC_TaxaTotal AS FLOAT64) AS DSC_TaxaTotal,
    SAFE_CAST(src.DSC_IsentoTaxa AS STRING) AS DSC_IsentoTaxa,
    SAFE_CAST(src.DSC_Numero AS FLOAT64) AS DSC_Numero,
    SAFE_CAST(src.DSC_AlvaraLiberado AS STRING) AS DSC_AlvaraLiberado
FROM `rj-iplanrio.alvaras_staging.tab_alvara` AS src
7 changes: 7 additions & 0 deletions queries/models/alvaras/tab_atvprocesso.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Typed projection of the staging table `alvaras_staging.tab_atvprocesso`.
-- All columns are exposed as STRING; SAFE_CAST returns NULL on failure.
SELECT
    SAFE_CAST(src.ID_AtvProcesso AS STRING) AS ID_AtvProcesso,
    SAFE_CAST(src.DSC_AtvProcesso AS STRING) AS DSC_AtvProcesso,
    SAFE_CAST(src.DSC_RespAtividade AS STRING) AS DSC_RespAtividade,
    SAFE_CAST(src.DSC_RefAtividade AS STRING) AS DSC_RefAtividade
FROM `rj-iplanrio.alvaras_staging.tab_atvprocesso` AS src
7 changes: 7 additions & 0 deletions queries/models/alvaras/tab_cae.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Typed projection of the staging table `alvaras_staging.tab_cae`.
-- All columns are exposed as STRING; SAFE_CAST returns NULL on failure.
SELECT
    SAFE_CAST(src.ID_CAE AS STRING) AS ID_CAE,
    SAFE_CAST(src.DSC_CAE AS STRING) AS DSC_CAE,
    SAFE_CAST(src.ID_TipoAtividade AS STRING) AS ID_TipoAtividade,
    SAFE_CAST(src.DSC_TipoAtividade AS STRING) AS DSC_TipoAtividade
FROM `rj-iplanrio.alvaras_staging.tab_cae` AS src
5 changes: 5 additions & 0 deletions queries/models/alvaras/tab_cnae.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Typed projection of the staging table `alvaras_staging.tab_cnae`.
-- Both columns are exposed as STRING; SAFE_CAST returns NULL on failure.
SELECT
    SAFE_CAST(src.ID_CNAE AS STRING) AS ID_CNAE,
    SAFE_CAST(src.DSC_CNAE AS STRING) AS DSC_CNAE
FROM `rj-iplanrio.alvaras_staging.tab_cnae` AS src
13 changes: 13 additions & 0 deletions queries/models/alvaras/tab_consulta.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Typed projection of the staging table `alvaras_staging.tab_consulta`.
-- DSC_CodeConsulta is exposed as FLOAT64 and every other column as STRING.
-- SAFE_CAST returns NULL for values that cannot be converted.
SELECT
    SAFE_CAST(src.ID_Consulta AS STRING) AS ID_Consulta,
    SAFE_CAST(src.DSC_Consulta AS STRING) AS DSC_Consulta,
    SAFE_CAST(src.DSC_Endereco_cp AS STRING) AS DSC_Endereco_cp,
    SAFE_CAST(src.DSC_Bairro_cp AS STRING) AS DSC_Bairro_cp,
    SAFE_CAST(src.DSC_Zoneamento_cp AS STRING) AS DSC_Zoneamento_cp,
    SAFE_CAST(src.DSC_CodeConsulta AS FLOAT64) AS DSC_CodeConsulta,
    SAFE_CAST(src.DSC_IRLF_cp AS STRING) AS DSC_IRLF_cp,
    SAFE_CAST(src.DSC_StatusCPL_cp AS STRING) AS DSC_StatusCPL_cp,
    SAFE_CAST(src.DSC_TipoAnalise_cp AS STRING) AS DSC_TipoAnalise_cp,
    SAFE_CAST(src.DSC_Status_cp AS STRING) AS DSC_Status_cp
FROM `rj-iplanrio.alvaras_staging.tab_consulta` AS src
Loading

0 comments on commit 105950b

Please sign in to comment.