Skip to content

Commit

Permalink
Merge pull request #353 from basedosdados/staging/br_rj_isp_estatisticas_seguranca
Browse files Browse the repository at this point in the history

fix dbt alias false
  • Loading branch information
tricktx authored Jul 26, 2023
2 parents 0e9f685 + ef5af9c commit 23da633
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 26 deletions.
143 changes: 126 additions & 17 deletions pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"""
Flows for br_rj_isp_estatisticas_seguranca.
"""

from datetime import timedelta
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
Expand All @@ -12,10 +11,12 @@
wait_for_flow_run,
)
from pipelines.constants import constants
from pipelines.utils.tasks import update_django_metadata
from pipelines.utils.constants import constants as utils_constants
from pipelines.datasets.br_rj_isp_estatisticas_seguranca.tasks import (
download_files,
clean_data,
get_today_date,
)

from pipelines.utils.decorators import Flow
Expand Down Expand Up @@ -62,11 +63,12 @@
"materialize_after_dump", default=True, required=False
)

dbt_alias = Parameter("dbt_alias", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)
update_metadata = Parameter("update_metadata", default=True, required=False)

d_files = download_files(
file_name=isp_constants.EVOLUCAO_MENSAL_CISP.value,
Expand All @@ -82,7 +84,7 @@
data_path=filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="overwrite",
dump_mode="append",
wait=filepath,
)

Expand Down Expand Up @@ -115,6 +117,18 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
_last_date=date,
)

evolucao_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
evolucao_mensal_cisp.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
evolucao_mensal_cisp.schedule = every_month_evolucao_mensal_cisp
Expand All @@ -140,11 +154,12 @@
materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)
update_metadata = Parameter("update_metadata", default=True, required=False)

d_files = download_files(
file_name=isp_constants.TAXA_EVOLUCAO_MENSAL_UF.value,
Expand All @@ -160,7 +175,7 @@
data_path=filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="overwrite",
dump_mode="append",
wait=filepath,
)

Expand Down Expand Up @@ -193,6 +208,17 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
_last_date=date,
)

taxa_evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
taxa_evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
Expand Down Expand Up @@ -221,11 +247,12 @@
materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
)
update_metadata = Parameter("update_metadata", default=True, required=False)

d_files = download_files(
file_name=isp_constants.TAXA_EVOLUCAO_MENSAL_MUNICIPIO.value,
Expand All @@ -241,7 +268,7 @@
data_path=filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="overwrite",
dump_mode="append",
wait=filepath,
)

Expand Down Expand Up @@ -274,6 +301,18 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
_last_date=date,
)


taxa_evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
taxa_evolucao_mensal_municipio.run_config = KubernetesRun(
Expand Down Expand Up @@ -301,7 +340,9 @@
materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
Expand All @@ -321,7 +362,7 @@
data_path=filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="overwrite",
dump_mode="append",
wait=filepath,
)

Expand Down Expand Up @@ -354,6 +395,18 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
_last_date=date,
)


feminicidio_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
feminicidio_mensal_cisp.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
Expand Down Expand Up @@ -381,7 +434,9 @@
materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
Expand All @@ -401,7 +456,7 @@
data_path=filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="overwrite",
dump_mode="append",
wait=filepath,
)

Expand Down Expand Up @@ -434,6 +489,18 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
_last_date=date,
)

evolucao_policial_morto_servico_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
evolucao_policial_morto_servico_mensal.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
Expand Down Expand Up @@ -462,7 +529,9 @@
materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
Expand All @@ -482,7 +551,7 @@
data_path=filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="overwrite",
dump_mode="append",
wait=filepath,
)

Expand Down Expand Up @@ -515,6 +584,18 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
_last_date=date,
)

armas_apreendidas_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
armas_apreendidas_mensal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
armas_apreendidas_mensal.schedule = every_month_armas_apreendidas_mensal
Expand All @@ -539,7 +620,9 @@
materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
Expand All @@ -559,7 +642,7 @@
data_path=filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="overwrite",
dump_mode="append",
wait=filepath,
)

Expand Down Expand Up @@ -592,6 +675,18 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
_last_date=date,
)

evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
evolucao_mensal_municipio.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
evolucao_mensal_municipio.schedule = every_month_evolucao_mensal_municipio
Expand All @@ -616,7 +711,9 @@
materialize_after_dump = Parameter(
"materialize_after_dump", default=True, required=False
)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

update_metadata = Parameter("update_metadata", default=True, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
Expand All @@ -636,7 +733,7 @@
data_path=filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="overwrite",
dump_mode="append",
wait=filepath,
)

Expand Down Expand Up @@ -669,6 +766,18 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
api_mode="prod",
date_format="yy-mm",
_last_date=date,
)

evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
evolucao_mensal_uf.schedule = every_month_evolucao_mensal_uf
Loading

0 comments on commit 23da633

Please sign in to comment.