diff --git a/deploy/default/msc-pygeoapi-config.yml b/deploy/default/msc-pygeoapi-config.yml
index 21e9dd21..eda917e0 100644
--- a/deploy/default/msc-pygeoapi-config.yml
+++ b/deploy/default/msc-pygeoapi-config.yml
@@ -1197,6 +1197,120 @@ resources:
               id_field: id
               time_field: date_tm-value
 
+    swob-station:
+        type: collection
+        title:
+            en: Surface Weather Observations Stations
+            fr: Stations d'observations météorologiques à la surface
+        description:
+            en: Surface Observations measured at the automatic and manual stations of the Environment and Climate Change Canada and partners networks, either for a single station, or for the stations of specific provinces and territories (last 30 days).
+            fr: Observations de surface mesurées aux stations automatiques et manuelles des réseaux d'Environnement et Changement climatique Canada et de ses partenaires soit pour une seule station, soit pour les stations de provinces et territoires spécifiques (30 derniers jours).
+        keywords:
+            en: [surface, observations, weather, station]
+            fr: [surface, observations, météo, station]
+        crs:
+            - CRS84
+        links:
+            - type: text/html
+              rel: canonical
+              title:
+                  en: Stations of in situ observations
+                  fr: Stations d'observations
+              href:
+                  en: https://eccc-msc.github.io/open-data/msc-data/obs_station/readme_obs_insitu_swobdatamart_en
+                  fr: https://eccc-msc.github.io/open-data/msc-data/obs_station/readme_obs_insitu_swobdatamart_fr
+              hreflang:
+                  en: en-CA
+                  fr: fr-CA
+        extents:
+            spatial:
+                bbox: [-142, 42, -52, 84]
+                crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84
+            temporal:
+                begin: null
+                end: null # or empty
+        providers:
+            - type: feature
+              name: Elasticsearch
+              data: ${MSC_PYGEOAPI_ES_URL}/swob-surface-stations
+              id_field: id
+
+    swob-partner-station:
+        type: collection
+        title:
+            en: Surface Weather Observations Stations - Partners
+            fr: Stations d'observations météorologiques à la surface - Partenaires
+        description:
+            en: Surface Observations measured at the automatic and manual stations of the Environment and Climate Change Canada and partners networks, either for a single station, or for the stations of specific provinces and territories (last 30 days).
+            fr: Observations de surface mesurées aux stations automatiques et manuelles des réseaux d'Environnement et Changement climatique Canada et de ses partenaires soit pour une seule station, soit pour les stations de provinces et territoires spécifiques (30 derniers jours).
+        keywords:
+            en: [surface, observations, weather, station]
+            fr: [surface, observations, météo, station]
+        crs:
+            - CRS84
+        links:
+            - type: text/html
+              rel: canonical
+              title:
+                  en: Stations of in situ observations
+                  fr: Stations d'observations
+              href:
+                  en: https://eccc-msc.github.io/open-data/msc-data/obs_station/readme_obs_insitu_swobdatamart_en
+                  fr: https://eccc-msc.github.io/open-data/msc-data/obs_station/readme_obs_insitu_swobdatamart_fr
+              hreflang:
+                  en: en-CA
+                  fr: fr-CA
+        extents:
+            spatial:
+                bbox: [-142, 42, -52, 84]
+                crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84
+            temporal:
+                begin: null
+                end: null # or empty
+        providers:
+            - type: feature
+              name: Elasticsearch
+              data: ${MSC_PYGEOAPI_ES_URL}/swob-partner-stations
+              id_field: id
+
+    swob-marine-station:
+        type: collection
+        title:
+            en: Surface Weather Observations Stations - Marine
+            fr: Stations d'observations météorologiques à la surface - Marine
+        description:
+            en: Surface Observations measured at the automatic and manual stations of the Environment and Climate Change Canada and partners networks, either for a single station, or for the stations of specific provinces and territories (last 30 days).
+            fr: Observations de surface mesurées aux stations automatiques et manuelles des réseaux d'Environnement et Changement climatique Canada et de ses partenaires soit pour une seule station, soit pour les stations de provinces et territoires spécifiques (30 derniers jours).
+        keywords:
+            en: [surface, observations, weather, station]
+            fr: [surface, observations, météo, station]
+        crs:
+            - CRS84
+        links:
+            - type: text/html
+              rel: canonical
+              title:
+                  en: Stations of in situ observations
+                  fr: Stations d'observations
+              href:
+                  en: https://eccc-msc.github.io/open-data/msc-data/obs_station/readme_obs_insitu_swobdatamart_en
+                  fr: https://eccc-msc.github.io/open-data/msc-data/obs_station/readme_obs_insitu_swobdatamart_fr
+              hreflang:
+                  en: en-CA
+                  fr: fr-CA
+        extents:
+            spatial:
+                bbox: [-142, 42, -52, 84]
+                crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84
+            temporal:
+                begin: null
+                end: null # or empty
+        providers:
+            - type: feature
+              name: Elasticsearch
+              data: ${MSC_PYGEOAPI_ES_URL}/swob-marine-stations
+              id_field: id
+
     ltce-stations:
         type: collection
         title:
diff --git a/msc_pygeoapi/loader/__init__.py b/msc_pygeoapi/loader/__init__.py
index c0e8f868..ec77f48a 100644
--- a/msc_pygeoapi/loader/__init__.py
+++ b/msc_pygeoapi/loader/__init__.py
@@ -64,6 +64,7 @@ def metadata():
         ('msc_pygeoapi.loader.swob_realtime', 'swob_realtime'),
         ('msc_pygeoapi.loader.aqhi_realtime', 'aqhi_realtime'),
         ('msc_pygeoapi.loader.aqhi_stations', 'aqhi_stations'),
+        ('msc_pygeoapi.loader.swob_stations', 'swob_stations'),
         ('msc_pygeoapi.loader.ltce', 'ltce'),
         ('msc_pygeoapi.loader.climate_archive', 'climate_archive'),
         ('msc_pygeoapi.loader.metnotes', 'metnotes'),
diff --git a/msc_pygeoapi/loader/swob_stations.py b/msc_pygeoapi/loader/swob_stations.py
new file mode 100644
index 00000000..36d065bd
--- /dev/null
+++ b/msc_pygeoapi/loader/swob_stations.py
@@ -0,0 +1,542 @@
+
+# =================================================================
+#
+# Author: Bruno Fang
+#
+# Author: Louis-Philippe Rousseau-Lambert
+#
+#
+# Copyright (c) 2024 Bruno Fang
+# Copyright (c) 2024 Louis-Philippe Rousseau-Lambert
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation
+# files (the "Software"), to deal in the Software without
+# restriction, including without limitation the rights to use,
+# copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following
+# conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+# =================================================================
+
+import logging
+import os
+import urllib.error
+import urllib.request
+import csv
+# import chardet
+
+import click
+
+from msc_pygeoapi import cli_options
+from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector
+from msc_pygeoapi.env import MSC_PYGEOAPI_CACHEDIR
+from msc_pygeoapi.loader.base import BaseLoader
+from msc_pygeoapi.util import (
+    configure_es_connection
+)
+
+LOGGER = logging.getLogger(__name__)
+
+# index settings
+INDEX_BASENAME = 'swob-{}-stations'
+STATIONS_LIST_NAME = 'swob-xml_{}station_list.csv'
+STATIONS_LIST_URL = f'https://dd.weather.gc.ca/observations/doc/{STATIONS_LIST_NAME}'  # noqa
+STATIONS_CACHE = os.path.join(MSC_PYGEOAPI_CACHEDIR, STATIONS_LIST_NAME)
+
+if not os.path.exists(MSC_PYGEOAPI_CACHEDIR):
+    os.makedirs(MSC_PYGEOAPI_CACHEDIR)
+
+SETTINGS = {
+    'order': 0,
+    'version': 1,
+    'index_patterns': [INDEX_BASENAME],
+    'settings': {
+        'number_of_shards': 1,
+        'number_of_replicas': 0
+    },
+    'mappings': {
+        'properties': {
+            'geometry': {
+                'type': 'geo_shape'
+            },
+            'properties': {
+                'properties': {
+                    'iata_id': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'name': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'name_fr': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'name_en': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'province_territory': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'auto_man': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'wmo_id': {
+                        'type': 'integer'
+                    },
+                    'msc_id': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'icao_id': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'dst_time': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'std_time': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'data_provider': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'data_provider_en': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'data_provider_fr': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'data_attribution_notice': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'data_attribution_notice_en': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'data_attribution_notice_fr': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    },
+                    'dataset_network': {
+                        'type': 'text',
+                        'fields': {
+                            'raw': {'type': 'keyword'}
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+DATASET_LIST = ['surface', 'partner', 'marine']
+
+
+class SWOBStationLoader(BaseLoader):
+    """SWOB Station loader"""
+
+    def __init__(self, conn_config={}, file_='', dataset=None):
+        """initializer"""
+
+        BaseLoader.__init__(self)
+
+        self.conn = ElasticsearchConnector(conn_config)
+        self.filepath = file_
+        self.items = []
+        self.dataset = dataset
+        if not os.path.exists(self.filepath):
+            LOGGER.warning(f'file {self.filepath} does not exist')
+            download_stations(self.dataset)
+
+        self.conn.create_template(INDEX_BASENAME.format(self.dataset),
+                                  SETTINGS)
+
+    def generate_geojson_features(self):
+        """
+        Generates and yields a series of SWOB stations. Stations are
+        returned as Elasticsearch bulk API upsert actions, with documents
+        in GeoJSON to match the Elasticsearch index mappings.
+
+        :returns: Generator of Elasticsearch actions to upsert the SWOB
+                  stations
+        """
+
+        if os.path.exists(self.filepath):
+            file_list = self.filepath
+
+        elif self.dataset == 'surface':
+            file_list = os.path.join(MSC_PYGEOAPI_CACHEDIR,
+                                     STATIONS_LIST_NAME.format(''))
+            with open(file_list, mode='r', newline='',
+                      encoding='utf-8') as file:
+                csv_reader = csv.DictReader(file)
+                for row in csv_reader:
+                    id_ = row['MSC_ID']
+                    # id_ = f"{row['IATA_ID']}_{row['Dataset/Network']}"
+                    if row['WMO_ID'] == '':
+                        row['WMO_ID'] = None
+                    else:
+                        row['WMO_ID'] = int(row['WMO_ID'])
+                    # ignore stations with undefined coordinates
+                    if (
+                        row['Latitude'] != ''
+                        and row['Longitude'] != ''
+                        and row['Elevation(m)'] != ''
+                    ):
+                        station_info = {
+                            'type': 'Feature',
+                            'id': id_,
+                            'geometry': {
+                                'type': 'Point',
+                                'coordinates': [
+                                    float(row['Longitude']),
+                                    float(row['Latitude']),
+                                    float(row['Elevation(m)'])
+                                ]
+                            },
+                            'properties': {
+                                'iata_id': row[
+                                    'IATA_ID'
+                                ],
+                                'name': row[
+                                    'Name'
+                                ],
+                                'wmo_id': row[
+                                    'WMO_ID'
+                                ],
+                                'msc_id': id_,
+                                'data_provider': row[
+                                    'Data_Provider'
+                                ],
+                                'dataset_network': row[
+                                    'Dataset/Network'
+                                ],
+                                'auto_man': row[
+                                    'AUTO/MAN'
+                                ],
+                                'province_territory': row[
+                                    'Province/Territory'
+                                ]
+                            }
+                        }
+                        self.items.append(station_info)
+
+        elif self.dataset == 'partner':
+            file_list = os.path.join(MSC_PYGEOAPI_CACHEDIR,
+                                     STATIONS_LIST_NAME.format('partner_'))
+            with open(file_list, mode='r', newline='',
+                      encoding='Windows-1252') as file:
+                # use result = chardet.detect(file.read()) to find encoding
+                # in this case it's Windows-1252
+                csv_reader = csv.DictReader(file)
+                for row in csv_reader:
+                    id_ = row['# MSC ID']
+                    if row['# WMO ID'] == '':
+                        row['# WMO ID'] = None
+                    else:
+                        row['# WMO ID'] = int(row['# WMO ID'])
+                    # ignore test, temporary, inactive or
+                    # quick deploy (lat/lon not fixed) stations
+                    if id_ not in ['ON-MNRF-AFFES_0QD', 'ON-MNRF-AFFES_1QD',
+                                   'ON-MNRF-AFFES_2QD', 'ON-MNRF-AFFES_3QD',
+                                   'ON-MNRF-AFFES_4QD']:
+                        station_info = {
+                            'type': 'Feature',
+                            'id': id_,
+                            'geometry': {
+                                'type': 'Point',
+                                'coordinates': [
+                                    float(row['Longitude']),
+                                    float(row['Latitude']),
+                                    float(row['Elevation'])
+                                ]
+                            },
+                            'properties': {
+                                'iata_id': row[
+                                    '#IATA'
+                                ],
+                                'name_fr': row[
+                                    'FR name'
+                                ],
+                                'name_en': row[
+                                    'FR name'
+                                ],
+                                'province_territory': row[
+                                    'Province'
+                                ],
+                                'auto_man': row[
+                                    'AUTO/MAN'
+                                ],
+                                'icao_id': row[
+                                    '# ICAO ID'
+                                ],
+                                'wmo_id': row[
+                                    '# WMO ID'
+                                ],
+                                'msc_id': id_,
+                                'dst_time': row[
+                                    'DST Time'
+                                ],
+                                'std_time': row[
+                                    'STD Time'
+                                ],
+                                'data_provider_en': row[
+                                    'Data Provider'
+                                ],
+                                'data_provider_fr': row[
+                                    'Data Provider French'
+                                ],
+                                'data_attribution_notice_en': row[
+                                    'Data Attribution Notice'
+                                ],
+                                'data_attribution_notice_fr': row[
+                                    'Data Attribution Notice French'
+                                ]
+                            }
+                        }
+                        self.items.append(station_info)
+
+        elif self.dataset == 'marine':
+            file_list = os.path.join(MSC_PYGEOAPI_CACHEDIR,
+                                     STATIONS_LIST_NAME.format('marine_'))
+            with open(file_list, mode='r', newline='',
+                      encoding='utf-8') as file:
+                csv_reader = csv.DictReader(file)
+                for row in csv_reader:
+                    id_ = row['# MSC']
+                    if row['# WMO'] == '':
+                        row['# WMO'] = None
+                    else:
+                        row['# WMO'] = int(row['# WMO'])
+                    # ignore test, temporary, inactive or
+                    # quick deploy (lat/lon not fixed) stations
+                    if id_ not in ['9100990', '9300425', '9300990',
+                                   '9300991', '9400790', '9400990',
+                                   '9400991']:
+                        station_info = {
+                            'type': 'Feature',
+                            'id': id_,
+                            'geometry': {
+                                'type': 'Point',
+                                'coordinates': [
+                                    float(row['Longitude']),
+                                    float(row['Latitude']),
+                                    float(row['Elevation'])
+                                ]
+                            },
+                            'properties': {
+                                'iata_id': row[
+                                    '#IATA'
+                                ],
+                                'name_fr': row[
+                                    'FR name'
+                                ],
+                                'name_en': row[
+                                    'FR name'
+                                ],
+                                'province_territory': row[
+                                    'Province'
+                                ],
+                                'auto_man': row[
+                                    'AUTO/MAN'
+                                ],
+                                'icao_id': row[
+                                    '# ICAO'
+                                ],
+                                'wmo_id': row[
+                                    '# WMO'
+                                ],
+                                'msc_id': id_,
+                                'dst_time': row[
+                                    'DST Time'
+                                ],
+                                'std_time': row[
+                                    'STD Time'
+                                ],
+                                'data_provider': row[
+                                    'Data Provider'
+                                ],
+                                'data_attribution_notice': row[
+                                    'Data Attribution Notice'
+                                ]
+                            }
+                        }
+                        self.items.append(station_info)
+
+        for item in self.items:
+            action = {
+                '_id': item['id'],
+                '_index': INDEX_BASENAME.format(self.dataset),
+                '_op_type': 'update',
+                'doc': item,
+                'doc_as_upsert': True
+            }
+            yield action
+
+    def load_data(self):
+        """
+        loads data from event to target
+
+        :returns: `bool` of status result
+        """
+
+        LOGGER.debug(f'Received file {self.filepath}')
+
+        # generate geojson features
+        package = self.generate_geojson_features()
+        self.conn.submit_elastic_package(package)
+
+        return True
+
+
+def download_stations(dataset):
+    """
+    Download the SWOB station list for a given dataset
+
+    :returns: void
+    """
+
+    dataset_dict = {
+        'surface': '',
+        'partner': 'partner_',
+        'marine': 'marine_',
+    }
+    station = dataset_dict[dataset]
+
+    try:
+        station_file = STATIONS_LIST_NAME.format(station)
+        station_url = (
+            f'https://dd.weather.gc.ca/observations/doc/{station_file}'
+        )
+        station_cache = os.path.join(MSC_PYGEOAPI_CACHEDIR, station_file)
+        LOGGER.debug(f'Caching {station_url} to {station_cache}')
+        urllib.request.urlretrieve(station_url, station_cache)
+    except urllib.error.HTTPError as e:
+        LOGGER.error(f'{station_url} error: {e}')
+
+
+@click.group()
+def swob_stations():
+    """Manages SWOB station indexes"""
+    pass
+
+
+@click.command()
+@click.pass_context
+@cli_options.OPTION_DATASET(
+    help='SWOB station dataset to add.',
+    type=click.Choice(DATASET_LIST),
+)
+@cli_options.OPTION_FILE()
+@cli_options.OPTION_ELASTICSEARCH()
+@cli_options.OPTION_ES_USERNAME()
+@cli_options.OPTION_ES_PASSWORD()
+@cli_options.OPTION_ES_IGNORE_CERTS()
+def add(ctx, dataset, file_, es, username, password, ignore_certs):
+    """Add SWOB stations to Elasticsearch"""
+
+    if dataset is None:
+        raise click.ClickException('Missing --dataset option')
+
+    conn_config = configure_es_connection(es, username, password, ignore_certs)
+
+    if file_ is None:
+        file_ = ''
+    loader = SWOBStationLoader(conn_config, file_, dataset)
+    result = loader.load_data()
+    if not result:
+        click.echo('features not generated')
+
+
+@click.command()
+@click.pass_context
+@cli_options.OPTION_DATASET(
+    help='SWOB dataset indexes to delete.',
+    type=click.Choice(DATASET_LIST + ['all']),
+)
+@cli_options.OPTION_ELASTICSEARCH()
+@cli_options.OPTION_ES_USERNAME()
+@cli_options.OPTION_ES_PASSWORD()
+@cli_options.OPTION_ES_IGNORE_CERTS()
+@cli_options.OPTION_INDEX_TEMPLATE()
+def delete_indexes(ctx, dataset, es, username, password, ignore_certs,
+                   index_template):
+    """Delete SWOB station indexes"""
+
+    conn_config = configure_es_connection(es, username, password, ignore_certs)
+    conn = ElasticsearchConnector(conn_config)
+
+    if dataset == 'all':
+        indexes = INDEX_BASENAME.format('*')
+    else:
+        indexes = INDEX_BASENAME.format(dataset)
+
+    click.echo(f'Deleting indexes {indexes}')
+
+    conn.delete(indexes)
+
+    if index_template:
+        for type_ in DATASET_LIST:
+            index_name = INDEX_BASENAME.format(type_)
+            click.echo(f'Deleting index template {index_name}')
+            conn.delete_template(index_name)
+
+    click.echo('Done')
+
+
+swob_stations.add_command(add)
+swob_stations.add_command(delete_indexes)
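
A minimal usage sketch of the new loader outside the CLI, for reference only (it assumes a reachable Elasticsearch instance; the conn_config keys shown are illustrative, since the CLI normally builds that dictionary via configure_es_connection()):

    # Hypothetical example, not part of the change: index the surface station list.
    from msc_pygeoapi.loader.swob_stations import SWOBStationLoader

    # illustrative connection settings; real values come from configure_es_connection()
    conn_config = {'url': 'http://localhost:9200'}

    # an empty file path makes the loader download and cache
    # swob-xml_station_list.csv for the chosen dataset before indexing it
    loader = SWOBStationLoader(conn_config, file_='', dataset='surface')
    loader.load_data()

The equivalent CLI path is the swob_stations click group (add / delete_indexes commands), registered alongside the other loaders via msc_pygeoapi.loader.metadata().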