Commit

update floods configs
ajaits committed Jun 12, 2023
1 parent 433efdf commit 2af2284
Showing 8 changed files with 435 additions and 284 deletions.
2 changes: 1 addition & 1 deletion scripts/earthengine/events_pipeline.py
@@ -47,7 +47,7 @@
'gcs_bucket': 'my-bucket',
'gcs_folder': 'my-import-name',
},
# Stage sepcific settings
# Stage specific settings
'stages': [
{
'stage': 'download',
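For orientation, the download stage below reads its dates from this per-stage config. A hypothetical stage list using only the keys that appear in this commit ('stage', 'start_date', 'end_date'); the values themselves are illustrative:

# Hypothetical stage list; 'start_date'/'end_date' are the keys read by the
# download stage, and an empty 'end_date' now defaults to yesterday.
stages = [
    {
        'stage': 'download',
        'start_date': '2023-01-01',
        'end_date': '',
    },
]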
9 changes: 6 additions & 3 deletions scripts/earthengine/pipeline_stage_download.py
@@ -15,7 +15,9 @@
"""

import os
import re
import sys
import time

from absl import logging

@@ -52,9 +54,10 @@ def run(self,
# Download data from start_date up to end_date
# advancing date by the time_period.
start_date = self.get_config('start_date', '', config_dict)
end_date = self.get_config('end_date', '', config_dict)
if not end_date:
end_date = utils.date_yesterday()
yesterday = utils.date_yesterday()
end_date = self.get_config('end_date', yesterday, config_dict)
if end_date > yesterday:
end_date = yesterday
data_files = []
while start_date and start_date <= end_date:
# Download data for the start_date
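The new logic defaults end_date to yesterday and clamps any configured end_date that lies in the future back to yesterday. A minimal standalone sketch of the same behavior, assuming dates are YYYY-MM-DD strings (which is what utils.date_yesterday appears to return):

from datetime import date, timedelta

def date_yesterday() -> str:
    # Stand-in for utils.date_yesterday(): yesterday as a YYYY-MM-DD string.
    return (date.today() - timedelta(days=1)).strftime('%Y-%m-%d')

def resolve_end_date(configured_end_date: str) -> str:
    # Empty end_date falls back to yesterday; future end_dates are clamped.
    yesterday = date_yesterday()
    end_date = configured_end_date or yesterday
    return min(end_date, yesterday)  # ISO date strings compare lexicographically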
91 changes: 76 additions & 15 deletions scripts/earthengine/process_events.py
@@ -76,6 +76,52 @@
from config_map import ConfigMap
from dc_api_wrapper import dc_api_batched_wrapper

# List of place types from biggest to smallest
_PLACE_TYPE_ORDER = [
'Place',
'OceanicBasin',
'Continent',
'Country',
'CensusRegion',
'CensusDivision',
'State',
'AdministrativeArea1',
'County',
'AdministrativeArea2',
'CensusCountyDivision',
'EurostatNUTS1',
'EurostatNUTS2',
'CongressionalDistrict',
'UDISEDistrict',
'CensusCoreBasedStatisticalArea',
'EurostatNUTS3',
'SuperfundSite',
'Glacier',
'AdministrativeArea3',
'AdministrativeArea4',
'PublicUtility',
'CollegeOrUniversity',
'EpaParentCompany',
'UDISEBlock',
'AdministrativeArea5',
'EpaReportingFacility',
'SchoolDistrict',
'CensusZipCodeTabulationArea',
'PrivateSchool',
'CensusTract',
'City',
'AirQualitySite',
'PublicSchool',
'Neighborhood',
'CensusBlockGroup',
'AdministrativeArea',
'Village',
]

_PLACE_TYPE_RANK = {
_PLACE_TYPE_ORDER[index]: index for index in range(len(_PLACE_TYPE_ORDER))
}
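The rank map gives smaller place types larger indices, so the smallest affected place can be chosen by maximizing rank. A quick illustration, assuming the ordering above:

# Country (index 3) ranks below City, so City is treated as the smaller place.
assert _PLACE_TYPE_RANK['Country'] < _PLACE_TYPE_RANK['City']

def smaller_place_type(type_a: str, type_b: str) -> str:
    # Unknown types get rank -1 and therefore lose to any known type.
    return max(type_a, type_b, key=lambda t: _PLACE_TYPE_RANK.get(t, -1))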


class GeoEvent:
"""Class for a Geo Event."""
@@ -791,7 +837,7 @@ def get_event_output_properties(self, event_id: str) -> dict:
return None
# Generate polygon for the event
polygon_prop = self._config.get('output_affected_place_polygon',
'geoJsonCoordinatesDP1')
'geoJsonCoordinates')
if polygon_prop:
event_polygon = self.get_event_polygon(event_id)
if event_polygon:
@@ -804,10 +850,10 @@ def get_event_output_properties(self, event_id: str) -> dict:
event_pvs[polygon_prop] = json.dumps(
json.dumps(geo_json))
self._counters.add_counter('output_events_with_polygon', 1)
if not event_pvs.get('name', None):
event_pvs['name'] = self._get_event_name(event_pvs)
# Set the affectedPlace and containedInPlace for the event.
self._set_event_places(event, event_pvs)
if not event_pvs.get('name', None):
event_pvs['name'] = self._get_event_name(event_pvs)
return event_pvs

def delete_event(self, event_id):
@@ -1000,7 +1046,7 @@ def write_events_csv(self,
output_columns.append(prop)
# Add column for affected place polygon
polygon_prop = self._config.get('output_affected_place_polygon',
'geoJsonCoordinatesDP1')
'geoJsonCoordinates')
if polygon_prop:
output_columns.append(polygon_prop)
if event_ids is None:
@@ -1639,6 +1685,31 @@ def _get_contained_for_place(self, place_id: str) -> list:
self.set_place_property(place_id, 'containedInPlace', contained_places)
return contained_places

def _get_smallest_place_name(self, place_ids: list) -> str:
'''Returns the name of the smallest place in the place list.'''
max_place_rank = -1
place_name = ''
# Get the place with the highest rank (smallest place)
for place in _get_list(place_ids):
place = utils.strip_namespace(place)
if place == 'Earth' or utils.is_s2_cell_id(
place) or utils.is_grid_id(place) or utils.is_ipcc_id(
place):
# Ignore non admin places
continue
place_types = self.get_place_property_list(place, 'typeOf')
for place_type in place_types:
place_rank = _PLACE_TYPE_RANK.get(place_type, -1)
if place_rank > max_place_rank:
# This place is smaller. Use its name if available.
new_place_name = self.get_place_property_list(place, 'name')
if new_place_name:
new_place_name = new_place_name[0]
if new_place_name:
max_place_rank = place_rank
place_name = new_place_name
return place_name
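In effect, the new helper scans the affectedPlace dcids, skips Earth, S2 cells, grid and IPCC ids, and keeps the name of the place whose type ranks highest, i.e. the smallest admin place. A standalone sketch of the same selection over pre-fetched place records (a hypothetical data shape, not the class's own property cache):

def smallest_place_name(places: list) -> str:
    # Each record is assumed to look like {'typeOf': [...], 'name': [...]},
    # already restricted to administrative places.
    best_rank, best_name = -1, ''
    for place in places:
        rank = max((_PLACE_TYPE_RANK.get(t, -1) for t in place.get('typeOf', [])),
                   default=-1)
        name = next(iter(place.get('name', [])), '')
        if rank > best_rank and name:
            best_rank, best_name = rank, name
    return best_name

# Example: the city name wins over the country name.
places = [
    {'typeOf': ['Country'], 'name': ['India']},
    {'typeOf': ['City'], 'name': ['Mumbai']},
]
assert smallest_place_name(places) == 'Mumbai'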

def _get_event_name(self, event_pvs: dict, locations: list = None) -> str:
'''Get the name for the event.'''
typeof = event_pvs.get('typeOf',
@@ -1648,17 +1719,7 @@ def _get_event_name(self, event_pvs: dict, locations: list = None) -> str:
start_location = event_pvs.get('startLocation')
if not locations:
locations = _get_list(event_pvs.get('affectedPlace', ''))
location_name = ''
for placeid in locations:
placeid = utils.strip_namespace(placeid)
if not utils.is_ipcc_id(placeid) and not utils.is_grid_id(placeid):
location_name = self.get_place_property(placeid, 'name')
if location_name:
# place_type = self.get_place_property(placeid, 'typeOf')
# if place_type:
# location_name = f'{place_type[0]} {location_name[0]}'
location_name = f'{location_name[0]}'
break
location_name = self._get_smallest_place_name(locations)
if not location_name:
# Use the lat lng from start place.
lat_lng = event_pvs.get('startLocation')
3 changes: 2 additions & 1 deletion scripts/earthengine/process_events_test.py
@@ -63,7 +63,7 @@ def compare_csv_files(self,
expected_file: str,
actual_file: str,
ignore_columns: list = []):
'''Compare CSV files with statvar obsevration data.'''
'''Compare CSV files with statvar observation data.'''
# Sort files by columns.
df_expected = pd.read_csv(expected_file)
df_actual = pd.read_csv(actual_file)
@@ -90,6 +90,7 @@ def compare_csv_files(self,

def test_process(self):
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_dir = '/tmp/test_events'
output_prefix = os.path.join(tmp_dir, 'events_test_')
test_prefix = os.path.join(_TESTDIR, 'sample_floods_')
# Process flood s2 cells into events.
8 changes: 8 additions & 0 deletions scripts/earthengine/utils.py
@@ -79,9 +79,17 @@ def dict_filter_values(pvs: dict, config: dict = {}) -> bool:
if value > prop_config['max']:
allow_value = False
if 'regex' in prop_config:
if not isinstance(value, str):
value = str(value)
matches = re.search(prop_config['regex'], value)
if not matches:
allow_value = False
if 'ignore' in prop_config:
if not isinstance(value, str):
value = str(value)
matches = re.search(prop_config['ignore'], value)
if matches:
allow_value = False
if not allow_value:
pvs.pop(p)
is_allowed = False
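The added 'ignore' filter mirrors the existing 'regex' filter but drops a value when the pattern does match. A hedged usage sketch, assuming (as the hunk suggests) that the config maps each property name to its filter settings and that the function returns whether every value was kept:

# Assuming this runs alongside scripts/earthengine/utils.py.
from utils import dict_filter_values

# Hypothetical filter config: 'max' caps numeric values, 'regex' must match,
# and the new 'ignore' pattern must not match.
config = {
    'area': {'max': 1000},
    'status': {'regex': r'^[a-z_]+$', 'ignore': r'deprecated'},
}
pvs = {'area': 250, 'status': 'deprecated_site'}
# Disallowed values are popped from pvs in place; here 'status' is dropped
# because it matches the 'ignore' pattern.
all_kept = dict_filter_values(pvs, config)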