Skip to content

Commit

Permalink
fixing rionowcast
Browse files Browse the repository at this point in the history
  • Loading branch information
patriciacatandi committed Oct 16, 2024
1 parent 673f221 commit 8aa99fe
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
12 changes: 7 additions & 5 deletions pipelines/precipitation_model/rionowcast/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
get_dataset_info,
get_dataset_processor_info,
get_output_dataset_ids_on_gypscie,
get_prediction_on_gypscie,
# get_prediction_on_gypscie,
query_data_from_gcp,
register_dataset_on_gypscie,
)
Expand Down Expand Up @@ -111,7 +111,7 @@
billing_project_id="rj-cor",
start_date=start_date,
end_date=end_date,
format="parquet",
save_format="parquet",
)

with case(station_type, "radar"):
Expand Down Expand Up @@ -187,7 +187,8 @@
# preprocessing_previsao_chuva_rionowcast.schedule = update_schedule

# https://github.com/prefeitura-rio/pipelines_rj_escritorio/blob/
# 2433238db27adb1213059832f238495b9ecb5043/pipelines/deteccao_alagamento_cameras/flooding_detection/flows.py#L112
# 2433238db27adb1213059832f238495b9ecb5043/pipelines/deteccao_alagamento_cameras/
# flooding_detection/flows.py#L112
# https://linen.prefect.io/t/13543083/how-do-i-run-the-same-subflow-concurrently-for-items-in-a-li


Expand Down Expand Up @@ -260,7 +261,7 @@
billing_project_id="rj-cor",
start_date=start_date,
end_date=end_date,
format="parquet",
save_format="parquet",
)
radar_mendanha_path = query_data_from_gcp(
radar_dataset_info["dataset_id"],
Expand Down Expand Up @@ -294,7 +295,8 @@
model_params,
)
prediction_dataset_ids = get_output_dataset_ids_on_gypscie(api, task_id)
prediction_datasets = get_prediction_on_gypscie(api, prediction_dataset_ids)
# prediction_datasets = get_prediction_on_gypscie(api, prediction_dataset_ids)
prediction_datasets = None
desnormalized_prediction_datasets = desnormalize_data(prediction_datasets)
now_datetime = get_now_datetime()
geolocalized_prediction_datasets = geolocalize_data(
Expand Down
18 changes: 15 additions & 3 deletions pipelines/precipitation_model/rionowcast/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from time import sleep
from typing import Dict, List # Tuple

import numpy as np
# import numpy as np
import pandas as pd

# import basedosdados as bd
Expand Down Expand Up @@ -124,12 +124,12 @@ def register_dataset_on_gypscie(api, filepath: Path, domain_id: int = 1) -> Dict
}
"""
log(f"\nStart registring dataset by sending {filepath} Data to Gypscie")

# pylint: disable=use-maxsplit-arg
data = {
"domain_id": domain_id,
"name": str(filepath)
.split("/")[-1]
.split(".csv")[0], # pylint: disable=use-maxsplit-arg # TODO: nome tem que ser único
.split(".csv")[0], # TODO: nome tem que ser único
}
log(type(data), data)
files = {
Expand Down Expand Up @@ -326,6 +326,15 @@ def execute_prediction_on_gypscie(

@task
def task_wait_run(api, task_response, flow_type: str = "dataflow") -> Dict:
    """
    Block until a previously submitted Gypscie run completes, then report its outcome.

    Thin Prefect-task wrapper around the module-level ``wait_run`` helper so the
    polling step shows up as its own node in the flow.

    Args:
    - api: Gypscie API instance
    - task_response: Response from a Gypscie API task request
    - flow_type: str, either "dataflow" or "processor", indicating the type of flow
    Returns:
    - A dictionary containing the result of the task, including its state and output datasets
    """
    run_result = wait_run(api, task_response, flow_type)
    return run_result


Expand Down Expand Up @@ -447,6 +456,8 @@ def geolocalize_data(prediction_datasets, now_datetime: str) -> pd.DataFrame:
Expected columns: latitude, longitude, janela_predicao,
valor_predicao, data_predicao (timestamp em que foi realizada a previsão)
"""
now_datetime = pd.to_datetime(now_datetime)
prediction_datasets["data_predicao"] = now_datetime
return prediction_datasets


Expand Down Expand Up @@ -518,6 +529,7 @@ def create_and_save_image(data: xr.xarray, variable) -> Path:
plt.show()
return save_image_path
"""
data = data - data.min()
save_image_path = "image.png"

return save_image_path
Expand Down

0 comments on commit 8aa99fe

Please sign in to comment.