diff --git a/msc_pygeoapi/connector/elasticsearch_.py b/msc_pygeoapi/connector/elasticsearch_.py index f637473e..a0050005 100644 --- a/msc_pygeoapi/connector/elasticsearch_.py +++ b/msc_pygeoapi/connector/elasticsearch_.py @@ -180,17 +180,27 @@ def delete(self, indexes): return True - def create_template(self, name, settings): + def create_template(self, name, settings, overwrite=False): """ create an Elasticsearch index template :param name: `str` index template name :param settings: `dict` settings dictionnary for index template + :param overwrite: `bool` indicating whether to overwrite existing + template :returns: `bool` of index template creation status """ - if not self.Elasticsearch.indices.exists_template(name=name): + template_exists = self.Elasticsearch.indices.exists_template(name=name) + + if template_exists and overwrite: + self.Elasticsearch.indices.delete_template(name=name) + self.Elasticsearch.indices.put_template(name=name, body=settings) + elif template_exists: + LOGGER.warning(f'Template {name} already exists') + return False + else: self.Elasticsearch.indices.put_template(name=name, body=settings) return True diff --git a/msc_pygeoapi/loader/hydrometric_realtime.py b/msc_pygeoapi/loader/hydrometric_realtime.py index 7b579454..30cbf2f5 100644 --- a/msc_pygeoapi/loader/hydrometric_realtime.py +++ b/msc_pygeoapi/loader/hydrometric_realtime.py @@ -41,6 +41,7 @@ from msc_pygeoapi.util import ( check_es_indexes_to_delete, configure_es_connection, + DATETIME_RFC3339_FMT ) @@ -60,7 +61,7 @@ SETTINGS = { 'order': 0, 'version': 1, - 'index_patterns': [INDEX_BASENAME], + 'index_patterns': [f'{INDEX_BASENAME}*'], 'settings': { 'number_of_shards': 1, 'number_of_replicas': 0 @@ -98,7 +99,7 @@ }, 'DATETIME': { 'type': 'date', - 'format': 'strict_date_hour_minute_second||strict_date_optional_time' # noqa + 'format': 'date_time_no_millis||strict_date_optional_time' # noqa }, 'DATETIME_LST': { 'type': 'date', @@ -174,7 +175,7 @@ def __init__(self, conn_config={}): BaseLoader.__init__(self) self.conn = ElasticsearchConnector(conn_config) - self.conn.create_template(INDEX_BASENAME, SETTINGS) + self.conn.create_template(INDEX_BASENAME, SETTINGS, overwrite=True) self.stations = {} self.read_stations_list() @@ -284,11 +285,10 @@ def generate_observations(self, filepath): try: # Convert timestamp to UTC time. utc_datetime = delocalize_date(date_) - utc_datestamp = utc_datetime.strftime('%Y-%m-%d.%H:%M:%S') + utc_datestamp = utc_datetime.strftime(DATETIME_RFC3339_FMT) # Generate an ID now that all fields are known. observation_id = f'{station}.{utc_datestamp}' - utc_datestamp = utc_datestamp.replace('.', 'T') except Exception as err: LOGGER.error(f'Cannot interpret datetime value {date_} in {filepath}' # noqa f' due to: {err} (skipping)')