Skip to content

Commit

Permalink
Merge pull request #139 from AllenNeuralDynamics/release-v1.2.0
Browse files Browse the repository at this point in the history
Release v1.2.0
  • Loading branch information
helen-m-lin authored Aug 30, 2024
2 parents ccaa825 + e691c89 commit 1c41842
Show file tree
Hide file tree
Showing 10 changed files with 1,810 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/aind_data_transfer_service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Init package"""
import os

__version__ = "1.1.0"
__version__ = "1.2.0"

# Global constants
OPEN_DATA_BUCKET_NAME = os.getenv("OPEN_DATA_BUCKET_NAME", "open")
Expand Down
150 changes: 144 additions & 6 deletions src/aind_data_transfer_service/models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Module for data models used in application"""

from datetime import datetime
from typing import List, Optional
import ast
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Union

from pydantic import AwareDatetime, BaseModel, Field
from pydantic import AwareDatetime, BaseModel, Field, field_validator
from starlette.datastructures import QueryParams


class AirflowDagRun(BaseModel):
Expand Down Expand Up @@ -37,12 +39,113 @@ class AirflowDagRunsRequestParameters(BaseModel):

limit: int = 25
offset: int = 0
order_by: str = "-start_date"
state: Optional[list[str]] = []
execution_date_gte: Optional[str] = (
datetime.now(timezone.utc) - timedelta(weeks=2)
).isoformat()
execution_date_lte: Optional[str] = None
order_by: str = "-execution_date"

@field_validator("execution_date_gte", mode="after")
def validate_min_execution_date(cls, execution_date_gte: str):
"""Validate the earliest submit date filter is within 2 weeks"""
min_execution_date = datetime.now(timezone.utc) - timedelta(weeks=2)
# datetime.fromisoformat does not support Z in python < 3.11
date_to_check = execution_date_gte.replace("Z", "+00:00")
if datetime.fromisoformat(date_to_check) < min_execution_date:
raise ValueError(
"execution_date_gte must be within the last 2 weeks"
)
return execution_date_gte

@classmethod
def from_query_params(cls, query_params: dict):
def from_query_params(cls, query_params: QueryParams):
"""Maps the query parameters to the model"""
return cls(**query_params)
params = dict(query_params)
if "state" in params:
params["state"] = ast.literal_eval(params["state"])
return cls.model_validate(params)


class AirflowDagRunRequestParameters(BaseModel):
    """Model for parameters when requesting info from dag_run endpoint."""

    # Identifier of the dag run to fetch; must be a non-empty string.
    dag_run_id: str = Field(..., min_length=1)

    @classmethod
    def from_query_params(cls, query_params: QueryParams):
        """Maps the query parameters to the model."""
        # QueryParams is a multi-dict; flatten it before validation.
        return cls.model_validate(dict(query_params))


class AirflowTaskInstancesRequestParameters(BaseModel):
    """Model for parameters when requesting info from task_instances
    endpoint."""

    # Dag run whose task instances should be listed; non-empty required.
    dag_run_id: str = Field(..., min_length=1)

    @classmethod
    def from_query_params(cls, query_params: QueryParams):
        """Maps the query parameters to the model."""
        # Collapse the multi-dict query string into a plain dict first.
        return cls.model_validate(dict(query_params))


class AirflowTaskInstance(BaseModel):
    """Data model for task_instance entry when requesting info from airflow"""

    # Every field is Optional: Airflow may return null for any of them
    # depending on the task's lifecycle stage.
    dag_id: Optional[str]
    dag_run_id: Optional[str]
    # Duration in seconds; Airflow may report it as int or float.
    duration: Optional[Union[int, float]]
    # AwareDatetime fields require a timezone offset in the payload.
    end_date: Optional[AwareDatetime]
    execution_date: Optional[AwareDatetime]
    executor_config: Optional[str]
    hostname: Optional[str]
    # Index within a mapped (dynamic) task; -1 for unmapped tasks in
    # Airflow's API — TODO confirm against the Airflow version in use.
    map_index: Optional[int]
    max_tries: Optional[int]
    # Free-text note attached to the task instance in the Airflow UI.
    note: Optional[str]
    operator: Optional[str]
    pid: Optional[int]
    pool: Optional[str]
    pool_slots: Optional[int]
    priority_weight: Optional[int]
    queue: Optional[str]
    queued_when: Optional[AwareDatetime]
    rendered_fields: Optional[dict]
    sla_miss: Optional[dict]
    start_date: Optional[AwareDatetime]
    state: Optional[str]
    task_id: Optional[str]
    trigger: Optional[dict]
    triggerer_job: Optional[dict]
    try_number: Optional[int]
    unixname: Optional[str]


class AirflowTaskInstancesResponse(BaseModel):
    """Data model for response when requesting info from task_instances
    endpoint"""

    # Page of task instances returned by Airflow.
    task_instances: List[AirflowTaskInstance]
    # Total number of matching entries across all pages.
    total_entries: int


class AirflowTaskInstanceLogsRequestParameters(BaseModel):
    """Model for parameters when requesting info from task_instance_logs
    endpoint."""

    # excluded fields are used to build the url
    dag_run_id: str = Field(..., min_length=1, exclude=True)
    task_id: str = Field(..., min_length=1, exclude=True)
    try_number: int = Field(..., ge=0, exclude=True)
    # Included in the serialized query: ask Airflow for the full log body.
    full_content: bool = True

    @classmethod
    def from_query_params(cls, query_params: QueryParams):
        """Maps the query parameters to the model."""
        # Flatten the query string into a dict and validate in one step.
        return cls.model_validate(dict(query_params))


class JobStatus(BaseModel):
Expand Down Expand Up @@ -74,3 +177,38 @@ def from_airflow_dag_run(cls, airflow_dag_run: AirflowDagRun):
def jinja_dict(self):
    """Map model to a dictionary that jinja can render"""
    # Strip None-valued fields so templates only see populated data.
    rendered = self.model_dump(exclude_none=True)
    return rendered


class JobTasks(BaseModel):
    """Model for what is rendered to the user for each task."""

    # All fields default to None so partially-populated Airflow task
    # instances can still be rendered.
    job_id: Optional[str] = Field(None)
    task_id: Optional[str] = Field(None)
    try_number: Optional[int] = Field(None)
    task_state: Optional[str] = Field(None)
    priority_weight: Optional[int] = Field(None)
    map_index: Optional[int] = Field(None)
    submit_time: Optional[datetime] = Field(None)
    start_time: Optional[datetime] = Field(None)
    end_time: Optional[datetime] = Field(None)
    duration: Optional[Union[int, float]] = Field(None)
    comment: Optional[str] = Field(None)

    @classmethod
    def from_airflow_task_instance(
        cls, airflow_task_instance: AirflowTaskInstance
    ):
        """Maps the fields from an AirflowTaskInstance to this model.

        (Docstring fixed: it previously said "HpcJobStatusResponse",
        a copy-paste error — the argument is an AirflowTaskInstance.)
        """
        return cls(
            # The dag_run_id doubles as the user-facing job identifier.
            job_id=airflow_task_instance.dag_run_id,
            task_id=airflow_task_instance.task_id,
            try_number=airflow_task_instance.try_number,
            task_state=airflow_task_instance.state,
            priority_weight=airflow_task_instance.priority_weight,
            map_index=airflow_task_instance.map_index,
            # Airflow's execution_date is rendered as the submit time.
            submit_time=airflow_task_instance.execution_date,
            start_time=airflow_task_instance.start_date,
            end_time=airflow_task_instance.end_date,
            duration=airflow_task_instance.duration,
            comment=airflow_task_instance.note,
        )
Loading

0 comments on commit 1c41842

Please sign in to comment.