From 8d6d37e9d1afa9560cd8ba2e96e3c3130f86030a Mon Sep 17 00:00:00 2001 From: Helen Lin <46795546+helen-m-lin@users.noreply.github.com> Date: Thu, 22 Aug 2024 16:40:45 -0700 Subject: [PATCH 1/4] feat: add filters in jobs status page (#134) --- src/aind_data_transfer_service/models.py | 29 +++- src/aind_data_transfer_service/server.py | 48 +++++-- .../templates/job_status.html | 136 ++++++++++++++++-- .../templates/job_status_table.html | 9 +- tests/resources/airflow_dag_run_response.json | 51 +++++++ tests/test_server.py | 120 +++++++++++++--- 6 files changed, 348 insertions(+), 45 deletions(-) create mode 100644 tests/resources/airflow_dag_run_response.json diff --git a/src/aind_data_transfer_service/models.py b/src/aind_data_transfer_service/models.py index 5e3a656..36d4a2d 100644 --- a/src/aind_data_transfer_service/models.py +++ b/src/aind_data_transfer_service/models.py @@ -1,9 +1,10 @@ """Module for data models used in application""" -from datetime import datetime +import ast +from datetime import datetime, timedelta, timezone from typing import List, Optional -from pydantic import AwareDatetime, BaseModel, Field +from pydantic import AwareDatetime, BaseModel, Field, field_validator class AirflowDagRun(BaseModel): @@ -37,12 +38,32 @@ class AirflowDagRunsRequestParameters(BaseModel): limit: int = 25 offset: int = 0 - order_by: str = "-start_date" + state: Optional[list[str]] = [] + execution_date_gte: Optional[str] = ( + datetime.now(timezone.utc) - timedelta(weeks=2) + ).isoformat() + execution_date_lte: Optional[str] = None + order_by: str = "-execution_date" + + @field_validator("execution_date_gte", mode="after") + def validate_min_execution_date(cls, execution_date_gte: str): + """Validate the earliest submit date filter is within 2 weeks""" + min_execution_date = datetime.now(timezone.utc) - timedelta(weeks=2) + # datetime.fromisoformat does not support Z in python < 3.11 + date_to_check = execution_date_gte.replace("Z", "+00:00") + if datetime.fromisoformat(date_to_check) < min_execution_date: + raise ValueError( + "execution_date_gte must be within the last 2 weeks" + ) + return execution_date_gte @classmethod def from_query_params(cls, query_params: dict): """Maps the query parameters to the model""" - return cls(**query_params) + params = dict(query_params) + if "state" in params: + params["state"] = ast.literal_eval(params["state"]) + return cls(**params) class JobStatus(BaseModel): diff --git a/src/aind_data_transfer_service/server.py b/src/aind_data_transfer_service/server.py index ff8f815..19398b2 100644 --- a/src/aind_data_transfer_service/server.py +++ b/src/aind_data_transfer_service/server.py @@ -30,6 +30,7 @@ from aind_data_transfer_service.hpc.client import HpcClient, HpcClientConfigs from aind_data_transfer_service.hpc.models import HpcJobSubmitSettings from aind_data_transfer_service.models import ( + AirflowDagRun, AirflowDagRunsRequestParameters, AirflowDagRunsResponse, JobStatus, @@ -390,29 +391,45 @@ async def get_job_status_list(request: Request): """Get status of jobs with default pagination of limit=25 and offset=0.""" # TODO: Use httpx async client try: - params = AirflowDagRunsRequestParameters.from_query_params( - request.query_params - ) - params_dict = json.loads(params.model_dump_json()) + url = os.getenv("AIND_AIRFLOW_SERVICE_JOBS_URL", "").strip("/") + get_one_job = request.query_params.get("dag_run_id") is not None + if get_one_job: + dag_run_id = request.query_params["dag_run_id"] + else: + params = 
AirflowDagRunsRequestParameters.from_query_params( + request.query_params + ) + params_dict = json.loads(params.model_dump_json()) + # Send request to Airflow to ListDagRuns or GetDagRun response_jobs = requests.get( - url=os.getenv("AIND_AIRFLOW_SERVICE_JOBS_URL"), + url=f"{url}/{dag_run_id}" if get_one_job else url, auth=( os.getenv("AIND_AIRFLOW_SERVICE_USER"), os.getenv("AIND_AIRFLOW_SERVICE_PASSWORD"), ), - params=params_dict, + params=None if get_one_job else params_dict, ) status_code = response_jobs.status_code if response_jobs.status_code == 200: - dag_runs = AirflowDagRunsResponse.model_validate_json( - json.dumps(response_jobs.json()) - ) + if get_one_job: + dag_run = AirflowDagRun.model_validate_json( + json.dumps(response_jobs.json()) + ) + dag_runs = AirflowDagRunsResponse( + dag_runs=[dag_run], total_entries=1 + ) + else: + dag_runs = AirflowDagRunsResponse.model_validate_json( + json.dumps(response_jobs.json()) + ) job_status_list = [ JobStatus.from_airflow_dag_run(d) for d in dag_runs.dag_runs ] message = "Retrieved job status list from airflow" data = { - "params": params_dict, + "params": ( + {"dag_run_id": dag_run_id} if get_one_job else params_dict + ), "total_entries": dag_runs.total_entries, "job_status_list": [ json.loads(j.model_dump_json()) for j in job_status_list @@ -420,7 +437,12 @@ async def get_job_status_list(request: Request): } else: message = "Error retrieving job status list from airflow" - data = {"params": params_dict, "errors": [response_jobs.json()]} + data = { + "params": ( + {"dag_run_id": dag_run_id} if get_one_job else params_dict + ), + "errors": [response_jobs.json()], + } except ValidationError as e: logging.error(e) status_code = 406 @@ -486,6 +508,9 @@ async def jobs(request: Request): default_offset = AirflowDagRunsRequestParameters.model_fields[ "offset" ].default + default_state = AirflowDagRunsRequestParameters.model_fields[ + "state" + ].default return templates.TemplateResponse( name="job_status.html", context=( @@ -493,6 +518,7 @@ async def jobs(request: Request): "request": request, "default_limit": default_limit, "default_offset": default_offset, + "default_state": default_state, "project_names_url": os.getenv( "AIND_METADATA_SERVICE_PROJECT_NAMES_URL" ), diff --git a/src/aind_data_transfer_service/templates/job_status.html b/src/aind_data_transfer_service/templates/job_status.html index 75890f7..1fa71e0 100644 --- a/src/aind_data_transfer_service/templates/job_status.html +++ b/src/aind_data_transfer_service/templates/job_status.html @@ -3,8 +3,12 @@ + + + + {% block title %} {% endblock %} AIND Data Transfer Service Jobs @@ -17,8 +23,9 @@ Status Submit Time Start Time - End time + End Time Comment + Tasks {% for job_status in job_status_list %} @@ -33,9 +40,40 @@ {{job_status.start_time}} {{job_status.end_time}} {{job_status.comment}} + + + {% endfor %} + + {% if status_code != 200 %}
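
Below is a minimal usage sketch (not part of the patch) showing how the new filter parameters introduced in models.py behave. It assumes only what the diff shows: the model is importable from aind_data_transfer_service.models, query parameters arrive as strings, "state" is parsed with ast.literal_eval (so it is sent as a string representation of a list), and execution_date_gte defaults to two weeks ago and is rejected by the field validator when older than that.

    # Illustrative sketch; assumes pydantic v2 as used elsewhere in the patch.
    from datetime import datetime, timedelta, timezone

    from aind_data_transfer_service.models import AirflowDagRunsRequestParameters

    # Query params arrive as strings; "state" is a stringified list because the
    # server parses it with ast.literal_eval before building the model.
    query_params = {
        "limit": "10",
        "offset": "0",
        "state": "['running', 'success']",
        "execution_date_gte": (
            datetime.now(timezone.utc) - timedelta(weeks=1)
        ).isoformat(),
    }
    params = AirflowDagRunsRequestParameters.from_query_params(query_params)
    print(params.model_dump())  # order_by defaults to "-execution_date"

    # Submit dates older than two weeks are rejected by the field validator.
    try:
        AirflowDagRunsRequestParameters(
            execution_date_gte=(
                datetime.now(timezone.utc) - timedelta(weeks=3)
            ).isoformat()
        )
    except Exception as err:
        print(err)  # "execution_date_gte must be within the last 2 weeks"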