diff --git a/pipelines/utils/tasks.py b/pipelines/utils/tasks.py index a4fe1c856..3909799c5 100644 --- a/pipelines/utils/tasks.py +++ b/pipelines/utils/tasks.py @@ -19,7 +19,7 @@ from pipelines.constants import constants from pipelines.utils.utils import ( dump_header_to_csv, - find_ids, + get_ids, parse_temporal_coverage, get_credentials_utils, create_update, @@ -27,6 +27,7 @@ get_first_date, log, get_credentials_from_secret, + get_token, ) from typing import Tuple @@ -371,7 +372,9 @@ def update_django_metadata( table_id: str, metadata_type: str, _last_date=None, + date_format: str = "yy-mm-dd", bq_last_update: bool = True, + api_mode: str = "prod", ): """ Updates Django metadata. @@ -381,8 +384,10 @@ def update_django_metadata( table_id (str): The ID of the table. metadata_type (str): The type of metadata to update. _last_date (optional): The last date for metadata update if `bq_last_update` is False. Defaults to None. + date_format (str, optional): The date format to use when parsing dates ('yy-mm-dd', 'yy-mm', or 'yy'). Defaults to 'yy-mm-dd'. bq_last_update (bool, optional): Flag indicating whether to use BigQuery's last update date for metadata. If True, `_last_date` is ignored. Defaults to True. + api_mode (str, optional): The API mode to be used ('prod', 'staging'). Defaults to 'prod'. Returns: None @@ -391,30 +396,25 @@ def update_django_metadata( Exception: If the metadata_type is not supported. """ - (email, password) = get_credentials_utils(secret_path="api_user_prod") + (email, password) = get_credentials_utils(secret_path=f"api_user_{api_mode}") - ids = find_ids( + ids = get_ids( dataset_id, table_id, email, password, ) + log(f"IDS:{ids}") if metadata_type == "DateTimeRange": if bq_last_update: last_date = extract_last_update( dataset_id, table_id, - ) - first_date = get_first_date( - ids, - email, - password, + date_format, ) - resource_to_temporal_coverage = parse_temporal_coverage( - f"{first_date}{last_date}" - ) + resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") resource_to_temporal_coverage["coverage"] = ids.get("coverage_id") log(f"Mutation parameters: {resource_to_temporal_coverage}") @@ -426,19 +426,13 @@ def update_django_metadata( update=True, email=email, password=password, + api_mode=api_mode, ) else: last_date = _last_date log(f"Última data {last_date}") - first_date = get_first_date( - ids, - email, - password, - ) - resource_to_temporal_coverage = parse_temporal_coverage( - f"{first_date}{last_date}" - ) + resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") resource_to_temporal_coverage["coverage"] = ids.get("coverage_id") log(f"Mutation parameters: {resource_to_temporal_coverage}") @@ -451,4 +445,5 @@ def update_django_metadata( update=True, email=email, password=password, + api_mode=api_mode, ) diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index ed0ef7fe1..9a7f6e2bb 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -576,13 +576,19 @@ def get_credentials_from_env( ####################### # Django Metadata Utils ####################### -def get_first_date(ids, email, password): +def get_first_date( + ids, + email, + password, + date_format: str, + api_mode: str = "prod", +): """ Retrieves the first date from the given coverage ID. Args: ids (dict): A dictionary containing the dataset ID, table ID, and coverage ID. - + date_format (str): Date format ('yy-mm-dd', 'yy-mm', or 'yy') Returns: str: The first date in the format 'YYYY-MM-DD(interval)'. @@ -595,9 +601,17 @@ def get_first_date(ids, email, password): query_parameters={"$coverage_Id: ID": ids.get("coverage_id")}, email=email, password=password, + api_mode=api_mode, ) data = date["data"]["allDatetimerange"]["edges"][0]["node"] - first_date = f"{data['startYear']}-{data['startMonth']}-{data['startDay']}({data['interval']})" + log(data) + if date_format == "yy-mm-dd": + first_date = f"{data['startYear']}-{data['startMonth']}-{data['startDay']}({data['interval']})" + elif date_format == "yy-mm": + first_date = f"{data['startYear']}-{data['startMonth']}({data['interval']})" + elif date_format == "yy": + first_date = f"{data['startYear']}({data['interval']})" + log(f"Primeira data: {first_date}") return first_date except Exception as e: @@ -605,13 +619,14 @@ def get_first_date(ids, email, password): raise -def extract_last_update(dataset_id, table_id): +def extract_last_update(dataset_id, table_id, date_format: str): """ Extracts the last update date of a given dataset table. Args: dataset_id (str): The ID of the dataset. table_id (str): The ID of the table. + date_format (str): Date format ('yy-mm-dd', 'yy-mm', or 'yy') Returns: str: The last update date in the format 'YYYY-MM-DD'. @@ -628,8 +643,7 @@ def extract_last_update(dataset_id, table_id): WHERE table_id = '{table_id}' """ - # bd_base = Base() - # billing_project_id = bd_base.config["gcloud-projects"]["prod"]["name"] + t = bd.read_sql( query=query_bd, billing_project_id="basedosdados-dev", @@ -639,7 +653,12 @@ def extract_last_update(dataset_id, table_id): t["last_modified_time"][0] / 1000 ) # Convert to seconds by dividing by 1000 dt = datetime.fromtimestamp(timestamp) - last_date = dt.strftime("%Y-%m-%d") + if date_format == "yy-mm-dd": + last_date = dt.strftime("%Y-%m-%d") + elif date_format == "yy-mm": + last_date = dt.strftime("%Y-%m") + elif date_format == "yy": + last_date = dt.strftime("%Y") log(f"Última data: {last_date}") return last_date except Exception as e: @@ -683,22 +702,44 @@ def get_credentials_utils(secret_path: str) -> Tuple[str, str]: return email, password -def get_token(email, password): - r = requests.post( - "http://api.basedosdados.org/api/v1/graphql", - headers={"Content-Type": "application/json"}, - json={ - "query": """ - mutation ($email: String!, $password: String!) { - tokenAuth(email: $email, password: $password) { - token +def get_token(email, password, api_mode: str = "prod"): + """ + Get api token. + """ + r = None + if api_mode == "prod": + r = requests.post( + "http://api.basedosdados.org/api/v1/graphql", + headers={"Content-Type": "application/json"}, + json={ + "query": """ + mutation ($email: String!, $password: String!) { + tokenAuth(email: $email, password: $password) { + token + } + } + """, + "variables": {"email": email, "password": password}, + }, + ) + + r.raise_for_status() + elif api_mode == "staging": + r = requests.post( + "http://staging.api.basedosdados.org/api/v1/graphql", + headers={"Content-Type": "application/json"}, + json={ + "query": """ + mutation ($email: String!, $password: String!) { + tokenAuth(email: $email, password: $password) { + token + } } - } - """, - "variables": {"email": email, "password": password}, - }, - ) - r.raise_for_status() + """, + "variables": {"email": email, "password": password}, + }, + ) + r.raise_for_status() return r.json()["data"]["tokenAuth"]["token"] @@ -707,11 +748,10 @@ def get_id( query_parameters, email, password, -): # sourcery skip: avoid-builtin-shadow - # email = temp_constants.EMAIL.value - # password = temp_constants.PASSWORD.value - # backend = b.Backend(graphql_url="http://api.basedosdados.org/api/v1/graphql") - token = get_token(email, password) + api_mode: str = "prod", + cloud_table: bool = True, +): + token = get_token(email, password, api_mode) header = { "Authorization": f"Bearer {token}", } @@ -723,20 +763,42 @@ def get_id( values = list(query_parameters.values()) _input = ", ".join([f"{key}:${key}" for key in keys]) - query = f"""query({_filter}) {{ - {query_class}({_input}){{ - edges{{ - node{{ - id, + if cloud_table: + query = f"""query({_filter}) {{ + {query_class}({_input}){{ + edges{{ + node{{ + id, + table{{ + _id + }} + }} }} - }} - }} - }}""" - r = requests.post( - url="https://api.basedosdados.org/api/v1/graphql", - json={"query": query, "variables": dict(zip(keys, values))}, - headers=header, - ).json() + }} + }}""" + else: + query = f"""query({_filter}) {{ + {query_class}({_input}){{ + edges{{ + node{{ + id, + }} + }} + }} + }}""" + + if api_mode == "staging": + r = requests.post( + url=f"https://{api_mode}.api.basedosdados.org/api/v1/graphql", + json={"query": query, "variables": dict(zip(keys, values))}, + headers=header, + ).json() + elif api_mode == "prod": + r = requests.post( + url="https://api.basedosdados.org/api/v1/graphql", + json={"query": query, "variables": dict(zip(keys, values))}, + headers=header, + ).json() if "data" in r and r is not None: if r.get("data", {}).get(query_class, {}).get("edges") == []: @@ -753,15 +815,22 @@ def get_id( def get_date( - query_class, query_parameters, email, password -): # sourcery skip: avoid-builtin-shadow - # email = temp_constants.EMAIL.value - # password = temp_constants.PASSWORD.value - # backend = b.Backend(graphql_url="http://api.basedosdados.org/api/v1/graphql") - token = get_token(email=email, password=password) + query_class, + query_parameters, + email, + password, + api_mode: str = "prod", +): + token = get_token( + email=email, + password=password, + api_mode=api_mode, + ) + log("puxou token") header = { "Authorization": f"Bearer {token}", } + log(f"{header}") _filter = ", ".join(list(query_parameters.keys())) keys = [ parameter.replace("$", "").split(":")[0] @@ -782,11 +851,20 @@ def get_date( }} }} }}""" - r = requests.post( - url="https://api.basedosdados.org/api/v1/graphql", - json={"query": query, "variables": dict(zip(keys, values))}, - headers=header, - ).json() + + if api_mode == "staging": + r = requests.post( + url=f"https://{api_mode}.api.basedosdados.org/api/v1/graphql", + json={"query": query, "variables": dict(zip(keys, values))}, + headers=header, + ).json() + elif api_mode == "prod": + r = requests.post( + url="https://api.basedosdados.org/api/v1/graphql", + json={"query": query, "variables": dict(zip(keys, values))}, + headers=header, + ).json() + return r @@ -798,11 +876,13 @@ def create_update( query_class, query_parameters, update=False, + api_mode: str = "prod", ): - # email = temp_constants.EMAIL.value - # password = temp_constants.PASSWORD.value - # backend = b.Backend(graphql_url="http://api.basedosdados.org/api/v1/graphql") - token = get_token(email=email, password=password) + token = get_token( + email=email, + password=password, + api_mode=api_mode, + ) header = { "Authorization": f"Bearer {token}", } @@ -812,6 +892,8 @@ def create_update( query_parameters=query_parameters, email=email, password=password, + cloud_table=False, + api_mode=api_mode, ) if id is not None: r["r"] = "query" @@ -836,11 +918,19 @@ def create_update( if update is True and id is not None: mutation_parameters["id"] = id - r = requests.post( - "https://api.basedosdados.org/api/v1/graphql", - json={"query": query, "variables": {"input": mutation_parameters}}, - headers=header, - ).json() + + if api_mode == "prod": + r = requests.post( + "https://api.basedosdados.org/api/v1/graphql", + json={"query": query, "variables": {"input": mutation_parameters}}, + headers=header, + ).json() + elif api_mode == "staging": + r = requests.post( + f"https://{api_mode}api.basedosdados.org/api/v1/graphql", + json={"query": query, "variables": {"input": mutation_parameters}}, + headers=header, + ).json() r["r"] = "mutation" if "data" in r and r is not None: @@ -851,7 +941,6 @@ def create_update( raise Exception("create: Error") else: id = r["data"][mutation_class][_classe]["id"] - # print(f"create: created {id}") id = id.split(":")[1] return r, id @@ -889,7 +978,7 @@ def parse_temporal_coverage(temporal_coverage): start_str = end_str elif end_str == "" and start_str != "": end_str = start_str - elif len(temporal_coverage) == 4: + elif len(temporal_coverage) >= 4: start_str, interval_str, end_str = temporal_coverage, 1, temporal_coverage start_len = 0 if start_str == "" else len(start_str.split("-")) end_len = 0 if end_str == "" else len(end_str.split("-")) @@ -917,51 +1006,38 @@ def parse_date(position, date_str, date_len): if interval_str != 0: start_result["interval"] = int(interval_str) - return start_result + return end_result -def get_ids(dataset_name: str, table_name: str, email: str, password: str) -> dict: +def get_ids( + dataset_name: str, + table_name: str, + email: str, + password: str, + api_mode: str = "prod", +) -> dict: """ - Obtains the IDs of the dataset, table, and coverage based on the provided names. - - Args: - dataset_name (str): Name of the dataset. - table_name (str): Name of the table. - - Returns: - dict: Dictionary containing the 3 IDs. - - Raises: - ValueError: If any of the IDs are not found. - Exception: If an error occurs while retrieving the IDs. + Obtains the IDs of the table and coverage based on the provided names. """ try: - # Get the dataset ID - dataset_result = get_id( - email=email, - password=password, - query_class="allDataset", - query_parameters={"$slug: String": dataset_name}, - ) - if not dataset_result: - raise ValueError("Dataset ID not found.") - - dataset_id = dataset_result[1] - # Get the table ID table_result = get_id( email=email, password=password, - query_class="allTable", + query_class="allCloudtable", query_parameters={ - "$slug: String": table_name, - "$dataset_Id: ID": dataset_id, + "$gcpDatasetId: String": dataset_name, + "$gcpTableId: String": table_name, }, + cloud_table=True, + api_mode=api_mode, ) if not table_result: raise ValueError("Table ID not found.") - table_id = table_result[1] + table_id = table_result[0]["data"]["allCloudtable"]["edges"][0]["node"][ + "table" + ].get("_id") # Get the coverage IDs coverage_result = get_id( @@ -969,12 +1045,13 @@ def get_ids(dataset_name: str, table_name: str, email: str, password: str) -> di password=password, query_class="allCoverage", query_parameters={"$table_Id: ID": table_id}, + api_mode=api_mode, + cloud_table=True, ) if not coverage_result: raise ValueError("Coverage ID not found.") coverage_ids = coverage_result[0] - # print(coverage_ids) # Check if there are multiple coverage IDs if len(coverage_ids["data"]["allCoverage"]["edges"]) > 1: @@ -986,10 +1063,9 @@ def get_ids(dataset_name: str, table_name: str, email: str, password: str) -> di coverage_id = coverage_ids["data"]["allCoverage"]["edges"][0]["node"][ "id" ].split(":")[-1] - # print(coverage_id) - # Return the 3 IDs in a dictionary + + # Return the 2 IDs in a dictionary return { - "dataset_id": dataset_id, "table_id": table_id, "coverage_id": coverage_id, }