
Commit

changing task names
patriciacatandi committed Oct 2, 2024
1 parent e8e4ec6 commit 6e023ef
Showing 4 changed files with 51 additions and 63 deletions.
33 changes: 7 additions & 26 deletions pipelines/precipitation_model/rionowcast/flows.py
@@ -40,11 +40,11 @@
geolocalize_data,
get_dataset_processor_info,
get_dataflow_params,
get_prediction_dataset_ids_on_gypscie,
get_output_dataset_ids_on_gypscie,
    get_output_dataset_on_gypscie,
query_data_from_gcp,
register_dataset_on_gypscie,
# wait_task_run,
task_wait_run,
)
from pipelines.tasks import ( # pylint: disable=E0611, E0401
task_create_partitions,
@@ -54,28 +54,6 @@
name="WEATHER FORECAST: Pré-processamento dos dados - Rionowcast",
) as preprocessing_previsao_chuva_rionowcast:

# Data parameters
# start_date = Parameter("start_date", default=None, required=False)
# end_date = Parameter("end_date", default=None, required=False)
# weather_dateset_info = Parameter(
# "weather_dataset_info",
# default={
# "dataset_id": "clima_estacao_meteorologica",
# "table_id": "meteorologia_inmet",
# "filename": "weather_station_bq"
# },
# required=False
# )
# pluviometer_dataset_info = Parameter(
# "pluviometer_dataset_info",
# default={
# "dataset_id": "clima_pluviometer",
# "table_id": "taxa_precipitacao_alertario",
# "filename": "gauge_station_bq",
# },
# required=False
# )

# Parameters to run a query on Bigquery
bd_project_mode = Parameter("bd_project_mode", default="prod", required=False)
billing_project_id = Parameter("billing_project_id", default="rj-cor", required=False)
@@ -146,14 +124,17 @@
"station_type": station_type,
}

treated_datasets = execute_dataset_processor(
dataset_processor_task_id = execute_dataset_processor(
api,
processor_id=dataset_processor_id,
dataset_id=[dataset_response["id"]],
environment_id=environment_id,
project_id=project_id,
parameters=processor_parameters,
)
task_wait_run(api, task_response, flow_type)
output_datasets_id = get_output_dataset_ids_on_gypscie(api, dataset_processor_task_id)

    # TODO: create a function to add the update_date column
# Save pre-treated data on local file with partitions
now_datetime = get_now_datetime()
@@ -298,7 +279,7 @@
api,
model_params,
)
prediction_dataset_ids = get_prediction_dataset_ids_on_gypscie(api, task_id)
prediction_dataset_ids = get_output_dataset_ids_on_gypscie(api, task_id)
    prediction_datasets = get_output_dataset_on_gypscie(api, prediction_dataset_ids)
desnormalized_prediction_datasets = desnormalize_data(prediction_datasets)
now_datetime = get_now_datetime()
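For context, a minimal sketch of how the renamed tasks are meant to chain inside the preprocessing flow's with Flow(...) block. This is illustrative only, not lines from the commit: the variable preprocessed_file_path and the flow_type="processor" value are assumptions.

# Sketch (assumed wiring): Gypscie dataset-processing chain after the renames
dataset_response = register_dataset_on_gypscie(api, filepath=preprocessed_file_path)  # upload the pre-treated CSV
dataset_processor_task_id = execute_dataset_processor(
    api,
    processor_id=dataset_processor_id,
    dataset_id=[dataset_response["id"]],
    environment_id=environment_id,
    project_id=project_id,
    parameters=processor_parameters,
)  # now returns the Gypscie run's task_id rather than the processed datasets
run_status = task_wait_run(api, dataset_processor_task_id, flow_type="processor")  # poll until the run finishes
output_datasets_id = get_output_dataset_ids_on_gypscie(api, dataset_processor_task_id)  # ids of the output datasets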
43 changes: 25 additions & 18 deletions pipelines/precipitation_model/rionowcast/tasks.py
@@ -25,7 +25,7 @@
from pipelines.constants import constants # pylint: disable=E0611, E0401
from pipelines.precipitation_model.rionowcast.utils import ( # pylint: disable=E0611, E0401
GypscieApi,
wait_task_run,
wait_run,
)


@@ -127,7 +127,7 @@ def register_dataset_on_gypscie(api, filepath: Path, domain_id: int = 1) -> Dict

data = {
"domain_id": domain_id,
"name": str(filepath).split("/")[-1].split(".csv")[0], # pylint: disable=use-maxsplit-arg
"name": str(filepath).split("/")[-1].split(".csv")[0], # pylint: disable=use-maxsplit-arg # TODO: nome tem que ser único
}
log(type(data), data)
files = {
@@ -190,18 +190,20 @@ def execute_dataset_processor(
"project_id": project_id,
},
)
# task_response = {'task_id': '227e74bc-0057-4e63-a30f-8374604e442b'}

response = wait_task_run(api, task_response.json())
# response = wait_run(api, task_response.json())

if response["state"] != "SUCCESS":
failed_message = "Error processing this dataset. Stop flow or restart this task"
log(failed_message)
task_state = Failed(failed_message)
raise ENDRUN(state=task_state)
# if response["state"] != "SUCCESS":
# failed_message = "Error processing this dataset. Stop flow or restart this task"
# log(failed_message)
# task_state = Failed(failed_message)
# raise ENDRUN(state=task_state)

output_datasets = response["result"]["output_datasets"] # returns a list with datasets
log(f"\nFinish executing dataset processing, we have {len(output_datasets)} datasets")
return output_datasets
# output_datasets = response["result"]["output_datasets"] # returns a list with datasets
# log(f"\nFinish executing dataset processing, we have {len(output_datasets)} datasets")
# return output_datasets
return task_response.json().get("task_id")


@task()
@@ -252,7 +254,7 @@ def query_data_from_gcp( # pylint: disable=too-many-arguments
if not os.path.exists(directory_path):
os.makedirs(directory_path)

savepath = directory_path / f"{dataset_id}_{table_id}"
savepath = directory_path / f"{dataset_id}_{table_id}" ### TODO:

# pylint: disable=consider-using-f-string
query = """
@@ -289,11 +291,12 @@ def query_data_from_gcp( # pylint: disable=too-many-arguments
@task()
def execute_prediction_on_gypscie(
api,
model_params,
model_params: dict,
# hours_to_predict,
):
) -> str:
"""
Request execution of a prediction process on Gypscie.
Returns the task_id.
"""
log("Starting prediction")
task_response = api.post(
@@ -305,7 +308,7 @@ def execute_prediction_on_gypscie(
# "dataset_id": dataset_id,
# "project_id": project_id,
# },
response = wait_task_run(api, task_response.json())
response = wait_run(api, task_response.json())

if response["state"] != "SUCCESS":
failed_message = "Error processing this dataset. Stop flow or restart this task"
Expand All @@ -318,6 +321,10 @@ def execute_prediction_on_gypscie(

return response.json().get("task_id") # response.json().get('task_id')

@task
def task_wait_run(api, task_response, flow_type: str = "dataflow") -> Dict:
return wait_run(api, task_response, flow_type)


@task
def get_dataflow_params( # pylint: disable=too-many-arguments
@@ -376,7 +383,7 @@ def get_dataflow_params( # pylint: disable=too-many-arguments


@task()
def get_prediction_dataset_ids_on_gypscie(
def get_output_dataset_ids_on_gypscie(
api,
task_id,
) -> List:
@@ -388,14 +395,14 @@ def get_prediction_dataset_ids_on_gypscie(
response = response.json()
except HTTPError as err:
if err.response.status_code == 404:
print(f"Task {os.environ['DATAFLOW_TASK_ID']} not found")
print(f"Task {task_id} not found")
return []

return response.get("output_datasets")


@task()
def get_prediction_on_gypscie(
def get_output_dataset_on_gypscie(
api,
prediction_dataset_ids,
):
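The TODO added next to the "name" field in register_dataset_on_gypscie notes that dataset names must be unique on Gypscie. A hedged sketch of one way to satisfy that, not part of this commit (the helper name and the timestamp suffix are assumptions):

# Sketch: derive a collision-resistant dataset name from the file stem plus a UTC timestamp
from datetime import datetime, timezone
from pathlib import Path

def unique_dataset_name(filepath: Path) -> str:
    stem = Path(filepath).name.split(".csv")[0]  # same stem extraction used in register_dataset_on_gypscie
    return f"{stem}_{datetime.now(timezone.utc):%Y%m%d%H%M%S}"  # suffix keeps repeated registrations distinct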
4 changes: 2 additions & 2 deletions pipelines/precipitation_model/rionowcast/utils.py
@@ -129,7 +129,7 @@ def bq_project(kind: str = "bigquery_prod"):
return bd.upload.base.Base().client[kind].project


def wait_task_run(api, task_response, flow_type: str = "dataflow") -> Dict:
def wait_run(api, task_response, flow_type: str = "dataflow") -> Dict:
"""
Force flow wait for the end of data processing
flow_type: dataflow or processor
@@ -149,7 +149,7 @@ def wait_task_run(api, task_response, flow_type: str = "dataflow") -> Dict:
log(f"Response state: {response['state']}")
while response["state"] == "STARTED":
sleep(5)
response = wait_task_run(api, task_response)
response = wait_run(api, task_response)

if response["state"] != "SUCCESS":
log("Error processing this dataset. Stop flow or restart this task")
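wait_run above waits by calling itself every 5 seconds, so a long-running Gypscie job keeps growing the Python call stack (and the recursive call drops the flow_type argument, falling back to the default). An equivalent iterative sketch, assuming the status endpoints follow the status_processor_run/ naming seen in pipelines/utils.py (the dataflow endpoint name is an assumption):

# Sketch: iterative polling of a Gypscie run until it leaves the STARTED state
from time import sleep

def poll_gypscie_run(api, task_id: str, flow_type: str = "dataflow") -> dict:
    path = ("status_dataflow_run/" if flow_type == "dataflow" else "status_processor_run/") + task_id
    response = api.get(path=path)  # same api.get(path=...) pattern used by wait_run
    while response["state"] == "STARTED":
        sleep(5)  # fixed 5-second interval, matching wait_run
        response = api.get(path=path)
    return response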
34 changes: 17 additions & 17 deletions pipelines/utils.py
@@ -340,27 +340,27 @@ def bq_project(kind: str = "bigquery_prod"):
return bd.upload.base.Base().client[kind].project


def wait_task_run(api, task_id) -> Dict:
"""
Force flow wait for the end of data processing
"""
if "task_id" in task_id.keys():
_id = task_id.get("task_id")
# def wait_task_run(api, task_id) -> Dict:
# """
# Force flow wait for the end of data processing
# """
# if "task_id" in task_id.keys():
# _id = task_id.get("task_id")

# Requisição do resultado da task_id
response = api.get(
path="status_processor_run/" + _id,
)
# # Requisição do resultado da task_id
# response = api.get(
# path="status_processor_run/" + _id,
# )

print(f"Response state: {response['state']}")
while response["state"] == "STARTED":
sleep(5)
response = wait_task_run(api, task_id)
# print(f"Response state: {response['state']}")
# while response["state"] == "STARTED":
# sleep(5)
# response = wait_task_run(api, task_id)

if response["state"] != "SUCCESS":
print("Error processing this dataset. Stop flow or restart this task")
# if response["state"] != "SUCCESS":
# print("Error processing this dataset. Stop flow or restart this task")

return response
# return response


###############
