From 6ca4343333936fb63cc8f393bf89c11052da4451 Mon Sep 17 00:00:00 2001 From: Etienne Pelletier Date: Fri, 26 Apr 2024 17:33:07 +0000 Subject: [PATCH] update CPW loader to include 7day/hourly forecasts, warnings, sunrise/sunset --- deploy/default/msc-pygeoapi-config.yml | 80 +- .../default/sarracenia/citypageweather.conf | 17 +- .../loader/citypageweather_realtime.py | 3295 +++++++++++++++-- msc_pygeoapi/provider/cpw_elasticsearch.py | 416 +++ msc_pygeoapi/util.py | 21 + 5 files changed, 3382 insertions(+), 447 deletions(-) create mode 100644 msc_pygeoapi/provider/cpw_elasticsearch.py diff --git a/deploy/default/msc-pygeoapi-config.yml b/deploy/default/msc-pygeoapi-config.yml index 21e9dd21..c2f25bb2 100644 --- a/deploy/default/msc-pygeoapi-config.yml +++ b/deploy/default/msc-pygeoapi-config.yml @@ -679,47 +679,45 @@ resources: - DAYS_WITH_VALID_SUNSHINE - COOLING_DEGREE_DAYS - HEATING_DEGREE_DAYS - - # current-conditions: - # type: collection - # title: - # en: Current Weather Conditions - City Page Weather [experimental] - # fr: Conditions météorologiques actuelles - Prévisions météorologiques par ville [experimentale] - # description: - # en: Current conditions for select Canadian cities - # fr: Conditions actuelles pour une sélection de villes canadiennes - # keywords: - # en: [Meteorology, Weather] - # fr: [Météorologie, Temps] - # crs: - # - CRS84 - # links: - # - type: text/html - # rel: canonical - # title: - # en: Weather forecast files by city - # fr: Prévisions météorologiques par ville - # href: - # en: https://eccc-msc.github.io/open-data/msc-data/citypage-weather/readme_citypageweather_en/ - # fr: https://eccc-msc.github.io/open-data/msc-data/citypage-weather/readme_citypageweather_fr/ - # hreflang: - # en: en-CA - # fr: fr-CA - # extents: - # spatial: - # bbox: [-145.27, 37.3, -48.11, 87.61] - # crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 - # temporal: - # begin: null - # end: null # or empty - # providers: - # - type: feature - # default: true - # name: Elasticsearch - # data: ${MSC_PYGEOAPI_ES_URL}/current_conditions - # id_field: identifier - # time_field: timestamp - + citypageweather: + type: collection + title: + en: City Page Weather [experimental] + fr: Prévisions météorologiques par ville [expérimentale] + description: + en: Provides current conditions, forecasts, warnings and regional normals for a selection of Canadian cities. + fr: Fournit les conditions actuelles, les prévisions, les avertissements et les normales régionales pour une sélection de villes canadiennes. + keywords: + en: [Meteorology, Weather] + fr: [Météorologie, Temps] + crs: + - CRS84 + links: + - type: text/html + rel: canonical + title: + en: Weather forecast files by city + fr: Prévisions météorologiques par ville + href: + en: https://eccc-msc.github.io/open-data/msc-data/citypage-weather/readme_citypageweather_en/ + fr: https://eccc-msc.github.io/open-data/msc-data/citypage-weather/readme_citypageweather_fr/ + hreflang: + en: en-CA + fr: fr-CA + extents: + spatial: + bbox: [-145.27, 37.3, -48.11, 87.61] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + temporal: + begin: null + end: null # or empty + providers: + - type: feature + default: true + name: msc_pygeoapi.provider.cpw_elasticsearch.CPWElasticsearchProvider + data: ${MSC_PYGEOAPI_ES_URL}/citypageweather_realtime + id_field: identifier + time_field: lastUpdated climate-daily: type: collection title: diff --git a/deploy/default/sarracenia/citypageweather.conf b/deploy/default/sarracenia/citypageweather.conf index 7b22cb78..d4f3c7ca 100644 --- a/deploy/default/sarracenia/citypageweather.conf +++ b/deploy/default/sarracenia/citypageweather.conf @@ -1,11 +1,12 @@ broker amqps://MSC-GEOMET@hpfx.collab.science.gc.ca -queue_name q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME} -directory ${MSC_PYGEOAPI_CACHEDIR} +exchange xpublic +queueName q_${BROKER_USER}.${PROGRAM}.${CONFIG}.${HOSTNAME} instances 2 -subtopic *.WXO-DD.citypage_weather.xml.# -mirror True -discard True -slip 3 -accept .* -plugin ${MSC_PYGEOAPI_METPX_EVENT_FILE_PY} +subtopic *.WXO-DD.citypage_weather.xml.*.# + +directory ${MSC_PYGEOAPI_CACHEDIR} +callback ${MSC_PYGEOAPI_METPX_FLOW_CALLBACK} +mirror True +discard False +strip 3 \ No newline at end of file diff --git a/msc_pygeoapi/loader/citypageweather_realtime.py b/msc_pygeoapi/loader/citypageweather_realtime.py index 5337d7e8..b2be8fbb 100644 --- a/msc_pygeoapi/loader/citypageweather_realtime.py +++ b/msc_pygeoapi/loader/citypageweather_realtime.py @@ -1,10 +1,9 @@ # ================================================================= # -# Author: Louis-Philippe Rousseau-Lambert -# +# Author: Etienne Pelletier +# # -# Copyright (c) 2020 Louis-Philippe Rousseau-Lambert -# Copyright (c) 2023 Tom Kralidis +# Copyright (c) 2024 Etienne Pelletier # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -30,17 +29,25 @@ # ================================================================= import click -from datetime import datetime, timedelta +from datetime import datetime import json import logging from lxml import etree import os +from parse import parse +from pathlib import Path +import re from msc_pygeoapi import cli_options from msc_pygeoapi.connector.elasticsearch_ import ElasticsearchConnector -from msc_pygeoapi.env import MSC_PYGEOAPI_BASEPATH +from msc_pygeoapi.env import MSC_PYGEOAPI_BASEPATH, MSC_PYGEOAPI_CACHEDIR from msc_pygeoapi.loader.base import BaseLoader -from msc_pygeoapi.util import configure_es_connection +from msc_pygeoapi.util import ( + DATETIME_RFC3339_FMT, + configure_es_connection, + _get_element, + safe_cast_to_number, +) LOGGER = logging.getLogger(__name__) @@ -48,196 +55,1557 @@ DAYS_TO_KEEP = 30 # Index settings -INDEX_NAME = 'current_conditions' - -NATIONAL_CITIES = [ - 'Calgary', - 'Charlottetown', - 'Edmonton', - 'Fredericton', - 'Halifax', - 'Iqaluit', - u'Montréal', - u'Ottawa (Kanata - Orléans)', - 'Prince George', - u'Québec', - 'Regina', - 'Saskatoon', - 'St. John\'s', - 'Thunder Bay', - 'Toronto', - 'Vancouver', - 'Victoria', - 'Whitehorse', - 'Winnipeg', - 'Yellowknife', -] - -SETTINGS = { - 'settings': { - 'number_of_shards': 1, - 'number_of_replicas': 0 - }, - 'mappings': { - 'properties': { - 'geometry': { - 'type': 'geo_shape' +INDEX_NAME = 'citypageweather_realtime' + +CPW_PROPERTIES = { + 'properties': { + 'identifier': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'name': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': {'type': 'text', 'fields': {'raw': {'type': 'keyword'}}}, }, + }, + 'region': { + 'type': 'object', 'properties': { - 'properties': { - 'identifier': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': {'type': 'text', 'fields': {'raw': {'type': 'keyword'}}}, + }, + }, + 'url': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': {'type': 'text', 'fields': {'raw': {'type': 'keyword'}}}, + }, + }, + 'currentConditions': { + 'type': 'object', + 'properties': { + 'icon': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'timestamp': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'date', + 'format': 'date_time_no_millis', + }, + 'fr': { + 'type': 'date', + 'format': 'date_time_no_millis', + }, }, - 'name': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'relativeHumidity': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, }, - 'nom': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'wind': { + 'type': 'object', + 'properties': { + 'speed': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + }, + }, + 'gust': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'integer', + }, + 'fr': { + 'type': 'integer', + }, + }, + }, + }, + }, + 'direction': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'bearing': { + 'type': 'object', + 'properties': { + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, + }, + }, }, - 'station_en': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'temperature': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, }, - 'stations_fr': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'dewpoint': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, }, - 'icon': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'windChill': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, }, - 'cond_en': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'station': { + 'type': 'object', + 'properties': { + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'lat': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'lon': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'code': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, }, - 'cond_fr': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'condition': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, }, - 'temp': { - "type": "float" + }, + 'pressure': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'float', + }, + 'fr': { + 'type': 'float', + }, + }, + }, }, - 'dewpoint': { - "type": "float" + }, + }, + }, + 'forecastGroup': { + 'type': 'object', + 'properties': { + 'timestamp': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, }, - 'windchill': { - "type": "integer" + }, + 'regionalNormals': { + 'type': 'object', + 'properties': { + 'textSummary': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'temperature': { + 'type': 'nested', + 'properties': { + 'class': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'integer', + }, + 'fr': { + 'type': 'integer', + }, + }, + }, + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + }, + }, }, - 'pres_en': { - "type": "float" + }, + 'forecasts': { + 'type': 'nested', + 'properties': { + 'period': { + 'type': 'object', + 'properties': { + 'textForecastName': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + }, + }, + 'textSummary': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'textForecast_name': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'cloud_precip': { + 'type': 'text', + }, + 'abbreviated_forecast': { + 'type': 'object', + 'properties': { + 'icon': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'pop': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'text_summary': { + 'type': 'text', + }, + }, + }, + 'temperatures': { + 'type': 'object', + 'properties': { + 'text_summary': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'temp_high': { + 'type': 'integer', + }, + 'temp_low': { + 'type': 'integer', + }, + }, + }, + 'winds': { + 'type': 'object', + 'properties': { + 'text_summary': { + 'type': 'text', + }, + 'periods': { + 'type': 'object', + 'properties': { + 'index': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + 'rank': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + 'speed': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + }, + }, + }, + }, + 'gust': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + }, + }, + }, + }, + 'direction': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + 'bearing': { + 'type': 'object', + 'properties': { + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' # noqa + } + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + 'precipitation': { + 'type': 'object', + 'properties': { + 'textSummary': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'precip_periods': { + 'type': 'object', + 'properties': { + 'start': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'end': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'precip_type': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + }, + }, + 'windChill': { + 'type': 'object', + }, + 'uv': { + 'type': 'object', + 'properties': { + 'category': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'text_summary': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'index': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + }, + }, + 'rel_hum': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'humidex': { + 'type': 'object', + }, }, - 'pres_fr': { - "type": "float" + }, + }, + }, + 'hourlyForecastGroup': { + 'type': 'object', + 'properties': { + 'timestamp': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, }, - 'prestnd_en': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'hourlyForecasts': { + 'type': 'nested', + 'properties': { + 'condition': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + }, + }, + 'humidex': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'byte', + }, + 'fr': { + 'type': 'byte', + }, + }, + }, + }, + }, + 'iconCode': { + 'type': 'object', + 'properties': { + 'format': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'url': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'value': { + 'type': 'byte', + }, + }, + }, + 'lop': { + 'type': 'object', + 'properties': { + 'category': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'byte', + }, + 'fr': { + 'type': 'byte', + }, + }, + }, + }, + }, + 'temperature': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'byte', + }, + 'fr': { + 'type': 'byte', + }, + }, + }, + }, + }, + 'timestamp': { + 'type': 'date', + }, + 'uv': { + 'type': 'object', + 'properties': { + 'index': { + 'type': 'object', + 'properties': { + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'byte', + }, + 'fr': { + 'type': 'byte', + }, + }, + }, + }, + }, + }, + }, + 'wind': { + 'type': 'object', + 'properties': { + 'direction': { + 'type': 'object', + 'properties': { + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + 'windDirFull': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + }, + }, + 'gust': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'byte', + }, + 'fr': { + 'type': 'byte', + }, + }, + }, + }, + }, + 'speed': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + 'units': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'short', + }, + 'fr': { + 'type': 'short', + }, + }, + }, + }, + }, + }, + }, + 'windChill': { + 'type': 'object', + 'properties': { + 'unitType': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + 'fr': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + }, + }, + }, + }, + 'value': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'byte', + }, + 'fr': { + 'type': 'byte', + }, + }, + }, + }, + }, }, - 'prestnd_fr': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + }, + }, + 'riseSet': { + 'type': 'object', + 'properties': { + 'disclaimer': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, }, - 'rel_hum': { - "type": "integer" + }, + 'sunrise': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'date', + }, + 'fr': { + 'type': 'date', + }, }, - 'speed': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'sunset': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'date', + }, + 'fr': { + 'type': 'date', + }, }, - 'gust': { - "type": "integer" + }, + }, + }, + 'warnings': { + 'type': 'nested', + 'properties': { + 'description': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, }, - 'direction': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'eventIssue': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'date', + }, + 'fr': { + 'type': 'date', + }, }, - 'bearing': { - "type": "float" + }, + 'expiryTime': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'date', + }, + 'fr': { + 'type': 'date', + }, }, - 'timestamp': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'priority': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, }, - 'url_en': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'type': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, }, - 'url_fr': { - 'type': 'text', - 'fields': { - 'raw': { - 'type': 'keyword' - } - } + }, + 'url': { + 'type': 'object', + 'properties': { + 'en': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, + 'fr': { + 'type': 'text', + 'fields': {'raw': {'type': 'keyword'}}, + }, }, - 'national': { - 'type': 'integer', - } - } - } + }, + }, + }, + }, +} + +SETTINGS = { + 'settings': {'number_of_shards': 1, 'number_of_replicas': 0}, + 'mappings': { + 'properties': { + 'geometry': {'type': 'geo_shape'}, + 'properties': CPW_PROPERTIES, } - } + }, } +MAX_XML_DATETIME_DIFF_SECONDS = 10 + class CitypageweatherRealtimeLoader(BaseLoader): """Current conditions real-time loader""" @@ -248,22 +1616,129 @@ def __init__(self, conn_config={}): BaseLoader.__init__(self) self.conn = ElasticsearchConnector(conn_config) + self.filename_pattern = ( + '{datetime}_MSC_CitypageWeather_{sitecode}_{lang}.xml' # noqa + ) self.conn.create(INDEX_NAME, mapping=SETTINGS) + self.xml_root = None + self.lang = None + self.filepath_en = None + self.filepath_fr = None + self.parsed_filename = None + self.wxo_lookup = None + self.sitecode = None + self.citycode = None + self.cpw_feature = { + 'type': "Feature", + 'properties': { + 'lastUpdated': datetime.now().strftime(DATETIME_RFC3339_FMT), + }, + } - def load_data(self, filepath): + def load_data(self, filepath: str) -> bool: """ fonction from base to load the data in ES - :param filepath: filepath for parsing the current condition file + :param filepath: filepath for parsing the CPW file :returns: True/False """ - with open(os.path.join(MSC_PYGEOAPI_BASEPATH, - 'resources/wxo_lookup.json')) as json_file: - wxo_lookup = json.load(json_file) + LOGGER.debug(f'Received {filepath} for loading...') + # parse filename and extract current lang and alt lang + current_filepath = Path(filepath) + self.parsed_filename = parse( + self.filename_pattern, current_filepath.name + ) + current_lang = self.parsed_filename.named['lang'] + alt_lang = 'fr' if current_lang == 'en' else 'en' + + # set current file language filepath + setattr(self, f'filepath_{current_lang}', current_filepath) + + # set alternate file language filepath + alt_xml_wildcard = self.filename_pattern.format( + datetime='*', + sitecode=self.parsed_filename.named['sitecode'], + lang=alt_lang, + ) + + # glob all alternate language files and sort by + # absolute datetime difference to current file + associated_alt_files = sorted( + current_filepath.parent.glob(alt_xml_wildcard), + key=self._sort_by_datetime_diff, + ) + + if associated_alt_files: + # get file with datetime closest to current filepath + setattr( + self, + f'filepath_{alt_lang}', + associated_alt_files[0], + ) + else: + LOGGER.warning( + f'No associated {alt_lang} Citypage XML files found for ' + f'{current_filepath}. Skipping file...' + ) + return False + + LOGGER.debug( + f'Processing XML: {self.filepath_en} and {self.filepath_fr}' + ) + + # load wxo_lookup + with open( + os.path.join(MSC_PYGEOAPI_BASEPATH, 'resources/wxo_lookup.json') + ) as json_file: + self.wxo_lookup = json.load(json_file) - data = self.xml2json_cpw(wxo_lookup, filepath) + try: + self.xml_roots = { + 'en': etree.parse(self.filepath_en).getroot(), + 'fr': etree.parse(self.filepath_fr).getroot(), + } + except Exception as err: + LOGGER.error(f'ERROR: cannot process data: {err}') + return None + + xml_creation_dates = [ + datetime.strptime( + self.xml_roots[key].find('dateTime/timeStamp').text, + '%Y%m%d%H%M%S', + ) + for key in self.xml_roots + ] + + # calculate diff between the two nearest en/fr XML creation dates + xml_creation_diff_seconds = abs( + (xml_creation_dates[0] - xml_creation_dates[1]).total_seconds() + ) + if xml_creation_diff_seconds > MAX_XML_DATETIME_DIFF_SECONDS: + LOGGER.warning( + 'File creation times differ by more than ' + f'{MAX_XML_DATETIME_DIFF_SECONDS} seconds. ' + 'Skipping loading...' + ) + return False + else: + LOGGER.debug( + f'File creation times differ by {xml_creation_diff_seconds} ' + 'seconds. Proceeding...' + ) + + self.sitecode = self.parsed_filename.named['sitecode'] + try: + self.citycode = self.wxo_lookup[self.sitecode]['citycode'] + except KeyError: + LOGGER.error( + f'ERROR: cannot find sitecode {self.sitecode} key in WxO ' + 'lookup table.' + ) + return False + + data = self.xml2json_cpw() if data: try: @@ -271,7 +1746,7 @@ def load_data(self, filepath): index=INDEX_NAME, id=data['properties']['identifier'], doc_as_upsert=True, - doc=data + doc=data, ) LOGGER.debug(f'Result: {r}') return True @@ -279,224 +1754,1259 @@ def load_data(self, filepath): LOGGER.warning(f'Error indexing: {err}') return False - def _get_element(self, node, path, attrib=None): + def _sort_by_datetime_diff(self, file): """ - Convenience function to resolve lxml.etree.Element handling + Sort files by absolute datetime difference between filename and + parsed datetime in active file - :param node: xml node - :param path: path in the xml node - :param attrib: attribute to get in the node + :param file: `Path` object + :returns: `timedelta` object + """ + + return abs( + datetime.strptime( + self.parsed_filename.named['datetime'], '%Y%m%dT%H%M%S.%fZ' + ) + - datetime.strptime( + parse(self.filename_pattern, file.name).named['datetime'], + '%Y%m%dT%H%M%S.%fZ', + ), + ) - returns: attribute as text or None + def _node_to_dict(self, node, lang=None): """ + Convert an lxml.etree.Element to a dict + + :param node: `lxml.etree.Element` node + + :returns: `dict` representation of xml node + """ + + if node is not None: + # if node has no attributes, just return the text + if not node.attrib and node.text: + if lang: + return {lang: safe_cast_to_number(node.text)} + else: + return safe_cast_to_number(node.text) + else: + node_dict = {} + for attrib in node.attrib: + if node.attrib[attrib]: + # in some case node attributes contain datetime strings + # formatted as YYYYMMDDHHMMSS, in this case we + # want to convert them to RFC3339 + regex = r"^(?:[2][0-9]{3})(?:(?:0[1-9]|1[0-2]))(?:(?:0[1-9]|[12]\d|3[01]))(?:(?:[01]\d|2[0-3]))(?:[0-5]\d){2}$" # noqa + if re.match(regex, node.attrib[attrib]): + dt = datetime.strptime( + node.attrib[attrib], '%Y%m%d%H%M%S' + ) + if lang: + node_dict[attrib] = { + lang: dt.strftime(DATETIME_RFC3339_FMT) + } + else: + node_dict[attrib] = dt.strftime( + DATETIME_RFC3339_FMT + ) + elif lang: + node_dict[attrib] = { + lang: safe_cast_to_number(node.attrib[attrib]) + } + else: + node_dict[attrib] = safe_cast_to_number( + node.attrib[attrib] + ) + + if node.text and node.text.strip(): + if lang: + node_dict['value'] = {lang: safe_cast_to_number(node.text)} + else: + node_dict['value'] = safe_cast_to_number(node.text) + + return node_dict - val = node.find(path) - if attrib is not None and val is not None: - return val.attrib.get(attrib) - if hasattr(val, 'text') and val.text not in [None, '']: - return val.text return None - def if_none(self, type_, value): + def _deep_merge(self, d1, d2): """ - Convenience fonction to avoid errors when - converting to int or float + Deep merge two dictionaries + :param d1: `dict` to merge into + :param d2: `dict` to merge from - :param type_: f for float and i for int - :param value: value to convert to float/int + :returns: `dict` of merged dictionaries + """ + for key in d2: + if key in d1: + if isinstance(d1[key], dict) and isinstance(d2[key], dict): + self._deep_merge(d1[key], d2[key]) + else: + d1[key] = d2[key] + else: + d1[key] = d2[key] + return d1 - :returns: converted variable + def _set_nested_value(self, d, keys, value): """ + Set nested value in dictionary, and merges dictionaries if they + already exist at path + :param d: `dict` to set value in + :param keys: `list` of keys + :param value: value to set - try: - if type_ == 'f': - variable = float(value) if value else None - elif type_ == 'i': - variable = int(value) if value else None - except ValueError: - variable = value + :returns: `dict` of modified dictionary + """ + for key in keys[:-1]: + d = d.setdefault(key, {}) + + if keys[-1] in d: + # try to merge dictionaries + if isinstance(value, dict): + for k, v in value.items(): + if k in d[keys[-1]]: + if isinstance(v, dict): + d[keys[-1]][k] = self._deep_merge( + d[keys[-1]][k], v + ) + else: + d[keys[-1]][k] = v + else: + d[keys[-1]][k] = v + else: + d[keys[-1]] = value + else: + d[keys[-1]] = value - return variable + return d - def xml2json_cpw(self, wxo_lookup, xml): + def _get_utc_timestamp(self, node): """ - main for generating weather data + Get timestamp from node + :param node: `lxml.etree.Element` node - :param wxo_lookup: json file to have the city id - :param xml: xml file to parse and convert to json + :returns: `dict` of timestamp + """ + timestamp = node.find('timeStamp') + if timestamp is not None: + dt = datetime.strptime(timestamp.text, '%Y%m%d%H%M%S') + return {self.lang: dt.strftime('%Y-%m-%dT%H:%M:%SZ')} + return None - :returns: xml as json object + def _set_cpw_location(self): """ + Set location and identifier information for the citypageweather object - feature = {} - row = {} + :returns: `dict` of modified citypageweather object + """ - LOGGER.debug(f'Processing XML: {xml}') - LOGGER.debug('Fetching English elements') + self.cpw_feature['properties']['identifier'] = self.citycode - try: - root = etree.parse(xml).getroot() - except Exception as err: - LOGGER.error(f'ERROR: cannot process data: {err}') - return None + location = self.xml_root.find('location') + if location is not None: + self._set_nested_value( + self.cpw_feature['properties'], + ['name'], + {self.lang: location.find('name').text}, + ) - if root.findall("currentConditions/"): - sitecode = os.path.basename(xml)[:-6] - try: - citycode = wxo_lookup[sitecode]['citycode'] - except KeyError as err: - LOGGER.error(f'ERROR: cannot find sitecode {sitecode}: {err}') + self._set_nested_value( + self.cpw_feature['properties'], + ['region'], + {self.lang: location.find('region').text}, + ) - location_name = root.find('location/name') - x = float(location_name.attrib.get('lon')[:-1]) - y = float(location_name.attrib.get('lat')[:-1]) + lon = location.find('name').attrib.get('lon') + lat = location.find('name').attrib.get('lat') - if location_name.attrib.get('lat')[-1] == 'S': - y *= -1 # south means negative latitude - elif location_name.attrib.get('lon')[-1] in ['W', 'O']: - x *= -1 # west means negative longitude + lon, lon_dir = float(lon[:-1]), lon[-1] + lat, lat_dir = float(lat[:-1]), lat[-1] + + if lon_dir in ['W', 'O']: + lon *= -1 # west means negative longitude + if lat_dir == 'S': + lat *= -1 # south means negative latitude + + self.cpw_feature['geometry'] = { + 'type': 'Point', + 'coordinates': [lon, lat, 0.0], + } + + if self.lang == 'en': + self._set_nested_value( + self.cpw_feature['properties'], + ['url'], + { + self.lang: f'https://weather.gc.ca/city/pages/{self.citycode}_metric_e.html' # noqa + }, + ) + else: + self._set_nested_value( + self.cpw_feature['properties'], + ['url'], + { + self.lang: f'https://meteo.gc.ca/city/pages/{self.citycode}_metric_f.html' # noqa + }, + ) + + return self.cpw_feature + + def _set_cpw_current_conditions(self): + """ + Set current conditions information for the citypageweather object + + :returns: `dict` of modified citypageweather object + """ + + current_conditions = self.xml_root.find("currentConditions") + + current_conditions_dict = {} + + if current_conditions is not None and len(current_conditions): + + iconCode = current_conditions_dict['iconCode'] = ( + self._node_to_dict(current_conditions.find('iconCode')) + ) + + if iconCode and 'value' in iconCode: + current_conditions_dict['iconCode'][ + 'url' + ] = f'https://weather.gc.ca/weathericons/{current_conditions_dict["iconCode"]["value"]:02d}.gif' # noqa + + for date in self.xml_root.findall( + "currentConditions/dateTime" + "[@zone='UTC'][@name='observation']" + ): + timestamp = self._get_utc_timestamp(date) + if timestamp: + current_conditions_dict['timestamp'] = timestamp + + kv_mapping = { + 'relativeHumidity': 'relativeHumidity', + 'wind': [ + 'wind/speed', + 'wind/gust', + 'wind/direction', + 'wind/bearing', + ], + 'pressure': 'pressure', + 'temperature': 'temperature', + 'dewpoint': 'dewpoint', + 'windChill': 'windChill', + 'station': 'station', + 'condition': 'condition', + } + + for key, value in kv_mapping.items(): + if isinstance(value, list): + _dict = {} + for val in value: + node = current_conditions.find(val) + if node is not None and node.text: + _dict[val.split('/')[-1]] = self._node_to_dict( + current_conditions.find(val), self.lang + ) + if _dict: + current_conditions_dict[key] = _dict + else: + node = current_conditions.find(value) + if node is not None and (node.attrib or node.text): + current_conditions_dict[key] = self._node_to_dict( + current_conditions.find(value), self.lang + ) + + if self.cpw_feature.get('properties', {}).get('currentConditions', {}): + existing_dict = self.cpw_feature['properties']['currentConditions'] + current_conditions_dict = self._deep_merge( + existing_dict, current_conditions_dict + ) + else: + self.cpw_feature['properties'][ + 'currentConditions' + ] = current_conditions_dict + + return self.cpw_feature + + def _set_cpw_forecast_group_regional_normals(self): + """ + Set regional normals information for the citypageweather object + + :returns: `dict` of modified citypageweather object + """ - feature['geom'] = [x, y, 0.0] - icon = self._get_element(root, 'currentConditions/iconCode') + regional_normals = self.xml_root.find('forecastGroup/regionalNormals') + if regional_normals is not None and len(regional_normals): + regional_normals_dict = {} + textSummary = regional_normals.find('textSummary') + temperatures = regional_normals.findall('temperature') + + if textSummary.text: + self._set_nested_value( + regional_normals_dict, + ['textSummary'], + {self.lang: textSummary.text}, + ) - if icon: - row['icon'] = f'https://weather.gc.ca/weathericons/{icon}.gif' + regional_high_lows = [ + self._node_to_dict(temp, self.lang) for temp in temperatures + ] + + # if properties.forecast_group.regionalNormals.temperature + # exists, retrieve them and update them with the new values + if ( + self.cpw_feature.get('properties', {}) + .get('forecastGroup', {}) + .get('regionalNormals', {}) + .get('temperature', {}) + ): + for i, temp in enumerate(regional_high_lows): + existing_dict = self.cpw_feature['properties'][ + 'forecastGroup' + ]['regionalNormals']['temperature'][i] + regional_high_lows[i] = self._deep_merge( + existing_dict, temp + ) + + if regional_high_lows: + regional_normals_dict['temperature'] = regional_high_lows + + if ( + self.cpw_feature.get('properties', {}) + .get('forecastGroup', {}) + .get('regionalNormals', {}) + ): + existing_dict = self.cpw_feature['properties'][ + 'forecastGroup' + ]['regionalNormals'] + regional_normals_dict = self._deep_merge( + existing_dict, regional_normals_dict + ) else: - row['icon'] = None + self.cpw_feature['properties']['forecastGroup'][ + 'regionalNormals' + ] = regional_normals_dict - for dates in root.findall("currentConditions/dateTime" - "[@zone='UTC'][@name='observation']"): - timestamp = dates.find('timeStamp') - if timestamp is not None: - dt2 = datetime.strptime(timestamp.text, '%Y%m%d%H%M%S') - row['timestamp'] = dt2.strftime('%Y-%m-%dT%H:%M:%SZ') - - row['rel_hum'] = self._get_element( - root, - 'currentConditions/relativeHumidity') - row['speed'] = self._get_element(root, - 'currentConditions/wind/speed') - row['gust'] = self._get_element(root, - 'currentConditions/wind/gust') - row['direction'] = self._get_element( - root, - 'currentConditions/wind/direction') - row['bearing'] = self._get_element( - root, 'currentConditions/wind/bearing') - row['temp'] = self._get_element( - root, 'currentConditions/temperature') - row['dewpoint'] = self._get_element( - root, 'currentConditions/dewpoint') - row['windchill'] = self._get_element( - root, 'currentConditions/windChill') - - if xml.endswith('e.xml'): - row['name'] = self._get_element(root, 'location/name') - row['station_en'] = self._get_element( - root, 'currentConditions/station') - row['cond_en'] = self._get_element( - root, 'currentConditions/condition') - row['pres_en'] = self._get_element( - root, 'currentConditions/pressure') - row['prestnd_en'] = self._get_element( - root, - 'currentConditions/pressure', - 'tendency') - row['url_en'] = f'https://weather.gc.ca/city/pages/{citycode}_metric_e.html' # noqa - - row['national'] = 0 - if row['name'] in NATIONAL_CITIES: - row['national'] = 1 - - LOGGER.debug('Adding feature') - LOGGER.debug('Setting geometry') - - conditions = { - 'type': "Feature", - 'properties': { - 'identifier': citycode, - 'name': row['name'], - 'station_en': row['station_en'], - 'icon': row['icon'], - 'cond_en': row['cond_en'], - 'temp': self.if_none('f', row['temp']), - 'dewpoint': self.if_none('f', row['dewpoint']), - 'windchill': self.if_none('i', row['windchill']), - 'pres_en': self.if_none('f', row['pres_en']), - 'prestnd_en': row['prestnd_en'], - 'rel_hum': self.if_none('i', row['rel_hum']), - 'speed': self.if_none('i', row['speed']), - 'gust': self.if_none('i', row['gust']), - 'direction_en': row['direction'], - 'bearing': self.if_none('f', row['bearing']), - 'timestamp': row['timestamp'], - 'url_en': row['url_en'], - 'national': int(row['national']) - }, - 'geometry': { - 'type': "Point", - 'coordinates': feature['geom'] + return self.cpw_feature + + def _set_forecast_general_info(self, forecast, forecast_dict): + """ + Set general forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + period_dict = self._node_to_dict(forecast.find('period')) + + self._set_nested_value( + forecast_dict, + ['period', 'textForecastName'], + {self.lang: period_dict.get('textForecastName')}, + ) + self._set_nested_value( + forecast_dict, + ['period', 'value'], + {self.lang: period_dict.get('value')}, + ) + self._set_nested_value( + forecast_dict, + ['textSummary'], + {self.lang: _get_element(forecast, 'textSummary')}, + ) + + return forecast_dict + + def _set_forecast_cloud_precip(self, forecast_elem, forecast_dict): + """ + Set cloud precipitation forecast information for + the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + cloud_precip = forecast_elem.find('cloudPrecip') + if cloud_precip is not None and len(cloud_precip): + self._set_nested_value( + forecast_dict, + ['cloudPrecip'], + {self.lang: cloud_precip.find('textSummary').text}, + ) + + return forecast_dict + + def _set_forecast_abbreviated_forecast(self, forecast_elem, forecast_dict): + """ + Set abbreviated forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + abbreviated_forecast = forecast_elem.find('abbreviatedForecast') + if abbreviated_forecast is not None and len(abbreviated_forecast): + self._set_nested_value( + forecast_dict, + ['abbreviatedForecast', 'textSummary'], + {self.lang: abbreviated_forecast.find('textSummary').text}, + ) + + self._set_nested_value( + forecast_dict, + ['abbreviatedForecast', 'icon'], + self._node_to_dict(abbreviated_forecast.find('iconCode')), + ) + + self._set_nested_value( + forecast_dict, + ['abbreviatedForecast', 'icon', 'url'], + f'https://weather.gc.ca/weathericons/{forecast_dict["abbreviatedForecast"]["icon"]["value"]:02d}.gif', # noqa + ) + + return forecast_dict + + def _set_forecast_temperatures(self, forecast_elem, forecast_dict): + """ + Set temperatures forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + temperatures = forecast_elem.find('temperatures') + if temperatures is not None and len(temperatures): + self._set_nested_value( + forecast_dict, + ['temperatures', 'textSummary'], + {self.lang: temperatures.find('textSummary').text}, + ) + + temps = [] + for i, temp in enumerate(temperatures.findall('temperature')): + temp_dict = self._node_to_dict(temp, self.lang) + # get existing forecast_dict['temperatures']['temperature'][i] + # if it exists + if i < len( + forecast_dict['temperatures'].get('temperature', []) + ): + existing_dict = forecast_dict['temperatures'][ + 'temperature' + ][i] + for key in existing_dict.keys(): + if key in temp_dict: + existing_dict[key] = { + **existing_dict[key], + **temp_dict[key], + } + else: + temps.append(temp_dict) + if temps: + self._set_nested_value( + forecast_dict, ['temperatures', 'temperature'], temps + ) + + return forecast_dict + + def _set_forecast_winds(self, forecast_elem, forecast_dict): + """ + Set winds forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + winds = forecast_elem.find('winds') + if winds is not None and len(winds): + if winds.find('textSummary') is not None: + self._set_nested_value( + forecast_dict, + ['winds', 'textSummary'], + {self.lang: _get_element(winds, 'textSummary')}, + ) + + periods = [] + for i, period in enumerate(winds.findall('wind')): + wind_period_dict = {} + attrs = ['index', 'rank'] + for attr in attrs: + wind_period_dict[attr] = { + self.lang: safe_cast_to_number(period.attrib.get(attr)) } + + nodes = ['speed', 'gust', 'direction', 'bearing'] + for node in nodes: + wind_period_dict[node] = self._node_to_dict( + period.find(node), self.lang + ) + + if i < len(forecast_dict.get('winds', {}).get('periods', [])): + existing_dict = forecast_dict['winds']['periods'][i] + forecast_dict['winds']['periods'][i] = self._deep_merge( + existing_dict, wind_period_dict + ) + else: + periods.append(wind_period_dict) + + if periods: + self._set_nested_value( + forecast_dict, ['winds', 'periods'], periods + ) + + return forecast_dict + + def _set_forecast_precipitation(self, forecast_elem, forecast_dict): + """ + Set precipitation forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + precipitation = forecast_elem.find('precipitation') + if precipitation is not None and len(precipitation): + + # set textSummary if it exists + if precipitation.find('textSummary').text: + self._set_nested_value( + forecast_dict, + ['precipitation', 'textSummary'], + {self.lang: _get_element(precipitation, 'textSummary')}, + ) + + # get precipitation periods + precip_periods = [] + for i, precip_type in enumerate( + precipitation.findall('precipType') + ): + if precip_type.attrib.get('start') and precip_type.attrib.get( + 'end' + ): + precip_type_dict = self._node_to_dict( + precip_type, self.lang + ) + if i < len( + forecast_dict.get('precipitation', {}).get( + 'precipPeriods', [] + ) + ): + existing_dict = forecast_dict['precipitation'][ + 'precipPeriods' + ][i] + forecast_dict['precipitation']['precipPeriods'][i] = ( + self._deep_merge(existing_dict, precip_type_dict) + ) + else: + precip_periods.append(precip_type_dict) + + if precip_periods: + self._set_nested_value( + forecast_dict, + ['precipitation', 'precipPeriods'], + precip_periods, + ) + + # get accumulation + accumulation = precipitation.find('accumulation') + if accumulation is not None and len(accumulation): + name = accumulation.find('name').text + amount = accumulation.find('amount') + + if name or amount: + accumulation_dict = { + 'name': {self.lang: name}, + 'amount': self._node_to_dict(amount, self.lang), + } + if 'accumulation' in forecast_dict['precipitation']: + existing_accumulation_dict = forecast_dict[ + 'precipitation' + ]['accumulation'] + forecast_dict['precipitation']['accumulation'] = ( + self._deep_merge( + existing_accumulation_dict, accumulation_dict + ) + ) + + if 'accumulation' not in forecast_dict['precipitation']: + forecast_dict['precipitation']['accumulation'] = { + 'name': {self.lang: name}, + 'amount': self._node_to_dict(amount, self.lang), + } + + return forecast_dict + + def _set_forecast_windchill(self, forecast_elem, forecast_dict): + """ + Set windchill forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + windchill = forecast_elem.find('windChill') + if windchill is not None and len(windchill): + if windchill.find('textSummary').text: + self._set_nested_value( + forecast_dict, + ['windChill', 'textSummary'], + {self.lang: windchill.find('textSummary').text}, + ) + + if windchill.find('calculated').text: + self._set_nested_value( + forecast_dict, + ['windChill', 'calculated'], + {self.lang: windchill.find('calculated').text}, + ) + + if windchill.find('frostbite').text: + self._set_nested_value( + forecast_dict, + ['windChill', 'frostbite'], + {self.lang: windchill.find('frostbite').text}, + ) + + return forecast_dict + + def _set_forecast_uv(self, forecast, forecast_dict): + """ + Set uv forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + uv = forecast.find('uv') + if uv is not None and len(uv): + uv_dict = {} + if uv.attrib.get('category'): + uv_dict['category'] = {self.lang: uv.attrib.get('category')} + + if ( + uv.find('textSummary') is not None + and uv.find('textSummary').text + ): + uv_dict['textSummary'] = { + self.lang: uv.find('textSummary').text } - elif xml.endswith('f.xml'): - LOGGER.debug(f'Processing {xml}') - - row['nom'] = self._get_element(root, 'location/name') - row['station_fr'] = self._get_element( - root, 'currentConditions/station') - row['cond_fr'] = self._get_element( - root, 'currentConditions/condition') - row['pres_fr'] = self._get_element( - root, 'currentConditions/pressure') - row['prestnd_fr'] = self._get_element( - root, - 'currentConditions/pressure', - 'tendency') - row['url_fr'] = f'https://meteo.gc.ca/city/pages/{citycode}_metric_f.html' # noqa - - row['national'] = 0 - if row['nom'] in NATIONAL_CITIES: - row['national'] = 1 - - LOGGER.debug('Adding feature') - LOGGER.debug('Setting geometry') - - conditions = { - 'type': "Feature", - 'properties': { - 'identifier': citycode, - 'nom': row['nom'], - 'station_fr': row['station_fr'], - 'icon': row['icon'], - 'cond_fr': row['cond_fr'], - 'temp': self.if_none('f', row['temp']), - 'dewpoint': self.if_none('f', row['dewpoint']), - 'windchill': self.if_none('i', row['windchill']), - 'pres_fr': self.if_none('f', row['pres_fr']), - 'prestnd_fr': row['prestnd_fr'], - 'rel_hum': self.if_none('i', row['rel_hum']), - 'speed': self.if_none('i', row['speed']), - 'gust': self.if_none('i', row['gust']), - 'direction_fr': row['direction'], - 'bearing': self.if_none('f', row['bearing']), - 'timestamp': row['timestamp'], - 'url_fr': row['url_fr'], - 'national': int(row['national'])}, - 'geometry': { - 'type': "Point", - 'coordinates': feature['geom'] + if uv.find('index').text: + uv_dict['index'] = {self.lang: uv.find('index').text} + + if 'uv' in forecast_dict: + existing_dict = forecast_dict['uv'] + forecast_dict['uv'] = self._deep_merge(existing_dict, uv_dict) + else: + forecast_dict['uv'] = uv_dict + + return forecast_dict + + def _set_forecast_rel_hum(self, forecast_elem, forecast_dict): + """ + Set relative humidity forecast information for + the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + rel_hum = forecast_elem.find('relativeHumidity') + if rel_hum is not None: + self._set_nested_value( + forecast_dict, + ['relativeHumidity'], + self._node_to_dict(rel_hum, self.lang), + ) + + return forecast_dict + + def _set_forecast_humidex(self, forecast_elem, forecast_dict): + """ + Set humidex forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + humidex = forecast_elem.find('humidex') + if humidex is not None and len(humidex): + if humidex.find('textSummary').text: + self._set_nested_value( + forecast_dict, + ['humidex', 'textSummary'], + {self.lang: humidex.find('textSummary').text}, + ) + + if humidex.find('calculated').text: + self._set_nested_value( + forecast_dict, + ['humidex', 'calculated'], + {self.lang: humidex.find('calculated').text}, + ) + + return forecast_dict + + def _set_forecast_visibility(self, forecast_elem, forecast_dict): + """ + Set visibility forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + visibility = forecast_elem.find('visibility') + if visibility is not None and len(visibility): + visibility_dict = {} + for child in visibility: + if 'Visib' in child.tag and child.text: + visibility_dict['textSummary'] = { + self.lang: child.find('textSummary').text } + visibility_dict['cause'] = child.attrib.get('cause') + + if visibility_dict: + self._set_nested_value( + forecast_dict, ['visibility'], visibility_dict + ) + + return forecast_dict + + def _set_forecast_snowlevel(self, forecast_elem, forecast_dict): + """ + Set snowlevel forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + snow_level = forecast_elem.find('snowLevel') + if snow_level is not None and len(snow_level): + snow_level_dict = {} + if snow_level.find('textSummary').text: + snow_level_dict['textSummary'] = { + self.lang: snow_level.find('textSummary').text } - conditions['properties'] = {key:val for key, val in conditions['properties'].items() if val is not None} # noqa - return conditions + if snow_level_dict: + self._set_nested_value( + forecast_dict, ['snowLevel'], snow_level_dict + ) - else: - LOGGER.warning( - f'No current conditions found. Skippping file {xml}.' + return forecast_dict + + def _set_forecast_frost(self, forecast_elem, forecast_dict): + """ + Set frost forecast information for the citypageweather object + + :param forecast: `xml.etree.Element` of forecast + :param forecast_dict: `dict` of forecast information + + :returns: `dict` of modified citypageweather object + """ + + frost = forecast_elem.find('frost') + if frost is not None and len(frost): + frost_dict = {} + if frost.find('textSummary').text: + frost_dict['textSummary'] = { + self.lang: frost.find('textSummary').text + } + + if frost_dict: + self._set_nested_value(forecast_dict, ['frost'], frost_dict) + + return forecast_dict + + def _set_cpw_forecast_group(self): + """ + Set forecast group information for the citypageweather object + + :returns: `dict` of modified citypageweather object + """ + + forecast_group = self.xml_root.find("forecastGroup") + + if not 'forecastGroup' in self.cpw_feature['properties']: # noqa + self.cpw_feature['properties']['forecastGroup'] = {} + + if forecast_group is not None and len(forecast_group): + + for date in forecast_group.findall( + "dateTime" "[@zone='UTC'][@name='forecastIssue']" + ): + timestamp = self._get_utc_timestamp(date) + if timestamp is not None: + self._set_nested_value( + self.cpw_feature['properties']['forecastGroup'], + ['timestamp'], + timestamp, + ) + + self._set_cpw_forecast_group_regional_normals() + + # iterate over forecasts and populate + forecasts = forecast_group.findall("forecast") + + if ( + forecasts is not None + and len(forecasts) + and 'forecasts' + not in self.cpw_feature['properties']['forecastGroup'] + ): + self.cpw_feature['properties']['forecastGroup'][ + 'forecasts' + ] = [] + + for i, forecast_elem in enumerate(forecasts): + # if index exists in + # self.cpw_feature['properties']['forecastGroup']['forecasts'] + # use it, otherwise create a new dict + if i < len( + self.cpw_feature['properties']['forecastGroup'][ + 'forecasts' + ] + ): + forecast_dict = self.cpw_feature['properties'][ + 'forecastGroup' + ]['forecasts'][i] + else: + forecast_dict = {} + + # get list of all self._get_forecast_* functions and call each + # function to populate each forecast object + set_forecast_funcs = [ + getattr(self, f) + for f in dir(self) + if f.startswith('_set_forecast_') + ] + + for forecast_func in set_forecast_funcs: + forecast_dict = forecast_func(forecast_elem, forecast_dict) + + self.cpw_feature['properties']['forecastGroup'][ + 'forecasts' + ].append(forecast_dict) + + if not self.cpw_feature['properties'].get('forecast_group'): + self.cpw_feature['properties'].pop('forecast_group', None) + + return self.cpw_feature + + def _set_hourly_forecast_datetime( + self, hourly_forecast_elem, hourly_forecast_dict + ): + """ + Set hourly forecast datetime information for the hourly forecast object + + :param hourly_forecast_elem: `xml.etree.Element` of hourly forecast + :param hourly_forecast_dict: `dict` of hourly forecast information + + :returns: `dict` of modified hourly forecast object + """ + + dt = datetime.strptime( + hourly_forecast_elem.attrib.get('dateTimeUTC'), '%Y%m%d%H%M%S' + ) + if dt is not None: + self._set_nested_value( + hourly_forecast_dict, + ['timestamp'], + dt.strftime('%Y-%m-%dT%H:%M:%SZ'), ) - return None + + return hourly_forecast_dict + + def _set_hourly_forecast_condition( + self, hourly_forecast_elem, hourly_forecast_dict + ): + """ + Set hourly forecast condition information for the hourly forecast + object + + :param hourly_forecast_elem: `xml.etree.Element` of hourly forecast + :param hourly_forecast_dict: `dict` of hourly forecast information + + :returns: `dict` of modified hourly forecast object + """ + + condition = hourly_forecast_elem.find('condition') + if condition is not None: + self._set_nested_value( + hourly_forecast_dict, + ['condition'], + {self.lang: condition.text}, + ) + + return hourly_forecast_dict + + def _set_hourly_forecast_icon_code( + self, hourly_forecast_elem, hourly_forecast_dict + ): + """ + Set hourly forecast icon code information for the citypageweather + object + + :param hourly_forecast_elem: `xml.etree.Element` of hourly forecast + :param hourly_forecast_dict: `dict` of hourly forecast information + + :returns: `dict` of hourly forecast object + """ + + icon_code = hourly_forecast_elem.find('iconCode') + if icon_code is not None: + hourly_forecast_dict['iconCode'] = self._node_to_dict(icon_code) + + if 'iconCode' in hourly_forecast_dict: + hourly_forecast_dict['iconCode'][ + 'url' + ] = f'https://weather.gc.ca/weathericons/{hourly_forecast_dict["iconCode"]["value"]:02d}.gif' # noqa + + return hourly_forecast_dict + + def _set_hourly_forecast_temperature( + self, hourly_forecast_elem, hourly_forecast_dict + ): + """ + Set hourly forecast temperature information for the hourly forecast + object + + :param hourly_forecast_elem: `xml.etree.Element` of hourly forecast + :param hourly_forecast_dict: `dict` of hourly forecast information + + :returns: `dict` of modified hourly forecast object + """ + + temperature = hourly_forecast_elem.find('temperature') + if temperature is not None: + self._set_nested_value( + hourly_forecast_dict, + ['temperature'], + self._node_to_dict(temperature, self.lang), + ) + + return hourly_forecast_dict + + def _set_hourly_forecast_lop( + self, hourly_forecast_elem, hourly_forecast_dict + ): + """ + Set hourly forecast lop information for the hourly forecast object + + :param hourly_forecast_elem: `xml.etree.Element` of hourly forecast + :param hourly_forecast_dict: `dict` of hourly forecast information + + :returns: `dict` of modified hourly forecast object + """ + + lop = hourly_forecast_elem.find('lop') + if lop is not None and lop.text: + self._set_nested_value( + hourly_forecast_dict, + ['lop'], + self._node_to_dict(lop, self.lang), + ) + + return hourly_forecast_dict + + def _set_hourly_forecast_humidex( + self, hourly_forecast_elem, hourly_forecast_dict + ): + """ + Set hourly forecast humidex information for the hourly forecast object + + :param hourly_forecast_elem: `xml.etree.Element` of hourly forecast + :param hourly_forecast_dict: `dict` of hourly forecast information + + :returns: `dict` of modified hourly forecast object + """ + + humidex = hourly_forecast_elem.find('humidex') + if humidex is not None and humidex.text: + self._set_nested_value( + hourly_forecast_dict, + ['humidex'], + self._node_to_dict(humidex, self.lang), + ) + + return hourly_forecast_dict + + def _set_hourly_forecast_windchill( + self, hourly_forecast_elem, hourly_forecast_dict + ): + """ + Set hourly forecast windchill information for the hourly forecast + object + + :param hourly_forecast_elem: `xml.etree.Element` of hourly forecast + :param hourly_forecast_dict: `dict` of hourly forecast information + + :returns: `dict` of modified hourly forecast object + """ + + windchill = hourly_forecast_elem.find('windChill') + if windchill is not None and windchill.text: + self._set_nested_value( + hourly_forecast_dict, + ['windChill'], + self._node_to_dict(windchill, self.lang), + ) + + return hourly_forecast_dict + + def _set_hourly_forecast_uv( + self, hourly_forecast_elem, hourly_forecast_dict + ): + """ + Set hourly forecast uv information for the hourly forecast object + + :param hourly_forecast_elem: `xml.etree.Element` of hourly forecast + :param hourly_forecast_dict: `dict` of hourly forecast information + + :returns: `dict` of modified hourly forecast object + """ + + uv = hourly_forecast_elem.find('uv') + if uv is not None: + index = uv.find('index') + if index is not None and index.text: + self._set_nested_value( + hourly_forecast_dict, + ['uv', 'index', 'value'], + self._node_to_dict(index, self.lang), + ) + + return hourly_forecast_dict + + def _set_hourly_forecast_wind( + self, hourly_forecast_elem, hourly_forecast_dict + ): + """ + Set hourly forecast wind information for the hourly forecast object + + :param hourly_forecast_elem: `xml.etree.Element` of hourly forecast + :param hourly_forecast_dict: `dict` of hourly forecast information + + :returns: `dict` of modified hourly forecast object + """ + + wind = hourly_forecast_elem.find('wind') + if wind is not None: + wind_dict = {} + nodes = ['speed', 'direction', 'gust'] + for node in nodes: + node_elem = wind.find(node) + if node_elem is not None and node_elem.text: + wind_dict[node] = self._node_to_dict(node_elem, self.lang) + + self._set_nested_value(hourly_forecast_dict, ['wind'], wind_dict) + + return hourly_forecast_dict + + def _set_cpw_hourly_forecast_group(self): + """ + Set hourly forecast group information for the citypageweather object + + :returns: `dict` of modified citypageweather object + """ + + hourly_forecast_group = self.xml_root.find("hourlyForecastGroup") + + if 'hourlyForecastGroup' not in self.cpw_feature['properties']: + self.cpw_feature['properties']['hourlyForecastGroup'] = {} + + if hourly_forecast_group is not None and len(hourly_forecast_group): + + for date in hourly_forecast_group.findall( + "dateTime" "[@zone='UTC'][@name='forecastIssue']" + ): + timestamp = self._get_utc_timestamp(date) + if timestamp is not None: + self._set_nested_value( + self.cpw_feature['properties']['hourlyForecastGroup'], + ['timestamp'], + timestamp, + ) + + # iterate over hourly forecasts and populate + hourly_forecasts = hourly_forecast_group.findall("hourlyForecast") + if ( + hourly_forecasts is not None + and len(hourly_forecasts) + and 'hourlyForecasts' + not in self.cpw_feature['properties']['hourlyForecastGroup'] + ): + self.cpw_feature['properties']['hourlyForecastGroup'][ + 'hourlyForecasts' + ] = [] + + for i, hourly_forecast_elem in enumerate(hourly_forecasts): + # if index exists in + # self.cpw_feature['properties']['hourlyForecastGroup']['hourlyForecasts'] + # use it, otherwise create a new dict + if i < len( + self.cpw_feature['properties']['hourlyForecastGroup'][ + 'hourlyForecasts' + ] + ): + hourly_forecast_dict = self.cpw_feature['properties'][ + 'hourlyForecastGroup' + ]['hourlyForecasts'][i] + else: + hourly_forecast_dict = {} + + # get list of all self._set_forecast_* functions and call each + # function to populate each forecast object + set_forecast_funcs = [ + getattr(self, f) + for f in dir(self) + if f.startswith('_set_hourly_forecast_') + ] + + for forecast_func in set_forecast_funcs: + hourly_forecast_dict = forecast_func( + hourly_forecast_elem, hourly_forecast_dict + ) + + self.cpw_feature['properties']['hourlyForecastGroup'][ + 'hourlyForecasts' + ].append(hourly_forecast_dict) + + # if not self.cpw_feature['properties'].get('hourlyForecastGroup'): + # self.cpw_feature['properties'].pop('hourlyForecastGroup', None) + + return self.cpw_feature + + def _set_cpw_warnings(self): + """ + Set warnings information for the citypageweather object + + :returns: `dict` of modified citypageweather object + """ + + warnings = self.xml_root.find("warnings") + + if warnings is not None and len(warnings): + events = [] + warnings = self.xml_root.find("warnings") + + if ( + warnings is not None + and len(warnings) + and 'warnings' not in self.cpw_feature['properties'] + ): + self.cpw_feature['properties']['warnings'] = [] + + events = [] + if warnings is not None and len(warnings): + for i, event in enumerate(warnings): + if i < len(self.cpw_feature['properties']['warnings']): + event_dict = self.cpw_feature['properties']['warnings'][i] + else: + event_dict = {} + + event_dict = self._deep_merge( + event_dict, self._node_to_dict(event, self.lang) + ) + + event_issue = event.find("dateTime[@name='eventIssue']") + if event_issue is not None: + self._set_nested_value( + event_dict, + ['eventIssue'], + self._get_utc_timestamp(event_issue), + ) + + events.append(event_dict) + + self.cpw_feature['properties']['warnings'] = events + + return self.cpw_feature + + def _set_cpw_riseSet(self): + """ + Set rise and set information for the citypageweather object + + :returns: `dict` of modified citypageweather object + """ + + rise_set = self.xml_root.find("riseSet") + + if rise_set is not None and len(rise_set): + if 'riseSet' not in self.cpw_feature['properties']: + rise_set_dict = {} + else: + rise_set_dict = self.cpw_feature['properties']['riseSet'] + + # get disclaimer + disclaimer = rise_set.find('disclaimer') + if disclaimer is not None: + self._set_nested_value( + rise_set_dict, ['disclaimer'], {self.lang: disclaimer.text} + ) + + # get sunrise and sunset UTC + for node in rise_set.findall('dateTime[@zone="UTC"]'): + if node.attrib.get('name') == 'sunrise': + self._set_nested_value( + rise_set_dict, + ['sunrise'], + self._get_utc_timestamp(node), + ) + if node.attrib.get('name') == 'sunset': + self._set_nested_value( + rise_set_dict, + ['sunset'], + self._get_utc_timestamp(node), + ) + + if rise_set_dict: + self.cpw_feature['properties']['riseSet'] = rise_set_dict + + return self.cpw_feature + + def xml2json_cpw(self): + """ + main for generating weather data + + :param wxo_lookup: json file to have the city id + :param xml: xml file to parse and convert to json + + :returns: xml as json object + """ + + for lang, xml_root in self.xml_roots.items(): + self.xml_root = xml_root + self.lang = lang + self._set_cpw_location() + self._set_cpw_current_conditions() + self._set_cpw_forecast_group() + self._set_cpw_hourly_forecast_group() + self._set_cpw_warnings() + self._set_cpw_riseSet() + + return self.cpw_feature @click.group() @@ -526,9 +3036,8 @@ def add(ctx, file_, directory, es, username, password, ignore_certs): if file_ is not None: files_to_process = [file_] elif directory is not None: - for root, dirs, files in os.walk(directory): - for f in [file for file in files if file.endswith('.xml')]: - files_to_process.append(os.path.join(root, f)) + for file in Path(directory).rglob('*_MSC_CitypageWeather_*.xml'): + files_to_process.append(file) files_to_process.sort(key=os.path.getmtime) for file_to_process in files_to_process: @@ -540,58 +3049,48 @@ def add(ctx, file_, directory, es, username, password, ignore_certs): @click.command() @click.pass_context -@cli_options.OPTION_DAYS( - default=DAYS_TO_KEEP, - help=f'Delete documents older than n days (default={DAYS_TO_KEEP})' -) @cli_options.OPTION_ELASTICSEARCH() @cli_options.OPTION_ES_USERNAME() @cli_options.OPTION_ES_PASSWORD() @cli_options.OPTION_ES_IGNORE_CERTS() -@cli_options.OPTION_YES( - prompt='Are you sure you want to delete old documents?' -) -def clean_records(ctx, days, es, username, password, ignore_certs): - """Delete old citypageweather documents""" +@cli_options.OPTION_YES(prompt='Are you sure you want to delete this index?') +def delete_index(ctx, es, username, password, ignore_certs): + """Delete current conditions index""" conn_config = configure_es_connection(es, username, password, ignore_certs) conn = ElasticsearchConnector(conn_config) - older_than = (datetime.now() - timedelta(days=days)).strftime( - '%Y-%m-%d %H:%M') - click.echo(f'Deleting documents older than {older_than} ({days} days)') - - query = { - 'query': { - 'range': { - 'properties.datetime': { - 'lte': older_than - } - } - } - } - - conn.Elasticsearch.delete_by_query(index=INDEX_NAME, body=query) + conn.delete(INDEX_NAME) @click.command() @click.pass_context -@cli_options.OPTION_ELASTICSEARCH() -@cli_options.OPTION_ES_USERNAME() -@cli_options.OPTION_ES_PASSWORD() -@cli_options.OPTION_ES_IGNORE_CERTS() +@cli_options.OPTION_HOURS( + default=24, + help='Delete cache older than n hours (default=24)', +) @cli_options.OPTION_YES( - prompt='Are you sure you want to delete this index?' + prompt=f'Are you sure you want to clean the cache ({Path(MSC_PYGEOAPI_CACHEDIR) / "citypage_weather"})?' # noqa ) -def delete_index(ctx, es, username, password, ignore_certs): - """Delete current conditions index""" - - conn_config = configure_es_connection(es, username, password, ignore_certs) - conn = ElasticsearchConnector(conn_config) - - conn.delete(INDEX_NAME) +def clean_cache(ctx, hours): + """Clean the cache""" + + cache_location = Path(MSC_PYGEOAPI_CACHEDIR) / "citypage_weather" + if not cache_location.exists(): + click.echo('Cache is empty or does not exist.') + else: + cache_files = list(cache_location.rglob('*_MSC_CitypageWeather_*.xml')) + click.echo( + f"Removing {len(cache_files)} CityPage Weather XML files from cache..." # noqa + ) + for file in cache_files: + if ( + datetime.now() - datetime.fromtimestamp(file.stat().st_mtime) + ).seconds / 3600 > hours: + file.unlink() + click.echo('Done') citypageweather.add_command(add) -citypageweather.add_command(clean_records) +citypageweather.add_command(clean_cache) citypageweather.add_command(delete_index) diff --git a/msc_pygeoapi/provider/cpw_elasticsearch.py b/msc_pygeoapi/provider/cpw_elasticsearch.py new file mode 100644 index 00000000..9ace4d0c --- /dev/null +++ b/msc_pygeoapi/provider/cpw_elasticsearch.py @@ -0,0 +1,416 @@ +# ================================================================= +# +# Authors: Etienne Pelletier +# +# Copyright (c) 2024 Etienne Pelletier +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= +from collections import OrderedDict +import json +import logging + +from elasticsearch import exceptions, helpers + +from pygeoapi.provider.base import ProviderConnectionError, ProviderQueryError +from pygeoapi.provider.elasticsearch_ import ( + update_query, +) +from pygeoapi.util import crs_transform + +from msc_pygeoapi.provider.elasticsearch import MSCElasticsearchProvider + + +LOGGER = logging.getLogger(__name__) + + +class CPWElasticsearchProvider(MSCElasticsearchProvider): + """CPW Elasticsearch Provider""" + + def __init__(self, provider_def): + """ + Initialize object + + :param provider_def: provider definition + + :returns: msc_pygeoapi.provider.elasticsearch.CPWElasticsearchProvider + """ + self._nested_fields = [] + super().__init__(provider_def) + + def _set_field(self, field_name, field_def): + """ + Set a field in self._fields based on Elasticsearch field definition + + :param field_name: field name + :param field_def: field definition + + :returns: None + """ + + if 'properties' in field_def: + for k, v in field_def['properties'].items(): + self._set_field(f'{field_name}.{k}', v) + self._fields[field_name] = {'type': 'object'} + + if 'type' in field_def: + if field_def['type'] == 'text': + self._fields[field_name] = {'type': 'string'} + elif field_def['type'] == 'date': + self._fields[field_name] = {'type': 'string', 'format': 'date'} + elif field_def['type'] in ('float', 'long'): + self._fields[field_name] = { + 'type': 'number', + 'format': field_def['type'], + } + elif field_def['type'] == 'nested': + self._nested_fields.append(field_name) + self._fields[field_name] = {'type': field_def['type']} + else: + self._fields[field_name] = {'type': field_def['type']} + + def get_fields(self): + """ + Get provider field information (names, types) + + :returns: dict of fields + """ + if not self._fields: + ii = self.es.indices.get( + index=self.index_name, allow_no_indices=False + ) + + LOGGER.debug(f'Response: {ii}') + try: + if '*' not in self.index_name: + mappings = ii[self.index_name]['mappings'] + p = mappings['properties']['properties'] + else: + LOGGER.debug('Wildcard index; setting from first match') + index_name_ = list(ii.keys())[0] + p = ii[index_name_]['mappings']['properties']['properties'] + except KeyError: + LOGGER.warning('Trying for alias') + alias_name = next(iter(ii)) + p = ii[alias_name]['mappings']['properties']['properties'] + except IndexError: + LOGGER.warning('could not get fields; returning empty set') + return {} + + for k, v in p['properties'].items(): + self._set_field(k, v) + + return self._fields + + @crs_transform + def query( + self, + offset=0, + limit=10, + resulttype='results', + bbox=[], + datetime_=None, + properties=[], + sortby=[], + select_properties=[], + skip_geometry=False, + q=None, + filterq=None, + **kwargs, + ): + """ + query Elasticsearch index + + :param offset: starting record to return (default 0) + :param limit: number of records to return (default 10) + :param resulttype: return results or hit limit (default results) + :param bbox: bounding box [minx,miny,maxx,maxy] + :param datetime_: temporal (datestamp or extent) + :param properties: list of tuples (name, value) + :param sortby: list of dicts (property, order) + :param select_properties: list of property names + :param skip_geometry: bool of whether to skip geometry (default False) + :param q: full-text search term(s) + :param filterq: filter object + + :returns: dict of 0..n GeoJSON features + """ + + self.select_properties = select_properties + + query = {'track_total_hits': True, 'query': {'bool': {'filter': []}}} + filter_ = [] + + feature_collection = {'type': 'FeatureCollection', 'features': []} + + if resulttype == 'hits': + LOGGER.debug('hits only specified') + limit = 0 + + if bbox: + LOGGER.debug('processing bbox parameter') + minx, miny, maxx, maxy = bbox + bbox_filter = { + 'geo_shape': { + 'geometry': { + 'shape': { + 'type': 'envelope', + 'coordinates': [[minx, maxy], [maxx, miny]], + }, + 'relation': 'intersects', + } + } + } + + query['query']['bool']['filter'].append(bbox_filter) + + if datetime_ is not None: + LOGGER.debug('processing datetime parameter') + if self.time_field is None: + LOGGER.error('time_field not enabled for collection') + raise ProviderQueryError() + + time_field = self.mask_prop(self.time_field) + + if '/' in datetime_: # envelope + LOGGER.debug('detected time range') + time_begin, time_end = datetime_.split('/') + + range_ = { + 'range': {time_field: {'gte': time_begin, 'lte': time_end}} + } + if time_begin == '..': + range_['range'][time_field].pop('gte') + elif time_end == '..': + range_['range'][time_field].pop('lte') + + filter_.append(range_) + + else: # time instant + LOGGER.debug('detected time instant') + filter_.append({'match': {time_field: datetime_}}) + + LOGGER.debug(filter_) + query['query']['bool']['filter'].append(*filter_) + + if properties: + LOGGER.debug('processing properties') + for prop in properties: + prop_name = self.mask_prop(prop[0]) + matching_nested_field = next( + (f for f in self._nested_fields if prop[0].startswith(f)), + False, + ) + if matching_nested_field: + prop_values = prop[1].split('|') + occur = 'should' if '|' in prop[1] else 'must' + pf = { + 'nested': { + 'path': f'{self.mask_prop(matching_nested_field)}', + 'query': {'bool': {occur: []}}, + }, + } + for prop_value in prop_values: + pf['nested']['query']['bool'][occur].append( + {'match': {prop_name: {'query': prop_value}}} + ) + query['query']['bool']['filter'].append(pf) + else: + pf = {'match': {prop_name: {'query': prop[1]}}} + query['query']['bool']['filter'].append(pf) + + if '|' not in prop[1]: + pf['match'][prop_name]['minimum_should_match'] = '100%' + + if sortby: + LOGGER.debug('processing sortby') + query['sort'] = [] + for sort in sortby: + LOGGER.debug(f'processing sort object: {sort}') + + sp = sort['property'] + + if self.fields[sp]['type'] in ['object', 'nested']: + LOGGER.warning( + 'Cannot sort by property of type object or nested' + ) + continue + + if ( + self.fields[sp]['type'] == 'string' + and self.fields[sp].get('format') != 'date' + ): + LOGGER.debug('setting ES .raw on property') + sort_property = f'{self.mask_prop(sp)}.raw' + else: + sort_property = self.mask_prop(sp) + + sort_order = 'asc' + if sort['order'] == '-': + sort_order = 'desc' + + sort_ = {sort_property: {'order': sort_order}} + query['sort'].append(sort_) + + if q is not None: + LOGGER.debug('Adding free-text search') + query['query']['bool']['must'] = {'query_string': {'query': q}} + + query['_source'] = { + 'excludes': [ + 'properties._metadata-payload', + 'properties._metadata-schema', + 'properties._metadata-format', + ] + } + + if self.properties or self.select_properties: + LOGGER.debug('filtering properties') + + all_properties = self.get_properties() + + query['_source'] = { + 'includes': list(map(self.mask_prop, all_properties)) + } + + query['_source']['includes'].append('id') + query['_source']['includes'].append('type') + query['_source']['includes'].append('geometry') + + if skip_geometry: + LOGGER.debug('excluding geometry') + try: + query['_source']['excludes'] = ['geometry'] + except KeyError: + query['_source'] = {'excludes': ['geometry']} + try: + LOGGER.debug('querying Elasticsearch') + if filterq: + LOGGER.debug(f'adding cql object: {filterq.json()}') + query = update_query(input_query=query, cql=filterq) + LOGGER.debug(json.dumps(query, indent=4)) + + LOGGER.debug('Testing for ES scrolling') + if offset + limit > 10000: + gen = helpers.scan( + client=self.es, + query=query, + preserve_order=True, + index=self.index_name, + ) + results = {'hits': {'total': limit, 'hits': []}} + for i in range(offset + limit): + try: + if i >= offset: + results['hits']['hits'].append(next(gen)) + else: + next(gen) + except StopIteration: + break + + matched = len(results['hits']['hits']) + offset + returned = len(results['hits']['hits']) + else: + es_results = self.es.search( + index=self.index_name, from_=offset, size=limit, **query + ) + results = es_results + matched = es_results['hits']['total']['value'] + returned = len(es_results['hits']['hits']) + + except exceptions.ConnectionError as err: + LOGGER.error(err) + raise ProviderConnectionError() + except exceptions.RequestError as err: + LOGGER.error(err) + raise ProviderQueryError() + except exceptions.NotFoundError as err: + LOGGER.error(err) + raise ProviderQueryError() + + feature_collection['numberMatched'] = matched + + if resulttype == 'hits': + return feature_collection + + feature_collection['numberReturned'] = returned + + LOGGER.debug('serializing features') + for feature in results['hits']['hits']: + feature_ = self.esdoc2geojson(feature) + feature_collection['features'].append(feature_) + + return feature_collection + + def esdoc2geojson(self, doc): + """ + generate GeoJSON `dict` from ES document + + :param doc: `dict` of ES document + + :returns: GeoJSON `dict` + """ + + feature_ = {} + feature_thinned = {} + + LOGGER.debug('Fetching id and geometry from GeoJSON document') + feature_ = doc['_source'] + + try: + id_ = doc['_source']['properties'][self.id_field] + except KeyError as err: + LOGGER.debug(f'Missing field: {err}') + id_ = doc['_source'].get('id', doc['_id']) + + feature_['id'] = id_ + feature_['geometry'] = doc['_source'].get('geometry') + + # safeguard against ES returning doc without properties + if 'properties' not in feature_: + feature_['properties'] = {} + + if self.properties or self.select_properties: + LOGGER.debug('Filtering properties') + all_properties = self.get_properties() + feature_thinned = { + 'id': id_, + 'type': feature_['type'], + 'geometry': feature_.get('geometry'), + 'properties': OrderedDict(), + } + for p in all_properties: + if p in feature_['properties']: + feature_thinned['properties'][p] = feature_['properties'][ + p + ] + else: + if '.' in p: + p_root = p.split('.')[0] + feature_thinned['properties'][p_root] = feature_[ + 'properties' + ].get(p_root) + if feature_thinned: + return feature_thinned + else: + return feature_ diff --git a/msc_pygeoapi/util.py b/msc_pygeoapi/util.py index 27c09dca..b1604294 100644 --- a/msc_pygeoapi/util.py +++ b/msc_pygeoapi/util.py @@ -119,6 +119,27 @@ def _get_element(node, path, attrib=None): return None +def safe_cast_to_number(value): + """ + helper function to safely cast a value to a number + + :param value: value to cast + + :returns: value cast to int/float or original value if not castable + """ + + if value is None: + return value + + try: + return int(value) + except ValueError: + try: + return float(value) + except ValueError: + return value + + def strftime_rfc3339(datetimeobj): """ helper function to convert datetime object to RFC3393 compliant string.