From 632902f184d29743b252cc4a9e78ee43c9273cdb Mon Sep 17 00:00:00 2001 From: Helen Lin Date: Wed, 21 Aug 2024 17:30:03 -0700 Subject: [PATCH 1/8] feat: /api/v1/get_tasks_list for tasks instances from airflow --- src/aind_data_transfer_service/models.py | 77 +++++++++++++++++++++++- src/aind_data_transfer_service/server.py | 66 ++++++++++++++++++++ 2 files changed, 142 insertions(+), 1 deletion(-) diff --git a/src/aind_data_transfer_service/models.py b/src/aind_data_transfer_service/models.py index 36d4a2d..6316a2a 100644 --- a/src/aind_data_transfer_service/models.py +++ b/src/aind_data_transfer_service/models.py @@ -2,7 +2,7 @@ import ast from datetime import datetime, timedelta, timezone -from typing import List, Optional +from typing import List, Optional, Union from pydantic import AwareDatetime, BaseModel, Field, field_validator @@ -66,6 +66,52 @@ def from_query_params(cls, query_params: dict): return cls(**params) +class AirflowTaskInstancesRequestParameters(BaseModel): + """Model for parameters when requesting info from task_instances endpoint""" + + dag_run_id: str = Field(..., min_length=1) + + @classmethod + def from_query_params(cls, query_params: dict): + """Maps the query parameters to the model""" + return cls(**query_params) + +class AirflowTaskInstance(BaseModel): + """Data model for task_instance entry when requesting info from airflow""" + + dag_id: Optional[str] + dag_run_id: Optional[str] + duration: Optional[Union[int, float]] + end_date: Optional[AwareDatetime] + execution_date: Optional[AwareDatetime] + executor_config: Optional[str] + hostname: Optional[str] + map_index: Optional[int] + max_tries: Optional[int] + note: Optional[str] + operator: Optional[str] + pid: Optional[int] + pool: Optional[str] + pool_slots: Optional[int] + priority_weight: Optional[int] + queue: Optional[str] + queued_when: Optional[AwareDatetime] + rendered_fields: Optional[dict] + sla_miss: Optional[dict] + start_date: Optional[AwareDatetime] + state: Optional[str] + task_id: Optional[str] + trigger: Optional[dict] + triggerer_job: Optional[dict] + try_number: Optional[int] + unixname: Optional[str] + +class AirflowTaskInstancesResponse(BaseModel): + """Data model for response when requesting info from task_instances endpoint""" + + task_instances: List[AirflowTaskInstance] + total_entries: int + class JobStatus(BaseModel): """Model for what we want to render to the user.""" @@ -95,3 +141,32 @@ def from_airflow_dag_run(cls, airflow_dag_run: AirflowDagRun): def jinja_dict(self): """Map model to a dictionary that jinja can render""" return self.model_dump(exclude_none=True) + + +class JobTasks(BaseModel): + """Model for what is rendered to the user for each task.""" + + job_id: Optional[str] = Field(None) + task_id: Optional[str] = Field(None) + try_number: Optional[int] = Field(None) + task_state: Optional[str] = Field(None) + priority_weight: Optional[int] = Field(None) + map_index: Optional[int] = Field(None) + start_time: Optional[datetime] = Field(None) + end_time: Optional[datetime] = Field(None) + duration: Optional[Union[int, float]] = Field(None) + + @classmethod + def from_airflow_task_instance(cls, airflow_task_instance: AirflowTaskInstance): + """Maps the fields from the HpcJobStatusResponse to this model""" + return cls( + job_id=airflow_task_instance.dag_run_id, + task_id=airflow_task_instance.task_id, + try_number=airflow_task_instance.try_number, + task_state=airflow_task_instance.state, + priority_weight=airflow_task_instance.priority_weight, + map_index=airflow_task_instance.map_index, + start_time=airflow_task_instance.start_date, + end_time=airflow_task_instance.end_date, + duration=airflow_task_instance.duration, + ) diff --git a/src/aind_data_transfer_service/server.py b/src/aind_data_transfer_service/server.py index 19398b2..f0e3679 100644 --- a/src/aind_data_transfer_service/server.py +++ b/src/aind_data_transfer_service/server.py @@ -33,7 +33,10 @@ AirflowDagRun, AirflowDagRunsRequestParameters, AirflowDagRunsResponse, + AirflowTaskInstancesRequestParameters, + AirflowTaskInstancesResponse, JobStatus, + JobTasks, ) template_directory = os.path.abspath( @@ -461,6 +464,64 @@ async def get_job_status_list(request: Request): }, ) +async def get_tasks_list(request: Request): + """Get list of tasks instances given a job id.""" + try: + url = os.getenv("AIND_AIRFLOW_SERVICE_JOBS_URL", "").strip("/") + params = AirflowTaskInstancesRequestParameters.from_query_params( + request.query_params + ) + params_dict = json.loads(params.model_dump_json()) + response_tasks = requests.get( + url=f"{url}/{params.dag_run_id}/taskInstances", + auth=( + os.getenv("AIND_AIRFLOW_SERVICE_USER"), + os.getenv("AIND_AIRFLOW_SERVICE_PASSWORD"), + ), + ) + status_code = response_tasks.status_code + if response_tasks.status_code == 200: + task_instances = AirflowTaskInstancesResponse.model_validate_json( + json.dumps(response_tasks.json()) + ) + job_tasks_list = sorted( + [ + JobTasks.from_airflow_task_instance(d) for d in task_instances.task_instances + ], + key=lambda x: (-x.priority_weight, x.map_index), + ) + message = "Retrieved job tasks list from airflow" + data = { + "params": params_dict, + "total_entries": task_instances.total_entries, + "job_tasks_list": [ + json.loads(j.model_dump_json()) for j in job_tasks_list + ] + } + else: + message = "Error retrieving job tasks list from airflow" + data = { + "params": params_dict, + "errors": [response_tasks.json()] + } + except ValidationError as e: + logging.error(e) + status_code = 406 + message = "Error validating request parameters" + data = {"errors": json.loads(e.json())} + except Exception as e: + logging.error(e) + status_code = 500 + message = "Unable to retrieve job tasks list from airflow" + data = {"errors": [f"{e.__class__.__name__}{e.args}"]} + return JSONResponse( + status_code=status_code, + content={ + "message": message, + "data": data, + }, + ) + async def index(request: Request): """GET|POST /: form handler""" @@ -571,6 +632,11 @@ async def download_job_template(_: Request): endpoint=get_job_status_list, methods=["GET"], ), + Route( + "/api/v1/get_tasks_list", + endpoint=get_tasks_list, + methods=["GET"], + ), Route("/jobs", endpoint=jobs, methods=["GET"]), Route("/job_status_table", endpoint=job_status_table, methods=["GET"]), Route( From 5375bad683b38baa1bfcd24c8dd5e3ec6c3b87e3 Mon Sep 17 00:00:00 2001 From: Helen Lin Date: Wed, 21 Aug 2024 17:30:45 -0700 Subject: [PATCH 2/8] feat: job tasks table UI --- src/aind_data_transfer_service/server.py | 21 ++++++ .../templates/job_status_table.html | 2 + .../templates/job_tasks_table.html | 75 +++++++++++++++++++ 3 files changed, 98 insertions(+) create mode 100644 src/aind_data_transfer_service/templates/job_tasks_table.html diff --git a/src/aind_data_transfer_service/server.py b/src/aind_data_transfer_service/server.py index f0e3679..354ac61 100644 --- a/src/aind_data_transfer_service/server.py +++ b/src/aind_data_transfer_service/server.py @@ -560,6 +560,26 @@ async def job_status_table(request: Request): ), ) +async def job_tasks_table(request: Request): + """Get Job Tasks table given a job id""" + response_tasks = await get_tasks_list(request) + response_tasks_json = json.loads(response_tasks.body) + data = response_tasks_json.get("data") + params = data.get("params") + return templates.TemplateResponse( + name="job_tasks_table.html", + context=( + { + "request": request, + "status_code": response_tasks.status_code, + "message": response_tasks_json.get("message"), + "errors": data.get("errors", []), + "dag_run_id": params.get("dag_run_id") if params else None, + "total_entries": data.get("total_entries", 0), + "job_tasks_list": data.get("job_tasks_list", []), + } + ), + ) async def jobs(request: Request): """Get Job Status page with pagination""" @@ -639,6 +659,7 @@ async def download_job_template(_: Request): ), Route("/jobs", endpoint=jobs, methods=["GET"]), Route("/job_status_table", endpoint=job_status_table, methods=["GET"]), + Route("/job_tasks_table", endpoint=job_tasks_table, methods=["GET"]), Route( "/api/job_upload_template", endpoint=download_job_template, diff --git a/src/aind_data_transfer_service/templates/job_status_table.html b/src/aind_data_transfer_service/templates/job_status_table.html index b283c7f..42fb541 100644 --- a/src/aind_data_transfer_service/templates/job_status_table.html +++ b/src/aind_data_transfer_service/templates/job_status_table.html @@ -19,6 +19,7 @@ Start Time End time Comment + Tasks {% for job_status in job_status_list %} @@ -33,6 +34,7 @@ {{job_status.start_time}} {{job_status.end_time}} {{job_status.comment}} + View tasks {% endfor %} diff --git a/src/aind_data_transfer_service/templates/job_tasks_table.html b/src/aind_data_transfer_service/templates/job_tasks_table.html new file mode 100644 index 0000000..c18e0eb --- /dev/null +++ b/src/aind_data_transfer_service/templates/job_tasks_table.html @@ -0,0 +1,75 @@ + + + + + + + + + + + +

Task Instances

+

DAG Run ID: {{dag_run_id}}

+ + + + + + + + + + + + + {% for job_task in job_tasks_list %} + + + + + + + + + + + {% endfor %} +
Task IDTry NumberPriority WeightMap IndexStateStart TimeEnd TimeDuration
{{job_task.task_id}}{{job_task.try_number}}{{job_task.priority_weight}}{{job_task.map_index}}{{job_task.task_state}}{{job_task.start_time}}{{job_task.end_time}}{{job_task.duration}}
+ + {% if status_code != 200 %} + + {% endif %} + + + + \ No newline at end of file From 024dba1207ed084282c031f1a503c483c54c5957 Mon Sep 17 00:00:00 2001 From: Helen Lin Date: Thu, 22 Aug 2024 16:54:30 -0700 Subject: [PATCH 3/8] feat: job tasks modal popup --- src/aind_data_transfer_service/models.py | 4 + .../templates/job_status_table.html | 74 ++++++++++++++++++- .../templates/job_tasks_table.html | 17 ++--- 3 files changed, 81 insertions(+), 14 deletions(-) diff --git a/src/aind_data_transfer_service/models.py b/src/aind_data_transfer_service/models.py index 6316a2a..8e2fefb 100644 --- a/src/aind_data_transfer_service/models.py +++ b/src/aind_data_transfer_service/models.py @@ -152,9 +152,11 @@ class JobTasks(BaseModel): task_state: Optional[str] = Field(None) priority_weight: Optional[int] = Field(None) map_index: Optional[int] = Field(None) + submit_time: Optional[datetime] = Field(None) start_time: Optional[datetime] = Field(None) end_time: Optional[datetime] = Field(None) duration: Optional[Union[int, float]] = Field(None) + comment: Optional[str] = Field(None) @classmethod def from_airflow_task_instance(cls, airflow_task_instance: AirflowTaskInstance): @@ -166,7 +168,9 @@ def from_airflow_task_instance(cls, airflow_task_instance: AirflowTaskInstance): task_state=airflow_task_instance.state, priority_weight=airflow_task_instance.priority_weight, map_index=airflow_task_instance.map_index, + submit_time=airflow_task_instance.execution_date, start_time=airflow_task_instance.start_date, end_time=airflow_task_instance.end_date, duration=airflow_task_instance.duration, + comment=airflow_task_instance.note, ) diff --git a/src/aind_data_transfer_service/templates/job_status_table.html b/src/aind_data_transfer_service/templates/job_status_table.html index 42fb541..d7dcd06 100644 --- a/src/aind_data_transfer_service/templates/job_status_table.html +++ b/src/aind_data_transfer_service/templates/job_status_table.html @@ -4,8 +4,14 @@ + + @@ -17,7 +23,7 @@ Status Submit Time Start Time - End time + End Time Comment Tasks @@ -34,10 +40,40 @@ {{job_status.start_time}} {{job_status.end_time}} {{job_status.comment}} - View tasks + + + {% endfor %} + + {% if status_code != 200 %}