Merge pull request #333 from basedosdados/staging/update_metadata
[feat] temporal_coverage_updater
laura-l-amaral authored Jul 13, 2023
2 parents 6ec5391 + 8bd0376 commit a06fc0c
Showing 9 changed files with 1,178 additions and 3 deletions.
1 change: 1 addition & 0 deletions pipelines/utils/__init__.py
@@ -9,5 +9,6 @@
from pipelines.utils.dump_to_gcs.flows import *
from pipelines.utils.execute_dbt_model.flows import *
from pipelines.utils.traceroute.flows import *
from pipelines.utils.temporal_coverage_updater.flows import *

# from pipelines.utils.crawler_fgv_igp.flows import *
106 changes: 103 additions & 3 deletions pipelines/utils/tasks.py
@@ -18,11 +18,17 @@

from pipelines.constants import constants
from pipelines.utils.utils import (
    dump_header_to_csv,
    find_ids,
    parse_temporal_coverage,
    get_credentials_utils,
    create_update,
    extract_last_update,
    get_first_date,
    log,
    get_credentials_from_secret,
)

from typing import Tuple

@task
def log_task(msg: Any, level: str = "info"):
@@ -351,3 +357,97 @@ def get_date_time_str(wait=None) -> str:
    Get current time as string
    """
    return datetime.now().strftime("%Y-%m-%d %H:%M")


########################
#
# Update Django Metadata
#
########################
@task # noqa
def update_django_metadata(
    dataset_id: str,
    table_id: str,
    metadata_type: str,
    _last_date=None,
    bq_last_update: bool = True,
):
    """
    Updates Django metadata.

    Args:
        dataset_id (str): The ID of the dataset.
        table_id (str): The ID of the table.
        metadata_type (str): The type of metadata to update.
        _last_date (optional): The last date for the metadata update if `bq_last_update` is False. Defaults to None.
        bq_last_update (bool, optional): Flag indicating whether to use BigQuery's last update date for the metadata.
            If True, `_last_date` is ignored. Defaults to True.

    Returns:
        None

    Raises:
        Exception: If the metadata_type is not supported.
    """
    (email, password) = get_credentials_utils(secret_path="api_user_prod")

    ids = find_ids(
        dataset_id,
        table_id,
        email,
        password,
    )

    if metadata_type == "DateTimeRange":
        if bq_last_update:
            last_date = extract_last_update(
                dataset_id,
                table_id,
            )
            first_date = get_first_date(
                ids,
                email,
                password,
            )

            resource_to_temporal_coverage = parse_temporal_coverage(
                f"{first_date}{last_date}"
            )
            resource_to_temporal_coverage["coverage"] = ids.get("coverage_id")
            log(f"Mutation parameters: {resource_to_temporal_coverage}")

            create_update(
                query_class="allDatetimerange",
                query_parameters={"$coverage_Id: ID": ids.get("coverage_id")},
                mutation_class="CreateUpdateDateTimeRange",
                mutation_parameters=resource_to_temporal_coverage,
                update=True,
                email=email,
                password=password,
            )
        else:
            last_date = _last_date
log(f"Última data {last_date}")
            first_date = get_first_date(
                ids,
                email,
                password,
            )

            resource_to_temporal_coverage = parse_temporal_coverage(
                f"{first_date}{last_date}"
            )

            resource_to_temporal_coverage["coverage"] = ids.get("coverage_id")
            log(f"Mutation parameters: {resource_to_temporal_coverage}")

            create_update(
                query_class="allDatetimerange",
                query_parameters={"$coverage_Id: ID": ids.get("coverage_id")},
                mutation_class="CreateUpdateDateTimeRange",
                mutation_parameters=resource_to_temporal_coverage,
                update=True,
                email=email,
                password=password,
            )
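
The flow added in flows.py below exercises the manual `_last_date` branch; as a complement, here is a minimal hedged sketch of the other branch, where BigQuery's last-modified date drives the update (`bq_last_update=True`, so `_last_date` is ignored). The flow name and parameter defaults are illustrative placeholders, not values from this commit:

from prefect import Flow, Parameter

from pipelines.utils.tasks import update_django_metadata

# Hypothetical flow: let BigQuery's last update date drive the
# temporal coverage mutation instead of a hand-supplied date.
with Flow("update_metadata_example") as example_flow:
    dataset_id = Parameter("dataset_id", default="example_dataset")  # placeholder
    table_id = Parameter("table_id", default="example_table")  # placeholder

    update_django_metadata(
        dataset_id,
        table_id,
        metadata_type="DateTimeRange",
        bq_last_update=True,
    )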
Empty file.
15 changes: 15 additions & 0 deletions pipelines/utils/temporal_coverage_updater/constants.py
@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
"""
Constant values for the datasets projects
"""

from enum import Enum


class constants(Enum):  # pylint: disable=c0103
    """
    Constant values for the temporal_coverage_updater project
    """

    EMAIL = "email"
    PASSWORD = "pass"
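
As a quick illustration (not part of this commit), these Enum members would be read via `.value` wherever the credential keys are needed; the secrets dict here is a made-up stand-in for whatever secret store the pipelines actually use:

from pipelines.utils.temporal_coverage_updater.constants import constants

# Enum members expose their underlying strings via `.value`.
secrets = {"email": "user@example.com", "pass": "s3cret"}  # made-up stand-in
email = secrets[constants.EMAIL.value]       # looks up key "email"
password = secrets[constants.PASSWORD.value]  # looks up key "pass"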
59 changes: 59 additions & 0 deletions pipelines/utils/temporal_coverage_updater/flows.py
@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
"""
Flows for temporal_coverage_updater
"""

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from pipelines.constants import constants
from pipelines.utils.tasks import (
    update_django_metadata,
)

# from pipelines.datasets.temporal_coverage_updater.schedules import every_two_weeks
from pipelines.utils.decorators import Flow
from prefect import Parameter

# from pipelines.utils.utils import log

with Flow(
    name="update_temporal_coverage_teste",
    code_owners=[
        "arthurfg",
    ],
) as temporal_coverage_updater_flow:
    dataset_id = Parameter("dataset_id", default="test_dataset", required=True)
    table_id = Parameter("table_id", default="test_laura_student", required=True)

    update_django_metadata(
        dataset_id,
        table_id,
        metadata_type="DateTimeRange",
        bq_last_update=False,
        _last_date="2030-01-01",
    )
    # (email, password) = get_credentials(secret_path="api_user_prod")
    # ids = find_ids(
    #     dataset_id, table_id, email, password, upstream_tasks=[email, password]
    # )
    # last_date = extract_last_update(
    #     dataset_id, table_id, upstream_tasks=[ids, email, password]
    # )
    # first_date = get_first_date(
    #     ids, email, password, upstream_tasks=[ids, last_date, email, password]
    # )
    # update_temporal_coverage(
    #     ids,
    #     first_date,
    #     last_date,
    #     email,
    #     password,
    #     upstream_tasks=[ids, last_date, first_date, email, password],
    # )


temporal_coverage_updater_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
temporal_coverage_updater_flow.run_config = KubernetesRun(
    image=constants.DOCKER_IMAGE.value
)
# flow.schedule = every_two_weeks
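
For local testing, a sketch of running this flow directly with Prefect 1.x's Flow.run, bypassing the GCS storage and Kubernetes run config set above. The parameter values are the flow's own test defaults; note this executes the task for real against the metadata API:

from pipelines.utils.temporal_coverage_updater.flows import (
    temporal_coverage_updater_flow,
)

# Execute the flow once in-process, overriding its parameters.
state = temporal_coverage_updater_flow.run(
    parameters={
        "dataset_id": "test_dataset",
        "table_id": "test_laura_student",
    }
)
print(state.is_successful())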
21 changes: 21 additions & 0 deletions pipelines/utils/temporal_coverage_updater/schedules.py
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
"""
Schedules for temporal_coverage_updater
"""

from datetime import timedelta, datetime
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from pipelines.constants import constants

every_two_weeks = Schedule(
    clocks=[
        IntervalClock(
            interval=timedelta(weeks=2),
            start_date=datetime(2021, 1, 1),
            labels=[
                constants.DATASETS_AGENT_LABEL.value,
            ],
        ),
    ]
)
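
The schedule is defined but not yet attached (the corresponding assignment in flows.py is still commented out). A sketch of wiring it up and previewing upcoming runs, assuming Prefect 1.x's Schedule.next helper:

from pipelines.utils.temporal_coverage_updater.flows import (
    temporal_coverage_updater_flow,
)
from pipelines.utils.temporal_coverage_updater.schedules import every_two_weeks

# Attach the biweekly schedule to the flow.
temporal_coverage_updater_flow.schedule = every_two_weeks

# Preview the next three scheduled run times.
for run_time in every_two_weeks.next(3):
    print(run_time.isoformat())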
114 changes: 114 additions & 0 deletions pipelines/utils/temporal_coverage_updater/tasks.py
@@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
"""
Tasks for temporal_coverage_updater
"""


from prefect import task

# from basedosdados.upload.base import Base
import basedosdados as bd
from pipelines.utils.temporal_coverage_updater.utils import (
    find_ids,
    parse_temporal_coverage,
    get_credentials,
    create_update,
    extract_last_update,
    get_first_date,
)
from datetime import datetime
from pipelines.utils.utils import log, get_credentials_from_secret
from typing import Tuple


## TODO: Turn the flow into a task OK
## TODO: Add a new argument to update_temporal_coverage to select the "type" (bool) of the first and last date OK
## TODO: migrate to utils.tasks
## TODO: add a check inside the parse to verify the value is in BD's standard format and warn the user when it is not OK
@task
def update_django_metadata(
    dataset_id: str,
    table_id: str,
    metadata_type: str,
    _last_date=None,
    bq_last_update: bool = True,
):
    """
    Updates Django metadata.

    Args:
        dataset_id (str): The ID of the dataset.
        table_id (str): The ID of the table.
        metadata_type (str): The type of metadata to update.
        _last_date (optional): The last date for the metadata update if `bq_last_update` is False. Defaults to None.
        bq_last_update (bool, optional): Flag indicating whether to use BigQuery's last update date for the metadata.
            If True, `_last_date` is ignored. Defaults to True.

    Returns:
        None

    Raises:
        Exception: If the metadata_type is not supported.
    """
    (email, password) = get_credentials(secret_path="api_user_prod")

    ids = find_ids(
        dataset_id,
        table_id,
        email,
        password,
    )

    if metadata_type == "DateTimeRange":
        if bq_last_update:
            last_date = extract_last_update(
                dataset_id,
                table_id,
            )
            first_date = get_first_date(
                ids,
                email,
                password,
            )

            resource_to_temporal_coverage = parse_temporal_coverage(
                f"{first_date}{last_date}"
            )
            resource_to_temporal_coverage["coverage"] = ids.get("coverage_id")
            log(f"Mutation parameters: {resource_to_temporal_coverage}")

            create_update(
                query_class="allDatetimerange",
                query_parameters={"$coverage_Id: ID": ids.get("coverage_id")},
                mutation_class="CreateUpdateDateTimeRange",
                mutation_parameters=resource_to_temporal_coverage,
                update=True,
                email=email,
                password=password,
            )
        else:
            last_date = _last_date
log(f"Última data {last_date}")
            first_date = get_first_date(
                ids,
                email,
                password,
            )

            resource_to_temporal_coverage = parse_temporal_coverage(
                f"{first_date}{last_date}"
            )

            resource_to_temporal_coverage["coverage"] = ids.get("coverage_id")
            log(f"Mutation parameters: {resource_to_temporal_coverage}")

            create_update(
                query_class="allDatetimerange",
                query_parameters={"$coverage_Id: ID": ids.get("coverage_id")},
                mutation_class="CreateUpdateDateTimeRange",
                mutation_parameters=resource_to_temporal_coverage,
                update=True,
                email=email,
                password=password,
            )
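
Note that parse_temporal_coverage receives the two dates concatenated with no separator, which suggests get_first_date returns a string that already ends with basedosdados' interval marker. A hedged illustration with made-up values (the real strings come from the helpers in utils.py, whose diff is not rendered here):

# Made-up example values; the actual strings come from get_first_date
# and extract_last_update, not shown in this diff.
first_date = "2020-01-01(1)"  # assumed to carry the "(interval)" suffix
last_date = "2023-07-01"

coverage_string = f"{first_date}{last_date}"
print(coverage_string)  # -> "2020-01-01(1)2023-07-01"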
