Skip to content

Commit

Permalink
Merge pull request #10 from prefeitura-rio/staging/migrate_smfp_flows
Browse files Browse the repository at this point in the history
Staging/migrate smfp flows
  • Loading branch information
d116626 authored Sep 3, 2024
2 parents 083645c + eebeb47 commit 2af08df
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 0 deletions.
72 changes: 72 additions & 0 deletions pipelines/ergon/dump_db_ergon/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,78 @@
FROM ERGON.RUBRICAS
""",
},
"inscritos": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT
CHAVE, CHAVE_VAGA, IDENTIFICACAO, NOME,
NUMVINC, NUMFUNC, CONVOCACAO, NOMEACAO, POSSE,
DESISTENCIA, CLASSIFICACAO, DEFICIENTE, SEXO, DATA_NASCIMENTO,
ESTADO_CIVIL, TIPORG, ORGAORG, NUMRG, UFRG, CPF, TIPOLOGENDER, NOMELOGENDER,
NUMENDER, COMPLENDER, BAIRROENDER, CIDADEENDER, UFENDER, CEPENDER,
PONTOS_TITULO, EMP_CODIGO, FLEX_CAMPO_01, FLEX_CAMPO_02, FLEX_CAMPO_03,
FLEX_CAMPO_04, FLEX_CAMPO_05, NACIONALIDADE, ORDEM_CHAMADA, SETOR, PONTPUBL,
DT_EMISSAO_RG, FLEX_CAMPO_06, FLEX_CAMPO_07, FLEX_CAMPO_08, FLEX_CAMPO_09,
FLEX_CAMPO_10, FLEX_CAMPO_11, FLEX_CAMPO_12, FLEX_CAMPO_13, FLEX_CAMPO_14,
FLEX_CAMPO_15, TELEFONE, ID_PESSOA, ID_COTAS_CONCURSOS, FLEX_CAMPO_16,
FLEX_CAMPO_17, FLEX_CAMPO_18, FLEX_CAMPO_19, FLEX_CAMPO_20, FLEX_CAMPO_21,
FLEX_CAMPO_22, FLEX_CAMPO_23, FLEX_CAMPO_24, FLEX_CAMPO_25, FLEX_CAMPO_26,
FLEX_CAMPO_27, FLEX_CAMPO_28, FLEX_CAMPO_29, FLEX_CAMPO_30
FROM ERGON.INSCRITOS
""",
},
"vagas_concurso": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT
CHAVE, CHAVE_CONCURSO, CARGO, ESPECIALIDADE, HLOCAL, QTD_VAGAS,
QTD_VAGAS_DEF, FLEX_CAMPO_01, FLEX_CAMPO_02, FLEX_CAMPO_03,
FLEX_CAMPO_04, FLEX_CAMPO_05, EMP_CODIGO, SEXO, PERCENTUAL_VAGAS_DEF,
DIAS_PRAZO_POSSE, DIAS_PRAZO_PRORROG_POSSE, DIAS_PRAZO_EXERC,
DIAS_PRAZO_PRORROG_EXERC, GRUPO_VAGAS, FLEX_CAMPO_06, FLEX_CAMPO_07,
FLEX_CAMPO_08, FLEX_CAMPO_09, FLEX_CAMPO_10
FROM ERGON.VAGAS_CONCURSO
""",
},
"concursos": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT
CHAVE, SIGLA, REGIMEJUR, DTINI, VALIDADE, PUBLIC_EDITAL, RESULTADO,
OBJETIVO, FLEX_CAMPO_01, FLEX_CAMPO_02, FLEX_CAMPO_03, FLEX_CAMPO_04,
FLEX_CAMPO_05, EMP_CODIGO, PRORROGACAO, CANCELAMENTO, ABRANGENCIA,
PONTPUBL, DTINI_INSCRICAO, DTFIM_INSCRICAO, SETOR, TIPO_CONCURSO,
TIPOVINC, INSTITUICAO, DIAS_PRAZO_POSSE, DIAS_PRAZO_PRORROG_POSSE,
DIAS_PRAZO_EXERC, DIAS_PRAZO_PRORROG_EXERC, FLEX_CAMPO_06, FLEX_CAMPO_07,
FLEX_CAMPO_08, FLEX_CAMPO_09, FLEX_CAMPO_10
FROM ERGON.CONCURSOS
""",
},
"formas_vac": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT
SIGLA, NOME, FLEX_CAMPO_01, FLEX_CAMPO_02, FLEX_CAMPO_03,
FLEX_CAMPO_04, FLEX_CAMPO_05, PONTLEI, FLEX_CAMPO_06, FLEX_CAMPO_07,
FLEX_CAMPO_08, FLEX_CAMPO_09, FLEX_CAMPO_10, FLEX_CAMPO_11,
FLEX_CAMPO_12, FLEX_CAMPO_13, FLEX_CAMPO_14, FLEX_CAMPO_15,
FLEX_CAMPO_16, FLEX_CAMPO_17, FLEX_CAMPO_18, FLEX_CAMPO_19,
FLEX_CAMPO_20, CODIGO_CAGED
FROM ERGON.FORMAS_VAC_
""",
},
}

ergon_clocks = generate_dump_db_schedules(
Expand Down
1 change: 1 addition & 0 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
from pipelines.exemplo import * # noqa
from pipelines.fincon import * # noqa
from pipelines.iptu_inad import * # noqa
from pipelines.receita_federal_cnpj import * # noqa
from pipelines.sigma import * # noqa
from pipelines.templates import * # noqa
2 changes: 2 additions & 0 deletions pipelines/receita_federal_cnpj/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
from pipelines.receita_federal_cnpj.dump_db_porte_empresa.flows import * # noqa
Empty file.
44 changes: 44 additions & 0 deletions pipelines/receita_federal_cnpj/dump_db_porte_empresa/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
"""
Database dumping flows
"""

from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_templates.dump_db.flows import flow as dump_sql_flow
from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.constants import constants
from pipelines.receita_federal_cnpj.dump_db_porte_empresa.schedules import (
porte_empresa_schedule,
)

# Work on a private copy of the shared dump_db template flow so the settings
# applied below stay local to this flow object.
porte_empresa_flow = deepcopy(dump_sql_flow)
porte_empresa_flow.name = "SMFP: CNPJ porte_empresa - Ingerir tabelas de banco SQL"
porte_empresa_flow.state_handlers = [
    handler_inject_bd_credentials,
    handler_initialize_sentry,
]

# Flow storage (GCS bucket) and Kubernetes run configuration, both taken from
# the project-wide constants.
porte_empresa_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
porte_empresa_flow.run_config = KubernetesRun(
    image=constants.DOCKER_IMAGE.value,
    labels=[constants.RJ_SMFP_AGENT_LABEL.value],
)

# Default parameter values registered on the flow: source database connection
# settings plus the destination dataset and the secret path.
porte_empresa_default_parameters = dict(
    db_database="SDI",
    db_host="10.70.1.34",
    db_port="1433",
    db_type="sql_server",
    dataset_id="porte_empresa",
    infisical_secret_path="/db-porte-empresa",
)
porte_empresa_flow = set_default_parameters(
    porte_empresa_flow,
    default_parameters=porte_empresa_default_parameters,
)

# Attach the schedule built in the sibling schedules module.
porte_empresa_flow.schedule = porte_empresa_schedule
54 changes: 54 additions & 0 deletions pipelines/receita_federal_cnpj/dump_db_porte_empresa/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
"""
Schedules for the database dump pipeline
"""

from datetime import datetime, timedelta

import pytz
from prefect.schedules import Schedule
from prefeitura_rio.pipelines_utils.io import untuple_clocks as untuple
from prefeitura_rio.pipelines_utils.prefect import generate_dump_db_schedules

from pipelines.constants import constants

#####################################
#
# Porte Empresa Schedules
#
#####################################

# Per-table dump settings for the porte_empresa dataset; this mapping is what
# the module hands to generate_dump_db_schedules as `table_parameters`.
porte_empresa_queries = {
    "situacao_cadastral": dict(
        materialize_after_dump=True,
        biglake_table=True,
        materialization_mode="prod",
        partition_columns="dt_SituacaoCadastral",
        partition_date_format="%Y-%m-%d",
        dump_mode="append",
        lower_bound_date="current_month",
        # Adjacent literals are concatenated at compile time, so the query
        # text is exactly these lines joined by their explicit newlines.
        execute_query=(
            "\nSELECT\n"
            "CNPJ_basico, CNPJ_ordem, CNPJ_dv, RazaoSocial,\n"
            "cd_PorteEmpresa, cd_SituacaoCadastral, dt_SituacaoCadastral\n"
            "FROM SDI.ReceitaFederal.Vw_PorteEmpresa_Sigma\n"
        ),
    ),
}

# Weekly clocks for the tables described in porte_empresa_queries, anchored at
# 2022-10-30 23:00 in the America/Sao_Paulo timezone.
porte_empresa_clocks = generate_dump_db_schedules(
    interval=timedelta(days=7),
    # pytz zones must be attached with localize(): passing one directly as
    # datetime(tzinfo=...) binds the zone's first recorded offset (local mean
    # time) instead of the offset that applies on the given date.
    start_date=pytz.timezone("America/Sao_Paulo").localize(datetime(2022, 10, 30, 23, 0)),
    labels=[
        constants.RJ_SMFP_AGENT_LABEL.value,
    ],
    db_database="SDI",
    db_host="10.70.1.34",
    db_port="1433",
    db_type="sql_server",
    dataset_id="porte_empresa",
    infisical_secret_path="/db-porte-empresa",
    table_parameters=porte_empresa_queries,
)

# Schedule object consumed by the flow module; the clocks are passed through
# untuple_clocks before being handed to prefect's Schedule.
porte_empresa_schedule = Schedule(clocks=untuple(porte_empresa_clocks))
4 changes: 4 additions & 0 deletions queries/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,9 @@ models:
recursos_humanos_ergon_pericia_medica:
+materialized: table
+schema: recursos_humanos_ergon_pericia_medica
porte_empresa:
+materialized: table
+schema: porte_empresa



20 changes: 20 additions & 0 deletions queries/models/porte_empresa/situacao_cadastral.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{{
    config(
        materialized='table',
        partition_by={
            "field": "data_particao",
            "data_type": "date",
            "granularity": "month",
        }
    )
}}
-- Typed projection of the porte_empresa staging table: every column is cast
-- explicitly and renamed to snake_case; the result is partitioned by month on
-- data_particao (see the config block above).
SELECT
    SAFE_CAST(cnpj_basico AS STRING) AS cnpj_basico,
    SAFE_CAST(cnpj_ordem AS STRING) AS cnpj_ordem,
    SAFE_CAST(cnpj_dv AS STRING) AS cnpj_dv,
    SAFE_CAST(razaosocial AS STRING) AS razao_social,
    -- Strip a trailing ".0" from the code columns before exposing them as
    -- string ids (presumably an artifact of numeric codes dumped as floats;
    -- TODO confirm against the staging data).
    SAFE_CAST(REGEXP_REPLACE(cd_porteempresa, r'\.0$', '') AS STRING) AS id_porte_empresa,
    SAFE_CAST(REGEXP_REPLACE(cd_situacaocadastral, r'\.0$', '') AS STRING) AS id_situacao_cadastral,
    SAFE_CAST(dt_situacaocadastral AS DATE) AS data_situacao_cadastral,
    SAFE_CAST(data_particao AS DATE) AS data_particao
FROM `rj-smfp.porte_empresa_staging.situacao_cadastral`

0 comments on commit 2af08df

Please sign in to comment.