Skip to content

Commit

Permalink
Merge branch 'master' into staging/update_metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jul 13, 2023
2 parents 3d49cb2 + 6ec5391 commit 8bd0376
Show file tree
Hide file tree
Showing 15 changed files with 6,656 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,4 @@ jobs:

- name: Register Prefect flows
run: |-
python .github/workflows/scripts/register_flows.py --project $PREFECT__SERVER__PROJECT --path pipelines/ --schedule --filter-affected-flows
python .github/workflows/scripts/register_flows.py --project $PREFECT__SERVER__PROJECT --path pipelines/ --schedule
1 change: 1 addition & 0 deletions pipelines/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class constants(Enum): # pylint: disable=c0103
TASK_MAX_RETRIES = 5
TASK_RETRY_DELAY = 10 # seconds
PREFECT_DEFAULT_PROJECT = "main"
PREFECT_STAGING_PROJECT = "staging"
# Code Owners #

######################################
Expand Down
1 change: 1 addition & 0 deletions pipelines/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@
from pipelines.datasets.br_ons_avaliacao_operacao.flows import *
from pipelines.datasets.br_ons_estimativa_custos.flows import *
from pipelines.datasets.br_b3_cotacoes.flows import *
from pipelines.datasets.br_mercadolivre_ofertas.flows import *
72 changes: 36 additions & 36 deletions pipelines/datasets/br_cvm_fi/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
from pipelines.utils.constants import constants as utils_constants
from pipelines.datasets.br_cvm_fi.constants import constants as cvm_constants
from pipelines.constants import constants
from pipelines.utils.utils import (
log,
from pipelines.utils.tasks import (
log_task,
)
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
Expand All @@ -57,24 +57,24 @@
dataset_id = Parameter("dataset_id", default="br_cvm_fi", required=True)
table_id = Parameter("table_id", default="documentos_informe_diario", required=True)
materialization_mode = Parameter(
"materialization_mode", default="dev", required=True
"materialization_mode", default="dev", required=False
)
materialize_after_dump = Parameter(
"materialize_after_dump", default=False, required=True
"materialize_after_dump", default=False, required=False
)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

url = Parameter(
"url",
default=cvm_constants.INFORME_DIARIO_URL.value,
required=True,
required=False,
)
df = extract_links_and_dates(url)
log(f"Links e datas: {df}")
log_task(f"Links e datas: {df}")
arquivos = check_for_updates(df, upstream_tasks=[df])
log(f"Arquivos: {arquivos}")
log_task(f"Arquivos: {arquivos}")
with case(is_empty(arquivos), True):
log(f"Não houveram atualizações em {url.default}!")
log_task(f"Não houveram atualizações em {url.default}!")

with case(is_empty(arquivos), False):
input_filepath = download_unzip_csv(
Expand Down Expand Up @@ -139,9 +139,9 @@
],
) as br_cvm_fi_documentos_carteiras_fundos_investimento:
# Parameters
dataset_id = Parameter("dataset_id", default="br_cvm_fi", required=False)
dataset_id = Parameter("dataset_id", default="br_cvm_fi", required=True)
table_id = Parameter(
"table_id", default="documentos_carteiras_fundos_investimento", required=False
"table_id", default="documentos_carteiras_fundos_investimento", required=True
)

materialization_mode = Parameter(
Expand All @@ -155,15 +155,15 @@
url = Parameter(
"url",
default=cvm_constants.CDA_URL.value,
required=True,
required=False,
)

df = extract_links_and_dates(url)
log(f"Links e datas: {df}")
log_task(f"Links e datas: {df}")
arquivos = check_for_updates(df, upstream_tasks=[df])
log(f"Arquivos: {arquivos}")
log_task(f"Arquivos: {arquivos}")
with case(is_empty(arquivos), True):
log(f"Não houveram atualizações em {url.default}!")
log_task(f"Não houveram atualizações em {url.default}!")

with case(is_empty(arquivos), False):
input_filepath = download_unzip_csv(
Expand Down Expand Up @@ -230,22 +230,22 @@
],
) as br_cvm_fi_documentos_extratos_informacoes:
# Parameters
dataset_id = Parameter("dataset_id", default="br_cvm_fi", required=False)
dataset_id = Parameter("dataset_id", default="br_cvm_fi", required=True)
table_id = Parameter(
"table_id", default="documentos_extratos_informacoes", required=False
"table_id", default="documentos_extratos_informacoes", required=True
)
materialization_mode = Parameter(
"materialization_mode", default="dev", required=True
"materialization_mode", default="dev", required=False
)
materialize_after_dump = Parameter(
"materialize_after_dump", default=False, required=True
"materialize_after_dump", default=False, required=False
)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

url = Parameter(
"url",
default=cvm_constants.URL_EXT.value,
required=True,
required=False,
)

file = Parameter(
Expand All @@ -258,7 +258,7 @@
arquivos = check_for_updates_ext(df, upstream_tasks=[df])

with case(is_empty(arquivos), True):
log(f"Não houveram atualizações em {url.default}!")
log_task(f"Não houveram atualizações em {url.default}!")

with case(is_empty(arquivos), False):
input_filepath = download_csv_cvm(
Expand Down Expand Up @@ -325,8 +325,8 @@
],
) as br_cvm_fi_documentos_perfil_mensal:
# Parameters
dataset_id = Parameter("dataset_id", default="br_cvm_fi", required=False)
table_id = Parameter("table_id", default="documentos_perfil_mensal", required=False)
dataset_id = Parameter("dataset_id", default="br_cvm_fi", required=True)
table_id = Parameter("table_id", default="documentos_perfil_mensal", required=True)
materialization_mode = Parameter(
"materialization_mode", default="dev", required=False
)
Expand All @@ -338,14 +338,14 @@
url = Parameter(
"url",
default=cvm_constants.URL_PERFIL_MENSAL.value,
required=True,
required=False,
)

df = extract_links_and_dates(url)
arquivos = check_for_updates(df, upstream_tasks=[df])

with case(is_empty(arquivos), True):
log(f"Não houveram atualizações em {url.default}!")
log_task(f"Não houveram atualizações em {url.default}!")

with case(is_empty(arquivos), False):
input_filepath = download_csv_cvm(
Expand Down Expand Up @@ -410,28 +410,28 @@
],
) as br_cvm_fi_documentos_informacao_cadastral:
# Parameters
dataset_id = Parameter("dataset_id", default="br_cvm_fi", required=False)
dataset_id = Parameter("dataset_id", default="br_cvm_fi", required=True)
table_id = Parameter(
"table_id", default="documentos_informacao_cadastral", required=False
"table_id", default="documentos_informacao_cadastral", required=True
)
materialization_mode = Parameter(
"materialization_mode", default="dev", required=True
"materialization_mode", default="dev", required=False
)
materialize_after_dump = Parameter(
"materialize_after_dump", default=False, required=True
"materialize_after_dump", default=False, required=False
)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

url = Parameter(
"url",
default=cvm_constants.URL_INFO_CADASTRAL.value,
required=True,
required=False,
)

files = Parameter("files", default=cvm_constants.CAD_FILE.value, required=False)

with case(is_empty(files), True):
log(f"Não houveram atualizações em {url.default}!")
log_task(f"Não houveram atualizações em {url.default}!")

with case(is_empty(files), False):
input_filepath = download_csv_cvm(url=url, files=files, table_id=table_id)
Expand Down Expand Up @@ -496,28 +496,28 @@
],
) as br_cvm_fi_documentos_balancete:
# Parameters
dataset_id = Parameter("dataset_id", default="br_cvm_fi", required=False)
table_id = Parameter("table_id", default="documentos_balancete", required=False)
dataset_id = Parameter("dataset_id", default="br_cvm_fi", required=True)
table_id = Parameter("table_id", default="documentos_balancete", required=True)
materialization_mode = Parameter(
"materialization_mode", default="dev", required=True
"materialization_mode", default="dev", required=False
)
materialize_after_dump = Parameter(
"materialize_after_dump", default=False, required=True
"materialize_after_dump", default=False, required=False
)
dbt_alias = Parameter("dbt_alias", default=False, required=False)

url = Parameter(
"url",
default=cvm_constants.URL_BALANCETE.value,
required=True,
required=False,
)

df = extract_links_and_dates(url)

files = check_for_updates(df, upstream_tasks=[df])

with case(is_empty(files), True):
log(f"Não houveram atualizações em {url.default}!")
log_task(f"Não houveram atualizações em {url.default}!")

with case(is_empty(files), False):
input_filepath = download_unzip_csv(url=url, files=files, id=table_id)
Expand Down
9 changes: 9 additions & 0 deletions pipelines/datasets/br_cvm_fi/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from prefect.schedules.clocks import IntervalClock
from pipelines.constants import constants
from prefect.schedules.clocks import CronClock
from pipelines.datasets.br_cvm_fi.constants import constants as cvm_constants


every_day_informe = Schedule(
Expand All @@ -24,6 +25,7 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": False,
"url": cvm_constants.INFORME_DIARIO_URL.value,
},
),
],
Expand All @@ -43,6 +45,7 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": False,
"url": cvm_constants.CDA_URL.value,
},
),
],
Expand All @@ -62,6 +65,8 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": False,
"url": cvm_constants.URL_EXT.value,
"file": cvm_constants.FILE_EXT.value,
},
),
],
Expand All @@ -82,6 +87,7 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": False,
"url": cvm_constants.URL_PERFIL_MENSAL.value,
},
),
],
Expand All @@ -101,6 +107,8 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": False,
"url": cvm_constants.URL_INFO_CADASTRAL.value,
"files": cvm_constants.CAD_FILE.value,
},
),
],
Expand All @@ -120,6 +128,7 @@
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": False,
"url": cvm_constants.URL_BALANCETE.value,
},
),
],
Expand Down
Empty file.
Loading

0 comments on commit 8bd0376

Please sign in to comment.