Skip to content

Commit

Permalink
Pre-staging fixes (#106)
Browse files Browse the repository at this point in the history
* Add Admin1 geo code to database

- Add sync logic to pull data from go api (currently stage)

* Add tagging from circle and geocodes as fallback

* Use _value2member_map_ on the enum to check that the geocode type is supported

* Fix mapping value issue

* Use go-api prod for geo data sync

* Skip circle polygon filter if tagged by normal polygons

* Use camelCase for argo hooks

* Add support for custom python script in argoHooks

* Add missing cronjobs for WEEKLY and MONTHLY subscription email

* Add total_alerts_count node for subscription alerts count

- Fix filtering issue for subscription alerts

* Disable querystring_auth for S3 static bucket

* Add test additional dataset for subscription/alert tagging test

* Add multiple queue: feeds and default

* Add celery flower

* Fix README and email template

* Typos fix

---------

Co-authored-by: barshathakuri <[email protected]>
  • Loading branch information
thenav56 and barshathakuri authored Dec 8, 2024
1 parent fef661d commit df6b0a2
Show file tree
Hide file tree
Showing 30 changed files with 682 additions and 105 deletions.
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@ repos:
rev: v1.1.390
hooks:
- id: pyright

# - repo: https://github.com/gruntwork-io/pre-commit
# rev: v0.1.15
# hooks:
# - id: helmlint
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# IFRC/UCL Alert Hub - CAP Aggregator

The CAP Aggregator is an alert aggregation service built for IFRC's Alert Hub. Public alerts use the Common Alerting Protocol (CAP) Version 1.2 standard.
This repository houses the CAP Aggregator and Alert Manager for IFRC's Alert Hub. These services aggregate and distribute public alerts using the Common Alerting Protocol (CAP) Version 1.2 standard.

This is a Python web app using the Django framework and the Azure Database for PostgreSQL relational database service. The Django app is hosted in a fully managed Azure App Service. Requests to hundreds of publicly available alert feeds are managed by Celery and Redis, which interpret alerts and save them to the database. The Alert Manager then makes them available to the Alert Hub Website.

Expand Down
41 changes: 41 additions & 0 deletions apps/cap_feed/data_injector/geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,46 @@ def get_geojson_data():
mgr.done()
self.log_success(str(mgr.summary()))

def inject_admin1s_geo_codes(self):
    """Copy geocode identifiers from the GO API districts onto local Admin1 rows.

    Fetches ``/api/v2/district/`` through ``self.handle_pagination``, matches
    each record to an ``Admin1`` by ``ifrc_go_id`` and bulk-updates the
    ``emma_id``, ``nuts1``, ``nuts2``, ``nuts3`` and ``fips_code`` columns.
    Records without an ``id`` or without a matching ``Admin1`` are skipped.

    Raises:
        KeyError: if a matched record lacks one of the geocode keys.
    """
    # Single source of truth for both the bulk-update column list and the
    # per-record assignment below.
    geo_code_fields = ("emma_id", "nuts1", "nuts2", "nuts3", "fips_code")

    # TODO: Confirm this
    fetch_params = {
        "is_independent": True,
        "is_deprecated": False,
        "limit": 1000,
    }

    # Materialise the response so the ids can be collected up-front and all
    # matching Admin1 rows loaded with one query instead of one per record.
    go_data = list(
        self.handle_pagination(
            "/api/v2/district/",
            params=fetch_params,
            headers={"Accept-Language": "EN"},
        )
    )

    go_ids = {
        int(admin1_data["id"])
        for admin1_data in go_data
        if admin1_data.get("id") is not None
    }
    admin1_by_go_id = {
        admin1.ifrc_go_id: admin1
        for admin1 in Admin1.objects.filter(ifrc_go_id__in=go_ids)
    }

    mgr = BulkUpdateManager(update_fields=list(geo_code_fields))

    for admin1_data in go_data:
        raw_id = admin1_data.get("id")
        if raw_id is None:
            continue

        # pop() so that a record repeated in the response cannot queue the
        # same Admin1 instance for update twice.
        admin1 = admin1_by_go_id.pop(int(raw_id), None)
        if admin1 is None:
            continue

        for field_name in geo_code_fields:
            setattr(admin1, field_name, admin1_data[field_name])
        mgr.add(admin1)

    mgr.done()
    self.log_success(str(mgr.summary()))

@transaction.atomic
def sync(
self,
Expand All @@ -362,4 +402,5 @@ def sync(
self.inject_countries()
if not skip_admin1s_sync:
self.inject_admin1s()
self.inject_admin1s_geo_codes()
# TODO: Show change summary
100 changes: 85 additions & 15 deletions apps/cap_feed/formats/cap_xml.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import datetime
import logging
from collections import defaultdict
from xml.etree.ElementTree import Element as XmlElement

from django.contrib.gis.geos import GEOSGeometry
from django.db import IntegrityError
from django.contrib.gis.geos import GEOSGeometry, Point
from django.contrib.gis.measure import Distance
from django.db import IntegrityError, models
from django.utils import timezone

from apps.cap_feed.formats.utils import (
Expand Down Expand Up @@ -103,12 +105,14 @@ def process_alert_info(
alert_info_entry: XmlElement,
mgr: BulkCreateManager,
ns: dict,
) -> tuple[AlertInfo | None, list[GEOSGeometry]]:
) -> tuple[AlertInfo | None, list[GEOSGeometry], list[tuple[Point, int]]]:
polygons: list[GEOSGeometry] = []
circles: list[tuple[Point, int]] = []

expire_time = convert_datetime(find_element(alert_info_entry, ns, 'cap:expires'))
if expire_time is not None and expire_time < timezone.now():
return None, []
return None, polygons, circles

polygons = []
alert_info = create_alert_info(alert, alert_info_entry, expire_time, ns)

# navigate alert info parameter
Expand All @@ -132,12 +136,13 @@ def process_alert_info(

# navigate alert info area circle
for alert_info_area_circle_entry in alert_info_area_entry.findall('cap:circle', ns):
mgr.add(
AlertInfoAreaCircle(
alert_info_area=alert_info_area,
value=alert_info_area_circle_entry.text,
)
alert_info_area_circle = AlertInfoAreaCircle(
alert_info_area=alert_info_area,
value=alert_info_area_circle_entry.text,
)
mgr.add(alert_info_area_circle)
if parsed_circle := alert_info_area_circle.get_geos():
circles.append(parsed_circle)

# navigate info area geocode
for alert_info_area_geocode_entry in alert_info_area_entry.findall('cap:geocode', ns):
Expand All @@ -159,7 +164,42 @@ def process_alert_info(
mgr.add(alert_info_area_polygon)
if parsed_polygon := alert_info_area_polygon.value_geojson:
polygons.append(parsed_polygon)
return alert_info, polygons
return alert_info, polygons, circles


def process_geo_code_type(
    admin1_base_qs: models.QuerySet[Admin1], geocode_name: Admin1.GeoCode, values: set[str]
) -> list[int]:
    """Return ids of the Admin1 rows whose geocode column matches any of ``values``.

    Args:
        admin1_base_qs: Queryset the lookup is restricted to.
        geocode_name: Which geocode scheme ``values`` belong to.
        values: Raw geocode values taken from the alert.

    Returns:
        Matching Admin1 ids; an empty list when the geocode type is not
        handled or no usable value remains.
    """
    field_by_geocode = {
        Admin1.GeoCode.EMMA_ID: "emma_id",
        Admin1.GeoCode.NUTS1: "nuts1",
        Admin1.GeoCode.NUTS2: "nuts2",
        Admin1.GeoCode.NUTS3: "nuts3",
        Admin1.GeoCode.FIPS_CODE: "fips_code",
    }

    field_name = field_by_geocode.get(geocode_name)
    if field_name is None:
        # Previously an unmatched geocode type left the queryset variable
        # unbound and raised UnboundLocalError.
        return []

    lookup_values = set(values)
    if geocode_name == Admin1.GeoCode.FIPS_CODE:
        # fips_code is an integer column: a non-numeric value in an ``__in``
        # lookup raises ValueError, so keep only decimal strings.
        lookup_values = {
            value.strip()
            for value in lookup_values
            if isinstance(value, str) and value.strip().isdecimal()
        }

    if not lookup_values:
        return []

    qs = admin1_base_qs.filter(**{f"{field_name}__in": lookup_values})
    return list(qs.values_list('id', flat=True))


def process_geo_codes(
    alert: Alert,
    admin1_base_qs: models.QuerySet[Admin1],
) -> list[int]:
    """Resolve the alert's stored geocodes to a de-duplicated list of Admin1 ids.

    Geocode rows whose upper-cased ``value_name`` is not a supported
    ``Admin1.GeoCode`` are ignored.
    """
    # TODO: Remove _value2member_map_ after upgrading python version
    supported_geocodes = Admin1.GeoCode._value2member_map_
    values_by_geocode: dict[Admin1.GeoCode, set[str]] = defaultdict(set)

    geocode_rows = AlertInfoAreaGeocode.objects.filter(
        alert_info_area__alert_info__alert=alert,
    ).values_list("value_name", "value")

    for raw_name, raw_value in geocode_rows:
        normalized_name = raw_name.upper()
        if normalized_name not in supported_geocodes:
            continue
        values_by_geocode[Admin1.GeoCode[normalized_name]].add(raw_value)

    candidate_admin1_ids: list[int] = [
        admin1_id
        for geocode, code_values in values_by_geocode.items()
        for admin1_id in process_geo_code_type(admin1_base_qs, geocode, code_values)
    ]

    return list(set(candidate_admin1_ids))


def process_alert(
Expand All @@ -174,21 +214,23 @@ def process_alert(

mgr = BulkCreateManager()
alert_has_valid_info = False
alert_info_circles_collections = []
tagged_admin1s_id = set()
admin1_base_qs = Admin1.objects.filter(country=alert.country)

# navigate alert info
for alert_info_entry in alert_root.findall('cap:info', ns):
alert_info, alert_info_polygons = process_alert_info(alert, alert_info_entry, mgr, ns)
alert_info, alert_info_polygons, alert_info_circles = process_alert_info(alert, alert_info_entry, mgr, ns)
if not alert_info:
continue
alert_info_circles_collections.extend(alert_info_circles)

alert_has_valid_info = True

# XXX: Do we need to check circles as well?
# check polygon intersection with admin1s
for polygon in alert_info_polygons:
possible_admin1s = Admin1.objects.filter(
country=alert.country,
possible_admin1s = admin1_base_qs.filter(
# TODO: Check for performance issues
geometry__intersects=polygon,
).exclude(id__in=tagged_admin1s_id)
Expand All @@ -202,8 +244,36 @@ def process_alert(
)

if alert_has_valid_info:
# Fallback: Try circles
if not tagged_admin1s_id:
for circle in alert_info_circles_collections:
possible_admin1s = admin1_base_qs.filter(
# TODO: Check for performance issues
geometry___dwithin=(circle[0], Distance(m=circle[1])),
).exclude(id__in=tagged_admin1s_id)
for admin1_id in possible_admin1s.values_list('id', flat=True):
tagged_admin1s_id.add(admin1_id)
mgr.add(
AlertAdmin1(
alert=alert,
admin1_id=admin1_id,
)
)

# Fallback: Try geocodes
if not tagged_admin1s_id:
for admin1_id in process_geo_codes(alert, admin1_base_qs):
tagged_admin1s_id.add(admin1_id)
mgr.add(
AlertAdmin1(
alert=alert,
admin1_id=admin1_id,
)
)

# Last fallback is Unknown admin1
if not tagged_admin1s_id:
if unknown_admin1 := Admin1.objects.filter(country=alert.country, name='Unknown').first():
if unknown_admin1 := admin1_base_qs.filter(name='Unknown').first():
mgr.add(
AlertAdmin1(
alert=alert,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Generated by Django 4.2.13 on 2024-12-02 15:36

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the emma_id, fips_code and nuts1-3 geocode columns to ``admin1``."""

    dependencies = [
        ('cap_feed', '0011_alter_alertinfo_category_alter_alertinfo_certainty_and_more'),
    ]

    operations = [
        migrations.AddField(
            model_name='admin1',
            name='emma_id',
            field=models.CharField(
                blank=True,
                db_index=True,
                help_text='Meteoalarm EMMA_ID',
                max_length=10,
                null=True,
                verbose_name='emma_id',
            ),
        ),
        migrations.AddField(
            model_name='admin1',
            name='fips_code',
            field=models.PositiveIntegerField(
                blank=True,
                db_index=True,
                help_text='USA FIPS Code',
                null=True,
                verbose_name='fips_code',
            ),
        ),
        # nuts1 / nuts2 / nuts3 differ only by level; max_length is 3 / 4 / 5.
        *[
            migrations.AddField(
                model_name='admin1',
                name=f'nuts{level}',
                field=models.CharField(
                    blank=True,
                    db_index=True,
                    help_text=f'Nomenclature of Territorial Units for Statistics {level}',
                    max_length=level + 2,
                    null=True,
                    verbose_name=f'nuts{level}',
                ),
            )
            for level in (1, 2, 3)
        ],
    ]
61 changes: 61 additions & 0 deletions apps/cap_feed/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
from datetime import timedelta
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -64,12 +65,60 @@ def create_unknown_admin1(sender, instance, created, **kwargs):


class Admin1(models.Model):
class GeoCode(enum.Enum):
    """Geocode schemes recognised for Admin1 records.

    Each member's name is identical to its string value.
    """

    EMMA_ID = "EMMA_ID"
    NUTS1 = "NUTS1"
    NUTS2 = "NUTS2"
    NUTS3 = "NUTS3"
    FIPS_CODE = "FIPS_CODE"

ifrc_go_id = models.IntegerField(unique=True, null=True, editable=False)
name = models.CharField()
country = models.ForeignKey(Country, on_delete=models.CASCADE)
bbox = gid_models.PolygonField(srid=4326, blank=True, null=True)
geometry = gid_models.GeometryField(null=True, blank=True, default=None)

# go-api: https://github.com/IFRCGo/go-api/blob/db2991bd588376f58a1db8422625e19aa5777dd3/api/models.py#L301-L323
emma_id = models.CharField(
verbose_name=_("emma_id"),
max_length=10,
blank=True,
null=True,
help_text=_("Meteoalarm EMMA_ID"),
db_index=True,
)
nuts1 = models.CharField(
verbose_name=_("nuts1"),
max_length=3,
blank=True,
null=True,
help_text=_("Nomenclature of Territorial Units for Statistics 1"),
db_index=True,
)
nuts2 = models.CharField(
verbose_name=_("nuts2"),
max_length=4,
blank=True,
null=True,
help_text=_("Nomenclature of Territorial Units for Statistics 2"),
db_index=True,
)
nuts3 = models.CharField(
verbose_name=_("nuts3"),
max_length=5,
blank=True,
null=True,
help_text=_("Nomenclature of Territorial Units for Statistics 3"),
db_index=True,
)
fips_code = models.PositiveIntegerField(
verbose_name=_("fips_code"),
blank=True,
null=True,
help_text=_("USA FIPS Code"),
db_index=True,
)

country_id: int

if TYPE_CHECKING:
Expand Down Expand Up @@ -395,6 +444,18 @@ class AlertInfoAreaCircle(models.Model):

# NOTE: Circle can't be drawn using Geojson. A polygon needs to be created which holds large data then raw value

def get_geos(self) -> tuple[Point, int] | None:
    """Parse the raw circle ``value`` into a point and a radius.

    ``value`` is expected to look like ``"22.448,71.060 199"``: a
    ``latitude,longitude`` pair, whitespace, then the radius.

    Returns:
        ``(Point(longitude, latitude), radius)``, or ``None`` when the value
        is missing or malformed.
    """
    try:
        # split() without arguments tolerates repeated or surrounding
        # whitespace, which split(" ") rejected.
        point_raw, radius_raw = self.value.split()
        latitude_raw, longitude_raw = point_raw.split(",")
        # Point takes (x, y), i.e. (longitude, latitude).
        point = Point(float(longitude_raw), float(latitude_raw))
        # A decimal radius such as "12.5" previously failed int() and the
        # whole circle was discarded; round it to keep the int return type.
        radius = round(float(radius_raw))
    except (AttributeError, TypeError, ValueError, OverflowError):
        # Missing value, wrong number of parts, or non-numeric / non-finite
        # numbers.
        return None
    # NOTE(review): CAP 1.2 expresses the circle radius in kilometres;
    # confirm that callers apply the matching unit.
    return point, radius

def to_dict(self):
alert_info_area_circle_dict = dict()
alert_info_area_circle_dict['value'] = self.value
Expand Down
27 changes: 27 additions & 0 deletions apps/subscription/dataloaders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from asgiref.sync import sync_to_async
from django.db import models
from django.utils.functional import cached_property
from strawberry.dataloader import DataLoader

from .models import SubscriptionAlert


def load_alert_count_by_subscription(keys: list[int]) -> list[int]:
    """Return the alert count for each subscription id, in the order of ``keys``.

    Subscriptions with no ``SubscriptionAlert`` rows get a count of 0.
    """
    counts_qs = SubscriptionAlert.objects.filter(subscription_id__in=keys)
    # Empty order_by() clears any default ordering before grouping.
    counts_qs = counts_qs.order_by().values('subscription_id')
    counts_qs = counts_qs.annotate(alert_count=models.Count('alert'))

    count_by_subscription = dict(counts_qs.values_list('subscription_id', 'alert_count'))

    return [count_by_subscription.get(subscription_id, 0) for subscription_id in keys]


class SubscriptionDataloader:
    """Holds the dataloaders used for subscription fields."""

    @cached_property
    def load_alert_count_by_subscription(self):
        # Wrap the synchronous loader so the DataLoader can await it; the
        # DataLoader is built once per instance thanks to cached_property.
        async_load_fn = sync_to_async(load_alert_count_by_subscription)
        return DataLoader(load_fn=async_load_fn)
1 change: 1 addition & 0 deletions apps/subscription/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class PrivateQuery:
async def user_alert_subscription(self, info: Info, pk: strawberry.ID) -> UserAlertSubscriptionType | None:
    # Look the subscription up by primary key within the type's own queryset;
    # yields None when nothing matches.
    subscription_qs = UserAlertSubscriptionType.get_queryset(None, None, info)
    matching_qs = subscription_qs.filter(pk=pk)
    return await matching_qs.afirst()

# XXX: Is this used?
@strawberry_django.field
async def subscripted_alerts(
self,
Expand Down
Loading

0 comments on commit df6b0a2

Please sign in to comment.