Skip to content

Commit

Permalink
Merge branch 'main' into staging/br_cgu_cartao_pagamento
Browse files Browse the repository at this point in the history
  • Loading branch information
tricktx authored Sep 16, 2024
2 parents 63c2d80 + f798335 commit fe3ca19
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 227 deletions.
2 changes: 1 addition & 1 deletion pipelines/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from pipelines.datasets.br_ibge_ipca.flows import *
from pipelines.datasets.br_ibge_pnadc.flows import *
from pipelines.datasets.br_inmet_bdmep.flows import *
from pipelines.datasets.br_jota.flows import *
from pipelines.datasets.br_bd_siga_o_dinheiro.flows import *
from pipelines.datasets.br_me_caged.flows import *
from pipelines.datasets.br_me_cnpj.flows import *
from pipelines.datasets.br_me_comex_stat.flows import *
Expand Down
File renamed without changes.
73 changes: 73 additions & 0 deletions pipelines/datasets/br_bd_siga_o_dinheiro/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
"""
Flows for br_bd_siga_o_dinheiro
"""

from datetime import timedelta

from prefect import Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

from pipelines.constants import constants
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.tasks import get_current_flow_labels

from pipelines.datasets.br_bd_siga_o_dinheiro.tasks import (
get_table_ids
)

from pipelines.datasets.br_bd_siga_o_dinheiro.schedules import (
schedule_br_bd_siga_o_dinheiro
)

# Flow that launches one run of the flow named by
# utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME per table id returned by
# get_table_ids(), and waits on each launched run.
# NOTE(review): leading indentation is missing in this copy of the file; the
# statements below are presumably nested inside the `with` block (and the
# per-table statements inside the `for` loop) — confirm against the repository.
with Flow(
name="BD template - br_bd_siga_o_dinheiro", code_owners=["luiz"]
) as br_bd_siga_o_dinheiro:

# Flow parameters; each one is forwarded to the launched flow run below.
dataset_id = Parameter("dataset_id", default="br_bd_siga_o_dinheiro", required=True)

materialization_mode = Parameter(
"materialization_mode", default="dev", required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

# Table ids to process, one launched flow run per id.
table_ids = get_table_ids()

for n, table_id in enumerate(table_ids):

# `wait_for_materialization` is first assigned further down in this loop, so
# it is only referenced when n > 0: from the second table on, the label
# lookup is given the previous table's wait task as an upstream task.
# NOTE(review): presumably this is what serializes the runs — confirm.
current_flow_labels = get_current_flow_labels(upstream_tasks=[wait_for_materialization] if n > 0 else None)
# NOTE(review): `dataset_id` is a Parameter object and `table_id` comes from
# iterating a task result, so the f-string in `run_name` may render their
# repr rather than the runtime values — confirm the resulting run names.
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
)
# Wait on the launched run, streaming its states and logs.
wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
# Retry count and delay (in seconds) for the wait task come from
# dump_db_constants.
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)


# Deployment settings attached to the flow object: the schedule imported from
# the dataset's schedules module, a Kubernetes run configuration using the
# image named in the project constants, and GCS storage in the project's
# flows bucket. The three assignments are independent of each other.
br_bd_siga_o_dinheiro.schedule = schedule_br_bd_siga_o_dinheiro
br_bd_siga_o_dinheiro.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_bd_siga_o_dinheiro.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
29 changes: 29 additions & 0 deletions pipelines/datasets/br_bd_siga_o_dinheiro/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
"""
Schedules for br_bd_siga_o_dinheiro
"""

from datetime import datetime

from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

from pipelines.constants import constants


# Single cron clock for the dataset. The expression "30 4 * * *" fires once a
# day at 04:30 (no timezone is given here; the start date is a naive datetime).
# Scheduled runs carry the production agent label and these parameter values.
_daily_clock = CronClock(
    cron="30 4 * * *",
    start_date=datetime(2024, 9, 1, 0, 0),
    labels=[constants.BASEDOSDADOS_PROD_AGENT_LABEL.value],
    parameter_defaults={
        "dataset_id": "br_bd_siga_o_dinheiro",
        "materialization_mode": "prod",
        "dbt_alias": True,
    },
)

# Schedule exposed to the flow module; it contains only the clock above.
schedule_br_bd_siga_o_dinheiro = Schedule(clocks=[_daily_clock])
10 changes: 10 additions & 0 deletions pipelines/datasets/br_bd_siga_o_dinheiro/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
from prefect import task

@task
def get_table_ids() -> tuple[str, str, str, str]:
    """Return the fixed tuple of four table ids handled by this dataset.

    NOTE(review): the fixed-length tuple return annotation is presumably what
    allows the task result to be iterated when the flow is built — confirm
    before changing the annotation or the number of ids.
    """
    return (
        "eleicao_perfil_candidato_2024",
        "eleicao_prestacao_contas_candidato_origem_2024",
        "eleicao_prestacao_contas_candidato_2024",
        "eleicao_prestacao_contas_partido_2024",
    )
158 changes: 0 additions & 158 deletions pipelines/datasets/br_jota/flows.py

This file was deleted.

68 changes: 0 additions & 68 deletions pipelines/datasets/br_jota/schedules.py

This file was deleted.

0 comments on commit fe3ca19

Please sign in to comment.