Merge pull request #868 from basedosdados/staging/br_sicar
br_sfb_sicar
folhesgabriel authored Oct 9, 2024
2 parents 3724d02 + 8325860 commit 0ba452c
Showing 9 changed files with 2,070 additions and 1,269 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
@@ -33,6 +33,8 @@ RUN apt-get update && \
    python3-dev \
    traceroute \
    wget \
+   tesseract-ocr \
+   python3-opencv \
    git \
    bzip2 \
    libxtst6 \
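The two new system packages support the br_sfb_sicar pipeline added below: the SICAR download client relies on OCR to get past the portal's captcha, which needs the tesseract-ocr binary and the OpenCV bindings at the image level. A quick sanity check inside the built image might look like this (illustrative only; it assumes pytesseract is among the Python dependencies):

import cv2          # provided by the python3-opencv system package
import pytesseract  # wraps the tesseract-ocr binary

# Both calls fail fast if the system packages are missing from the image.
print(cv2.__version__)
print(pytesseract.get_tesseract_version())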
2 changes: 1 addition & 1 deletion pipelines/datasets/botdosdados/tasks.py
@@ -10,7 +10,7 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
-import seaborn as sns
+# import seaborn as sns
import tweepy
from basedosdados.download.metadata import _safe_fetch
from prefect import task
21 changes: 21 additions & 0 deletions pipelines/datasets/br_sfb_sicar/constants.py
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
"""
Constants for br_sfb_sicar
"""

from enum import Enum

class Constants(Enum):  # pylint: disable=c0103
    INPUT_PATH = '/tmp/car/input'
    OUTPUT_PATH = '/tmp/car/output'

    UF_SIGLAS = [
        'SP', 'BA', 'MT', 'RS', 'MG',
        'AL', 'AP', 'AM', 'CE', 'RN',
        'DF', 'ES', 'GO', 'MA', 'MS',
        'PA', 'PB', 'PR', 'PE', 'PI', 'RJ',
        'RO', 'RR', 'SC', 'AC', 'SE', 'TO',
    ]
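Since Constants is an Enum, callers read the payloads through .value, as flows.py does below. For illustration:

from pipelines.datasets.br_sfb_sicar.constants import Constants

print(Constants.INPUT_PATH.value)      # /tmp/car/input
print(len(Constants.UF_SIGLAS.value))  # 27 federative units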

133 changes: 133 additions & 0 deletions pipelines/datasets/br_sfb_sicar/flows.py
@@ -0,0 +1,133 @@
# -*- coding: utf-8 -*-
"""
Flows for br_sfb_sicar
"""

# pylint: disable=invalid-name
from datetime import timedelta

from prefect import Parameter, case, unmapped
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

from pipelines.constants import constants
from pipelines.datasets.br_sfb_sicar.constants import Constants as car_constants
from pipelines.datasets.br_sfb_sicar.schedules import schedule_br_sfb_sicar_area_imovel
from pipelines.datasets.br_sfb_sicar.tasks import download_car, unzip_to_parquet, get_each_uf_release_date
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.metadata.tasks import update_django_metadata
from pipelines.utils.tasks import (
    create_table_and_upload_to_gcs,
    get_current_flow_labels,
    rename_current_flow_run_dataset_table,
)

INPUTPATH = car_constants.INPUT_PATH.value
OUTPUTPATH = car_constants.OUTPUT_PATH.value
SIGLAS_UF = car_constants.UF_SIGLAS.value


with Flow(
    name="br_sfb_sicar.area_imovel", code_owners=["Gabriel Pisa"]
) as br_sfb_sicar_area_imovel:
    # Parameters
    dataset_id = Parameter("dataset_id", default="br_sfb_sicar", required=True)
    table_id = Parameter("table_id", default="area_imovel", required=True)
    update_metadata = Parameter("update_metadata", default=False, required=False)
    materialization_mode = Parameter(
        "materialization_mode", default="dev", required=False
    )
    materialize_after_dump = Parameter(
        "materialize_after_dump", default=True, required=False
    )
    dbt_alias = Parameter("dbt_alias", default=False, required=False)

    rename_flow_run = rename_current_flow_run_dataset_table(
        prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
    )

    download_polygons = download_car.map(
        inputpath=unmapped(INPUTPATH),
        outputpath=unmapped(OUTPUTPATH),
        sigla_uf=SIGLAS_UF,
        polygon=unmapped('AREA_IMOVEL'),
    )

    ufs_release_dates = get_each_uf_release_date(upstream_tasks=[download_polygons])

    unzip_from_shp_to_parquet_wkt = unzip_to_parquet(
        inputpath=INPUTPATH,
        outputpath=OUTPUTPATH,
        uf_release_dates=ufs_release_dates,
        upstream_tasks=[download_polygons, ufs_release_dates],
    )

    wait_upload_table = create_table_and_upload_to_gcs(
        data_path=OUTPUTPATH,
        dataset_id=dataset_id,
        table_id=table_id,
        dump_mode="append",
        source_format='parquet',
        wait=unzip_from_shp_to_parquet_wkt,
    )

    with case(materialize_after_dump, True):
        current_flow_labels = get_current_flow_labels()
        materialization_flow = create_flow_run(
            flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
            project_name=constants.PREFECT_DEFAULT_PROJECT.value,
            parameters={
                "dataset_id": dataset_id,
                "table_id": table_id,
                "mode": materialization_mode,
                "dbt_alias": dbt_alias,
                "dbt_command": "run/test",
                "disable_elementary": False,
            },
            labels=current_flow_labels,
            run_name=f"Materialize {dataset_id}.{table_id}",
            upstream_tasks=[wait_upload_table],
        )

        wait_for_materialization = wait_for_flow_run(
            materialization_flow,
            stream_states=True,
            stream_logs=True,
            raise_final_state=True,
        )
        wait_for_materialization.max_retries = (
            dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
        )
        wait_for_materialization.retry_delay = timedelta(
            seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
        )

        with case(update_metadata, True):
            update_django_metadata(
                dataset_id=dataset_id,
                table_id=table_id,
                date_column_name={"date": "data_extracao"},
                date_format="%Y-%m-%d",
                coverage_type="part_bdpro",
                time_delta={"months": 6},
                prefect_mode=materialization_mode,
                bq_project="basedosdados",
                upstream_tasks=[wait_for_materialization],
            )


br_sfb_sicar_area_imovel.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_sfb_sicar_area_imovel.run_config = KubernetesRun(
    image=constants.DOCKER_IMAGE.value
)
br_sfb_sicar_area_imovel.schedule = schedule_br_sfb_sicar_area_imovel
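For local debugging, a Prefect 1.x flow like this one can be executed directly, bypassing the Kubernetes run config. A minimal sketch (the parameters shown are the flow's own defaults; a real run would attempt the actual downloads):

from pipelines.datasets.br_sfb_sicar.flows import br_sfb_sicar_area_imovel

# Runs the dump locally; materialization and metadata updates stay off.
state = br_sfb_sicar_area_imovel.run(
    parameters={
        "dataset_id": "br_sfb_sicar",
        "table_id": "area_imovel",
        "materialize_after_dump": False,
        "update_metadata": False,
    }
)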
33 changes: 33 additions & 0 deletions pipelines/datasets/br_sfb_sicar/schedules.py
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
"""
Schedules for br_sfb_sicar
"""

from datetime import datetime

from prefect.schedules import Schedule, adjustments, filters
from prefect.schedules.clocks import CronClock

from pipelines.constants import constants

schedule_br_sfb_sicar_area_imovel = Schedule(
    clocks=[
        CronClock(
            cron="15 21 15 * *",  # at 21:15 on day 15 of every month
            start_date=datetime(2024, 10, 1, 0, 0),
            labels=[
                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
            ],
            parameter_defaults={
                "dataset_id": "br_sfb_sicar",
                "table_id": "area_imovel",
                "materialization_mode": "prod",
                "materialize_after_dump": True,
                "update_metadata": True,
                "dbt_alias": True,
            },
        )
    ],
    filters=[filters.is_weekday],
    adjustments=[adjustments.next_weekday],
)
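The cron clock fires on the 15th at 21:15, and the is_weekday filter plus next_weekday adjustment push a weekend occurrence to the following Monday. To preview the effective run times (illustrative):

from pipelines.datasets.br_sfb_sicar.schedules import schedule_br_sfb_sicar_area_imovel

for run_time in schedule_br_sfb_sicar_area_imovel.next(3):
    print(run_time)  # the 15th at 21:15, shifted when it lands on a weekend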
81 changes: 81 additions & 0 deletions pipelines/datasets/br_sfb_sicar/tasks.py
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
"""
Tasks for br_sfb_sicar
"""

import os
from datetime import timedelta
from typing import Dict

import httpx
from prefect import task
from SICAR import Sicar

from pipelines.constants import constants
from pipelines.datasets.br_sfb_sicar.utils import (
    process_all_files,
    retry_download_car,
)
from pipelines.utils.utils import log


@task(
    max_retries=3,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def download_car(inputpath, outputpath, sigla_uf, polygon):
    os.makedirs(inputpath, exist_ok=True)
    os.makedirs(outputpath, exist_ok=True)

    log('Downloading CAR data')

    car = Sicar()

    log(f'Starting the download for state {sigla_uf}')

    try:
        retry_download_car(
            car=car,
            state=sigla_uf,
            polygon=polygon,
            folder=inputpath,
            max_retries=8,
        )
    except httpx.ReadTimeout as e:
        log(f'Timeout error {e} while downloading data for {sigla_uf} after multiple attempts.')
        raise e
    except Exception as e:
        log(f'General error while downloading {sigla_uf}: {e}')
        raise e


@task(
    max_retries=10,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_each_uf_release_date() -> Dict:
    log('Extracting the release date of each UF dataset')

    car = Sicar()
    ufs_release_dates = car.get_release_dates()

    return ufs_release_dates


@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def unzip_to_parquet(inputpath, outputpath, uf_release_dates):
    process_all_files(
        zip_folder=inputpath,
        output_folder=outputpath,
        # NB: keyword spelling matches the helper's signature in utils.py
        uf_relase_dates=uf_release_dates,
    )


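retry_download_car and process_all_files live in pipelines/datasets/br_sfb_sicar/utils.py, which is part of this PR but not shown here. A minimal sketch of what the retry helper might look like, assuming the SICAR client's download_state method and a linear backoff (both assumptions, not the file's actual contents):

import time
import httpx

def retry_download_car(car, state, polygon, folder, max_retries=8):
    """Retry a SICAR state download, backing off after each read timeout."""
    for attempt in range(1, max_retries + 1):
        try:
            return car.download_state(state=state, polygon=polygon, folder=folder)
        except httpx.ReadTimeout:
            if attempt == max_retries:
                raise  # let the Prefect task's own retry policy take over
            time.sleep(30 * attempt)  # hypothetical backoff interval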