Autorefresh import script for Flood Events (datacommonsorg#855)
* autorefresh script for flood events

* update pipeline config

* update floods configs

* fix unit tests

* add readme

* add requirements.txt for ee scripts

* fix review comments

* address review comments
ajaits authored Feb 21, 2024
1 parent 0f5ee41 commit 4394fdf
Showing 26 changed files with 1,712 additions and 1,056 deletions.
5 changes: 4 additions & 1 deletion scripts/earthengine/events_pipeline.py
@@ -47,7 +47,7 @@
'gcs_bucket': 'my-bucket',
'gcs_folder': 'my-import-name',
},
# Stage sepcific settings
# Stage specific settings
'stages': [
{
'stage': 'download',
@@ -195,7 +195,10 @@ def run_stage(self, stage_name: str, input_files: list = []) -> list:
'''Run a single stage and return the output files generated.'''
for stage_runner in self.stage_runners:
if stage_name == stage_runner.get_name():
logging.info(f'Running stage {stage_name} with {input_files}')
return stage_runner.run_stage(input_files)
logging.error(
f'No stage runner for {stage_name} with input: {input_files}')
return []

def run(self, run_stages: list = []) -> list:
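
For context, the new log lines above sit inside the pipeline's stage dispatch. A minimal standalone sketch of that lookup (the runner class here is a stand-in, not the repo's API):

import logging

class EchoRunner:
    '''Stand-in runner exposing the get_name()/run_stage() interface used by the pipeline.'''

    def get_name(self):
        return 'download'

    def run_stage(self, input_files):
        return list(input_files)

def run_stage(stage_runners, stage_name, input_files=[]):
    for stage_runner in stage_runners:
        if stage_name == stage_runner.get_name():
            logging.info(f'Running stage {stage_name} with {input_files}')
            return stage_runner.run_stage(input_files)
    logging.error(f'No stage runner for {stage_name} with input: {input_files}')
    return []

print(run_stage([EchoRunner()], 'download', ['a.csv']))  # ['a.csv']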
42 changes: 26 additions & 16 deletions scripts/earthengine/pipeline_stage_download.py
@@ -11,11 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class to run the events pipeline stage to download files from a URL.
"""
"""Class to run the events pipeline stage to download files from a URL."""

import os
import re
import sys
import time

from absl import logging

@@ -35,26 +36,34 @@


class DownloadRunner(StageRunner):
'''Class to download data files from URL source.'''
"""Class to download data files from URL source."""

def __init__(self,
config_dicts: list = [],
state: dict = {},
counters=None):
self.set_up('download', config_dicts, state, counters)

def run(self,
input_files: list = None,
config_dict: dict = {},
counters: Counters = None) -> list:
'''Returns the list of files downloaded from the URL in the config.
URLs are downloaded for each time period until the current date.'''
def run(
self,
input_files: list = None,
config_dict: dict = {},
counters: Counters = None,
) -> list:
"""Returns the list of files downloaded from the URL in the config.
URLs are downloaded for each time period until the current date.
"""
# Download data from start_date up to end_date
# advancing date by the time_period.
start_date = self.get_config('start_date', '', config_dict)
end_date = self.get_config('end_date', '', config_dict)
if not end_date:
end_date = utils.date_yesterday()
yesterday = utils.date_yesterday()
end_date = self.get_config('end_date', yesterday, config_dict)
if end_date > yesterday:
end_date = yesterday
logging.info(
f'Running download with start_date: {start_date}, end_date:{end_date}'
)
data_files = []
while start_date and start_date <= end_date:
# Download data for the start_date
@@ -70,7 +79,7 @@ def run(self,
return data_files

def download_file_with_config(self, config_dict: dict = {}) -> list:
'''Returns list of files downloaded for config.'''
"""Returns list of files downloaded for config."""
logging.info(f'Downloading data for config: {config_dict}')
downloaded_files = []
urls = config_dict.get('url', [])
@@ -98,7 +107,8 @@ def download_file_with_config(self, config_dict: dict = {}) -> list:
output=config_dict.get('response_type', 'text'),
timeout=config_dict.get('timeout', 60),
retries=config_dict.get('retry_count', 3),
retry_secs=retry_secs)
retry_secs=retry_secs,
)
if download_content:
# Check if the downloaded content matches the regex.
regex = config_dict.get('successful_response_regex', '')
@@ -124,8 +134,8 @@ def download_file_with_config(self, config_dict: dict = {}) -> list:
with file_util.FileIO(filename, mode='w') as file:
file.write(download_content)
logging.info(
f'Downloaded {len(download_content)} bytes from {url} into file: {filename}'
)
f'Downloaded {len(download_content)} bytes from {url} into file:'
f' {filename}')
downloaded_files.append(filename)

return downloaded_files
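
The main behavior change in this file is that end_date now defaults to yesterday and is clamped so the downloader never requests future dates. A minimal sketch of that clamping, assuming dates are ISO 'YYYY-MM-DD' strings as the string comparison in the diff implies:

import datetime

def clamp_end_date(end_date: str) -> str:
    '''Default a missing end_date to yesterday and never go past yesterday.'''
    yesterday = (datetime.date.today() - datetime.timedelta(days=1)).isoformat()
    if not end_date:
        end_date = yesterday
    return min(end_date, yesterday)

print(clamp_end_date(''))            # yesterday's date
print(clamp_end_date('2030-01-01'))  # also yesterday: future end dates are clamped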
148 changes: 115 additions & 33 deletions scripts/earthengine/process_events.py
@@ -58,7 +58,7 @@

_FLAGS = flags.FLAGS

_SCRIPTS_DIR = os.path.dirname(os.path.dirname(__file__))
_SCRIPTS_DIR = os.path.dirname(__file__)
sys.path.append(_SCRIPTS_DIR)
sys.path.append(os.path.dirname(_SCRIPTS_DIR))
sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR)))
@@ -76,6 +76,54 @@
from config_map import ConfigMap
from dc_api_wrapper import dc_api_batched_wrapper

# List of place types in increasing order of preference for name.
# This is used to pick the name of the place from the list of affectedPlaces
# for an event for the place type with the highest index.
_PLACE_TYPE_ORDER = [
'Place',
'OceanicBasin',
'Continent',
'Country',
'CensusRegion',
'CensusDivision',
'State',
'AdministrativeArea1',
'County',
'AdministrativeArea2',
'CensusCountyDivision',
'EurostatNUTS1',
'EurostatNUTS2',
'CongressionalDistrict',
'UDISEDistrict',
'CensusCoreBasedStatisticalArea',
'EurostatNUTS3',
'SuperfundSite',
'Glacier',
'AdministrativeArea3',
'AdministrativeArea4',
'PublicUtility',
'CollegeOrUniversity',
'EpaParentCompany',
'UDISEBlock',
'AdministrativeArea5',
'EpaReportingFacility',
'SchoolDistrict',
'CensusZipCodeTabulationArea',
'PrivateSchool',
'CensusTract',
'City',
'AirQualitySite',
'PublicSchool',
'Neighborhood',
'CensusBlockGroup',
'AdministrativeArea',
'Village',
]

_PLACE_TYPE_RANK = {
_PLACE_TYPE_ORDER[index]: index for index in range(len(_PLACE_TYPE_ORDER))
}


class GeoEvent:
"""Class for a Geo Event."""
@@ -791,7 +839,7 @@ def get_event_output_properties(self, event_id: str) -> dict:
return None
# Generate polygon for the event
polygon_prop = self._config.get('output_affected_place_polygon',
'geoJsonCoordinatesDP1')
'geoJsonCoordinates')
if polygon_prop:
event_polygon = self.get_event_polygon(event_id)
if event_polygon:
@@ -804,10 +852,10 @@ def get_event_output_properties(self, event_id: str) -> dict:
event_pvs[polygon_prop] = json.dumps(
json.dumps(geo_json))
self._counters.add_counter('output_events_with_polygon', 1)
if not event_pvs.get('name', None):
event_pvs['name'] = self._get_event_name(event_pvs)
# Set the affectedPlace and containedInPlace for the event.
self._set_event_places(event, event_pvs)
if not event_pvs.get('name', None):
event_pvs['name'] = self._get_event_name(event_pvs)
return event_pvs

def delete_event(self, event_id):
@@ -835,6 +883,12 @@ def get_place_date_output_properties(self, event_ids: list,
event property values, such as { 'area': 100 } aggregated
by place and date
'''
# default aggregation settings for event properties across places for a date.
property_config_per_date = dict(
_DEFAULT_CONFIG['property_config_per_date'])
property_config_per_date.update(
self._config.get('property_config_per_date', {}))

# Collect data for each event's (place, date)
# as a dict: {(place, date): {'area': NN},... }
place_date_pvs = dict()
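
A small sketch of the config overlay above, assuming the dict-of-dicts shape used by _DEFAULT_CONFIG later in this file; note that dict.update() is a shallow merge, so an import-level entry replaces the whole default entry for that property:

default_per_date = {
    'aggregate': 'sum',
    'area': {'aggregate': 'sum', 'unit': 'SquareKilometer'},
    'EventId': {'aggregate': 'set'},
}
import_overrides = {'area': {'aggregate': 'max'}}  # hypothetical import-specific setting

property_config_per_date = dict(default_per_date)
property_config_per_date.update(import_overrides)
print(property_config_per_date['area'])  # {'aggregate': 'max'} -- the default unit is dropped by the shallow merge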
@@ -987,7 +1041,7 @@ def write_events_csv(self,
output_columns.append(prop)
# Add column for affected place polygon
polygon_prop = self._config.get('output_affected_place_polygon',
'geoJsonCoordinatesDP1')
'geoJsonCoordinates')
if polygon_prop:
output_columns.append(polygon_prop)
if event_ids is None:
@@ -1340,19 +1394,6 @@ def write_events_place_svobs(
f'Generating place svobs for {len(event_ids)} events for dates: {date_formats}'
)

# Aggregation settings for event properties across places for a date.
property_config_per_date = {
'aggregate': 'sum',
'area': {
'aggregate': 'sum'
},
'EventId': {
'aggregate': 'set'
}
}
property_config_per_date.update(
self._config.get('property_config_per_date', {}))

# Collect data for each event place and date
# as a dict: {(place, date): {'area': NN},... }
place_date_pvs = self.get_place_date_output_properties(
@@ -1498,7 +1539,7 @@ def get_place_id_for_event(self, place_id: str) -> str:
# Got a location. Convert it to a grid.
if (output_place_type
== 'grid_1') and (not utils.is_grid_id(place_id)):
grid_id = utils.grid_id_from_lat_lng(1, int(lat), int(lng))
grid_id = utils.grid_id_from_lat_lng(1, lat, lng)
place_id = grid_id
self._counters.add_counter(f'place_converted_to_grid_1', 1)
elif (output_place_type
@@ -1546,12 +1587,14 @@ def output_events(self,
_set_counter_stage(self._counters, 'emit_events_svobs_')
output_files.extend(
self.write_events_svobs(
output_path=output_path,
output_path=_get_output_subdir_path(output_path,
'event_svobs'),
output_ended_events=output_ended_events))
if self._config.get('output_place_svobs', False):
output_files.extend(
self.write_events_place_svobs(
output_path=output_path,
output_path=_get_output_subdir_path(output_path,
'place_svobs'),
event_props=self._config.get(
'output_place_svobs_properties', ['area', 'count']),
date_formats=self._config.get(
@@ -1637,6 +1680,31 @@ def _get_contained_for_place(self, place_id: str) -> list:
self.set_place_property(place_id, 'containedInPlace', contained_places)
return contained_places

def _get_smallest_place_name(self, place_ids: list) -> str:
'''Returns the name of the smallest place in the place list.'''
max_place_rank = -1
place_name = ''
# Get the place with the highest rank (smallest place)
for place in _get_list(place_ids):
place = utils.strip_namespace(place)
if place == 'Earth' or utils.is_s2_cell_id(
place) or utils.is_grid_id(place) or utils.is_ipcc_id(
place):
# Ignore non admin places
continue
place_types = self.get_place_property_list(place, 'typeOf')
for place_type in place_types:
place_rank = _PLACE_TYPE_RANK.get(place_type, -1)
if place_rank > max_place_rank:
# This place is smaller. Use its name if available.
new_place_name = self.get_place_property_list(place, 'name')
if new_place_name:
new_place_name = new_place_name[0]
if new_place_name:
max_place_rank = place_rank
place_name = new_place_name
return place_name

def _get_event_name(self, event_pvs: dict, locations: list = None) -> str:
'''Get the name for the event.'''
typeof = event_pvs.get('typeOf',
@@ -1646,17 +1714,7 @@ def _get_event_name(self, event_pvs: dict, locations: list = None) -> str:
start_location = event_pvs.get('startLocation')
if not locations:
locations = _get_list(event_pvs.get('affectedPlace', ''))
location_name = ''
for placeid in locations:
placeid = utils.strip_namespace(placeid)
if not utils.is_ipcc_id(placeid) and not utils.is_grid_id(placeid):
location_name = self.get_place_property(placeid, 'name')
if location_name:
# place_type = self.get_place_property(placeid, 'typeOf')
# if place_type:
# location_name = f'{place_type[0]} {location_name[0]}'
location_name = f'{location_name[0]}'
break
location_name = self._get_smallest_place_name(locations)
if not location_name:
# Use the lat lng from start place.
lat_lng = event_pvs.get('startLocation')
@@ -1842,7 +1900,31 @@ def _get_list(items: str) -> list:
return sorted(items_set)


_DEFAULT_CONFIG = {}
def _get_output_subdir_path(path: str, sub_dir: str) -> str:
'''Adds a sub directory for the path prefix.'''
dirname = os.path.dirname(path)
basename = os.path.basename(path)
if dirname:
sub_dir = os.path.join(dirname, sub_dir)
return os.path.join(sub_dir, basename)


_DEFAULT_CONFIG = {
# Aggregation settings for properties across events for a date.
'property_config_per_date': {
'aggregate': 'sum',
'area': {
'aggregate': 'sum',
'unit': 'SquareKilometer',
},
'EventId': {
'aggregate': 'set'
},
'affectedPlace': {
'aggregate': 'list',
},
}
}


def get_default_config() -> dict:
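
For reference, what the new _get_output_subdir_path helper produces for a typical output prefix (paths here are made-up examples), re-using the definition above:

import os

def _get_output_subdir_path(path: str, sub_dir: str) -> str:
    '''Adds a sub directory for the path prefix.'''
    dirname = os.path.dirname(path)
    basename = os.path.basename(path)
    if dirname:
        sub_dir = os.path.join(dirname, sub_dir)
    return os.path.join(sub_dir, basename)

print(_get_output_subdir_path('/tmp/output/events_test_', 'event_svobs'))
# /tmp/output/event_svobs/events_test_ -- the sub directory is inserted just before the file prefix,
# which is why the test below now looks for svobs files under event_svobs/ and place_svobs/.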
37 changes: 21 additions & 16 deletions scripts/earthengine/process_events_test.py
@@ -63,7 +63,7 @@ def compare_csv_files(self,
expected_file: str,
actual_file: str,
ignore_columns: list = []):
'''Compare CSV files with statvar obsevration data.'''
'''Compare CSV files with statvar observation data.'''
# Sort files by columns.
df_expected = pd.read_csv(expected_file)
df_actual = pd.read_csv(actual_file)
@@ -98,21 +98,26 @@ def test_process(self):
output_path=output_prefix,
config=self._config)
# Verify generated events.
for file in [
'events.csv',
'events.tmcf',
'svobs.csv',
'svobs.tmcf',
'place_svobs.csv',
'place_svobs.tmcf',
]:
if file.endswith('.csv'):
# compare csv output without geoJson that is not deterministic
self.compare_csv_files(test_prefix + file,
output_prefix + file,
['geoJsonCoordinatesDP1'])
else:
self.compare_files(test_prefix + file, output_prefix + file)
self.compare_csv_files(
os.path.join(tmp_dir, 'events_test_events.csv'),
os.path.join(_TESTDIR, test_prefix + 'events.csv'))
self.compare_files(
os.path.join(tmp_dir, 'events_test_events.tmcf'),
os.path.join(_TESTDIR, test_prefix + 'events.tmcf'))
self.compare_csv_files(
os.path.join(tmp_dir, 'event_svobs', 'events_test_svobs.csv'),
os.path.join(_TESTDIR, test_prefix + 'svobs.csv'))
self.compare_files(
os.path.join(tmp_dir, 'event_svobs', 'events_test_svobs.tmcf'),
os.path.join(_TESTDIR, test_prefix + 'svobs.tmcf'))
self.compare_csv_files(
os.path.join(tmp_dir, 'place_svobs',
'events_test_place_svobs.csv'),
os.path.join(_TESTDIR, test_prefix + 'place_svobs.csv'))
self.compare_files(
os.path.join(tmp_dir, 'place_svobs',
'events_test_place_svobs.tmcf'),
os.path.join(_TESTDIR, test_prefix + 'place_svobs.tmcf'))

def test_process_event_data(self):
'''Verify events can be added by date.'''
