From 88257a16a46f972a8415dc89cee453c7c1f77985 Mon Sep 17 00:00:00 2001 From: Louis-Philippe Rousseau Lambert Date: Wed, 5 Jun 2024 17:10:53 +0000 Subject: [PATCH] first commit for thunderstorm outlook outlook update update for the thunderstorm outlooks flake8 thing flake8 and logic for empty outlooks rmv trailing comma --- debian/conffiles | 1 + debian/msc-pygeoapi.cron.d | 5 +- deploy/default/msc-pygeoapi-config.yml | 35 ++ .../sarracenia/thunderstorm-outlook.conf | 20 + msc_pygeoapi/loader/__init__.py | 3 +- msc_pygeoapi/loader/thunderstorm_outlook.py | 347 ++++++++++++++++++ msc_pygeoapi/plugin.py | 4 + 7 files changed, 413 insertions(+), 2 deletions(-) create mode 100644 deploy/default/sarracenia/thunderstorm-outlook.conf create mode 100644 msc_pygeoapi/loader/thunderstorm_outlook.py diff --git a/debian/conffiles b/debian/conffiles index e6e32662..a3376b4d 100644 --- a/debian/conffiles +++ b/debian/conffiles @@ -8,3 +8,4 @@ /opt/msc-pygeoapi/etc/sarracenia/aqhi-realtime.conf /opt/msc-pygeoapi/etc/sarracenia/metnotes.conf /opt/msc-pygeoapi/etc/sarracenia/umos-realtime.conf +/opt/msc-pygeoapi/etc/sarracenia/thunderstorm-outlook.conf diff --git a/debian/msc-pygeoapi.cron.d b/debian/msc-pygeoapi.cron.d index a94ac458..04b8a238 100644 --- a/debian/msc-pygeoapi.cron.d +++ b/debian/msc-pygeoapi.cron.d @@ -50,4 +50,7 @@ MAILTO="" 0 3 * * * geoadm . /local/home/geoadm/.profile && /usr/bin/find $MSC_PYGEOAPI_CACHEDIR -type d -empty -delete > /dev/null 2>&1 # every day at 0800h, clean umos realtime data older than 7 days -0 8 * * * geoadm . /local/home/geoadm/.profile && msc-pygeoapi data umos-realtime clean-indexes --dataset all --days 7 --yes \ No newline at end of file +0 8 * * * geoadm . /local/home/geoadm/.profile && msc-pygeoapi data umos-realtime clean-indexes --dataset all --days 7 --yes + +# every hour at minute 00, clean expired thunderstorm outlooks +0 * * * * geoadm . 
/local/home/geoadm/.profile && msc-pygeoapi data thunderstorm-outlook clean-outlooks --yes diff --git a/deploy/default/msc-pygeoapi-config.yml b/deploy/default/msc-pygeoapi-config.yml index 473e85da..cc935733 100644 --- a/deploy/default/msc-pygeoapi-config.yml +++ b/deploy/default/msc-pygeoapi-config.yml @@ -4076,6 +4076,41 @@ resources: # data: ${MSC_PYGEOAPI_ES_URL}/discovery-metadata # id_field: identifier + thunderstorm_outlook: + type: collection + title: + en: Thunderstorm Outlook + fr: Potentiel orageux + description: + en: The Thunderstorm Outlook is a graphical or GeoJSON forecast product, which depicts the expected geographic areas of thunderstorms in Canada. Thunderstorm Outlooks are issued by storm prediction centres across Canada during the convective season (May to October) each year. Both the graphical and GeoJSON products use the same source data that forecasters enter through the Ninjo software. Thunderstorm Outlooks are issued once per day at around 12:00 pm regional local time. Since each office has their own standards of procedure, the forecasters are not required to send it at 12:00 pm. Nonetheless, the timestamp in the data is preconfigured to say it is issued at 12:00 pm in the GeoJSON product. Subsequent amendments to existing Thunderstorm Outlooks are issued on an as-needed basis during the day. Each amendment generates a new GeoJSON file and graphical product. The Thunderstorm Outlook may not be updated with active severe weather alerts. Users are asked to check with the latest watches and warnings for the most current conditions. + fr: Le Potentiel orageux est un produit de prévision graphique ou en GeoJSON qui décrit les zones géographiques prévues pour les orages au Canada. Les Potentiels orageux sont émis par les centres de prévision des intempéries à travers le Canada pendant la saison convective (mai à octobre) chaque année. 
Les produits graphiques et en GeoJSON utilisent les mêmes données sources que les prévisionnistes entrent dans le logiciel Ninjo. Les Potentiels orageux sont émis une fois par jour vers 12h00, heure locale régionale. Puisque chaque bureau a ses propres normes de procédure, les prévisionnistes ne sont pas tenus de l'envoyer à 12h00. Néanmoins, l'horodatage des données est préconfiguré pour indiquer qu'elles sont émises à 12h00 dans le produit GeoJSON. Les amendements ultérieurs des Potentiels orageux existants sont émis selon les besoins au cours de la journée. Chaque amendement génère un nouveau fichier GeoJSON et un nouveau produit graphique. Le Potentiel orageux pourrait ne pas être à jour avec les alertes de temps violent en cours. Les utilisateurs sont priés de consulter les dernières veilles et avertissements pour connaître les conditions les plus récentes. + keywords: + en: [Thunderstorm, Outlook, Rain, Tornado, Hail, Gust, Severity, Atmospheric Conditions] + fr: [Orage, Potentiel, Pluie, Tornade, Grêle, Rafale, Sévérité, Conditions atmosphériques] + crs: + - CRS84 + links: + - type: text/html + rel: canonical + title: + en: Meteorological Service of Canada open data + fr: Données ouvertes du Service météorologique du Canada + href: + en: https://eccc-msc.github.io/open-data/msc-data/readme_en + fr: https://eccc-msc.github.io/open-data/msc-data/readme_fr + hreflang: + en: en-CA + fr: fr-CA + extents: + spatial: + bbox: [-145.27, 37.3, -48.11, 87.61] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: Elasticsearch + data: ${MSC_PYGEOAPI_ES_URL}/thunderstorm_outlook + id_field: id + wis2-discovery-metadata: type: collection title: WMO WIS2 discovery metadata (experimental) diff --git a/deploy/default/sarracenia/thunderstorm-outlook.conf b/deploy/default/sarracenia/thunderstorm-outlook.conf new file mode 100644 index 00000000..6c6cda50 --- /dev/null +++ b/deploy/default/sarracenia/thunderstorm-outlook.conf @@ -0,0 +1,20 
@@ +# TODO: Update for HPFX once the data is on HPFX +broker amqps://anonymous:anonymous@dd.alpha.weather.gc.ca +queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME} + +subtopic thunderstorm-outlooks.# + +mirror True + +discard on + +plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY} + +directory ${MSC_PYGEOAPI_CACHEDIR} + +loglevel ${MSC_PYGEOAPI_LOGGING_LOGLEVEL} + +report_back False + +instances 2 +accept .* diff --git a/msc_pygeoapi/loader/__init__.py b/msc_pygeoapi/loader/__init__.py index 9a199ef6..9ecbd70a 100644 --- a/msc_pygeoapi/loader/__init__.py +++ b/msc_pygeoapi/loader/__init__.py @@ -70,7 +70,8 @@ def metadata(): ('msc_pygeoapi.loader.cumulative_effects_hs', 'cumulative_effects_hs'), ('msc_pygeoapi.loader.radar_coverage_realtime', 'radar_coverage_realtime'), ('msc_pygeoapi.loader.nwp_dataset_footprints', 'nwp_dataset_footprints'), - ('msc_pygeoapi.loader.umos_realtime', 'umos_realtime') + ('msc_pygeoapi.loader.umos_realtime', 'umos_realtime'), + ('msc_pygeoapi.loader.thunderstorm_outlook', 'thunderstorm_outlook') ) for module, name in commands: diff --git a/msc_pygeoapi/loader/thunderstorm_outlook.py b/msc_pygeoapi/loader/thunderstorm_outlook.py new file mode 100644 index 00000000..ae8ab40b --- /dev/null +++ b/msc_pygeoapi/loader/thunderstorm_outlook.py @@ -0,0 +1,347 @@ +# ================================================================= +# +# Author: Louis-Philippe Rousseau-Lambert +# +# +# Copyright (c) 2024 Louis-Philippe Rousseau-Lambert +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the 'Software'), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission 
notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +from datetime import datetime +import json +import logging +import os +from pathlib import Path +import re + +import click + +from msc_pygeoapi import cli_options +from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector +from msc_pygeoapi.loader.base import BaseLoader +from msc_pygeoapi.util import configure_es_connection + +LOGGER = logging.getLogger(__name__) + +# index settings +INDEX_NAME = 'thunderstorm_outlook' + +DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" + +MAPPINGS = { + 'properties': { + 'geometry': {'type': 'geo_shape'}, + 'properties': { + 'properties': { + 'publication_datetime': { + 'type': 'date', + 'format': 'strict_date_time_no_millis' + }, + 'expiration_datetime': { + 'type': 'date', + 'format': 'strict_date_time_no_millis' + }, + 'validity_datetime': { + 'type': 'date', + 'format': 'strict_date_time_no_millis' + } + } + } + } +} + +SETTINGS = { + 'settings': {'number_of_shards': 1, 'number_of_replicas': 0} +} + + +class ThunderstormOutlookLoader(BaseLoader): + """Thunder storm outlook loader""" + + def __init__(self, conn_config={}): + """initializer""" + + BaseLoader.__init__(self) + + self.filepath = None + self.datetime = None + self.conn = ElasticsearchConnector(conn_config) + + SETTINGS['mappings'] = MAPPINGS + if not self.conn.exists(INDEX_NAME): + LOGGER.debug(f'Creating index {INDEX_NAME}') + 
self.conn.create(INDEX_NAME, SETTINGS) + + def generate_geojson_features(self): + """ + Generates and yields a series of thunderstorm outlook. + They are returned as Elasticsearch bulk API upsert actions, + with documents in GeoJSON to match the Elasticsearch index mappings. + + :returns: Generator of Elasticsearch actions to upsert the + thunderstorm outlook + """ + + with open(self.filepath.resolve()) as f: + data = json.load(f)['features'] + + features = [] + filename = f.name.split('/')[-1] + file_id = re.sub(r'_v\d+\.json', '', filename) + + if len(data) > 0: + for feature in data: + + # flatten metobject properties + metobj = feature['properties']['metobject'] + for item, values in metobj.items(): + try: + metobj_flat_item = self.flatten_json(item, + values, + 'metobject') + feature['properties'].update(metobj_flat_item) + feature['properties']['file_id'] = file_id + except Exception as err: + msg = f'Error while flattening Thunderstorm JSON {err}' + LOGGER.error(f'{msg}') + pass + + del feature['properties']['metobject'] + f_exp_datetime = feature['properties']['expiration_datetime'] + exp_datetime = datetime.strptime(f_exp_datetime, + DATETIME_FORMAT) + + if exp_datetime > datetime.now(): + features.append(feature) + + # check if id is already in ES and if amendment is +=1 + amendment = features[0]['properties']['amendment'] + is_newer = self.check_if_newer(file_id, amendment) + + if is_newer['update']: + for outlook in features: + action = { + '_id': outlook['properties']['id'], + '_index': INDEX_NAME, + '_op_type': 'update', + 'doc': outlook, + 'doc_as_upsert': True + } + + yield action + + for id_ in is_newer['id_list']: + self.conn.Elasticsearch.delete(index=INDEX_NAME, + id=id_) + else: + LOGGER.warning(f'empty thunderstorm outlook json in {filename}') + + version = re.search(r'v(\d+)\.json$', filename).group(1) + if int(version) > 1: + # we need to delete the associated outlooks + query = { + "query": { + "match": { + "properties.file_id": file_id + } 
+ } + } + self.conn.Elasticsearch.delete_by_query(index=INDEX_NAME, + body=query) + + def flatten_json(self, key, values, parent_key=''): + """ + flatten GeoJSON properties + + :returns: `dict` of flattened items keyed by dotted path + """ + + items = {} + new_key = f'{parent_key}.{key}' + value = values + if isinstance(values, dict): + for sub_key, sub_value in values.items(): + new_key = f'{parent_key}.{key}.{sub_key}' + items[new_key] = sub_value + else: + items[new_key] = value + return items + + def check_if_newer(self, file_id, amendment): + """ + check if the thunderstorm outlook is the newest version + + :returns: `dict` with `update` (`bool`) and `id_list` (ids to delete) + """ + + upt_ = True + id_list = [] + + query = { + "query": { + "match": { + "properties.file_id": file_id + } + } + } + + # Fetch the document + try: + result = self.conn.Elasticsearch.search(index=INDEX_NAME, + body=query) + if result: + hit = result['hits']['hits'][0] + es_amendement = hit["_source"]['properties']['amendment'] + if es_amendement >= amendment: + upt_ = False + else: + for id_ in result['hits']['hits']: + id_list.append(id_['_id']) + except Exception: + LOGGER.warning(f'Item ({file_id}) does not exist in index') + + return {'update': upt_, 'id_list': id_list} + + def load_data(self, filepath): + """ + loads data from event to target + + :returns: `bool` of status result + """ + + self.filepath = Path(filepath) + + LOGGER.debug(f'Received file {self.filepath}') + + # generate geojson features + package = self.generate_geojson_features() + try: + r = self.conn.submit_elastic_package(package) + LOGGER.debug(f'Result: {r}') + return True + except Exception as err: + LOGGER.warning(f'Error indexing: {err}') + return False + + +@click.group() +def thunderstorm_outlook(): + """Manages thunderstorm outlook index""" + pass + + +@click.command() +@click.pass_context +@cli_options.OPTION_FILE() +@cli_options.OPTION_DIRECTORY() +@cli_options.OPTION_ELASTICSEARCH() +@cli_options.OPTION_ES_USERNAME() 
+@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +def add(ctx, file_, directory, es, username, password, ignore_certs): + """add data to system""" + + if file_ is None and directory is None: + raise click.ClickException('Missing --file/-f or --dir/-d option') + + conn_config = configure_es_connection(es, username, password, ignore_certs) + + files_to_process = [] + + if file_ is not None: + files_to_process = [file_] + elif directory is not None: + for root, dirs, files in os.walk(directory): + for f in [f for f in files if f.endswith('.json')]: + files_to_process.append(os.path.join(root, f)) + files_to_process.sort(key=os.path.getmtime) + + for file_to_process in files_to_process: + loader = ThunderstormOutlookLoader(conn_config) + result = loader.load_data(file_to_process) + + if not result: + click.echo('features not generated') + + +@click.command() +@click.pass_context +@cli_options.OPTION_ELASTICSEARCH() +@cli_options.OPTION_ES_USERNAME() +@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +@cli_options.OPTION_YES( + prompt='Are you sure you want to delete old outlooks?' +) +def clean_outlooks(ctx, es, username, password, ignore_certs): + """Delete expired outlook documents""" + + conn_config = configure_es_connection(es, username, password, ignore_certs) + conn = ElasticsearchConnector(conn_config) + + click.echo('Deleting documents older than datetime.now()') + now = datetime.now().strftime(DATETIME_FORMAT) + + query = { + 'query': { + 'range': { + 'properties.expiration_datetime': { + 'lte': now + } + } + } + } + + conn.Elasticsearch.delete_by_query(index=INDEX_NAME, body=query) + + +@click.command() +@click.pass_context +@cli_options.OPTION_ELASTICSEARCH() +@cli_options.OPTION_ES_USERNAME() +@cli_options.OPTION_ES_PASSWORD() +@cli_options.OPTION_ES_IGNORE_CERTS() +@cli_options.OPTION_INDEX_TEMPLATE() +@cli_options.OPTION_YES( + prompt='Are you sure you want to delete this index?' 
+) +def delete_index(ctx, es, username, password, ignore_certs, index_template): + """Delete thunderstorm outlook index""" + + conn_config = configure_es_connection(es, username, password, ignore_certs) + conn = ElasticsearchConnector(conn_config) + + click.echo(f'Deleting indexes {INDEX_NAME}') + conn.delete(INDEX_NAME) + + if index_template: + click.echo(f'Deleting index template {INDEX_NAME}') + conn.delete_template(INDEX_NAME) + + click.echo('Done') + + +thunderstorm_outlook.add_command(add) +thunderstorm_outlook.add_command(clean_outlooks) +thunderstorm_outlook.add_command(delete_index) diff --git a/msc_pygeoapi/plugin.py b/msc_pygeoapi/plugin.py index 5ecd279b..556d1737 100644 --- a/msc_pygeoapi/plugin.py +++ b/msc_pygeoapi/plugin.py @@ -83,6 +83,10 @@ 'umos_realtime': { 'filename_pattern': 'stat-post-processing', 'handler': 'msc_pygeoapi.loader.umos_realtime.UMOSRealtimeLoader' + }, + 'thunderstorm_outlook': { + 'filename_pattern': 'ThunderstormOutlook', + 'handler': 'msc_pygeoapi.loader.thunderstorm_outlook.ThunderstormOutlookLoader' # noqa } } }