Skip to content

Commit

Permalink
ecfmp: task and API endpoint
Browse files Browse the repository at this point in the history
part of #17
  • Loading branch information
globin committed Sep 24, 2023
1 parent 351588b commit 10b12ca
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 1 deletion.
32 changes: 32 additions & 0 deletions atciss/app/controllers/ecfmp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Application controllers - ECFMP."""
from typing import Annotated, Optional, cast
from fastapi import APIRouter, Depends, HTTPException
from pydantic import TypeAdapter

from atciss.app.views.ecfmp import FlowMeasure

from ..controllers.auth import get_user
from ..models import User

from ..utils.redis import RedisClient

router = APIRouter()


@router.get(
"/ecfmp/{fir}",
responses={404: {}},
)
async def get_flow_measures(
fir: str,
user: Annotated[User, Depends(get_user)],
) -> list[FlowMeasure]:
"""Get ECFMP flow measures for a FIR."""
async with RedisClient.open() as redis_client:
flow_measures = cast(
Optional[str], await redis_client.get(f"ecfmp:flow_measures:{fir}")
)
if flow_measures is None:
raise HTTPException(status_code=404)

return TypeAdapter(list[FlowMeasure]).validate_json(flow_measures)
15 changes: 14 additions & 1 deletion atciss/app/router.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
"""Application configuration - root APIRouter."""
from fastapi import APIRouter

from .controllers import ready, metar, notam, atis, ad, auth, airspace, vatsim, loa, taf
from .controllers import (
ready,
metar,
notam,
atis,
ad,
auth,
airspace,
vatsim,
loa,
taf,
ecfmp,
)


root_api_router = APIRouter(prefix="/api")
Expand All @@ -16,3 +28,4 @@
root_api_router.include_router(auth.router, tags=["user"])
root_api_router.include_router(airspace.router, tags=["airspace"])
root_api_router.include_router(loa.router, tags=["airspace"])
root_api_router.include_router(ecfmp.router, tags=["airspace"])
47 changes: 47 additions & 0 deletions atciss/app/tasks/ecfmp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from collections import defaultdict
from loguru import logger
from pydantic import TypeAdapter

from atciss.app.utils.redis import RedisClient

from ..views.ecfmp import ECFMP, Event, FlowMeasure

from ..utils import AiohttpClient, ClientConnectorError


async def fetch_ecfmp() -> None:
"""Periodically fetch loa data."""
redis_client = await RedisClient.get()

async with AiohttpClient.get() as aiohttp_client:
try:
res = await aiohttp_client.get(
"https://ecfmp.vatsim.net/api/v1/plugin",
)
except ClientConnectorError as e:
logger.exception(f"Could not connect {e!s}")
return

ecfmp = ECFMP.model_validate(await res.json())

logger.info(f"ECFMP: {len(ecfmp.flow_measures)} flow measures received")
logger.info(f"ECFMP: {len(ecfmp.events)} events received")

async with redis_client.pipeline() as pipe:
flow_measures_by_fir = defaultdict(list)
for flow_measure in ecfmp.flow_measures:
for fir in flow_measure.notified_firs:
flow_measures_by_fir[fir].append(flow_measure)
events_by_fir = defaultdict(list)
for event in ecfmp.events:
events_by_fir[event.fir].append(event)

for fir, flow_measures in flow_measures_by_fir.items():
pipe.set(
f"ecfmp:flow_measures:{fir}",
TypeAdapter(list[FlowMeasure]).dump_json(flow_measures),
)
for fir, events in events_by_fir.items():
pipe.set(f"ecfmp:events:{fir}", TypeAdapter(list[Event]).dump_json(events))

await pipe.execute()
114 changes: 114 additions & 0 deletions atciss/app/views/ecfmp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from dataclasses import dataclass
from typing import Any, Literal, Optional

from pydantic import AwareDatetime, BaseModel, model_validator


class Event(BaseModel):
id: int
name: str
date_start: AwareDatetime
date_end: AwareDatetime
fir: str


@dataclass
class FIR:
identifier: str
name: str


@dataclass
class FilterEvent:
event_id: int
event_vatcan: Optional[str]


@dataclass
class Filter:
type: Literal[
"ADEP",
"ADES",
"level_above",
"level_below",
"level",
"member_event",
"member_non_event",
"waypoint",
]
value: list[str | int | FilterEvent] | int


@dataclass
class Measure:
type: Literal[
"prohibit",
"minimum_departure_interval",
"average_departure_interval",
"per_hour",
"miles_in_trail",
"max_ias",
"max_mach",
"ias_reduction",
"mach_reduction",
"ground_stop",
"mandatory_route",
]
value: Optional[int | list[str]]


class FlowMeasure(BaseModel):
ident: str
event_id: Optional[int]
reason: str
starttime: AwareDatetime
endtime: AwareDatetime
measure: Measure
filters: list[Filter]
notified_firs: list[str]
withdrawn_at: Optional[AwareDatetime]


class ECFMP(BaseModel):
events: list[Event]
flight_information_regions: list[FIR]
flow_measures: list[FlowMeasure]

@model_validator(mode="before")
@classmethod
def replace_fir_id(cls, input: Any) -> Any:
flow_measures = [
fm
| {
"notified_firs": fm.get("notified_firs")
or [
next(
(
fir["identifier"]
for fir in input["flight_information_regions"]
if fir["id"] == fir_id
),
"ZZZZ",
)
for fir_id in fm["notified_flight_information_regions"]
]
}
for fm in input["flow_measures"]
]
events = [
e
| {
"fir": e.get("fir")
or next(
(
fir["identifier"]
for fir in input["flight_information_regions"]
if fir["id"] == e["flight_information_region_id"]
),
"ZZZZ",
)
}
for e in input["events"]
]

return input | {"flow_measures": flow_measures, "events": events}
8 changes: 8 additions & 0 deletions atciss/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def setup_task_logging_hook(loglevel: str, **_: dict[Any, Any]) -> None:
"update_sectors": {"task": "update_sectors", "schedule": crontab(minute="*/60")},
"update_vatsim": {"task": "update_vatsim", "schedule": crontab(minute="*")},
"update_taf_metar": {"task": "update_taf_metar", "schedule": crontab(minute="*")},
"update_ecfmp": {"task": "update_ecfmp", "schedule": crontab(minute="*")},
"update_dfs_ad_data": {
"task": "update_dfs_ad_data",
"schedule": crontab(day_of_week="1"),
Expand Down Expand Up @@ -80,3 +81,10 @@ def update_dfs_ad_data():
from atciss.app.tasks.dfs_ad import fetch_dfs_ad_data

async_to_sync(fetch_dfs_ad_data)()


@app.task(name="update_ecfmp")
def update_ecfmp():
from atciss.app.tasks.ecfmp import fetch_ecfmp

async_to_sync(fetch_ecfmp)()

0 comments on commit 10b12ca

Please sign in to comment.