Skip to content

Commit

Permalink
feat: add prefect map to download_car task
Browse files Browse the repository at this point in the history
  • Loading branch information
folhesgabriel committed Oct 1, 2024
1 parent 2004d9b commit e644a25
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 21 deletions.
8 changes: 7 additions & 1 deletion pipelines/datasets/br_sfb_sicar/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@
class Constants(Enum):  # pylint: disable=c0103
    """Shared constants for the br_sfb_sicar pipeline.

    Note: the diff accidentally shows OUTPUT_PATH twice; an Enum body
    must define each member name exactly once, so only one assignment
    is kept here.
    """

    # Local scratch folders used by download/unzip tasks.
    INPUT_PATH = '/tmp/car/input'
    OUTPUT_PATH = '/tmp/car/output'

    # UFs fanned out via download_car.map. Only a subset is enabled;
    # the rest are commented out — presumably to limit download volume
    # while testing. TODO confirm before production rollout.
    UF_SIGLAS = [
        'AC', 'AL', 'AP',  # 'AM', 'BA', 'CE', 'DF', 'ES', 'GO', 'MA',
        # 'MT', 'MS', 'MG', 'PA', 'PB', 'PR', 'PE', 'PI', 'RJ', 'RN',
        # 'RS', 'RO', 'RR', 'SC', 'SP', 'SE', 'TO'
    ]
18 changes: 12 additions & 6 deletions pipelines/datasets/br_sfb_sicar/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
# pylint: disable=invalid-name
from datetime import timedelta

from prefect import Parameter, case
from prefect import Parameter, case, unmapped
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

from pipelines.constants import constants
from pipelines.datasets.br_sfb_sicar.constants import Constants as car_constants
# from pipelines.datasets.br_sfb_sicar.schedules import schedule_br_sfb_sicar_imoveis_rurais
from pipelines.datasets.br_sfb_sicar.tasks import download_car, unzip_to_parquet
from pipelines.datasets.br_sfb_sicar.tasks import download_car, unzip_to_parquet, get_each_uf_release_date
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
Expand All @@ -28,8 +28,10 @@
log_task,
rename_current_flow_run_dataset_table,
)

# Resolve pipeline-wide constants once at module import time.
inputpath = car_constants.INPUT_PATH.value
outputpath = car_constants.OUTPUT_PATH.value
siglas_uf = car_constants.UF_SIGLAS.value  # UFs fanned out via download_car.map

with Flow(
name="br_sfb_sicar.area_imovel", code_owners=["Gabriel Pisa"]
Expand All @@ -51,15 +53,19 @@
)


download_polygons = download_car(
inputpath=inputpath,
outputpath=outputpath
download_polygons = download_car.map(
inputpath=unmapped(inputpath),
outputpath=unmapped(outputpath),
sigla_uf=siglas_uf
)

ufs_release_dates = get_each_uf_release_date(upstream_tasks=[download_polygons])

unzip_from_shp_to_parquet_wkt = unzip_to_parquet(
inputpath=inputpath,
outputpath=outputpath,
upstream_tasks=[download_polygons]
uf_relase_dates=ufs_release_dates,
upstream_tasks=[download_polygons,ufs_release_dates]
)

wait_upload_table = create_table_and_upload_to_gcs(
Expand Down
40 changes: 32 additions & 8 deletions pipelines/datasets/br_sfb_sicar/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from SICAR import Sicar, Polygon, State
import os
from datetime import datetime, timedelta
from typing import Dict
import httpx

from pipelines.datasets.br_sfb_sicar.constants import (
Constants as car_constants,
Expand Down Expand Up @@ -37,27 +39,49 @@
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def download_car(inputpath, outputpath, sigla_uf):
    """Download SICAR property polygons for a single state (UF).

    Designed for prefect's ``.map``: ``sigla_uf`` varies per mapped
    child task while ``inputpath``/``outputpath`` stay ``unmapped``.

    Args:
        inputpath: local folder where the downloaded zip is stored.
        outputpath: local output folder (created here so downstream
            tasks can rely on it existing).
        sigla_uf: two-letter state code accepted by SICAR (e.g. 'AC').
    """
    os.makedirs(inputpath, exist_ok=True)
    os.makedirs(outputpath, exist_ok=True)

    log('Downloading Car')

    car = Sicar()

    # TODO: make async calls
    log(f'downloading state {sigla_uf}')

    try:
        car.download_state(
            state=sigla_uf,
            polygon=Polygon.AREA_PROPERTY,
            folder=inputpath,
        )
    except httpx.ReadTimeout as e:
        # NOTE(review): the timeout is logged and swallowed, so the
        # mapped task still counts as successful and the decorator's
        # max_retries never fires for this failure — confirm that a
        # silently missing UF download is acceptable downstream.
        log(f'Erro de timeout ao baixar {sigla_uf}. \n {e}')


@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def get_each_uf_release_date() -> Dict:
    """Fetch the SICAR release date for every UF.

    Returns:
        Dict mapping UF code (e.g. 'PA') to its release date string —
        downstream parsing uses '%d/%m/%Y', so values are presumably
        in that format (TODO confirm against the SICAR client).
    """
    log('Extracting UFs release date')

    # Original body instantiated Sicar() twice; once is enough.
    car = Sicar()

    return car.get_release_dates()




@task(
    max_retries=constants.TASK_MAX_RETRIES.value,
    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def unzip_to_parquet(inputpath, outputpath, uf_relase_dates):
    """Unzip downloaded SICAR shapefiles and convert them to parquet.

    Thin task wrapper around ``process_all_files``.

    Args:
        inputpath: folder containing the downloaded UF zip files.
        outputpath: folder where parquet files are written.
        uf_relase_dates: dict of UF -> release date produced by
            ``get_each_uf_release_date``. NOTE(review): the name is
            misspelled ('relase'); kept as-is because the flow passes
            it as a keyword argument — rename in a coordinated change.
    """
    process_all_files(
        zip_folder=inputpath,
        output_folder=outputpath,
        uf_relase_dates=uf_relase_dates,
    )


8 changes: 8 additions & 0 deletions pipelines/datasets/br_sfb_sicar/temp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
# Throwaway debug script: prints the SICAR release date for Pará (PA).
# NOTE(review): temp.py looks like a leftover scratch file committed by
# accident — consider removing it from the pipeline package.
from SICAR import Sicar

car = Sicar()

# get_release_dates() returns a mapping keyed by UF code (e.g. 'PA').
d = car.get_release_dates()

print(d['PA'])
23 changes: 17 additions & 6 deletions pipelines/datasets/br_sfb_sicar/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,37 @@

def unpack_zip(zip_file_path):
    """Extract a zip archive into a fresh temporary directory.

    Best-effort: a corrupted archive is logged rather than raised, so
    one bad UF download does not abort the whole batch. The temp
    directory path is returned either way (possibly empty on failure).

    Args:
        zip_file_path: path to the zip file to extract.

    Returns:
        Path of the temporary directory the archive was extracted into.
    """
    temp_dir = tempfile.mkdtemp()
    try:
        # Open inside the try as well: ZipFile() itself raises
        # BadZipFile on a corrupted central directory — the original
        # only guarded extractall(), so that case escaped the handler.
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
    except zipfile.BadZipFile as e:
        log(f"The zipfile {zip_file_path.split('/')[-1]} is probably corrupted. {e}")
    return temp_dir

def convert_shp_to_parquet(shp_file_path, output_parquet_path, uf_relase_dates, sigla_uf):
    """Convert a shapefile to parquet, with WKT geometry and release date.

    Args:
        shp_file_path: path to the input .shp file.
        output_parquet_path: destination parquet file path.
        uf_relase_dates: dict of UF -> release date string in
            '%d/%m/%Y' format (misspelled name kept for caller
            compatibility — see unzip_to_parquet).
        sigla_uf: two-letter state code used to look up the date.

    Returns:
        The output parquet path, for chaining.
    """
    gdf = gpd.read_file(shp_file_path)

    # Geometry objects are not parquet-serializable via pandas;
    # store the WKT text representation instead.
    gdf['geometry'] = gdf['geometry'].apply(lambda geom: geom.wkt)

    df = pd.DataFrame(gdf)

    # Normalize the SICAR dd/mm/YYYY date to ISO (YYYY-MM-DD).
    date = datetime.strptime(uf_relase_dates[sigla_uf], '%d/%m/%Y').strftime('%Y-%m-%d')

    log(f'The release date for {sigla_uf} was {date}')

    df['data_atualizacao_car'] = date

    df.to_parquet(output_parquet_path, index=False)
    # TODO: inserir data de extração como partição

    # Original had an unreachable `del df` after the return; removed.
    return output_parquet_path


def process_all_files(zip_folder, output_folder):
def process_all_files(zip_folder, output_folder, uf_relase_dates):

for zip_filename in os.listdir(zip_folder):

Expand Down Expand Up @@ -62,4 +73,4 @@ def process_all_files(zip_folder, output_folder):
log(f"Salvando {output_parquet_path}")

# Converte shapefile para parquet com coluna WKT para representar geometria
convert_shp_to_parquet(shp_file_path, output_parquet_path)
convert_shp_to_parquet(shp_file_path, output_parquet_path, uf_relase_dates, sigla_uf)

0 comments on commit e644a25

Please sign in to comment.