Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update citypageweather realtime loader #331

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions deploy/default/msc-pygeoapi-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,70 @@ resources:
- COOLING_DEGREE_DAYS
- HEATING_DEGREE_DAYS

current-conditions:
type: collection
title:
en: Current Weather Conditions - City Page Weather [experimental]
fr: Conditions météorologiques actuelles - Prévisions météorologiques par ville [experimentale]
description:
en: Current conditions for select Canadian cities
fr: Conditions actuelles pour une sélection de villes canadiennes
keywords:
en: [Meteorology, Weather]
fr: [Météorologie, Temps]
crs:
- CRS84
links:
- type: text/html
rel: canonical
title:
en: Weather forecast files by city
fr: Prévisions météorologiques par ville
href:
en: https://eccc-msc.github.io/open-data/msc-data/citypage-weather/readme_citypageweather_en/
fr: https://eccc-msc.github.io/open-data/msc-data/citypage-weather/readme_citypageweather_fr/
hreflang:
en: en-CA
fr: fr-CA
extents:
spatial:
bbox: [-145.27, 37.3, -48.11, 87.61]
crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84
temporal:
begin: null
end: null # or empty
providers:
- type: feature
default: true
name: Elasticsearch
data: ${MSC_PYGEOAPI_ES_URL}/current_conditions
id_field: identifier
time_field: timestamp
properties:
- identifier
- name
- station_en
- station_fr
- icon
- cond_en
- cond_fr
- temp
- dewpoint
- pres_en
- pres_fr
- prestnd_en
- prestnd_fr
- rel_hum
- speed
- direction_en
- direction_fr
- bearing
- timestamp
- url_en
- url_fr
- national
- nom

climate-daily:
type: collection
title:
Expand Down
80 changes: 63 additions & 17 deletions msc_pygeoapi/loader/citypageweather_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,17 +260,19 @@ def load_data(self, filepath):

data = self.xml2json_cpw(wxo_lookup, filepath)

try:
r = self.conn.Elasticsearch.index(
index=INDEX_NAME,
id=data['properties']['identifier'],
body=data
)
LOGGER.debug(f'Result: {r}')
return True
except Exception as err:
LOGGER.warning(f'Error indexing: {err}')
return False
if data:
try:
r = self.conn.Elasticsearch.update(
index=INDEX_NAME,
id=data['properties']['identifier'],
doc_as_upsert=True,
doc=data
)
LOGGER.debug(f'Result: {r}')
return True
except Exception as err:
LOGGER.warning(f'Error indexing: {err}')
return False

def _get_element(self, node, path, attrib=None):
"""
Expand Down Expand Up @@ -301,10 +303,13 @@ def if_none(self, type_, value):
:returns: converted variable
"""

if type_ == 'f':
variable = float(value) if value else 'null'
elif type_ == 'i':
variable = int(value) if value else 'null'
try:
if type_ == 'f':
variable = float(value) if value else 'null'
elif type_ == 'i':
variable = int(value) if value else 'null'
except ValueError:
variable = value

return variable

Expand All @@ -328,6 +333,7 @@ def xml2json_cpw(self, wxo_lookup, xml):
root = etree.parse(xml).getroot()
except Exception as err:
LOGGER.error(f'ERROR: cannot process data: {err}')
return None

if root.findall("currentConditions/"):
sitecode = os.path.basename(xml)[:-6]
Expand Down Expand Up @@ -416,7 +422,7 @@ def xml2json_cpw(self, wxo_lookup, xml):
'rel_hum': self.if_none('i', row['rel_hum']),
'speed': self.if_none('i', row['speed']),
'gust': self.if_none('i', row['gust']),
'direction': row['direction'],
'direction_en': row['direction'],
'bearing': self.if_none('f', row['bearing']),
'timestamp': row['timestamp'],
'url_en': row['url_en'],
Expand Down Expand Up @@ -467,7 +473,7 @@ def xml2json_cpw(self, wxo_lookup, xml):
'rel_hum': self.if_none('i', row['rel_hum']),
'speed': self.if_none('i', row['speed']),
'gust': self.if_none('i', row['gust']),
'direction': row['direction'],
'direction_fr': row['direction'],
'bearing': self.if_none('f', row['bearing']),
'timestamp': row['timestamp'],
'url_fr': row['url_fr'],
Expand All @@ -481,13 +487,52 @@ def xml2json_cpw(self, wxo_lookup, xml):
conditions['properties'] = {key:val for key, val in conditions['properties'].items() if val != 'null'} # noqa
return conditions

else:
LOGGER.warning(
f'No current conditions found. Skippping file {xml}.'
)
return None


@click.group()
def citypageweather():
"""Manages current conditions index"""
pass


@click.command()
@click.pass_context
@cli_options.OPTION_FILE()
@cli_options.OPTION_DIRECTORY()
@cli_options.OPTION_ELASTICSEARCH()
@cli_options.OPTION_ES_USERNAME()
@cli_options.OPTION_ES_PASSWORD()
@cli_options.OPTION_ES_IGNORE_CERTS()
def add(ctx, file_, directory, es, username, password, ignore_certs):
"""adds data to system"""

if all([file_ is None, directory is None]):
raise click.ClickException('Missing --file/-f or --dir/-d option')

conn_config = configure_es_connection(es, username, password, ignore_certs)

files_to_process = []

if file_ is not None:
files_to_process = [file_]
elif directory is not None:
for root, dirs, files in os.walk(directory):
for f in [file for file in files if file.endswith('.xml')]:
files_to_process.append(os.path.join(root, f))
files_to_process.sort(key=os.path.getmtime)

for file_to_process in files_to_process:
loader = CitypageweatherRealtimeLoader(conn_config)
loader.load_data(file_to_process)

click.echo('Done')


@click.command()
@click.pass_context
@cli_options.OPTION_DAYS(
Expand Down Expand Up @@ -542,5 +587,6 @@ def delete_index(ctx, es, username, password, ignore_certs):
conn.delete(INDEX_NAME)


citypageweather.add_command(add)
citypageweather.add_command(clean_records)
citypageweather.add_command(delete_index)
Loading