From 10b12ca1ffc4c7309c358aff13ee1928d53ea0bd Mon Sep 17 00:00:00 2001 From: Robin Gloster Date: Sun, 24 Sep 2023 15:47:40 +0200 Subject: [PATCH] ecfmp: task and API endpoint part of #17 --- atciss/app/controllers/ecfmp.py | 32 +++++++++ atciss/app/router.py | 15 ++++- atciss/app/tasks/ecfmp.py | 47 +++++++++++++ atciss/app/views/ecfmp.py | 114 ++++++++++++++++++++++++++++++++ atciss/celery.py | 8 +++ 5 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 atciss/app/controllers/ecfmp.py create mode 100644 atciss/app/tasks/ecfmp.py create mode 100644 atciss/app/views/ecfmp.py diff --git a/atciss/app/controllers/ecfmp.py b/atciss/app/controllers/ecfmp.py new file mode 100644 index 00000000..64c01ac6 --- /dev/null +++ b/atciss/app/controllers/ecfmp.py @@ -0,0 +1,32 @@ +"""Application controllers - ECFMP.""" +from typing import Annotated, Optional, cast +from fastapi import APIRouter, Depends, HTTPException +from pydantic import TypeAdapter + +from atciss.app.views.ecfmp import FlowMeasure + +from ..controllers.auth import get_user +from ..models import User + +from ..utils.redis import RedisClient + +router = APIRouter() + + +@router.get( + "/ecfmp/{fir}", + responses={404: {}}, +) +async def get_flow_measures( + fir: str, + user: Annotated[User, Depends(get_user)], +) -> list[FlowMeasure]: + """Get ECFMP flow measures for a FIR.""" + async with RedisClient.open() as redis_client: + flow_measures = cast( + Optional[str], await redis_client.get(f"ecfmp:flow_measures:{fir}") + ) + if flow_measures is None: + raise HTTPException(status_code=404) + + return TypeAdapter(list[FlowMeasure]).validate_json(flow_measures) diff --git a/atciss/app/router.py b/atciss/app/router.py index 58bb06cf..19676f40 100644 --- a/atciss/app/router.py +++ b/atciss/app/router.py @@ -1,7 +1,19 @@ """Application configuration - root APIRouter.""" from fastapi import APIRouter -from .controllers import ready, metar, notam, atis, ad, auth, airspace, vatsim, loa, taf +from .controllers import ( + ready, + metar, + notam, + atis, + ad, + auth, + airspace, + vatsim, + loa, + taf, + ecfmp, +) root_api_router = APIRouter(prefix="/api") @@ -16,3 +28,4 @@ root_api_router.include_router(auth.router, tags=["user"]) root_api_router.include_router(airspace.router, tags=["airspace"]) root_api_router.include_router(loa.router, tags=["airspace"]) +root_api_router.include_router(ecfmp.router, tags=["airspace"]) diff --git a/atciss/app/tasks/ecfmp.py b/atciss/app/tasks/ecfmp.py new file mode 100644 index 00000000..8796dff5 --- /dev/null +++ b/atciss/app/tasks/ecfmp.py @@ -0,0 +1,47 @@ +from collections import defaultdict +from loguru import logger +from pydantic import TypeAdapter + +from atciss.app.utils.redis import RedisClient + +from ..views.ecfmp import ECFMP, Event, FlowMeasure + +from ..utils import AiohttpClient, ClientConnectorError + + +async def fetch_ecfmp() -> None: + """Periodically fetch loa data.""" + redis_client = await RedisClient.get() + + async with AiohttpClient.get() as aiohttp_client: + try: + res = await aiohttp_client.get( + "https://ecfmp.vatsim.net/api/v1/plugin", + ) + except ClientConnectorError as e: + logger.exception(f"Could not connect {e!s}") + return + + ecfmp = ECFMP.model_validate(await res.json()) + + logger.info(f"ECFMP: {len(ecfmp.flow_measures)} flow measures received") + logger.info(f"ECFMP: {len(ecfmp.events)} events received") + + async with redis_client.pipeline() as pipe: + flow_measures_by_fir = defaultdict(list) + for flow_measure in ecfmp.flow_measures: + for fir in flow_measure.notified_firs: + flow_measures_by_fir[fir].append(flow_measure) + events_by_fir = defaultdict(list) + for event in ecfmp.events: + events_by_fir[event.fir].append(event) + + for fir, flow_measures in flow_measures_by_fir.items(): + pipe.set( + f"ecfmp:flow_measures:{fir}", + TypeAdapter(list[FlowMeasure]).dump_json(flow_measures), + ) + for fir, events in events_by_fir.items(): + pipe.set(f"ecfmp:events:{fir}", TypeAdapter(list[Event]).dump_json(events)) + + await pipe.execute() diff --git a/atciss/app/views/ecfmp.py b/atciss/app/views/ecfmp.py new file mode 100644 index 00000000..ad49c18b --- /dev/null +++ b/atciss/app/views/ecfmp.py @@ -0,0 +1,114 @@ +from dataclasses import dataclass +from typing import Any, Literal, Optional + +from pydantic import AwareDatetime, BaseModel, model_validator + + +class Event(BaseModel): + id: int + name: str + date_start: AwareDatetime + date_end: AwareDatetime + fir: str + + +@dataclass +class FIR: + identifier: str + name: str + + +@dataclass +class FilterEvent: + event_id: int + event_vatcan: Optional[str] + + +@dataclass +class Filter: + type: Literal[ + "ADEP", + "ADES", + "level_above", + "level_below", + "level", + "member_event", + "member_non_event", + "waypoint", + ] + value: list[str | int | FilterEvent] | int + + +@dataclass +class Measure: + type: Literal[ + "prohibit", + "minimum_departure_interval", + "average_departure_interval", + "per_hour", + "miles_in_trail", + "max_ias", + "max_mach", + "ias_reduction", + "mach_reduction", + "ground_stop", + "mandatory_route", + ] + value: Optional[int | list[str]] + + +class FlowMeasure(BaseModel): + ident: str + event_id: Optional[int] + reason: str + starttime: AwareDatetime + endtime: AwareDatetime + measure: Measure + filters: list[Filter] + notified_firs: list[str] + withdrawn_at: Optional[AwareDatetime] + + +class ECFMP(BaseModel): + events: list[Event] + flight_information_regions: list[FIR] + flow_measures: list[FlowMeasure] + + @model_validator(mode="before") + @classmethod + def replace_fir_id(cls, input: Any) -> Any: + flow_measures = [ + fm + | { + "notified_firs": fm.get("notified_firs") + or [ + next( + ( + fir["identifier"] + for fir in input["flight_information_regions"] + if fir["id"] == fir_id + ), + "ZZZZ", + ) + for fir_id in fm["notified_flight_information_regions"] + ] + } + for fm in input["flow_measures"] + ] + events = [ + e + | { + "fir": e.get("fir") + or next( + ( + fir["identifier"] + for fir in input["flight_information_regions"] + if fir["id"] == e["flight_information_region_id"] + ), + "ZZZZ", + ) + } + for e in input["events"] + ] + + return input | {"flow_measures": flow_measures, "events": events} diff --git a/atciss/celery.py b/atciss/celery.py index 07195932..8661ec62 100644 --- a/atciss/celery.py +++ b/atciss/celery.py @@ -33,6 +33,7 @@ def setup_task_logging_hook(loglevel: str, **_: dict[Any, Any]) -> None: "update_sectors": {"task": "update_sectors", "schedule": crontab(minute="*/60")}, "update_vatsim": {"task": "update_vatsim", "schedule": crontab(minute="*")}, "update_taf_metar": {"task": "update_taf_metar", "schedule": crontab(minute="*")}, + "update_ecfmp": {"task": "update_ecfmp", "schedule": crontab(minute="*")}, "update_dfs_ad_data": { "task": "update_dfs_ad_data", "schedule": crontab(day_of_week="1"), @@ -80,3 +81,10 @@ def update_dfs_ad_data(): from atciss.app.tasks.dfs_ad import fetch_dfs_ad_data async_to_sync(fetch_dfs_ad_data)() + + +@app.task(name="update_ecfmp") +def update_ecfmp(): + from atciss.app.tasks.ecfmp import fetch_ecfmp + + async_to_sync(fetch_ecfmp)()