diff --git a/pipelines/datasets/__init__.py b/pipelines/datasets/__init__.py
index cf36c83d6..205fc142f 100644
--- a/pipelines/datasets/__init__.py
+++ b/pipelines/datasets/__init__.py
@@ -32,7 +32,7 @@
 from pipelines.datasets.br_ibge_ipca.flows import *
 from pipelines.datasets.br_ibge_pnadc.flows import *
 from pipelines.datasets.br_inmet_bdmep.flows import *
-from pipelines.datasets.br_jota.flows import *
+from pipelines.datasets.br_bd_siga_o_dinheiro.flows import *
 from pipelines.datasets.br_me_caged.flows import *
 from pipelines.datasets.br_me_cnpj.flows import *
 from pipelines.datasets.br_me_comex_stat.flows import *
diff --git a/pipelines/datasets/br_jota/__init__.py b/pipelines/datasets/br_bd_siga_o_dinheiro/__init__.py
similarity index 100%
rename from pipelines/datasets/br_jota/__init__.py
rename to pipelines/datasets/br_bd_siga_o_dinheiro/__init__.py
diff --git a/pipelines/datasets/br_bd_siga_o_dinheiro/flows.py b/pipelines/datasets/br_bd_siga_o_dinheiro/flows.py
new file mode 100644
index 000000000..aa9532dfd
--- /dev/null
+++ b/pipelines/datasets/br_bd_siga_o_dinheiro/flows.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+"""
+Flows for br_bd_siga_o_dinheiro
+"""
+
+from datetime import timedelta
+
+from prefect import Parameter
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+
+from pipelines.constants import constants
+from pipelines.utils.constants import constants as utils_constants
+from pipelines.utils.decorators import Flow
+from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.tasks import get_current_flow_labels
+
+from pipelines.datasets.br_bd_siga_o_dinheiro.tasks import (
+    get_table_ids,
+)
+
+from pipelines.datasets.br_bd_siga_o_dinheiro.schedules import (
+    schedule_br_bd_siga_o_dinheiro,
+)
+
+with Flow(
+    name="BD template - br_bd_siga_o_dinheiro", code_owners=["luiz"]
+) as br_bd_siga_o_dinheiro:
+
+    dataset_id = Parameter("dataset_id", default="br_bd_siga_o_dinheiro", required=True)
+
+    materialization_mode = Parameter(
+        "materialization_mode", default="dev", required=False
+    )
+    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+
+    table_ids = get_table_ids()
+
+    for n, table_id in enumerate(table_ids):
+
+        current_flow_labels = get_current_flow_labels(upstream_tasks=[wait_for_materialization] if n > 0 else None)
+        materialization_flow = create_flow_run(
+            flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+            project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+            parameters={
+                "dataset_id": dataset_id,
+                "table_id": table_id,
+                "mode": materialization_mode,
+                "dbt_alias": dbt_alias,
+            },
+            labels=current_flow_labels,
+            run_name=f"Materialize {dataset_id}.{table_id}",
+        )
+        wait_for_materialization = wait_for_flow_run(
+            materialization_flow,
+            stream_states=True,
+            stream_logs=True,
+            raise_final_state=True,
+        )
+        wait_for_materialization.max_retries = (
+            dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+        )
+        wait_for_materialization.retry_delay = timedelta(
+            seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+        )
+
+
+br_bd_siga_o_dinheiro.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+br_bd_siga_o_dinheiro.run_config = KubernetesRun(
+    image=constants.DOCKER_IMAGE.value
+)
+br_bd_siga_o_dinheiro.schedule = schedule_br_bd_siga_o_dinheiro
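The loop in the new flows.py is unrolled when the flow is built: each iteration passes the previous iteration's wait_for_flow_run task as upstream_tasks to get_current_flow_labels, so the four tables are materialized one after another rather than in parallel. A minimal Prefect 1.x sketch of that chaining pattern, using a hypothetical placeholder task instead of the repo's create_flow_run / wait_for_flow_run pair:

    # Sketch only: illustrates build-time unrolling with upstream_tasks.
    # The task and table ids below are placeholders, not the repo's code.
    from prefect import Flow, task

    @task
    def materialize(table_id: str) -> str:
        # stand-in for the create_flow_run + wait_for_flow_run pair
        return f"materialized {table_id}"

    TABLE_IDS = ("table_a", "table_b", "table_c")  # hypothetical ids

    with Flow("sequential-materialization-sketch") as sketch_flow:
        previous = None
        for n, table_id in enumerate(TABLE_IDS):
            # each call runs only after the previous one has finished
            current = materialize(
                table_id,
                upstream_tasks=[previous] if n > 0 else None,
            )
            previous = current

    # sketch_flow.run()  # the tables are processed strictly in order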
diff --git a/pipelines/datasets/br_bd_siga_o_dinheiro/schedules.py b/pipelines/datasets/br_bd_siga_o_dinheiro/schedules.py
new file mode 100644
index 000000000..592d670d7
--- /dev/null
+++ b/pipelines/datasets/br_bd_siga_o_dinheiro/schedules.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+"""
+Schedules for br_bd_siga_o_dinheiro
+"""
+
+from datetime import datetime
+
+from prefect.schedules import Schedule
+from prefect.schedules.clocks import CronClock
+
+from pipelines.constants import constants
+
+
+schedule_br_bd_siga_o_dinheiro = Schedule(
+    clocks=[
+        CronClock(
+            cron="30 4 * * *",
+            start_date=datetime(2024, 9, 1, 0, 0),
+            labels=[
+                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "dataset_id": "br_bd_siga_o_dinheiro",
+                "materialization_mode": "prod",
+                "dbt_alias": True,
+            },
+        ),
+    ],
+)
diff --git a/pipelines/datasets/br_bd_siga_o_dinheiro/tasks.py b/pipelines/datasets/br_bd_siga_o_dinheiro/tasks.py
new file mode 100644
index 000000000..883c34525
--- /dev/null
+++ b/pipelines/datasets/br_bd_siga_o_dinheiro/tasks.py
@@ -0,0 +1,10 @@
+# -*- coding: utf-8 -*-
+from prefect import task
+
+@task
+def get_table_ids() -> tuple[str, str, str, str]:
+
+    table_ids = ("eleicao_perfil_candidato_2024", "eleicao_prestacao_contas_candidato_origem_2024",
+                 "eleicao_prestacao_contas_candidato_2024", "eleicao_prestacao_contas_partido_2024")
+
+    return table_ids
\ No newline at end of file
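The new schedule fires daily at 04:30, and the clock's parameter_defaults override the flow's Parameter defaults for scheduled runs (materialization_mode becomes "prod" even though the flow default is "dev"). A small standalone sketch, assuming a local Prefect 1.x install, for checking what such a clock produces; the agent label is left out here because it is repo-specific:

    # Sketch only: inspect the upcoming run times of a cron clock like the
    # one added in schedules.py (values copied from the diff, label omitted).
    from datetime import datetime

    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    schedule = Schedule(
        clocks=[
            CronClock(
                cron="30 4 * * *",              # every day at 04:30
                start_date=datetime(2024, 9, 1, 0, 0),
                parameter_defaults={
                    # these values replace the flow's Parameter defaults
                    # whenever the schedule triggers a run
                    "dataset_id": "br_bd_siga_o_dinheiro",
                    "materialization_mode": "prod",
                    "dbt_alias": True,
                },
            ),
        ],
    )

    print(schedule.next(3))  # the next three scheduled run times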
diff --git a/pipelines/datasets/br_jota/flows.py b/pipelines/datasets/br_jota/flows.py
deleted file mode 100644
index 31bf31dc6..000000000
--- a/pipelines/datasets/br_jota/flows.py
+++ /dev/null
@@ -1,158 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Flows for br_jota
-"""
-from datetime import timedelta
-
-from prefect import Parameter
-from prefect.run_configs import KubernetesRun
-from prefect.storage import GCS
-from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
-
-from pipelines.constants import constants
-from pipelines.utils.constants import constants as utils_constants
-from pipelines.utils.decorators import Flow
-from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
-from pipelines.utils.tasks import get_current_flow_labels
-
-with Flow(
-    name="br_jota.eleicao_perfil_candidato_2022", code_owners=["lauris"]
-) as eleicao_perfil_candidato_2022:
-    dataset_id = Parameter("dataset_id", default="br_jota", required=True)
-    table_id = Parameter(
-        "table_id", default="eleicao_perfil_candidato_2022", required=True
-    )
-    materialization_mode = Parameter(
-        "materialization_mode", default="prod", required=False
-    )
-    dbt_alias = Parameter("dbt_alias", default=False, required=False)
-
-    current_flow_labels = get_current_flow_labels()
-    materialization_flow = create_flow_run(
-        flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-        project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-        parameters={
-            "dataset_id": dataset_id,
-            "table_id": table_id,
-            "mode": materialization_mode,
-            "dbt_alias": dbt_alias,
-        },
-        labels=current_flow_labels,
-        run_name=f"Materialize {dataset_id}.{table_id}",
-    )
-
-    wait_for_materialization = wait_for_flow_run(
-        materialization_flow,
-        stream_states=True,
-        stream_logs=True,
-        raise_final_state=True,
-    )
-    wait_for_materialization.max_retries = (
-        dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-    )
-    wait_for_materialization.retry_delay = timedelta(
-        seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-    )
-
-eleicao_perfil_candidato_2022.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
-eleicao_perfil_candidato_2022.run_config = KubernetesRun(
-    image=constants.DOCKER_IMAGE.value
-)
-eleicao_perfil_candidato_2022.schedule = None
-
-
-with Flow(
-    name="br_jota.eleicao_prestacao_contas_candidato_2022", code_owners=["lauris"]
-) as eleicao_prestacao_contas_candidato_2022:
-    dataset_id = Parameter("dataset_id", default="br_jota", required=True)
-    table_id = Parameter(
-        "table_id", default="eleicao_prestacao_contas_candidato_2022", required=True
-    )
-    materialization_mode = Parameter(
-        "materialization_mode", default="dev", required=False
-    )
-    dbt_alias = Parameter("dbt_alias", default=False, required=False)
-
-    current_flow_labels = get_current_flow_labels()
-    materialization_flow = create_flow_run(
-        flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-        project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-        parameters={
-            "dataset_id": dataset_id,
-            "table_id": table_id,
-            "mode": materialization_mode,
-            "dbt_alias": dbt_alias,
-        },
-        labels=current_flow_labels,
-        run_name=f"Materialize {dataset_id}.{table_id}",
-    )
-
-    wait_for_materialization = wait_for_flow_run(
-        materialization_flow,
-        stream_states=True,
-        stream_logs=True,
-        raise_final_state=True,
-    )
-    wait_for_materialization.max_retries = (
-        dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-    )
-    wait_for_materialization.retry_delay = timedelta(
-        seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-    )
-
-eleicao_prestacao_contas_candidato_2022.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
-eleicao_prestacao_contas_candidato_2022.run_config = KubernetesRun(
-    image=constants.DOCKER_IMAGE.value
-)
-eleicao_prestacao_contas_candidato_2022.schedule = None
-
-
-with Flow(
-    name="br_jota.eleicao_prestacao_contas_candidato_origem_2022",
-    code_owners=["lauris"],
-) as eleicao_prestacao_contas_candidato_origem_2022:
-    dataset_id = Parameter("dataset_id", default="br_jota", required=True)
-    table_id = Parameter(
-        "table_id",
-        default="eleicao_prestacao_contas_candidato_origem_2022",
-        required=True,
-    )
-    materialization_mode = Parameter(
-        "materialization_mode", default="prod", required=False
-    )
-    dbt_alias = Parameter("dbt_alias", default=False, required=False)
-
-    current_flow_labels = get_current_flow_labels()
-    materialization_flow = create_flow_run(
-        flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
-        project_name=constants.PREFECT_DEFAULT_PROJECT.value,
-        parameters={
-            "dataset_id": dataset_id,
-            "table_id": table_id,
-            "mode": materialization_mode,
-            "dbt_alias": dbt_alias,
-        },
-        labels=current_flow_labels,
-        run_name=f"Materialize {dataset_id}.{table_id}",
-    )
-
-    wait_for_materialization = wait_for_flow_run(
-        materialization_flow,
-        stream_states=True,
-        stream_logs=True,
-        raise_final_state=True,
-    )
-    wait_for_materialization.max_retries = (
-        dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
-    )
-    wait_for_materialization.retry_delay = timedelta(
-        seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
-    )
-
-eleicao_prestacao_contas_candidato_origem_2022.storage = GCS(
-    constants.GCS_FLOWS_BUCKET.value
-)
-eleicao_prestacao_contas_candidato_origem_2022.run_config = KubernetesRun(
-    image=constants.DOCKER_IMAGE.value
-)
-eleicao_prestacao_contas_candidato_origem_2022.schedule = None
diff --git a/pipelines/datasets/br_jota/schedules.py b/pipelines/datasets/br_jota/schedules.py
deleted file mode 100644
index a56e174ff..000000000
--- a/pipelines/datasets/br_jota/schedules.py
+++ /dev/null
@@ -1,68 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Schedules for br_jota
-"""
-
-from datetime import datetime, timedelta
-
-from prefect.schedules import Schedule, filters
-from prefect.schedules.clocks import IntervalClock
-
-from pipelines.constants import constants
-
-schedule_candidatos = Schedule(
-    clocks=[
-        IntervalClock(
-            interval=timedelta(days=1),
-            start_date=datetime(2021, 1, 1, 10, 45),
-            labels=[
-                constants.BASEDOSDADOS_PERGUNTAS_AGENT_LABEL.value,
-            ],
-            parameter_defaults={
-                "dataset_id": "br_jota",
-                "table_id": "eleicao_perfil_candidato_2022",
-                "materialization_mode": "dev",
-                "dbt_alias": False,
-            },
-        ),
-    ],
-    filters=[filters.is_weekday],
-)
-
-schedule_contas_candidato = Schedule(
-    clocks=[
-        IntervalClock(
-            interval=timedelta(days=1),
-            start_date=datetime(2021, 1, 1, 10, 50),
-            labels=[
-                constants.BASEDOSDADOS_PERGUNTAS_AGENT_LABEL.value,
-            ],
-            parameter_defaults={
-                "dataset_id": "br_jota",
-                "table_id": "eleicao_prestacao_contas_candidato_2022",
-                "materialization_mode": "dev",
-                "dbt_alias": False,
-            },
-        ),
-    ],
-    filters=[filters.is_weekday],
-)
-
-schedule_contas_candidato_origem = Schedule(
-    clocks=[
-        IntervalClock(
-            interval=timedelta(days=1),
-            start_date=datetime(2021, 1, 1, 10, 55),
-            labels=[
-                constants.BASEDOSDADOS_PERGUNTAS_AGENT_LABEL.value,
-            ],
-            parameter_defaults={
-                "dataset_id": "br_jota",
-                "table_id": "eleicao_prestacao_contas_candidato_origem_2022",
-                "materialization_mode": "dev",
-                "dbt_alias": False,
-            },
-        ),
-    ],
-    filters=[filters.is_weekday],
-)