Skip to content

Commit

Permalink
Scripts for events pipeline (datacommonsorg#847)
Browse files Browse the repository at this point in the history
* events pipeline scripts and utilities

* merge file_util

* remove unused files

* fix lint

* add pipeline to readme

* remove unused file

* add events pipeline unit test

* fix lint

* add new requirements

* fix python for older syntax

* review comment changes

* review comments

* fix year in header comment.

* skip ee image generation for existing files

* fix start date across runs

* fix containedInplace chain for affected place

* fix lint

* add observationDates and list of affectedplaces

* fix lint

* fix typo

* fix typo

* Revert "fix typo"

This reverts commit fc99c95.

* fix comment typo

* fix lint as per python_format_check https://github.com/datacommonsorg/data/pull/847/checks?check_run_id=12965968831
  • Loading branch information
ajaits authored Apr 24, 2023
1 parent 3155cfc commit 061b4a5
Show file tree
Hide file tree
Showing 46 changed files with 8,882 additions and 1,414 deletions.
10 changes: 7 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
# Requirements for all Python code in this repo, except for import-automation

absl-py==0.9.0
absl-py
arcgis2geojson
chembl-webresource-client>=0.10.2
dataclasses==0.6
datacommons==1.4.3
deepdiff==6.3.0
earthengine-api
flask_restful==0.3.9
frozendict==1.2
func-timeout==4.3.5
geojson==2.5.0
geopandas==0.8.1
geopy
google-cloud-bigquery
google-cloud-storage>=2.7.0
google-cloud-logging==3.4.0
google-cloud-scheduler==2.10.0
gspread
lxml==4.9.1
matplotlib==3.3.0
netCDF4==1.5.7
numpy==1.18.5
netCDF4
numpy
openpyxl==3.0.7
pandas==1.3.5
pylint
pytest
rasterio
rdp==0.8
requests==2.27.1
retry==0.9.2
Expand Down
4 changes: 2 additions & 2 deletions run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ function run_py_lint_test {
setup_python
echo "#### Testing Python lint"
if ! yapf -r --diff -p --style=google $PYTHON_FOLDERS; then
echo "ERROR: Fix lint errors by running ./run_test.sh -f" >&2
echo "ERROR: Fix lint errors by running ./run_tests.sh -f" >&2
exit 1
fi
}
Expand All @@ -66,7 +66,7 @@ function help {
echo "Usage: $0 -rplaf"
echo "-r Install Python requirements"
echo "-l Test lint on Python code"
echo "-p Run Python tests in specified folder, e.g. ./run_test.sh -p util"
echo "-p Run Python tests in specified folder, e.g. ./run_tests.sh -p util"
echo "-a Run all tests"
echo "-f Fix lint"
exit 1
Expand Down
13 changes: 13 additions & 0 deletions scripts/earthengine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,19 @@ python3 process_events.py \
# - flood_svobs.{csv}: Observations for each event
```

## Events pipeline
The script `events_pipeline.py` can run all the above steps, including
downloading data from URLs or extracting geoTiff from EarthEngine, and
generating events from the data.

It uses a config with settings for each stage and can resume processing
from a previous instance, incrementally processing new data.
```
# Process data from source to generate events
python3 events_pipeline.py \
--config=events_pipeline_config.py
```


## Testing
To test the scripts in this folder, run the commands:
Expand Down
20 changes: 20 additions & 0 deletions scripts/earthengine/common_flags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common flags used across scripts."""

from absl import flags

flags.DEFINE_string('input_csv', '', 'CSV file to process')
flags.DEFINE_string('config', '', 'Config dictionary with parameter settings.')
flags.DEFINE_bool('debug', False, 'Enable debug messages.')
214 changes: 133 additions & 81 deletions scripts/earthengine/earthengine_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,19 @@
from datetime import timedelta
from dateutil.relativedelta import relativedelta

flags.DEFINE_string('config', '',
'File with configuration parameters as a dictionary.')
_SCRIPTS_DIR = os.path.dirname(__file__)
sys.path.append(_SCRIPTS_DIR)
sys.path.append(os.path.dirname(_SCRIPTS_DIR))
sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR)))
sys.path.append(
os.path.join(os.path.dirname(os.path.dirname(_SCRIPTS_DIR)), 'util'))

import common_flags
import file_util
import utils

from counters import Counters

flags.DEFINE_string('ee_dataset', '',
'Load earth engine data set define in config datasets.')
flags.DEFINE_string('gcs_output', '', 'Prefix for output file names on GCS.')
Expand Down Expand Up @@ -108,7 +119,6 @@
'Number of images to generate, each advanced by --time_period.')
flags.DEFINE_bool('ee_export_image', True,
'If true, submit a task to export image.')
flags.DEFINE_bool('debug', False, 'Enable debug messages.')

_FLAGS = flags.FLAGS
_FLAGS(sys.argv) # Allow invocation without app.run()
Expand Down Expand Up @@ -199,6 +209,9 @@
'debug': _FLAGS.debug,
}

# Interval in secs to check for EE task status
_EE_TASK_WAIT_INTERVAL = 10


def _update_dict(src_dict: dict, dst_dict: dict) -> dict:
'''Merge the src and dst dict, merging dictionary values instead of overwriting.'''
Expand Down Expand Up @@ -238,37 +251,6 @@ def _load_config(config: str, default_config: dict = EE_DEFAULT_CONFIG) -> dict:
return config_dict


def _parse_time_period(time_period: str) -> (int, str):
'''Parse time period into a tuple of (number, unit), for eg: P1M: (1, month).'''
re_pat = r'P?(?P<delta>[+-]?[0-9]+)(?P<unit>[A-Z])'
m = re.search(re_pat, time_period.upper())
if m:
m_dict = m.groupdict()
delta = int(m_dict.get('delta', '0'))
unit = m_dict.get('unit', 'M')
period_dict = {'D': 'days', 'M': 'months', 'Y': 'years'}
period = period_dict.get(unit, 'day')
return (delta, period)
return (0, 'days')


def _advance_date(date_str: str,
                  time_period: str,
                  date_format: str = '%Y-%m-%d') -> str:
    '''Returns date_str shifted forward by time_period.

    Args:
      date_str: date formatted as per date_format.
      time_period: period string such as 'P1M', parsed by _parse_time_period.
      date_format: strptime/strftime format of date_str.

    Returns:
      The advanced date in the same format, or '' if date_str is empty
      or the time period cannot be parsed.
    '''
    if not date_str:
        return ''
    # Parse the date first so an invalid date raises before period checks.
    base_dt = datetime.strptime(date_str, date_format)
    delta, unit = _parse_time_period(time_period)
    if not delta or not unit:
        logging.error(
            f'Unable to parse time period: {time_period} for date: {date_str}')
        return ''
    shifted_dt = base_dt + relativedelta(**{unit: delta})
    return shifted_dt.strftime(date_format)


def _get_bbox_coordinates(bounds: str) -> ee.Geometry.BBox:
'''Returns a bounding box coordinates dictionary for the bounds.
bounds is a comma separated list of points of the form [lat,lng...].'''
Expand Down Expand Up @@ -331,7 +313,7 @@ def ee_filter_date(col: ee.ImageCollection,
end_dt = ee.Date(end_date)
elif time_period:
# Extract the delta and units from the period such as 'P1M'.
end_date = _advance_date(start_date, time_period)
end_date = utils.date_advance_by_period(start_date, time_period)
end_dt = ee.Date(end_date)
if end_dt is None:
end_dt = ee.Date(date.today().strftime('%Y-%m-%d'))
Expand Down Expand Up @@ -579,45 +561,29 @@ def export_ee_image_to_gcs(ee_image: ee.Image, config: dict = {}) -> str:
if config.get('ee_bounds'):
bbox_coords = _get_bbox_coordinates(config['ee_bounds'])
region_bbox = ee.Geometry.BBox(**bbox_coords)
file_prefix = config.get('gcs_output')
if not file_prefix:
# Create name from config parameters.
img_config = [
('ee_image',
config.get(
'ee_dataset',
config.get('ee_image',
config.get('ee_image_collection', 'ee_image'))))
]
img_config.append(('band', config.get('band', '')))
reducers = config.get('ee_reducer')
if reducers:
img_config.append(('r', str(reducers)))
img_config.append(('mask', config.get('ee_mask', '')))
img_config.append(('s', str(config.get('scale', ''))))
img_config.append(('from', config.get('start_date', '')))
img_config.append(('to', config.get('end_date', '')))
if bbox_coords:
img_config.append(
('bbox', '_'.join([f'{p:.2f}' for p in bbox_coords.values()])))
file_prefix = '-'.join(
'_'.join((p, v)) for p, v in img_config if v and isinstance(v, str))
file_prefix = re.sub(r'[^A-Za-z0-9_-]', '_', file_prefix)
scale = config.get('scale', 1000)
# Create output filename prefix from config parameters.
# Large images may be sharded across multiple tif files.
gcs_bucket = config.get('gcs_bucket', '')
gcs_folder = config.get('gcs_folder', '')
if gcs_folder and gcs_folder[-1] != '/':
gcs_folder = gcs_folder + '/'
file_prefix = get_gcs_file_prefix_from_config(config, bbox_coords)
if config.get('skip_existing_output', True):
gcs_path = f'gs://{gcs_bucket}/{file_prefix}*.tif'
existing_images = file_util.file_get_matching(gcs_path)
if existing_images:
logging.info(
f'Skipping ee image generation for existing files: {existing_images}'
)
return None
scale = config.get('scale', 1000)
logging.info(
f'Exporting image: {ee_image.id()} to GCS bucket:{gcs_bucket}, {gcs_folder}{file_prefix}'
f'Exporting image: {ee_image.id()} to GCS bucket:{gcs_bucket}, {file_prefix}*.tif'
)
task = ee.batch.Export.image.toCloudStorage(
description=file_prefix[:90],
description=file_prefix.split('/')[-1][:90],
image=ee_image,
region=region_bbox,
scale=scale,
bucket=gcs_bucket,
fileNamePrefix=f'{gcs_folder}{file_prefix}',
fileNamePrefix=f'{file_prefix}',
maxPixels=10000000000000,
fileFormat='GeoTIFF')
task.start()
Expand All @@ -634,11 +600,19 @@ def ee_process(config) -> list:
For supported params, refer to _DEFAULT_CONFIG.
if ee_image_count > 1, then multiple images are exported with
the start_time, end_time advanced by time_period.
Returns:
List of completed task status with output_file set to the image generated
if config['ee_wait_task'] is True, else a list of tasks launched.
'''
ee_tasks = []
ee.Initialize()
config['ee_image_count'] = config.get('ee_image_count', 1)
while config['ee_image_count'] > 0:
time_period = config.get('time_period', 'P1M')
cur_date = utils.date_format_by_time_period(utils.date_today(), time_period)
counters = Counters()
# Get images by count or until yesterday
while (config['ee_image_count'] and
(config.get('start_date', '') < cur_date)):
logging.info(f'Getting image for config: {config}')
img = ee_generate_image(config)
if isinstance(img, ee.ImageCollection):
Expand All @@ -648,30 +622,108 @@ def ee_process(config) -> list:
logging.info(f'Generated image {img.get("id")}')
if config.get('ee_export_image', True):
task = export_ee_image_to_gcs(img, config)
ee_tasks.append(task)
if task is not None:
ee_tasks.append(task)
counters.add_counter('total', 1)
else:
logging.error(f'Failed to generate image for config: {config}')
# Advance time to next period.
time_period = config.get('time_period', 'P1M')
for ts in ['start_date', 'end_date']:
dt = _advance_date(config.get(ts, ''), time_period)
dt = utils.date_advance_by_period(config.get(ts, ''), time_period)
if dt:
config[ts] = dt
config['ee_image_count'] = config['ee_image_count'] - 1
logging.info(f'Advanced dates by {time_period}: {config}')

logging.info(f'Created ee tasks: {ee_tasks}')
completed_tasks = []
if not config.get('ee_wait_task', True):
return ee_tasks

# Wait for tasks to complete
if config.get('ee_wait_task', True):
pending_tasks = set(ee_tasks)
while len(pending_tasks) > 0:
task = pending_tasks.pop()
if task.active():
pending_tasks.add(task)
logging.info(f'Waiting for task: {task}')
time.sleep(10)
else:
task_status = ee.data.getTaskStatus(task.id)
logging.info(f'EE task completed: {task_status}')
pending_tasks = set(ee_tasks)
while len(pending_tasks) > 0:
task = pending_tasks.pop()
if task.active():
pending_tasks.add(task)
logging.info(f'Waiting for task: {task}')
time.sleep(_EE_TASK_WAIT_INTERVAL)
else:
task_status = ee.data.getTaskStatus(task.id)
logging.info(f'EE task completed: {task_status}')
for status in task_status:
# Get the destination file for each completed task.
image_file = status.get('description', '')
gcs_path = status.get('destination_uris', [])[0]
if gcs_path and image_file:
gcs_path = re.sub('https.*storage/browser/', 'gs://',
gcs_path)
status['output_file'] = f'{gcs_path}{image_file}.tif'
completed_tasks.append(status)
counters.add_counter('processed', 1)
return completed_tasks


def get_gcs_file_prefix_from_config(config: dict, bbox_coords: dict) -> str:
    '''Returns the file name prefix from the config settings.

    The filename is of the form:
      {gcs_folder}/ee_image-<dataset>-band_<name>-r_<reducer>-mask_<dataset>
        -s_<scale>-from_<YYYY-MM-DD>-to_YYYY-MM-DD
    The GCS bucket is added by the caller to get the full path.

    Args:
      config: dictionary of image parameters such as 'ee_dataset', 'band',
        'ee_reducer', 'scale', 'start_date', 'end_date' and 'gcs_folder'.
        If 'gcs_output' is set, it is returned as-is.
      bbox_coords: dictionary of bounding box coordinates with float values
        (as built by _get_bbox_coordinates), or None to omit the bbox.

    Returns:
      file name prefix with special characters replaced by '_'.
    '''
    # Return the file prefix set in the config.
    file_prefix = config.get('gcs_output')
    if file_prefix:
        return file_prefix
    # Collect all config tuples (parameter, value) to be added to the filename
    img_config = [('ee_image',
                   config.get(
                       'ee_dataset',
                       config.get('ee_image',
                                  config.get('ee_image_collection',
                                             'ee_image'))))]
    img_config.append(('band', config.get('band', '')))
    reducers = config.get('ee_reducer')
    if reducers:
        img_config.append(('r', str(reducers)))
    img_config.append(('mask', config.get('ee_mask', '')))
    img_config.append(('s', str(config.get('scale', ''))))
    img_config.append(('from', config.get('start_date', '')))
    img_config.append(('to', config.get('end_date', '')))
    if bbox_coords is not None:
        # Add bounding box if specified
        img_config.append(
            ('bbox', '_'.join([f'{p:.2f}' for p in bbox_coords.values()])))

    # Merge all parameters with non-empty string values.
    file_prefix = '-'.join(
        '_'.join((p, v)) for p, v in img_config if v and isinstance(v, str))

    # Remove any special characters
    file_prefix = re.sub(r'[^A-Za-z0-9_-]', '_', file_prefix)
    gcs_folder = config.get('gcs_folder', '')
    if gcs_folder and gcs_folder[-1] != '/':
        gcs_folder = gcs_folder + '/'
    return f'{gcs_folder}{file_prefix}'


def get_date_from_filename(filename: str, time_period: str = '') -> str:
    '''Returns the date extracted from the filename.

    GeoTifs generated by EE may not have the date as a band.
    Args:
      filename: string of the form '*_from_<YYYY-MM-DD>_*.tif',
        the filename for generated geoTifs from get_file_prefix_from_config().
      time_period: string of the form P<N><L>, for eg: P1D.

    Returns:
      the date string formatted by the time period, or '' if no date
      is found in the filename.
    '''
    # Get the date from the filename.
    # Match YYYY optionally followed by -MM and -DD. The previous pattern
    # ([0-9]{4}[0-9-]{2}[0-9-]{0,2}) could match at most 8 characters, so it
    # could never capture a full 10-char YYYY-MM-DD and could capture a
    # trailing '-' (e.g. 'from_2023-01-01' -> '2023-01-').
    matches = re.search(r'from_(?P<date>[0-9]{4}(?:-[0-9]{2}){0,2})', filename)
    if matches:
        # format the date string into YYYY-MM or YYYY by the time period.
        return utils.date_format_by_time_period(
            matches.groupdict().get('date', ''), time_period)
    return ''


def main(_):
Expand Down
Loading

0 comments on commit 061b4a5

Please sign in to comment.