diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py
index ecd23486a..f279c729b 100644
--- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py
+++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py
@@ -2,7 +2,6 @@
 """
 Flows for br_rj_isp_estatisticas_seguranca.
 """
-
 from datetime import timedelta
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
@@ -12,10 +11,12 @@
     wait_for_flow_run,
 )
 from pipelines.constants import constants
+from pipelines.utils.tasks import update_django_metadata
 from pipelines.utils.constants import constants as utils_constants
 from pipelines.datasets.br_rj_isp_estatisticas_seguranca.tasks import (
     download_files,
     clean_data,
+    get_today_date,
 )
 from pipelines.utils.decorators import Flow
@@ -62,11 +63,12 @@
         "materialize_after_dump", default=True, required=False
     )
-    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    dbt_alias = Parameter("dbt_alias", default=False, required=False)
 
     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
     )
+    update_metadata = Parameter("update_metadata", default=True, required=False)
 
     d_files = download_files(
         file_name=isp_constants.EVOLUCAO_MENSAL_CISP.value,
@@ -82,7 +84,7 @@
         data_path=filepath,
         dataset_id=dataset_id,
         table_id=table_id,
-        dump_mode="overwrite",
+        dump_mode="append",
         wait=filepath,
     )
 
@@ -115,6 +117,18 @@
         seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
     )
 
+    with case(update_metadata, True):
+        date = get_today_date()  # task that returns the reference date
+        update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm",
+            _last_date=date,
+        )
+
 evolucao_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 evolucao_mensal_cisp.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 evolucao_mensal_cisp.schedule = every_month_evolucao_mensal_cisp
@@ -140,11 +154,12 @@
     materialize_after_dump = Parameter(
         "materialize_after_dump", default=True, required=False
     )
-    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    dbt_alias = Parameter("dbt_alias", default=False, required=False)
 
     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
     )
+    update_metadata = Parameter("update_metadata", default=True, required=False)
 
     d_files = download_files(
         file_name=isp_constants.TAXA_EVOLUCAO_MENSAL_UF.value,
@@ -160,7 +175,7 @@
         data_path=filepath,
         dataset_id=dataset_id,
         table_id=table_id,
-        dump_mode="overwrite",
+        dump_mode="append",
         wait=filepath,
     )
 
@@ -193,6 +208,17 @@
         seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
     )
 
+    with case(update_metadata, True):
+        date = get_today_date()  # task that returns the reference date
+        update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm",
+            _last_date=date,
+        )
 
 taxa_evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 taxa_evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
@@ -221,11 +247,12 @@
     materialize_after_dump = Parameter(
         "materialize_after_dump", default=True, required=False
     )
-    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    dbt_alias = Parameter("dbt_alias", default=False, required=False)
 
     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
     )
+    update_metadata = Parameter("update_metadata", default=True, required=False)
 
     d_files = download_files(
         file_name=isp_constants.TAXA_EVOLUCAO_MENSAL_MUNICIPIO.value,
@@ -241,7 +268,7 @@
         data_path=filepath,
         dataset_id=dataset_id,
         table_id=table_id,
-        dump_mode="overwrite",
+        dump_mode="append",
         wait=filepath,
     )
 
@@ -274,6 +301,18 @@
         seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
     )
 
+    with case(update_metadata, True):
+        date = get_today_date()  # task that returns the reference date
+        update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm",
+            _last_date=date,
+        )
+
 taxa_evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 
 taxa_evolucao_mensal_municipio.run_config = KubernetesRun(
@@ -301,7 +340,9 @@
     materialize_after_dump = Parameter(
         "materialize_after_dump", default=True, required=False
     )
-    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    dbt_alias = Parameter("dbt_alias", default=False, required=False)
+
+    update_metadata = Parameter("update_metadata", default=True, required=False)
 
     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
@@ -321,7 +362,7 @@
         data_path=filepath,
         dataset_id=dataset_id,
         table_id=table_id,
-        dump_mode="overwrite",
+        dump_mode="append",
         wait=filepath,
     )
 
@@ -354,6 +395,18 @@
         seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
     )
 
+    with case(update_metadata, True):
+        date = get_today_date()  # task that returns the reference date
+        update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm",
+            _last_date=date,
+        )
+
 feminicidio_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 feminicidio_mensal_cisp.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
@@ -381,7 +434,9 @@
     materialize_after_dump = Parameter(
         "materialize_after_dump", default=True, required=False
     )
-    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    dbt_alias = Parameter("dbt_alias", default=False, required=False)
+
+    update_metadata = Parameter("update_metadata", default=True, required=False)
 
     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
@@ -401,7 +456,7 @@
         data_path=filepath,
         dataset_id=dataset_id,
         table_id=table_id,
-        dump_mode="overwrite",
+        dump_mode="append",
         wait=filepath,
     )
 
@@ -434,6 +489,18 @@
         seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
     )
 
+    with case(update_metadata, True):
+        date = get_today_date()  # task that returns the reference date
+        update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm",
+            _last_date=date,
+        )
+
 evolucao_policial_morto_servico_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 evolucao_policial_morto_servico_mensal.run_config = KubernetesRun(
     image=constants.DOCKER_IMAGE.value
 )
@@ -462,7 +529,9 @@
     materialize_after_dump = Parameter(
         "materialize_after_dump", default=True, required=False
    )
-    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    dbt_alias = Parameter("dbt_alias", default=False, required=False)
+
+    update_metadata = Parameter("update_metadata", default=True, required=False)
 
     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
@@ -482,7 +551,7 @@
         data_path=filepath,
         dataset_id=dataset_id,
         table_id=table_id,
-        dump_mode="overwrite",
+        dump_mode="append",
         wait=filepath,
     )
 
@@ -515,6 +584,18 @@
         seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
     )
 
+    with case(update_metadata, True):
+        date = get_today_date()  # task that returns the reference date
+        update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm",
+            _last_date=date,
+        )
+
 armas_apreendidas_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 armas_apreendidas_mensal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 armas_apreendidas_mensal.schedule = every_month_armas_apreendidas_mensal
@@ -539,7 +620,9 @@
     materialize_after_dump = Parameter(
         "materialize_after_dump", default=True, required=False
     )
-    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    dbt_alias = Parameter("dbt_alias", default=False, required=False)
+
+    update_metadata = Parameter("update_metadata", default=True, required=False)
 
     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
@@ -559,7 +642,7 @@
         data_path=filepath,
         dataset_id=dataset_id,
         table_id=table_id,
-        dump_mode="overwrite",
+        dump_mode="append",
         wait=filepath,
     )
 
@@ -592,6 +675,18 @@
         seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
     )
 
+    with case(update_metadata, True):
+        date = get_today_date()  # task that returns the reference date
+        update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm",
+            _last_date=date,
+        )
+
 evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 evolucao_mensal_municipio.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 evolucao_mensal_municipio.schedule = every_month_evolucao_mensal_municipio
@@ -616,7 +711,9 @@
     materialize_after_dump = Parameter(
         "materialize_after_dump", default=True, required=False
     )
-    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    dbt_alias = Parameter("dbt_alias", default=False, required=False)
+
+    update_metadata = Parameter("update_metadata", default=True, required=False)
 
     rename_flow_run = rename_current_flow_run_dataset_table(
         prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
@@ -636,7 +733,7 @@
         data_path=filepath,
         dataset_id=dataset_id,
         table_id=table_id,
-        dump_mode="overwrite",
+        dump_mode="append",
         wait=filepath,
     )
 
@@ -669,6 +766,18 @@
         seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
     )
 
+    with case(update_metadata, True):
+        date = get_today_date()  # task that returns the reference date
+        update_django_metadata(
+            dataset_id,
+            table_id,
+            metadata_type="DateTimeRange",
+            bq_last_update=False,
+            api_mode="prod",
+            date_format="yy-mm",
+            _last_date=date,
+        )
+
 evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
 evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
 evolucao_mensal_uf.schedule = every_month_evolucao_mensal_uf
diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/schedules.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/schedules.py
index e8d96a0f4..3318ae454 100644
--- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/schedules.py
+++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/schedules.py
@@ -22,7 +22,7 @@
                 "table_id": "evolucao_mensal_cisp",  # ! table_id of the dataset that will be executed
                 "materialization_mode": "dev",  # ! Where the dataset will be materialized (dev, prod, or prod-staging)
                 "materialize_after_dump": True,  # ! Whether the dataset will be materialized after the dump
-                "dbt_alias": True,
+                "dbt_alias": False,
             },
         ),
     ]
@@ -42,7 +42,7 @@
                 "table_id": "taxa_evolucao_mensal_uf",  # ! table_id of the dataset that will be executed
                 "materialization_mode": "dev",  # ! Where the dataset will be materialized (dev, prod, or prod-staging)
                 "materialize_after_dump": True,  # ! Whether the dataset will be materialized after the dump
-                "dbt_alias": True,
+                "dbt_alias": False,
             },
         ),
     ]
@@ -62,7 +62,7 @@
                 "table_id": "taxa_evolucao_mensal_municipio",  # ! table_id of the dataset that will be executed
                 "materialization_mode": "dev",  # ! Where the dataset will be materialized (dev, prod, or prod-staging)
                 "materialize_after_dump": True,  # ! Whether the dataset will be materialized after the dump
-                "dbt_alias": True,
+                "dbt_alias": False,
             },
         ),
     ]
@@ -82,7 +82,7 @@
                 "table_id": "feminicidio_mensal_cisp",  # ! table_id of the dataset that will be executed
                 "materialization_mode": "dev",  # ! Where the dataset will be materialized (dev, prod, or prod-staging)
                 "materialize_after_dump": True,  # ! Whether the dataset will be materialized after the dump
-                "dbt_alias": True,
+                "dbt_alias": False,
             },
         ),
     ]
@@ -102,7 +102,7 @@
                 "table_id": "evolucao_policial_morto_servico_mensal",  # ! table_id of the dataset that will be executed
                 "materialization_mode": "dev",  # ! Where the dataset will be materialized (dev, prod, or prod-staging)
                 "materialize_after_dump": True,  # ! Whether the dataset will be materialized after the dump
-                "dbt_alias": True,
+                "dbt_alias": False,
             },
         ),
     ]
@@ -123,7 +123,7 @@
                 "table_id": "armas_apreendidas_mensal",  # ! table_id of the dataset that will be executed
                 "materialization_mode": "dev",  # ! Where the dataset will be materialized (dev, prod, or prod-staging)
                 "materialize_after_dump": True,  # ! Whether the dataset will be materialized after the dump
-                "dbt_alias": True,
+                "dbt_alias": False,
             },
         ),
     ]
@@ -144,7 +144,7 @@
                 "table_id": "evolucao_mensal_municipio",  # ! table_id of the dataset that will be executed
                 "materialization_mode": "dev",  # ! Where the dataset will be materialized (dev, prod, or prod-staging)
                 "materialize_after_dump": True,  # ! Whether the dataset will be materialized after the dump
-                "dbt_alias": True,
+                "dbt_alias": False,
             },
         ),
     ]
@@ -164,7 +164,7 @@
                 "table_id": "evolucao_mensal_uf",  # ! table_id of the dataset that will be executed
                 "materialization_mode": "dev",  # ! Where the dataset will be materialized (dev, prod, or prod-staging)
                 "materialize_after_dump": True,  # ! Whether the dataset will be materialized after the dump
-                "dbt_alias": True,
+                "dbt_alias": False,
             },
         ),
     ]
diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/tasks.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/tasks.py
index ce94d4ad2..7dced300b 100644
--- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/tasks.py
+++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/tasks.py
@@ -2,7 +2,7 @@
 import pandas as pd
 import os
 import requests
-from datetime import timedelta
+from datetime import datetime, timedelta
 from prefect import task
 
 from pipelines.utils.utils import (
@@ -115,3 +115,11 @@ def clean_data(
     log(f"df {file_name} salvo com sucesso")
 
     return isp_constants.OUTPUT_PATH.value + novo_nome
+
+
+# task that returns the year and month used to update the metadata
+@task
+def get_today_date():
+    d = datetime.now() - timedelta(days=60)
+
+    return d.strftime("%Y-%m")
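
Reviewer note: every flow in this patch gates the metadata update behind Prefect 1.x's case and the new get_today_date task. Below is a minimal, self-contained sketch of that pattern, runnable outside the cluster to check the behavior; update_metadata_stub is a hypothetical stand-in for pipelines.utils.tasks.update_django_metadata (whose full signature is not visible in this diff), and the flow name is illustrative.

    from datetime import datetime, timedelta

    from prefect import Flow, Parameter, case, task


    @task
    def get_today_date():
        # Same computation as the new task in tasks.py: the reference month,
        # roughly two months behind the run date, formatted "YYYY-MM".
        return (datetime.now() - timedelta(days=60)).strftime("%Y-%m")


    @task
    def update_metadata_stub(last_date):
        # Hypothetical stand-in for update_django_metadata; only shows when
        # the gated branch actually runs.
        print(f"would set DateTimeRange coverage up to {last_date}")


    with Flow("metadata_gate_sketch") as sketch_flow:
        update_metadata = Parameter("update_metadata", default=True, required=False)
        with case(update_metadata, True):
            date = get_today_date()
            update_metadata_stub(date)

Running sketch_flow.run() executes the gated tasks only when update_metadata is truthy. One thing worth confirming in review: get_today_date returns strftime("%Y-%m") (e.g. "2023-06"), while the flows pass date_format="yy-mm" to update_django_metadata; if that argument describes the incoming format, the two should agree.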