Skip to content

Commit

Permalink
ensure properties and ids contain ISO8601-compliant datetime (#330)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dukestep authored May 2, 2024
1 parent 08b501a commit 4e01c7c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 7 deletions.
31 changes: 29 additions & 2 deletions msc_pygeoapi/connector/elasticsearch_.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,21 +180,48 @@ def delete(self, indexes):

return True

def create_template(self, name, settings):
def create_template(self, name, settings, overwrite=False):
"""
create an Elasticsearch index template
:param name: `str` index template name
:param settings: `dict` settings dictionnary for index template
:param overwrite: `bool` indicating whether to overwrite existing
template
:returns: `bool` of index template creation status
"""

if not self.Elasticsearch.indices.exists_template(name=name):
template_exists = self.Elasticsearch.indices.exists_template(name=name)

if template_exists and overwrite:
self.Elasticsearch.indices.delete_template(name=name)
self.Elasticsearch.indices.put_template(name=name, body=settings)
elif template_exists:
LOGGER.warning(f'Template {name} already exists')
return False
else:
self.Elasticsearch.indices.put_template(name=name, body=settings)

return True

def get_template(self, name):
"""
get an Elasticsearch index template
:param name: `str` index template name
:returns: `dict` of index template settings
"""

try:
template = self.Elasticsearch.indices.get_template(name=name)
except NotFoundError:
LOGGER.warning(f'Template {name} not found')
return None

return template

def delete_template(self, name):
"""
delete an Elasticsearch index template
Expand Down
28 changes: 23 additions & 5 deletions msc_pygeoapi/loader/hydrometric_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from msc_pygeoapi.util import (
check_es_indexes_to_delete,
configure_es_connection,
DATETIME_RFC3339_FMT
)


Expand All @@ -60,7 +61,7 @@
SETTINGS = {
'order': 0,
'version': 1,
'index_patterns': [INDEX_BASENAME],
'index_patterns': [f'{INDEX_BASENAME}*'],
'settings': {
'number_of_shards': 1,
'number_of_replicas': 0
Expand Down Expand Up @@ -98,7 +99,7 @@
},
'DATETIME': {
'type': 'date',
'format': 'strict_date_hour_minute_second||strict_date_optional_time' # noqa
'format': 'date_time_no_millis||strict_date_optional_time' # noqa
},
'DATETIME_LST': {
'type': 'date',
Expand Down Expand Up @@ -174,7 +175,25 @@ def __init__(self, conn_config={}):
BaseLoader.__init__(self)

self.conn = ElasticsearchConnector(conn_config)
self.conn.create_template(INDEX_BASENAME, SETTINGS)

index_template = self.conn.get_template(INDEX_BASENAME)

# compare index template mappping with mapping defined in SETTINGS
if index_template:
# if mappings are different, update the index template
if (
index_template[INDEX_BASENAME]['mappings']
!= SETTINGS['mappings']
):
LOGGER.info(
f'Updating "{INDEX_BASENAME}" index template with'
' mapping changes in provider.'
)
self.conn.create_template(
INDEX_BASENAME, SETTINGS, overwrite=True
)
else:
self.conn.create_template(INDEX_BASENAME, SETTINGS)

self.stations = {}
self.read_stations_list()
Expand Down Expand Up @@ -284,11 +303,10 @@ def generate_observations(self, filepath):
try:
# Convert timestamp to UTC time.
utc_datetime = delocalize_date(date_)
utc_datestamp = utc_datetime.strftime('%Y-%m-%d.%H:%M:%S')
utc_datestamp = utc_datetime.strftime(DATETIME_RFC3339_FMT)
# Generate an ID now that all fields are known.
observation_id = f'{station}.{utc_datestamp}'

utc_datestamp = utc_datestamp.replace('.', 'T')
except Exception as err:
LOGGER.error(f'Cannot interpret datetime value {date_} in {filepath}' # noqa
f' due to: {err} (skipping)')
Expand Down

0 comments on commit 4e01c7c

Please sign in to comment.