From 0c75a8bb618b48479448a61206f7f7148fb90f4b Mon Sep 17 00:00:00 2001 From: Louis-Philippe Rousseau Lambert <11634764+RousseauLambertLP@users.noreply.github.com> Date: Thu, 25 Jul 2024 15:05:34 -0400 Subject: [PATCH] add support for coastal flood risk index in geomet-ogc-api (#346) updated cron flake8 Co-authored-by: Louis-Philippe Rousseau Lambert --- debian/conffiles | 1 + debian/msc-pygeoapi.cron.d | 3 + deploy/default/msc-pygeoapi-config.yml | 35 ++ .../sarracenia/coastal-flood-risk-index.conf | 20 + msc_pygeoapi/loader/__init__.py | 4 +- .../loader/coastal_flood_risk_index.py | 357 ++++++++++++++++++ 6 files changed, 419 insertions(+), 1 deletion(-) create mode 100644 deploy/default/sarracenia/coastal-flood-risk-index.conf create mode 100644 msc_pygeoapi/loader/coastal_flood_risk_index.py diff --git a/debian/conffiles b/debian/conffiles index a3376b4d..ea6d4ef3 100644 --- a/debian/conffiles +++ b/debian/conffiles @@ -9,3 +9,4 @@ /opt/msc-pygeoapi/etc/sarracenia/metnotes.conf /opt/msc-pygeoapi/etc/sarracenia/umos-realtime.conf /opt/msc-pygeoapi/etc/sarracenia/thunderstorm-outlook.conf +/opt/msc-pygeoapi/etc/sarracenia/coastal-flood-risk-index.conf diff --git a/debian/msc-pygeoapi.cron.d b/debian/msc-pygeoapi.cron.d index 04b8a238..60ccb6f5 100644 --- a/debian/msc-pygeoapi.cron.d +++ b/debian/msc-pygeoapi.cron.d @@ -54,3 +54,6 @@ MAILTO="" # every hour at 00h, clean expired thunderstorm outlooks 0 * * * * geoadm . /local/home/geoadm/.profile && msc-pygeoapi data thunderstorm-outlook clean-outlooks --yes + +# every hour at 00h, clean expired coastal flood risk index +0 * * * * geoadm . 
/local/home/geoadm/.profile && msc-pygeoapi data coastal-flood-risk-index clean-index --yes \ No newline at end of file diff --git a/deploy/default/msc-pygeoapi-config.yml b/deploy/default/msc-pygeoapi-config.yml index cc935733..7e3983ad 100644 --- a/deploy/default/msc-pygeoapi-config.yml +++ b/deploy/default/msc-pygeoapi-config.yml @@ -4111,6 +4111,41 @@ resources: data: ${MSC_PYGEOAPI_ES_URL}/thunderstorm_outlook id_field: id + coastal_flood_risk_index: + type: collection + title: + en: Coastal Flooding Risk Index [experimental] + fr: Indice de risque de submersion côtière [expérimental] + description: + en: The Coastal Flooding Risk Index is a geo- and time-referenced polygon product issued by the Meteorological Service of Canada (MSC) to articulate the coastal flooding risk, impact and probability. Products are issued daily by Storm Prediction Centres and are intended to provide early notification, out to 5 days, of coastal flooding due to astronomical tide, storm surge and wave impacts. + fr: L'indice de risque de submersion côtière est un produit polygonal géoréférencé et temporel émis par le Service météorologique du Canada (SMC) afin d'articuler le risque, l'impact et la probabilité de submersion côtière. Les produits sont générés quotidiennement par les centres de prévision des intempéries et sont destinés à fournir une notification précoce, jusqu'à 5 jours, des submersions côtières dues aux marées astronomiques, aux ondes de tempête et aux impacts des vagues. 
+ keywords: + en: [Risk, Index, Probability, Storm surge, Wave impact] + fr: [Risque, Indice, Probabilité, Onde de tempête, Impact des vagues] + crs: + - CRS84 + links: + - type: text/html + rel: canonical + title: + en: Meteorological Service of Canada open data + fr: Données ouvertes du Service météorologique du Canada + href: + en: https://eccc-msc.github.io/open-data/msc-data/readme_en + fr: https://eccc-msc.github.io/open-data/msc-data/readme_fr + hreflang: + en: en-CA + fr: fr-CA + extents: + spatial: + bbox: [-72.0416665, 41.9583335, -44.2917685, 60.0415978] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: Elasticsearch + data: ${MSC_PYGEOAPI_ES_URL}/coastal_flood_risk_index + id_field: id + wis2-discovery-metadata: type: collection title: WMO WIS2 discovery metadata (experimental) diff --git a/deploy/default/sarracenia/coastal-flood-risk-index.conf b/deploy/default/sarracenia/coastal-flood-risk-index.conf new file mode 100644 index 00000000..9a922301 --- /dev/null +++ b/deploy/default/sarracenia/coastal-flood-risk-index.conf @@ -0,0 +1,20 @@ +# TODO: Update for HPFX once the data is on HPFX +broker amqps://anonymous:anonymous@dd.alpha.weather.gc.ca +queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME} + +subtopic coastal-flooding.risk-index.# + +mirror True + +discard on + +plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY} + +directory ${MSC_PYGEOAPI_CACHEDIR} + +loglevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL} + +report_back False + +instances 2 +accept .* diff --git a/msc_pygeoapi/loader/__init__.py b/msc_pygeoapi/loader/__init__.py index 9ecbd70a..c0e8f868 100644 --- a/msc_pygeoapi/loader/__init__.py +++ b/msc_pygeoapi/loader/__init__.py @@ -71,7 +71,9 @@ def metadata(): ('msc_pygeoapi.loader.radar_coverage_realtime', 'radar_coverage_realtime'), ('msc_pygeoapi.loader.nwp_dataset_footprints', 'nwp_dataset_footprints'), ('msc_pygeoapi.loader.umos_realtime', 'umos_realtime'), - ('msc_pygeoapi.loader.thunderstorm_outlook', 
'thunderstorm_outlook') + ('msc_pygeoapi.loader.thunderstorm_outlook', 'thunderstorm_outlook'), + ('msc_pygeoapi.loader.coastal_flood_risk_index', + 'coastal_flood_risk_index') ) for module, name in commands: diff --git a/msc_pygeoapi/loader/coastal_flood_risk_index.py b/msc_pygeoapi/loader/coastal_flood_risk_index.py new file mode 100644 index 00000000..dda9ad52 --- /dev/null +++ b/msc_pygeoapi/loader/coastal_flood_risk_index.py @@ -0,0 +1,357 @@ +# ================================================================= +# +# Author: Louis-Philippe Rousseau-Lambert +# +# +# Copyright (c) 2024 Louis-Philippe Rousseau-Lambert +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the 'Software'), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. 
+# +# ================================================================= + +from datetime import datetime, timedelta +import json +import logging +import os +from pathlib import Path +import re + +import click + +from msc_pygeoapi import cli_options +from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector +from msc_pygeoapi.loader.base import BaseLoader +from msc_pygeoapi.util import configure_es_connection + +LOGGER = logging.getLogger(__name__) + +# index settings +INDEX_NAME = 'coastal_flood_risk_index' + +DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" + +MAPPINGS = { + 'properties': { + 'geometry': {'type': 'geo_shape'}, + 'properties': { + 'properties': { + 'publication_datetime': { + 'type': 'date', + 'format': 'strict_date_time_no_millis' + }, + 'expiration_datetime': { + 'type': 'date', + 'format': 'strict_date_time_no_millis' + }, + 'validity_datetime': { + 'type': 'date', + 'format': 'strict_date_time_no_millis' + } + } + } + } +} + +SETTINGS = { + 'settings': {'number_of_shards': 1, 'number_of_replicas': 0} +} + + +class CoastalFloodRiskIndexLoader(BaseLoader): + """Coastal flood risk index loader""" + + def __init__(self, conn_config={}): + """initializer""" + + BaseLoader.__init__(self) + + self.filepath = None + self.datetime = None + self.conn = ElasticsearchConnector(conn_config) + + SETTINGS['mappings'] = MAPPINGS + if not self.conn.exists(INDEX_NAME): + LOGGER.debug(f'Creating index {INDEX_NAME}') + self.conn.create(INDEX_NAME, SETTINGS) + + def generate_geojson_features(self): + """ + Generates and yields a series of flood risk index. + They are returned as Elasticsearch bulk API upsert actions, + with documents in GeoJSON to match the Elasticsearch index mappings. 
+ + :returns: Generator of Elasticsearch actions to upsert the + flood risk index + """ + + with open(self.filepath.resolve()) as f: + data = json.load(f)['features'] + + features = [] + filename = f.name.split('/')[-1] + file_id = re.sub(r'_v\d+\.json', '', filename) + + if len(data) > 0: + for feature in data: + + # flatten metobject properties + metobj = feature['properties']['metobject'] + for item, values in metobj.items(): + try: + metobj_flat_item = self.flatten_json(item, + values, + 'metobject') + feature['properties'].update(metobj_flat_item) + feature['properties']['file_id'] = file_id + except Exception as err: + msg = f'Error while flattening flood index JSON {err}' + LOGGER.error(f'{msg}') + pass + + del feature['properties']['metobject'] + + try: + f_exp_time = feature['properties']['expiration_datetime'] + exp_time = datetime.strptime(f_exp_time, + DATETIME_FORMAT) + except KeyError: + validity = feature['properties']['validity_datetime'] + validity_datetime = datetime.strptime(validity, + DATETIME_FORMAT) + exp_time = validity_datetime + timedelta(hours=24) + + expiration = datetime.strftime(exp_time, DATETIME_FORMAT) + feature['properties']['expiration_datetime'] = expiration + + if exp_time > datetime.now(): + features.append(feature) + + # check if id is already in ES and if amendment is +=1 + amendment = features[0]['properties']['amendment'] + is_newer = self.check_if_newer(file_id, amendment) + + if is_newer['update']: + for outlook in features: + action = { + '_id': outlook['properties']['id'], + '_index': INDEX_NAME, + '_op_type': 'update', + 'doc': outlook, + 'doc_as_upsert': True + } + + yield action + + for id_ in is_newer['id_list']: + self.conn.Elasticsearch.delete(index=INDEX_NAME, + id=id_) + else: + LOGGER.warning(f'empty flood risk index json in (unknown)') + + version = re.search(r'v(\d+)\.json$', filename).group(1) + if int(version) > 1: + # we need to delete the associated outlooks + query = { + "query": { + "match": { + 
"properties.file_id": file_id + } + } + } + self.conn.Elasticsearch.delete_by_query(index=INDEX_NAME, + body=query) + + def flatten_json(self, key, values, parent_key=''): + """ + flatten GeoJSON properties + + :returns: item array + """ + + items = {} + new_key = f'{parent_key}.{key}' + value = values + if isinstance(values, dict): + for sub_key, sub_value in values.items(): + new_key = f'{parent_key}.{key}.{sub_key}' + items[new_key] = sub_value + else: + items[new_key] = value + return items + + def check_if_newer(self, file_id, amendment): + """ + check if the coastal flood risk index is the newest version + + :returns: `bool` if latest, id_list for ids to delete + """ + + upt_ = True + id_list = [] + + query = { + "query": { + "match": { + "properties.file_id": file_id + } + } + } + + # Fetch the document + try: + result = self.conn.Elasticsearch.search(index=INDEX_NAME, + body=query) + if result: + hit = result['hits']['hits'][0] + es_amendement = hit["_source"]['properties']['amendment'] + if es_amendement >= amendment: + upt_ = False + else: + for id_ in result['hits']['hits']: + id_list.append(id_['_id']) + except Exception: + LOGGER.warning(f'Item ({file_id}) does not exist in index') + + return {'update': upt_, 'id_list': id_list} + + def load_data(self, filepath): + """ + loads data from event to target + + :returns: `bool` of status result + """ + + self.filepath = Path(filepath) + + LOGGER.debug(f'Received file {self.filepath}') + + # generate geojson features + package = self.generate_geojson_features() + try: + r = self.conn.submit_elastic_package(package) + LOGGER.debug(f'Result: {r}') + return True + except Exception as err: + LOGGER.warning(f'Error indexing: {err}') + return False + + +@click.group() +def coastal_flood_risk_index(): + """Manages coastal flood risk index outlook index""" + pass + + +@click.command() +@click.pass_context +@cli_options.OPTION_FILE() +@cli_options.OPTION_DIRECTORY() +@cli_options.OPTION_ELASTICSEARCH() 
+@cli_options.OPTION_ES_USERNAME() +@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +def add(ctx, file_, directory, es, username, password, ignore_certs): + """add data to system""" + + if file_ is None and directory is None: + raise click.ClickException('Missing --file/-f or --dir/-d option') + + conn_config = configure_es_connection(es, username, password, ignore_certs) + + files_to_process = [] + + if file_ is not None: + files_to_process = [file_] + elif directory is not None: + for root, dirs, files in os.walk(directory): + for f in [f for f in files if f.endswith('.json')]: + files_to_process.append(os.path.join(root, f)) + files_to_process.sort(key=os.path.getmtime) + + for file_to_process in files_to_process: + loader = CoastalFloodRiskIndexLoader(conn_config) + result = loader.load_data(file_to_process) + + if not result: + click.echo('features not generated') + + +@click.command() +@click.pass_context +@cli_options.OPTION_ELASTICSEARCH() +@cli_options.OPTION_ES_USERNAME() +@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +@cli_options.OPTION_YES( + prompt='Are you sure you want to delete old outlooks?' 
+) +def clean_index(ctx, es, username, password, ignore_certs): + """Delete expired coastal flood risk index documents""" + + conn_config = configure_es_connection(es, username, password, ignore_certs) + conn = ElasticsearchConnector(conn_config) + + click.echo('Deleting documents older than datetime.now()') + now = datetime.now().strftime(DATETIME_FORMAT) + + query = { + 'query': { + 'range': { + 'properties.expiration_datetime': { + 'lte': now + } + } + } + } + + conn.Elasticsearch.delete_by_query(index=INDEX_NAME, body=query) + + +@click.command() +@click.pass_context +@cli_options.OPTION_ELASTICSEARCH() +@cli_options.OPTION_ES_USERNAME() +@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +@cli_options.OPTION_INDEX_TEMPLATE() +@cli_options.OPTION_YES( + prompt='Are you sure you want to delete this index?' +) +def delete_index(ctx, es, username, password, ignore_certs, index_template): + """Delete coastal flood risk index""" + + conn_config = configure_es_connection(es, username, password, ignore_certs) + conn = ElasticsearchConnector(conn_config) + + click.echo(f'Deleting indexes {INDEX_NAME}') + conn.delete(INDEX_NAME) + + if index_template: + click.echo(f'Deleting index template {INDEX_NAME}') + conn.delete_template(INDEX_NAME) + + click.echo('Done') + + +coastal_flood_risk_index.add_command(add) +coastal_flood_risk_index.add_command(clean_index) +coastal_flood_risk_index.add_command(delete_index)