Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[uss_qualifier] Improve dynamic time specification #346

Merged
merged 4 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, Dict

from implicitdict import ImplicitDict

Expand All @@ -15,7 +15,7 @@
Volume4DTemplateCollection,
Volume4DCollection,
)
from monitoring.monitorlib.temporal import Time
from monitoring.monitorlib.temporal import Time, TimeDuringTest
from uas_standards.interuss.automated_testing.scd.v1 import api as scd_api


Expand All @@ -31,11 +31,9 @@ class BasicFlightPlanInformationTemplate(ImplicitDict):
area: Volume4DTemplateCollection
"""User intends to or may fly anywhere in this entire area."""

def resolve(self, start_of_test: Time) -> BasicFlightPlanInformation:
def resolve(self, times: Dict[TimeDuringTest, Time]) -> BasicFlightPlanInformation:
kwargs = {k: v for k, v in self.items()}
kwargs["area"] = Volume4DCollection(
[t.resolve(start_of_test) for t in self.area]
)
kwargs["area"] = Volume4DCollection([t.resolve(times) for t in self.area])
return ImplicitDict.parse(kwargs, BasicFlightPlanInformation)


Expand All @@ -53,15 +51,17 @@ class FlightInfoTemplate(ImplicitDict):
additional_information: Optional[dict]
"""Any information relevant to a particular jurisdiction or use case not described in the standard schema. The keys and values must be agreed upon between the test designers and USSs under test."""

def resolve(self, start_of_test: Time) -> FlightInfo:
def resolve(self, times: Dict[TimeDuringTest, Time]) -> FlightInfo:
kwargs = {k: v for k, v in self.items()}
kwargs["basic_information"] = self.basic_information.resolve(start_of_test)
kwargs["basic_information"] = self.basic_information.resolve(times)
return ImplicitDict.parse(kwargs, FlightInfo)

def to_scd_inject_request(self, start_of_test: Time) -> scd_api.InjectFlightRequest:
def to_scd_inject_request(
self, times: Dict[TimeDuringTest, Time]
) -> scd_api.InjectFlightRequest:
"""Render a legacy SCD injection API request object from this object."""

info = self.resolve(start_of_test)
info = self.resolve(times)
if "astm_f3548_21" not in info or not info.astm_f3548_21:
raise ValueError(
f"Legacy SCD injection API requires astm_f3548_21 operational intent priority to be specified in FlightInfo"
Expand Down
10 changes: 5 additions & 5 deletions monitoring/monitorlib/geotemporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import math
from datetime import datetime, timedelta
from typing import Optional, List, Tuple, Iterator
from typing import Optional, List, Tuple, Dict

import arrow
from implicitdict import ImplicitDict, StringBasedTimeDelta
Expand All @@ -13,7 +13,7 @@

from monitoring.monitorlib import geo
from monitoring.monitorlib.geo import LatLngPoint, Circle, Altitude, Volume3D, Polygon
from monitoring.monitorlib.temporal import TestTime, Time
from monitoring.monitorlib.temporal import TestTime, Time, TimeDuringTest


class Volume4DTemplate(ImplicitDict):
Expand All @@ -38,7 +38,7 @@ class Volume4DTemplate(ImplicitDict):
altitude_upper: Optional[Altitude] = None
"""The maximum altitude at which the virtual user will fly while using this volume for their flight."""

def resolve(self, start_of_test: Time) -> Volume4D:
def resolve(self, times: Dict[TimeDuringTest, Time]) -> Volume4D:
"""Resolve Volume4DTemplate into concrete Volume4D."""
# Make 3D volume
kwargs = {}
Expand All @@ -56,12 +56,12 @@ def resolve(self, start_of_test: Time) -> Volume4D:
kwargs = {"volume": volume}

if self.start_time is not None:
time_start = self.start_time.resolve(start_of_test)
time_start = self.start_time.resolve(times)
else:
time_start = None

if self.end_time is not None:
time_end = self.end_time.resolve(start_of_test)
time_end = self.end_time.resolve(times)
else:
time_end = None

Expand Down
33 changes: 24 additions & 9 deletions monitoring/monitorlib/temporal.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, List
from typing import Optional, List, Dict

import arrow
from implicitdict import ImplicitDict, StringBasedTimeDelta, StringBasedDateTime
Expand Down Expand Up @@ -62,6 +62,17 @@ class NextDay(ImplicitDict):
"""Acceptable days of the week. Omit to indicate that any day of the week is acceptable."""


class TimeDuringTest(str, Enum):
StartOfTestRun = "StartOfTestRun"
"""The time at which the test run started."""

StartOfScenario = "StartOfScenario"
"""The time at which the current scenario started."""

TimeOfEvaluation = "TimeOfEvaluation"
"""The time at which a TestTime was resolved to an absolute time; generally close to 'now'."""


class TestTime(ImplicitDict):
"""Exactly one of the time option fields of this object must be specified."""

Expand All @@ -71,8 +82,8 @@ class TestTime(ImplicitDict):
The value of absolute_time is limited given that the specific time a test will be started is unknown, and the jurisdictions usually impose a limit on how far in the future an operation can be planned.
"""

start_of_test: Optional[dict] = None
"""Time option field to, if specified, use the timestamp at which the current test run started."""
time_during_test: Optional[TimeDuringTest] = None
"""Time option field to, if specified, use a timestamp relating to the current test run."""

next_day: Optional[NextDay] = None
"""Time option field to use a timestamp equal to midnight beginning the next occurrence of any matching day following the specified reference timestamp."""
Expand All @@ -90,16 +101,20 @@ class TestTime(ImplicitDict):
* "-08:00" (ISO time zone)
* "US/Pacific" (IANA time zone)"""

def resolve(self: TestTime, start_of_test: Time) -> Time:
def resolve(self, times: Dict[TimeDuringTest, Time]) -> Time:
"""Resolve TestTime into specific Time."""
result = None
if self.absolute_time is not None:
result = self.absolute_time.datetime
elif self.start_of_test is not None:
result = start_of_test.datetime
elif self.time_during_test is not None:
if self.time_during_test not in times:
raise ValueError(
f"Specified {self.time_during_test} time during test was not provided when resolving TestTime"
)
result = times[self.time_during_test].datetime
elif self.next_day is not None:
t0 = (
arrow.get(self.next_day.starting_from.resolve(start_of_test).datetime)
arrow.get(self.next_day.starting_from.resolve(times).datetime)
.to(self.next_day.time_zone)
.datetime
)
Expand All @@ -115,11 +130,11 @@ def resolve(self: TestTime, start_of_test: Time) -> Time:
result = t
elif self.offset_from is not None:
result = (
self.offset_from.starting_from.resolve(start_of_test).datetime
self.offset_from.starting_from.resolve(times).datetime
+ self.offset_from.offset.timedelta
)
elif self.next_sun_position is not None:
t0 = self.next_sun_position.starting_from.resolve(start_of_test).datetime
t0 = self.next_sun_position.starting_from.resolve(times).datetime

dt = timedelta(minutes=5)
lat = self.next_sun_position.observed_from.lat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ che_conflicting_flights:
file:
path: file://./test_data/che/flight_intents/conflicting_flights.yaml
# Note that this hash_sha512 field can be safely deleted if the content changes
hash_sha512: 381b6a75e66f2f4ead4cc637744a3c6594d0cdcabf80e13acef087acf3d9195a8dfb521b1f997824b5c0f9dd83167263695b99bdb4fd3e604ac6ac513dad54ab
hash_sha512: ac109b89c95381ba4685d6db5f7064dac64875a8875e58989aad19d91d2e7b10e568d5091749b5ea2b0892191a84486971d999b63ac8c73fd4249131111a6c58

che_invalid_flight_intents:
$content_schema: monitoring/uss_qualifier/resources/definitions/ResourceDeclaration.json
Expand Down Expand Up @@ -211,14 +211,14 @@ example_feature_check_table:
units: M
reference: SFC
start_time:
start_of_test: { }
time_during_test: StartOfTestRun
end_time:
offset_from:
starting_from:
next_day:
time_zone: Europe/Zurich
starting_from:
start_of_test: { }
time_during_test: StartOfTestRun
days_of_the_week: [ "Mo", "Fr" ]
offset: 12h
expected_result: Block
Expand Down Expand Up @@ -252,7 +252,7 @@ example_feature_check_table:
starting_from:
offset_from:
starting_from:
start_of_test: { }
time_during_test: StartOfTestRun
offset: 12h
use_timezone: +01:00
duration: 5m
Expand Down Expand Up @@ -289,7 +289,7 @@ example_feature_check_table:
next_day:
time_zone: Europe/Zurich
starting_from:
start_of_test: { }
time_during_test: StartOfTestRun
offset: 12h
duration: 5m
expected_result: Advise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from monitoring.monitorlib.clients.flight_planning.flight_info_template import (
FlightInfoTemplate,
)
from monitoring.monitorlib.temporal import Time
from monitoring.monitorlib.temporal import Time, TimeDuringTest

from monitoring.uss_qualifier.resources.files import ExternalFile
from monitoring.uss_qualifier.resources.overrides import apply_overrides
Expand All @@ -27,9 +27,14 @@ class FlightIntent(ImplicitDict):

@staticmethod
def from_flight_info_template(info_template: FlightInfoTemplate) -> FlightIntent:
t = Time(arrow.utcnow().datetime)
request = info_template.to_scd_inject_request(t)
return FlightIntent(reference_time=StringBasedDateTime(t), request=request)
t_now = Time(arrow.utcnow().datetime)
times = {
TimeDuringTest.StartOfTestRun: t_now,
TimeDuringTest.StartOfScenario: t_now,
TimeDuringTest.TimeOfEvaluation: t_now,
} # Not strictly correct, but this class is deprecated
request = info_template.to_scd_inject_request(times)
return FlightIntent(reference_time=StringBasedDateTime(t_now), request=request)


FlightIntentID = str
Expand Down
Loading
Loading