Skip to content

Commit

Permalink
feat: migrating to utils.utils
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurfg committed Jul 10, 2023
1 parent 3163ee0 commit 00e1720
Show file tree
Hide file tree
Showing 5 changed files with 767 additions and 160 deletions.
106 changes: 103 additions & 3 deletions pipelines/utils/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@

from pipelines.constants import constants
from pipelines.utils.utils import (
get_credentials_from_secret,
log,
dump_header_to_csv,
find_ids,
parse_temporal_coverage,
get_credentials_utils,
create_update,
extract_last_update,
get_first_date,
log,
get_credentials_from_secret,
)

from typing import Tuple

##################
#
Expand Down Expand Up @@ -343,3 +349,97 @@ def get_date_time_str(wait=None) -> str:
Get current time as string
"""
return datetime.now().strftime("%Y-%m-%d %HH:%MM")


########################
#
# Update Django Metadata
#
########################
@task # noqa
def update_django_metadata(
dataset_id: str,
table_id: str,
metadata_type: str,
_last_date=None,
bq_last_update: bool = True,
):
"""
Updates Django metadata.
Args:
dataset_id (str): The ID of the dataset.
table_id (str): The ID of the table.
metadata_type (str): The type of metadata to update.
_last_date (optional): The last date for metadata update if `bq_last_update` is False. Defaults to None.
bq_last_update (bool, optional): Flag indicating whether to use BigQuery's last update date for metadata.
If True, `_last_date` is ignored. Defaults to True.
Returns:
None
Raises:
Exception: If the metadata_type is not supported.
"""
(email, password) = get_credentials_utils(secret_path="api_user_prod")

ids = find_ids(
dataset_id,
table_id,
email,
password,
)

if metadata_type == "DateTimeRange":
if bq_last_update:
last_date = extract_last_update(
dataset_id,
table_id,
)
first_date = get_first_date(
ids,
email,
password,
)

resource_to_temporal_coverage = parse_temporal_coverage(
f"{first_date}{last_date}"
)
resource_to_temporal_coverage["coverage"] = ids.get("coverage_id")
log(f"Mutation parameters: {resource_to_temporal_coverage}")

create_update(
query_class="allDatetimerange",
query_parameters={"$coverage_Id: ID": ids.get("coverage_id")},
mutation_class="CreateUpdateDateTimeRange",
mutation_parameters=resource_to_temporal_coverage,
update=True,
email=email,
password=password,
)
else:
last_date = _last_date
log(f"Última data {last_date}")
first_date = get_first_date(
ids,
email,
password,
)

resource_to_temporal_coverage = parse_temporal_coverage(
f"{first_date}{last_date}"
)

resource_to_temporal_coverage["coverage"] = ids.get("coverage_id")
log(f"Mutation parameters: {resource_to_temporal_coverage}")

create_update(
query_class="allDatetimerange",
query_parameters={"$coverage_Id: ID": ids.get("coverage_id")},
mutation_class="CreateUpdateDateTimeRange",
mutation_parameters=resource_to_temporal_coverage,
update=True,
email=email,
password=password,
)
52 changes: 28 additions & 24 deletions pipelines/utils/temporal_coverage_updater/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,15 @@
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from pipelines.constants import constants
from pipelines.utils.temporal_coverage_updater.tasks import (
find_ids,
extract_last_update,
get_first_date,
update_temporal_coverage,
get_credentials,
from pipelines.utils.tasks import (
update_django_metadata,
)

# from pipelines.datasets.temporal_coverage_updater.schedules import every_two_weeks
from pipelines.utils.decorators import Flow
from prefect import Parameter
from pipelines.utils.utils import log

# from pipelines.utils.utils import log

with Flow(
name="update_temporal_coverage_teste",
Expand All @@ -28,24 +25,31 @@
dataset_id = Parameter("dataset_id", default="test_dataset", required=True)
table_id = Parameter("table_id", default="test_laura_student", required=True)

(email, password) = get_credentials(secret_path="api_user_prod")
ids = find_ids(
dataset_id, table_id, email, password, upstream_tasks=[email, password]
)
last_date = extract_last_update(
dataset_id, table_id, upstream_tasks=[ids, email, password]
)
first_date = get_first_date(
ids, email, password, upstream_tasks=[ids, last_date, email, password]
)
update_temporal_coverage(
ids,
first_date,
last_date,
email,
password,
upstream_tasks=[ids, last_date, first_date, email, password],
update_django_metadata(
dataset_id,
table_id,
metadata_type="DateTimeRange",
bq_last_update=False,
_last_date="2030-01-01",
)
# (email, password) = get_credentials(secret_path="api_user_prod")
# ids = find_ids(
# dataset_id, table_id, email, password, upstream_tasks=[email, password]
# )
# last_date = extract_last_update(
# dataset_id, table_id, upstream_tasks=[ids, email, password]
# )
# first_date = get_first_date(
# ids, email, password, upstream_tasks=[ids, last_date, email, password]
# )
# update_temporal_coverage(
# ids,
# first_date,
# last_date,
# email,
# password,
# upstream_tasks=[ids, last_date, first_date, email, password],
# )


temporal_coverage_updater_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
Expand Down
Loading

0 comments on commit 00e1720

Please sign in to comment.