Autorefresh import script for Flood Events #855

Merged · 12 commits · Feb 21, 2024
5 changes: 4 additions & 1 deletion scripts/earthengine/events_pipeline.py
@@ -47,7 +47,7 @@
'gcs_bucket': 'my-bucket',
'gcs_folder': 'my-import-name',
},
# Stage sepcific settings
# Stage specific settings
'stages': [
{
'stage': 'download',
@@ -195,7 +195,10 @@ def run_stage(self, stage_name: str, input_files: list = []) -> list:
'''Run a single stage and return the output files generated.'''
for stage_runner in self.stage_runners:
if stage_name == stage_runner.get_name():
logging.info(f'Running stage {stage_name} with {input_files}')
return stage_runner.run_stage(input_files)
logging.error(
f'No stage runner for {stage_name} with input: {input_files}')
return []

def run(self, run_stages: list = []) -> list:
42 changes: 26 additions & 16 deletions scripts/earthengine/pipeline_stage_download.py
@@ -11,11 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class to run the events pipeline stage to download files from a URL.
"""
"""Class to run the events pipeline stage to download files from a URL."""

import os
import re
import sys
import time

from absl import logging

@@ -35,26 +36,34 @@


class DownloadRunner(StageRunner):
'''Class to download data files from URL source.'''
"""Class to download data files from URL source."""

def __init__(self,
config_dicts: list = [],
state: dict = {},
counters=None):
self.set_up('download', config_dicts, state, counters)

def run(self,
input_files: list = None,
config_dict: dict = {},
counters: Counters = None) -> list:
'''Returns the list of files downloaded from the URL in the config.
URLs are downloaded for each time period until the current date.'''
def run(
self,
input_files: list = None,
config_dict: dict = {},
counters: Counters = None,
) -> list:
"""Returns the list of files downloaded from the URL in the config.

URLs are downloaded for each time period until the current date.
"""
# Download data from start_date up to end_date
# advancing date by the time_period.
start_date = self.get_config('start_date', '', config_dict)
end_date = self.get_config('end_date', '', config_dict)
if not end_date:
end_date = utils.date_yesterday()
yesterday = utils.date_yesterday()
end_date = self.get_config('end_date', yesterday, config_dict)
if end_date > yesterday:
end_date = yesterday
logging.info(
f'Running download with start_date: {start_date}, end_date:{end_date}'
)
data_files = []
while start_date and start_date <= end_date:
# Download data for the start_date
@@ -70,7 +79,7 @@ def run(self,
return data_files
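
The change above defaults end_date to yesterday and clamps any configured end_date that lies in the future, so an autorefresh run never requests data past the latest complete day. A minimal sketch of that clamping, using plain ISO date strings in place of the pipeline's config and utils helpers (the function name here is illustrative):

    from datetime import date, timedelta

    def clamp_end_date(configured_end_date: str) -> str:
        # ISO-8601 date strings compare correctly as plain strings.
        yesterday = (date.today() - timedelta(days=1)).isoformat()
        end_date = configured_end_date or yesterday
        return min(end_date, yesterday)

    clamp_end_date('2100-01-01')  # -> yesterday's date
    clamp_end_date('')            # -> yesterday's date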

def download_file_with_config(self, config_dict: dict = {}) -> list:
'''Returns list of files downloaded for config.'''
"""Returns list of files downloaded for config."""
logging.info(f'Downloading data for config: {config_dict}')
downloaded_files = []
urls = config_dict.get('url', [])
@@ -98,7 +107,8 @@ def download_file_with_config(self, config_dict: dict = {}) -> list:
output=config_dict.get('response_type', 'text'),
timeout=config_dict.get('timeout', 60),
retries=config_dict.get('retry_count', 3),
retry_secs=retry_secs)
retry_secs=retry_secs,
)
if download_content:
# Check if the downloaded content matches the regex.
regex = config_dict.get('successful_response_regex', '')
@@ -124,8 +134,8 @@ def download_file_with_config(self, config_dict: dict = {}) -> list:
with file_util.FileIO(filename, mode='w') as file:
file.write(download_content)
logging.info(
f'Downloaded {len(download_content)} bytes from {url} into file: {filename}'
)
f'Downloaded {len(download_content)} bytes from {url} into file:'
f' {filename}')
downloaded_files.append(filename)

return downloaded_files
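
For reference, the configuration keys read above control the HTTP fetch and its validation. A hypothetical download stage entry (values are illustrative; only keys visible in this diff are shown):

    download_stage = {
        'stage': 'download',
        'url': ['https://example.org/flood-events.csv'],  # one or more source URLs
        'response_type': 'text',              # passed to request_url as output
        'timeout': 60,                        # seconds per request
        'retry_count': 3,                     # number of retries on failure
        'successful_response_regex': r'\w+',  # downloaded content must match this
    }
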
148 changes: 115 additions & 33 deletions scripts/earthengine/process_events.py
@@ -58,7 +58,7 @@

_FLAGS = flags.FLAGS

_SCRIPTS_DIR = os.path.dirname(os.path.dirname(__file__))
_SCRIPTS_DIR = os.path.dirname(__file__)
sys.path.append(_SCRIPTS_DIR)
sys.path.append(os.path.dirname(_SCRIPTS_DIR))
sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR)))
@@ -76,6 +76,54 @@
from config_map import ConfigMap
from dc_api_wrapper import dc_api_batched_wrapper

# List of place types in increasing order of preference for name.
# This is used to pick the name of the place from the list of affectedPlaces
# for an event for the place type with the highest index.
_PLACE_TYPE_ORDER = [
'Place',
'OceanicBasin',
'Continent',
'Country',
'CensusRegion',
'CensusDivision',
'State',
'AdministrativeArea1',
'County',
'AdministrativeArea2',
'CensusCountyDivision',
'EurostatNUTS1',
'EurostatNUTS2',
'CongressionalDistrict',
'UDISEDistrict',
'CensusCoreBasedStatisticalArea',
'EurostatNUTS3',
'SuperfundSite',
'Glacier',
'AdministrativeArea3',
'AdministrativeArea4',
'PublicUtility',
'CollegeOrUniversity',
'EpaParentCompany',
'UDISEBlock',
'AdministrativeArea5',
'EpaReportingFacility',
'SchoolDistrict',
'CensusZipCodeTabulationArea',
'PrivateSchool',
'CensusTract',
'City',
'AirQualitySite',
'PublicSchool',
'Neighborhood',
'CensusBlockGroup',
'AdministrativeArea',
'Village',
]

_PLACE_TYPE_RANK = {
_PLACE_TYPE_ORDER[index]: index for index in range(len(_PLACE_TYPE_ORDER))
}
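
A higher index in the list above denotes a smaller, more specific place, so the rank map can pick the most specific affected place when naming an event (see _get_smallest_place_name further down). A quick illustration with made-up candidate types:

    candidate_types = ['Country', 'AdministrativeArea1', 'City']
    smallest = max(candidate_types, key=lambda t: _PLACE_TYPE_RANK.get(t, -1))
    # smallest == 'City', since City ranks above Country and AdministrativeArea1.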


class GeoEvent:
"""Class for a Geo Event."""
@@ -791,7 +839,7 @@ def get_event_output_properties(self, event_id: str) -> dict:
return None
# Generate polygon for the event
polygon_prop = self._config.get('output_affected_place_polygon',
'geoJsonCoordinatesDP1')
'geoJsonCoordinates')
if polygon_prop:
event_polygon = self.get_event_polygon(event_id)
if event_polygon:
Expand All @@ -804,10 +852,10 @@ def get_event_output_properties(self, event_id: str) -> dict:
event_pvs[polygon_prop] = json.dumps(
json.dumps(geo_json))
self._counters.add_counter('output_events_with_polygon', 1)
if not event_pvs.get('name', None):
event_pvs['name'] = self._get_event_name(event_pvs)
# Set the affectedPlace and containedInPlace for the event.
self._set_event_places(event, event_pvs)
if not event_pvs.get('name', None):
event_pvs['name'] = self._get_event_name(event_pvs)
return event_pvs

def delete_event(self, event_id):
@@ -835,6 +883,12 @@ def get_place_date_output_properties(self, event_ids: list,
event property values, such as { 'area': 100 } aggregated
by place and date
'''
# default aggregation settings for event properties across places for a date.
property_config_per_date = dict(
_DEFAULT_CONFIG['property_config_per_date'])
property_config_per_date.update(
self._config.get('property_config_per_date', {}))

# Collect data for each event's (place, date)
# as a dict: {(place, date): {'area': NN},... }
place_date_pvs = dict()
@@ -987,7 +1041,7 @@ def write_events_csv(self,
output_columns.append(prop)
# Add column for affected place polygon
polygon_prop = self._config.get('output_affected_place_polygon',
'geoJsonCoordinatesDP1')
'geoJsonCoordinates')
if polygon_prop:
output_columns.append(polygon_prop)
if event_ids is None:
@@ -1340,19 +1394,6 @@ def write_events_place_svobs(
f'Generating place svobs for {len(event_ids)} events for dates: {date_formats}'
)

# Aggregation settings for event properties across places for a date.
property_config_per_date = {
'aggregate': 'sum',
'area': {
'aggregate': 'sum'
},
'EventId': {
'aggregate': 'set'
}
}
property_config_per_date.update(
self._config.get('property_config_per_date', {}))

# Collect data for each event place and date
# as a dict: {(place, date): {'area': NN},... }
place_date_pvs = self.get_place_date_output_properties(
@@ -1498,7 +1539,7 @@ def get_place_id_for_event(self, place_id: str) -> str:
# Got a location. Convert it to a grid.
if (output_place_type
== 'grid_1') and (not utils.is_grid_id(place_id)):
grid_id = utils.grid_id_from_lat_lng(1, int(lat), int(lng))
grid_id = utils.grid_id_from_lat_lng(1, lat, lng)
place_id = grid_id
self._counters.add_counter(f'place_converted_to_grid_1', 1)
elif (output_place_type
@@ -1546,12 +1587,14 @@ def output_events(self,
_set_counter_stage(self._counters, 'emit_events_svobs_')
output_files.extend(
self.write_events_svobs(
output_path=output_path,
output_path=_get_output_subdir_path(output_path,
'event_svobs'),
output_ended_events=output_ended_events))
if self._config.get('output_place_svobs', False):
output_files.extend(
self.write_events_place_svobs(
output_path=output_path,
output_path=_get_output_subdir_path(output_path,
'place_svobs'),
event_props=self._config.get(
'output_place_svobs_properties', ['area', 'count']),
date_formats=self._config.get(
@@ -1637,6 +1680,31 @@ def _get_contained_for_place(self, place_id: str) -> list:
self.set_place_property(place_id, 'containedInPlace', contained_places)
return contained_places

def _get_smallest_place_name(self, place_ids: list) -> str:
'''Returns the name of the smallest place in the place list.'''
max_place_rank = -1
place_name = ''
# Get the place with the highest rank (smallest place)
for place in _get_list(place_ids):
place = utils.strip_namespace(place)
if place == 'Earth' or utils.is_s2_cell_id(
place) or utils.is_grid_id(place) or utils.is_ipcc_id(
place):
# Ignore non admin places
continue
place_types = self.get_place_property_list(place, 'typeOf')
for place_type in place_types:
place_rank = _PLACE_TYPE_RANK.get(place_type, -1)
if place_rank > max_place_rank:
# This place is smaller. Use its name if available.
new_place_name = self.get_place_property_list(place, 'name')
if new_place_name:
new_place_name = new_place_name[0]
if new_place_name:
max_place_rank = place_rank
place_name = new_place_name
return place_name

def _get_event_name(self, event_pvs: dict, locations: list = None) -> str:
'''Get the name for the event.'''
typeof = event_pvs.get('typeOf',
Expand All @@ -1646,17 +1714,7 @@ def _get_event_name(self, event_pvs: dict, locations: list = None) -> str:
start_location = event_pvs.get('startLocation')
if not locations:
locations = _get_list(event_pvs.get('affectedPlace', ''))
location_name = ''
for placeid in locations:
placeid = utils.strip_namespace(placeid)
if not utils.is_ipcc_id(placeid) and not utils.is_grid_id(placeid):
location_name = self.get_place_property(placeid, 'name')
if location_name:
# place_type = self.get_place_property(placeid, 'typeOf')
# if place_type:
# location_name = f'{place_type[0]} {location_name[0]}'
location_name = f'{location_name[0]}'
break
location_name = self._get_smallest_place_name(locations)
if not location_name:
# Use the lat lng from start place.
lat_lng = event_pvs.get('startLocation')
@@ -1842,7 +1900,31 @@ def _get_list(items: str) -> list:
return sorted(items_set)


_DEFAULT_CONFIG = {}
def _get_output_subdir_path(path: str, sub_dir: str) -> str:
'''Adds a sub directory for the path prefix.'''
dirname = os.path.dirname(path)
basename = os.path.basename(path)
if dirname:
sub_dir = os.path.join(dirname, sub_dir)
return os.path.join(sub_dir, basename)
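
A usage note for the helper above: the sub-directory is spliced in between the directory part and the file-name prefix, which is how output_events() now writes event and place StatVar observations under separate event_svobs/ and place_svobs/ folders. The paths below are illustrative:

    _get_output_subdir_path('/tmp/output/events_test_', 'event_svobs')
    # -> '/tmp/output/event_svobs/events_test_'
    _get_output_subdir_path('events_test_', 'place_svobs')
    # -> 'place_svobs/events_test_'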


_DEFAULT_CONFIG = {
# Aggregation settings for properties across events for a date.
'property_config_per_date': {
'aggregate': 'sum',
'area': {
'aggregate': 'sum',
'unit': 'SquareKilometer',
},
'EventId': {
'aggregate': 'set'
},
'affectedPlace': {
'aggregate': 'list',
},
}
}
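
get_place_date_output_properties() starts from these defaults and overlays any per-import 'property_config_per_date' settings with a shallow dict update, as shown in the hunk above. A minimal sketch with a made-up override:

    property_config_per_date = dict(_DEFAULT_CONFIG['property_config_per_date'])
    property_config_per_date.update({'area': {'aggregate': 'max'}})
    # 'area' is now aggregated with 'max'; 'EventId' keeps the default 'set'
    # and 'affectedPlace' keeps the default 'list'.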


def get_default_config() -> dict:
37 changes: 21 additions & 16 deletions scripts/earthengine/process_events_test.py
@@ -63,7 +63,7 @@ def compare_csv_files(self,
expected_file: str,
actual_file: str,
ignore_columns: list = []):
'''Compare CSV files with statvar obsevration data.'''
'''Compare CSV files with statvar observation data.'''
# Sort files by columns.
df_expected = pd.read_csv(expected_file)
df_actual = pd.read_csv(actual_file)
Expand Down Expand Up @@ -98,21 +98,26 @@ def test_process(self):
output_path=output_prefix,
config=self._config)
# Verify generated events.
for file in [
'events.csv',
'events.tmcf',
'svobs.csv',
'svobs.tmcf',
'place_svobs.csv',
'place_svobs.tmcf',
]:
if file.endswith('.csv'):
# compare csv output without geoJson that is not deterministic
self.compare_csv_files(test_prefix + file,
output_prefix + file,
['geoJsonCoordinatesDP1'])
else:
self.compare_files(test_prefix + file, output_prefix + file)
self.compare_csv_files(
os.path.join(tmp_dir, 'events_test_events.csv'),
os.path.join(_TESTDIR, test_prefix + 'events.csv'))
self.compare_files(
os.path.join(tmp_dir, 'events_test_events.tmcf'),
os.path.join(_TESTDIR, test_prefix + 'events.tmcf'))
self.compare_csv_files(
os.path.join(tmp_dir, 'event_svobs', 'events_test_svobs.csv'),
os.path.join(_TESTDIR, test_prefix + 'svobs.csv'))
self.compare_files(
os.path.join(tmp_dir, 'event_svobs', 'events_test_svobs.tmcf'),
os.path.join(_TESTDIR, test_prefix + 'svobs.tmcf'))
self.compare_csv_files(
os.path.join(tmp_dir, 'place_svobs',
'events_test_place_svobs.csv'),
os.path.join(_TESTDIR, test_prefix + 'place_svobs.csv'))
self.compare_files(
os.path.join(tmp_dir, 'place_svobs',
'events_test_place_svobs.tmcf'),
os.path.join(_TESTDIR, test_prefix + 'place_svobs.tmcf'))

def test_process_event_data(self):
'''Verify events can be added by date.'''