Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[uss_qualifier] monitorlib: ISAChange exposes subscribers as returned by DSS #838

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions monitoring/monitorlib/mutate/rid.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import datetime
from typing import Dict, List, Optional, Union, Set

from implicitdict import ImplicitDict
import s2sphere
from uas_standards import Operation

from monitoring.monitorlib.fetch import QueryType
from monitoring.monitorlib.fetch.rid import RIDQuery, Subscription, ISA
from monitoring.monitorlib.rid import RIDVersion
from uas_standards.astm.f3411 import v19, v22a
import uas_standards.astm.f3411.v19.api
import uas_standards.astm.f3411.v19.constants
import uas_standards.astm.f3411.v22a.api
import uas_standards.astm.f3411.v22a.constants
import yaml
from implicitdict import ImplicitDict
from uas_standards import Operation
from uas_standards.astm.f3411 import v19, v22a
from yaml.representer import Representer

from monitoring.monitorlib import (
Expand All @@ -22,6 +18,9 @@
rid_v1,
rid_v2,
)
from monitoring.monitorlib.fetch import QueryType
from monitoring.monitorlib.fetch.rid import RIDQuery, Subscription, ISA
from monitoring.monitorlib.rid import RIDVersion


class ChangedSubscription(RIDQuery):
Expand Down Expand Up @@ -450,6 +449,11 @@ class ISAChange(ImplicitDict):
notifications: Dict[str, ISAChangeNotification]
"""Mapping from USS base URL to change notification query"""

@property
def subscribers(self) -> Optional[List[SubscriberToNotify]]:
"""List of subscribers that required a notification for the change."""
return self.dss_query.subscribers


def build_isa_request_body(
area_vertices: List[s2sphere.LatLng],
Expand Down
Loading