From 8aa99fe7078c031516efe0d3ea1bb49ece93e09c Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Wed, 16 Oct 2024 14:45:09 -0300 Subject: [PATCH] fixing rionowcast --- .../precipitation_model/rionowcast/flows.py | 12 +++++++----- .../precipitation_model/rionowcast/tasks.py | 18 +++++++++++++++--- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/pipelines/precipitation_model/rionowcast/flows.py b/pipelines/precipitation_model/rionowcast/flows.py index 3fa1aaa..e2fa19f 100644 --- a/pipelines/precipitation_model/rionowcast/flows.py +++ b/pipelines/precipitation_model/rionowcast/flows.py @@ -38,7 +38,7 @@ get_dataset_info, get_dataset_processor_info, get_output_dataset_ids_on_gypscie, - get_prediction_on_gypscie, + # get_prediction_on_gypscie, query_data_from_gcp, register_dataset_on_gypscie, ) @@ -111,7 +111,7 @@ billing_project_id="rj-cor", start_date=start_date, end_date=end_date, - format="parquet", + save_format="parquet", ) with case(station_type, "radar"): @@ -187,7 +187,8 @@ # preprocessing_previsao_chuva_rionowcast.schedule = update_schedule # https://github.com/prefeitura-rio/pipelines_rj_escritorio/blob/ -# 2433238db27adb1213059832f238495b9ecb5043/pipelines/deteccao_alagamento_cameras/flooding_detection/flows.py#L112 +# 2433238db27adb1213059832f238495b9ecb5043/pipelines/deteccao_alagamento_cameras/ +# flooding_detection/flows.py#L112 # https://linen.prefect.io/t/13543083/how-do-i-run-the-same-subflow-concurrently-for-items-in-a-li @@ -260,7 +261,7 @@ billing_project_id="rj-cor", start_date=start_date, end_date=end_date, - format="parquet", + save_format="parquet", ) radar_mendanha_path = query_data_from_gcp( radar_dataset_info["dataset_id"], @@ -294,7 +295,8 @@ model_params, ) prediction_dataset_ids = get_output_dataset_ids_on_gypscie(api, task_id) - prediction_datasets = get_prediction_on_gypscie(api, prediction_dataset_ids) + # prediction_datasets = get_prediction_on_gypscie(api, prediction_dataset_ids) + prediction_datasets = None desnormalized_prediction_datasets = desnormalize_data(prediction_datasets) now_datetime = get_now_datetime() geolocalized_prediction_datasets = geolocalize_data( diff --git a/pipelines/precipitation_model/rionowcast/tasks.py b/pipelines/precipitation_model/rionowcast/tasks.py index 7e44f06..b41c635 100644 --- a/pipelines/precipitation_model/rionowcast/tasks.py +++ b/pipelines/precipitation_model/rionowcast/tasks.py @@ -8,7 +8,7 @@ from time import sleep from typing import Dict, List # Tuple -import numpy as np +# import numpy as np import pandas as pd # import basedosdados as bd @@ -124,12 +124,12 @@ def register_dataset_on_gypscie(api, filepath: Path, domain_id: int = 1) -> Dict } """ log(f"\nStart registring dataset by sending {filepath} Data to Gypscie") - + # pylint: disable=use-maxsplit-arg data = { "domain_id": domain_id, "name": str(filepath) .split("/")[-1] - .split(".csv")[0], # pylint: disable=use-maxsplit-arg # TODO: nome tem que ser único + .split(".csv")[0], # TODO: nome tem que ser único } log(type(data), data) files = { @@ -326,6 +326,15 @@ def execute_prediction_on_gypscie( @task def task_wait_run(api, task_response, flow_type: str = "dataflow") -> Dict: + """ + Wait for a task to finish and return its result. + Args: + - api: Gypscie API instance + - task_response: Response from a Gypscie API task request + - flow_type: str, either "dataflow" or "processor", indicating the type of flow + Returns: + - A dictionary containing the result of the task, including its state and output datasets + """ return wait_run(api, task_response, flow_type) @@ -447,6 +456,8 @@ def geolocalize_data(prediction_datasets, now_datetime: str) -> pd.DataFrame: Expected columns: latitude, longitude, janela_predicao, valor_predicao, data_predicao (timestamp em que foi realizada a previsão) """ + now_datetime = pd.to_datetime(now_datetime) + prediction_datasets["data_predicao"] = now_datetime return prediction_datasets @@ -518,6 +529,7 @@ def create_and_save_image(data: xr.xarray, variable) -> Path: plt.show() return save_image_path """ + data = data - data.min() save_image_path = "image.png" return save_image_path